dora_coordinator/lib.rs

use crate::{server::CoordinatorControlServer, tcp_utils::tcp_send};
pub use control::ControlEvent;
use dashmap::{
    DashMap,
    mapref::one::{Ref, RefMut},
};
use dora_core::{
    config::{NodeId, OperatorId},
    descriptor::DescriptorExt,
    uhlc::{self, HLC},
};
use dora_message::{
    BuildId, SessionId,
    cli_to_coordinator::{
        BuildRequest, CoordinatorControl, CoordinatorControlClient, CoordinatorControlRequest,
        CoordinatorControlResponse,
    },
    common::DaemonId,
    coordinator_to_cli::{DataflowResult, LogMessage, StopDataflowReply},
    coordinator_to_daemon::{
        BuildDataflowNodes, DaemonControlClient, DaemonControlRequest, DaemonControlResponse,
        RegisterResult, Timestamped,
    },
    daemon_to_coordinator::DataflowDaemonResult,
    descriptor::{Descriptor, ResolvedNode},
    tarpc::{
        self, ClientMessage, Response, Transport, client,
        server::{BaseChannel, Channel},
        tokio_serde,
    },
};
use eyre::{ContextCompat, Result, WrapErr, bail, eyre};
use futures::{Future, Stream, StreamExt, future, stream::FuturesUnordered};
use futures_concurrency::stream::Merge;
use itertools::Itertools;
use log_subscriber::LogSubscriber;

use std::{
    collections::{BTreeMap, BTreeSet},
    net::SocketAddr,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::{
    net::TcpStream,
    sync::{mpsc, oneshot},
    task::JoinHandle,
};
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;

mod control;
mod listener;
mod log_subscriber;
mod run;
mod server;
mod state;
mod tcp_utils;

/// Start the coordinator with a TCP listener for control messages. Returns the daemon port and
/// a future that resolves when the coordinator finishes.
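///
/// # Example
///
/// A minimal usage sketch (not taken from the dora test suite); the bind
/// addresses and the empty external-event stream below are placeholders:
///
/// ```no_run
/// # async fn example() -> eyre::Result<()> {
/// let (daemon_port, coordinator) = dora_coordinator::start(
///     "127.0.0.1:0".parse()?,
///     "127.0.0.1:0".parse()?,
///     futures::stream::empty::<dora_coordinator::Event>(),
/// )
/// .await?;
/// tracing::info!("daemons can connect on port {daemon_port}");
/// coordinator.await?; // resolves once the coordinator shuts down
/// # Ok(())
/// # }
/// ```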
pub async fn start(
    bind: SocketAddr,
    bind_control: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
    let tasks = FuturesUnordered::new();
    let control_events = control::control_events(bind_control, &tasks)
        .await
        .wrap_err("failed to create control events")?;

    let (daemon_port, coordinator_state, future) =
        init_coordinator(bind, external_events, control_events, tasks).await?;

    // Bind the tarpc RPC server on the same interface
    let rpc_bind = SocketAddr::new(
        bind_control.ip(),
        dora_core::topics::dora_coordinator_port_rpc(bind_control.port()),
    );
    let listener =
        tarpc::serde_transport::tcp::listen(rpc_bind, tokio_serde::formats::Json::default)
            .await
            .wrap_err("failed to start tarpc server for control messages")?;

    let stream = listener
        // ignore connect errors
        .filter_map(|c| future::ready(c.ok()))
        .map(move |transport| {
            let client_ip = transport.peer_addr().ok().map(|addr| addr.ip());
            serve_control_requests(transport, coordinator_state.clone(), client_ip)
        });
    tokio::spawn(stream.for_each(|handle_connection| async {
        tokio::spawn(handle_connection);
    }));

    Ok((daemon_port, future))
}

/// Start the coordinator with an in-process RPC server instead of a TCP listener.
///
/// Returns the `CoordinatorControlClient` to communicate with the RPC server.
///
/// This function is mainly useful for testing.
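///
/// # Example
///
/// A minimal sketch for a test setup; the bind address and the empty
/// external-event stream are placeholders, and the requests available on the
/// returned client are those of the `CoordinatorControl` RPC trait:
///
/// ```no_run
/// # async fn example() -> eyre::Result<()> {
/// let (client, coordinator) = dora_coordinator::start_with_channel_rpc(
///     "127.0.0.1:0".parse()?,
///     futures::stream::empty::<dora_coordinator::Event>(),
/// )
/// .await?;
/// // issue `CoordinatorControl` requests through `client` here, then wait
/// // for the coordinator to finish
/// drop(client);
/// coordinator.await
/// # }
/// ```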
pub async fn start_with_channel_rpc(
    bind: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
) -> Result<
    (
        CoordinatorControlClient,
        impl Future<Output = eyre::Result<()>>,
    ),
    eyre::ErrReport,
> {
    let tasks = FuturesUnordered::new();

    let (_daemon_port, coordinator_state, future) =
        init_coordinator(bind, external_events, futures::stream::empty(), tasks).await?;

    // Create an in-process channel-based client (no TCP overhead)
    let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
    tokio::spawn(serve_control_requests(
        server_transport,
        coordinator_state,
        None,
    ));
    let control_client =
        CoordinatorControlClient::new(client::Config::default(), client_transport).spawn();

    Ok((control_client, future))
}

/// Shared coordinator setup used by both [`start`] and [`start_with_channel_rpc`].
async fn init_coordinator(
    bind: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
    control_events: impl Stream<Item = Event> + Unpin,
    mut tasks: FuturesUnordered<JoinHandle<()>>,
) -> Result<(
    u16,
    Arc<state::CoordinatorState>,
    impl Future<Output = eyre::Result<()>>,
)> {
    use tokio_stream::wrappers::TcpListenerStream;

    let daemon_listener = listener::create_listener(bind).await?;
    let daemon_port = daemon_listener
        .local_addr()
        .wrap_err("failed to get local addr of daemon listener")?
        .port();
    let new_daemon_connections = TcpListenerStream::new(daemon_listener).map(|c| {
        c.map(Event::NewDaemonConnection)
            .wrap_err("failed to open connection")
            .unwrap_or_else(Event::DaemonConnectError)
    });

    // Set up ctrl-c handler
    let ctrlc_events = set_up_ctrlc_handler()?;

    let events = (
        external_events,
        new_daemon_connections,
        control_events,
        ctrlc_events,
    )
        .merge();

    let daemon_heartbeat_interval =
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
            .map(|_| Event::DaemonHeartbeatInterval);

    // events that should be aborted on `dora destroy`
    let (abortable_events, abort_handle) =
        futures::stream::abortable((events, daemon_heartbeat_interval).merge());

    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(100);
    let coordinator_state = Arc::new(state::CoordinatorState {
        clock: Arc::new(HLC::default()),
        running_builds: Default::default(),
        finished_builds: Default::default(),
        running_dataflows: Default::default(),
        dataflow_results: Default::default(),
        archived_dataflows: Default::default(),
        daemon_connections: Default::default(),
        daemon_events_tx,
        abort_handle,
    });

    let state_for_caller = coordinator_state.clone();

    let future = async move {
        start_inner(abortable_events, &tasks, daemon_events, coordinator_state).await?;

        tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
        while let Some(join_result) = tasks.next().await {
            if let Err(err) = join_result {
                tracing::error!("task panicked: {err}");
            }
        }
        tracing::debug!("all spawned tasks finished, exiting..");
        Ok(())
    };
    Ok((daemon_port, state_for_caller, future))
}

/// Serve [`CoordinatorControl`] RPC requests from the given transport.
fn serve_control_requests<T>(
    transport: T,
    state: Arc<state::CoordinatorState>,
    client_ip: Option<std::net::IpAddr>,
) -> impl Future<Output = ()>
where
    T: Transport<Response<CoordinatorControlResponse>, ClientMessage<CoordinatorControlRequest>>
        + Send
        + 'static,
    T::Error: std::error::Error + Send + Sync + 'static,
{
    let channel = BaseChannel::with_defaults(transport);
    let server = CoordinatorControlServer { state, client_ip };
    channel.execute(server.serve()).for_each(|fut| async {
        tokio::spawn(fut);
    })
}

/// Resolve a dataflow name to the UUID of a matching running or archived dataflow.
fn resolve_name(
    name: String,
    running_dataflows: &DashMap<Uuid, RunningDataflow>,
    archived_dataflows: &DashMap<Uuid, ArchivedDataflow>,
) -> eyre::Result<Uuid> {
    let uuids: Vec<_> = running_dataflows
        .iter()
        .filter(|r| r.value().name.as_deref() == Some(name.as_str()))
        .map(|r| *r.key())
        .collect();
    let archived_uuids: Vec<_> = archived_dataflows
        .iter()
        .filter(|r| r.value().name.as_deref() == Some(name.as_str()))
        .map(|r| *r.key())
        .collect();

    if uuids.is_empty() {
        if archived_uuids.is_empty() {
            bail!("no dataflow with name `{name}`");
        } else if let [uuid] = archived_uuids.as_slice() {
            Ok(*uuid)
        } else {
            // TODO: Index the archived dataflows in order to return logs based on the index.
            bail!(
                "multiple archived dataflows found with name `{name}`; please provide a UUID instead"
            );
        }
    } else if let [uuid] = uuids.as_slice() {
        Ok(*uuid)
    } else {
        bail!("multiple dataflows found with name `{name}`");
    }
}

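/// Registry of all currently connected daemons, keyed by [`DaemonId`].
///
/// Backed by a [`DashMap`], so it can be shared across tasks and accessed
/// concurrently; callers should avoid holding the returned guards across
/// `await` points (see the heartbeat and destroy handling below).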
#[derive(Default)]
pub(crate) struct DaemonConnections {
    daemons: DashMap<DaemonId, DaemonConnection>,
}

impl DaemonConnections {
    fn add(&self, daemon_id: DaemonId, connection: DaemonConnection) {
        let previous = self.daemons.insert(daemon_id.clone(), connection);
        if previous.is_some() {
            tracing::info!("closing previous connection `{daemon_id}` on new registration");
        }
    }

    fn get(&self, id: &DaemonId) -> Option<Ref<'_, DaemonId, DaemonConnection>> {
        self.daemons.get(id)
    }

    pub(crate) fn get_mut(&self, id: &DaemonId) -> Option<RefMut<'_, DaemonId, DaemonConnection>> {
        self.daemons.get_mut(id)
    }

    fn get_matching_daemon_id(&self, machine_id: &str) -> Option<DaemonId> {
        self.daemons
            .iter()
            .find(|r| r.key().matches_machine_id(machine_id))
            .map(|r| r.key().clone())
    }

    fn clear(&self) {
        self.daemons.clear();
    }

    fn is_empty(&self) -> bool {
        self.daemons.is_empty()
    }

    fn keys(&self) -> impl Iterator<Item = DaemonId> {
        self.daemons.iter().map(|r| r.key().clone())
    }

    fn iter(
        &self,
    ) -> impl Iterator<Item = dashmap::mapref::multiple::RefMulti<'_, DaemonId, DaemonConnection>>
    {
        self.daemons.iter()
    }

    pub(crate) fn remove(&self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
        self.daemons
            .remove(daemon_id)
            .map(|(_, connection)| connection)
    }

    fn unnamed(&self) -> impl Iterator<Item = DaemonId> {
        self.daemons
            .iter()
            .filter(|r| r.key().machine_id().is_none())
            .map(|r| r.key().clone())
    }
}

async fn start_inner(
    events: impl Stream<Item = Event> + Unpin,
    tasks: &FuturesUnordered<JoinHandle<()>>,
    daemon_events: tokio::sync::mpsc::Receiver<Event>,
    coordinator_state: Arc<state::CoordinatorState>,
) -> eyre::Result<()> {
    let clock = coordinator_state.clock.clone();

    let daemon_events = ReceiverStream::new(daemon_events);

    let mut events = (events, daemon_events).merge();

    while let Some(event) = events.next().await {
        // used below for measuring the event handling duration
        let start = Instant::now();
        let event_kind = event.kind();

        if event.log() {
            tracing::trace!("Handling event {event:?}");
        }
        match event {
            Event::Close => {
                tracing::info!("Received Close event, shutting down coordinator");
                break;
            }
            Event::NewDaemonConnection(connection) => {
                connection.set_nodelay(true)?;
                let events_tx = coordinator_state.daemon_events_tx.clone();
                if !events_tx.is_closed() {
                    let task = tokio::spawn(listener::handle_connection(
                        connection,
                        events_tx,
                        clock.clone(),
                    ));
                    tasks.push(task);
                } else {
                    tracing::warn!(
                        "ignoring new daemon connection because events_tx was closed already"
                    );
                }
            }
            Event::DaemonConnectError(err) => {
                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
            }
            Event::Daemon(event) => match event {
                DaemonRequest::Register {
                    machine_id,
                    machine_uid,
                    mut connection,
                    version_check_result,
                } => {
                    let existing = match &machine_id {
                        Some(id) => coordinator_state
                            .daemon_connections
                            .get_matching_daemon_id(id),
                        None => coordinator_state.daemon_connections.unnamed().next(),
                    };
                    let existing_result = if existing.is_some() {
                        Err(format!(
                            "There is already a connected daemon with machine ID `{machine_id:?}`"
                        ))
                    } else {
                        Ok(())
                    };

                    // assign a unique ID to the daemon
                    let daemon_id = DaemonId::new(machine_id);

                    let reply: Timestamped<RegisterResult> = Timestamped {
                        inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
                            Ok(_) => RegisterResult::Ok {
                                daemon_id: daemon_id.clone(),
                            },
                            Err(err) => RegisterResult::Err(err.clone()),
                        },
                        timestamp: clock.new_timestamp(),
                    };

                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
                        .await
                        .context("tcp send failed");
                    match version_check_result
                        .map_err(|e| eyre!(e))
                        .and(existing_result.map_err(|e| eyre!(e)))
                        .and(send_result)
                    {
                        Ok(()) => {
                            // Set up tarpc client on the registered stream.
                            // The daemon runs a tarpc server on its end.
                            let peer_addr = connection.peer_addr().ok();
                            let codec = tokio_serde::formats::Json::<
                                Response<DaemonControlResponse>,
                                ClientMessage<DaemonControlRequest>,
                            >::default();
                            let transport =
                                tarpc::serde_transport::Transport::from((connection, codec));
                            let daemon_client =
                                DaemonControlClient::new(client::Config::default(), transport)
                                    .spawn();

                            coordinator_state.daemon_connections.add(
                                daemon_id.clone(),
                                DaemonConnection {
                                    client: daemon_client,
                                    last_heartbeat: Instant::now(),
                                    peer_addr,
                                    machine_uid,
                                },
                            );
                        }
                        Err(err) => {
                            tracing::warn!(
                                "failed to register daemon connection for daemon `{daemon_id}`: {err}"
                            );
                        }
                    }
                }
                DaemonRequest::RegisterNotificationChannel {
                    daemon_id,
                    connection,
                } => {
                    // Set up a tarpc server for daemon→coordinator RPC on this
                    // second TCP connection.
                    use dora_message::daemon_to_coordinator::{
                        CoordinatorNotify, CoordinatorNotifyRequest, CoordinatorNotifyResponse,
                    };
                    use tarpc::server::{BaseChannel, Channel};

                    let codec = tokio_serde::formats::Json::<
                        ClientMessage<CoordinatorNotifyRequest>,
                        Response<CoordinatorNotifyResponse>,
                    >::default();
                    let transport = tarpc::serde_transport::Transport::from((connection, codec));

                    let server = listener::CoordinatorNotifyServer {
                        daemon_id: daemon_id.clone(),
                        coordinator_state: coordinator_state.clone(),
                    };

                    let channel = BaseChannel::with_defaults(transport);
                    tokio::spawn(channel.execute(server.serve()).for_each(|fut| async {
                        tokio::spawn(fut);
                    }));

                    tracing::info!(
                        "reverse-channel RPC server established for daemon `{daemon_id}`"
                    );
                }
            },
            Event::Control(event) => match event {
                ControlEvent::Error(err) => tracing::error!("{err:?}"),
                ControlEvent::LogSubscribe {
                    dataflow_id,
                    level,
                    connection,
                } => {
                    if let Some(mut dataflow) =
                        coordinator_state.running_dataflows.get_mut(&dataflow_id)
                    {
                        dataflow
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                        let buffered = std::mem::take(&mut dataflow.buffered_log_messages);
                        for message in buffered {
                            send_log_message(&mut dataflow.log_subscribers, &message).await;
                        }
                    }
                }
                ControlEvent::BuildLogSubscribe {
                    build_id,
                    level,
                    connection,
                } => {
                    if let Some(mut build) = coordinator_state.running_builds.get_mut(&build_id) {
                        build
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                        let buffered = std::mem::take(&mut build.buffered_log_messages);
                        for message in buffered {
                            send_log_message(&mut build.log_subscribers, &message).await;
                        }
                    }
                }
            },
            Event::DaemonHeartbeatInterval => {
                // Collect daemon IDs, elapsed times, and clients while briefly
                // holding the DashMap lock.  Drop the lock before doing any
                // async I/O.
                let daemons_to_check: Vec<(DaemonId, Duration, DaemonControlClient)> =
                    coordinator_state
                        .daemon_connections
                        .iter()
                        .map(|r| {
                            (
                                r.key().clone(),
                                r.value().last_heartbeat.elapsed(),
                                r.value().client.clone(),
                            )
                        })
                        .collect();
                // DashMap lock is now dropped.

                let mut disconnected = BTreeSet::new();
                for (machine_id, elapsed, client) in daemons_to_check {
                    if elapsed > Duration::from_secs(15) {
                        tracing::warn!(
                            "no heartbeat message from machine `{machine_id}` since {elapsed:?}",
                        )
                    }
                    if elapsed > Duration::from_secs(30) {
                        disconnected.insert(machine_id);
                        continue;
                    }
                    // Send a heartbeat ping to the daemon so it knows the
                    // coordinator is still alive.  This is fire-and-forget:
                    // the daemon independently sends its own heartbeat to the
                    // coordinator (updating `last_heartbeat`), so a failed
                    // ping here does not mean the daemon is gone.
                    tokio::spawn(async move {
                        if let Err(err) = client.heartbeat(tarpc::context::current()).await {
                            tracing::warn!(
                                "failed to send heartbeat to daemon `{machine_id}`: {err}"
                            );
                        }
                    });
                }
                if !disconnected.is_empty() {
                    tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
                    for machine_id in disconnected {
                        coordinator_state.daemon_connections.remove(&machine_id);
                    }
                }
            }
            Event::CtrlC => {
                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
                handle_destroy(&coordinator_state).await?;
            }
            Event::Log(message) => {
                if let Some(dataflow_id) = &message.dataflow_id {
                    if let Some(mut dataflow) =
                        coordinator_state.running_dataflows.get_mut(dataflow_id)
                    {
                        if dataflow.log_subscribers.is_empty() {
                            // buffer log message until there are subscribers
                            dataflow.buffered_log_messages.push(message);
                        } else {
                            send_log_message(&mut dataflow.log_subscribers, &message).await;
                        }
                    }
                } else if let Some(build_id) = &message.build_id {
                    if let Some(mut build) = coordinator_state.running_builds.get_mut(build_id) {
                        if build.log_subscribers.is_empty() {
                            // buffer log message until there are subscribers
                            build.buffered_log_messages.push(message);
                        } else {
                            send_log_message(&mut build.log_subscribers, &message).await;
                        }
                    }
                }
            }
        }

        // warn if event handling took too long; the main loop should never be blocked for long
        let elapsed = start.elapsed();
        if elapsed > Duration::from_millis(100) {
            tracing::warn!(
                "Coordinator took {}ms for handling event: {event_kind}",
                elapsed.as_millis()
            );
        }
    }

    tracing::info!("stopped");

    Ok(())
}

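/// Forward a log message to all registered subscribers.
///
/// Each send is given a 100 ms budget; subscribers that time out are closed
/// and removed so that a slow or disconnected subscriber cannot stall the
/// coordinator's event loop.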
pub(crate) async fn send_log_message(
    log_subscribers: &mut Vec<LogSubscriber>,
    message: &LogMessage,
) {
    for subscriber in log_subscribers.iter_mut() {
        let send_result =
            tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));

        if send_result.await.is_err() {
            subscriber.close();
        }
    }
    log_subscribers.retain(|s| !s.is_closed());
}

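/// Merge the per-daemon results of a dataflow into a single [`DataflowResult`],
/// updating the coordinator clock with each daemon's reported timestamp.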
pub(crate) fn dataflow_result(
    results: &BTreeMap<DaemonId, DataflowDaemonResult>,
    dataflow_uuid: Uuid,
    clock: &uhlc::HLC,
) -> DataflowResult {
    let mut node_results = BTreeMap::new();
    for result in results.values() {
        node_results.extend(result.node_results.clone());
        if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
            tracing::warn!("failed to update HLC: {err}");
        }
    }

    DataflowResult {
        uuid: dataflow_uuid,
        timestamp: clock.new_timestamp(),
        node_results,
    }
}

pub(crate) struct DaemonConnection {
    client: DaemonControlClient,
    pub(crate) last_heartbeat: Instant,
    peer_addr: Option<SocketAddr>,
    /// System-level machine identifier reported by the daemon at registration.
    machine_uid: Option<String>,
}

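/// Tear down the coordinator: abort the merged event streams, stop every
/// running dataflow, destroy all connected daemons, and finally send
/// [`Event::Close`] so the main loop exits.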
async fn handle_destroy(
    coordinator_state: &state::CoordinatorState,
) -> Result<(), eyre::ErrReport> {
    coordinator_state.abort_handle.abort();
    for dataflow_uuid in coordinator_state
        .running_dataflows
        .iter()
        .map(|entry| *entry.key())
        .collect::<Vec<_>>()
    {
        let _ = stop_dataflow(
            &coordinator_state.running_dataflows,
            dataflow_uuid,
            &coordinator_state.daemon_connections,
            None,
            false,
        )
        .await?;
    }

    let result = destroy_daemons(&coordinator_state.daemon_connections).await;

    let _ = coordinator_state.daemon_events_tx.send(Event::Close).await;
    result
}

/// Result of a completed dataflow build.
#[derive(Debug, Clone)]
pub struct BuildFinishedResult {
    pub build_id: BuildId,
    pub result: Result<(), String>,
}

pub(crate) struct RunningBuild {
    pub(crate) errors: Vec<String>,
    pub(crate) build_result: CachedResult<BuildFinishedResult>,

    /// Buffer for log messages that were sent before there were any subscribers.
    pub(crate) buffered_log_messages: Vec<LogMessage>,
    pub(crate) log_subscribers: Vec<LogSubscriber>,

    pub(crate) pending_build_results: BTreeSet<DaemonId>,
}

pub(crate) struct RunningDataflow {
    name: Option<String>,
    pub(crate) uuid: Uuid,
    descriptor: Descriptor,
    /// The IDs of the daemons that the dataflow is running on.
    pub(crate) daemons: BTreeSet<DaemonId>,
    /// IDs of daemons that are waiting until all nodes are started.
    pub(crate) pending_daemons: BTreeSet<DaemonId>,
    pub(crate) exited_before_subscribe: Vec<NodeId>,
    nodes: BTreeMap<NodeId, ResolvedNode>,
    /// Maps each node to the daemon it's running on
    node_to_daemon: BTreeMap<NodeId, DaemonId>,
    /// Latest metrics for each node (from daemons)
    node_metrics: BTreeMap<NodeId, dora_message::daemon_to_coordinator::NodeMetrics>,

    pub(crate) spawn_result: CachedResult<Uuid>,
    pub(crate) stop_reply_senders:
        Vec<tokio::sync::oneshot::Sender<eyre::Result<StopDataflowReply>>>,

    /// Buffer for log messages that were sent before there were any subscribers.
    pub(crate) buffered_log_messages: Vec<LogMessage>,
    pub(crate) log_subscribers: Vec<LogSubscriber>,

    pub(crate) pending_spawn_results: BTreeSet<DaemonId>,
}

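/// A result that may not be available yet.
///
/// While the result is pending, interested parties register a oneshot sender
/// via `register`; once `set_result` is called, all registered senders are
/// notified and the value is cached so later registrations receive it
/// immediately.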
pub enum CachedResult<T> {
    Pending {
        result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<T>>>,
    },
    Cached {
        result: eyre::Result<T>,
    },
}

impl<T> Default for CachedResult<T> {
    fn default() -> Self {
        Self::Pending {
            result_senders: Vec::new(),
        }
    }
}

impl<T: Clone> CachedResult<T> {
    fn register(&mut self, reply_sender: tokio::sync::oneshot::Sender<eyre::Result<T>>) {
        match self {
            CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
            CachedResult::Cached { result } => {
                Self::send_result_to(result, reply_sender);
            }
        }
    }

    fn set_result(&mut self, result: eyre::Result<T>) {
        match self {
            CachedResult::Pending { result_senders } => {
                for sender in result_senders.drain(..) {
                    Self::send_result_to(&result, sender);
                }
                *self = CachedResult::Cached { result };
            }
            CachedResult::Cached { .. } => {}
        }
    }

    fn send_result_to(result: &eyre::Result<T>, sender: oneshot::Sender<eyre::Result<T>>) {
        let result = match result {
            Ok(r) => Ok(r.clone()),
            Err(err) => Err(eyre!("{err:?}")),
        };
        let _ = sender.send(result);
    }
}

pub(crate) struct ArchivedDataflow {
    name: Option<String>,
    nodes: BTreeMap<NodeId, ResolvedNode>,
}

impl From<&RunningDataflow> for ArchivedDataflow {
    fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
        ArchivedDataflow {
            name: dataflow.name.clone(),
            nodes: dataflow.nodes.clone(),
        }
    }
}

impl PartialEq for RunningDataflow {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
    }
}

impl Eq for RunningDataflow {}

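/// Send a stop request for the given dataflow to every daemon it is running
/// on, then re-acquire and return the dataflow's map entry for the caller.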
async fn stop_dataflow<'a>(
    running_dataflows: &'a DashMap<Uuid, RunningDataflow>,
    dataflow_uuid: Uuid,
    daemon_connections: &DaemonConnections,
    grace_duration: Option<Duration>,
    force: bool,
) -> eyre::Result<RefMut<'a, Uuid, RunningDataflow>> {
    // Collect daemon IDs while briefly holding the lock.
    let daemon_ids: Vec<DaemonId> = {
        let Some(dataflow) = running_dataflows.get(&dataflow_uuid) else {
            bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
        };
        dataflow.daemons.iter().cloned().collect()
    };
    // DashMap lock is now dropped — safe to do async I/O.

    for daemon_id in &daemon_ids {
        let client = daemon_connections
            .get(daemon_id)
            .wrap_err("no daemon connection")?
            .client
            .clone();
        // DashMap lock is dropped — safe to do async I/O.
        client
            .stop_dataflow(
                tarpc::context::current(),
                dataflow_uuid,
                grace_duration,
                force,
            )
            .await
            .context("RPC transport error")?
            .map_err(|e: String| eyre!(e))
            .wrap_err("failed to stop dataflow")?;
    }

    tracing::info!("successfully sent stop request for dataflow `{dataflow_uuid}` to all daemons");

    // Re-acquire the lock for the caller.
    running_dataflows
        .get_mut(&dataflow_uuid)
        .wrap_err("dataflow was removed while sending stop commands")
}

async fn reload_dataflow(
    running_dataflows: &DashMap<Uuid, RunningDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    operator_id: Option<OperatorId>,
    daemon_connections: &DaemonConnections,
) -> eyre::Result<()> {
    // Collect daemon IDs while briefly holding the lock.
    let daemon_ids: Vec<DaemonId> = {
        let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
            bail!("No running dataflow found with UUID `{dataflow_id}`")
        };
        dataflow.daemons.iter().cloned().collect()
    };
    // DashMap lock is now dropped — safe to do async I/O.

    for machine_id in &daemon_ids {
        let client = daemon_connections
            .get(machine_id)
            .wrap_err("no daemon connection")?
            .client
            .clone();
        client
            .reload_dataflow(
                tarpc::context::current(),
                dataflow_id,
                node_id.clone(),
                operator_id.clone(),
            )
            .await
            .context("RPC transport error")?
            .map_err(|e: String| eyre!(e))
            .wrap_err("failed to reload dataflow")?;
    }
    tracing::info!("successfully reloaded dataflow `{dataflow_id}`");

    Ok(())
}

async fn retrieve_logs(
    running_dataflows: &DashMap<Uuid, RunningDataflow>,
    archived_dataflows: &DashMap<Uuid, ArchivedDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &DaemonConnections,
    tail: Option<usize>,
) -> eyre::Result<Vec<u8>> {
    let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else {
        bail!("No dataflow found with UUID `{dataflow_id}`")
    };

    let machine_ids: Vec<Option<String>> = nodes
        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
        .collect();

    let machine_id = if let [machine_id] = &machine_ids[..] {
        machine_id
    } else if machine_ids.is_empty() {
        bail!("No machine contains {}/{}", dataflow_id, node_id)
    } else {
        bail!(
            "More than one machine contains {}/{}, but it should only be present on one",
            dataflow_id,
            node_id
        )
    };

    let daemon_ids: Vec<_> = match machine_id {
        None => daemon_connections.unnamed().collect(),
        Some(machine_id) => daemon_connections
            .get_matching_daemon_id(machine_id)
            .into_iter()
            .collect(),
    };
    let daemon_id = match &daemon_ids[..] {
        [id] => (*id).clone(),
        [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
        _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
    };
    let client = daemon_connections
        .get(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?
        .client
        .clone();
    // DashMap lock is dropped — safe to do async I/O.
    let reply_logs = client
        .logs(
            tarpc::context::current(),
            dataflow_id,
            node_id.clone(),
            tail,
        )
        .await
        .context("RPC transport error")?;
    tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");

    reply_logs.map_err(|err: String| eyre!(err))
}

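/// Trigger a dataflow build: group the nodes (and their git sources) by the
/// machine they are deployed on and send one [`BuildDataflowNodes`] command to
/// the daemon responsible for each machine. Returns a [`RunningBuild`] that
/// tracks which daemons still have to report their build results.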
#[tracing::instrument(skip(daemon_connections))]
async fn build_dataflow(
    build_request: BuildRequest,
    build_id: BuildId,
    daemon_connections: &DaemonConnections,
) -> eyre::Result<RunningBuild> {
    let BuildRequest {
        session_id,
        dataflow,
        git_sources,
        prev_git_sources,
        local_working_dir,
        uv,
    } = build_request;

    let nodes = dataflow.resolve_aliases_and_set_defaults()?;

    let mut git_sources_by_daemon = git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();
    let mut prev_git_sources_by_daemon = prev_git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();

    let nodes_by_daemon = nodes
        .values()
        .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));

    let mut daemons = BTreeSet::new();
    for (machine, nodes_on_machine) in &nodes_by_daemon {
        let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
        tracing::debug!(
            "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
        );

        let build_command = BuildDataflowNodes {
            build_id,
            session_id,
            local_working_dir: local_working_dir.clone(),
            git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
            prev_git_sources: prev_git_sources_by_daemon
                .remove(machine)
                .unwrap_or_default(),
            dataflow_descriptor: dataflow.clone(),
            nodes_on_machine,
            uv,
        };

        let daemon_id = build_dataflow_on_machine(
            daemon_connections,
            machine.map(|s| s.as_str()),
            build_command,
        )
        .await
        .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
        daemons.insert(daemon_id);
    }

    tracing::info!("successfully triggered dataflow build `{build_id}`");

    Ok(RunningBuild {
        errors: Vec::new(),
        build_result: CachedResult::default(),
        buffered_log_messages: Vec::new(),
        log_subscribers: Vec::new(),
        pending_build_results: daemons,
    })
}

async fn build_dataflow_on_machine(
    daemon_connections: &DaemonConnections,
    machine: Option<&str>,
    build_command: BuildDataflowNodes,
) -> Result<DaemonId, eyre::ErrReport> {
    let daemon_id = match machine {
        Some(machine) => daemon_connections
            .get_matching_daemon_id(machine)
            .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
            .clone(),
        None => daemon_connections
            .unnamed()
            .next()
            .wrap_err("no unnamed daemon connections")?
            .clone(),
    };

    let client = daemon_connections
        .get(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?
        .client
        .clone();
    // DashMap lock is dropped — safe to do async I/O.
    client
        .build(tarpc::context::current(), build_command)
        .await
        .context("RPC transport error")?
        .map_err(|e: String| eyre!(e))
        .wrap_err("daemon returned an error")?;
    Ok(daemon_id)
}

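/// Plan and spawn a dataflow across the connected daemons.
///
/// The [`RunningDataflow`] entry is inserted into `running_dataflows` before
/// the spawn commands are sent (see the comment below); if sending the spawn
/// commands fails, the entry is removed again and the error is returned.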
#[allow(clippy::too_many_arguments)]
async fn start_dataflow(
    build_id: Option<BuildId>,
    session_id: SessionId,
    dataflow: Descriptor,
    local_working_dir: Option<PathBuf>,
    name: Option<String>,
    daemon_connections: &DaemonConnections,
    running_dataflows: &DashMap<Uuid, RunningDataflow>,
    uv: bool,
    write_events_to: Option<PathBuf>,
) -> eyre::Result<Uuid> {
    let plan = run::plan_dataflow(
        build_id,
        session_id,
        &dataflow,
        local_working_dir,
        daemon_connections,
        uv,
        write_events_to,
    )?;

    let uuid = plan.uuid;
    let daemons = plan.daemons.clone();

    let run::DataflowPlan {
        uuid: _,
        daemons: _,
        nodes,
        node_to_daemon,
        daemon_spawn_commands,
    } = plan;

    // Insert the RunningDataflow into the map BEFORE sending spawn commands to
    // the daemons.  This avoids a race where the daemon finishes spawning
    // and sends the SpawnResult back before start_dataflow returns — if the
    // entry isn't in the map yet, the coordinator event loop would discard
    // the result and `wait_for_spawn` would time out.
    running_dataflows.insert(
        uuid,
        RunningDataflow {
            uuid,
            name,
            descriptor: dataflow,
            pending_daemons: if daemons.len() > 1 {
                daemons.clone()
            } else {
                BTreeSet::new()
            },
            exited_before_subscribe: Default::default(),
            daemons: daemons.clone(),
            nodes,
            node_to_daemon,
            node_metrics: BTreeMap::new(),
            spawn_result: CachedResult::default(),
            stop_reply_senders: Vec::new(),
            buffered_log_messages: Vec::new(),
            log_subscribers: Vec::new(),
            pending_spawn_results: daemons,
        },
    );

    // Now send the spawn commands.  If a result arrives quickly, the entry is
    // already in the map so the event loop won't discard it.
    if let Err(err) =
        run::execute_dataflow_plan(uuid, daemon_spawn_commands, daemon_connections).await
    {
        running_dataflows.remove(&uuid);
        return Err(err);
    }

    Ok(uuid)
}

async fn destroy_daemon(daemon_id: DaemonId, client: DaemonControlClient) -> Result<()> {
    client
        .destroy(tarpc::context::current())
        .await
        .wrap_err(format!(
            "failed to send destroy message to daemon `{daemon_id}`"
        ))?
        .map_err(|e: String| eyre!(e))
        .wrap_err("failed to destroy daemon")?;

    tracing::info!("successfully destroyed daemon `{daemon_id}`");
    Ok(())
}

async fn destroy_daemons(daemon_connections: &DaemonConnections) -> eyre::Result<()> {
    // Collect daemon IDs and cloned clients, then drop the DashMap lock.
    let daemons: Vec<(DaemonId, DaemonControlClient)> = daemon_connections
        .iter()
        .map(|r| (r.key().clone(), r.value().client.clone()))
        .collect();
    // DashMap lock is now dropped — safe to do concurrent async I/O.

    let results = futures::future::join_all(daemons.into_iter().map(|(daemon_id, client)| {
        tracing::info!("Destroying daemon connection for `{daemon_id}`");
        destroy_daemon(daemon_id, client)
    }))
    .await;
    daemon_connections.clear();

    for result in results {
        result?;
    }
    Ok(())
}

#[derive(Debug)]
pub enum Event {
    NewDaemonConnection(TcpStream),
    DaemonConnectError(eyre::Report),
    Control(ControlEvent),
    Daemon(DaemonRequest),
    DaemonHeartbeatInterval,
    CtrlC,
    Log(LogMessage),
    Close,
}

impl Event {
    /// Whether this event should be logged.
    #[allow(clippy::match_like_matches_macro)]
    pub fn log(&self) -> bool {
        match self {
            Event::DaemonHeartbeatInterval => false,
            _ => true,
        }
    }

    fn kind(&self) -> &'static str {
        match self {
            Event::NewDaemonConnection(_) => "NewDaemonConnection",
            Event::DaemonConnectError(_) => "DaemonConnectError",
            Event::Control(_) => "Control",
            Event::Daemon(_) => "Daemon",
            Event::DaemonHeartbeatInterval => "DaemonHeartbeatInterval",
            Event::CtrlC => "CtrlC",
            Event::Log(_) => "Log",
            Event::Close => "Close",
        }
    }
}

#[derive(Debug)]
pub enum DaemonRequest {
    Register {
        machine_id: Option<String>,
        machine_uid: Option<String>,
        connection: TcpStream,
        version_check_result: Result<(), String>,
    },
    RegisterNotificationChannel {
        daemon_id: DaemonId,
        connection: TcpStream,
    },
}

fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

    let mut ctrlc_sent = false;
    ctrlc::set_handler(move || {
        if ctrlc_sent {
            tracing::warn!("received second ctrlc signal -> aborting immediately");
            std::process::abort();
        } else {
            tracing::info!("received ctrlc signal");
            if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
                tracing::error!("failed to report ctrl-c event to dora-coordinator");
            }

            ctrlc_sent = true;
        }
    })
    .wrap_err("failed to set ctrl-c handler")?;

    Ok(ReceiverStream::new(ctrlc_rx))
}