use crate::{
    run::spawn_dataflow,
    tcp_utils::{tcp_receive, tcp_send},
};
pub use control::ControlEvent;
use dora_core::{
    config::{NodeId, OperatorId},
    descriptor::DescriptorExt,
    uhlc::{self, HLC},
};
use dora_message::{
    cli_to_coordinator::ControlRequest,
    common::{DaemonId, GitSource},
    coordinator_to_cli::{
        ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
        DataflowStatus, LogLevel, LogMessage,
    },
    coordinator_to_daemon::{
        BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped,
    },
    daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
    descriptor::{Descriptor, ResolvedNode},
    BuildId, DataflowId, SessionId,
};
use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
use futures_concurrency::stream::Merge;
use itertools::Itertools;
use log_subscriber::LogSubscriber;
use run::SpawnedDataflow;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    net::SocketAddr,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::{
    net::TcpStream,
    sync::{mpsc, oneshot},
    task::JoinHandle,
};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use uuid::Uuid;

mod control;
mod listener;
mod log_subscriber;
mod run;
mod tcp_utils;

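/// Starts the coordinator: binds the daemon listener on `bind` and the control
/// listener on `bind_control`, then returns the bound daemon port together
/// with a future that drives the main event loop until shutdown.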
pub async fn start(
    bind: SocketAddr,
    bind_control: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
    let listener = listener::create_listener(bind).await?;
    let port = listener
        .local_addr()
        .wrap_err("failed to get local addr of listener")?
        .port();
    let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
        c.map(Event::NewDaemonConnection)
            .wrap_err("failed to open connection")
            .unwrap_or_else(Event::DaemonConnectError)
    });

    let mut tasks = FuturesUnordered::new();
    let control_events = control::control_events(bind_control, &tasks)
        .await
        .wrap_err("failed to create control events")?;

    let ctrlc_events = set_up_ctrlc_handler()?;

    let events = (
        external_events,
        new_daemon_connections,
        control_events,
        ctrlc_events,
    )
        .merge();

    let future = async move {
        start_inner(events, &tasks).await?;

        tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
        while let Some(join_result) = tasks.next().await {
            if let Err(err) = join_result {
                tracing::error!("task panicked: {err}");
            }
        }
        tracing::debug!("all spawned tasks finished, exiting..");
        Ok(())
    };
    Ok((port, future))
}

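/// Resolves a dataflow name to its UUID, preferring running dataflows and
/// falling back to archived ones. Fails if the name is unknown or ambiguous.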
fn resolve_name(
    name: String,
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
) -> eyre::Result<Uuid> {
    let uuids: Vec<_> = running_dataflows
        .iter()
        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
        .map(|(k, _)| k)
        .copied()
        .collect();
    let archived_uuids: Vec<_> = archived_dataflows
        .iter()
        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
        .map(|(k, _)| k)
        .copied()
        .collect();

    if uuids.is_empty() {
        if archived_uuids.is_empty() {
            bail!("no dataflow with name `{name}`");
        } else if let [uuid] = archived_uuids.as_slice() {
            Ok(*uuid)
        } else {
            bail!(
                "multiple archived dataflows found with name `{name}`. \
                Please provide the UUID instead."
            );
        }
    } else if let [uuid] = uuids.as_slice() {
        Ok(*uuid)
    } else {
        bail!("multiple dataflows found with name `{name}`");
    }
}

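/// All currently connected daemons, keyed by their [`DaemonId`].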
#[derive(Default)]
struct DaemonConnections {
    daemons: BTreeMap<DaemonId, DaemonConnection>,
}

impl DaemonConnections {
    fn add(&mut self, daemon_id: DaemonId, connection: DaemonConnection) {
        let previous = self.daemons.insert(daemon_id.clone(), connection);
        if previous.is_some() {
            tracing::info!("closing previous connection `{daemon_id}` on new register");
        }
    }

    fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> {
        self.daemons.get(id)
    }

    fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
        self.daemons.get_mut(id)
    }

    fn get_matching_daemon_id(&self, machine_id: &str) -> Option<&DaemonId> {
        self.daemons
            .keys()
            .find(|id| id.matches_machine_id(machine_id))
    }

    fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
        std::mem::take(&mut self.daemons).into_iter()
    }

    fn is_empty(&self) -> bool {
        self.daemons.is_empty()
    }

    fn keys(&self) -> impl Iterator<Item = &DaemonId> {
        self.daemons.keys()
    }

    fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
        self.daemons.iter_mut()
    }

    fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
        self.daemons.remove(daemon_id)
    }

    fn unnamed(&self) -> impl Iterator<Item = &DaemonId> {
        self.daemons.keys().filter(|id| id.machine_id().is_none())
    }
}

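/// The coordinator's main event loop: handles daemon registration, control
/// requests, dataflow lifecycle events, heartbeats, and log forwarding until
/// a destroy request or Ctrl-C shuts it down.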
async fn start_inner(
    events: impl Stream<Item = Event> + Unpin,
    tasks: &FuturesUnordered<JoinHandle<()>>,
) -> eyre::Result<()> {
    let clock = Arc::new(HLC::default());

    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2);
    let mut daemon_events_tx = Some(daemon_events_tx);
    let daemon_events = ReceiverStream::new(daemon_events);

    let daemon_heartbeat_interval =
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
            .map(|_| Event::DaemonHeartbeatInterval);

    let (abortable_events, abort_handle) =
        futures::stream::abortable((events, daemon_heartbeat_interval).merge());

    let mut events = (abortable_events, daemon_events).merge();

    let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
    let mut finished_builds: HashMap<BuildId, CachedResult> = HashMap::new();

    let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
    let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
        HashMap::new();
    let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
    let mut daemon_connections = DaemonConnections::default();

    while let Some(event) = events.next().await {
        let start = Instant::now();
        let event_kind = event.kind();

        if event.log() {
            tracing::trace!("Handling event {event:?}");
        }
        match event {
            Event::NewDaemonConnection(connection) => {
                connection.set_nodelay(true)?;
                let events_tx = daemon_events_tx.clone();
                if let Some(events_tx) = events_tx {
                    let task = tokio::spawn(listener::handle_connection(
                        connection,
                        events_tx,
                        clock.clone(),
                    ));
                    tasks.push(task);
                } else {
                    tracing::warn!(
                        "ignoring new daemon connection because events_tx was closed already"
                    );
                }
            }
            Event::DaemonConnectError(err) => {
                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
            }
            Event::Daemon(event) => match event {
                DaemonRequest::Register {
                    machine_id,
                    mut connection,
                    version_check_result,
                } => {
                    let existing = match &machine_id {
                        Some(id) => daemon_connections.get_matching_daemon_id(id),
                        None => daemon_connections.unnamed().next(),
                    };
                    let existing_result = if existing.is_some() {
                        Err(format!(
                            "There is already a connected daemon with machine ID `{machine_id:?}`"
                        ))
                    } else {
                        Ok(())
                    };

                    let daemon_id = DaemonId::new(machine_id);

                    let reply: Timestamped<RegisterResult> = Timestamped {
                        inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
                            Ok(_) => RegisterResult::Ok {
                                daemon_id: daemon_id.clone(),
                            },
                            Err(err) => RegisterResult::Err(err.clone()),
                        },
                        timestamp: clock.new_timestamp(),
                    };

                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
                        .await
                        .context("tcp send failed");
                    match version_check_result.map_err(|e| eyre!(e)).and(send_result) {
                        Ok(()) => {
                            daemon_connections.add(
                                daemon_id.clone(),
                                DaemonConnection {
                                    stream: connection,
                                    last_heartbeat: Instant::now(),
                                },
                            );
                        }
                        Err(err) => {
                            tracing::warn!("failed to register daemon connection for daemon `{daemon_id}`: {err}");
                        }
                    }
                }
            },
            Event::Dataflow { uuid, event } => match event {
                DataflowEvent::ReadyOnDaemon {
                    daemon_id,
                    exited_before_subscribe,
                } => {
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            let dataflow = entry.get_mut();
                            dataflow.pending_daemons.remove(&daemon_id);
                            dataflow
                                .exited_before_subscribe
                                .extend(exited_before_subscribe);
                            if dataflow.pending_daemons.is_empty() {
                                tracing::debug!("sending all nodes ready message to daemons");
                                let message = serde_json::to_vec(&Timestamped {
                                    inner: DaemonCoordinatorEvent::AllNodesReady {
                                        dataflow_id: uuid,
                                        exited_before_subscribe: dataflow
                                            .exited_before_subscribe
                                            .clone(),
                                    },
                                    timestamp: clock.new_timestamp(),
                                })
                                .wrap_err("failed to serialize AllNodesReady message")?;

                                for daemon_id in &dataflow.daemons {
                                    let Some(connection) = daemon_connections.get_mut(daemon_id)
                                    else {
                                        tracing::warn!(
                                            "no daemon connection found for daemon `{daemon_id}`"
                                        );
                                        continue;
                                    };
                                    tcp_send(&mut connection.stream, &message)
                                        .await
                                        .wrap_err_with(|| {
                                            format!(
                                                "failed to send AllNodesReady({uuid}) message \
                                                to machine {daemon_id}"
                                            )
                                        })?;
                                }
                            }
                        }
                        std::collections::hash_map::Entry::Vacant(_) => {
                            tracing::warn!("dataflow not running on ReadyOnDaemon");
                        }
                    }
                }
                DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => {
                    tracing::debug!("coordinator received DataflowFinishedOnDaemon ({daemon_id:?}, result: {result:?})");
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            let dataflow = entry.get_mut();
                            dataflow.daemons.remove(&daemon_id);
                            tracing::info!(
                                "removed daemon id: {daemon_id} from dataflow: {:#?}",
                                dataflow.uuid
                            );
                            dataflow_results
                                .entry(uuid)
                                .or_default()
                                .insert(daemon_id, result);

                            if dataflow.daemons.is_empty() {
                                archived_dataflows
                                    .entry(uuid)
                                    .or_insert_with(|| ArchivedDataflow::from(entry.get()));
                                let mut finished_dataflow = entry.remove();
                                let dataflow_id = finished_dataflow.uuid;
                                send_log_message(
                                    &mut finished_dataflow.log_subscribers,
                                    &LogMessage {
                                        build_id: None,
                                        dataflow_id: Some(dataflow_id),
                                        node_id: None,
                                        daemon_id: None,
                                        level: LogLevel::Info.into(),
                                        target: Some("coordinator".into()),
                                        module_path: None,
                                        file: None,
                                        line: None,
                                        message: "dataflow finished".into(),
                                    },
                                )
                                .await;

                                let reply = ControlRequestReply::DataflowStopped {
                                    uuid,
                                    result: dataflow_results
                                        .get(&uuid)
                                        .map(|r| dataflow_result(r, uuid, &clock))
                                        .unwrap_or_else(|| {
                                            DataflowResult::ok_empty(uuid, clock.new_timestamp())
                                        }),
                                };
                                for sender in finished_dataflow.stop_reply_senders {
                                    let _ = sender.send(Ok(reply.clone()));
                                }
                                if !matches!(
                                    finished_dataflow.spawn_result,
                                    CachedResult::Cached { .. }
                                ) {
                                    log::error!("pending spawn result on dataflow finish");
                                }
                            }
                        }
                        std::collections::hash_map::Entry::Vacant(_) => {
                            tracing::warn!("dataflow not running on DataflowFinishedOnDaemon");
                        }
                    }
                }
            },

            Event::Control(event) => match event {
                ControlEvent::IncomingRequest {
                    request,
                    reply_sender,
                } => {
                    match request {
                        ControlRequest::Build {
                            session_id,
                            dataflow,
                            git_sources,
                            prev_git_sources,
                            local_working_dir,
                            uv,
                        } => {
                            let build_id = BuildId::generate();

                            let result = build_dataflow(
                                build_id,
                                session_id,
                                dataflow,
                                git_sources,
                                prev_git_sources,
                                local_working_dir,
                                &clock,
                                uv,
                                &mut daemon_connections,
                            )
                            .await;
                            match result {
                                Ok(build) => {
                                    running_builds.insert(build_id, build);
                                    let _ = reply_sender.send(Ok(
                                        ControlRequestReply::DataflowBuildTriggered { build_id },
                                    ));
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        ControlRequest::WaitForBuild { build_id } => {
                            if let Some(build) = running_builds.get_mut(&build_id) {
                                build.build_result.register(reply_sender);
                            } else if let Some(result) = finished_builds.get_mut(&build_id) {
                                result.register(reply_sender);
                            } else {
                                let _ =
                                    reply_sender.send(Err(eyre!("unknown build id {build_id}")));
                            }
                        }
                        ControlRequest::Start {
                            build_id,
                            session_id,
                            dataflow,
                            name,
                            local_working_dir,
                            uv,
                        } => {
                            let name = name.or_else(|| names::Generator::default().next());

                            let inner = async {
                                if let Some(name) = name.as_deref() {
                                    if running_dataflows
                                        .values()
                                        .any(|d: &RunningDataflow| d.name.as_deref() == Some(name))
                                    {
                                        bail!("there is already a running dataflow with name `{name}`");
                                    }
                                }
                                let dataflow = start_dataflow(
                                    build_id,
                                    session_id,
                                    dataflow,
                                    local_working_dir,
                                    name,
                                    &mut daemon_connections,
                                    &clock,
                                    uv,
                                )
                                .await?;
                                Ok(dataflow)
                            };
                            match inner.await {
                                Ok(dataflow) => {
                                    let uuid = dataflow.uuid;
                                    running_dataflows.insert(uuid, dataflow);
                                    let _ = reply_sender.send(Ok(
                                        ControlRequestReply::DataflowStartTriggered { uuid },
                                    ));
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        ControlRequest::WaitForSpawn { dataflow_id } => {
                            if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
                                dataflow.spawn_result.register(reply_sender);
                            } else {
                                let _ =
                                    reply_sender.send(Err(eyre!("unknown dataflow {dataflow_id}")));
                            }
                        }
                        ControlRequest::Check { dataflow_uuid } => {
                            let status = match &running_dataflows.get(&dataflow_uuid) {
                                Some(_) => ControlRequestReply::DataflowSpawned {
                                    uuid: dataflow_uuid,
                                },
                                None => ControlRequestReply::DataflowStopped {
                                    uuid: dataflow_uuid,
                                    result: dataflow_results
                                        .get(&dataflow_uuid)
                                        .map(|r| dataflow_result(r, dataflow_uuid, &clock))
                                        .unwrap_or_else(|| {
                                            DataflowResult::ok_empty(
                                                dataflow_uuid,
                                                clock.new_timestamp(),
                                            )
                                        }),
                                },
                            };
                            let _ = reply_sender.send(Ok(status));
                        }
                        ControlRequest::Reload {
                            dataflow_id,
                            node_id,
                            operator_id,
                        } => {
                            let reload = async {
                                reload_dataflow(
                                    &running_dataflows,
                                    dataflow_id,
                                    node_id,
                                    operator_id,
                                    &mut daemon_connections,
                                    clock.new_timestamp(),
                                )
                                .await?;
                                Result::<_, eyre::Report>::Ok(())
                            };
                            let reply =
                                reload
                                    .await
                                    .map(|()| ControlRequestReply::DataflowReloaded {
                                        uuid: dataflow_id,
                                    });
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::Stop {
                            dataflow_uuid,
                            grace_duration,
                        } => {
                            if let Some(result) = dataflow_results.get(&dataflow_uuid) {
                                let reply = ControlRequestReply::DataflowStopped {
                                    uuid: dataflow_uuid,
                                    result: dataflow_result(result, dataflow_uuid, &clock),
                                };
                                let _ = reply_sender.send(Ok(reply));

                                continue;
                            }

                            let dataflow = stop_dataflow(
                                &mut running_dataflows,
                                dataflow_uuid,
                                &mut daemon_connections,
                                clock.new_timestamp(),
                                grace_duration,
                            )
                            .await;

                            match dataflow {
                                Ok(dataflow) => {
                                    dataflow.stop_reply_senders.push(reply_sender);
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        ControlRequest::StopByName {
                            name,
                            grace_duration,
                        } => match resolve_name(name, &running_dataflows, &archived_dataflows) {
                            Ok(dataflow_uuid) => {
                                if let Some(result) = dataflow_results.get(&dataflow_uuid) {
                                    let reply = ControlRequestReply::DataflowStopped {
                                        uuid: dataflow_uuid,
                                        result: dataflow_result(result, dataflow_uuid, &clock),
                                    };
                                    let _ = reply_sender.send(Ok(reply));

                                    continue;
                                }

                                let dataflow = stop_dataflow(
                                    &mut running_dataflows,
                                    dataflow_uuid,
                                    &mut daemon_connections,
                                    clock.new_timestamp(),
                                    grace_duration,
                                )
                                .await;

                                match dataflow {
                                    Ok(dataflow) => {
                                        dataflow.stop_reply_senders.push(reply_sender);
                                    }
                                    Err(err) => {
                                        let _ = reply_sender.send(Err(err));
                                    }
                                }
                            }
                            Err(err) => {
                                let _ = reply_sender.send(Err(err));
                            }
                        },
                        ControlRequest::Logs { uuid, name, node } => {
                            let dataflow_uuid = if let Some(uuid) = uuid {
                                Ok(uuid)
                            } else if let Some(name) = name {
                                resolve_name(name, &running_dataflows, &archived_dataflows)
                            } else {
                                Err(eyre!("no dataflow UUID or name provided"))
                            };

                            match dataflow_uuid {
                                Ok(uuid) => {
                                    let reply = retrieve_logs(
                                        &running_dataflows,
                                        &archived_dataflows,
                                        uuid,
                                        node.into(),
                                        &mut daemon_connections,
                                        clock.new_timestamp(),
                                    )
                                    .await
                                    .map(ControlRequestReply::Logs);
                                    let _ = reply_sender.send(reply);
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        ControlRequest::Destroy => {
                            tracing::info!("Received destroy command");

                            let reply = handle_destroy(
                                &mut running_dataflows,
                                &mut daemon_connections,
                                &abort_handle,
                                &mut daemon_events_tx,
                                &clock,
                            )
                            .await
                            .map(|()| ControlRequestReply::DestroyOk);
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::List => {
                            let mut dataflows: Vec<_> = running_dataflows.values().collect();
                            dataflows.sort_by_key(|d| (&d.name, d.uuid));

                            let running = dataflows.into_iter().map(|d| DataflowListEntry {
                                id: DataflowIdAndName {
                                    uuid: d.uuid,
                                    name: d.name.clone(),
                                },
                                status: DataflowStatus::Running,
                            });
                            let finished_failed =
                                dataflow_results.iter().map(|(&uuid, results)| {
                                    let name =
                                        archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
                                    let id = DataflowIdAndName { uuid, name };
                                    let status = if results.values().all(|r| r.is_ok()) {
                                        DataflowStatus::Finished
                                    } else {
                                        DataflowStatus::Failed
                                    };
                                    DataflowListEntry { id, status }
                                });

                            let reply = Ok(ControlRequestReply::DataflowList(DataflowList(
                                running.chain(finished_failed).collect(),
                            )));
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::DaemonConnected => {
                            let running = !daemon_connections.is_empty();
                            let _ = reply_sender
                                .send(Ok(ControlRequestReply::DaemonConnected(running)));
                        }
                        ControlRequest::ConnectedMachines => {
                            let reply = Ok(ControlRequestReply::ConnectedDaemons(
                                daemon_connections.keys().cloned().collect(),
                            ));
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::LogSubscribe { .. } => {
                            let _ = reply_sender.send(Err(eyre::eyre!(
                                "LogSubscribe request should be handled separately"
                            )));
                        }
                        ControlRequest::BuildLogSubscribe { .. } => {
                            let _ = reply_sender.send(Err(eyre::eyre!(
                                "BuildLogSubscribe request should be handled separately"
                            )));
                        }
                        ControlRequest::CliAndDefaultDaemonOnSameMachine => {
                            let mut default_daemon_ip = None;
                            if let Some(default_id) = daemon_connections.unnamed().next() {
                                if let Some(connection) = daemon_connections.get(default_id) {
                                    if let Ok(addr) = connection.stream.peer_addr() {
                                        default_daemon_ip = Some(addr.ip());
                                    }
                                }
                            }
                            let _ = reply_sender.send(Ok(
                                ControlRequestReply::CliAndDefaultDaemonIps {
                                    default_daemon: default_daemon_ip,
                                    cli: None,
                                },
                            ));
                        }
                    }
                }
                ControlEvent::Error(err) => tracing::error!("{err:?}"),
                ControlEvent::LogSubscribe {
                    dataflow_id,
                    level,
                    connection,
                } => {
                    if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
                        dataflow
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                    }
                }
                ControlEvent::BuildLogSubscribe {
                    build_id,
                    level,
                    connection,
                } => {
                    if let Some(build) = running_builds.get_mut(&build_id) {
                        build
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                    }
                }
            },
            Event::DaemonHeartbeatInterval => {
                let mut disconnected = BTreeSet::new();
                for (machine_id, connection) in daemon_connections.iter_mut() {
                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
                        tracing::warn!(
                            "no heartbeat message from machine `{machine_id}` since {:?}",
                            connection.last_heartbeat.elapsed()
                        )
                    }
                    if connection.last_heartbeat.elapsed() > Duration::from_secs(30) {
                        disconnected.insert(machine_id.clone());
                        continue;
                    }
                    let result: eyre::Result<()> = tokio::time::timeout(
                        Duration::from_millis(500),
                        send_heartbeat_message(&mut connection.stream, clock.new_timestamp()),
                    )
                    .await
                    .wrap_err("timeout")
                    .and_then(|r| r)
                    .wrap_err_with(|| {
                        format!("failed to send heartbeat message to daemon at `{machine_id}`")
                    });
                    if let Err(err) = result {
                        tracing::warn!("{err:?}");
                        disconnected.insert(machine_id.clone());
                    }
                }
                if !disconnected.is_empty() {
                    tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
                    for machine_id in disconnected {
                        daemon_connections.remove(&machine_id);
                    }
                }
            }
            Event::CtrlC => {
                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
                handle_destroy(
                    &mut running_dataflows,
                    &mut daemon_connections,
                    &abort_handle,
                    &mut daemon_events_tx,
                    &clock,
                )
                .await?;
            }
            Event::DaemonHeartbeat {
                daemon_id: machine_id,
            } => {
                if let Some(connection) = daemon_connections.get_mut(&machine_id) {
                    connection.last_heartbeat = Instant::now();
                }
            }
            Event::Log(message) => {
                if let Some(dataflow_id) = &message.dataflow_id {
                    if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
                        send_log_message(&mut dataflow.log_subscribers, &message).await;
                    }
                }
                if let Some(build_id) = message.build_id {
                    if let Some(build) = running_builds.get_mut(&build_id) {
                        send_log_message(&mut build.log_subscribers, &message).await;
                    }
                }
            }
            Event::DaemonExit { daemon_id } => {
                tracing::info!("Daemon `{daemon_id}` exited");
                daemon_connections.remove(&daemon_id);
            }
            Event::DataflowBuildResult {
                build_id,
                daemon_id,
                result,
            } => match running_builds.get_mut(&build_id) {
                Some(build) => {
                    build.pending_build_results.remove(&daemon_id);
                    match result {
                        Ok(()) => {}
                        Err(err) => {
                            build.errors.push(format!("{err:?}"));
                        }
                    };
                    if build.pending_build_results.is_empty() {
                        tracing::info!("dataflow build finished: `{build_id}`");
                        let mut build = running_builds.remove(&build_id).unwrap();
                        let result = if build.errors.is_empty() {
                            Ok(())
                        } else {
                            Err(format!("build failed: {}", build.errors.join("\n\n")))
                        };

                        build.build_result.set_result(Ok(
                            ControlRequestReply::DataflowBuildFinished { build_id, result },
                        ));

                        finished_builds.insert(build_id, build.build_result);
                    }
                }
                None => {
                    tracing::warn!("received DataflowBuildResult, but no matching build in `running_builds` map");
                }
            },
            Event::DataflowSpawnResult {
                dataflow_id,
                daemon_id,
                result,
            } => match running_dataflows.get_mut(&dataflow_id) {
                Some(dataflow) => {
                    dataflow.pending_spawn_results.remove(&daemon_id);
                    match result {
                        Ok(()) => {
                            if dataflow.pending_spawn_results.is_empty() {
                                tracing::info!("successfully spawned dataflow `{dataflow_id}`");
                                dataflow.spawn_result.set_result(Ok(
                                    ControlRequestReply::DataflowSpawned { uuid: dataflow_id },
                                ));
                            }
                        }
                        Err(err) => {
                            tracing::warn!("error while spawning dataflow `{dataflow_id}`");
                            dataflow.spawn_result.set_result(Err(err));
                        }
                    };
                }
                None => {
                    tracing::warn!("received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map");
                }
            },
        }

        let elapsed = start.elapsed();
        if elapsed > Duration::from_millis(100) {
            tracing::warn!(
                "Coordinator took {}ms for handling event: {event_kind}",
                elapsed.as_millis()
            );
        }
    }

    tracing::info!("stopped");

    Ok(())
}

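/// Forwards a log message to all given subscribers; subscribers that are
/// closed or fail to accept the message within 100 ms are dropped.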
async fn send_log_message(log_subscribers: &mut Vec<LogSubscriber>, message: &LogMessage) {
    for subscriber in log_subscribers.iter_mut() {
        let send_result =
            tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));

        if send_result.await.is_err() {
            subscriber.close();
        }
    }
    log_subscribers.retain(|s| !s.is_closed());
}

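/// Merges the per-daemon results of a dataflow into a single [`DataflowResult`],
/// updating the coordinator clock with each daemon-reported timestamp.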
fn dataflow_result(
    results: &BTreeMap<DaemonId, DataflowDaemonResult>,
    dataflow_uuid: Uuid,
    clock: &uhlc::HLC,
) -> DataflowResult {
    let mut node_results = BTreeMap::new();
    for result in results.values() {
        node_results.extend(result.node_results.clone());
        if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
            tracing::warn!("failed to update HLC: {err}");
        }
    }

    DataflowResult {
        uuid: dataflow_uuid,
        timestamp: clock.new_timestamp(),
        node_results,
    }
}

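/// An open TCP connection to a daemon, plus the time of its last heartbeat.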
struct DaemonConnection {
    stream: TcpStream,
    last_heartbeat: Instant,
}

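/// Shuts the coordinator down: stops all running dataflows, destroys all
/// connected daemons, and closes the daemon event channel so that the main
/// loop can exit.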
async fn handle_destroy(
    running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
    daemon_connections: &mut DaemonConnections,
    abortable_events: &futures::stream::AbortHandle,
    daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
    clock: &HLC,
) -> Result<(), eyre::ErrReport> {
    abortable_events.abort();
    for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() {
        let _ = stop_dataflow(
            running_dataflows,
            dataflow_uuid,
            daemon_connections,
            clock.new_timestamp(),
            None,
        )
        .await?;
    }

    let result = destroy_daemons(daemon_connections, clock.new_timestamp()).await;
    *daemon_events_tx = None;
    result
}

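/// Sends a single heartbeat message to the given daemon connection.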
async fn send_heartbeat_message(
    connection: &mut TcpStream,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Heartbeat,
        timestamp,
    })
    .context("Could not serialize heartbeat message")?;

    tcp_send(connection, &message)
        .await
        .wrap_err("failed to send heartbeat message to daemon")
}

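/// State of a dataflow build that was triggered but has not finished on all
/// daemons yet.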
struct RunningBuild {
    errors: Vec<String>,
    build_result: CachedResult,

    log_subscribers: Vec<LogSubscriber>,

    pending_build_results: BTreeSet<DaemonId>,
}

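/// State of a spawned dataflow, including the daemons it runs on, the daemons
/// whose readiness or spawn confirmation is still pending, and the registered
/// log subscribers and stop-reply channels.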
struct RunningDataflow {
    name: Option<String>,
    uuid: Uuid,
    daemons: BTreeSet<DaemonId>,
    pending_daemons: BTreeSet<DaemonId>,
    exited_before_subscribe: Vec<NodeId>,
    nodes: BTreeMap<NodeId, ResolvedNode>,

    spawn_result: CachedResult,
    stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

    log_subscribers: Vec<LogSubscriber>,

    pending_spawn_results: BTreeSet<DaemonId>,
}

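/// A result that may not be available yet. Interested parties can `register`
/// a oneshot sender; once `set_result` is called, all registered senders are
/// notified and later registrations receive the cached value immediately.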
pub enum CachedResult {
    Pending {
        result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
    },
    Cached {
        result: eyre::Result<ControlRequestReply>,
    },
}

impl Default for CachedResult {
    fn default() -> Self {
        Self::Pending {
            result_senders: Vec::new(),
        }
    }
}

impl CachedResult {
    fn register(
        &mut self,
        reply_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
    ) {
        match self {
            CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
            CachedResult::Cached { result } => {
                Self::send_result_to(result, reply_sender);
            }
        }
    }

    fn set_result(&mut self, result: eyre::Result<ControlRequestReply>) {
        match self {
            CachedResult::Pending { result_senders } => {
                for sender in result_senders.drain(..) {
                    Self::send_result_to(&result, sender);
                }
                *self = CachedResult::Cached { result };
            }
            CachedResult::Cached { .. } => {}
        }
    }

    fn send_result_to(
        result: &eyre::Result<ControlRequestReply>,
        sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
    ) {
        let result = match result {
            Ok(r) => Ok(r.clone()),
            Err(err) => Err(eyre!("{err:?}")),
        };
        let _ = sender.send(result);
    }
}

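/// The subset of dataflow state that is kept after a dataflow finished, so
/// that `list` and `logs` requests keep working.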
struct ArchivedDataflow {
    name: Option<String>,
    nodes: BTreeMap<NodeId, ResolvedNode>,
}

impl From<&RunningDataflow> for ArchivedDataflow {
    fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
        ArchivedDataflow {
            name: dataflow.name.clone(),
            nodes: dataflow.nodes.clone(),
        }
    }
}

impl PartialEq for RunningDataflow {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
    }
}

impl Eq for RunningDataflow {}

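/// Sends a `StopDataflow` message to every daemon the dataflow runs on and
/// waits for each daemon to acknowledge it.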
async fn stop_dataflow<'a>(
    running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
    dataflow_uuid: Uuid,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
    grace_duration: Option<Duration>,
) -> eyre::Result<&'a mut RunningDataflow> {
    let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
        bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
    };

    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::StopDataflow {
            dataflow_id: dataflow_uuid,
            grace_duration,
        },
        timestamp,
    })?;

    for daemon_id in &dataflow.daemons {
        let daemon_connection = daemon_connections
            .get_mut(daemon_id)
            .wrap_err("no daemon connection")?;
        tcp_send(&mut daemon_connection.stream, &message)
            .await
            .wrap_err("failed to send stop message to daemon")?;

        let reply_raw = tcp_receive(&mut daemon_connection.stream)
            .await
            .wrap_err("failed to receive stop reply from daemon")?;
        match serde_json::from_slice(&reply_raw)
            .wrap_err("failed to deserialize stop reply from daemon")?
        {
            DaemonCoordinatorReply::StopResult(result) => result
                .map_err(|e| eyre!(e))
                .wrap_err("failed to stop dataflow")?,
            other => bail!("unexpected reply after sending stop: {other:?}"),
        }
    }

    tracing::info!("successfully sent stop message for dataflow `{dataflow_uuid}` to all daemons");

    Ok(dataflow)
}

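/// Asks every daemon involved in the dataflow to reload the given node (or
/// operator) and waits for the acknowledgments.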
async fn reload_dataflow(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    operator_id: Option<OperatorId>,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
    let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
        bail!("No running dataflow found with UUID `{dataflow_id}`")
    };
    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::ReloadDataflow {
            dataflow_id,
            node_id,
            operator_id,
        },
        timestamp,
    })?;

    for machine_id in &dataflow.daemons {
        let daemon_connection = daemon_connections
            .get_mut(machine_id)
            .wrap_err("no daemon connection")?;
        tcp_send(&mut daemon_connection.stream, &message)
            .await
            .wrap_err("failed to send reload message to daemon")?;

        let reply_raw = tcp_receive(&mut daemon_connection.stream)
            .await
            .wrap_err("failed to receive reload reply from daemon")?;
        match serde_json::from_slice(&reply_raw)
            .wrap_err("failed to deserialize reload reply from daemon")?
        {
            DaemonCoordinatorReply::ReloadResult(result) => result
                .map_err(|e| eyre!(e))
                .wrap_err("failed to reload dataflow")?,
            other => bail!("unexpected reply after sending reload: {other:?}"),
        }
    }
    tracing::info!("successfully reloaded dataflow `{dataflow_id}`");

    Ok(())
}

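/// Fetches the logs of a single node by forwarding a `Logs` request to the
/// daemon on the machine that the node is deployed on.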
async fn retrieve_logs(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<Vec<u8>> {
    let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else {
        bail!("No dataflow found with UUID `{dataflow_id}`")
    };

    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Logs {
            dataflow_id,
            node_id: node_id.clone(),
        },
        timestamp,
    })?;

    let machine_ids: Vec<Option<String>> = nodes
        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
        .collect();

    let machine_id = if let [machine_id] = &machine_ids[..] {
        machine_id
    } else if machine_ids.is_empty() {
        bail!("No machine contains {}/{}", dataflow_id, node_id)
    } else {
        bail!(
            "More than one machine contains {}/{}, but a node should only be present on one",
            dataflow_id,
            node_id
        )
    };

    let daemon_ids: Vec<_> = match machine_id {
        None => daemon_connections.unnamed().collect(),
        Some(machine_id) => daemon_connections
            .get_matching_daemon_id(machine_id)
            .into_iter()
            .collect(),
    };
    let daemon_id = match &daemon_ids[..] {
        [id] => (*id).clone(),
        [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
        _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
    };
    let daemon_connection = daemon_connections
        .get_mut(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?;
    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err("failed to send logs message to daemon")?;

    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to retrieve logs reply from daemon")?;
    let reply_logs = match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize logs reply from daemon")?
    {
        DaemonCoordinatorReply::Logs(logs) => logs,
        other => bail!("unexpected reply after sending logs: {other:?}"),
    };
    tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");

    reply_logs.map_err(|err| eyre!(err))
}

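/// Triggers a distributed build: nodes are grouped by the machine they are
/// deployed on and a `Build` request is sent to the daemon of each machine.
/// The returned [`RunningBuild`] tracks which daemons still have to report
/// their build result.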
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(daemon_connections, clock))]
async fn build_dataflow(
    build_id: BuildId,
    session_id: SessionId,
    dataflow: Descriptor,
    git_sources: BTreeMap<NodeId, GitSource>,
    prev_git_sources: BTreeMap<NodeId, GitSource>,
    local_working_dir: Option<PathBuf>,
    clock: &HLC,
    uv: bool,
    daemon_connections: &mut DaemonConnections,
) -> eyre::Result<RunningBuild> {
    let nodes = dataflow.resolve_aliases_and_set_defaults()?;

    let mut git_sources_by_daemon = git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();
    let mut prev_git_sources_by_daemon = prev_git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();

    let nodes_by_daemon = nodes
        .values()
        .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));

    let mut daemons = BTreeSet::new();
    for (machine, nodes_on_machine) in &nodes_by_daemon {
        let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
        tracing::debug!(
            "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
        );

        let build_command = BuildDataflowNodes {
            build_id,
            session_id,
            local_working_dir: local_working_dir.clone(),
            git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
            prev_git_sources: prev_git_sources_by_daemon
                .remove(machine)
                .unwrap_or_default(),
            dataflow_descriptor: dataflow.clone(),
            nodes_on_machine,
            uv,
        };
        let message = serde_json::to_vec(&Timestamped {
            inner: DaemonCoordinatorEvent::Build(build_command),
            timestamp: clock.new_timestamp(),
        })?;

        let daemon_id =
            build_dataflow_on_machine(daemon_connections, machine.map(|s| s.as_str()), &message)
                .await
                .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
        daemons.insert(daemon_id);
    }

    tracing::info!("successfully triggered dataflow build `{build_id}`");

    Ok(RunningBuild {
        errors: Vec::new(),
        build_result: CachedResult::default(),
        log_subscribers: Vec::new(),
        pending_build_results: daemons,
    })
}

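/// Sends a prepared build message to the daemon matching the given machine ID
/// (or to the unnamed default daemon) and waits for the trigger confirmation.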
async fn build_dataflow_on_machine(
    daemon_connections: &mut DaemonConnections,
    machine: Option<&str>,
    message: &[u8],
) -> Result<DaemonId, eyre::ErrReport> {
    let daemon_id = match machine {
        Some(machine) => daemon_connections
            .get_matching_daemon_id(machine)
            .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
            .clone(),
        None => daemon_connections
            .unnamed()
            .next()
            .wrap_err("no unnamed daemon connections")?
            .clone(),
    };

    let daemon_connection = daemon_connections
        .get_mut(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?;
    tcp_send(&mut daemon_connection.stream, message)
        .await
        .wrap_err("failed to send build message to daemon")?;

    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to receive build reply from daemon")?;
    match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize build reply from daemon")?
    {
        DaemonCoordinatorReply::TriggerBuildResult(result) => result
            .map_err(|e| eyre!(e))
            .wrap_err("daemon returned an error")?,
        _ => bail!("unexpected reply"),
    }
    Ok(daemon_id)
}

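/// Spawns a dataflow via [`spawn_dataflow`] and wraps the result in a
/// [`RunningDataflow`]. `pending_daemons` is only populated when more than
/// one daemon is involved, since the all-nodes-ready handshake is presumably
/// unnecessary for a single daemon.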
#[allow(clippy::too_many_arguments)]
async fn start_dataflow(
    build_id: Option<BuildId>,
    session_id: SessionId,
    dataflow: Descriptor,
    local_working_dir: Option<PathBuf>,
    name: Option<String>,
    daemon_connections: &mut DaemonConnections,
    clock: &HLC,
    uv: bool,
) -> eyre::Result<RunningDataflow> {
    let SpawnedDataflow {
        uuid,
        daemons,
        nodes,
    } = spawn_dataflow(
        build_id,
        session_id,
        dataflow,
        local_working_dir,
        daemon_connections,
        clock,
        uv,
    )
    .await?;
    Ok(RunningDataflow {
        uuid,
        name,
        pending_daemons: if daemons.len() > 1 {
            daemons.clone()
        } else {
            BTreeSet::new()
        },
        exited_before_subscribe: Default::default(),
        daemons: daemons.clone(),
        nodes,
        spawn_result: CachedResult::default(),
        stop_reply_senders: Vec::new(),
        log_subscribers: Vec::new(),
        pending_spawn_results: daemons,
    })
}

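/// Sends a `Destroy` message to a single daemon and waits for its reply.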
async fn destroy_daemon(
    daemon_id: DaemonId,
    mut daemon_connection: DaemonConnection,
    timestamp: uhlc::Timestamp,
) -> Result<()> {
    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Destroy,
        timestamp,
    })?;

    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err(format!(
            "failed to send destroy message to daemon `{daemon_id}`"
        ))?;

    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to receive destroy reply from daemon")?;
    match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize destroy reply from daemon")?
    {
        DaemonCoordinatorReply::DestroyResult { result, .. } => result
            .map_err(|e| eyre!(e))
            .wrap_err("failed to destroy dataflow")?,
        other => bail!("unexpected reply after sending `destroy`: {other:?}"),
    }

    tracing::info!("successfully destroyed daemon `{daemon_id}`");
    Ok(())
}

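/// Destroys all connected daemons concurrently, failing if any of them
/// reports an error.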
async fn destroy_daemons(
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
    let futures = daemon_connections
        .drain()
        .map(|(daemon_id, daemon_connection)| {
            destroy_daemon(daemon_id, daemon_connection, timestamp)
        })
        .collect::<Vec<_>>();
    for result in join_all(futures).await {
        result?;
    }
    Ok(())
}

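/// Events that drive the coordinator's main loop.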
#[derive(Debug)]
pub enum Event {
    NewDaemonConnection(TcpStream),
    DaemonConnectError(eyre::Report),
    DaemonHeartbeat {
        daemon_id: DaemonId,
    },
    Dataflow {
        uuid: Uuid,
        event: DataflowEvent,
    },
    Control(ControlEvent),
    Daemon(DaemonRequest),
    DaemonHeartbeatInterval,
    CtrlC,
    Log(LogMessage),
    DaemonExit {
        daemon_id: DaemonId,
    },
    DataflowBuildResult {
        build_id: BuildId,
        daemon_id: DaemonId,
        result: eyre::Result<()>,
    },
    DataflowSpawnResult {
        dataflow_id: Uuid,
        daemon_id: DaemonId,
        result: eyre::Result<()>,
    },
}

impl Event {
    #[allow(clippy::match_like_matches_macro)]
    pub fn log(&self) -> bool {
        match self {
            Event::DaemonHeartbeatInterval => false,
            _ => true,
        }
    }

    fn kind(&self) -> &'static str {
        match self {
            Event::NewDaemonConnection(_) => "NewDaemonConnection",
            Event::DaemonConnectError(_) => "DaemonConnectError",
            Event::DaemonHeartbeat { .. } => "DaemonHeartbeat",
            Event::Dataflow { .. } => "Dataflow",
            Event::Control(_) => "Control",
            Event::Daemon(_) => "Daemon",
            Event::DaemonHeartbeatInterval => "DaemonHeartbeatInterval",
            Event::CtrlC => "CtrlC",
            Event::Log(_) => "Log",
            Event::DaemonExit { .. } => "DaemonExit",
            Event::DataflowBuildResult { .. } => "DataflowBuildResult",
            Event::DataflowSpawnResult { .. } => "DataflowSpawnResult",
        }
    }
}

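/// Per-dataflow updates reported by daemons.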
#[derive(Debug)]
pub enum DataflowEvent {
    DataflowFinishedOnDaemon {
        daemon_id: DaemonId,
        result: DataflowDaemonResult,
    },
    ReadyOnDaemon {
        daemon_id: DaemonId,
        exited_before_subscribe: Vec<NodeId>,
    },
}

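/// Requests sent from a daemon to the coordinator.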
#[derive(Debug)]
pub enum DaemonRequest {
    Register {
        version_check_result: Result<(), String>,
        machine_id: Option<String>,
        connection: TcpStream,
    },
}

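/// Installs a Ctrl-C handler that forwards the first signal as [`Event::CtrlC`]
/// and aborts the process on a second signal.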
fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = false;
    ctrlc::set_handler(move || {
        if ctrlc_sent {
            tracing::warn!("received second ctrlc signal -> aborting immediately");
            std::process::abort();
        } else {
            tracing::info!("received ctrlc signal");
            if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
                tracing::error!("failed to report ctrl-c event to dora-coordinator");
            }

            ctrlc_sent = true;
        }
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ReceiverStream::new(ctrlc_rx))
}