1use crate::{
2 run::spawn_dataflow,
3 tcp_utils::{tcp_receive, tcp_send},
4};
5pub use control::ControlEvent;
6use dora_core::{
7 config::{NodeId, OperatorId},
8 descriptor::DescriptorExt,
9 uhlc::{self, HLC},
10};
11use dora_message::{
12 BuildId, DataflowId, SessionId,
13 cli_to_coordinator::ControlRequest,
14 common::{DaemonId, GitSource},
15 coordinator_to_cli::{
16 ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
17 DataflowStatus, LogLevel, LogMessage,
18 },
19 coordinator_to_daemon::{
20 BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped,
21 },
22 daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
23 descriptor::{Descriptor, ResolvedNode},
24};
25use eyre::{ContextCompat, Result, WrapErr, bail, eyre};
26use futures::{Future, Stream, StreamExt, future::join_all, stream::FuturesUnordered};
27use futures_concurrency::stream::Merge;
28use itertools::Itertools;
29use log_subscriber::LogSubscriber;
30use petname::petname;
31use run::SpawnedDataflow;
32use std::{
33 collections::{BTreeMap, BTreeSet, HashMap},
34 net::SocketAddr,
35 path::PathBuf,
36 sync::Arc,
37 time::{Duration, Instant},
38};
39use tokio::{
40 net::TcpStream,
41 sync::{mpsc, oneshot},
42 task::JoinHandle,
43};
44use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
45use uuid::Uuid;
46
47mod control;
48mod listener;
49mod log_subscriber;
50mod run;
51mod tcp_utils;
52
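/// Starts the coordinator: binds the daemon listener to `bind` and the control socket to
/// `bind_control`, and returns the bound daemon port together with a future that drives the
/// coordinator until shutdown.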
53pub async fn start(
54 bind: SocketAddr,
55 bind_control: SocketAddr,
56 external_events: impl Stream<Item = Event> + Unpin,
57) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
58 let listener = listener::create_listener(bind).await?;
59 let port = listener
60 .local_addr()
61 .wrap_err("failed to get local addr of listener")?
62 .port();
63 let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
64 c.map(Event::NewDaemonConnection)
65 .wrap_err("failed to open connection")
66 .unwrap_or_else(Event::DaemonConnectError)
67 });
68
69 let mut tasks = FuturesUnordered::new();
70 let control_events = control::control_events(bind_control, &tasks)
71 .await
72 .wrap_err("failed to create control events")?;
73
74 let ctrlc_events = set_up_ctrlc_handler()?;
76
77 let events = (
78 external_events,
79 new_daemon_connections,
80 control_events,
81 ctrlc_events,
82 )
83 .merge();
84
85 let future = async move {
86 start_inner(events, &tasks).await?;
87
88 tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
89 while let Some(join_result) = tasks.next().await {
90 if let Err(err) = join_result {
91 tracing::error!("task panicked: {err}");
92 }
93 }
94 tracing::debug!("all spawned tasks finished, exiting..");
95 Ok(())
96 };
97 Ok((port, future))
98}
99
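/// Resolves a dataflow name to its UUID, checking running dataflows first and archived
/// dataflows as a fallback. Fails if the name is unknown or ambiguous.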
100fn resolve_name(
102 name: String,
103 running_dataflows: &HashMap<Uuid, RunningDataflow>,
104 archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
105) -> eyre::Result<Uuid> {
106 let uuids: Vec<_> = running_dataflows
107 .iter()
108 .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
109 .map(|(k, _)| k)
110 .copied()
111 .collect();
112 let archived_uuids: Vec<_> = archived_dataflows
113 .iter()
114 .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
115 .map(|(k, _)| k)
116 .copied()
117 .collect();
118
119 if uuids.is_empty() {
120 if archived_uuids.is_empty() {
121 bail!("no dataflow with name `{name}`");
122 } else if let [uuid] = archived_uuids.as_slice() {
123 Ok(*uuid)
124 } else {
            bail!(
                "multiple archived dataflows found with name `{name}`; please provide the UUID instead"
            );
129 }
130 } else if let [uuid] = uuids.as_slice() {
131 Ok(*uuid)
132 } else {
133 bail!("multiple dataflows found with name `{name}`");
134 }
135}
136
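/// The set of currently connected daemons, keyed by their [`DaemonId`].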
137#[derive(Default)]
138struct DaemonConnections {
139 daemons: BTreeMap<DaemonId, DaemonConnection>,
140}
141
142impl DaemonConnections {
143 fn add(&mut self, daemon_id: DaemonId, connection: DaemonConnection) {
144 let previous = self.daemons.insert(daemon_id.clone(), connection);
145 if previous.is_some() {
            tracing::info!("closing previous connection to `{daemon_id}` after new registration");
147 }
148 }
149
150 fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> {
151 self.daemons.get(id)
152 }
153
154 fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
155 self.daemons.get_mut(id)
156 }
157
158 fn get_matching_daemon_id(&self, machine_id: &str) -> Option<&DaemonId> {
159 self.daemons
160 .keys()
161 .find(|id| id.matches_machine_id(machine_id))
162 }
163
164 fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
165 std::mem::take(&mut self.daemons).into_iter()
166 }
167
168 fn is_empty(&self) -> bool {
169 self.daemons.is_empty()
170 }
171
172 fn keys(&self) -> impl Iterator<Item = &DaemonId> {
173 self.daemons.keys()
174 }
175
176 fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
177 self.daemons.iter_mut()
178 }
179
180 fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
181 self.daemons.remove(daemon_id)
182 }
183
184 fn unnamed(&self) -> impl Iterator<Item = &DaemonId> {
185 self.daemons.keys().filter(|id| id.machine_id().is_none())
186 }
187}
188
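/// The coordinator's main event loop: handles daemon registrations, dataflow lifecycle events,
/// control requests from the CLI, heartbeats, and log forwarding until the event stream ends.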
189async fn start_inner(
190 events: impl Stream<Item = Event> + Unpin,
191 tasks: &FuturesUnordered<JoinHandle<()>>,
192) -> eyre::Result<()> {
193 let clock = Arc::new(HLC::default());
194
195 let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2);
196 let mut daemon_events_tx = Some(daemon_events_tx);
197 let daemon_events = ReceiverStream::new(daemon_events);
198
199 let daemon_heartbeat_interval =
200 tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
201 .map(|_| Event::DaemonHeartbeatInterval);
202
203 let (abortable_events, abort_handle) =
205 futures::stream::abortable((events, daemon_heartbeat_interval).merge());
206
207 let mut events = (abortable_events, daemon_events).merge();
208
209 let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
210 let mut finished_builds: HashMap<BuildId, CachedResult> = HashMap::new();
211
212 let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
213 let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
214 HashMap::new();
215 let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
216 let mut daemon_connections = DaemonConnections::default();
217
218 while let Some(event) = events.next().await {
219 let start = Instant::now();
221 let event_kind = event.kind();
222
223 if event.log() {
224 tracing::trace!("Handling event {event:?}");
225 }
226 match event {
227 Event::NewDaemonConnection(connection) => {
228 connection.set_nodelay(true)?;
229 let events_tx = daemon_events_tx.clone();
230 if let Some(events_tx) = events_tx {
231 let task = tokio::spawn(listener::handle_connection(
232 connection,
233 events_tx,
234 clock.clone(),
235 ));
236 tasks.push(task);
237 } else {
238 tracing::warn!(
239 "ignoring new daemon connection because events_tx was closed already"
240 );
241 }
242 }
243 Event::DaemonConnectError(err) => {
244 tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
245 }
246 Event::Daemon(event) => match event {
247 DaemonRequest::Register {
248 machine_id,
249 mut connection,
250 version_check_result,
251 } => {
252 let existing = match &machine_id {
253 Some(id) => daemon_connections.get_matching_daemon_id(id),
254 None => daemon_connections.unnamed().next(),
255 };
256 let existing_result = if existing.is_some() {
257 Err(format!(
258 "There is already a connected daemon with machine ID `{machine_id:?}`"
259 ))
260 } else {
261 Ok(())
262 };
263
264 let daemon_id = DaemonId::new(machine_id);
266
267 let reply: Timestamped<RegisterResult> = Timestamped {
268 inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
269 Ok(_) => RegisterResult::Ok {
270 daemon_id: daemon_id.clone(),
271 },
272 Err(err) => RegisterResult::Err(err.clone()),
273 },
274 timestamp: clock.new_timestamp(),
275 };
276
277 let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
278 .await
279 .context("tcp send failed");
280 match version_check_result.map_err(|e| eyre!(e)).and(send_result) {
281 Ok(()) => {
282 daemon_connections.add(
283 daemon_id.clone(),
284 DaemonConnection {
285 stream: connection,
286 last_heartbeat: Instant::now(),
287 },
288 );
289 }
290 Err(err) => {
291 tracing::warn!(
292 "failed to register daemon connection for daemon `{daemon_id}`: {err}"
293 );
294 }
295 }
296 }
297 },
298 Event::Dataflow { uuid, event } => match event {
299 DataflowEvent::ReadyOnDaemon {
300 daemon_id,
301 exited_before_subscribe,
302 } => {
303 match running_dataflows.entry(uuid) {
304 std::collections::hash_map::Entry::Occupied(mut entry) => {
305 let dataflow = entry.get_mut();
306 dataflow.pending_daemons.remove(&daemon_id);
307 dataflow
308 .exited_before_subscribe
309 .extend(exited_before_subscribe);
310 if dataflow.pending_daemons.is_empty() {
311 tracing::debug!("sending all nodes ready message to daemons");
312 let message = serde_json::to_vec(&Timestamped {
313 inner: DaemonCoordinatorEvent::AllNodesReady {
314 dataflow_id: uuid,
315 exited_before_subscribe: dataflow
316 .exited_before_subscribe
317 .clone(),
318 },
319 timestamp: clock.new_timestamp(),
320 })
321 .wrap_err("failed to serialize AllNodesReady message")?;
322
323 for daemon_id in &dataflow.daemons {
325 let Some(connection) = daemon_connections.get_mut(daemon_id)
326 else {
                                tracing::warn!(
                                    "no daemon connection found for daemon `{daemon_id}`"
                                );
330 continue;
331 };
332 tcp_send(&mut connection.stream, &message)
333 .await
334 .wrap_err_with(|| {
335 format!(
336 "failed to send AllNodesReady({uuid}) message \
337 to machine {daemon_id}"
338 )
339 })?;
340 }
341 }
342 }
343 std::collections::hash_map::Entry::Vacant(_) => {
                        tracing::warn!("dataflow not running on ReadyOnDaemon");
345 }
346 }
347 }
348 DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => {
349 tracing::debug!(
350 "coordinator received DataflowFinishedOnDaemon ({daemon_id:?}, result: {result:?})"
351 );
352 match running_dataflows.entry(uuid) {
353 std::collections::hash_map::Entry::Occupied(mut entry) => {
354 let dataflow = entry.get_mut();
355 dataflow.daemons.remove(&daemon_id);
                        tracing::info!(
                            "removed daemon `{daemon_id}` from dataflow `{}`",
                            dataflow.uuid
                        );
360 dataflow_results
361 .entry(uuid)
362 .or_default()
363 .insert(daemon_id, result);
364
365 if dataflow.daemons.is_empty() {
366 archived_dataflows
368 .entry(uuid)
369 .or_insert_with(|| ArchivedDataflow::from(entry.get()));
370 let mut finished_dataflow = entry.remove();
371 let dataflow_id = finished_dataflow.uuid;
372 send_log_message(
373 &mut finished_dataflow.log_subscribers,
374 &LogMessage {
375 build_id: None,
376 dataflow_id: Some(dataflow_id),
377 node_id: None,
378 daemon_id: None,
379 level: LogLevel::Info.into(),
380 target: Some("coordinator".into()),
381 module_path: None,
382 file: None,
383 line: None,
384 message: "dataflow finished".into(),
385 timestamp: clock
386 .new_timestamp()
387 .get_time()
388 .to_system_time()
389 .into(),
390 fields: None,
391 },
392 )
393 .await;
394
395 let reply = ControlRequestReply::DataflowStopped {
396 uuid,
397 result: dataflow_results
398 .get(&uuid)
399 .map(|r| dataflow_result(r, uuid, &clock))
400 .unwrap_or_else(|| {
401 DataflowResult::ok_empty(uuid, clock.new_timestamp())
402 }),
403 };
404 for sender in finished_dataflow.stop_reply_senders {
405 let _ = sender.send(Ok(reply.clone()));
406 }
                            if !matches!(
                                finished_dataflow.spawn_result,
                                CachedResult::Cached { .. }
                            ) {
                                tracing::error!("spawn result still pending when dataflow finished");
                            }
413 }
414 }
415 std::collections::hash_map::Entry::Vacant(_) => {
416 tracing::warn!("dataflow not running on DataflowFinishedOnDaemon");
417 }
418 }
419 }
420 },
421
422 Event::Control(event) => match event {
423 ControlEvent::IncomingRequest {
424 request,
425 reply_sender,
426 } => {
427 match request {
428 ControlRequest::Build {
429 session_id,
430 dataflow,
431 git_sources,
432 prev_git_sources,
433 local_working_dir,
434 uv,
435 } => {
436 let build_id = BuildId::generate();
438
439 let result = build_dataflow(
440 build_id,
441 session_id,
442 dataflow,
443 git_sources,
444 prev_git_sources,
445 local_working_dir,
446 &clock,
447 uv,
448 &mut daemon_connections,
449 )
450 .await;
451 match result {
452 Ok(build) => {
453 running_builds.insert(build_id, build);
454 let _ = reply_sender.send(Ok(
455 ControlRequestReply::DataflowBuildTriggered { build_id },
456 ));
457 }
458 Err(err) => {
459 let _ = reply_sender.send(Err(err));
460 }
461 }
462 }
463 ControlRequest::WaitForBuild { build_id } => {
464 if let Some(build) = running_builds.get_mut(&build_id) {
465 build.build_result.register(reply_sender);
466 } else if let Some(result) = finished_builds.get_mut(&build_id) {
467 result.register(reply_sender);
468 } else {
469 let _ =
470 reply_sender.send(Err(eyre!("unknown build id {build_id}")));
471 }
472 }
473 ControlRequest::Start {
474 build_id,
475 session_id,
476 dataflow,
477 name,
478 local_working_dir,
479 uv,
480 write_events_to,
481 } => {
482 let name = name.or_else(|| petname(2, "-"));
483
484 let inner = async {
485 if let Some(name) = name.as_deref() {
486 if running_dataflows
488 .values()
489 .any(|d: &RunningDataflow| d.name.as_deref() == Some(name))
490 {
491 bail!(
492 "there is already a running dataflow with name `{name}`"
493 );
494 }
495 }
496 let dataflow = start_dataflow(
497 build_id,
498 session_id,
499 dataflow,
500 local_working_dir,
501 name,
502 &mut daemon_connections,
503 &clock,
504 uv,
505 write_events_to,
506 )
507 .await?;
508 Ok(dataflow)
509 };
510 match inner.await {
511 Ok(dataflow) => {
512 let uuid = dataflow.uuid;
513 running_dataflows.insert(uuid, dataflow);
514 let _ = reply_sender.send(Ok(
515 ControlRequestReply::DataflowStartTriggered { uuid },
516 ));
517 }
518 Err(err) => {
519 let _ = reply_sender.send(Err(err));
520 }
521 }
522 }
523 ControlRequest::WaitForSpawn { dataflow_id } => {
524 if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
525 dataflow.spawn_result.register(reply_sender);
526 } else {
527 let _ =
528 reply_sender.send(Err(eyre!("unknown dataflow {dataflow_id}")));
529 }
530 }
531 ControlRequest::Check { dataflow_uuid } => {
532 let status = match &running_dataflows.get(&dataflow_uuid) {
533 Some(_) => ControlRequestReply::DataflowSpawned {
534 uuid: dataflow_uuid,
535 },
536 None => ControlRequestReply::DataflowStopped {
537 uuid: dataflow_uuid,
538 result: dataflow_results
539 .get(&dataflow_uuid)
540 .map(|r| dataflow_result(r, dataflow_uuid, &clock))
541 .unwrap_or_else(|| {
542 DataflowResult::ok_empty(
543 dataflow_uuid,
544 clock.new_timestamp(),
545 )
546 }),
547 },
548 };
549 let _ = reply_sender.send(Ok(status));
550 }
551 ControlRequest::Reload {
552 dataflow_id,
553 node_id,
554 operator_id,
555 } => {
556 let reload = async {
557 reload_dataflow(
558 &running_dataflows,
559 dataflow_id,
560 node_id,
561 operator_id,
562 &mut daemon_connections,
563 clock.new_timestamp(),
564 )
565 .await?;
566 Result::<_, eyre::Report>::Ok(())
567 };
568 let reply =
569 reload
570 .await
571 .map(|()| ControlRequestReply::DataflowReloaded {
572 uuid: dataflow_id,
573 });
574 let _ = reply_sender.send(reply);
575 }
576 ControlRequest::Stop {
577 dataflow_uuid,
578 grace_duration,
579 force,
580 } => {
581 if let Some(result) = dataflow_results.get(&dataflow_uuid) {
582 let reply = ControlRequestReply::DataflowStopped {
583 uuid: dataflow_uuid,
584 result: dataflow_result(result, dataflow_uuid, &clock),
585 };
586 let _ = reply_sender.send(Ok(reply));
587
588 continue;
589 }
590
591 let dataflow = stop_dataflow(
592 &mut running_dataflows,
593 dataflow_uuid,
594 &mut daemon_connections,
595 clock.new_timestamp(),
596 grace_duration,
597 force,
598 )
599 .await;
600
601 match dataflow {
602 Ok(dataflow) => {
603 dataflow.stop_reply_senders.push(reply_sender);
604 }
605 Err(err) => {
606 let _ = reply_sender.send(Err(err));
607 }
608 }
609 }
610 ControlRequest::StopByName {
611 name,
612 grace_duration,
613 force,
614 } => match resolve_name(name, &running_dataflows, &archived_dataflows) {
615 Ok(dataflow_uuid) => {
616 if let Some(result) = dataflow_results.get(&dataflow_uuid) {
617 let reply = ControlRequestReply::DataflowStopped {
618 uuid: dataflow_uuid,
619 result: dataflow_result(result, dataflow_uuid, &clock),
620 };
621 let _ = reply_sender.send(Ok(reply));
622
623 continue;
624 }
625
626 let dataflow = stop_dataflow(
627 &mut running_dataflows,
628 dataflow_uuid,
629 &mut daemon_connections,
630 clock.new_timestamp(),
631 grace_duration,
632 force,
633 )
634 .await;
635
636 match dataflow {
637 Ok(dataflow) => {
638 dataflow.stop_reply_senders.push(reply_sender);
639 }
640 Err(err) => {
641 let _ = reply_sender.send(Err(err));
642 }
643 }
644 }
645 Err(err) => {
646 let _ = reply_sender.send(Err(err));
647 }
648 },
649 ControlRequest::Logs {
650 uuid,
651 name,
652 node,
653 tail,
654 } => {
655 let dataflow_uuid = if let Some(uuid) = uuid {
656 Ok(uuid)
657 } else if let Some(name) = name {
658 resolve_name(name, &running_dataflows, &archived_dataflows)
659 } else {
                                Err(eyre!("neither dataflow UUID nor name provided"))
661 };
662
663 match dataflow_uuid {
664 Ok(uuid) => {
665 let reply = retrieve_logs(
666 &running_dataflows,
667 &archived_dataflows,
668 uuid,
669 node.into(),
670 &mut daemon_connections,
671 clock.new_timestamp(),
672 tail,
673 )
674 .await
675 .map(ControlRequestReply::Logs);
676 let _ = reply_sender.send(reply);
677 }
678 Err(err) => {
679 let _ = reply_sender.send(Err(err));
680 }
681 }
682 }
683 ControlRequest::Info { dataflow_uuid } => {
684 if let Some(dataflow) = running_dataflows.get(&dataflow_uuid) {
685 let _ = reply_sender.send(Ok(ControlRequestReply::DataflowInfo {
686 uuid: dataflow.uuid,
687 name: dataflow.name.clone(),
688 descriptor: dataflow.descriptor.clone(),
689 }));
690 } else {
691 let _ = reply_sender.send(Err(eyre!(
692 "No running dataflow with uuid `{dataflow_uuid}`"
693 )));
694 }
695 }
696 ControlRequest::Destroy => {
697 tracing::info!("Received destroy command");
698
699 let reply = handle_destroy(
700 &mut running_dataflows,
701 &mut daemon_connections,
702 &abort_handle,
703 &mut daemon_events_tx,
704 &clock,
705 )
706 .await
707 .map(|()| ControlRequestReply::DestroyOk);
708 let _ = reply_sender.send(reply);
709 }
710 ControlRequest::List => {
711 let mut dataflows: Vec<_> = running_dataflows.values().collect();
712 dataflows.sort_by_key(|d| (&d.name, d.uuid));
713
714 let running = dataflows.into_iter().map(|d| DataflowListEntry {
715 id: DataflowIdAndName {
716 uuid: d.uuid,
717 name: d.name.clone(),
718 },
719 status: DataflowStatus::Running,
720 });
721 let finished_failed =
722 dataflow_results.iter().map(|(&uuid, results)| {
723 let name =
724 archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
725 let id = DataflowIdAndName { uuid, name };
726 let status = if results.values().all(|r| r.is_ok()) {
727 DataflowStatus::Finished
728 } else {
729 DataflowStatus::Failed
730 };
731 DataflowListEntry { id, status }
732 });
733
734 let reply = Ok(ControlRequestReply::DataflowList(DataflowList(
735 running.chain(finished_failed).collect(),
736 )));
737 let _ = reply_sender.send(reply);
738 }
739 ControlRequest::DaemonConnected => {
740 let running = !daemon_connections.is_empty();
741 let _ = reply_sender
742 .send(Ok(ControlRequestReply::DaemonConnected(running)));
743 }
744 ControlRequest::ConnectedMachines => {
745 let reply = Ok(ControlRequestReply::ConnectedDaemons(
746 daemon_connections.keys().cloned().collect(),
747 ));
748 let _ = reply_sender.send(reply);
749 }
750 ControlRequest::LogSubscribe { .. } => {
751 let _ = reply_sender.send(Err(eyre::eyre!(
752 "LogSubscribe request should be handled separately"
753 )));
754 }
755 ControlRequest::BuildLogSubscribe { .. } => {
756 let _ = reply_sender.send(Err(eyre::eyre!(
757 "BuildLogSubscribe request should be handled separately"
758 )));
759 }
760 ControlRequest::CliAndDefaultDaemonOnSameMachine => {
761 let mut default_daemon_ip = None;
762 if let Some(default_id) = daemon_connections.unnamed().next() {
763 if let Some(connection) = daemon_connections.get(default_id) {
764 if let Ok(addr) = connection.stream.peer_addr() {
765 default_daemon_ip = Some(addr.ip());
766 }
767 }
768 }
                            let _ = reply_sender.send(Ok(
                                ControlRequestReply::CliAndDefaultDaemonIps {
                                    default_daemon: default_daemon_ip,
                                    cli: None,
                                },
                            ));
775 }
776 ControlRequest::GetNodeInfo => {
777 use dora_message::coordinator_to_cli::{NodeInfo, NodeMetricsInfo};
778
779 let mut node_infos = Vec::new();
780 for dataflow in running_dataflows.values() {
781 for (node_id, _node) in &dataflow.nodes {
782 if let Some(daemon_id) = dataflow.node_to_daemon.get(node_id) {
784 let metrics = dataflow.node_metrics.get(node_id).map(|m| {
786 NodeMetricsInfo {
787 pid: m.pid,
788 cpu_usage: m.cpu_usage,
789 memory_mb: m.memory_bytes as f64 / 1000.0 / 1000.0,
791 disk_read_mb_s: m
792 .disk_read_bytes
793 .map(|b| b as f64 / 1000.0 / 1000.0),
794 disk_write_mb_s: m
795 .disk_write_bytes
796 .map(|b| b as f64 / 1000.0 / 1000.0),
797 }
798 });
799
800 node_infos.push(NodeInfo {
801 dataflow_id: dataflow.uuid,
802 dataflow_name: dataflow.name.clone(),
803 node_id: node_id.clone(),
804 daemon_id: daemon_id.clone(),
805 metrics,
806 });
807 }
808 }
809 }
810 let _ = reply_sender
811 .send(Ok(ControlRequestReply::NodeInfoList(node_infos)));
812 }
813 }
814 }
815 ControlEvent::Error(err) => tracing::error!("{err:?}"),
816 ControlEvent::LogSubscribe {
817 dataflow_id,
818 level,
819 connection,
820 } => {
821 if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
822 dataflow
823 .log_subscribers
824 .push(LogSubscriber::new(level, connection));
825 let buffered = std::mem::take(&mut dataflow.buffered_log_messages);
826 for message in buffered {
827 send_log_message(&mut dataflow.log_subscribers, &message).await;
828 }
829 }
830 }
831 ControlEvent::BuildLogSubscribe {
832 build_id,
833 level,
834 connection,
835 } => {
836 if let Some(build) = running_builds.get_mut(&build_id) {
837 build
838 .log_subscribers
839 .push(LogSubscriber::new(level, connection));
840 let buffered = std::mem::take(&mut build.buffered_log_messages);
841 for message in buffered {
842 send_log_message(&mut build.log_subscribers, &message).await;
843 }
844 }
845 }
846 },
847 Event::DaemonHeartbeatInterval => {
848 let mut disconnected = BTreeSet::new();
849 for (machine_id, connection) in daemon_connections.iter_mut() {
850 if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
851 tracing::warn!(
852 "no heartbeat message from machine `{machine_id}` since {:?}",
853 connection.last_heartbeat.elapsed()
854 )
855 }
856 if connection.last_heartbeat.elapsed() > Duration::from_secs(30) {
857 disconnected.insert(machine_id.clone());
858 continue;
859 }
860 let result: eyre::Result<()> = tokio::time::timeout(
861 Duration::from_millis(500),
862 send_heartbeat_message(&mut connection.stream, clock.new_timestamp()),
863 )
864 .await
865 .wrap_err("timeout")
866 .and_then(|r| r)
867 .wrap_err_with(|| {
868 format!("failed to send heartbeat message to daemon at `{machine_id}`")
869 });
870 if let Err(err) = result {
871 tracing::warn!("{err:?}");
872 disconnected.insert(machine_id.clone());
873 }
874 }
875 if !disconnected.is_empty() {
876 tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
877 for machine_id in disconnected {
878 daemon_connections.remove(&machine_id);
879 }
880 }
881 }
882 Event::CtrlC => {
883 tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
884 handle_destroy(
885 &mut running_dataflows,
886 &mut daemon_connections,
887 &abort_handle,
888 &mut daemon_events_tx,
889 &clock,
890 )
891 .await?;
892 }
893 Event::DaemonHeartbeat {
894 daemon_id: machine_id,
895 } => {
896 if let Some(connection) = daemon_connections.get_mut(&machine_id) {
897 connection.last_heartbeat = Instant::now();
898 }
899 }
900 Event::Log(message) => {
901 if let Some(dataflow_id) = &message.dataflow_id {
902 if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
903 if dataflow.log_subscribers.is_empty() {
904 dataflow.buffered_log_messages.push(message);
906 } else {
907 send_log_message(&mut dataflow.log_subscribers, &message).await;
908 }
909 }
910 } else if let Some(build_id) = &message.build_id {
911 if let Some(build) = running_builds.get_mut(build_id) {
912 if build.log_subscribers.is_empty() {
913 build.buffered_log_messages.push(message);
915 } else {
916 send_log_message(&mut build.log_subscribers, &message).await;
917 }
918 }
919 }
920 }
921 Event::DaemonExit { daemon_id } => {
922 tracing::info!("Daemon `{daemon_id}` exited");
923 daemon_connections.remove(&daemon_id);
924 }
925 Event::NodeMetrics {
926 dataflow_id,
927 metrics,
928 } => {
929 if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
931 for (node_id, node_metrics) in metrics {
932 dataflow.node_metrics.insert(node_id, node_metrics);
933 }
934 }
935 }
936 Event::DataflowBuildResult {
937 build_id,
938 daemon_id,
939 result,
940 } => match running_builds.get_mut(&build_id) {
941 Some(build) => {
942 build.pending_build_results.remove(&daemon_id);
943 match result {
944 Ok(()) => {}
945 Err(err) => {
946 build.errors.push(format!("{err:?}"));
947 }
948 };
949 if build.pending_build_results.is_empty() {
950 tracing::info!("dataflow build finished: `{build_id}`");
951 let mut build = running_builds.remove(&build_id).unwrap();
952 let result = if build.errors.is_empty() {
953 Ok(())
954 } else {
955 Err(format!("build failed: {}", build.errors.join("\n\n")))
956 };
957
958 build.build_result.set_result(Ok(
959 ControlRequestReply::DataflowBuildFinished { build_id, result },
960 ));
961
962 finished_builds.insert(build_id, build.build_result);
963 }
964 }
                None => {
                    tracing::warn!(
                        "received DataflowBuildResult, but no matching build in `running_builds` map"
                    );
                }
970 },
971 Event::DataflowSpawnResult {
972 dataflow_id,
973 daemon_id,
974 result,
975 } => match running_dataflows.get_mut(&dataflow_id) {
976 Some(dataflow) => {
977 dataflow.pending_spawn_results.remove(&daemon_id);
978 match result {
979 Ok(()) => {
980 if dataflow.pending_spawn_results.is_empty() {
                                    tracing::info!("successfully spawned dataflow `{dataflow_id}`");
982 dataflow.spawn_result.set_result(Ok(
983 ControlRequestReply::DataflowSpawned { uuid: dataflow_id },
984 ));
985 }
986 }
987 Err(err) => {
988 tracing::warn!("error while spawning dataflow `{dataflow_id}`");
989 dataflow.spawn_result.set_result(Err(err));
990 }
991 };
992 }
993 None => {
994 tracing::warn!(
995 "received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map"
996 );
997 }
998 },
999 }
1000
1001 let elapsed = start.elapsed();
1003 if elapsed > Duration::from_millis(100) {
1004 tracing::warn!(
1005 "Coordinator took {}ms for handling event: {event_kind}",
1006 elapsed.as_millis()
1007 );
1008 }
1009 }
1010
1011 tracing::info!("stopped");
1012
1013 Ok(())
1014}
1015
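/// Forwards a log message to all subscribers, closing (and dropping) any subscriber that does
/// not accept the message within 100 ms.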
1016async fn send_log_message(log_subscribers: &mut Vec<LogSubscriber>, message: &LogMessage) {
1017 for subscriber in log_subscribers.iter_mut() {
1018 let send_result =
1019 tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));
1020
1021 if send_result.await.is_err() {
1022 subscriber.close();
1023 }
1024 }
1025 log_subscribers.retain(|s| !s.is_closed());
1026}
1027
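/// Merges the per-daemon results of a dataflow into a single [`DataflowResult`], updating the
/// coordinator clock with the daemons' timestamps.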
1028fn dataflow_result(
1029 results: &BTreeMap<DaemonId, DataflowDaemonResult>,
1030 dataflow_uuid: Uuid,
1031 clock: &uhlc::HLC,
1032) -> DataflowResult {
1033 let mut node_results = BTreeMap::new();
1034 for result in results.values() {
1035 node_results.extend(result.node_results.clone());
1036 if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
1037 tracing::warn!("failed to update HLC: {err}");
1038 }
1039 }
1040
1041 DataflowResult {
1042 uuid: dataflow_uuid,
1043 timestamp: clock.new_timestamp(),
1044 node_results,
1045 }
1046}
1047
1048struct DaemonConnection {
1049 stream: TcpStream,
1050 last_heartbeat: Instant,
1051}
1052
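/// Stops all running dataflows, destroys all connected daemons, and shuts down the
/// coordinator's event sources.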
1053async fn handle_destroy(
1054 running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
1055 daemon_connections: &mut DaemonConnections,
1056 abortable_events: &futures::stream::AbortHandle,
1057 daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
1058 clock: &HLC,
1059) -> Result<(), eyre::ErrReport> {
1060 abortable_events.abort();
1061 for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() {
1062 let _ = stop_dataflow(
1063 running_dataflows,
1064 dataflow_uuid,
1065 daemon_connections,
1066 clock.new_timestamp(),
1067 None,
1068 false,
1069 )
1070 .await?;
1071 }
1072
1073 let result = destroy_daemons(daemon_connections, clock.new_timestamp()).await;
1074 *daemon_events_tx = None;
1075 result
1076}
1077
1078async fn send_heartbeat_message(
1079 connection: &mut TcpStream,
1080 timestamp: uhlc::Timestamp,
1081) -> eyre::Result<()> {
1082 let message = serde_json::to_vec(&Timestamped {
1083 inner: DaemonCoordinatorEvent::Heartbeat,
1084 timestamp,
1085 })
1086 .context("Could not serialize heartbeat message")?;
1087
1088 tcp_send(connection, &message)
1089 .await
1090 .wrap_err("failed to send heartbeat message to daemon")
1091}
1092
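/// State of a dataflow build that is still pending on one or more daemons.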
1093struct RunningBuild {
1094 errors: Vec<String>,
1095 build_result: CachedResult,
1096
1097 buffered_log_messages: Vec<LogMessage>,
1099 log_subscribers: Vec<LogSubscriber>,
1100
1101 pending_build_results: BTreeSet<DaemonId>,
1102}
1103
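/// State of a spawned dataflow that has not yet finished on all of its daemons.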
1104struct RunningDataflow {
1105 name: Option<String>,
1106 uuid: Uuid,
1107 descriptor: Descriptor,
1108 daemons: BTreeSet<DaemonId>,
1110 pending_daemons: BTreeSet<DaemonId>,
1112 exited_before_subscribe: Vec<NodeId>,
1113 nodes: BTreeMap<NodeId, ResolvedNode>,
1114 node_to_daemon: BTreeMap<NodeId, DaemonId>,
1116 node_metrics: BTreeMap<NodeId, dora_message::daemon_to_coordinator::NodeMetrics>,
1118
1119 spawn_result: CachedResult,
1120 stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
1121
1122 buffered_log_messages: Vec<LogMessage>,
1124 log_subscribers: Vec<LogSubscriber>,
1125
1126 pending_spawn_results: BTreeSet<DaemonId>,
1127}
1128
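/// A result that might still be pending: callers register a oneshot sender through
/// [`CachedResult::register`] and receive the result once it is set (or immediately if it is
/// already cached).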
1129pub enum CachedResult {
1130 Pending {
1131 result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
1132 },
1133 Cached {
1134 result: eyre::Result<ControlRequestReply>,
1135 },
1136}
1137
1138impl Default for CachedResult {
1139 fn default() -> Self {
1140 Self::Pending {
1141 result_senders: Vec::new(),
1142 }
1143 }
1144}
1145
1146impl CachedResult {
1147 fn register(
1148 &mut self,
1149 reply_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
1150 ) {
1151 match self {
1152 CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
1153 CachedResult::Cached { result } => {
1154 Self::send_result_to(result, reply_sender);
1155 }
1156 }
1157 }
1158
1159 fn set_result(&mut self, result: eyre::Result<ControlRequestReply>) {
1160 match self {
1161 CachedResult::Pending { result_senders } => {
1162 for sender in result_senders.drain(..) {
1163 Self::send_result_to(&result, sender);
1164 }
1165 *self = CachedResult::Cached { result };
1166 }
1167 CachedResult::Cached { .. } => {}
1168 }
1169 }
1170
1171 fn send_result_to(
1172 result: &eyre::Result<ControlRequestReply>,
1173 sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
1174 ) {
1175 let result = match result {
1176 Ok(r) => Ok(r.clone()),
1177 Err(err) => Err(eyre!("{err:?}")),
1178 };
1179 let _ = sender.send(result);
1180 }
1181}
1182
1183struct ArchivedDataflow {
1184 name: Option<String>,
1185 nodes: BTreeMap<NodeId, ResolvedNode>,
1186}
1187
1188impl From<&RunningDataflow> for ArchivedDataflow {
1189 fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
1190 ArchivedDataflow {
1191 name: dataflow.name.clone(),
1192 nodes: dataflow.nodes.clone(),
1193 }
1194 }
1195}
1196
1197impl PartialEq for RunningDataflow {
1198 fn eq(&self, other: &Self) -> bool {
1199 self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
1200 }
1201}
1202
1203impl Eq for RunningDataflow {}
1204
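/// Sends a stop request for the given dataflow to every daemon it runs on and waits for each
/// daemon's acknowledgment.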
1205async fn stop_dataflow<'a>(
1206 running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
1207 dataflow_uuid: Uuid,
1208 daemon_connections: &mut DaemonConnections,
1209 timestamp: uhlc::Timestamp,
1210 grace_duration: Option<Duration>,
1211 force: bool,
1212) -> eyre::Result<&'a mut RunningDataflow> {
1213 let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
1214 bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
1215 };
1216
1217 let message = serde_json::to_vec(&Timestamped {
1218 inner: DaemonCoordinatorEvent::StopDataflow {
1219 dataflow_id: dataflow_uuid,
1220 grace_duration,
1221 force,
1222 },
1223 timestamp,
1224 })?;
1225
1226 for daemon_id in &dataflow.daemons {
        let daemon_connection = daemon_connections
            .get_mut(daemon_id)
            .wrap_err("no daemon connection")?;
        tcp_send(&mut daemon_connection.stream, &message)
1231 .await
1232 .wrap_err("failed to send stop message to daemon")?;
1233
1234 let reply_raw = tcp_receive(&mut daemon_connection.stream)
1236 .await
1237 .wrap_err("failed to receive stop reply from daemon")?;
1238 match serde_json::from_slice(&reply_raw)
1239 .wrap_err("failed to deserialize stop reply from daemon")?
1240 {
1241 DaemonCoordinatorReply::StopResult(result) => result
1242 .map_err(|e| eyre!(e))
1243 .wrap_err("failed to stop dataflow")?,
1244 other => bail!("unexpected reply after sending stop: {other:?}"),
1245 }
1246 }
1247
    tracing::info!("successfully sent stop dataflow `{dataflow_uuid}` to all daemons");
1249
1250 Ok(dataflow)
1251}
1252
1253async fn reload_dataflow(
1254 running_dataflows: &HashMap<Uuid, RunningDataflow>,
1255 dataflow_id: Uuid,
1256 node_id: NodeId,
1257 operator_id: Option<OperatorId>,
1258 daemon_connections: &mut DaemonConnections,
1259 timestamp: uhlc::Timestamp,
1260) -> eyre::Result<()> {
1261 let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
1262 bail!("No running dataflow found with UUID `{dataflow_id}`")
1263 };
1264 let message = serde_json::to_vec(&Timestamped {
1265 inner: DaemonCoordinatorEvent::ReloadDataflow {
1266 dataflow_id,
1267 node_id,
1268 operator_id,
1269 },
1270 timestamp,
1271 })?;
1272
1273 for machine_id in &dataflow.daemons {
        let daemon_connection = daemon_connections
            .get_mut(machine_id)
            .wrap_err("no daemon connection")?;
        tcp_send(&mut daemon_connection.stream, &message)
1278 .await
1279 .wrap_err("failed to send reload message to daemon")?;
1280
1281 let reply_raw = tcp_receive(&mut daemon_connection.stream)
1283 .await
1284 .wrap_err("failed to receive reload reply from daemon")?;
1285 match serde_json::from_slice(&reply_raw)
1286 .wrap_err("failed to deserialize reload reply from daemon")?
1287 {
1288 DaemonCoordinatorReply::ReloadResult(result) => result
1289 .map_err(|e| eyre!(e))
1290 .wrap_err("failed to reload dataflow")?,
1291 other => bail!("unexpected reply after sending reload: {other:?}"),
1292 }
1293 }
1294 tracing::info!("successfully reloaded dataflow `{dataflow_id}`");
1295
1296 Ok(())
1297}
1298
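/// Retrieves the log output of a single node by forwarding a logs request to the daemon that
/// runs (or ran) that node.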
1299async fn retrieve_logs(
1300 running_dataflows: &HashMap<Uuid, RunningDataflow>,
1301 archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
1302 dataflow_id: Uuid,
1303 node_id: NodeId,
1304 daemon_connections: &mut DaemonConnections,
1305 timestamp: uhlc::Timestamp,
1306 tail: Option<usize>,
1307) -> eyre::Result<Vec<u8>> {
1308 let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
1309 dataflow.nodes.clone()
1310 } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
1311 dataflow.nodes.clone()
1312 } else {
1313 bail!("No dataflow found with UUID `{dataflow_id}`")
1314 };
1315
1316 let message = serde_json::to_vec(&Timestamped {
1317 inner: DaemonCoordinatorEvent::Logs {
1318 dataflow_id,
1319 node_id: node_id.clone(),
1320 tail,
1321 },
1322 timestamp,
1323 })?;
1324
1325 let machine_ids: Vec<Option<String>> = nodes
1326 .values()
1327 .filter(|node| node.id == node_id)
1328 .map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
1329 .collect();
1330
1331 let machine_id = if let [machine_id] = &machine_ids[..] {
1332 machine_id
1333 } else if machine_ids.is_empty() {
1334 bail!("No machine contains {}/{}", dataflow_id, node_id)
1335 } else {
1336 bail!(
1337 "More than one machine contains {}/{}. However, it should only be present on one.",
1338 dataflow_id,
1339 node_id
1340 )
1341 };
1342
1343 let daemon_ids: Vec<_> = match machine_id {
1344 None => daemon_connections.unnamed().collect(),
1345 Some(machine_id) => daemon_connections
1346 .get_matching_daemon_id(machine_id)
1347 .into_iter()
1348 .collect(),
1349 };
1350 let daemon_id = match &daemon_ids[..] {
1351 [id] => (*id).clone(),
1352 [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
1353 _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
1354 };
1355 let daemon_connection = daemon_connections
1356 .get_mut(&daemon_id)
1357 .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?;
1358 tcp_send(&mut daemon_connection.stream, &message)
1359 .await
1360 .wrap_err("failed to send logs message to daemon")?;
1361
1362 let reply_raw = tcp_receive(&mut daemon_connection.stream)
1364 .await
1365 .wrap_err("failed to retrieve logs reply from daemon")?;
1366 let reply_logs = match serde_json::from_slice(&reply_raw)
1367 .wrap_err("failed to deserialize logs reply from daemon")?
1368 {
1369 DaemonCoordinatorReply::Logs(logs) => logs,
1370 other => bail!("unexpected reply after sending logs: {other:?}"),
1371 };
1372 tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");
1373
1374 reply_logs.map_err(|err| eyre!(err))
1375}
1376
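/// Triggers a dataflow build by sending a build request to every daemon that hosts at least one
/// of the dataflow's nodes; the per-daemon results arrive later as `DataflowBuildResult` events.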
1377#[allow(clippy::too_many_arguments)]
1378#[tracing::instrument(skip(daemon_connections, clock))]
1379async fn build_dataflow(
1380 build_id: BuildId,
1381 session_id: SessionId,
1382 dataflow: Descriptor,
1383 git_sources: BTreeMap<NodeId, GitSource>,
1384 prev_git_sources: BTreeMap<NodeId, GitSource>,
1385 local_working_dir: Option<PathBuf>,
1386 clock: &HLC,
1387 uv: bool,
1388 daemon_connections: &mut DaemonConnections,
1389) -> eyre::Result<RunningBuild> {
1390 let nodes = dataflow.resolve_aliases_and_set_defaults()?;
1391
1392 let mut git_sources_by_daemon = git_sources
1393 .into_iter()
1394 .into_grouping_map_by(|(id, _)| {
1395 nodes
1396 .get(id)
1397 .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
1398 })
1399 .collect();
1400 let mut prev_git_sources_by_daemon = prev_git_sources
1401 .into_iter()
1402 .into_grouping_map_by(|(id, _)| {
1403 nodes
1404 .get(id)
1405 .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
1406 })
1407 .collect();
1408
1409 let nodes_by_daemon = nodes
1410 .values()
1411 .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));
1412
1413 let mut daemons = BTreeSet::new();
1414 for (machine, nodes_on_machine) in &nodes_by_daemon {
1415 let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
1416 tracing::debug!(
1417 "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
1418 );
1419
1420 let build_command = BuildDataflowNodes {
1421 build_id,
1422 session_id,
1423 local_working_dir: local_working_dir.clone(),
1424 git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
1425 prev_git_sources: prev_git_sources_by_daemon
1426 .remove(machine)
1427 .unwrap_or_default(),
1428 dataflow_descriptor: dataflow.clone(),
1429 nodes_on_machine,
1430 uv,
1431 };
1432 let message = serde_json::to_vec(&Timestamped {
1433 inner: DaemonCoordinatorEvent::Build(build_command),
1434 timestamp: clock.new_timestamp(),
1435 })?;
1436
1437 let daemon_id =
1438 build_dataflow_on_machine(daemon_connections, machine.map(|s| s.as_str()), &message)
1439 .await
1440 .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
1441 daemons.insert(daemon_id);
1442 }
1443
    tracing::info!("successfully triggered dataflow build `{build_id}`");
1445
1446 Ok(RunningBuild {
1447 errors: Vec::new(),
1448 build_result: CachedResult::default(),
1449 buffered_log_messages: Vec::new(),
1450 log_subscribers: Vec::new(),
1451 pending_build_results: daemons,
1452 })
1453}
1454
1455async fn build_dataflow_on_machine(
1456 daemon_connections: &mut DaemonConnections,
1457 machine: Option<&str>,
1458 message: &[u8],
1459) -> Result<DaemonId, eyre::ErrReport> {
1460 let daemon_id = match machine {
1461 Some(machine) => daemon_connections
1462 .get_matching_daemon_id(machine)
1463 .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
1464 .clone(),
1465 None => daemon_connections
1466 .unnamed()
1467 .next()
1468 .wrap_err("no unnamed daemon connections")?
1469 .clone(),
1470 };
1471
1472 let daemon_connection = daemon_connections
1473 .get_mut(&daemon_id)
1474 .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?;
1475 tcp_send(&mut daemon_connection.stream, message)
1476 .await
1477 .wrap_err("failed to send build message to daemon")?;
1478
1479 let reply_raw = tcp_receive(&mut daemon_connection.stream)
1480 .await
1481 .wrap_err("failed to receive build reply from daemon")?;
1482 match serde_json::from_slice(&reply_raw)
1483 .wrap_err("failed to deserialize build reply from daemon")?
1484 {
1485 DaemonCoordinatorReply::TriggerBuildResult(result) => result
1486 .map_err(|e| eyre!(e))
1487 .wrap_err("daemon returned an error")?,
1488 _ => bail!("unexpected reply"),
1489 }
1490 Ok(daemon_id)
1491}
1492
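/// Spawns a dataflow on all involved daemons and returns it as a [`RunningDataflow`] whose
/// spawn and stop results are still pending.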
1493#[allow(clippy::too_many_arguments)]
1494async fn start_dataflow(
1495 build_id: Option<BuildId>,
1496 session_id: SessionId,
1497 dataflow: Descriptor,
1498 local_working_dir: Option<PathBuf>,
1499 name: Option<String>,
1500 daemon_connections: &mut DaemonConnections,
1501 clock: &HLC,
1502 uv: bool,
1503 write_events_to: Option<PathBuf>,
1504) -> eyre::Result<RunningDataflow> {
1505 let SpawnedDataflow {
1506 uuid,
1507 daemons,
1508 nodes,
1509 node_to_daemon,
1510 } = spawn_dataflow(
1511 build_id,
1512 session_id,
1513 dataflow.clone(),
1514 local_working_dir,
1515 daemon_connections,
1516 clock,
1517 uv,
1518 write_events_to,
1519 )
1520 .await?;
1521 Ok(RunningDataflow {
1522 uuid,
1523 name,
1524 descriptor: dataflow,
1525 pending_daemons: if daemons.len() > 1 {
1526 daemons.clone()
1527 } else {
1528 BTreeSet::new()
1529 },
1530 exited_before_subscribe: Default::default(),
1531 daemons: daemons.clone(),
1532 nodes,
1533 node_to_daemon,
1534 node_metrics: BTreeMap::new(),
1535 spawn_result: CachedResult::default(),
1536 stop_reply_senders: Vec::new(),
1537 buffered_log_messages: Vec::new(),
1538 log_subscribers: Vec::new(),
1539 pending_spawn_results: daemons,
1540 })
1541}
1542
async fn destroy_daemon(
    daemon_id: DaemonId,
    mut daemon_connection: DaemonConnection,
    timestamp: uhlc::Timestamp,
) -> Result<()> {
1549 let message = serde_json::to_vec(&Timestamped {
1550 inner: DaemonCoordinatorEvent::Destroy,
1551 timestamp,
1552 })?;
1553
1554 tcp_send(&mut daemon_connection.stream, &message)
1555 .await
1556 .wrap_err(format!(
1557 "failed to send destroy message to daemon `{daemon_id}`"
1558 ))?;
1559
1560 let reply_raw = tcp_receive(&mut daemon_connection.stream)
1562 .await
1563 .wrap_err("failed to receive destroy reply from daemon")?;
1564 match serde_json::from_slice(&reply_raw)
1565 .wrap_err("failed to deserialize destroy reply from daemon")?
1566 {
1567 DaemonCoordinatorReply::DestroyResult { result, .. } => result
1568 .map_err(|e| eyre!(e))
1569 .wrap_err("failed to destroy dataflow")?,
1570 other => bail!("unexpected reply after sending `destroy`: {other:?}"),
1571 }
1572
1573 tracing::info!("successfully destroyed daemon `{daemon_id}`");
1574 Ok(())
1575}
1576
1577async fn destroy_daemons(
1578 daemon_connections: &mut DaemonConnections,
1579 timestamp: uhlc::Timestamp,
1580) -> eyre::Result<()> {
1581 let futures = daemon_connections
1582 .drain()
1583 .map(|(daemon_id, daemon_connection)| {
1584 destroy_daemon(daemon_id, daemon_connection, timestamp)
1585 })
1586 .collect::<Vec<_>>();
    for result in join_all(futures).await {
        result?;
    }
1592 Ok(())
1593}
1594
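/// Events processed by the coordinator's main loop.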
1595#[derive(Debug)]
1596pub enum Event {
1597 NewDaemonConnection(TcpStream),
1598 DaemonConnectError(eyre::Report),
1599 DaemonHeartbeat {
1600 daemon_id: DaemonId,
1601 },
1602 Dataflow {
1603 uuid: Uuid,
1604 event: DataflowEvent,
1605 },
1606 Control(ControlEvent),
1607 Daemon(DaemonRequest),
1608 DaemonHeartbeatInterval,
1609 CtrlC,
1610 Log(LogMessage),
1611 DaemonExit {
1612 daemon_id: dora_message::common::DaemonId,
1613 },
1614 DataflowBuildResult {
1615 build_id: BuildId,
1616 daemon_id: DaemonId,
1617 result: eyre::Result<()>,
1618 },
1619 DataflowSpawnResult {
1620 dataflow_id: uuid::Uuid,
1621 daemon_id: DaemonId,
1622 result: eyre::Result<()>,
1623 },
1624 NodeMetrics {
1625 dataflow_id: uuid::Uuid,
1626 metrics: BTreeMap<NodeId, dora_message::daemon_to_coordinator::NodeMetrics>,
1627 },
1628}
1629
1630impl Event {
1631 #[allow(clippy::match_like_matches_macro)]
1633 pub fn log(&self) -> bool {
1634 match self {
1635 Event::DaemonHeartbeatInterval => false,
1636 _ => true,
1637 }
1638 }
1639
1640 fn kind(&self) -> &'static str {
1641 match self {
1642 Event::NewDaemonConnection(_) => "NewDaemonConnection",
1643 Event::DaemonConnectError(_) => "DaemonConnectError",
1644 Event::DaemonHeartbeat { .. } => "DaemonHeartbeat",
1645 Event::Dataflow { .. } => "Dataflow",
1646 Event::Control(_) => "Control",
1647 Event::Daemon(_) => "Daemon",
1648 Event::DaemonHeartbeatInterval => "DaemonHeartbeatInterval",
1649 Event::CtrlC => "CtrlC",
1650 Event::Log(_) => "Log",
1651 Event::DaemonExit { .. } => "DaemonExit",
1652 Event::DataflowBuildResult { .. } => "DataflowBuildResult",
1653 Event::DataflowSpawnResult { .. } => "DataflowSpawnResult",
1654 Event::NodeMetrics { .. } => "NodeMetrics",
1655 }
1656 }
1657}
1658
1659#[derive(Debug)]
1660pub enum DataflowEvent {
1661 DataflowFinishedOnDaemon {
1662 daemon_id: DaemonId,
1663 result: DataflowDaemonResult,
1664 },
1665 ReadyOnDaemon {
1666 daemon_id: DaemonId,
1667 exited_before_subscribe: Vec<NodeId>,
1668 },
1669}
1670
1671#[derive(Debug)]
1672pub enum DaemonRequest {
1673 Register {
1674 version_check_result: Result<(), String>,
1675 machine_id: Option<String>,
1676 connection: TcpStream,
1677 },
1678}
1679
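/// Installs a Ctrl-C handler that forwards the first signal as [`Event::CtrlC`] and aborts the
/// process on a second signal.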
1680fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
1681 let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
1682
1683 let mut ctrlc_sent = false;
1684 ctrlc::set_handler(move || {
1685 if ctrlc_sent {
1686 tracing::warn!("received second ctrlc signal -> aborting immediately");
1687 std::process::abort();
1688 } else {
1689 tracing::info!("received ctrlc signal");
1690 if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
1691 tracing::error!("failed to report ctrl-c event to dora-coordinator");
1692 }
1693
1694 ctrlc_sent = true;
1695 }
1696 })
1697 .wrap_err("failed to set ctrl-c handler")?;
1698
1699 Ok(ReceiverStream::new(ctrlc_rx))
1700}