use crate::{
    run::spawn_dataflow,
    tcp_utils::{tcp_receive, tcp_send},
};
pub use control::ControlEvent;
use dora_core::{
    config::{NodeId, OperatorId},
    uhlc::{self, HLC},
};
use dora_message::{
    cli_to_coordinator::ControlRequest,
    common::DaemonId,
    coordinator_to_cli::{
        ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
        DataflowStatus, LogLevel, LogMessage,
    },
    coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped},
    daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
    descriptor::{Descriptor, ResolvedNode},
};
use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
use futures_concurrency::stream::Merge;
use log_subscriber::LogSubscriber;
use run::SpawnedDataflow;
use std::{
    collections::{BTreeMap, BTreeSet, HashMap},
    net::SocketAddr,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use uuid::Uuid;

mod control;
mod listener;
mod log_subscriber;
mod run;
mod tcp_utils;

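/// Starts the coordinator: binds the daemon and control listeners and returns
/// the bound daemon port together with a future that drives the main event loop.
///
/// A rough usage sketch (hypothetical caller; assumes a Tokio runtime, and the
/// bind addresses are placeholders):
///
/// ```ignore
/// use futures::stream;
///
/// let (port, coordinator) = start(
///     "127.0.0.1:0".parse()?, // daemon listener
///     "127.0.0.1:0".parse()?, // control listener
///     stream::empty(),        // no external events in this sketch
/// )
/// .await?;
/// tracing::info!("coordinator listening for daemons on port {port}");
/// coordinator.await?;
/// ```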
pub async fn start(
    bind: SocketAddr,
    bind_control: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
    let listener = listener::create_listener(bind).await?;
    let port = listener
        .local_addr()
        .wrap_err("failed to get local addr of listener")?
        .port();
    let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
        c.map(Event::NewDaemonConnection)
            .wrap_err("failed to open connection")
            .unwrap_or_else(Event::DaemonConnectError)
    });

    let mut tasks = FuturesUnordered::new();
    let control_events = control::control_events(bind_control, &tasks)
        .await
        .wrap_err("failed to create control events")?;

    let ctrlc_events = set_up_ctrlc_handler()?;

    let events = (
        external_events,
        new_daemon_connections,
        control_events,
        ctrlc_events,
    )
        .merge();

    let future = async move {
        start_inner(events, &tasks).await?;

        tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
        while let Some(join_result) = tasks.next().await {
            if let Err(err) = join_result {
                tracing::error!("task panicked: {err}");
            }
        }
        tracing::debug!("all spawned tasks finished, exiting..");
        Ok(())
    };
    Ok((port, future))
}

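/// Resolves a dataflow name to its UUID, searching running dataflows first and
/// falling back to archived ones. Fails if the name is unknown or ambiguous.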
fn resolve_name(
    name: String,
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
) -> eyre::Result<Uuid> {
    let uuids: Vec<_> = running_dataflows
        .iter()
        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
        .map(|(k, _)| k)
        .copied()
        .collect();
    let archived_uuids: Vec<_> = archived_dataflows
        .iter()
        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
        .map(|(k, _)| k)
        .copied()
        .collect();

    if uuids.is_empty() {
        if archived_uuids.is_empty() {
            bail!("no dataflow with name `{name}`");
        } else if let [uuid] = archived_uuids.as_slice() {
            Ok(*uuid)
        } else {
            bail!("multiple archived dataflows found with name `{name}`; please provide the UUID instead");
        }
    } else if let [uuid] = uuids.as_slice() {
        Ok(*uuid)
    } else {
        bail!("multiple dataflows found with name `{name}`");
    }
}

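/// The set of daemons currently registered with this coordinator, keyed by
/// [`DaemonId`]. A daemon that registers under an already-known ID replaces
/// (and thereby closes) the previous connection.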
#[derive(Default)]
struct DaemonConnections {
    daemons: BTreeMap<DaemonId, DaemonConnection>,
}

impl DaemonConnections {
    fn add(&mut self, daemon_id: DaemonId, connection: DaemonConnection) {
        let previous = self.daemons.insert(daemon_id.clone(), connection);
        if previous.is_some() {
            tracing::info!("closing previous connection `{daemon_id}` on new register");
        }
    }

    fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
        self.daemons.get_mut(id)
    }

    fn get_matching_daemon_id(&self, machine_id: &str) -> Option<&DaemonId> {
        self.daemons
            .keys()
            .find(|id| id.matches_machine_id(machine_id))
    }

    fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
        std::mem::take(&mut self.daemons).into_iter()
    }

    fn is_empty(&self) -> bool {
        self.daemons.is_empty()
    }

    fn keys(&self) -> impl Iterator<Item = &DaemonId> {
        self.daemons.keys()
    }

    fn iter(&self) -> impl Iterator<Item = (&DaemonId, &DaemonConnection)> {
        self.daemons.iter()
    }

    fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
        self.daemons.iter_mut()
    }

    fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
        self.daemons.remove(daemon_id)
    }

    fn unnamed(&self) -> impl Iterator<Item = &DaemonId> {
        self.daemons.keys().filter(|id| id.machine_id().is_none())
    }
}

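/// The coordinator's main event loop: merges daemon, control, timer, and
/// caller-provided events into a single stream and dispatches on them until a
/// `Destroy` request or Ctrl-C aborts the loop.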
async fn start_inner(
    events: impl Stream<Item = Event> + Unpin,
    tasks: &FuturesUnordered<JoinHandle<()>>,
) -> eyre::Result<()> {
    let clock = Arc::new(HLC::default());

    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2);
    let mut daemon_events_tx = Some(daemon_events_tx);
    let daemon_events = ReceiverStream::new(daemon_events);

    let daemon_heartbeat_interval =
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
            .map(|_| Event::DaemonHeartbeatInterval);

    let (abortable_events, abort_handle) =
        futures::stream::abortable((events, daemon_heartbeat_interval).merge());

    let mut events = (abortable_events, daemon_events).merge();

    let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
    let mut dataflow_results: HashMap<Uuid, BTreeMap<DaemonId, DataflowDaemonResult>> =
        HashMap::new();
    let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
    let mut daemon_connections = DaemonConnections::default();

    while let Some(event) = events.next().await {
        if event.log() {
            tracing::trace!("Handling event {event:?}");
        }
        match event {
            Event::NewDaemonConnection(connection) => {
                connection.set_nodelay(true)?;
                let events_tx = daemon_events_tx.clone();
                if let Some(events_tx) = events_tx {
                    let task = tokio::spawn(listener::handle_connection(
                        connection,
                        events_tx,
                        clock.clone(),
                    ));
                    tasks.push(task);
                } else {
                    tracing::warn!(
                        "ignoring new daemon connection because events_tx was closed already"
                    );
                }
            }
            Event::DaemonConnectError(err) => {
                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
            }
            Event::Daemon(event) => match event {
                DaemonRequest::Register {
                    machine_id,
                    mut connection,
                    version_check_result,
                } => {
                    let existing = match &machine_id {
                        Some(id) => daemon_connections.get_matching_daemon_id(id),
                        None => daemon_connections.unnamed().next(),
                    };
                    let existing_result = if existing.is_some() {
                        Err(format!(
                            "There is already a connected daemon with machine ID `{machine_id:?}`"
                        ))
                    } else {
                        Ok(())
                    };

                    let daemon_id = DaemonId::new(machine_id);

                    let reply: Timestamped<RegisterResult> = Timestamped {
                        inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
                            Ok(_) => RegisterResult::Ok {
                                daemon_id: daemon_id.clone(),
                            },
                            Err(err) => RegisterResult::Err(err.clone()),
                        },
                        timestamp: clock.new_timestamp(),
                    };

                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
                        .await
                        .context("tcp send failed");
                    match version_check_result.map_err(|e| eyre!(e)).and(send_result) {
                        Ok(()) => {
                            daemon_connections.add(
                                daemon_id.clone(),
                                DaemonConnection {
                                    stream: connection,
                                    last_heartbeat: Instant::now(),
                                },
                            );
                        }
                        Err(err) => {
                            tracing::warn!("failed to register daemon connection for daemon `{daemon_id}`: {err}");
                        }
                    }
                }
            },
            Event::Dataflow { uuid, event } => match event {
                DataflowEvent::ReadyOnDaemon {
                    daemon_id,
                    exited_before_subscribe,
                } => {
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            let dataflow = entry.get_mut();
                            dataflow.pending_daemons.remove(&daemon_id);
                            dataflow
                                .exited_before_subscribe
                                .extend(exited_before_subscribe);
                            if dataflow.pending_daemons.is_empty() {
                                tracing::debug!("sending all nodes ready message to daemons");
                                let message = serde_json::to_vec(&Timestamped {
                                    inner: DaemonCoordinatorEvent::AllNodesReady {
                                        dataflow_id: uuid,
                                        exited_before_subscribe: dataflow
                                            .exited_before_subscribe
                                            .clone(),
                                    },
                                    timestamp: clock.new_timestamp(),
                                })
                                .wrap_err("failed to serialize AllNodesReady message")?;

                                for daemon_id in &dataflow.daemons {
                                    let Some(connection) = daemon_connections.get_mut(daemon_id)
                                    else {
                                        tracing::warn!(
                                            "no daemon connection found for daemon `{daemon_id}`"
                                        );
                                        continue;
                                    };
                                    tcp_send(&mut connection.stream, &message)
                                        .await
                                        .wrap_err_with(|| {
                                            format!(
                                                "failed to send AllNodesReady({uuid}) message \
                                                to daemon {daemon_id}"
                                            )
                                        })?;
                                }
                            }
                        }
                        std::collections::hash_map::Entry::Vacant(_) => {
                            tracing::warn!("dataflow not running on ReadyOnDaemon");
                        }
                    }
                }
                DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => {
                    tracing::debug!("coordinator received DataflowFinishedOnDaemon ({daemon_id:?}, result: {result:?})");
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            let dataflow = entry.get_mut();
                            dataflow.daemons.remove(&daemon_id);
                            tracing::info!(
                                "removed daemon id: {daemon_id} from dataflow: {:#?}",
                                dataflow.uuid
                            );
                            dataflow_results
                                .entry(uuid)
                                .or_default()
                                .insert(daemon_id, result);

                            if dataflow.daemons.is_empty() {
                                archived_dataflows
                                    .entry(uuid)
                                    .or_insert_with(|| ArchivedDataflow::from(entry.get()));
                                let mut finished_dataflow = entry.remove();
                                let dataflow_id = finished_dataflow.uuid;
                                send_log_message(
                                    &mut finished_dataflow,
                                    &LogMessage {
                                        dataflow_id,
                                        node_id: None,
                                        daemon_id: None,
                                        level: LogLevel::Info,
                                        target: Some("coordinator".into()),
                                        module_path: None,
                                        file: None,
                                        line: None,
                                        message: "dataflow finished".into(),
                                    },
                                )
                                .await;

                                let reply = ControlRequestReply::DataflowStopped {
                                    uuid,
                                    result: dataflow_results
                                        .get(&uuid)
                                        .map(|r| dataflow_result(r, uuid, &clock))
                                        .unwrap_or_else(|| {
                                            DataflowResult::ok_empty(uuid, clock.new_timestamp())
                                        }),
                                };
                                for sender in finished_dataflow.reply_senders {
                                    let _ = sender.send(Ok(reply.clone()));
                                }
                            }
                        }
                        std::collections::hash_map::Entry::Vacant(_) => {
                            tracing::warn!("dataflow not running on DataflowFinishedOnDaemon");
                        }
                    }
                }
            },

            Event::Control(event) => match event {
                ControlEvent::IncomingRequest {
                    request,
                    reply_sender,
                } => {
                    match request {
                        ControlRequest::Start {
                            dataflow,
                            name,
                            local_working_dir,
                            uv,
                        } => {
                            let name = name.or_else(|| names::Generator::default().next());

                            let inner = async {
                                if let Some(name) = name.as_deref() {
                                    if running_dataflows
                                        .values()
                                        .any(|d: &RunningDataflow| d.name.as_deref() == Some(name))
                                    {
                                        bail!("there is already a running dataflow with name `{name}`");
                                    }
                                }
                                let dataflow = start_dataflow(
                                    dataflow,
                                    local_working_dir,
                                    name,
                                    &mut daemon_connections,
                                    &clock,
                                    uv,
                                )
                                .await?;
                                Ok(dataflow)
                            };
                            let reply = inner.await.map(|dataflow| {
                                let uuid = dataflow.uuid;
                                running_dataflows.insert(uuid, dataflow);
                                ControlRequestReply::DataflowStarted { uuid }
                            });
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::Check { dataflow_uuid } => {
                            let status = match &running_dataflows.get(&dataflow_uuid) {
                                Some(_) => ControlRequestReply::DataflowStarted {
                                    uuid: dataflow_uuid,
                                },
                                None => ControlRequestReply::DataflowStopped {
                                    uuid: dataflow_uuid,
                                    result: dataflow_results
                                        .get(&dataflow_uuid)
                                        .map(|r| dataflow_result(r, dataflow_uuid, &clock))
                                        .unwrap_or_else(|| {
                                            DataflowResult::ok_empty(
                                                dataflow_uuid,
                                                clock.new_timestamp(),
                                            )
                                        }),
                                },
                            };
                            let _ = reply_sender.send(Ok(status));
                        }
                        ControlRequest::Reload {
                            dataflow_id,
                            node_id,
                            operator_id,
                        } => {
                            let reload = async {
                                reload_dataflow(
                                    &running_dataflows,
                                    dataflow_id,
                                    node_id,
                                    operator_id,
                                    &mut daemon_connections,
                                    clock.new_timestamp(),
                                )
                                .await?;
                                Result::<_, eyre::Report>::Ok(())
                            };
                            let reply = reload
                                .await
                                .map(|()| ControlRequestReply::DataflowReloaded {
                                    uuid: dataflow_id,
                                });
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::Stop {
                            dataflow_uuid,
                            grace_duration,
                        } => {
                            if let Some(result) = dataflow_results.get(&dataflow_uuid) {
                                let reply = ControlRequestReply::DataflowStopped {
                                    uuid: dataflow_uuid,
                                    result: dataflow_result(result, dataflow_uuid, &clock),
                                };
                                let _ = reply_sender.send(Ok(reply));

                                continue;
                            }

                            let dataflow = stop_dataflow(
                                &mut running_dataflows,
                                dataflow_uuid,
                                &mut daemon_connections,
                                clock.new_timestamp(),
                                grace_duration,
                            )
                            .await;

                            match dataflow {
                                Ok(dataflow) => {
                                    dataflow.reply_senders.push(reply_sender);
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        ControlRequest::StopByName {
                            name,
                            grace_duration,
                        } => match resolve_name(name, &running_dataflows, &archived_dataflows) {
                            Ok(dataflow_uuid) => {
                                if let Some(result) = dataflow_results.get(&dataflow_uuid) {
                                    let reply = ControlRequestReply::DataflowStopped {
                                        uuid: dataflow_uuid,
                                        result: dataflow_result(result, dataflow_uuid, &clock),
                                    };
                                    let _ = reply_sender.send(Ok(reply));

                                    continue;
                                }

                                let dataflow = stop_dataflow(
                                    &mut running_dataflows,
                                    dataflow_uuid,
                                    &mut daemon_connections,
                                    clock.new_timestamp(),
                                    grace_duration,
                                )
                                .await;

                                match dataflow {
                                    Ok(dataflow) => {
                                        dataflow.reply_senders.push(reply_sender);
                                    }
                                    Err(err) => {
                                        let _ = reply_sender.send(Err(err));
                                    }
                                }
                            }
                            Err(err) => {
                                let _ = reply_sender.send(Err(err));
                            }
                        },
                        ControlRequest::Logs { uuid, name, node } => {
                            let dataflow_uuid = if let Some(uuid) = uuid {
                                Ok(uuid)
                            } else if let Some(name) = name {
                                resolve_name(name, &running_dataflows, &archived_dataflows)
                            } else {
                                Err(eyre!("neither a UUID nor a name was provided"))
                            };

                            match dataflow_uuid {
                                Ok(uuid) => {
                                    let reply = retrieve_logs(
                                        &running_dataflows,
                                        &archived_dataflows,
                                        uuid,
                                        node.into(),
                                        &mut daemon_connections,
                                        clock.new_timestamp(),
                                    )
                                    .await
                                    .map(ControlRequestReply::Logs);
                                    let _ = reply_sender.send(reply);
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        ControlRequest::Destroy => {
                            tracing::info!("Received destroy command");

                            let reply = handle_destroy(
                                &mut running_dataflows,
                                &mut daemon_connections,
                                &abort_handle,
                                &mut daemon_events_tx,
                                &clock,
                            )
                            .await
                            .map(|()| ControlRequestReply::DestroyOk);
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::List => {
                            let mut dataflows: Vec<_> = running_dataflows.values().collect();
                            dataflows.sort_by_key(|d| (&d.name, d.uuid));

                            let running = dataflows.into_iter().map(|d| DataflowListEntry {
                                id: DataflowIdAndName {
                                    uuid: d.uuid,
                                    name: d.name.clone(),
                                },
                                status: DataflowStatus::Running,
                            });
                            let finished_failed =
                                dataflow_results.iter().map(|(&uuid, results)| {
                                    let name =
                                        archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
                                    let id = DataflowIdAndName { uuid, name };
                                    let status = if results.values().all(|r| r.is_ok()) {
                                        DataflowStatus::Finished
                                    } else {
                                        DataflowStatus::Failed
                                    };
                                    DataflowListEntry { id, status }
                                });

                            let reply = Ok(ControlRequestReply::DataflowList(DataflowList(
                                running.chain(finished_failed).collect(),
                            )));
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::DaemonConnected => {
                            let running = !daemon_connections.is_empty();
                            let _ = reply_sender
                                .send(Ok(ControlRequestReply::DaemonConnected(running)));
                        }
                        ControlRequest::ConnectedMachines => {
                            let reply = Ok(ControlRequestReply::ConnectedDaemons(
                                daemon_connections.keys().cloned().collect(),
                            ));
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::LogSubscribe { .. } => {
                            let _ = reply_sender.send(Err(eyre::eyre!(
                                "LogSubscribe request should be handled separately"
                            )));
                        }
                    }
                }
                ControlEvent::Error(err) => tracing::error!("{err:?}"),
                ControlEvent::LogSubscribe {
                    dataflow_id,
                    level,
                    connection,
                } => {
                    if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
                        dataflow
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                    }
                }
            },
            Event::DaemonHeartbeatInterval => {
                let mut disconnected = BTreeSet::new();
                for (daemon_id, connection) in daemon_connections.iter_mut() {
                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
                        tracing::warn!(
                            "no heartbeat message from daemon `{daemon_id}` since {:?}",
                            connection.last_heartbeat.elapsed()
                        )
                    }
                    if connection.last_heartbeat.elapsed() > Duration::from_secs(30) {
                        disconnected.insert(daemon_id.clone());
                        continue;
                    }
                    let result: eyre::Result<()> = tokio::time::timeout(
                        Duration::from_millis(500),
                        send_heartbeat_message(&mut connection.stream, clock.new_timestamp()),
                    )
                    .await
                    .wrap_err("timeout")
                    .and_then(|r| r)
                    .wrap_err_with(|| {
                        format!("failed to send heartbeat message to daemon `{daemon_id}`")
                    });
                    if let Err(err) = result {
                        tracing::warn!("{err:?}");
                        disconnected.insert(daemon_id.clone());
                    }
                }
                if !disconnected.is_empty() {
                    tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
                    for daemon_id in disconnected {
                        daemon_connections.remove(&daemon_id);
                    }
                }
            }
            Event::CtrlC => {
                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
                handle_destroy(
                    &mut running_dataflows,
                    &mut daemon_connections,
                    &abort_handle,
                    &mut daemon_events_tx,
                    &clock,
                )
                .await?;
            }
            Event::DaemonHeartbeat { daemon_id } => {
                if let Some(connection) = daemon_connections.get_mut(&daemon_id) {
                    connection.last_heartbeat = Instant::now();
                }
            }
            Event::Log(message) => {
                if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) {
                    send_log_message(dataflow, &message).await;
                }
            }
            Event::DaemonExit { daemon_id } => {
                tracing::info!("Daemon `{daemon_id}` exited");
                daemon_connections.remove(&daemon_id);
            }
        }
    }

    tracing::info!("stopped");

    Ok(())
}

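/// Forwards a log message to all subscribers of the given dataflow, closing
/// subscribers that fail to accept the message within 100 ms.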
async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) {
    for subscriber in &mut dataflow.log_subscribers {
        let send_result =
            tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));

        if send_result.await.is_err() {
            subscriber.close();
        }
    }
    dataflow.log_subscribers.retain(|s| !s.is_closed());
}

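/// Merges the per-daemon results of a dataflow into a single [`DataflowResult`],
/// updating the coordinator's hybrid logical clock with each daemon's timestamp.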
fn dataflow_result(
    results: &BTreeMap<DaemonId, DataflowDaemonResult>,
    dataflow_uuid: Uuid,
    clock: &uhlc::HLC,
) -> DataflowResult {
    let mut node_results = BTreeMap::new();
    for result in results.values() {
        node_results.extend(result.node_results.clone());
        if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
            tracing::warn!("failed to update HLC: {err}");
        }
    }

    DataflowResult {
        uuid: dataflow_uuid,
        timestamp: clock.new_timestamp(),
        node_results,
    }
}

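/// An open TCP connection to a registered daemon, together with the time of
/// its last heartbeat (used by the watchdog in the main loop).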
struct DaemonConnection {
    stream: TcpStream,
    last_heartbeat: Instant,
}

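/// Shuts everything down: stops accepting new events, stops all running
/// dataflows, destroys all connected daemons, and closes the daemon event
/// channel so the main loop can drain and exit.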
async fn handle_destroy(
    running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
    daemon_connections: &mut DaemonConnections,
    abortable_events: &futures::stream::AbortHandle,
    daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
    clock: &HLC,
) -> Result<(), eyre::ErrReport> {
    abortable_events.abort();
    for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() {
        let _ = stop_dataflow(
            running_dataflows,
            dataflow_uuid,
            daemon_connections,
            clock.new_timestamp(),
            None,
        )
        .await?;
    }

    let result = destroy_daemons(daemon_connections, clock.new_timestamp()).await;
    *daemon_events_tx = None;
    result
}

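/// Sends a single `Heartbeat` message with the given timestamp to a daemon.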
async fn send_heartbeat_message(
    connection: &mut TcpStream,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Heartbeat,
        timestamp,
    })
    .context("Could not serialize heartbeat message")?;

    tcp_send(connection, &message)
        .await
        .wrap_err("failed to send heartbeat message to daemon")
}

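/// Book-keeping for a dataflow that is currently running on one or more daemons.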
struct RunningDataflow {
    name: Option<String>,
    uuid: Uuid,
    /// IDs of the daemons that the dataflow is running on.
    daemons: BTreeSet<DaemonId>,
    /// IDs of daemons that still need to report `ReadyOnDaemon` before the
    /// `AllNodesReady` message is broadcast.
    pending_daemons: BTreeSet<DaemonId>,
    exited_before_subscribe: Vec<NodeId>,
    nodes: BTreeMap<NodeId, ResolvedNode>,

    reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

    log_subscribers: Vec<LogSubscriber>,
}

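/// The subset of dataflow state kept after a dataflow finishes, so that names
/// can still be resolved and logs retrieved.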
struct ArchivedDataflow {
    name: Option<String>,
    nodes: BTreeMap<NodeId, ResolvedNode>,
}

impl From<&RunningDataflow> for ArchivedDataflow {
    fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
        ArchivedDataflow {
            name: dataflow.name.clone(),
            nodes: dataflow.nodes.clone(),
        }
    }
}

impl PartialEq for RunningDataflow {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
    }
}

impl Eq for RunningDataflow {}

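/// Sends a `StopDataflow` message to every daemon the dataflow runs on and
/// waits for each daemon's acknowledgment. The dataflow stays in
/// `running_dataflows` until all daemons report `DataflowFinishedOnDaemon`.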
async fn stop_dataflow<'a>(
    running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
    dataflow_uuid: Uuid,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
    grace_duration: Option<Duration>,
) -> eyre::Result<&'a mut RunningDataflow> {
    let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
        bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
    };

    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::StopDataflow {
            dataflow_id: dataflow_uuid,
            grace_duration,
        },
        timestamp,
    })?;

    for daemon_id in &dataflow.daemons {
        let daemon_connection = daemon_connections
            .get_mut(daemon_id)
            .wrap_err("no daemon connection")?;
        tcp_send(&mut daemon_connection.stream, &message)
            .await
            .wrap_err("failed to send stop message to daemon")?;

        let reply_raw = tcp_receive(&mut daemon_connection.stream)
            .await
            .wrap_err("failed to receive stop reply from daemon")?;
        match serde_json::from_slice(&reply_raw)
            .wrap_err("failed to deserialize stop reply from daemon")?
        {
            DaemonCoordinatorReply::StopResult(result) => result
                .map_err(|e| eyre!(e))
                .wrap_err("failed to stop dataflow")?,
            other => bail!("unexpected reply after sending stop: {other:?}"),
        }
    }

    tracing::info!("successfully sent stop message for dataflow `{dataflow_uuid}` to all daemons");

    Ok(dataflow)
}

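/// Sends a `ReloadDataflow` message for a single node (or operator) to every
/// daemon of the dataflow and waits for each acknowledgment.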
async fn reload_dataflow(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    operator_id: Option<OperatorId>,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
    let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
        bail!("No running dataflow found with UUID `{dataflow_id}`")
    };
    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::ReloadDataflow {
            dataflow_id,
            node_id,
            operator_id,
        },
        timestamp,
    })?;

    for daemon_id in &dataflow.daemons {
        let daemon_connection = daemon_connections
            .get_mut(daemon_id)
            .wrap_err("no daemon connection")?;
        tcp_send(&mut daemon_connection.stream, &message)
            .await
            .wrap_err("failed to send reload message to daemon")?;

        let reply_raw = tcp_receive(&mut daemon_connection.stream)
            .await
            .wrap_err("failed to receive reload reply from daemon")?;
        match serde_json::from_slice(&reply_raw)
            .wrap_err("failed to deserialize reload reply from daemon")?
        {
            DaemonCoordinatorReply::ReloadResult(result) => result
                .map_err(|e| eyre!(e))
                .wrap_err("failed to reload dataflow")?,
            other => bail!("unexpected reply after sending reload: {other:?}"),
        }
    }
    tracing::info!("successfully reloaded dataflow `{dataflow_id}`");

    Ok(())
}

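/// Fetches the log output of a single node from the daemon it runs on. Works
/// for both running and archived dataflows; fails if the node cannot be
/// mapped to exactly one machine and daemon.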
async fn retrieve_logs(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<Vec<u8>> {
    let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else {
        bail!("No dataflow found with UUID `{dataflow_id}`")
    };

    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Logs {
            dataflow_id,
            node_id: node_id.clone(),
        },
        timestamp,
    })?;

    let machine_ids: Vec<Option<String>> = nodes
        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.machine.clone())
        .collect();

    let machine_id = if let [machine_id] = &machine_ids[..] {
        machine_id
    } else if machine_ids.is_empty() {
        bail!("No machine contains {}/{}", dataflow_id, node_id)
    } else {
        bail!(
            "More than one machine contains {}/{}. However, it should only be present on one.",
            dataflow_id,
            node_id
        )
    };

    let daemon_ids: Vec<_> = match machine_id {
        None => daemon_connections.unnamed().collect(),
        Some(machine_id) => daemon_connections
            .get_matching_daemon_id(machine_id)
            .into_iter()
            .collect(),
    };
    let daemon_id = match &daemon_ids[..] {
        [id] => (*id).clone(),
        [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
        _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
    };
    let daemon_connection = daemon_connections
        .get_mut(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?;
    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err("failed to send logs message to daemon")?;

    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to retrieve logs reply from daemon")?;
    let reply_logs = match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize logs reply from daemon")?
    {
        DaemonCoordinatorReply::Logs(logs) => logs,
        other => bail!("unexpected reply after sending logs: {other:?}"),
    };
    tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");

    reply_logs.map_err(|err| eyre!(err))
}

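/// Spawns the given dataflow descriptor across the connected daemons. When
/// more than one daemon is involved, all of them are recorded as pending until
/// each reports `ReadyOnDaemon`.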
async fn start_dataflow(
    dataflow: Descriptor,
    working_dir: PathBuf,
    name: Option<String>,
    daemon_connections: &mut DaemonConnections,
    clock: &HLC,
    uv: bool,
) -> eyre::Result<RunningDataflow> {
    let SpawnedDataflow {
        uuid,
        daemons,
        nodes,
    } = spawn_dataflow(dataflow, working_dir, daemon_connections, clock, uv).await?;
    Ok(RunningDataflow {
        uuid,
        name,
        pending_daemons: if daemons.len() > 1 {
            daemons.clone()
        } else {
            BTreeSet::new()
        },
        exited_before_subscribe: Default::default(),
        daemons,
        nodes,
        reply_senders: Vec::new(),
        log_subscribers: Vec::new(),
    })
}

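/// Sends a `Destroy` message to a single daemon and waits for its reply; the
/// connection is dropped when this function returns.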
async fn destroy_daemon(
    daemon_id: DaemonId,
    mut daemon_connection: DaemonConnection,
    timestamp: uhlc::Timestamp,
) -> Result<()> {
    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Destroy,
        timestamp,
    })?;

    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err(format!(
            "failed to send destroy message to daemon `{daemon_id}`"
        ))?;

    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to receive destroy reply from daemon")?;
    match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize destroy reply from daemon")?
    {
        DaemonCoordinatorReply::DestroyResult { result, .. } => result
            .map_err(|e| eyre!(e))
            .wrap_err("failed to destroy daemon")?,
        other => bail!("unexpected reply after sending `destroy`: {other:?}"),
    }

    tracing::info!("successfully destroyed daemon `{daemon_id}`");
    Ok(())
}

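/// Destroys all connected daemons concurrently, draining the connection map.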
async fn destroy_daemons(
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<()> {
    let futures = daemon_connections
        .drain()
        .map(|(daemon_id, daemon_connection)| {
            destroy_daemon(daemon_id, daemon_connection, timestamp)
        })
        .collect::<Vec<_>>();
    for result in join_all(futures).await {
        result?;
    }
    Ok(())
}

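/// Events processed by the coordinator's main loop, merged from the daemon
/// listener, the control socket, timers, and the caller-provided stream.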
#[derive(Debug)]
pub enum Event {
    NewDaemonConnection(TcpStream),
    DaemonConnectError(eyre::Report),
    DaemonHeartbeat {
        daemon_id: DaemonId,
    },
    Dataflow {
        uuid: Uuid,
        event: DataflowEvent,
    },
    Control(ControlEvent),
    Daemon(DaemonRequest),
    DaemonHeartbeatInterval,
    CtrlC,
    Log(LogMessage),
    DaemonExit {
        daemon_id: DaemonId,
    },
}

impl Event {
    /// Whether this event should be logged by the main loop.
    #[allow(clippy::match_like_matches_macro)]
    pub fn log(&self) -> bool {
        match self {
            Event::DaemonHeartbeatInterval => false,
            _ => true,
        }
    }
}

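/// Per-dataflow events reported by daemons.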
#[derive(Debug)]
pub enum DataflowEvent {
    DataflowFinishedOnDaemon {
        daemon_id: DaemonId,
        result: DataflowDaemonResult,
    },
    ReadyOnDaemon {
        daemon_id: DaemonId,
        exited_before_subscribe: Vec<NodeId>,
    },
}

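/// Requests sent by a daemon during connection setup.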
#[derive(Debug)]
pub enum DaemonRequest {
    Register {
        version_check_result: Result<(), String>,
        machine_id: Option<String>,
        connection: TcpStream,
    },
}

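/// Installs a Ctrl-C handler that forwards the first signal as [`Event::CtrlC`]
/// and aborts the process on a second signal. Returns the receiving stream.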
fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = false;
    ctrlc::set_handler(move || {
        if ctrlc_sent {
            tracing::warn!("received second ctrlc signal -> aborting immediately");
            std::process::abort();
        } else {
            tracing::info!("received ctrlc signal");
            if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
                tracing::error!("failed to report ctrl-c event to dora-coordinator");
            }

            ctrlc_sent = true;
        }
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ReceiverStream::new(ctrlc_rx))
}