use crate::{
    run::spawn_dataflow,
    tcp_utils::{tcp_receive, tcp_send},
};
pub use control::ControlEvent;
use dora_core::{
    config::{NodeId, OperatorId},
    descriptor::DescriptorExt,
    uhlc::{self, HLC},
};
use dora_message::{
    BuildId, DataflowId, SessionId,
    cli_to_coordinator::ControlRequest,
    common::{DaemonId, GitSource},
    coordinator_to_cli::{
        ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
        DataflowStatus, LogLevel, LogMessage,
    },
    coordinator_to_daemon::{
        BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped,
    },
    daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
    descriptor::{Descriptor, ResolvedNode},
};
use eyre::{ContextCompat, Result, WrapErr, bail, eyre};
use futures::{Future, Stream, StreamExt, future::join_all, stream::FuturesUnordered};
use futures_concurrency::stream::Merge;
use itertools::Itertools;
use log_subscriber::LogSubscriber;
use petname::petname;
use run::SpawnedDataflow;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    net::SocketAddr,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::{
    net::TcpStream,
    sync::{mpsc, oneshot},
    task::JoinHandle,
};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use uuid::Uuid;

mod control;
mod listener;
mod log_subscriber;
mod run;
mod tcp_utils;

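/// Starts the coordinator: binds the daemon and control listeners and returns
/// the bound daemon port together with a future that drives the main event loop.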
pub async fn start(
    bind: SocketAddr,
    bind_control: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
    let listener = listener::create_listener(bind).await?;
    let port = listener
        .local_addr()
        .wrap_err("failed to get local addr of listener")?
        .port();
    let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
        c.map(Event::NewDaemonConnection)
            .wrap_err("failed to open connection")
            .unwrap_or_else(Event::DaemonConnectError)
    });

    let mut tasks = FuturesUnordered::new();
    let control_events = control::control_events(bind_control, &tasks)
        .await
        .wrap_err("failed to create control events")?;

    let ctrlc_events = set_up_ctrlc_handler()?;

    let events = (
        external_events,
        new_daemon_connections,
        control_events,
        ctrlc_events,
    )
        .merge();

    let future = async move {
        start_inner(events, &tasks).await?;

        tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
        while let Some(join_result) = tasks.next().await {
            if let Err(err) = join_result {
                tracing::error!("task panicked: {err}");
            }
        }
        tracing::debug!("all spawned tasks finished, exiting..");
        Ok(())
    };
    Ok((port, future))
}

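/// Resolves a dataflow name to its UUID, searching running dataflows first
/// and archived ones second. Fails if the name is unknown or ambiguous.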
fn resolve_name(
    name: String,
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
) -> eyre::Result<Uuid> {
    let uuids: Vec<_> = running_dataflows
        .iter()
        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
        .map(|(k, _)| k)
        .copied()
        .collect();
    let archived_uuids: Vec<_> = archived_dataflows
        .iter()
        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
        .map(|(k, _)| k)
        .copied()
        .collect();

    if uuids.is_empty() {
        if archived_uuids.is_empty() {
            bail!("no dataflow with name `{name}`");
        } else if let [uuid] = archived_uuids.as_slice() {
            Ok(*uuid)
        } else {
            bail!(
                "multiple archived dataflows found with name `{name}`; please provide the UUID instead"
            );
        }
    } else if let [uuid] = uuids.as_slice() {
        Ok(*uuid)
    } else {
        bail!(
            "multiple running dataflows found with name `{name}`; please provide the UUID instead"
        );
    }
}

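/// All currently connected daemons, keyed by their [`DaemonId`].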
#[derive(Default)]
struct DaemonConnections {
    daemons: BTreeMap<DaemonId, DaemonConnection>,
}

impl DaemonConnections {
    fn add(&mut self, daemon_id: DaemonId, connection: DaemonConnection) {
        let previous = self.daemons.insert(daemon_id.clone(), connection);
        if previous.is_some() {
            tracing::info!("closing previous connection `{daemon_id}` on new register");
        }
    }

    fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> {
        self.daemons.get(id)
    }

    fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
        self.daemons.get_mut(id)
    }

    fn get_matching_daemon_id(&self, machine_id: &str) -> Option<&DaemonId> {
        self.daemons
            .keys()
            .find(|id| id.matches_machine_id(machine_id))
    }

    fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
        std::mem::take(&mut self.daemons).into_iter()
    }

    fn is_empty(&self) -> bool {
        self.daemons.is_empty()
    }

    fn keys(&self) -> impl Iterator<Item = &DaemonId> {
        self.daemons.keys()
    }

    fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
        self.daemons.iter_mut()
    }

    fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
        self.daemons.remove(daemon_id)
    }

    fn unnamed(&self) -> impl Iterator<Item = &DaemonId> {
        self.daemons.keys().filter(|id| id.machine_id().is_none())
    }
}

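/// The coordinator's main event loop: tracks builds, running dataflows, and
/// daemon connections, and dispatches every incoming [`Event`] until shutdown.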
async fn start_inner(
    events: impl Stream<Item = Event> + Unpin,
    tasks: &FuturesUnordered<JoinHandle<()>>,
) -> eyre::Result<()> {
    let clock = Arc::new(HLC::default());

    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2);
    let mut daemon_events_tx = Some(daemon_events_tx);
    let daemon_events = ReceiverStream::new(daemon_events);

    let daemon_heartbeat_interval =
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
            .map(|_| Event::DaemonHeartbeatInterval);

    let (abortable_events, abort_handle) =
        futures::stream::abortable((events, daemon_heartbeat_interval).merge());

    let mut events = (abortable_events, daemon_events).merge();

    let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
    let mut finished_builds: HashMap<BuildId, CachedResult> = HashMap::new();

    let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
    let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
        HashMap::new();
    let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
    let mut daemon_connections = DaemonConnections::default();

    while let Some(event) = events.next().await {
        let start = Instant::now();
        let event_kind = event.kind();

        if event.log() {
            tracing::trace!("Handling event {event:?}");
        }
        match event {
            Event::NewDaemonConnection(connection) => {
                connection.set_nodelay(true)?;
                let events_tx = daemon_events_tx.clone();
                if let Some(events_tx) = events_tx {
                    let task = tokio::spawn(listener::handle_connection(
                        connection,
                        events_tx,
                        clock.clone(),
                    ));
                    tasks.push(task);
                } else {
                    tracing::warn!(
                        "ignoring new daemon connection because events_tx was closed already"
                    );
                }
            }
            Event::DaemonConnectError(err) => {
                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
            }
            Event::Daemon(event) => match event {
                DaemonRequest::Register {
                    machine_id,
                    mut connection,
                    version_check_result,
                } => {
                    let existing = match &machine_id {
                        Some(id) => daemon_connections.get_matching_daemon_id(id),
                        None => daemon_connections.unnamed().next(),
                    };
                    let existing_result = if existing.is_some() {
                        Err(format!(
                            "There is already a connected daemon with machine ID `{machine_id:?}`"
                        ))
                    } else {
                        Ok(())
                    };

                    let daemon_id = DaemonId::new(machine_id);

                    let reply: Timestamped<RegisterResult> = Timestamped {
                        inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
                            Ok(_) => RegisterResult::Ok {
                                daemon_id: daemon_id.clone(),
                            },
                            Err(err) => RegisterResult::Err(err.clone()),
                        },
                        timestamp: clock.new_timestamp(),
                    };

                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
                        .await
                        .context("tcp send failed");
                    match version_check_result.map_err(|e| eyre!(e)).and(send_result) {
                        Ok(()) => {
                            daemon_connections.add(
                                daemon_id.clone(),
                                DaemonConnection {
                                    stream: connection,
                                    last_heartbeat: Instant::now(),
                                },
                            );
                        }
                        Err(err) => {
                            tracing::warn!(
                                "failed to register daemon connection for daemon `{daemon_id}`: {err}"
                            );
                        }
                    }
                }
            },
            Event::Dataflow { uuid, event } => match event {
                DataflowEvent::ReadyOnDaemon {
                    daemon_id,
                    exited_before_subscribe,
                } => {
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            let dataflow = entry.get_mut();
                            dataflow.pending_daemons.remove(&daemon_id);
                            dataflow
                                .exited_before_subscribe
                                .extend(exited_before_subscribe);
                            if dataflow.pending_daemons.is_empty() {
                                tracing::debug!("sending all nodes ready message to daemons");
                                let message = serde_json::to_vec(&Timestamped {
                                    inner: DaemonCoordinatorEvent::AllNodesReady {
                                        dataflow_id: uuid,
                                        exited_before_subscribe: dataflow
                                            .exited_before_subscribe
                                            .clone(),
                                    },
                                    timestamp: clock.new_timestamp(),
                                })
                                .wrap_err("failed to serialize AllNodesReady message")?;

                                for daemon_id in &dataflow.daemons {
                                    let Some(connection) = daemon_connections.get_mut(daemon_id)
                                    else {
                                        tracing::warn!(
                                            "no daemon connection found for daemon `{daemon_id}`"
                                        );
                                        continue;
                                    };
                                    tcp_send(&mut connection.stream, &message)
                                        .await
                                        .wrap_err_with(|| {
                                            format!(
                                                "failed to send AllNodesReady({uuid}) message \
                                                to machine {daemon_id}"
                                            )
                                        })?;
                                }
                            }
                        }
                        std::collections::hash_map::Entry::Vacant(_) => {
                            tracing::warn!("dataflow not running on ReadyOnDaemon");
                        }
                    }
                }
                DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => {
                    tracing::debug!(
                        "coordinator received DataflowFinishedOnDaemon ({daemon_id:?}, result: {result:?})"
                    );
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            let dataflow = entry.get_mut();
                            dataflow.daemons.remove(&daemon_id);
                            tracing::info!(
                                "removed daemon `{daemon_id}` from dataflow `{}`",
                                dataflow.uuid
                            );
                            dataflow_results
                                .entry(uuid)
                                .or_default()
                                .insert(daemon_id, result);

                            if dataflow.daemons.is_empty() {
                                archived_dataflows
                                    .entry(uuid)
                                    .or_insert_with(|| ArchivedDataflow::from(entry.get()));
                                let mut finished_dataflow = entry.remove();
                                let dataflow_id = finished_dataflow.uuid;
                                send_log_message(
                                    &mut finished_dataflow.log_subscribers,
                                    &LogMessage {
                                        build_id: None,
                                        dataflow_id: Some(dataflow_id),
                                        node_id: None,
                                        daemon_id: None,
                                        level: LogLevel::Info.into(),
                                        target: Some("coordinator".into()),
                                        module_path: None,
                                        file: None,
                                        line: None,
                                        message: "dataflow finished".into(),
                                    },
                                )
                                .await;

                                let reply = ControlRequestReply::DataflowStopped {
                                    uuid,
                                    result: dataflow_results
                                        .get(&uuid)
                                        .map(|r| dataflow_result(r, uuid, &clock))
                                        .unwrap_or_else(|| {
                                            DataflowResult::ok_empty(uuid, clock.new_timestamp())
                                        }),
                                };
                                for sender in finished_dataflow.stop_reply_senders {
                                    let _ = sender.send(Ok(reply.clone()));
                                }
                                if !matches!(
                                    finished_dataflow.spawn_result,
                                    CachedResult::Cached { .. }
                                ) {
                                    tracing::error!(
                                        "dataflow finished, but its spawn result was still pending"
                                    );
                                }
                            }
                        }
                        std::collections::hash_map::Entry::Vacant(_) => {
                            tracing::warn!("dataflow not running on DataflowFinishedOnDaemon");
                        }
                    }
                }
            },

            Event::Control(event) => match event {
                ControlEvent::IncomingRequest {
                    request,
                    reply_sender,
                } => {
                    match request {
                        ControlRequest::Build {
                            session_id,
                            dataflow,
                            git_sources,
                            prev_git_sources,
                            local_working_dir,
                            uv,
                        } => {
                            let build_id = BuildId::generate();

                            let result = build_dataflow(
                                build_id,
                                session_id,
                                dataflow,
                                git_sources,
                                prev_git_sources,
                                local_working_dir,
                                &clock,
                                uv,
                                &mut daemon_connections,
                            )
                            .await;
                            match result {
                                Ok(build) => {
                                    running_builds.insert(build_id, build);
                                    let _ = reply_sender.send(Ok(
                                        ControlRequestReply::DataflowBuildTriggered { build_id },
                                    ));
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        ControlRequest::WaitForBuild { build_id } => {
                            if let Some(build) = running_builds.get_mut(&build_id) {
                                build.build_result.register(reply_sender);
                            } else if let Some(result) = finished_builds.get_mut(&build_id) {
                                result.register(reply_sender);
                            } else {
                                let _ =
                                    reply_sender.send(Err(eyre!("unknown build id {build_id}")));
                            }
                        }
                        ControlRequest::Start {
                            build_id,
                            session_id,
                            dataflow,
                            name,
                            local_working_dir,
                            uv,
                        } => {
                            let name = name.or_else(|| petname(2, "-"));

                            let inner = async {
                                if let Some(name) = name.as_deref() {
                                    if running_dataflows
                                        .values()
                                        .any(|d: &RunningDataflow| d.name.as_deref() == Some(name))
                                    {
                                        bail!(
                                            "there is already a running dataflow with name `{name}`"
                                        );
                                    }
                                }
                                let dataflow = start_dataflow(
                                    build_id,
                                    session_id,
                                    dataflow,
                                    local_working_dir,
                                    name,
                                    &mut daemon_connections,
                                    &clock,
                                    uv,
                                )
                                .await?;
                                Ok(dataflow)
                            };
                            match inner.await {
                                Ok(dataflow) => {
                                    let uuid = dataflow.uuid;
                                    running_dataflows.insert(uuid, dataflow);
                                    let _ = reply_sender.send(Ok(
                                        ControlRequestReply::DataflowStartTriggered { uuid },
                                    ));
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        ControlRequest::WaitForSpawn { dataflow_id } => {
                            if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
                                dataflow.spawn_result.register(reply_sender);
                            } else {
                                let _ =
                                    reply_sender.send(Err(eyre!("unknown dataflow {dataflow_id}")));
                            }
                        }
                        ControlRequest::Check { dataflow_uuid } => {
                            let status = match &running_dataflows.get(&dataflow_uuid) {
                                Some(_) => ControlRequestReply::DataflowSpawned {
                                    uuid: dataflow_uuid,
                                },
                                None => ControlRequestReply::DataflowStopped {
                                    uuid: dataflow_uuid,
                                    result: dataflow_results
                                        .get(&dataflow_uuid)
                                        .map(|r| dataflow_result(r, dataflow_uuid, &clock))
                                        .unwrap_or_else(|| {
                                            DataflowResult::ok_empty(
                                                dataflow_uuid,
                                                clock.new_timestamp(),
                                            )
                                        }),
                                },
                            };
                            let _ = reply_sender.send(Ok(status));
                        }
                        ControlRequest::Reload {
                            dataflow_id,
                            node_id,
                            operator_id,
                        } => {
                            let reload = async {
                                reload_dataflow(
                                    &running_dataflows,
                                    dataflow_id,
                                    node_id,
                                    operator_id,
                                    &mut daemon_connections,
                                    clock.new_timestamp(),
                                )
                                .await?;
                                Result::<_, eyre::Report>::Ok(())
                            };
                            let reply =
                                reload
                                    .await
                                    .map(|()| ControlRequestReply::DataflowReloaded {
                                        uuid: dataflow_id,
                                    });
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::Stop {
                            dataflow_uuid,
                            grace_duration,
                        } => {
                            if let Some(result) = dataflow_results.get(&dataflow_uuid) {
                                let reply = ControlRequestReply::DataflowStopped {
                                    uuid: dataflow_uuid,
                                    result: dataflow_result(result, dataflow_uuid, &clock),
                                };
                                let _ = reply_sender.send(Ok(reply));

                                continue;
                            }

                            let dataflow = stop_dataflow(
                                &mut running_dataflows,
                                dataflow_uuid,
                                &mut daemon_connections,
                                clock.new_timestamp(),
                                grace_duration,
                            )
                            .await;

                            match dataflow {
                                Ok(dataflow) => {
                                    dataflow.stop_reply_senders.push(reply_sender);
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        ControlRequest::StopByName {
                            name,
                            grace_duration,
                        } => match resolve_name(name, &running_dataflows, &archived_dataflows) {
                            Ok(dataflow_uuid) => {
                                if let Some(result) = dataflow_results.get(&dataflow_uuid) {
                                    let reply = ControlRequestReply::DataflowStopped {
                                        uuid: dataflow_uuid,
                                        result: dataflow_result(result, dataflow_uuid, &clock),
                                    };
                                    let _ = reply_sender.send(Ok(reply));

                                    continue;
                                }

                                let dataflow = stop_dataflow(
                                    &mut running_dataflows,
                                    dataflow_uuid,
                                    &mut daemon_connections,
                                    clock.new_timestamp(),
                                    grace_duration,
                                )
                                .await;

                                match dataflow {
                                    Ok(dataflow) => {
                                        dataflow.stop_reply_senders.push(reply_sender);
                                    }
                                    Err(err) => {
                                        let _ = reply_sender.send(Err(err));
                                    }
                                }
                            }
                            Err(err) => {
                                let _ = reply_sender.send(Err(err));
                            }
                        },
                        ControlRequest::Logs { uuid, name, node } => {
                            let dataflow_uuid = if let Some(uuid) = uuid {
                                Ok(uuid)
                            } else if let Some(name) = name {
                                resolve_name(name, &running_dataflows, &archived_dataflows)
                            } else {
                                Err(eyre!("neither a dataflow UUID nor a name was provided"))
                            };

                            match dataflow_uuid {
                                Ok(uuid) => {
                                    let reply = retrieve_logs(
                                        &running_dataflows,
                                        &archived_dataflows,
                                        uuid,
                                        node.into(),
                                        &mut daemon_connections,
                                        clock.new_timestamp(),
                                    )
                                    .await
                                    .map(ControlRequestReply::Logs);
                                    let _ = reply_sender.send(reply);
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        ControlRequest::Destroy => {
                            tracing::info!("Received destroy command");

                            let reply = handle_destroy(
                                &mut running_dataflows,
                                &mut daemon_connections,
                                &abort_handle,
                                &mut daemon_events_tx,
                                &clock,
                            )
                            .await
                            .map(|()| ControlRequestReply::DestroyOk);
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::List => {
                            let mut dataflows: Vec<_> = running_dataflows.values().collect();
                            dataflows.sort_by_key(|d| (&d.name, d.uuid));

                            let running = dataflows.into_iter().map(|d| DataflowListEntry {
                                id: DataflowIdAndName {
                                    uuid: d.uuid,
                                    name: d.name.clone(),
                                },
                                status: DataflowStatus::Running,
                            });
                            let finished_failed =
                                dataflow_results.iter().map(|(&uuid, results)| {
                                    let name =
                                        archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
                                    let id = DataflowIdAndName { uuid, name };
                                    let status = if results.values().all(|r| r.is_ok()) {
                                        DataflowStatus::Finished
                                    } else {
                                        DataflowStatus::Failed
                                    };
                                    DataflowListEntry { id, status }
                                });

                            let reply = Ok(ControlRequestReply::DataflowList(DataflowList(
                                running.chain(finished_failed).collect(),
                            )));
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::DaemonConnected => {
                            let running = !daemon_connections.is_empty();
                            let _ = reply_sender
                                .send(Ok(ControlRequestReply::DaemonConnected(running)));
                        }
                        ControlRequest::ConnectedMachines => {
                            let reply = Ok(ControlRequestReply::ConnectedDaemons(
                                daemon_connections.keys().cloned().collect(),
                            ));
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::LogSubscribe { .. } => {
                            let _ = reply_sender.send(Err(eyre::eyre!(
                                "LogSubscribe request should be handled separately"
                            )));
                        }
                        ControlRequest::BuildLogSubscribe { .. } => {
                            let _ = reply_sender.send(Err(eyre::eyre!(
                                "BuildLogSubscribe request should be handled separately"
                            )));
                        }
                        ControlRequest::CliAndDefaultDaemonOnSameMachine => {
                            let mut default_daemon_ip = None;
                            if let Some(default_id) = daemon_connections.unnamed().next() {
                                if let Some(connection) = daemon_connections.get(default_id) {
                                    if let Ok(addr) = connection.stream.peer_addr() {
                                        default_daemon_ip = Some(addr.ip());
                                    }
                                }
                            }
                            let _ = reply_sender.send(Ok(
                                ControlRequestReply::CliAndDefaultDaemonIps {
                                    default_daemon: default_daemon_ip,
                                    cli: None,
                                },
                            ));
                        }
                    }
                }
                ControlEvent::Error(err) => tracing::error!("{err:?}"),
                ControlEvent::LogSubscribe {
                    dataflow_id,
                    level,
                    connection,
                } => {
                    if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
                        dataflow
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                        let buffered = std::mem::take(&mut dataflow.buffered_log_messages);
                        for message in buffered {
                            send_log_message(&mut dataflow.log_subscribers, &message).await;
                        }
                    }
                }
                ControlEvent::BuildLogSubscribe {
                    build_id,
                    level,
                    connection,
                } => {
                    if let Some(build) = running_builds.get_mut(&build_id) {
                        build
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                        let buffered = std::mem::take(&mut build.buffered_log_messages);
                        for message in buffered {
                            send_log_message(&mut build.log_subscribers, &message).await;
                        }
                    }
                }
            },
            Event::DaemonHeartbeatInterval => {
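                // Watchdog: warn after 15s without a heartbeat and drop the
                // daemon connection entirely after 30s of silence.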
                let mut disconnected = BTreeSet::new();
                for (machine_id, connection) in daemon_connections.iter_mut() {
                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
                        tracing::warn!(
                            "no heartbeat message from machine `{machine_id}` since {:?}",
                            connection.last_heartbeat.elapsed()
                        )
                    }
                    if connection.last_heartbeat.elapsed() > Duration::from_secs(30) {
                        disconnected.insert(machine_id.clone());
                        continue;
                    }
                    let result: eyre::Result<()> = tokio::time::timeout(
                        Duration::from_millis(500),
                        send_heartbeat_message(&mut connection.stream, clock.new_timestamp()),
                    )
                    .await
                    .wrap_err("timeout")
                    .and_then(|r| r)
                    .wrap_err_with(|| {
                        format!("failed to send heartbeat message to daemon at `{machine_id}`")
                    });
                    if let Err(err) = result {
                        tracing::warn!("{err:?}");
                        disconnected.insert(machine_id.clone());
                    }
                }
                if !disconnected.is_empty() {
                    tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
                    for machine_id in disconnected {
                        daemon_connections.remove(&machine_id);
                    }
                }
            }
            Event::CtrlC => {
                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
                handle_destroy(
                    &mut running_dataflows,
                    &mut daemon_connections,
                    &abort_handle,
                    &mut daemon_events_tx,
                    &clock,
                )
                .await?;
            }
            Event::DaemonHeartbeat {
                daemon_id: machine_id,
            } => {
                if let Some(connection) = daemon_connections.get_mut(&machine_id) {
                    connection.last_heartbeat = Instant::now();
                }
            }
            Event::Log(message) => {
                if let Some(dataflow_id) = &message.dataflow_id {
                    if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
                        if dataflow.log_subscribers.is_empty() {
                            dataflow.buffered_log_messages.push(message);
                        } else {
                            send_log_message(&mut dataflow.log_subscribers, &message).await;
                        }
                    }
                } else if let Some(build_id) = &message.build_id {
                    if let Some(build) = running_builds.get_mut(build_id) {
                        if build.log_subscribers.is_empty() {
                            build.buffered_log_messages.push(message);
                        } else {
                            send_log_message(&mut build.log_subscribers, &message).await;
                        }
                    }
                }
            }
            Event::DaemonExit { daemon_id } => {
                tracing::info!("Daemon `{daemon_id}` exited");
                daemon_connections.remove(&daemon_id);
            }
            Event::DataflowBuildResult {
                build_id,
                daemon_id,
                result,
            } => match running_builds.get_mut(&build_id) {
                Some(build) => {
                    build.pending_build_results.remove(&daemon_id);
                    match result {
                        Ok(()) => {}
                        Err(err) => {
                            build.errors.push(format!("{err:?}"));
                        }
                    };
                    if build.pending_build_results.is_empty() {
                        tracing::info!("dataflow build finished: `{build_id}`");
                        let mut build = running_builds.remove(&build_id).unwrap();
                        let result = if build.errors.is_empty() {
                            Ok(())
                        } else {
                            Err(format!("build failed: {}", build.errors.join("\n\n")))
                        };

                        build.build_result.set_result(Ok(
                            ControlRequestReply::DataflowBuildFinished { build_id, result },
                        ));

                        finished_builds.insert(build_id, build.build_result);
                    }
                }
                None => {
                    tracing::warn!(
                        "received DataflowBuildResult, but no matching build in `running_builds` map"
                    );
                }
            },
            Event::DataflowSpawnResult {
                dataflow_id,
                daemon_id,
                result,
            } => match running_dataflows.get_mut(&dataflow_id) {
                Some(dataflow) => {
                    dataflow.pending_spawn_results.remove(&daemon_id);
                    match result {
                        Ok(()) => {
                            if dataflow.pending_spawn_results.is_empty() {
                                tracing::info!("successfully spawned dataflow `{dataflow_id}`");
                                dataflow.spawn_result.set_result(Ok(
                                    ControlRequestReply::DataflowSpawned { uuid: dataflow_id },
                                ));
                            }
                        }
                        Err(err) => {
                            tracing::warn!("error while spawning dataflow `{dataflow_id}`");
                            dataflow.spawn_result.set_result(Err(err));
                        }
                    };
                }
                None => {
                    tracing::warn!(
                        "received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map"
                    );
                }
            },
        }

        let elapsed = start.elapsed();
        if elapsed > Duration::from_millis(100) {
            tracing::warn!(
                "Coordinator took {}ms for handling event: {event_kind}",
                elapsed.as_millis()
            );
        }
    }

    tracing::info!("stopped");

    Ok(())
}

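/// Sends a log message to all subscribers, closing any subscriber that does
/// not accept the message within 100ms and dropping closed subscribers.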
async fn send_log_message(log_subscribers: &mut Vec<LogSubscriber>, message: &LogMessage) {
    for subscriber in log_subscribers.iter_mut() {
        let send_result =
            tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));

        if send_result.await.is_err() {
            subscriber.close();
        }
    }
    log_subscribers.retain(|s| !s.is_closed());
}

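/// Merges the per-daemon results of a dataflow into a single [`DataflowResult`].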
fn dataflow_result(
    results: &BTreeMap<DaemonId, DataflowDaemonResult>,
    dataflow_uuid: Uuid,
    clock: &uhlc::HLC,
) -> DataflowResult {
    let mut node_results = BTreeMap::new();
    for result in results.values() {
        node_results.extend(result.node_results.clone());
        if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
            tracing::warn!("failed to update HLC: {err}");
        }
    }

    DataflowResult {
        uuid: dataflow_uuid,
        timestamp: clock.new_timestamp(),
        node_results,
    }
}

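/// An open TCP connection to a daemon, together with the time of its last heartbeat.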
struct DaemonConnection {
    stream: TcpStream,
    last_heartbeat: Instant,
}

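/// Stops all running dataflows, destroys all connected daemons, and stops
/// accepting new events, thereby shutting down the coordinator.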
async fn handle_destroy(
    running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
    daemon_connections: &mut DaemonConnections,
    abortable_events: &futures::stream::AbortHandle,
    daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
    clock: &HLC,
) -> Result<(), eyre::ErrReport> {
    abortable_events.abort();
    for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() {
        let _ = stop_dataflow(
            running_dataflows,
            dataflow_uuid,
            daemon_connections,
            clock.new_timestamp(),
            None,
        )
        .await?;
    }

    let result = destroy_daemons(daemon_connections, clock.new_timestamp()).await;
    *daemon_events_tx = None;
    result
}

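/// Serializes a heartbeat event and sends it to the given daemon connection.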
async fn send_heartbeat_message(
    connection: &mut TcpStream,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Heartbeat,
        timestamp,
    })
    .context("Could not serialize heartbeat message")?;

    tcp_send(connection, &message)
        .await
        .wrap_err("failed to send heartbeat message to daemon")
}

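/// State of a dataflow build that has been triggered but not finished yet.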
struct RunningBuild {
    errors: Vec<String>,
    build_result: CachedResult,

    buffered_log_messages: Vec<LogMessage>,
    log_subscribers: Vec<LogSubscriber>,

    pending_build_results: BTreeSet<DaemonId>,
}

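/// State of a dataflow that is currently spawning or running.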
struct RunningDataflow {
    name: Option<String>,
    uuid: Uuid,
    daemons: BTreeSet<DaemonId>,
    pending_daemons: BTreeSet<DaemonId>,
    exited_before_subscribe: Vec<NodeId>,
    nodes: BTreeMap<NodeId, ResolvedNode>,

    spawn_result: CachedResult,
    stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

    buffered_log_messages: Vec<LogMessage>,
    log_subscribers: Vec<LogSubscriber>,

    pending_spawn_results: BTreeSet<DaemonId>,
}

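/// A result that might still be pending. Interested parties can register a
/// [`oneshot`] sender to be notified; once the result is set, it is cached
/// and replayed to all later registrations.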
pub enum CachedResult {
    Pending {
        result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
    },
    Cached {
        result: eyre::Result<ControlRequestReply>,
    },
}

impl Default for CachedResult {
    fn default() -> Self {
        Self::Pending {
            result_senders: Vec::new(),
        }
    }
}

impl CachedResult {
    fn register(
        &mut self,
        reply_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
    ) {
        match self {
            CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
            CachedResult::Cached { result } => {
                Self::send_result_to(result, reply_sender);
            }
        }
    }

    fn set_result(&mut self, result: eyre::Result<ControlRequestReply>) {
        match self {
            CachedResult::Pending { result_senders } => {
                for sender in result_senders.drain(..) {
                    Self::send_result_to(&result, sender);
                }
                *self = CachedResult::Cached { result };
            }
            CachedResult::Cached { .. } => {}
        }
    }

    fn send_result_to(
        result: &eyre::Result<ControlRequestReply>,
        sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
    ) {
        let result = match result {
            Ok(r) => Ok(r.clone()),
            Err(err) => Err(eyre!("{err:?}")),
        };
        let _ = sender.send(result);
    }
}

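/// Information about a finished dataflow that is kept around for name
/// resolution and log retrieval.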
struct ArchivedDataflow {
    name: Option<String>,
    nodes: BTreeMap<NodeId, ResolvedNode>,
}

impl From<&RunningDataflow> for ArchivedDataflow {
    fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
        ArchivedDataflow {
            name: dataflow.name.clone(),
            nodes: dataflow.nodes.clone(),
        }
    }
}

impl PartialEq for RunningDataflow {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
    }
}

impl Eq for RunningDataflow {}

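/// Sends a stop message for the given dataflow to all daemons it runs on and
/// waits for their replies. The dataflow stays in `running_dataflows` until
/// every daemon reports `DataflowFinishedOnDaemon`.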
async fn stop_dataflow<'a>(
    running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
    dataflow_uuid: Uuid,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
    grace_duration: Option<Duration>,
) -> eyre::Result<&'a mut RunningDataflow> {
    let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
        bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
    };

    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::StopDataflow {
            dataflow_id: dataflow_uuid,
            grace_duration,
        },
        timestamp,
    })?;

    for daemon_id in &dataflow.daemons {
        let daemon_connection = daemon_connections
            .get_mut(daemon_id)
            .wrap_err("no daemon connection")?;
        tcp_send(&mut daemon_connection.stream, &message)
            .await
            .wrap_err("failed to send stop message to daemon")?;

        let reply_raw = tcp_receive(&mut daemon_connection.stream)
            .await
            .wrap_err("failed to receive stop reply from daemon")?;
        match serde_json::from_slice(&reply_raw)
            .wrap_err("failed to deserialize stop reply from daemon")?
        {
            DaemonCoordinatorReply::StopResult(result) => result
                .map_err(|e| eyre!(e))
                .wrap_err("failed to stop dataflow")?,
            other => bail!("unexpected reply after sending stop: {other:?}"),
        }
    }

    tracing::info!("successfully sent stop message for dataflow `{dataflow_uuid}` to all daemons");

    Ok(dataflow)
}

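/// Asks every daemon that runs the given dataflow to reload the specified
/// node (or operator) and waits for the reload replies.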
async fn reload_dataflow(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    operator_id: Option<OperatorId>,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
    let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
        bail!("No running dataflow found with UUID `{dataflow_id}`")
    };
    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::ReloadDataflow {
            dataflow_id,
            node_id,
            operator_id,
        },
        timestamp,
    })?;

    for machine_id in &dataflow.daemons {
        let daemon_connection = daemon_connections
            .get_mut(machine_id)
            .wrap_err("no daemon connection")?;
        tcp_send(&mut daemon_connection.stream, &message)
            .await
            .wrap_err("failed to send reload message to daemon")?;

        let reply_raw = tcp_receive(&mut daemon_connection.stream)
            .await
            .wrap_err("failed to receive reload reply from daemon")?;
        match serde_json::from_slice(&reply_raw)
            .wrap_err("failed to deserialize reload reply from daemon")?
        {
            DaemonCoordinatorReply::ReloadResult(result) => result
                .map_err(|e| eyre!(e))
                .wrap_err("failed to reload dataflow")?,
            other => bail!("unexpected reply after sending reload: {other:?}"),
        }
    }
    tracing::info!("successfully reloaded dataflow `{dataflow_id}`");

    Ok(())
}

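/// Fetches the log output of a single node from the daemon it is deployed on.
/// Works for both running and archived dataflows.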
async fn retrieve_logs(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<Vec<u8>> {
    let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else {
        bail!("No dataflow found with UUID `{dataflow_id}`")
    };

    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Logs {
            dataflow_id,
            node_id: node_id.clone(),
        },
        timestamp,
    })?;

    let machine_ids: Vec<Option<String>> = nodes
        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
        .collect();

    let machine_id = if let [machine_id] = &machine_ids[..] {
        machine_id
    } else if machine_ids.is_empty() {
        bail!("No machine contains {}/{}", dataflow_id, node_id)
    } else {
        bail!(
            "node {}/{} is deployed on more than one machine, but it should only be present on one",
            dataflow_id,
            node_id
        )
    };

    let daemon_ids: Vec<_> = match machine_id {
        None => daemon_connections.unnamed().collect(),
        Some(machine_id) => daemon_connections
            .get_matching_daemon_id(machine_id)
            .into_iter()
            .collect(),
    };
    let daemon_id = match &daemon_ids[..] {
        [id] => (*id).clone(),
        [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
        _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
    };
    let daemon_connection = daemon_connections
        .get_mut(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?;
    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err("failed to send logs message to daemon")?;

    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to retrieve logs reply from daemon")?;
    let reply_logs = match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize logs reply from daemon")?
    {
        DaemonCoordinatorReply::Logs(logs) => logs,
        other => bail!("unexpected reply after sending logs: {other:?}"),
    };
    tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");

    reply_logs.map_err(|err| eyre!(err))
}

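/// Triggers a build of the given dataflow: groups the nodes (and their git
/// sources) by target machine and sends a build request to each affected
/// daemon. Build results are reported back asynchronously as
/// [`Event::DataflowBuildResult`] events.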
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(daemon_connections, clock))]
async fn build_dataflow(
    build_id: BuildId,
    session_id: SessionId,
    dataflow: Descriptor,
    git_sources: BTreeMap<NodeId, GitSource>,
    prev_git_sources: BTreeMap<NodeId, GitSource>,
    local_working_dir: Option<PathBuf>,
    clock: &HLC,
    uv: bool,
    daemon_connections: &mut DaemonConnections,
) -> eyre::Result<RunningBuild> {
    let nodes = dataflow.resolve_aliases_and_set_defaults()?;

    let mut git_sources_by_daemon = git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();
    let mut prev_git_sources_by_daemon = prev_git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();

    let nodes_by_daemon = nodes
        .values()
        .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));

    let mut daemons = BTreeSet::new();
    for (machine, nodes_on_machine) in &nodes_by_daemon {
        let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
        tracing::debug!(
            "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
        );

        let build_command = BuildDataflowNodes {
            build_id,
            session_id,
            local_working_dir: local_working_dir.clone(),
            git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
            prev_git_sources: prev_git_sources_by_daemon
                .remove(machine)
                .unwrap_or_default(),
            dataflow_descriptor: dataflow.clone(),
            nodes_on_machine,
            uv,
        };
        let message = serde_json::to_vec(&Timestamped {
            inner: DaemonCoordinatorEvent::Build(build_command),
            timestamp: clock.new_timestamp(),
        })?;

        let daemon_id =
            build_dataflow_on_machine(daemon_connections, machine.map(|s| s.as_str()), &message)
                .await
                .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
        daemons.insert(daemon_id);
    }

    tracing::info!("successfully triggered dataflow build `{build_id}`");

    Ok(RunningBuild {
        errors: Vec::new(),
        build_result: CachedResult::default(),
        buffered_log_messages: Vec::new(),
        log_subscribers: Vec::new(),
        pending_build_results: daemons,
    })
}

async fn build_dataflow_on_machine(
    daemon_connections: &mut DaemonConnections,
    machine: Option<&str>,
    message: &[u8],
) -> Result<DaemonId, eyre::ErrReport> {
    let daemon_id = match machine {
        Some(machine) => daemon_connections
            .get_matching_daemon_id(machine)
            .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
            .clone(),
        None => daemon_connections
            .unnamed()
            .next()
            .wrap_err("no unnamed daemon connections")?
            .clone(),
    };

    let daemon_connection = daemon_connections
        .get_mut(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?;
    tcp_send(&mut daemon_connection.stream, message)
        .await
        .wrap_err("failed to send build message to daemon")?;

    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to receive build reply from daemon")?;
    match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize build reply from daemon")?
    {
        DaemonCoordinatorReply::TriggerBuildResult(result) => result
            .map_err(|e| eyre!(e))
            .wrap_err("daemon returned an error")?,
        _ => bail!("unexpected reply"),
    }
    Ok(daemon_id)
}

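/// Spawns the given dataflow on all daemons it is deployed to and returns the
/// [`RunningDataflow`] bookkeeping entry for it.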
#[allow(clippy::too_many_arguments)]
async fn start_dataflow(
    build_id: Option<BuildId>,
    session_id: SessionId,
    dataflow: Descriptor,
    local_working_dir: Option<PathBuf>,
    name: Option<String>,
    daemon_connections: &mut DaemonConnections,
    clock: &HLC,
    uv: bool,
) -> eyre::Result<RunningDataflow> {
    let SpawnedDataflow {
        uuid,
        daemons,
        nodes,
    } = spawn_dataflow(
        build_id,
        session_id,
        dataflow,
        local_working_dir,
        daemon_connections,
        clock,
        uv,
    )
    .await?;
    Ok(RunningDataflow {
        uuid,
        name,
        pending_daemons: if daemons.len() > 1 {
            daemons.clone()
        } else {
            BTreeSet::new()
        },
        exited_before_subscribe: Default::default(),
        daemons: daemons.clone(),
        nodes,
        spawn_result: CachedResult::default(),
        stop_reply_senders: Vec::new(),
        buffered_log_messages: Vec::new(),
        log_subscribers: Vec::new(),
        pending_spawn_results: daemons,
    })
}

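/// Sends a destroy message to a single daemon and waits for its confirmation.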
async fn destroy_daemon(
    daemon_id: DaemonId,
    mut daemon_connection: DaemonConnection,
    timestamp: uhlc::Timestamp,
) -> Result<()> {
    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Destroy,
        timestamp,
    })?;

    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err(format!(
            "failed to send destroy message to daemon `{daemon_id}`"
        ))?;

    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to receive destroy reply from daemon")?;
    match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize destroy reply from daemon")?
    {
        DaemonCoordinatorReply::DestroyResult { result, .. } => result
            .map_err(|e| eyre!(e))
            .wrap_err("failed to destroy dataflow")?,
        other => bail!("unexpected reply after sending `destroy`: {other:?}"),
    }

    tracing::info!("successfully destroyed daemon `{daemon_id}`");
    Ok(())
}

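/// Destroys all connected daemons concurrently, failing if any of them
/// reports an error.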
async fn destroy_daemons(
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
    let futures = daemon_connections
        .drain()
        .map(|(daemon_id, daemon_connection)| {
            destroy_daemon(daemon_id, daemon_connection, timestamp)
        })
        .collect::<Vec<_>>();
    for result in join_all(futures).await {
        result?;
    }
    Ok(())
}

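/// Events handled by the coordinator's main event loop.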
#[derive(Debug)]
pub enum Event {
    NewDaemonConnection(TcpStream),
    DaemonConnectError(eyre::Report),
    DaemonHeartbeat {
        daemon_id: DaemonId,
    },
    Dataflow {
        uuid: Uuid,
        event: DataflowEvent,
    },
    Control(ControlEvent),
    Daemon(DaemonRequest),
    DaemonHeartbeatInterval,
    CtrlC,
    Log(LogMessage),
    DaemonExit {
        daemon_id: dora_message::common::DaemonId,
    },
    DataflowBuildResult {
        build_id: BuildId,
        daemon_id: DaemonId,
        result: eyre::Result<()>,
    },
    DataflowSpawnResult {
        dataflow_id: uuid::Uuid,
        daemon_id: DaemonId,
        result: eyre::Result<()>,
    },
}

impl Event {
    #[allow(clippy::match_like_matches_macro)]
    pub fn log(&self) -> bool {
        match self {
            Event::DaemonHeartbeatInterval => false,
            _ => true,
        }
    }

    fn kind(&self) -> &'static str {
        match self {
            Event::NewDaemonConnection(_) => "NewDaemonConnection",
            Event::DaemonConnectError(_) => "DaemonConnectError",
            Event::DaemonHeartbeat { .. } => "DaemonHeartbeat",
            Event::Dataflow { .. } => "Dataflow",
            Event::Control(_) => "Control",
            Event::Daemon(_) => "Daemon",
            Event::DaemonHeartbeatInterval => "DaemonHeartbeatInterval",
            Event::CtrlC => "CtrlC",
            Event::Log(_) => "Log",
            Event::DaemonExit { .. } => "DaemonExit",
            Event::DataflowBuildResult { .. } => "DataflowBuildResult",
            Event::DataflowSpawnResult { .. } => "DataflowSpawnResult",
        }
    }
}

#[derive(Debug)]
pub enum DataflowEvent {
    DataflowFinishedOnDaemon {
        daemon_id: DaemonId,
        result: DataflowDaemonResult,
    },
    ReadyOnDaemon {
        daemon_id: DaemonId,
        exited_before_subscribe: Vec<NodeId>,
    },
}

#[derive(Debug)]
pub enum DaemonRequest {
    Register {
        version_check_result: Result<(), String>,
        machine_id: Option<String>,
        connection: TcpStream,
    },
}

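/// Installs a Ctrl-C handler that forwards the first signal as [`Event::CtrlC`]
/// and aborts the process on a second signal.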
fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = false;
    ctrlc::set_handler(move || {
        if ctrlc_sent {
            tracing::warn!("received second ctrlc signal -> aborting immediately");
            std::process::abort();
        } else {
            tracing::info!("received ctrlc signal");
            if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
                tracing::error!("failed to report ctrl-c event to dora-coordinator");
            }

            ctrlc_sent = true;
        }
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ReceiverStream::new(ctrlc_rx))
}