1use crate::{server::CoordinatorControlServer, tcp_utils::tcp_send};
2pub use control::ControlEvent;
3use dashmap::{
4 DashMap,
5 mapref::one::{Ref, RefMut},
6};
7use dora_core::{
8 config::{NodeId, OperatorId},
9 descriptor::DescriptorExt,
10 uhlc::{self, HLC},
11};
12use dora_message::{
13 BuildId, SessionId,
14 cli_to_coordinator::{
15 BuildRequest, CoordinatorControl, CoordinatorControlClient, CoordinatorControlRequest,
16 CoordinatorControlResponse,
17 },
18 common::DaemonId,
19 coordinator_to_cli::{DataflowResult, LogMessage, StopDataflowReply},
20 coordinator_to_daemon::{
21 BuildDataflowNodes, DaemonControlClient, DaemonControlRequest, DaemonControlResponse,
22 RegisterResult, Timestamped,
23 },
24 daemon_to_coordinator::DataflowDaemonResult,
25 descriptor::{Descriptor, ResolvedNode},
26 tarpc::{
27 self, ClientMessage, Response, Transport, client,
28 server::{BaseChannel, Channel},
29 tokio_serde,
30 },
31};
32use eyre::{ContextCompat, Result, WrapErr, bail, eyre};
33use futures::{Future, Stream, StreamExt, future, stream::FuturesUnordered};
34use futures_concurrency::stream::Merge;
35use itertools::Itertools;
36use log_subscriber::LogSubscriber;
37
38use std::{
39 collections::{BTreeMap, BTreeSet},
40 net::SocketAddr,
41 path::PathBuf,
42 sync::Arc,
43 time::{Duration, Instant},
44};
45use tokio::{
46 net::TcpStream,
47 sync::{mpsc, oneshot},
48 task::JoinHandle,
49};
50use tokio_stream::wrappers::ReceiverStream;
51use uuid::Uuid;
52
53mod control;
54mod listener;
55mod log_subscriber;
56mod run;
57mod server;
58mod state;
59mod tcp_utils;
60
/// Starts the coordinator with TCP servers for daemons and control clients.
///
/// Binds the daemon listener to `bind` and the control-event listener to
/// `bind_control`. A tarpc JSON control server is additionally started on a
/// port derived from `bind_control`'s port. `external_events` allows the
/// caller to inject additional [`Event`]s into the main loop.
///
/// Returns the actually bound daemon port (useful when `bind` used port 0)
/// and a future that drives the coordinator main loop to completion.
pub async fn start(
    bind: SocketAddr,
    bind_control: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
    let tasks = FuturesUnordered::new();
    let control_events = control::control_events(bind_control, &tasks)
        .await
        .wrap_err("failed to create control events")?;

    let (daemon_port, coordinator_state, future) =
        init_coordinator(bind, external_events, control_events, tasks).await?;

    // The tarpc control server listens on a dedicated port derived from the
    // control port.
    let rpc_bind = SocketAddr::new(
        bind_control.ip(),
        dora_core::topics::dora_coordinator_port_rpc(bind_control.port()),
    );
    let listener =
        tarpc::serde_transport::tcp::listen(rpc_bind, tokio_serde::formats::Json::default)
            .await
            .wrap_err("failed to start tarpc server for control messages")?;

    // Serve each successfully accepted control connection on its own task;
    // failed accepts are silently dropped by `filter_map`.
    let stream = listener
        .filter_map(|c| future::ready(c.ok()))
        .map(move |transport| {
            let client_ip = transport.peer_addr().ok().map(|addr| addr.ip());
            serve_control_requests(transport, coordinator_state.clone(), client_ip)
        });
    tokio::spawn(stream.for_each(|handle_connection| async {
        tokio::spawn(handle_connection);
    }));

    Ok((daemon_port, future))
}
99
/// Starts the coordinator with an in-process channel transport for control
/// requests instead of a TCP control server.
///
/// Useful for embedding the coordinator in the same process as the caller:
/// the returned [`CoordinatorControlClient`] talks to the coordinator over an
/// unbounded in-memory channel. The daemon listener is still bound to `bind`.
pub async fn start_with_channel_rpc(
    bind: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
) -> Result<
    (
        CoordinatorControlClient,
        impl Future<Output = eyre::Result<()>>,
    ),
    eyre::ErrReport,
> {
    let tasks = FuturesUnordered::new();

    let (_daemon_port, coordinator_state, future) =
        init_coordinator(bind, external_events, futures::stream::empty(), tasks).await?;

    // Control requests are served over an in-memory transport; `client_ip` is
    // `None` because there is no network peer.
    let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
    tokio::spawn(serve_control_requests(
        server_transport,
        coordinator_state,
        None,
    ));
    let control_client =
        CoordinatorControlClient::new(client::Config::default(), client_transport).spawn();

    Ok((control_client, future))
}
132
/// Shared initialization for both `start` variants: binds the daemon
/// listener, merges all event sources into a single stream, builds the
/// shared [`state::CoordinatorState`], and returns the main-loop future.
///
/// Returns the bound daemon port, the shared coordinator state (for the
/// caller to serve control requests against), and a future that runs
/// `start_inner` and then waits for all spawned tasks to finish.
async fn init_coordinator(
    bind: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
    control_events: impl Stream<Item = Event> + Unpin,
    mut tasks: FuturesUnordered<JoinHandle<()>>,
) -> Result<(
    u16,
    Arc<state::CoordinatorState>,
    impl Future<Output = eyre::Result<()>>,
)> {
    use tokio_stream::wrappers::TcpListenerStream;

    let daemon_listener = listener::create_listener(bind).await?;
    // Read back the actual port in case `bind` requested port 0.
    let daemon_port = daemon_listener
        .local_addr()
        .wrap_err("failed to get local addr of daemon listener")?
        .port();
    // Accept errors are converted into `DaemonConnectError` events instead of
    // terminating the stream.
    let new_daemon_connections = TcpListenerStream::new(daemon_listener).map(|c| {
        c.map(Event::NewDaemonConnection)
            .wrap_err("failed to open connection")
            .unwrap_or_else(Event::DaemonConnectError)
    });

    let ctrlc_events = set_up_ctrlc_handler()?;

    let events = (
        external_events,
        new_daemon_connections,
        control_events,
        ctrlc_events,
    )
        .merge();

    // Periodic tick used to check daemon heartbeats and send our own.
    let daemon_heartbeat_interval =
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
            .map(|_| Event::DaemonHeartbeatInterval);

    // The abort handle allows `handle_destroy` to stop all external event
    // sources; daemon events keep flowing through the mpsc channel below.
    let (abortable_events, abort_handle) =
        futures::stream::abortable((events, daemon_heartbeat_interval).merge());

    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(100);
    let coordinator_state = Arc::new(state::CoordinatorState {
        clock: Arc::new(HLC::default()),
        running_builds: Default::default(),
        finished_builds: Default::default(),
        running_dataflows: Default::default(),
        dataflow_results: Default::default(),
        archived_dataflows: Default::default(),
        daemon_connections: Default::default(),
        daemon_events_tx,
        abort_handle,
    });

    let state_for_caller = coordinator_state.clone();

    let future = async move {
        start_inner(abortable_events, &tasks, daemon_events, coordinator_state).await?;

        // Drain all spawned tasks so we don't leave detached work behind.
        tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
        while let Some(join_result) = tasks.next().await {
            if let Err(err) = join_result {
                tracing::error!("task panicked: {err}");
            }
        }
        tracing::debug!("all spawned tasks finished, exiting..");
        Ok(())
    };
    Ok((daemon_port, state_for_caller, future))
}
205
206fn serve_control_requests<T>(
208 transport: T,
209 state: Arc<state::CoordinatorState>,
210 client_ip: Option<std::net::IpAddr>,
211) -> impl Future<Output = ()>
212where
213 T: Transport<Response<CoordinatorControlResponse>, ClientMessage<CoordinatorControlRequest>>
214 + Send
215 + 'static,
216 T::Error: std::error::Error + Send + Sync + 'static,
217{
218 let channel = BaseChannel::with_defaults(transport);
219 let server = CoordinatorControlServer { state, client_ip };
220 channel.execute(server.serve()).for_each(|fut| async {
221 tokio::spawn(fut);
222 })
223}
224
225fn resolve_name(
227 name: String,
228 running_dataflows: &DashMap<Uuid, RunningDataflow>,
229 archived_dataflows: &DashMap<Uuid, ArchivedDataflow>,
230) -> eyre::Result<Uuid> {
231 let uuids: Vec<_> = running_dataflows
232 .iter()
233 .filter(|r| r.value().name.as_deref() == Some(name.as_str()))
234 .map(|r| *r.key())
235 .collect();
236 let archived_uuids: Vec<_> = archived_dataflows
237 .iter()
238 .filter(|r| r.value().name.as_deref() == Some(name.as_str()))
239 .map(|r| *r.key())
240 .collect();
241
242 if uuids.is_empty() {
243 if archived_uuids.is_empty() {
244 bail!("no dataflow with name `{name}`");
245 } else if let [uuid] = archived_uuids.as_slice() {
246 Ok(*uuid)
247 } else {
248 bail!(
250 "multiple archived dataflows found with name `{name}`, Please provide the UUID instead."
251 );
252 }
253 } else if let [uuid] = uuids.as_slice() {
254 Ok(*uuid)
255 } else {
256 bail!("multiple dataflows found with name `{name}`");
257 }
258}
259
/// Thread-safe registry of all currently connected daemons, keyed by their
/// [`DaemonId`].
#[derive(Default)]
pub(crate) struct DaemonConnections {
    daemons: DashMap<DaemonId, DaemonConnection>,
}
264
impl DaemonConnections {
    /// Registers a daemon connection, replacing (and thereby dropping/closing)
    /// any previous connection registered under the same ID.
    fn add(&self, daemon_id: DaemonId, connection: DaemonConnection) {
        let previous = self.daemons.insert(daemon_id.clone(), connection);
        if previous.is_some() {
            tracing::info!("closing previous connection `{daemon_id}` on new register");
        }
    }

    /// Returns a shared reference to the connection for the given daemon ID.
    fn get(&self, id: &DaemonId) -> Option<Ref<'_, DaemonId, DaemonConnection>> {
        self.daemons.get(id)
    }

    /// Returns a mutable reference to the connection for the given daemon ID.
    pub(crate) fn get_mut(&self, id: &DaemonId) -> Option<RefMut<'_, DaemonId, DaemonConnection>> {
        self.daemons.get_mut(id)
    }

    /// Finds the first daemon ID whose machine ID matches the given string.
    fn get_matching_daemon_id(&self, machine_id: &str) -> Option<DaemonId> {
        self.daemons
            .iter()
            .find(|r| r.key().matches_machine_id(machine_id))
            .map(|r| r.key().clone())
    }

    /// Removes all daemon connections (dropping them closes the connections).
    fn clear(&self) {
        self.daemons.clear();
    }

    /// Returns `true` if no daemon is currently connected.
    fn is_empty(&self) -> bool {
        self.daemons.is_empty()
    }

    /// Returns the IDs of all connected daemons (cloned snapshot).
    fn keys(&self) -> impl Iterator<Item = DaemonId> {
        self.daemons.iter().map(|r| r.key().clone())
    }

    /// Iterates over all `(DaemonId, DaemonConnection)` entries.
    fn iter(
        &self,
    ) -> impl Iterator<Item = dashmap::mapref::multiple::RefMulti<'_, DaemonId, DaemonConnection>>
    {
        self.daemons.iter()
    }

    /// Removes and returns the connection for the given daemon ID, if any.
    pub(crate) fn remove(&self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
        self.daemons
            .remove(daemon_id)
            .map(|(_, connection)| connection)
    }

    /// Returns the IDs of all daemons that registered without a machine ID.
    fn unnamed(&self) -> impl Iterator<Item = DaemonId> {
        self.daemons
            .iter()
            .filter(|r| r.key().machine_id().is_none())
            .map(|r| r.key().clone())
    }
}
320
/// The coordinator main loop: consumes the merged event stream (external,
/// daemon connections, control, ctrl-c, heartbeat ticks) plus the internal
/// daemon-event channel, and dispatches each event until `Event::Close`
/// arrives or all streams end.
async fn start_inner(
    events: impl Stream<Item = Event> + Unpin,
    tasks: &FuturesUnordered<JoinHandle<()>>,
    daemon_events: tokio::sync::mpsc::Receiver<Event>,
    coordinator_state: Arc<state::CoordinatorState>,
) -> eyre::Result<()> {
    let clock = coordinator_state.clock.clone();

    let daemon_events = ReceiverStream::new(daemon_events);

    let mut events = (events, daemon_events).merge();

    while let Some(event) = events.next().await {
        // Measure per-event handling time to warn about slow events below.
        let start = Instant::now();
        let event_kind = event.kind();

        if event.log() {
            tracing::trace!("Handling event {event:?}");
        }
        match event {
            Event::Close => {
                tracing::info!("Received Close event, shutting down coordinator");
                break;
            }
            Event::NewDaemonConnection(connection) => {
                connection.set_nodelay(true)?;
                let events_tx = coordinator_state.daemon_events_tx.clone();
                // Only accept the connection if the main loop can still
                // receive its events.
                if !events_tx.is_closed() {
                    let task = tokio::spawn(listener::handle_connection(
                        connection,
                        events_tx,
                        clock.clone(),
                    ));
                    tasks.push(task);
                } else {
                    tracing::warn!(
                        "ignoring new daemon connection because events_tx was closed already"
                    );
                }
            }
            Event::DaemonConnectError(err) => {
                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
            }
            Event::Daemon(event) => match event {
                DaemonRequest::Register {
                    machine_id,
                    machine_uid,
                    mut connection,
                    version_check_result,
                } => {
                    // Reject registration if a daemon with the same machine ID
                    // (or an unnamed daemon, for unnamed registration) already
                    // exists.
                    let existing = match &machine_id {
                        Some(id) => coordinator_state
                            .daemon_connections
                            .get_matching_daemon_id(id),
                        None => coordinator_state.daemon_connections.unnamed().next(),
                    };
                    let existing_result = if existing.is_some() {
                        Err(format!(
                            "There is already a connected daemon with machine ID `{machine_id:?}`"
                        ))
                    } else {
                        Ok(())
                    };

                    let daemon_id = DaemonId::new(machine_id);

                    // Tell the daemon whether its registration succeeded
                    // (version check AND uniqueness check must both pass).
                    let reply: Timestamped<RegisterResult> = Timestamped {
                        inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
                            Ok(_) => RegisterResult::Ok {
                                daemon_id: daemon_id.clone(),
                            },
                            Err(err) => RegisterResult::Err(err.clone()),
                        },
                        timestamp: clock.new_timestamp(),
                    };

                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
                        .await
                        .context("tcp send failed");
                    match version_check_result
                        .map_err(|e| eyre!(e))
                        .and(existing_result.map_err(|e| eyre!(e)))
                        .and(send_result)
                    {
                        Ok(()) => {
                            let peer_addr = connection.peer_addr().ok();
                            // Upgrade the raw TCP connection into a tarpc
                            // client for sending control requests to the daemon.
                            let codec = tokio_serde::formats::Json::<
                                Response<DaemonControlResponse>,
                                ClientMessage<DaemonControlRequest>,
                            >::default();
                            let transport =
                                tarpc::serde_transport::Transport::from((connection, codec));
                            let daemon_client =
                                DaemonControlClient::new(client::Config::default(), transport)
                                    .spawn();

                            coordinator_state.daemon_connections.add(
                                daemon_id.clone(),
                                DaemonConnection {
                                    client: daemon_client,
                                    last_heartbeat: Instant::now(),
                                    peer_addr,
                                    machine_uid,
                                },
                            );
                        }
                        Err(err) => {
                            tracing::warn!(
                                "failed to register daemon connection for daemon `{daemon_id}`: {err}"
                            );
                        }
                    }
                }
                DaemonRequest::RegisterNotificationChannel {
                    daemon_id,
                    connection,
                } => {
                    // Second connection per daemon: a reverse RPC channel on
                    // which the daemon calls the coordinator.
                    use dora_message::daemon_to_coordinator::{
                        CoordinatorNotify, CoordinatorNotifyRequest, CoordinatorNotifyResponse,
                    };
                    use tarpc::server::{BaseChannel, Channel};

                    let codec = tokio_serde::formats::Json::<
                        ClientMessage<CoordinatorNotifyRequest>,
                        Response<CoordinatorNotifyResponse>,
                    >::default();
                    let transport = tarpc::serde_transport::Transport::from((connection, codec));

                    let server = listener::CoordinatorNotifyServer {
                        daemon_id: daemon_id.clone(),
                        coordinator_state: coordinator_state.clone(),
                    };

                    let channel = BaseChannel::with_defaults(transport);
                    tokio::spawn(channel.execute(server.serve()).for_each(|fut| async {
                        tokio::spawn(fut);
                    }));

                    tracing::info!(
                        "reverse-channel RPC server established for daemon `{daemon_id}`"
                    );
                }
            },
            Event::Control(event) => match event {
                ControlEvent::Error(err) => tracing::error!("{err:?}"),
                ControlEvent::LogSubscribe {
                    dataflow_id,
                    level,
                    connection,
                } => {
                    // Register the new subscriber and flush any log messages
                    // buffered while no subscriber was attached.
                    if let Some(mut dataflow) =
                        coordinator_state.running_dataflows.get_mut(&dataflow_id)
                    {
                        dataflow
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                        let buffered = std::mem::take(&mut dataflow.buffered_log_messages);
                        for message in buffered {
                            send_log_message(&mut dataflow.log_subscribers, &message).await;
                        }
                    }
                }
                ControlEvent::BuildLogSubscribe {
                    build_id,
                    level,
                    connection,
                } => {
                    // Same as above, but for build logs.
                    if let Some(mut build) = coordinator_state.running_builds.get_mut(&build_id) {
                        build
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                        let buffered = std::mem::take(&mut build.buffered_log_messages);
                        for message in buffered {
                            send_log_message(&mut build.log_subscribers, &message).await;
                        }
                    }
                }
            },
            Event::DaemonHeartbeatInterval => {
                // Snapshot the connection data first so no dashmap reference
                // is held across the awaits/spawns below.
                let daemons_to_check: Vec<(DaemonId, Duration, DaemonControlClient)> =
                    coordinator_state
                        .daemon_connections
                        .iter()
                        .map(|r| {
                            (
                                r.key().clone(),
                                r.value().last_heartbeat.elapsed(),
                                r.value().client.clone(),
                            )
                        })
                        .collect();
                let mut disconnected = BTreeSet::new();
                for (machine_id, elapsed, client) in daemons_to_check {
                    // Warn after 15s of silence; give up after 30s.
                    if elapsed > Duration::from_secs(15) {
                        tracing::warn!(
                            "no heartbeat message from machine `{machine_id}` since {elapsed:?}",
                        )
                    }
                    if elapsed > Duration::from_secs(30) {
                        disconnected.insert(machine_id);
                        continue;
                    }
                    // Send our own heartbeat without blocking the main loop.
                    tokio::spawn(async move {
                        if let Err(err) = client.heartbeat(tarpc::context::current()).await {
                            tracing::warn!(
                                "failed to send heartbeat to daemon `{machine_id}`: {err}"
                            );
                        }
                    });
                }
                if !disconnected.is_empty() {
                    tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
                    for machine_id in disconnected {
                        coordinator_state.daemon_connections.remove(&machine_id);
                    }
                }
            }
            Event::CtrlC => {
                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
                handle_destroy(&coordinator_state).await?;
            }
            Event::Log(message) => {
                // Route log messages to dataflow or build subscribers;
                // buffer them while no subscriber is attached yet.
                if let Some(dataflow_id) = &message.dataflow_id {
                    if let Some(mut dataflow) =
                        coordinator_state.running_dataflows.get_mut(dataflow_id)
                    {
                        if dataflow.log_subscribers.is_empty() {
                            dataflow.buffered_log_messages.push(message);
                        } else {
                            send_log_message(&mut dataflow.log_subscribers, &message).await;
                        }
                    }
                } else if let Some(build_id) = &message.build_id {
                    if let Some(mut build) = coordinator_state.running_builds.get_mut(build_id) {
                        if build.log_subscribers.is_empty() {
                            build.buffered_log_messages.push(message);
                        } else {
                            send_log_message(&mut build.log_subscribers, &message).await;
                        }
                    }
                }
            }
        }

        // Warn if a single event blocked the loop for a noticeable time.
        let elapsed = start.elapsed();
        if elapsed > Duration::from_millis(100) {
            tracing::warn!(
                "Coordinator took {}ms for handling event: {event_kind}",
                elapsed.as_millis()
            );
        }
    }

    tracing::info!("stopped");

    Ok(())
}
597
598pub(crate) async fn send_log_message(
599 log_subscribers: &mut Vec<LogSubscriber>,
600 message: &LogMessage,
601) {
602 for subscriber in log_subscribers.iter_mut() {
603 let send_result =
604 tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));
605
606 if send_result.await.is_err() {
607 subscriber.close();
608 }
609 }
610 log_subscribers.retain(|s| !s.is_closed());
611}
612
613pub(crate) fn dataflow_result(
614 results: &BTreeMap<DaemonId, DataflowDaemonResult>,
615 dataflow_uuid: Uuid,
616 clock: &uhlc::HLC,
617) -> DataflowResult {
618 let mut node_results = BTreeMap::new();
619 for result in results.values() {
620 node_results.extend(result.node_results.clone());
621 if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
622 tracing::warn!("failed to update HLC: {err}");
623 }
624 }
625
626 DataflowResult {
627 uuid: dataflow_uuid,
628 timestamp: clock.new_timestamp(),
629 node_results,
630 }
631}
632
/// State kept per connected daemon.
pub(crate) struct DaemonConnection {
    /// tarpc client for sending control requests to the daemon.
    client: DaemonControlClient,
    /// Last time a heartbeat was received; used by the watchdog to detect
    /// dead daemons.
    pub(crate) last_heartbeat: Instant,
    /// Remote address of the daemon's TCP connection, if known.
    peer_addr: Option<SocketAddr>,
    /// Unique machine identifier reported by the daemon on registration.
    machine_uid: Option<String>,
}
640
/// Shuts the coordinator down: stops all running dataflows, destroys all
/// connected daemons, and signals the main event loop to exit.
async fn handle_destroy(
    coordinator_state: &state::CoordinatorState,
) -> Result<(), eyre::ErrReport> {
    // Stop accepting further external/control events.
    coordinator_state.abort_handle.abort();
    // Stop every running dataflow (no grace duration, not forced).
    for dataflow_uuid in coordinator_state
        .running_dataflows
        .iter()
        .map(|entry| *entry.key())
        .collect::<Vec<_>>()
    {
        let _ = stop_dataflow(
            &coordinator_state.running_dataflows,
            dataflow_uuid,
            &coordinator_state.daemon_connections,
            None,
            false,
        )
        .await?;
    }

    let result = destroy_daemons(&coordinator_state.daemon_connections).await;

    // Tell the main loop to exit; ignore send errors if it is already gone.
    let _ = coordinator_state.daemon_events_tx.send(Event::Close).await;
    result
}
666
/// Outcome of a dataflow build, reported once all daemons have finished.
#[derive(Debug, Clone)]
pub struct BuildFinishedResult {
    /// The build this result belongs to.
    pub build_id: BuildId,
    /// `Ok(())` on success, otherwise the combined error message.
    pub result: Result<(), String>,
}
673
/// Coordinator-side state of an in-progress dataflow build.
pub(crate) struct RunningBuild {
    /// Error messages collected from daemons during the build.
    pub(crate) errors: Vec<String>,
    /// Result shared with callers waiting for build completion.
    pub(crate) build_result: CachedResult<BuildFinishedResult>,

    /// Log messages buffered while no subscriber is attached yet.
    pub(crate) buffered_log_messages: Vec<LogMessage>,
    /// Active build-log subscribers.
    pub(crate) log_subscribers: Vec<LogSubscriber>,

    /// Daemons whose build results are still outstanding.
    pub(crate) pending_build_results: BTreeSet<DaemonId>,
}
684
/// Coordinator-side state of a currently running dataflow.
pub(crate) struct RunningDataflow {
    /// Optional human-readable name (used by `resolve_name`).
    name: Option<String>,
    pub(crate) uuid: Uuid,
    /// The dataflow descriptor this instance was started from.
    descriptor: Descriptor,
    /// All daemons participating in this dataflow.
    pub(crate) daemons: BTreeSet<DaemonId>,
    /// Daemons that have not finished their part of the startup yet
    /// (only populated for multi-daemon dataflows).
    pub(crate) pending_daemons: BTreeSet<DaemonId>,
    /// Nodes that exited before subscribing to their inputs.
    pub(crate) exited_before_subscribe: Vec<NodeId>,
    /// Resolved node definitions, keyed by node ID.
    nodes: BTreeMap<NodeId, ResolvedNode>,
    /// Which daemon each node was placed on.
    node_to_daemon: BTreeMap<NodeId, DaemonId>,
    /// Latest metrics reported per node.
    node_metrics: BTreeMap<NodeId, dora_message::daemon_to_coordinator::NodeMetrics>,

    /// Spawn result shared with callers waiting for the dataflow to start.
    pub(crate) spawn_result: CachedResult<Uuid>,
    /// Reply channels of pending stop requests.
    pub(crate) stop_reply_senders:
        Vec<tokio::sync::oneshot::Sender<eyre::Result<StopDataflowReply>>>,

    /// Log messages buffered while no subscriber is attached yet.
    pub(crate) buffered_log_messages: Vec<LogMessage>,
    /// Active dataflow-log subscribers.
    pub(crate) log_subscribers: Vec<LogSubscriber>,

    /// Daemons whose spawn results are still outstanding.
    pub(crate) pending_spawn_results: BTreeSet<DaemonId>,
}
710
/// A result that may not be available yet.
///
/// While `Pending`, interested parties register oneshot senders; once the
/// result arrives it is broadcast to all of them and cached so later
/// registrations are answered immediately.
pub enum CachedResult<T> {
    Pending {
        /// Senders to notify once the result becomes available.
        result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<T>>>,
    },
    Cached {
        /// The final result, replayed to any later registration.
        result: eyre::Result<T>,
    },
}
719
720impl<T> Default for CachedResult<T> {
721 fn default() -> Self {
722 Self::Pending {
723 result_senders: Vec::new(),
724 }
725 }
726}
727
728impl<T: Clone> CachedResult<T> {
729 fn register(&mut self, reply_sender: tokio::sync::oneshot::Sender<eyre::Result<T>>) {
730 match self {
731 CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
732 CachedResult::Cached { result } => {
733 Self::send_result_to(result, reply_sender);
734 }
735 }
736 }
737
738 fn set_result(&mut self, result: eyre::Result<T>) {
739 match self {
740 CachedResult::Pending { result_senders } => {
741 for sender in result_senders.drain(..) {
742 Self::send_result_to(&result, sender);
743 }
744 *self = CachedResult::Cached { result };
745 }
746 CachedResult::Cached { .. } => {}
747 }
748 }
749
750 fn send_result_to(result: &eyre::Result<T>, sender: oneshot::Sender<eyre::Result<T>>) {
751 let result = match result {
752 Ok(r) => Ok(r.clone()),
753 Err(err) => Err(eyre!("{err:?}")),
754 };
755 let _ = sender.send(result);
756 }
757}
758
/// Minimal state retained for a finished dataflow, so logs can still be
/// retrieved and names resolved after it stopped running.
pub(crate) struct ArchivedDataflow {
    name: Option<String>,
    nodes: BTreeMap<NodeId, ResolvedNode>,
}
763
764impl From<&RunningDataflow> for ArchivedDataflow {
765 fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
766 ArchivedDataflow {
767 name: dataflow.name.clone(),
768 nodes: dataflow.nodes.clone(),
769 }
770 }
771}
772
773impl PartialEq for RunningDataflow {
774 fn eq(&self, other: &Self) -> bool {
775 self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
776 }
777}
778
779impl Eq for RunningDataflow {}
780
781async fn stop_dataflow<'a>(
782 running_dataflows: &'a DashMap<Uuid, RunningDataflow>,
783 dataflow_uuid: Uuid,
784 daemon_connections: &DaemonConnections,
785 grace_duration: Option<Duration>,
786 force: bool,
787) -> eyre::Result<RefMut<'a, Uuid, RunningDataflow>> {
788 let daemon_ids: Vec<DaemonId> = {
790 let Some(dataflow) = running_dataflows.get(&dataflow_uuid) else {
791 bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
792 };
793 dataflow.daemons.iter().cloned().collect()
794 };
795 for daemon_id in &daemon_ids {
798 let client = daemon_connections
799 .get(daemon_id)
800 .wrap_err("no daemon connection")?
801 .client
802 .clone();
803 client
805 .stop_dataflow(
806 tarpc::context::current(),
807 dataflow_uuid,
808 grace_duration,
809 force,
810 )
811 .await
812 .context("RPC transport error")?
813 .map_err(|e: String| eyre!(e))
814 .wrap_err("failed to stop dataflow")?;
815 }
816
817 tracing::info!("successfully send stop dataflow `{dataflow_uuid}` to all daemons");
818
819 running_dataflows
821 .get_mut(&dataflow_uuid)
822 .wrap_err("dataflow was removed while sending stop commands")
823}
824
825async fn reload_dataflow(
826 running_dataflows: &DashMap<Uuid, RunningDataflow>,
827 dataflow_id: Uuid,
828 node_id: NodeId,
829 operator_id: Option<OperatorId>,
830 daemon_connections: &DaemonConnections,
831) -> eyre::Result<()> {
832 let daemon_ids: Vec<DaemonId> = {
834 let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
835 bail!("No running dataflow found with UUID `{dataflow_id}`")
836 };
837 dataflow.daemons.iter().cloned().collect()
838 };
839 for machine_id in &daemon_ids {
842 let client = daemon_connections
843 .get(machine_id)
844 .wrap_err("no daemon connection")?
845 .client
846 .clone();
847 client
848 .reload_dataflow(
849 tarpc::context::current(),
850 dataflow_id,
851 node_id.clone(),
852 operator_id.clone(),
853 )
854 .await
855 .context("RPC transport error")?
856 .map_err(|e: String| eyre!(e))
857 .wrap_err("failed to reload dataflow")?;
858 }
859 tracing::info!("successfully reloaded dataflow `{dataflow_id}`");
860
861 Ok(())
862}
863
/// Retrieves the log output of a single node of a (running or archived)
/// dataflow from the daemon hosting that node.
///
/// `tail` optionally limits the result to the last N entries. Errors if the
/// dataflow or node is unknown, the node's machine is ambiguous, or no
/// matching daemon connection exists.
async fn retrieve_logs(
    running_dataflows: &DashMap<Uuid, RunningDataflow>,
    archived_dataflows: &DashMap<Uuid, ArchivedDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &DaemonConnections,
    tail: Option<usize>,
) -> eyre::Result<Vec<u8>> {
    // Archived dataflows take precedence; logs stay available after a
    // dataflow finished.
    let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else {
        bail!("No dataflow found with UUID `{dataflow_id}`")
    };

    // Determine which machine the node was deployed to (None = unnamed daemon).
    let machine_ids: Vec<Option<String>> = nodes
        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
        .collect();

    let machine_id = if let [machine_id] = &machine_ids[..] {
        machine_id
    } else if machine_ids.is_empty() {
        bail!("No machine contains {}/{}", dataflow_id, node_id)
    } else {
        bail!(
            "More than one machine contains {}/{}. However, it should only be present on one.",
            dataflow_id,
            node_id
        )
    };

    // Map the machine ID to exactly one connected daemon.
    let daemon_ids: Vec<_> = match machine_id {
        None => daemon_connections.unnamed().collect(),
        Some(machine_id) => daemon_connections
            .get_matching_daemon_id(machine_id)
            .into_iter()
            .collect(),
    };
    let daemon_id = match &daemon_ids[..] {
        [id] => (*id).clone(),
        [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
        _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
    };
    // Clone the client so the connection map is not borrowed while awaiting.
    let client = daemon_connections
        .get(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?
        .client
        .clone();
    let reply_logs = client
        .logs(
            tarpc::context::current(),
            dataflow_id,
            node_id.clone(),
            tail,
        )
        .await
        .context("RPC transport error")?;
    tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");

    reply_logs.map_err(|err: String| eyre!(err))
}
929
/// Triggers a dataflow build: partitions the nodes and git sources by target
/// machine and sends one [`BuildDataflowNodes`] command to each involved
/// daemon.
///
/// Returns a [`RunningBuild`] tracking the daemons whose build results are
/// still pending. This only *starts* the build; completion is reported later
/// via daemon events.
#[tracing::instrument(skip(daemon_connections))]
async fn build_dataflow(
    build_request: BuildRequest,
    build_id: BuildId,
    daemon_connections: &DaemonConnections,
) -> eyre::Result<RunningBuild> {
    let BuildRequest {
        session_id,
        dataflow,
        git_sources,
        prev_git_sources,
        local_working_dir,
        uv,
    } = build_request;

    let nodes = dataflow.resolve_aliases_and_set_defaults()?;

    // Group the git sources by the machine their node is deployed to
    // (key `None` = unnamed daemon).
    let mut git_sources_by_daemon = git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();
    // Same grouping for the sources of the previous build, used for
    // incremental rebuilds on the daemons.
    let mut prev_git_sources_by_daemon = prev_git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();

    let nodes_by_daemon = nodes
        .values()
        .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));

    // Send one build command per target machine and remember which daemons
    // we still expect results from.
    let mut daemons = BTreeSet::new();
    for (machine, nodes_on_machine) in &nodes_by_daemon {
        let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
        tracing::debug!(
            "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
        );

        let build_command = BuildDataflowNodes {
            build_id,
            session_id,
            local_working_dir: local_working_dir.clone(),
            git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
            prev_git_sources: prev_git_sources_by_daemon
                .remove(machine)
                .unwrap_or_default(),
            dataflow_descriptor: dataflow.clone(),
            nodes_on_machine,
            uv,
        };

        let daemon_id = build_dataflow_on_machine(
            daemon_connections,
            machine.map(|s| s.as_str()),
            build_command,
        )
        .await
        .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
        daemons.insert(daemon_id);
    }

    tracing::info!("successfully triggered dataflow build `{build_id}`",);

    Ok(RunningBuild {
        errors: Vec::new(),
        build_result: CachedResult::default(),
        buffered_log_messages: Vec::new(),
        log_subscribers: Vec::new(),
        pending_build_results: daemons,
    })
}
1008
1009async fn build_dataflow_on_machine(
1010 daemon_connections: &DaemonConnections,
1011 machine: Option<&str>,
1012 build_command: BuildDataflowNodes,
1013) -> Result<DaemonId, eyre::ErrReport> {
1014 let daemon_id = match machine {
1015 Some(machine) => daemon_connections
1016 .get_matching_daemon_id(machine)
1017 .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
1018 .clone(),
1019 None => daemon_connections
1020 .unnamed()
1021 .next()
1022 .wrap_err("no unnamed daemon connections")?
1023 .clone(),
1024 };
1025
1026 let client = daemon_connections
1027 .get(&daemon_id)
1028 .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?
1029 .client
1030 .clone();
1031 client
1033 .build(tarpc::context::current(), build_command)
1034 .await
1035 .context("RPC transport error")?
1036 .map_err(|e: String| eyre!(e))
1037 .wrap_err("daemon returned an error")?;
1038 Ok(daemon_id)
1039}
1040
/// Plans and starts a new dataflow instance.
///
/// First computes a placement plan via `run::plan_dataflow`, registers the
/// dataflow as running, then executes the plan by sending spawn commands to
/// the involved daemons. On execution failure the dataflow entry is removed
/// again. Returns the UUID of the newly started dataflow.
#[allow(clippy::too_many_arguments)]
async fn start_dataflow(
    build_id: Option<BuildId>,
    session_id: SessionId,
    dataflow: Descriptor,
    local_working_dir: Option<PathBuf>,
    name: Option<String>,
    daemon_connections: &DaemonConnections,
    running_dataflows: &DashMap<Uuid, RunningDataflow>,
    uv: bool,
    write_events_to: Option<PathBuf>,
) -> eyre::Result<Uuid> {
    let plan = run::plan_dataflow(
        build_id,
        session_id,
        &dataflow,
        local_working_dir,
        daemon_connections,
        uv,
        write_events_to,
    )?;

    let uuid = plan.uuid;
    let daemons = plan.daemons.clone();

    let run::DataflowPlan {
        uuid: _,
        daemons: _,
        nodes,
        node_to_daemon,
        daemon_spawn_commands,
    } = plan;

    // Register the dataflow as running *before* executing the plan, so
    // events arriving during spawn can already be routed to it.
    running_dataflows.insert(
        uuid,
        RunningDataflow {
            uuid,
            name,
            descriptor: dataflow,
            // Cross-daemon readiness tracking is only needed when more than
            // one daemon is involved.
            pending_daemons: if daemons.len() > 1 {
                daemons.clone()
            } else {
                BTreeSet::new()
            },
            exited_before_subscribe: Default::default(),
            daemons: daemons.clone(),
            nodes,
            node_to_daemon,
            node_metrics: BTreeMap::new(),
            spawn_result: CachedResult::default(),
            stop_reply_senders: Vec::new(),
            buffered_log_messages: Vec::new(),
            log_subscribers: Vec::new(),
            pending_spawn_results: daemons,
        },
    );

    // Roll back the registration if sending the spawn commands fails.
    if let Err(err) =
        run::execute_dataflow_plan(uuid, daemon_spawn_commands, daemon_connections).await
    {
        running_dataflows.remove(&uuid);
        return Err(err);
    }

    Ok(uuid)
}
1114
1115async fn destroy_daemon(daemon_id: DaemonId, client: DaemonControlClient) -> Result<()> {
1116 client
1117 .destroy(tarpc::context::current())
1118 .await
1119 .wrap_err(format!(
1120 "failed to send destroy message to daemon `{daemon_id}`"
1121 ))?
1122 .map_err(|e: String| eyre!(e))
1123 .wrap_err("failed to destroy daemon")?;
1124
1125 tracing::info!("successfully destroyed daemon `{daemon_id}`");
1126 Ok(())
1127}
1128
1129async fn destroy_daemons(daemon_connections: &DaemonConnections) -> eyre::Result<()> {
1130 let daemons: Vec<(DaemonId, DaemonControlClient)> = daemon_connections
1132 .iter()
1133 .map(|r| (r.key().clone(), r.value().client.clone()))
1134 .collect();
1135 let results = futures::future::join_all(daemons.into_iter().map(|(daemon_id, client)| {
1138 tracing::info!("Destroying daemon connection for `{daemon_id}`");
1139 destroy_daemon(daemon_id, client)
1140 }))
1141 .await;
1142 daemon_connections.clear();
1143
1144 for result in results {
1145 result?;
1146 }
1147 Ok(())
1148}
1149
/// Events processed by the coordinator main loop (`start_inner`).
#[derive(Debug)]
pub enum Event {
    /// A daemon opened a new TCP connection.
    NewDaemonConnection(TcpStream),
    /// Accepting a daemon connection failed.
    DaemonConnectError(eyre::Report),
    /// A request from a control client (CLI).
    Control(ControlEvent),
    /// A request from a connected daemon.
    Daemon(DaemonRequest),
    /// Periodic tick for the heartbeat watchdog.
    DaemonHeartbeatInterval,
    /// Ctrl-C was received; destroy the coordinator.
    CtrlC,
    /// A log message to route to subscribers.
    Log(LogMessage),
    /// Shut down the main loop.
    Close,
}
1161
1162impl Event {
1163 #[allow(clippy::match_like_matches_macro)]
1165 pub fn log(&self) -> bool {
1166 match self {
1167 Event::DaemonHeartbeatInterval => false,
1168 _ => true,
1169 }
1170 }
1171
1172 fn kind(&self) -> &'static str {
1173 match self {
1174 Event::NewDaemonConnection(_) => "NewDaemonConnection",
1175 Event::DaemonConnectError(_) => "DaemonConnectError",
1176 Event::Control(_) => "Control",
1177 Event::Daemon(_) => "Daemon",
1178 Event::DaemonHeartbeatInterval => "DaemonHeartbeatInterval",
1179 Event::CtrlC => "CtrlC",
1180 Event::Log(_) => "Log",
1181 Event::Close => "Close",
1182 }
1183 }
1184}
1185
/// Requests sent by daemons to the coordinator over freshly opened TCP
/// connections.
#[derive(Debug)]
pub enum DaemonRequest {
    /// Initial registration of a daemon; the connection is then upgraded to
    /// a tarpc control channel on success.
    Register {
        machine_id: Option<String>,
        machine_uid: Option<String>,
        connection: TcpStream,
        /// Result of the daemon/coordinator version compatibility check.
        version_check_result: Result<(), String>,
    },
    /// Second connection of an already-registered daemon, used as a reverse
    /// RPC channel (daemon -> coordinator notifications).
    RegisterNotificationChannel {
        daemon_id: DaemonId,
        connection: TcpStream,
    },
}
1199
1200fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
1201 let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
1202
1203 let mut ctrlc_sent = false;
1204 ctrlc::set_handler(move || {
1205 if ctrlc_sent {
1206 tracing::warn!("received second ctrlc signal -> aborting immediately");
1207 std::process::abort();
1208 } else {
1209 tracing::info!("received ctrlc signal");
1210 if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
1211 tracing::error!("failed to report ctrl-c event to dora-coordinator");
1212 }
1213
1214 ctrlc_sent = true;
1215 }
1216 })
1217 .wrap_err("failed to set ctrl-c handler")?;
1218
1219 Ok(ReceiverStream::new(ctrlc_rx))
1220}