// dora_coordinator/lib.rs

1use crate::{
2    run::spawn_dataflow,
3    tcp_utils::{tcp_receive, tcp_send},
4};
5pub use control::ControlEvent;
6use dora_core::{
7    config::{NodeId, OperatorId},
8    descriptor::DescriptorExt,
9    uhlc::{self, HLC},
10};
11use dora_message::{
12    BuildId, DataflowId, SessionId,
13    cli_to_coordinator::ControlRequest,
14    common::{DaemonId, GitSource},
15    coordinator_to_cli::{
16        ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
17        DataflowStatus, LogLevel, LogMessage,
18    },
19    coordinator_to_daemon::{
20        BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped,
21    },
22    daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
23    descriptor::{Descriptor, ResolvedNode},
24};
25use eyre::{ContextCompat, Result, WrapErr, bail, eyre};
26use futures::{Future, Stream, StreamExt, future::join_all, stream::FuturesUnordered};
27use futures_concurrency::stream::Merge;
28use itertools::Itertools;
29use log_subscriber::LogSubscriber;
30use petname::petname;
31use run::SpawnedDataflow;
32use std::{
33    collections::{BTreeMap, BTreeSet, HashMap},
34    net::SocketAddr,
35    path::PathBuf,
36    sync::Arc,
37    time::{Duration, Instant},
38};
39use tokio::{
40    net::TcpStream,
41    sync::{mpsc, oneshot},
42    task::JoinHandle,
43};
44use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
45use uuid::Uuid;
46
47mod control;
48mod listener;
49mod log_subscriber;
50mod run;
51mod tcp_utils;
52
53pub async fn start(
54    bind: SocketAddr,
55    bind_control: SocketAddr,
56    external_events: impl Stream<Item = Event> + Unpin,
57) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
58    let listener = listener::create_listener(bind).await?;
59    let port = listener
60        .local_addr()
61        .wrap_err("failed to get local addr of listener")?
62        .port();
63    let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
64        c.map(Event::NewDaemonConnection)
65            .wrap_err("failed to open connection")
66            .unwrap_or_else(Event::DaemonConnectError)
67    });
68
69    let mut tasks = FuturesUnordered::new();
70    let control_events = control::control_events(bind_control, &tasks)
71        .await
72        .wrap_err("failed to create control events")?;
73
74    // Setup ctrl-c handler
75    let ctrlc_events = set_up_ctrlc_handler()?;
76
77    let events = (
78        external_events,
79        new_daemon_connections,
80        control_events,
81        ctrlc_events,
82    )
83        .merge();
84
85    let future = async move {
86        start_inner(events, &tasks).await?;
87
88        tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
89        while let Some(join_result) = tasks.next().await {
90            if let Err(err) = join_result {
91                tracing::error!("task panicked: {err}");
92            }
93        }
94        tracing::debug!("all spawned tasks finished, exiting..");
95        Ok(())
96    };
97    Ok((port, future))
98}
99
100// Resolve the dataflow name.
101fn resolve_name(
102    name: String,
103    running_dataflows: &HashMap<Uuid, RunningDataflow>,
104    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
105) -> eyre::Result<Uuid> {
106    let uuids: Vec<_> = running_dataflows
107        .iter()
108        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
109        .map(|(k, _)| k)
110        .copied()
111        .collect();
112    let archived_uuids: Vec<_> = archived_dataflows
113        .iter()
114        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
115        .map(|(k, _)| k)
116        .copied()
117        .collect();
118
119    if uuids.is_empty() {
120        if archived_uuids.is_empty() {
121            bail!("no dataflow with name `{name}`");
122        } else if let [uuid] = archived_uuids.as_slice() {
123            Ok(*uuid)
124        } else {
125            // TODO: Index the archived dataflows in order to return logs based on the index.
126            bail!(
127                "multiple archived dataflows found with name `{name}`, Please provide the UUID instead."
128            );
129        }
130    } else if let [uuid] = uuids.as_slice() {
131        Ok(*uuid)
132    } else {
133        bail!("multiple dataflows found with name `{name}`");
134    }
135}
136
137#[derive(Default)]
138struct DaemonConnections {
139    daemons: BTreeMap<DaemonId, DaemonConnection>,
140}
141
142impl DaemonConnections {
143    fn add(&mut self, daemon_id: DaemonId, connection: DaemonConnection) {
144        let previous = self.daemons.insert(daemon_id.clone(), connection);
145        if previous.is_some() {
146            tracing::info!("closing previous connection `{daemon_id}` on new register");
147        }
148    }
149
150    fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> {
151        self.daemons.get(id)
152    }
153
154    fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
155        self.daemons.get_mut(id)
156    }
157
158    fn get_matching_daemon_id(&self, machine_id: &str) -> Option<&DaemonId> {
159        self.daemons
160            .keys()
161            .find(|id| id.matches_machine_id(machine_id))
162    }
163
164    fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
165        std::mem::take(&mut self.daemons).into_iter()
166    }
167
168    fn is_empty(&self) -> bool {
169        self.daemons.is_empty()
170    }
171
172    fn keys(&self) -> impl Iterator<Item = &DaemonId> {
173        self.daemons.keys()
174    }
175
176    fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
177        self.daemons.iter_mut()
178    }
179
180    fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
181        self.daemons.remove(daemon_id)
182    }
183
184    fn unnamed(&self) -> impl Iterator<Item = &DaemonId> {
185        self.daemons.keys().filter(|id| id.machine_id().is_none())
186    }
187}
188
189async fn start_inner(
190    events: impl Stream<Item = Event> + Unpin,
191    tasks: &FuturesUnordered<JoinHandle<()>>,
192) -> eyre::Result<()> {
193    let clock = Arc::new(HLC::default());
194
195    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2);
196    let mut daemon_events_tx = Some(daemon_events_tx);
197    let daemon_events = ReceiverStream::new(daemon_events);
198
199    let daemon_heartbeat_interval =
200        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
201            .map(|_| Event::DaemonHeartbeatInterval);
202
203    // events that should be aborted on `dora destroy`
204    let (abortable_events, abort_handle) =
205        futures::stream::abortable((events, daemon_heartbeat_interval).merge());
206
207    let mut events = (abortable_events, daemon_events).merge();
208
209    let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
210    let mut finished_builds: HashMap<BuildId, CachedResult> = HashMap::new();
211
212    let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
213    let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
214        HashMap::new();
215    let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
216    let mut daemon_connections = DaemonConnections::default();
217
218    while let Some(event) = events.next().await {
219        // used below for measuring the event handling duration
220        let start = Instant::now();
221        let event_kind = event.kind();
222
223        if event.log() {
224            tracing::trace!("Handling event {event:?}");
225        }
226        match event {
227            Event::NewDaemonConnection(connection) => {
228                connection.set_nodelay(true)?;
229                let events_tx = daemon_events_tx.clone();
230                if let Some(events_tx) = events_tx {
231                    let task = tokio::spawn(listener::handle_connection(
232                        connection,
233                        events_tx,
234                        clock.clone(),
235                    ));
236                    tasks.push(task);
237                } else {
238                    tracing::warn!(
239                        "ignoring new daemon connection because events_tx was closed already"
240                    );
241                }
242            }
243            Event::DaemonConnectError(err) => {
244                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
245            }
246            Event::Daemon(event) => match event {
247                DaemonRequest::Register {
248                    machine_id,
249                    mut connection,
250                    version_check_result,
251                } => {
252                    let existing = match &machine_id {
253                        Some(id) => daemon_connections.get_matching_daemon_id(id),
254                        None => daemon_connections.unnamed().next(),
255                    };
256                    let existing_result = if existing.is_some() {
257                        Err(format!(
258                            "There is already a connected daemon with machine ID `{machine_id:?}`"
259                        ))
260                    } else {
261                        Ok(())
262                    };
263
264                    // assign a unique ID to the daemon
265                    let daemon_id = DaemonId::new(machine_id);
266
267                    let reply: Timestamped<RegisterResult> = Timestamped {
268                        inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
269                            Ok(_) => RegisterResult::Ok {
270                                daemon_id: daemon_id.clone(),
271                            },
272                            Err(err) => RegisterResult::Err(err.clone()),
273                        },
274                        timestamp: clock.new_timestamp(),
275                    };
276
277                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
278                        .await
279                        .context("tcp send failed");
280                    match version_check_result.map_err(|e| eyre!(e)).and(send_result) {
281                        Ok(()) => {
282                            daemon_connections.add(
283                                daemon_id.clone(),
284                                DaemonConnection {
285                                    stream: connection,
286                                    last_heartbeat: Instant::now(),
287                                },
288                            );
289                        }
290                        Err(err) => {
291                            tracing::warn!(
292                                "failed to register daemon connection for daemon `{daemon_id}`: {err}"
293                            );
294                        }
295                    }
296                }
297            },
298            Event::Dataflow { uuid, event } => match event {
299                DataflowEvent::ReadyOnDaemon {
300                    daemon_id,
301                    exited_before_subscribe,
302                } => {
303                    match running_dataflows.entry(uuid) {
304                        std::collections::hash_map::Entry::Occupied(mut entry) => {
305                            let dataflow = entry.get_mut();
306                            dataflow.pending_daemons.remove(&daemon_id);
307                            dataflow
308                                .exited_before_subscribe
309                                .extend(exited_before_subscribe);
310                            if dataflow.pending_daemons.is_empty() {
311                                tracing::debug!("sending all nodes ready message to daemons");
312                                let message = serde_json::to_vec(&Timestamped {
313                                    inner: DaemonCoordinatorEvent::AllNodesReady {
314                                        dataflow_id: uuid,
315                                        exited_before_subscribe: dataflow
316                                            .exited_before_subscribe
317                                            .clone(),
318                                    },
319                                    timestamp: clock.new_timestamp(),
320                                })
321                                .wrap_err("failed to serialize AllNodesReady message")?;
322
323                                // notify all machines that run parts of the dataflow
324                                for daemon_id in &dataflow.daemons {
325                                    let Some(connection) = daemon_connections.get_mut(daemon_id)
326                                    else {
327                                        tracing::warn!(
328                                            "no daemon connection found for machine `{daemon_id}`"
329                                        );
330                                        continue;
331                                    };
332                                    tcp_send(&mut connection.stream, &message)
333                                        .await
334                                        .wrap_err_with(|| {
335                                            format!(
336                                                "failed to send AllNodesReady({uuid}) message \
337                                            to machine {daemon_id}"
338                                            )
339                                        })?;
340                                }
341                            }
342                        }
343                        std::collections::hash_map::Entry::Vacant(_) => {
344                            tracing::warn!("dataflow not running on ReadyOnMachine");
345                        }
346                    }
347                }
348                DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => {
349                    tracing::debug!(
350                        "coordinator received DataflowFinishedOnDaemon ({daemon_id:?}, result: {result:?})"
351                    );
352                    match running_dataflows.entry(uuid) {
353                        std::collections::hash_map::Entry::Occupied(mut entry) => {
354                            let dataflow = entry.get_mut();
355                            dataflow.daemons.remove(&daemon_id);
356                            tracing::info!(
357                                "removed machine id: {daemon_id} from dataflow: {:#?}",
358                                dataflow.uuid
359                            );
360                            dataflow_results
361                                .entry(uuid)
362                                .or_default()
363                                .insert(daemon_id, result);
364
365                            if dataflow.daemons.is_empty() {
366                                // Archive finished dataflow
367                                archived_dataflows
368                                    .entry(uuid)
369                                    .or_insert_with(|| ArchivedDataflow::from(entry.get()));
370                                let mut finished_dataflow = entry.remove();
371                                let dataflow_id = finished_dataflow.uuid;
372                                send_log_message(
373                                    &mut finished_dataflow.log_subscribers,
374                                    &LogMessage {
375                                        build_id: None,
376                                        dataflow_id: Some(dataflow_id),
377                                        node_id: None,
378                                        daemon_id: None,
379                                        level: LogLevel::Info.into(),
380                                        target: Some("coordinator".into()),
381                                        module_path: None,
382                                        file: None,
383                                        line: None,
384                                        message: "dataflow finished".into(),
385                                        timestamp: clock
386                                            .new_timestamp()
387                                            .get_time()
388                                            .to_system_time()
389                                            .into(),
390                                        fields: None,
391                                    },
392                                )
393                                .await;
394
395                                let reply = ControlRequestReply::DataflowStopped {
396                                    uuid,
397                                    result: dataflow_results
398                                        .get(&uuid)
399                                        .map(|r| dataflow_result(r, uuid, &clock))
400                                        .unwrap_or_else(|| {
401                                            DataflowResult::ok_empty(uuid, clock.new_timestamp())
402                                        }),
403                                };
404                                for sender in finished_dataflow.stop_reply_senders {
405                                    let _ = sender.send(Ok(reply.clone()));
406                                }
407                                if !matches!(
408                                    finished_dataflow.spawn_result,
409                                    CachedResult::Cached { .. }
410                                ) {
411                                    log::error!("pending spawn result on dataflow finish");
412                                }
413                            }
414                        }
415                        std::collections::hash_map::Entry::Vacant(_) => {
416                            tracing::warn!("dataflow not running on DataflowFinishedOnDaemon");
417                        }
418                    }
419                }
420            },
421
422            Event::Control(event) => match event {
423                ControlEvent::IncomingRequest {
424                    request,
425                    reply_sender,
426                } => {
427                    match request {
428                        ControlRequest::Build {
429                            session_id,
430                            dataflow,
431                            git_sources,
432                            prev_git_sources,
433                            local_working_dir,
434                            uv,
435                        } => {
436                            // assign a random build id
437                            let build_id = BuildId::generate();
438
439                            let result = build_dataflow(
440                                build_id,
441                                session_id,
442                                dataflow,
443                                git_sources,
444                                prev_git_sources,
445                                local_working_dir,
446                                &clock,
447                                uv,
448                                &mut daemon_connections,
449                            )
450                            .await;
451                            match result {
452                                Ok(build) => {
453                                    running_builds.insert(build_id, build);
454                                    let _ = reply_sender.send(Ok(
455                                        ControlRequestReply::DataflowBuildTriggered { build_id },
456                                    ));
457                                }
458                                Err(err) => {
459                                    let _ = reply_sender.send(Err(err));
460                                }
461                            }
462                        }
463                        ControlRequest::WaitForBuild { build_id } => {
464                            if let Some(build) = running_builds.get_mut(&build_id) {
465                                build.build_result.register(reply_sender);
466                            } else if let Some(result) = finished_builds.get_mut(&build_id) {
467                                result.register(reply_sender);
468                            } else {
469                                let _ =
470                                    reply_sender.send(Err(eyre!("unknown build id {build_id}")));
471                            }
472                        }
473                        ControlRequest::Start {
474                            build_id,
475                            session_id,
476                            dataflow,
477                            name,
478                            local_working_dir,
479                            uv,
480                            write_events_to,
481                        } => {
482                            let name = name.or_else(|| petname(2, "-"));
483
484                            let inner = async {
485                                if let Some(name) = name.as_deref() {
486                                    // check that name is unique
487                                    if running_dataflows
488                                        .values()
489                                        .any(|d: &RunningDataflow| d.name.as_deref() == Some(name))
490                                    {
491                                        bail!(
492                                            "there is already a running dataflow with name `{name}`"
493                                        );
494                                    }
495                                }
496                                let dataflow = start_dataflow(
497                                    build_id,
498                                    session_id,
499                                    dataflow,
500                                    local_working_dir,
501                                    name,
502                                    &mut daemon_connections,
503                                    &clock,
504                                    uv,
505                                    write_events_to,
506                                )
507                                .await?;
508                                Ok(dataflow)
509                            };
510                            match inner.await {
511                                Ok(dataflow) => {
512                                    let uuid = dataflow.uuid;
513                                    running_dataflows.insert(uuid, dataflow);
514                                    let _ = reply_sender.send(Ok(
515                                        ControlRequestReply::DataflowStartTriggered { uuid },
516                                    ));
517                                }
518                                Err(err) => {
519                                    let _ = reply_sender.send(Err(err));
520                                }
521                            }
522                        }
523                        ControlRequest::WaitForSpawn { dataflow_id } => {
524                            if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
525                                dataflow.spawn_result.register(reply_sender);
526                            } else {
527                                let _ =
528                                    reply_sender.send(Err(eyre!("unknown dataflow {dataflow_id}")));
529                            }
530                        }
531                        ControlRequest::Check { dataflow_uuid } => {
532                            let status = match &running_dataflows.get(&dataflow_uuid) {
533                                Some(_) => ControlRequestReply::DataflowSpawned {
534                                    uuid: dataflow_uuid,
535                                },
536                                None => ControlRequestReply::DataflowStopped {
537                                    uuid: dataflow_uuid,
538                                    result: dataflow_results
539                                        .get(&dataflow_uuid)
540                                        .map(|r| dataflow_result(r, dataflow_uuid, &clock))
541                                        .unwrap_or_else(|| {
542                                            DataflowResult::ok_empty(
543                                                dataflow_uuid,
544                                                clock.new_timestamp(),
545                                            )
546                                        }),
547                                },
548                            };
549                            let _ = reply_sender.send(Ok(status));
550                        }
551                        ControlRequest::Reload {
552                            dataflow_id,
553                            node_id,
554                            operator_id,
555                        } => {
556                            let reload = async {
557                                reload_dataflow(
558                                    &running_dataflows,
559                                    dataflow_id,
560                                    node_id,
561                                    operator_id,
562                                    &mut daemon_connections,
563                                    clock.new_timestamp(),
564                                )
565                                .await?;
566                                Result::<_, eyre::Report>::Ok(())
567                            };
568                            let reply =
569                                reload
570                                    .await
571                                    .map(|()| ControlRequestReply::DataflowReloaded {
572                                        uuid: dataflow_id,
573                                    });
574                            let _ = reply_sender.send(reply);
575                        }
576                        ControlRequest::Stop {
577                            dataflow_uuid,
578                            grace_duration,
579                            force,
580                        } => {
581                            if let Some(result) = dataflow_results.get(&dataflow_uuid) {
582                                let reply = ControlRequestReply::DataflowStopped {
583                                    uuid: dataflow_uuid,
584                                    result: dataflow_result(result, dataflow_uuid, &clock),
585                                };
586                                let _ = reply_sender.send(Ok(reply));
587
588                                continue;
589                            }
590
591                            let dataflow = stop_dataflow(
592                                &mut running_dataflows,
593                                dataflow_uuid,
594                                &mut daemon_connections,
595                                clock.new_timestamp(),
596                                grace_duration,
597                                force,
598                            )
599                            .await;
600
601                            match dataflow {
602                                Ok(dataflow) => {
603                                    dataflow.stop_reply_senders.push(reply_sender);
604                                }
605                                Err(err) => {
606                                    let _ = reply_sender.send(Err(err));
607                                }
608                            }
609                        }
610                        ControlRequest::StopByName {
611                            name,
612                            grace_duration,
613                            force,
614                        } => match resolve_name(name, &running_dataflows, &archived_dataflows) {
615                            Ok(dataflow_uuid) => {
616                                if let Some(result) = dataflow_results.get(&dataflow_uuid) {
617                                    let reply = ControlRequestReply::DataflowStopped {
618                                        uuid: dataflow_uuid,
619                                        result: dataflow_result(result, dataflow_uuid, &clock),
620                                    };
621                                    let _ = reply_sender.send(Ok(reply));
622
623                                    continue;
624                                }
625
626                                let dataflow = stop_dataflow(
627                                    &mut running_dataflows,
628                                    dataflow_uuid,
629                                    &mut daemon_connections,
630                                    clock.new_timestamp(),
631                                    grace_duration,
632                                    force,
633                                )
634                                .await;
635
636                                match dataflow {
637                                    Ok(dataflow) => {
638                                        dataflow.stop_reply_senders.push(reply_sender);
639                                    }
640                                    Err(err) => {
641                                        let _ = reply_sender.send(Err(err));
642                                    }
643                                }
644                            }
645                            Err(err) => {
646                                let _ = reply_sender.send(Err(err));
647                            }
648                        },
649                        ControlRequest::Logs {
650                            uuid,
651                            name,
652                            node,
653                            tail,
654                        } => {
655                            let dataflow_uuid = if let Some(uuid) = uuid {
656                                Ok(uuid)
657                            } else if let Some(name) = name {
658                                resolve_name(name, &running_dataflows, &archived_dataflows)
659                            } else {
660                                Err(eyre!("No uuid"))
661                            };
662
663                            match dataflow_uuid {
664                                Ok(uuid) => {
665                                    let reply = retrieve_logs(
666                                        &running_dataflows,
667                                        &archived_dataflows,
668                                        uuid,
669                                        node.into(),
670                                        &mut daemon_connections,
671                                        clock.new_timestamp(),
672                                        tail,
673                                    )
674                                    .await
675                                    .map(ControlRequestReply::Logs);
676                                    let _ = reply_sender.send(reply);
677                                }
678                                Err(err) => {
679                                    let _ = reply_sender.send(Err(err));
680                                }
681                            }
682                        }
683                        ControlRequest::Info { dataflow_uuid } => {
684                            if let Some(dataflow) = running_dataflows.get(&dataflow_uuid) {
685                                let _ = reply_sender.send(Ok(ControlRequestReply::DataflowInfo {
686                                    uuid: dataflow.uuid,
687                                    name: dataflow.name.clone(),
688                                    descriptor: dataflow.descriptor.clone(),
689                                }));
690                            } else {
691                                let _ = reply_sender.send(Err(eyre!(
692                                    "No running dataflow with uuid `{dataflow_uuid}`"
693                                )));
694                            }
695                        }
696                        ControlRequest::Destroy => {
697                            tracing::info!("Received destroy command");
698
699                            let reply = handle_destroy(
700                                &mut running_dataflows,
701                                &mut daemon_connections,
702                                &abort_handle,
703                                &mut daemon_events_tx,
704                                &clock,
705                            )
706                            .await
707                            .map(|()| ControlRequestReply::DestroyOk);
708                            let _ = reply_sender.send(reply);
709                        }
710                        ControlRequest::List => {
711                            let mut dataflows: Vec<_> = running_dataflows.values().collect();
712                            dataflows.sort_by_key(|d| (&d.name, d.uuid));
713
714                            let running = dataflows.into_iter().map(|d| DataflowListEntry {
715                                id: DataflowIdAndName {
716                                    uuid: d.uuid,
717                                    name: d.name.clone(),
718                                },
719                                status: DataflowStatus::Running,
720                            });
721                            let finished_failed =
722                                dataflow_results.iter().map(|(&uuid, results)| {
723                                    let name =
724                                        archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
725                                    let id = DataflowIdAndName { uuid, name };
726                                    let status = if results.values().all(|r| r.is_ok()) {
727                                        DataflowStatus::Finished
728                                    } else {
729                                        DataflowStatus::Failed
730                                    };
731                                    DataflowListEntry { id, status }
732                                });
733
734                            let reply = Ok(ControlRequestReply::DataflowList(DataflowList(
735                                running.chain(finished_failed).collect(),
736                            )));
737                            let _ = reply_sender.send(reply);
738                        }
739                        ControlRequest::DaemonConnected => {
740                            let running = !daemon_connections.is_empty();
741                            let _ = reply_sender
742                                .send(Ok(ControlRequestReply::DaemonConnected(running)));
743                        }
744                        ControlRequest::ConnectedMachines => {
745                            let reply = Ok(ControlRequestReply::ConnectedDaemons(
746                                daemon_connections.keys().cloned().collect(),
747                            ));
748                            let _ = reply_sender.send(reply);
749                        }
750                        ControlRequest::LogSubscribe { .. } => {
751                            let _ = reply_sender.send(Err(eyre::eyre!(
752                                "LogSubscribe request should be handled separately"
753                            )));
754                        }
755                        ControlRequest::BuildLogSubscribe { .. } => {
756                            let _ = reply_sender.send(Err(eyre::eyre!(
757                                "BuildLogSubscribe request should be handled separately"
758                            )));
759                        }
760                        ControlRequest::CliAndDefaultDaemonOnSameMachine => {
761                            let mut default_daemon_ip = None;
762                            if let Some(default_id) = daemon_connections.unnamed().next() {
763                                if let Some(connection) = daemon_connections.get(default_id) {
764                                    if let Ok(addr) = connection.stream.peer_addr() {
765                                        default_daemon_ip = Some(addr.ip());
766                                    }
767                                }
768                            }
769                            let _ = reply_sender.send(Ok(
770                                ControlRequestReply::CliAndDefaultDaemonIps {
771                                    default_daemon: default_daemon_ip,
772                                    cli: None, // filled later
773                                },
774                            ));
775                        }
776                        ControlRequest::GetNodeInfo => {
777                            use dora_message::coordinator_to_cli::{NodeInfo, NodeMetricsInfo};
778
779                            let mut node_infos = Vec::new();
780                            for dataflow in running_dataflows.values() {
781                                for (node_id, _node) in &dataflow.nodes {
782                                    // Get the specific daemon this node is running on
783                                    if let Some(daemon_id) = dataflow.node_to_daemon.get(node_id) {
784                                        // Get metrics if available
785                                        let metrics = dataflow.node_metrics.get(node_id).map(|m| {
786                                            NodeMetricsInfo {
787                                                pid: m.pid,
788                                                cpu_usage: m.cpu_usage,
789                                                // Use 1000 for MB (megabytes) instead of 1024 (mebibytes)
790                                                memory_mb: m.memory_bytes as f64 / 1000.0 / 1000.0,
791                                                disk_read_mb_s: m
792                                                    .disk_read_bytes
793                                                    .map(|b| b as f64 / 1000.0 / 1000.0),
794                                                disk_write_mb_s: m
795                                                    .disk_write_bytes
796                                                    .map(|b| b as f64 / 1000.0 / 1000.0),
797                                            }
798                                        });
799
800                                        node_infos.push(NodeInfo {
801                                            dataflow_id: dataflow.uuid,
802                                            dataflow_name: dataflow.name.clone(),
803                                            node_id: node_id.clone(),
804                                            daemon_id: daemon_id.clone(),
805                                            metrics,
806                                        });
807                                    }
808                                }
809                            }
810                            let _ = reply_sender
811                                .send(Ok(ControlRequestReply::NodeInfoList(node_infos)));
812                        }
813                    }
814                }
815                ControlEvent::Error(err) => tracing::error!("{err:?}"),
816                ControlEvent::LogSubscribe {
817                    dataflow_id,
818                    level,
819                    connection,
820                } => {
821                    if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
822                        dataflow
823                            .log_subscribers
824                            .push(LogSubscriber::new(level, connection));
825                        let buffered = std::mem::take(&mut dataflow.buffered_log_messages);
826                        for message in buffered {
827                            send_log_message(&mut dataflow.log_subscribers, &message).await;
828                        }
829                    }
830                }
831                ControlEvent::BuildLogSubscribe {
832                    build_id,
833                    level,
834                    connection,
835                } => {
836                    if let Some(build) = running_builds.get_mut(&build_id) {
837                        build
838                            .log_subscribers
839                            .push(LogSubscriber::new(level, connection));
840                        let buffered = std::mem::take(&mut build.buffered_log_messages);
841                        for message in buffered {
842                            send_log_message(&mut build.log_subscribers, &message).await;
843                        }
844                    }
845                }
846            },
847            Event::DaemonHeartbeatInterval => {
848                let mut disconnected = BTreeSet::new();
849                for (machine_id, connection) in daemon_connections.iter_mut() {
850                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
851                        tracing::warn!(
852                            "no heartbeat message from machine `{machine_id}` since {:?}",
853                            connection.last_heartbeat.elapsed()
854                        )
855                    }
856                    if connection.last_heartbeat.elapsed() > Duration::from_secs(30) {
857                        disconnected.insert(machine_id.clone());
858                        continue;
859                    }
860                    let result: eyre::Result<()> = tokio::time::timeout(
861                        Duration::from_millis(500),
862                        send_heartbeat_message(&mut connection.stream, clock.new_timestamp()),
863                    )
864                    .await
865                    .wrap_err("timeout")
866                    .and_then(|r| r)
867                    .wrap_err_with(|| {
868                        format!("failed to send heartbeat message to daemon at `{machine_id}`")
869                    });
870                    if let Err(err) = result {
871                        tracing::warn!("{err:?}");
872                        disconnected.insert(machine_id.clone());
873                    }
874                }
875                if !disconnected.is_empty() {
876                    tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
877                    for machine_id in disconnected {
878                        daemon_connections.remove(&machine_id);
879                    }
880                }
881            }
882            Event::CtrlC => {
883                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
884                handle_destroy(
885                    &mut running_dataflows,
886                    &mut daemon_connections,
887                    &abort_handle,
888                    &mut daemon_events_tx,
889                    &clock,
890                )
891                .await?;
892            }
893            Event::DaemonHeartbeat {
894                daemon_id: machine_id,
895            } => {
896                if let Some(connection) = daemon_connections.get_mut(&machine_id) {
897                    connection.last_heartbeat = Instant::now();
898                }
899            }
900            Event::Log(message) => {
901                if let Some(dataflow_id) = &message.dataflow_id {
902                    if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
903                        if dataflow.log_subscribers.is_empty() {
904                            // buffer log message until there are subscribers
905                            dataflow.buffered_log_messages.push(message);
906                        } else {
907                            send_log_message(&mut dataflow.log_subscribers, &message).await;
908                        }
909                    }
910                } else if let Some(build_id) = &message.build_id {
911                    if let Some(build) = running_builds.get_mut(build_id) {
912                        if build.log_subscribers.is_empty() {
913                            // buffer log message until there are subscribers
914                            build.buffered_log_messages.push(message);
915                        } else {
916                            send_log_message(&mut build.log_subscribers, &message).await;
917                        }
918                    }
919                }
920            }
921            Event::DaemonExit { daemon_id } => {
922                tracing::info!("Daemon `{daemon_id}` exited");
923                daemon_connections.remove(&daemon_id);
924            }
925            Event::NodeMetrics {
926                dataflow_id,
927                metrics,
928            } => {
929                // Store metrics for this dataflow
930                if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
931                    for (node_id, node_metrics) in metrics {
932                        dataflow.node_metrics.insert(node_id, node_metrics);
933                    }
934                }
935            }
936            Event::DataflowBuildResult {
937                build_id,
938                daemon_id,
939                result,
940            } => match running_builds.get_mut(&build_id) {
941                Some(build) => {
942                    build.pending_build_results.remove(&daemon_id);
943                    match result {
944                        Ok(()) => {}
945                        Err(err) => {
946                            build.errors.push(format!("{err:?}"));
947                        }
948                    };
949                    if build.pending_build_results.is_empty() {
950                        tracing::info!("dataflow build finished: `{build_id}`");
951                        let mut build = running_builds.remove(&build_id).unwrap();
952                        let result = if build.errors.is_empty() {
953                            Ok(())
954                        } else {
955                            Err(format!("build failed: {}", build.errors.join("\n\n")))
956                        };
957
958                        build.build_result.set_result(Ok(
959                            ControlRequestReply::DataflowBuildFinished { build_id, result },
960                        ));
961
962                        finished_builds.insert(build_id, build.build_result);
963                    }
964                }
965                None => {
966                    tracing::warn!(
967                        "received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map"
968                    );
969                }
970            },
971            Event::DataflowSpawnResult {
972                dataflow_id,
973                daemon_id,
974                result,
975            } => match running_dataflows.get_mut(&dataflow_id) {
976                Some(dataflow) => {
977                    dataflow.pending_spawn_results.remove(&daemon_id);
978                    match result {
979                        Ok(()) => {
980                            if dataflow.pending_spawn_results.is_empty() {
981                                tracing::info!("successfully spawned dataflow `{dataflow_id}`",);
982                                dataflow.spawn_result.set_result(Ok(
983                                    ControlRequestReply::DataflowSpawned { uuid: dataflow_id },
984                                ));
985                            }
986                        }
987                        Err(err) => {
988                            tracing::warn!("error while spawning dataflow `{dataflow_id}`");
989                            dataflow.spawn_result.set_result(Err(err));
990                        }
991                    };
992                }
993                None => {
994                    tracing::warn!(
995                        "received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map"
996                    );
997                }
998            },
999        }
1000
1001        // warn if event handling took too long -> the main loop should never be blocked for too long
1002        let elapsed = start.elapsed();
1003        if elapsed > Duration::from_millis(100) {
1004            tracing::warn!(
1005                "Coordinator took {}ms for handling event: {event_kind}",
1006                elapsed.as_millis()
1007            );
1008        }
1009    }
1010
1011    tracing::info!("stopped");
1012
1013    Ok(())
1014}
1015
1016async fn send_log_message(log_subscribers: &mut Vec<LogSubscriber>, message: &LogMessage) {
1017    for subscriber in log_subscribers.iter_mut() {
1018        let send_result =
1019            tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));
1020
1021        if send_result.await.is_err() {
1022            subscriber.close();
1023        }
1024    }
1025    log_subscribers.retain(|s| !s.is_closed());
1026}
1027
1028fn dataflow_result(
1029    results: &BTreeMap<DaemonId, DataflowDaemonResult>,
1030    dataflow_uuid: Uuid,
1031    clock: &uhlc::HLC,
1032) -> DataflowResult {
1033    let mut node_results = BTreeMap::new();
1034    for result in results.values() {
1035        node_results.extend(result.node_results.clone());
1036        if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
1037            tracing::warn!("failed to update HLC: {err}");
1038        }
1039    }
1040
1041    DataflowResult {
1042        uuid: dataflow_uuid,
1043        timestamp: clock.new_timestamp(),
1044        node_results,
1045    }
1046}
1047
/// Per-daemon connection state held by the coordinator.
struct DaemonConnection {
    /// TCP stream used for coordinator -> daemon requests and their replies.
    stream: TcpStream,
    /// Time of the last heartbeat received from this daemon; refreshed on
    /// `Event::DaemonHeartbeat` and checked by the heartbeat-interval
    /// watchdog to disconnect unresponsive daemons.
    last_heartbeat: Instant,
}
1052
1053async fn handle_destroy(
1054    running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
1055    daemon_connections: &mut DaemonConnections,
1056    abortable_events: &futures::stream::AbortHandle,
1057    daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
1058    clock: &HLC,
1059) -> Result<(), eyre::ErrReport> {
1060    abortable_events.abort();
1061    for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() {
1062        let _ = stop_dataflow(
1063            running_dataflows,
1064            dataflow_uuid,
1065            daemon_connections,
1066            clock.new_timestamp(),
1067            None,
1068            false,
1069        )
1070        .await?;
1071    }
1072
1073    let result = destroy_daemons(daemon_connections, clock.new_timestamp()).await;
1074    *daemon_events_tx = None;
1075    result
1076}
1077
1078async fn send_heartbeat_message(
1079    connection: &mut TcpStream,
1080    timestamp: uhlc::Timestamp,
1081) -> eyre::Result<()> {
1082    let message = serde_json::to_vec(&Timestamped {
1083        inner: DaemonCoordinatorEvent::Heartbeat,
1084        timestamp,
1085    })
1086    .context("Could not serialize heartbeat message")?;
1087
1088    tcp_send(connection, &message)
1089        .await
1090        .wrap_err("failed to send heartbeat message to daemon")
1091}
1092
/// State of a dataflow build that has been triggered but has not yet
/// finished on all involved daemons.
struct RunningBuild {
    /// Error messages reported by daemons for this build so far.
    errors: Vec<String>,
    /// Resolved once all daemons reported their build result; late
    /// subscribers receive the cached value.
    build_result: CachedResult,

    /// Buffer for log messages that were sent before there were any subscribers.
    buffered_log_messages: Vec<LogMessage>,
    /// Active log subscribers for this build.
    log_subscribers: Vec<LogSubscriber>,

    /// Daemons that have not reported a `DataflowBuildResult` yet.
    pending_build_results: BTreeSet<DaemonId>,
}
1103
/// Book-keeping state for a dataflow that is currently spawning or running.
struct RunningDataflow {
    /// Optional human-readable name of the dataflow.
    name: Option<String>,
    /// Unique ID of this dataflow instance.
    uuid: Uuid,
    /// The descriptor that this dataflow was spawned from.
    descriptor: Descriptor,
    /// The IDs of the daemons that the dataflow is running on.
    daemons: BTreeSet<DaemonId>,
    /// IDs of daemons that are waiting until all nodes are started.
    pending_daemons: BTreeSet<DaemonId>,
    // NOTE(review): appears to track nodes that exited before the subscribe
    // handshake completed; set and read outside this part of the file — confirm.
    exited_before_subscribe: Vec<NodeId>,
    /// The resolved nodes of the dataflow, keyed by node ID.
    nodes: BTreeMap<NodeId, ResolvedNode>,
    /// Maps each node to the daemon it's running on
    node_to_daemon: BTreeMap<NodeId, DaemonId>,
    /// Latest metrics for each node (from daemons)
    node_metrics: BTreeMap<NodeId, dora_message::daemon_to_coordinator::NodeMetrics>,

    /// Resolved once all daemons reported a successful spawn, or as soon as
    /// one daemon reports a spawn error; late subscribers get the cached value.
    spawn_result: CachedResult,
    /// Reply channels of pending stop requests; answered outside this part
    /// of the file (presumably once the dataflow result is known — confirm).
    stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

    /// Buffer for log messages that were sent before there were any subscribers.
    buffered_log_messages: Vec<LogMessage>,
    /// Active log subscribers for this dataflow.
    log_subscribers: Vec<LogSubscriber>,

    /// Daemons that have not reported a `DataflowSpawnResult` yet.
    pending_spawn_results: BTreeSet<DaemonId>,
}
1128
/// A result value that may not be available yet.
///
/// Interested parties `register` a oneshot sender; once `set_result` is
/// called, the value is broadcast to all registered senders and cached so
/// that later subscribers receive it immediately.
pub enum CachedResult {
    /// No result yet; senders queue up until `set_result` is called.
    Pending {
        result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
    },
    /// The final result, handed out to every subsequent subscriber.
    Cached {
        result: eyre::Result<ControlRequestReply>,
    },
}
1137
1138impl Default for CachedResult {
1139    fn default() -> Self {
1140        Self::Pending {
1141            result_senders: Vec::new(),
1142        }
1143    }
1144}
1145
1146impl CachedResult {
1147    fn register(
1148        &mut self,
1149        reply_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
1150    ) {
1151        match self {
1152            CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
1153            CachedResult::Cached { result } => {
1154                Self::send_result_to(result, reply_sender);
1155            }
1156        }
1157    }
1158
1159    fn set_result(&mut self, result: eyre::Result<ControlRequestReply>) {
1160        match self {
1161            CachedResult::Pending { result_senders } => {
1162                for sender in result_senders.drain(..) {
1163                    Self::send_result_to(&result, sender);
1164                }
1165                *self = CachedResult::Cached { result };
1166            }
1167            CachedResult::Cached { .. } => {}
1168        }
1169    }
1170
1171    fn send_result_to(
1172        result: &eyre::Result<ControlRequestReply>,
1173        sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
1174    ) {
1175        let result = match result {
1176            Ok(r) => Ok(r.clone()),
1177            Err(err) => Err(eyre!("{err:?}")),
1178        };
1179        let _ = sender.send(result);
1180    }
1181}
1182
/// Subset of a dataflow's state that is kept after it finished, so that
/// e.g. logs can still be retrieved by UUID or name (see `retrieve_logs`).
struct ArchivedDataflow {
    /// Human-readable name of the dataflow, if it had one.
    name: Option<String>,
    /// The resolved nodes of the dataflow (used to map a node to the
    /// machine its logs live on).
    nodes: BTreeMap<NodeId, ResolvedNode>,
}
1187
1188impl From<&RunningDataflow> for ArchivedDataflow {
1189    fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
1190        ArchivedDataflow {
1191            name: dataflow.name.clone(),
1192            nodes: dataflow.nodes.clone(),
1193        }
1194    }
1195}
1196
/// Equality compares only the identifying fields (name, UUID, daemon set);
/// transient state such as log subscribers, metrics, or pending results is
/// not part of the comparison.
impl PartialEq for RunningDataflow {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
    }
}

impl Eq for RunningDataflow {}
1204
1205async fn stop_dataflow<'a>(
1206    running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
1207    dataflow_uuid: Uuid,
1208    daemon_connections: &mut DaemonConnections,
1209    timestamp: uhlc::Timestamp,
1210    grace_duration: Option<Duration>,
1211    force: bool,
1212) -> eyre::Result<&'a mut RunningDataflow> {
1213    let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
1214        bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
1215    };
1216
1217    let message = serde_json::to_vec(&Timestamped {
1218        inner: DaemonCoordinatorEvent::StopDataflow {
1219            dataflow_id: dataflow_uuid,
1220            grace_duration,
1221            force,
1222        },
1223        timestamp,
1224    })?;
1225
1226    for daemon_id in &dataflow.daemons {
1227        let daemon_connection = daemon_connections
1228            .get_mut(daemon_id)
1229            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
1230        tcp_send(&mut daemon_connection.stream, &message)
1231            .await
1232            .wrap_err("failed to send stop message to daemon")?;
1233
1234        // wait for reply
1235        let reply_raw = tcp_receive(&mut daemon_connection.stream)
1236            .await
1237            .wrap_err("failed to receive stop reply from daemon")?;
1238        match serde_json::from_slice(&reply_raw)
1239            .wrap_err("failed to deserialize stop reply from daemon")?
1240        {
1241            DaemonCoordinatorReply::StopResult(result) => result
1242                .map_err(|e| eyre!(e))
1243                .wrap_err("failed to stop dataflow")?,
1244            other => bail!("unexpected reply after sending stop: {other:?}"),
1245        }
1246    }
1247
1248    tracing::info!("successfully send stop dataflow `{dataflow_uuid}` to all daemons");
1249
1250    Ok(dataflow)
1251}
1252
1253async fn reload_dataflow(
1254    running_dataflows: &HashMap<Uuid, RunningDataflow>,
1255    dataflow_id: Uuid,
1256    node_id: NodeId,
1257    operator_id: Option<OperatorId>,
1258    daemon_connections: &mut DaemonConnections,
1259    timestamp: uhlc::Timestamp,
1260) -> eyre::Result<()> {
1261    let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
1262        bail!("No running dataflow found with UUID `{dataflow_id}`")
1263    };
1264    let message = serde_json::to_vec(&Timestamped {
1265        inner: DaemonCoordinatorEvent::ReloadDataflow {
1266            dataflow_id,
1267            node_id,
1268            operator_id,
1269        },
1270        timestamp,
1271    })?;
1272
1273    for machine_id in &dataflow.daemons {
1274        let daemon_connection = daemon_connections
1275            .get_mut(machine_id)
1276            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
1277        tcp_send(&mut daemon_connection.stream, &message)
1278            .await
1279            .wrap_err("failed to send reload message to daemon")?;
1280
1281        // wait for reply
1282        let reply_raw = tcp_receive(&mut daemon_connection.stream)
1283            .await
1284            .wrap_err("failed to receive reload reply from daemon")?;
1285        match serde_json::from_slice(&reply_raw)
1286            .wrap_err("failed to deserialize reload reply from daemon")?
1287        {
1288            DaemonCoordinatorReply::ReloadResult(result) => result
1289                .map_err(|e| eyre!(e))
1290                .wrap_err("failed to reload dataflow")?,
1291            other => bail!("unexpected reply after sending reload: {other:?}"),
1292        }
1293    }
1294    tracing::info!("successfully reloaded dataflow `{dataflow_id}`");
1295
1296    Ok(())
1297}
1298
/// Retrieves the logs of a single node from the daemon that the node is (or
/// was) running on.
///
/// Works for both running and archived (finished) dataflows. Fails if the
/// node cannot be mapped to exactly one machine, or if that machine does not
/// map to exactly one connected daemon.
async fn retrieve_logs(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
    tail: Option<usize>,
) -> eyre::Result<Vec<u8>> {
    // Archived dataflows take precedence over running ones with the same ID.
    let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else {
        bail!("No dataflow found with UUID `{dataflow_id}`")
    };

    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Logs {
            dataflow_id,
            node_id: node_id.clone(),
            tail,
        },
        timestamp,
    })?;

    // Determine which machine(s) the node is deployed on (`None` means the
    // unnamed/default machine).
    let machine_ids: Vec<Option<String>> = nodes
        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
        .collect();

    // The node must exist on exactly one machine.
    let machine_id = if let [machine_id] = &machine_ids[..] {
        machine_id
    } else if machine_ids.is_empty() {
        bail!("No machine contains {}/{}", dataflow_id, node_id)
    } else {
        bail!(
            "More than one machine contains {}/{}. However, it should only be present on one.",
            dataflow_id,
            node_id
        )
    };

    // Resolve the machine ID to a connected daemon; again, exactly one
    // daemon must match.
    let daemon_ids: Vec<_> = match machine_id {
        None => daemon_connections.unnamed().collect(),
        Some(machine_id) => daemon_connections
            .get_matching_daemon_id(machine_id)
            .into_iter()
            .collect(),
    };
    let daemon_id = match &daemon_ids[..] {
        [id] => (*id).clone(),
        [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
        _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
    };
    let daemon_connection = daemon_connections
        .get_mut(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?;
    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err("failed to send logs message to daemon")?;

    // wait for reply
    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to retrieve logs reply from daemon")?;
    let reply_logs = match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize logs reply from daemon")?
    {
        DaemonCoordinatorReply::Logs(logs) => logs,
        other => bail!("unexpected reply after sending logs: {other:?}"),
    };
    tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");

    // The daemon reports either the raw log bytes or an error string.
    reply_logs.map_err(|err| eyre!(err))
}
1376
/// Triggers a build of the dataflow's nodes on all involved daemons.
///
/// Nodes are grouped by the machine they are deployed on; each machine's
/// daemon receives one `Build` command covering its subset of nodes plus the
/// git sources needed to build them. The returned [`RunningBuild`] tracks
/// which daemons still have to report a build result.
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(daemon_connections, clock))]
async fn build_dataflow(
    build_id: BuildId,
    session_id: SessionId,
    dataflow: Descriptor,
    git_sources: BTreeMap<NodeId, GitSource>,
    prev_git_sources: BTreeMap<NodeId, GitSource>,
    local_working_dir: Option<PathBuf>,
    clock: &HLC,
    uv: bool,
    daemon_connections: &mut DaemonConnections,
) -> eyre::Result<RunningBuild> {
    let nodes = dataflow.resolve_aliases_and_set_defaults()?;

    // Group the current and previous git sources by the machine that the
    // corresponding node is deployed on (`None` = unnamed/default machine).
    let mut git_sources_by_daemon = git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();
    let mut prev_git_sources_by_daemon = prev_git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();

    // Group the nodes themselves by machine as well.
    let nodes_by_daemon = nodes
        .values()
        .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));

    // Send one build command per machine and remember which daemons we are
    // waiting on for build results.
    let mut daemons = BTreeSet::new();
    for (machine, nodes_on_machine) in &nodes_by_daemon {
        let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
        tracing::debug!(
            "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
        );

        let build_command = BuildDataflowNodes {
            build_id,
            session_id,
            local_working_dir: local_working_dir.clone(),
            git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
            prev_git_sources: prev_git_sources_by_daemon
                .remove(machine)
                .unwrap_or_default(),
            dataflow_descriptor: dataflow.clone(),
            nodes_on_machine,
            uv,
        };
        let message = serde_json::to_vec(&Timestamped {
            inner: DaemonCoordinatorEvent::Build(build_command),
            timestamp: clock.new_timestamp(),
        })?;

        let daemon_id =
            build_dataflow_on_machine(daemon_connections, machine.map(|s| s.as_str()), &message)
                .await
                .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
        daemons.insert(daemon_id);
    }

    tracing::info!("successfully triggered dataflow build `{build_id}`",);

    // Build results arrive asynchronously as `DataflowBuildResult` events.
    Ok(RunningBuild {
        errors: Vec::new(),
        build_result: CachedResult::default(),
        buffered_log_messages: Vec::new(),
        log_subscribers: Vec::new(),
        pending_build_results: daemons,
    })
}
1454
1455async fn build_dataflow_on_machine(
1456    daemon_connections: &mut DaemonConnections,
1457    machine: Option<&str>,
1458    message: &[u8],
1459) -> Result<DaemonId, eyre::ErrReport> {
1460    let daemon_id = match machine {
1461        Some(machine) => daemon_connections
1462            .get_matching_daemon_id(machine)
1463            .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
1464            .clone(),
1465        None => daemon_connections
1466            .unnamed()
1467            .next()
1468            .wrap_err("no unnamed daemon connections")?
1469            .clone(),
1470    };
1471
1472    let daemon_connection = daemon_connections
1473        .get_mut(&daemon_id)
1474        .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?;
1475    tcp_send(&mut daemon_connection.stream, message)
1476        .await
1477        .wrap_err("failed to send build message to daemon")?;
1478
1479    let reply_raw = tcp_receive(&mut daemon_connection.stream)
1480        .await
1481        .wrap_err("failed to receive build reply from daemon")?;
1482    match serde_json::from_slice(&reply_raw)
1483        .wrap_err("failed to deserialize build reply from daemon")?
1484    {
1485        DaemonCoordinatorReply::TriggerBuildResult(result) => result
1486            .map_err(|e| eyre!(e))
1487            .wrap_err("daemon returned an error")?,
1488        _ => bail!("unexpected reply"),
1489    }
1490    Ok(daemon_id)
1491}
1492
1493#[allow(clippy::too_many_arguments)]
1494async fn start_dataflow(
1495    build_id: Option<BuildId>,
1496    session_id: SessionId,
1497    dataflow: Descriptor,
1498    local_working_dir: Option<PathBuf>,
1499    name: Option<String>,
1500    daemon_connections: &mut DaemonConnections,
1501    clock: &HLC,
1502    uv: bool,
1503    write_events_to: Option<PathBuf>,
1504) -> eyre::Result<RunningDataflow> {
1505    let SpawnedDataflow {
1506        uuid,
1507        daemons,
1508        nodes,
1509        node_to_daemon,
1510    } = spawn_dataflow(
1511        build_id,
1512        session_id,
1513        dataflow.clone(),
1514        local_working_dir,
1515        daemon_connections,
1516        clock,
1517        uv,
1518        write_events_to,
1519    )
1520    .await?;
1521    Ok(RunningDataflow {
1522        uuid,
1523        name,
1524        descriptor: dataflow,
1525        pending_daemons: if daemons.len() > 1 {
1526            daemons.clone()
1527        } else {
1528            BTreeSet::new()
1529        },
1530        exited_before_subscribe: Default::default(),
1531        daemons: daemons.clone(),
1532        nodes,
1533        node_to_daemon,
1534        node_metrics: BTreeMap::new(),
1535        spawn_result: CachedResult::default(),
1536        stop_reply_senders: Vec::new(),
1537        buffered_log_messages: Vec::new(),
1538        log_subscribers: Vec::new(),
1539        pending_spawn_results: daemons,
1540    })
1541}
1542
1543async fn destroy_daemon(
1544    daemon_id: DaemonId,
1545    mut daemon_connection: DaemonConnection,
1546
1547    timestamp: uhlc::Timestamp,
1548) -> Result<()> {
1549    let message = serde_json::to_vec(&Timestamped {
1550        inner: DaemonCoordinatorEvent::Destroy,
1551        timestamp,
1552    })?;
1553
1554    tcp_send(&mut daemon_connection.stream, &message)
1555        .await
1556        .wrap_err(format!(
1557            "failed to send destroy message to daemon `{daemon_id}`"
1558        ))?;
1559
1560    // wait for reply
1561    let reply_raw = tcp_receive(&mut daemon_connection.stream)
1562        .await
1563        .wrap_err("failed to receive destroy reply from daemon")?;
1564    match serde_json::from_slice(&reply_raw)
1565        .wrap_err("failed to deserialize destroy reply from daemon")?
1566    {
1567        DaemonCoordinatorReply::DestroyResult { result, .. } => result
1568            .map_err(|e| eyre!(e))
1569            .wrap_err("failed to destroy dataflow")?,
1570        other => bail!("unexpected reply after sending `destroy`: {other:?}"),
1571    }
1572
1573    tracing::info!("successfully destroyed daemon `{daemon_id}`");
1574    Ok(())
1575}
1576
1577async fn destroy_daemons(
1578    daemon_connections: &mut DaemonConnections,
1579    timestamp: uhlc::Timestamp,
1580) -> eyre::Result<()> {
1581    let futures = daemon_connections
1582        .drain()
1583        .map(|(daemon_id, daemon_connection)| {
1584            destroy_daemon(daemon_id, daemon_connection, timestamp)
1585        })
1586        .collect::<Vec<_>>();
1587    let results: Vec<std::result::Result<(), eyre::Error>> =
1588        join_all(futures).await.into_iter().collect::<Vec<_>>();
1589    for result in results {
1590        result?;
1591    }
1592    Ok(())
1593}
1594
1595#[derive(Debug)]
1596pub enum Event {
1597    NewDaemonConnection(TcpStream),
1598    DaemonConnectError(eyre::Report),
1599    DaemonHeartbeat {
1600        daemon_id: DaemonId,
1601    },
1602    Dataflow {
1603        uuid: Uuid,
1604        event: DataflowEvent,
1605    },
1606    Control(ControlEvent),
1607    Daemon(DaemonRequest),
1608    DaemonHeartbeatInterval,
1609    CtrlC,
1610    Log(LogMessage),
1611    DaemonExit {
1612        daemon_id: dora_message::common::DaemonId,
1613    },
1614    DataflowBuildResult {
1615        build_id: BuildId,
1616        daemon_id: DaemonId,
1617        result: eyre::Result<()>,
1618    },
1619    DataflowSpawnResult {
1620        dataflow_id: uuid::Uuid,
1621        daemon_id: DaemonId,
1622        result: eyre::Result<()>,
1623    },
1624    NodeMetrics {
1625        dataflow_id: uuid::Uuid,
1626        metrics: BTreeMap<NodeId, dora_message::daemon_to_coordinator::NodeMetrics>,
1627    },
1628}
1629
1630impl Event {
1631    /// Whether this event should be logged.
1632    #[allow(clippy::match_like_matches_macro)]
1633    pub fn log(&self) -> bool {
1634        match self {
1635            Event::DaemonHeartbeatInterval => false,
1636            _ => true,
1637        }
1638    }
1639
1640    fn kind(&self) -> &'static str {
1641        match self {
1642            Event::NewDaemonConnection(_) => "NewDaemonConnection",
1643            Event::DaemonConnectError(_) => "DaemonConnectError",
1644            Event::DaemonHeartbeat { .. } => "DaemonHeartbeat",
1645            Event::Dataflow { .. } => "Dataflow",
1646            Event::Control(_) => "Control",
1647            Event::Daemon(_) => "Daemon",
1648            Event::DaemonHeartbeatInterval => "DaemonHeartbeatInterval",
1649            Event::CtrlC => "CtrlC",
1650            Event::Log(_) => "Log",
1651            Event::DaemonExit { .. } => "DaemonExit",
1652            Event::DataflowBuildResult { .. } => "DataflowBuildResult",
1653            Event::DataflowSpawnResult { .. } => "DataflowSpawnResult",
1654            Event::NodeMetrics { .. } => "NodeMetrics",
1655        }
1656    }
1657}
1658
/// Per-dataflow events reported by daemons (delivered via `Event::Dataflow`).
#[derive(Debug)]
pub enum DataflowEvent {
    // The daemon finished its part of the dataflow and reports its result.
    DataflowFinishedOnDaemon {
        daemon_id: DaemonId,
        result: DataflowDaemonResult,
    },
    // The daemon reports readiness for this dataflow.
    // NOTE(review): `exited_before_subscribe` presumably lists nodes that
    // exited before their outputs could be subscribed to — confirm against
    // the daemon implementation.
    ReadyOnDaemon {
        daemon_id: DaemonId,
        exited_before_subscribe: Vec<NodeId>,
    },
}
1670
/// Requests sent by daemons to the coordinator over a fresh connection.
#[derive(Debug)]
pub enum DaemonRequest {
    // A daemon registers itself with the coordinator.
    Register {
        // `Err` presumably carries a human-readable version-mismatch
        // description — TODO confirm with the registration handler.
        version_check_result: Result<(), String>,
        // Optional machine id; daemons without one are treated as "unnamed"
        // (see `build_dataflow_on_machine`, which falls back to unnamed
        // daemon connections).
        machine_id: Option<String>,
        // The TCP connection the daemon registered on.
        connection: TcpStream,
    },
}
1679
1680fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
1681    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
1682
1683    let mut ctrlc_sent = false;
1684    ctrlc::set_handler(move || {
1685        if ctrlc_sent {
1686            tracing::warn!("received second ctrlc signal -> aborting immediately");
1687            std::process::abort();
1688        } else {
1689            tracing::info!("received ctrlc signal");
1690            if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
1691                tracing::error!("failed to report ctrl-c event to dora-coordinator");
1692            }
1693
1694            ctrlc_sent = true;
1695        }
1696    })
1697    .wrap_err("failed to set ctrl-c handler")?;
1698
1699    Ok(ReceiverStream::new(ctrlc_rx))
1700}