//! dora_coordinator/lib.rs

1use crate::{
2    run::spawn_dataflow,
3    tcp_utils::{tcp_receive, tcp_send},
4};
5pub use control::ControlEvent;
6use dora_core::{
7    config::{NodeId, OperatorId},
8    descriptor::DescriptorExt,
9    uhlc::{self, HLC},
10};
11use dora_message::{
12    BuildId, DataflowId, SessionId,
13    cli_to_coordinator::ControlRequest,
14    common::{DaemonId, GitSource},
15    coordinator_to_cli::{
16        ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
17        DataflowStatus, LogLevel, LogMessage,
18    },
19    coordinator_to_daemon::{
20        BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped,
21    },
22    daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
23    descriptor::{Descriptor, ResolvedNode},
24};
25use eyre::{ContextCompat, Result, WrapErr, bail, eyre};
26use futures::{Future, Stream, StreamExt, future::join_all, stream::FuturesUnordered};
27use futures_concurrency::stream::Merge;
28use itertools::Itertools;
29use log_subscriber::LogSubscriber;
30use petname::petname;
31use run::SpawnedDataflow;
32use std::{
33    collections::{BTreeMap, BTreeSet, HashMap},
34    net::SocketAddr,
35    path::PathBuf,
36    sync::Arc,
37    time::{Duration, Instant},
38};
39use tokio::{
40    net::TcpStream,
41    sync::{mpsc, oneshot},
42    task::JoinHandle,
43};
44use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
45use uuid::Uuid;
46
47mod control;
48mod listener;
49mod log_subscriber;
50mod run;
51mod tcp_utils;
52
53pub async fn start(
54    bind: SocketAddr,
55    bind_control: SocketAddr,
56    external_events: impl Stream<Item = Event> + Unpin,
57) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
58    let listener = listener::create_listener(bind).await?;
59    let port = listener
60        .local_addr()
61        .wrap_err("failed to get local addr of listener")?
62        .port();
63    let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
64        c.map(Event::NewDaemonConnection)
65            .wrap_err("failed to open connection")
66            .unwrap_or_else(Event::DaemonConnectError)
67    });
68
69    let mut tasks = FuturesUnordered::new();
70    let control_events = control::control_events(bind_control, &tasks)
71        .await
72        .wrap_err("failed to create control events")?;
73
74    // Setup ctrl-c handler
75    let ctrlc_events = set_up_ctrlc_handler()?;
76
77    let events = (
78        external_events,
79        new_daemon_connections,
80        control_events,
81        ctrlc_events,
82    )
83        .merge();
84
85    let future = async move {
86        start_inner(events, &tasks).await?;
87
88        tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
89        while let Some(join_result) = tasks.next().await {
90            if let Err(err) = join_result {
91                tracing::error!("task panicked: {err}");
92            }
93        }
94        tracing::debug!("all spawned tasks finished, exiting..");
95        Ok(())
96    };
97    Ok((port, future))
98}
99
100// Resolve the dataflow name.
101fn resolve_name(
102    name: String,
103    running_dataflows: &HashMap<Uuid, RunningDataflow>,
104    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
105) -> eyre::Result<Uuid> {
106    let uuids: Vec<_> = running_dataflows
107        .iter()
108        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
109        .map(|(k, _)| k)
110        .copied()
111        .collect();
112    let archived_uuids: Vec<_> = archived_dataflows
113        .iter()
114        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
115        .map(|(k, _)| k)
116        .copied()
117        .collect();
118
119    if uuids.is_empty() {
120        if archived_uuids.is_empty() {
121            bail!("no dataflow with name `{name}`");
122        } else if let [uuid] = archived_uuids.as_slice() {
123            Ok(*uuid)
124        } else {
125            // TODO: Index the archived dataflows in order to return logs based on the index.
126            bail!(
127                "multiple archived dataflows found with name `{name}`, Please provide the UUID instead."
128            );
129        }
130    } else if let [uuid] = uuids.as_slice() {
131        Ok(*uuid)
132    } else {
133        bail!("multiple dataflows found with name `{name}`");
134    }
135}
136
/// Tracks the TCP connections of all currently registered daemons,
/// keyed by their unique `DaemonId`.
#[derive(Default)]
struct DaemonConnections {
    // BTreeMap gives a deterministic iteration order over daemon IDs.
    daemons: BTreeMap<DaemonId, DaemonConnection>,
}
141
142impl DaemonConnections {
143    fn add(&mut self, daemon_id: DaemonId, connection: DaemonConnection) {
144        let previous = self.daemons.insert(daemon_id.clone(), connection);
145        if previous.is_some() {
146            tracing::info!("closing previous connection `{daemon_id}` on new register");
147        }
148    }
149
150    fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> {
151        self.daemons.get(id)
152    }
153
154    fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
155        self.daemons.get_mut(id)
156    }
157
158    fn get_matching_daemon_id(&self, machine_id: &str) -> Option<&DaemonId> {
159        self.daemons
160            .keys()
161            .find(|id| id.matches_machine_id(machine_id))
162    }
163
164    fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
165        std::mem::take(&mut self.daemons).into_iter()
166    }
167
168    fn is_empty(&self) -> bool {
169        self.daemons.is_empty()
170    }
171
172    fn keys(&self) -> impl Iterator<Item = &DaemonId> {
173        self.daemons.keys()
174    }
175
176    fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
177        self.daemons.iter_mut()
178    }
179
180    fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
181        self.daemons.remove(daemon_id)
182    }
183
184    fn unnamed(&self) -> impl Iterator<Item = &DaemonId> {
185        self.daemons.keys().filter(|id| id.machine_id().is_none())
186    }
187}
188
189async fn start_inner(
190    events: impl Stream<Item = Event> + Unpin,
191    tasks: &FuturesUnordered<JoinHandle<()>>,
192) -> eyre::Result<()> {
193    let clock = Arc::new(HLC::default());
194
195    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2);
196    let mut daemon_events_tx = Some(daemon_events_tx);
197    let daemon_events = ReceiverStream::new(daemon_events);
198
199    let daemon_heartbeat_interval =
200        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
201            .map(|_| Event::DaemonHeartbeatInterval);
202
203    // events that should be aborted on `dora destroy`
204    let (abortable_events, abort_handle) =
205        futures::stream::abortable((events, daemon_heartbeat_interval).merge());
206
207    let mut events = (abortable_events, daemon_events).merge();
208
209    let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
210    let mut finished_builds: HashMap<BuildId, CachedResult> = HashMap::new();
211
212    let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
213    let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
214        HashMap::new();
215    let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
216    let mut daemon_connections = DaemonConnections::default();
217
218    while let Some(event) = events.next().await {
219        // used below for measuring the event handling duration
220        let start = Instant::now();
221        let event_kind = event.kind();
222
223        if event.log() {
224            tracing::trace!("Handling event {event:?}");
225        }
226        match event {
227            Event::NewDaemonConnection(connection) => {
228                connection.set_nodelay(true)?;
229                let events_tx = daemon_events_tx.clone();
230                if let Some(events_tx) = events_tx {
231                    let task = tokio::spawn(listener::handle_connection(
232                        connection,
233                        events_tx,
234                        clock.clone(),
235                    ));
236                    tasks.push(task);
237                } else {
238                    tracing::warn!(
239                        "ignoring new daemon connection because events_tx was closed already"
240                    );
241                }
242            }
243            Event::DaemonConnectError(err) => {
244                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
245            }
246            Event::Daemon(event) => match event {
247                DaemonRequest::Register {
248                    machine_id,
249                    mut connection,
250                    version_check_result,
251                } => {
252                    let existing = match &machine_id {
253                        Some(id) => daemon_connections.get_matching_daemon_id(id),
254                        None => daemon_connections.unnamed().next(),
255                    };
256                    let existing_result = if existing.is_some() {
257                        Err(format!(
258                            "There is already a connected daemon with machine ID `{machine_id:?}`"
259                        ))
260                    } else {
261                        Ok(())
262                    };
263
264                    // assign a unique ID to the daemon
265                    let daemon_id = DaemonId::new(machine_id);
266
267                    let reply: Timestamped<RegisterResult> = Timestamped {
268                        inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
269                            Ok(_) => RegisterResult::Ok {
270                                daemon_id: daemon_id.clone(),
271                            },
272                            Err(err) => RegisterResult::Err(err.clone()),
273                        },
274                        timestamp: clock.new_timestamp(),
275                    };
276
277                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
278                        .await
279                        .context("tcp send failed");
280                    match version_check_result.map_err(|e| eyre!(e)).and(send_result) {
281                        Ok(()) => {
282                            daemon_connections.add(
283                                daemon_id.clone(),
284                                DaemonConnection {
285                                    stream: connection,
286                                    last_heartbeat: Instant::now(),
287                                },
288                            );
289                        }
290                        Err(err) => {
291                            tracing::warn!(
292                                "failed to register daemon connection for daemon `{daemon_id}`: {err}"
293                            );
294                        }
295                    }
296                }
297            },
298            Event::Dataflow { uuid, event } => match event {
299                DataflowEvent::ReadyOnDaemon {
300                    daemon_id,
301                    exited_before_subscribe,
302                } => {
303                    match running_dataflows.entry(uuid) {
304                        std::collections::hash_map::Entry::Occupied(mut entry) => {
305                            let dataflow = entry.get_mut();
306                            dataflow.pending_daemons.remove(&daemon_id);
307                            dataflow
308                                .exited_before_subscribe
309                                .extend(exited_before_subscribe);
310                            if dataflow.pending_daemons.is_empty() {
311                                tracing::debug!("sending all nodes ready message to daemons");
312                                let message = serde_json::to_vec(&Timestamped {
313                                    inner: DaemonCoordinatorEvent::AllNodesReady {
314                                        dataflow_id: uuid,
315                                        exited_before_subscribe: dataflow
316                                            .exited_before_subscribe
317                                            .clone(),
318                                    },
319                                    timestamp: clock.new_timestamp(),
320                                })
321                                .wrap_err("failed to serialize AllNodesReady message")?;
322
323                                // notify all machines that run parts of the dataflow
324                                for daemon_id in &dataflow.daemons {
325                                    let Some(connection) = daemon_connections.get_mut(daemon_id)
326                                    else {
327                                        tracing::warn!(
328                                            "no daemon connection found for machine `{daemon_id}`"
329                                        );
330                                        continue;
331                                    };
332                                    tcp_send(&mut connection.stream, &message)
333                                        .await
334                                        .wrap_err_with(|| {
335                                            format!(
336                                                "failed to send AllNodesReady({uuid}) message \
337                                            to machine {daemon_id}"
338                                            )
339                                        })?;
340                                }
341                            }
342                        }
343                        std::collections::hash_map::Entry::Vacant(_) => {
344                            tracing::warn!("dataflow not running on ReadyOnMachine");
345                        }
346                    }
347                }
348                DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => {
349                    tracing::debug!(
350                        "coordinator received DataflowFinishedOnDaemon ({daemon_id:?}, result: {result:?})"
351                    );
352                    match running_dataflows.entry(uuid) {
353                        std::collections::hash_map::Entry::Occupied(mut entry) => {
354                            let dataflow = entry.get_mut();
355                            dataflow.daemons.remove(&daemon_id);
356                            tracing::info!(
357                                "removed machine id: {daemon_id} from dataflow: {:#?}",
358                                dataflow.uuid
359                            );
360                            dataflow_results
361                                .entry(uuid)
362                                .or_default()
363                                .insert(daemon_id, result);
364
365                            if dataflow.daemons.is_empty() {
366                                // Archive finished dataflow
367                                archived_dataflows
368                                    .entry(uuid)
369                                    .or_insert_with(|| ArchivedDataflow::from(entry.get()));
370                                let mut finished_dataflow = entry.remove();
371                                let dataflow_id = finished_dataflow.uuid;
372                                send_log_message(
373                                    &mut finished_dataflow.log_subscribers,
374                                    &LogMessage {
375                                        build_id: None,
376                                        dataflow_id: Some(dataflow_id),
377                                        node_id: None,
378                                        daemon_id: None,
379                                        level: LogLevel::Info.into(),
380                                        target: Some("coordinator".into()),
381                                        module_path: None,
382                                        file: None,
383                                        line: None,
384                                        message: "dataflow finished".into(),
385                                    },
386                                )
387                                .await;
388
389                                let reply = ControlRequestReply::DataflowStopped {
390                                    uuid,
391                                    result: dataflow_results
392                                        .get(&uuid)
393                                        .map(|r| dataflow_result(r, uuid, &clock))
394                                        .unwrap_or_else(|| {
395                                            DataflowResult::ok_empty(uuid, clock.new_timestamp())
396                                        }),
397                                };
398                                for sender in finished_dataflow.stop_reply_senders {
399                                    let _ = sender.send(Ok(reply.clone()));
400                                }
401                                if !matches!(
402                                    finished_dataflow.spawn_result,
403                                    CachedResult::Cached { .. }
404                                ) {
405                                    log::error!("pending spawn result on dataflow finish");
406                                }
407                            }
408                        }
409                        std::collections::hash_map::Entry::Vacant(_) => {
410                            tracing::warn!("dataflow not running on DataflowFinishedOnDaemon");
411                        }
412                    }
413                }
414            },
415
416            Event::Control(event) => match event {
417                ControlEvent::IncomingRequest {
418                    request,
419                    reply_sender,
420                } => {
421                    match request {
422                        ControlRequest::Build {
423                            session_id,
424                            dataflow,
425                            git_sources,
426                            prev_git_sources,
427                            local_working_dir,
428                            uv,
429                        } => {
430                            // assign a random build id
431                            let build_id = BuildId::generate();
432
433                            let result = build_dataflow(
434                                build_id,
435                                session_id,
436                                dataflow,
437                                git_sources,
438                                prev_git_sources,
439                                local_working_dir,
440                                &clock,
441                                uv,
442                                &mut daemon_connections,
443                            )
444                            .await;
445                            match result {
446                                Ok(build) => {
447                                    running_builds.insert(build_id, build);
448                                    let _ = reply_sender.send(Ok(
449                                        ControlRequestReply::DataflowBuildTriggered { build_id },
450                                    ));
451                                }
452                                Err(err) => {
453                                    let _ = reply_sender.send(Err(err));
454                                }
455                            }
456                        }
457                        ControlRequest::WaitForBuild { build_id } => {
458                            if let Some(build) = running_builds.get_mut(&build_id) {
459                                build.build_result.register(reply_sender);
460                            } else if let Some(result) = finished_builds.get_mut(&build_id) {
461                                result.register(reply_sender);
462                            } else {
463                                let _ =
464                                    reply_sender.send(Err(eyre!("unknown build id {build_id}")));
465                            }
466                        }
467                        ControlRequest::Start {
468                            build_id,
469                            session_id,
470                            dataflow,
471                            name,
472                            local_working_dir,
473                            uv,
474                        } => {
475                            let name = name.or_else(|| petname(2, "-"));
476
477                            let inner = async {
478                                if let Some(name) = name.as_deref() {
479                                    // check that name is unique
480                                    if running_dataflows
481                                        .values()
482                                        .any(|d: &RunningDataflow| d.name.as_deref() == Some(name))
483                                    {
484                                        bail!(
485                                            "there is already a running dataflow with name `{name}`"
486                                        );
487                                    }
488                                }
489                                let dataflow = start_dataflow(
490                                    build_id,
491                                    session_id,
492                                    dataflow,
493                                    local_working_dir,
494                                    name,
495                                    &mut daemon_connections,
496                                    &clock,
497                                    uv,
498                                )
499                                .await?;
500                                Ok(dataflow)
501                            };
502                            match inner.await {
503                                Ok(dataflow) => {
504                                    let uuid = dataflow.uuid;
505                                    running_dataflows.insert(uuid, dataflow);
506                                    let _ = reply_sender.send(Ok(
507                                        ControlRequestReply::DataflowStartTriggered { uuid },
508                                    ));
509                                }
510                                Err(err) => {
511                                    let _ = reply_sender.send(Err(err));
512                                }
513                            }
514                        }
515                        ControlRequest::WaitForSpawn { dataflow_id } => {
516                            if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
517                                dataflow.spawn_result.register(reply_sender);
518                            } else {
519                                let _ =
520                                    reply_sender.send(Err(eyre!("unknown dataflow {dataflow_id}")));
521                            }
522                        }
523                        ControlRequest::Check { dataflow_uuid } => {
524                            let status = match &running_dataflows.get(&dataflow_uuid) {
525                                Some(_) => ControlRequestReply::DataflowSpawned {
526                                    uuid: dataflow_uuid,
527                                },
528                                None => ControlRequestReply::DataflowStopped {
529                                    uuid: dataflow_uuid,
530                                    result: dataflow_results
531                                        .get(&dataflow_uuid)
532                                        .map(|r| dataflow_result(r, dataflow_uuid, &clock))
533                                        .unwrap_or_else(|| {
534                                            DataflowResult::ok_empty(
535                                                dataflow_uuid,
536                                                clock.new_timestamp(),
537                                            )
538                                        }),
539                                },
540                            };
541                            let _ = reply_sender.send(Ok(status));
542                        }
543                        ControlRequest::Reload {
544                            dataflow_id,
545                            node_id,
546                            operator_id,
547                        } => {
548                            let reload = async {
549                                reload_dataflow(
550                                    &running_dataflows,
551                                    dataflow_id,
552                                    node_id,
553                                    operator_id,
554                                    &mut daemon_connections,
555                                    clock.new_timestamp(),
556                                )
557                                .await?;
558                                Result::<_, eyre::Report>::Ok(())
559                            };
560                            let reply =
561                                reload
562                                    .await
563                                    .map(|()| ControlRequestReply::DataflowReloaded {
564                                        uuid: dataflow_id,
565                                    });
566                            let _ = reply_sender.send(reply);
567                        }
568                        ControlRequest::Stop {
569                            dataflow_uuid,
570                            grace_duration,
571                        } => {
572                            if let Some(result) = dataflow_results.get(&dataflow_uuid) {
573                                let reply = ControlRequestReply::DataflowStopped {
574                                    uuid: dataflow_uuid,
575                                    result: dataflow_result(result, dataflow_uuid, &clock),
576                                };
577                                let _ = reply_sender.send(Ok(reply));
578
579                                continue;
580                            }
581
582                            let dataflow = stop_dataflow(
583                                &mut running_dataflows,
584                                dataflow_uuid,
585                                &mut daemon_connections,
586                                clock.new_timestamp(),
587                                grace_duration,
588                            )
589                            .await;
590
591                            match dataflow {
592                                Ok(dataflow) => {
593                                    dataflow.stop_reply_senders.push(reply_sender);
594                                }
595                                Err(err) => {
596                                    let _ = reply_sender.send(Err(err));
597                                }
598                            }
599                        }
600                        ControlRequest::StopByName {
601                            name,
602                            grace_duration,
603                        } => match resolve_name(name, &running_dataflows, &archived_dataflows) {
604                            Ok(dataflow_uuid) => {
605                                if let Some(result) = dataflow_results.get(&dataflow_uuid) {
606                                    let reply = ControlRequestReply::DataflowStopped {
607                                        uuid: dataflow_uuid,
608                                        result: dataflow_result(result, dataflow_uuid, &clock),
609                                    };
610                                    let _ = reply_sender.send(Ok(reply));
611
612                                    continue;
613                                }
614
615                                let dataflow = stop_dataflow(
616                                    &mut running_dataflows,
617                                    dataflow_uuid,
618                                    &mut daemon_connections,
619                                    clock.new_timestamp(),
620                                    grace_duration,
621                                )
622                                .await;
623
624                                match dataflow {
625                                    Ok(dataflow) => {
626                                        dataflow.stop_reply_senders.push(reply_sender);
627                                    }
628                                    Err(err) => {
629                                        let _ = reply_sender.send(Err(err));
630                                    }
631                                }
632                            }
633                            Err(err) => {
634                                let _ = reply_sender.send(Err(err));
635                            }
636                        },
637                        ControlRequest::Logs { uuid, name, node } => {
638                            let dataflow_uuid = if let Some(uuid) = uuid {
639                                Ok(uuid)
640                            } else if let Some(name) = name {
641                                resolve_name(name, &running_dataflows, &archived_dataflows)
642                            } else {
643                                Err(eyre!("No uuid"))
644                            };
645
646                            match dataflow_uuid {
647                                Ok(uuid) => {
648                                    let reply = retrieve_logs(
649                                        &running_dataflows,
650                                        &archived_dataflows,
651                                        uuid,
652                                        node.into(),
653                                        &mut daemon_connections,
654                                        clock.new_timestamp(),
655                                    )
656                                    .await
657                                    .map(ControlRequestReply::Logs);
658                                    let _ = reply_sender.send(reply);
659                                }
660                                Err(err) => {
661                                    let _ = reply_sender.send(Err(err));
662                                }
663                            }
664                        }
665                        ControlRequest::Destroy => {
666                            tracing::info!("Received destroy command");
667
668                            let reply = handle_destroy(
669                                &mut running_dataflows,
670                                &mut daemon_connections,
671                                &abort_handle,
672                                &mut daemon_events_tx,
673                                &clock,
674                            )
675                            .await
676                            .map(|()| ControlRequestReply::DestroyOk);
677                            let _ = reply_sender.send(reply);
678                        }
679                        ControlRequest::List => {
680                            let mut dataflows: Vec<_> = running_dataflows.values().collect();
681                            dataflows.sort_by_key(|d| (&d.name, d.uuid));
682
683                            let running = dataflows.into_iter().map(|d| DataflowListEntry {
684                                id: DataflowIdAndName {
685                                    uuid: d.uuid,
686                                    name: d.name.clone(),
687                                },
688                                status: DataflowStatus::Running,
689                            });
690                            let finished_failed =
691                                dataflow_results.iter().map(|(&uuid, results)| {
692                                    let name =
693                                        archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
694                                    let id = DataflowIdAndName { uuid, name };
695                                    let status = if results.values().all(|r| r.is_ok()) {
696                                        DataflowStatus::Finished
697                                    } else {
698                                        DataflowStatus::Failed
699                                    };
700                                    DataflowListEntry { id, status }
701                                });
702
703                            let reply = Ok(ControlRequestReply::DataflowList(DataflowList(
704                                running.chain(finished_failed).collect(),
705                            )));
706                            let _ = reply_sender.send(reply);
707                        }
708                        ControlRequest::DaemonConnected => {
709                            let running = !daemon_connections.is_empty();
710                            let _ = reply_sender
711                                .send(Ok(ControlRequestReply::DaemonConnected(running)));
712                        }
713                        ControlRequest::ConnectedMachines => {
714                            let reply = Ok(ControlRequestReply::ConnectedDaemons(
715                                daemon_connections.keys().cloned().collect(),
716                            ));
717                            let _ = reply_sender.send(reply);
718                        }
719                        ControlRequest::LogSubscribe { .. } => {
720                            let _ = reply_sender.send(Err(eyre::eyre!(
721                                "LogSubscribe request should be handled separately"
722                            )));
723                        }
724                        ControlRequest::BuildLogSubscribe { .. } => {
725                            let _ = reply_sender.send(Err(eyre::eyre!(
726                                "BuildLogSubscribe request should be handled separately"
727                            )));
728                        }
729                        ControlRequest::CliAndDefaultDaemonOnSameMachine => {
730                            let mut default_daemon_ip = None;
731                            if let Some(default_id) = daemon_connections.unnamed().next() {
732                                if let Some(connection) = daemon_connections.get(default_id) {
733                                    if let Ok(addr) = connection.stream.peer_addr() {
734                                        default_daemon_ip = Some(addr.ip());
735                                    }
736                                }
737                            }
738                            let _ = reply_sender.send(Ok(
739                                ControlRequestReply::CliAndDefaultDaemonIps {
740                                    default_daemon: default_daemon_ip,
741                                    cli: None, // filled later
742                                },
743                            ));
744                        }
745                    }
746                }
747                ControlEvent::Error(err) => tracing::error!("{err:?}"),
748                ControlEvent::LogSubscribe {
749                    dataflow_id,
750                    level,
751                    connection,
752                } => {
753                    if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
754                        dataflow
755                            .log_subscribers
756                            .push(LogSubscriber::new(level, connection));
757                        let buffered = std::mem::take(&mut dataflow.buffered_log_messages);
758                        for message in buffered {
759                            send_log_message(&mut dataflow.log_subscribers, &message).await;
760                        }
761                    }
762                }
763                ControlEvent::BuildLogSubscribe {
764                    build_id,
765                    level,
766                    connection,
767                } => {
768                    if let Some(build) = running_builds.get_mut(&build_id) {
769                        build
770                            .log_subscribers
771                            .push(LogSubscriber::new(level, connection));
772                        let buffered = std::mem::take(&mut build.buffered_log_messages);
773                        for message in buffered {
774                            send_log_message(&mut build.log_subscribers, &message).await;
775                        }
776                    }
777                }
778            },
779            Event::DaemonHeartbeatInterval => {
780                let mut disconnected = BTreeSet::new();
781                for (machine_id, connection) in daemon_connections.iter_mut() {
782                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
783                        tracing::warn!(
784                            "no heartbeat message from machine `{machine_id}` since {:?}",
785                            connection.last_heartbeat.elapsed()
786                        )
787                    }
788                    if connection.last_heartbeat.elapsed() > Duration::from_secs(30) {
789                        disconnected.insert(machine_id.clone());
790                        continue;
791                    }
792                    let result: eyre::Result<()> = tokio::time::timeout(
793                        Duration::from_millis(500),
794                        send_heartbeat_message(&mut connection.stream, clock.new_timestamp()),
795                    )
796                    .await
797                    .wrap_err("timeout")
798                    .and_then(|r| r)
799                    .wrap_err_with(|| {
800                        format!("failed to send heartbeat message to daemon at `{machine_id}`")
801                    });
802                    if let Err(err) = result {
803                        tracing::warn!("{err:?}");
804                        disconnected.insert(machine_id.clone());
805                    }
806                }
807                if !disconnected.is_empty() {
808                    tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
809                    for machine_id in disconnected {
810                        daemon_connections.remove(&machine_id);
811                    }
812                }
813            }
814            Event::CtrlC => {
815                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
816                handle_destroy(
817                    &mut running_dataflows,
818                    &mut daemon_connections,
819                    &abort_handle,
820                    &mut daemon_events_tx,
821                    &clock,
822                )
823                .await?;
824            }
825            Event::DaemonHeartbeat {
826                daemon_id: machine_id,
827            } => {
828                if let Some(connection) = daemon_connections.get_mut(&machine_id) {
829                    connection.last_heartbeat = Instant::now();
830                }
831            }
832            Event::Log(message) => {
833                if let Some(dataflow_id) = &message.dataflow_id {
834                    if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
835                        if dataflow.log_subscribers.is_empty() {
836                            // buffer log message until there are subscribers
837                            dataflow.buffered_log_messages.push(message);
838                        } else {
839                            send_log_message(&mut dataflow.log_subscribers, &message).await;
840                        }
841                    }
842                } else if let Some(build_id) = &message.build_id {
843                    if let Some(build) = running_builds.get_mut(build_id) {
844                        if build.log_subscribers.is_empty() {
845                            // buffer log message until there are subscribers
846                            build.buffered_log_messages.push(message);
847                        } else {
848                            send_log_message(&mut build.log_subscribers, &message).await;
849                        }
850                    }
851                }
852            }
853            Event::DaemonExit { daemon_id } => {
854                tracing::info!("Daemon `{daemon_id}` exited");
855                daemon_connections.remove(&daemon_id);
856            }
857            Event::DataflowBuildResult {
858                build_id,
859                daemon_id,
860                result,
861            } => match running_builds.get_mut(&build_id) {
862                Some(build) => {
863                    build.pending_build_results.remove(&daemon_id);
864                    match result {
865                        Ok(()) => {}
866                        Err(err) => {
867                            build.errors.push(format!("{err:?}"));
868                        }
869                    };
870                    if build.pending_build_results.is_empty() {
871                        tracing::info!("dataflow build finished: `{build_id}`");
872                        let mut build = running_builds.remove(&build_id).unwrap();
873                        let result = if build.errors.is_empty() {
874                            Ok(())
875                        } else {
876                            Err(format!("build failed: {}", build.errors.join("\n\n")))
877                        };
878
879                        build.build_result.set_result(Ok(
880                            ControlRequestReply::DataflowBuildFinished { build_id, result },
881                        ));
882
883                        finished_builds.insert(build_id, build.build_result);
884                    }
885                }
886                None => {
887                    tracing::warn!(
888                        "received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map"
889                    );
890                }
891            },
892            Event::DataflowSpawnResult {
893                dataflow_id,
894                daemon_id,
895                result,
896            } => match running_dataflows.get_mut(&dataflow_id) {
897                Some(dataflow) => {
898                    dataflow.pending_spawn_results.remove(&daemon_id);
899                    match result {
900                        Ok(()) => {
901                            if dataflow.pending_spawn_results.is_empty() {
902                                tracing::info!("successfully spawned dataflow `{dataflow_id}`",);
903                                dataflow.spawn_result.set_result(Ok(
904                                    ControlRequestReply::DataflowSpawned { uuid: dataflow_id },
905                                ));
906                            }
907                        }
908                        Err(err) => {
909                            tracing::warn!("error while spawning dataflow `{dataflow_id}`");
910                            dataflow.spawn_result.set_result(Err(err));
911                        }
912                    };
913                }
914                None => {
915                    tracing::warn!(
916                        "received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map"
917                    );
918                }
919            },
920        }
921
922        // warn if event handling took too long -> the main loop should never be blocked for too long
923        let elapsed = start.elapsed();
924        if elapsed > Duration::from_millis(100) {
925            tracing::warn!(
926                "Coordinator took {}ms for handling event: {event_kind}",
927                elapsed.as_millis()
928            );
929        }
930    }
931
932    tracing::info!("stopped");
933
934    Ok(())
935}
936
937async fn send_log_message(log_subscribers: &mut Vec<LogSubscriber>, message: &LogMessage) {
938    for subscriber in log_subscribers.iter_mut() {
939        let send_result =
940            tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));
941
942        if send_result.await.is_err() {
943            subscriber.close();
944        }
945    }
946    log_subscribers.retain(|s| !s.is_closed());
947}
948
949fn dataflow_result(
950    results: &BTreeMap<DaemonId, DataflowDaemonResult>,
951    dataflow_uuid: Uuid,
952    clock: &uhlc::HLC,
953) -> DataflowResult {
954    let mut node_results = BTreeMap::new();
955    for result in results.values() {
956        node_results.extend(result.node_results.clone());
957        if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
958            tracing::warn!("failed to update HLC: {err}");
959        }
960    }
961
962    DataflowResult {
963        uuid: dataflow_uuid,
964        timestamp: clock.new_timestamp(),
965        node_results,
966    }
967}
968
/// An open TCP connection to a daemon, plus liveness bookkeeping.
struct DaemonConnection {
    // Stream used for coordinator <-> daemon messages.
    stream: TcpStream,
    // When the last heartbeat from this daemon arrived; checked periodically
    // by the heartbeat-interval handler to detect disconnected daemons.
    last_heartbeat: Instant,
}
973
974async fn handle_destroy(
975    running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
976    daemon_connections: &mut DaemonConnections,
977    abortable_events: &futures::stream::AbortHandle,
978    daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
979    clock: &HLC,
980) -> Result<(), eyre::ErrReport> {
981    abortable_events.abort();
982    for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() {
983        let _ = stop_dataflow(
984            running_dataflows,
985            dataflow_uuid,
986            daemon_connections,
987            clock.new_timestamp(),
988            None,
989        )
990        .await?;
991    }
992
993    let result = destroy_daemons(daemon_connections, clock.new_timestamp()).await;
994    *daemon_events_tx = None;
995    result
996}
997
998async fn send_heartbeat_message(
999    connection: &mut TcpStream,
1000    timestamp: uhlc::Timestamp,
1001) -> eyre::Result<()> {
1002    let message = serde_json::to_vec(&Timestamped {
1003        inner: DaemonCoordinatorEvent::Heartbeat,
1004        timestamp,
1005    })
1006    .context("Could not serialize heartbeat message")?;
1007
1008    tcp_send(connection, &message)
1009        .await
1010        .wrap_err("failed to send heartbeat message to daemon")
1011}
1012
/// Bookkeeping for a dataflow build that is currently in progress.
struct RunningBuild {
    // Error strings collected from daemon build results; the build is
    // reported as failed if this is non-empty once all daemons replied.
    errors: Vec<String>,
    // Pending/cached final build outcome; control requests can register a
    // oneshot sender here to be notified when the build finishes.
    build_result: CachedResult,

    /// Buffer for log messages that were sent before there were any subscribers.
    buffered_log_messages: Vec<LogMessage>,
    log_subscribers: Vec<LogSubscriber>,

    // Daemons that have not reported their build result yet; the build is
    // finished once this set becomes empty.
    pending_build_results: BTreeSet<DaemonId>,
}
1023
/// Bookkeeping for a dataflow that is currently running.
struct RunningDataflow {
    // Optional human-readable name (used for lookup by name).
    name: Option<String>,
    uuid: Uuid,
    /// The IDs of the daemons that the dataflow is running on.
    daemons: BTreeSet<DaemonId>,
    /// IDs of daemons that are waiting until all nodes are started.
    pending_daemons: BTreeSet<DaemonId>,
    // NOTE(review): nodes recorded as exited early — exact semantics are
    // handled outside this chunk; confirm before relying on it.
    exited_before_subscribe: Vec<NodeId>,
    // Resolved node set, keyed by node ID (also kept after archiving).
    nodes: BTreeMap<NodeId, ResolvedNode>,

    // Pending/cached spawn outcome; set once all daemons reported their
    // spawn result (or the first error).
    spawn_result: CachedResult,
    // Senders waiting for a reply to a stop request for this dataflow.
    stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

    /// Buffer for log messages that were sent before there were any subscribers.
    buffered_log_messages: Vec<LogMessage>,
    log_subscribers: Vec<LogSubscriber>,

    // Daemons that have not reported their spawn result yet.
    pending_spawn_results: BTreeSet<DaemonId>,
}
1043
/// A result that may not be available yet.
///
/// Interested parties can `register` a oneshot sender. Once `set_result` is
/// called, the result is broadcast to all registered senders and cached, so
/// later registrations receive it immediately.
pub enum CachedResult {
    /// No result yet; queued senders are notified on `set_result`.
    Pending {
        result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
    },
    /// Result available; delivered directly to any new registration.
    Cached {
        result: eyre::Result<ControlRequestReply>,
    },
}
1052
1053impl Default for CachedResult {
1054    fn default() -> Self {
1055        Self::Pending {
1056            result_senders: Vec::new(),
1057        }
1058    }
1059}
1060
1061impl CachedResult {
1062    fn register(
1063        &mut self,
1064        reply_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
1065    ) {
1066        match self {
1067            CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
1068            CachedResult::Cached { result } => {
1069                Self::send_result_to(result, reply_sender);
1070            }
1071        }
1072    }
1073
1074    fn set_result(&mut self, result: eyre::Result<ControlRequestReply>) {
1075        match self {
1076            CachedResult::Pending { result_senders } => {
1077                for sender in result_senders.drain(..) {
1078                    Self::send_result_to(&result, sender);
1079                }
1080                *self = CachedResult::Cached { result };
1081            }
1082            CachedResult::Cached { .. } => {}
1083        }
1084    }
1085
1086    fn send_result_to(
1087        result: &eyre::Result<ControlRequestReply>,
1088        sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
1089    ) {
1090        let result = match result {
1091            Ok(r) => Ok(r.clone()),
1092            Err(err) => Err(eyre!("{err:?}")),
1093        };
1094        let _ = sender.send(result);
1095    }
1096}
1097
/// Minimal information kept about a dataflow after it stopped running
/// (e.g. to serve later log requests).
struct ArchivedDataflow {
    // Optional human-readable dataflow name.
    name: Option<String>,
    // The dataflow's resolved nodes, keyed by node ID.
    nodes: BTreeMap<NodeId, ResolvedNode>,
}
1102
1103impl From<&RunningDataflow> for ArchivedDataflow {
1104    fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
1105        ArchivedDataflow {
1106            name: dataflow.name.clone(),
1107            nodes: dataflow.nodes.clone(),
1108        }
1109    }
1110}
1111
// Equality is identity-based: only the name, UUID, and daemon set are
// compared. Transient state (log subscribers, pending results, reply
// senders) is deliberately ignored.
impl PartialEq for RunningDataflow {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
    }
}

impl Eq for RunningDataflow {}
1119
1120async fn stop_dataflow<'a>(
1121    running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
1122    dataflow_uuid: Uuid,
1123    daemon_connections: &mut DaemonConnections,
1124    timestamp: uhlc::Timestamp,
1125    grace_duration: Option<Duration>,
1126) -> eyre::Result<&'a mut RunningDataflow> {
1127    let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
1128        bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
1129    };
1130
1131    let message = serde_json::to_vec(&Timestamped {
1132        inner: DaemonCoordinatorEvent::StopDataflow {
1133            dataflow_id: dataflow_uuid,
1134            grace_duration,
1135        },
1136        timestamp,
1137    })?;
1138
1139    for daemon_id in &dataflow.daemons {
1140        let daemon_connection = daemon_connections
1141            .get_mut(daemon_id)
1142            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
1143        tcp_send(&mut daemon_connection.stream, &message)
1144            .await
1145            .wrap_err("failed to send stop message to daemon")?;
1146
1147        // wait for reply
1148        let reply_raw = tcp_receive(&mut daemon_connection.stream)
1149            .await
1150            .wrap_err("failed to receive stop reply from daemon")?;
1151        match serde_json::from_slice(&reply_raw)
1152            .wrap_err("failed to deserialize stop reply from daemon")?
1153        {
1154            DaemonCoordinatorReply::StopResult(result) => result
1155                .map_err(|e| eyre!(e))
1156                .wrap_err("failed to stop dataflow")?,
1157            other => bail!("unexpected reply after sending stop: {other:?}"),
1158        }
1159    }
1160
1161    tracing::info!("successfully send stop dataflow `{dataflow_uuid}` to all daemons");
1162
1163    Ok(dataflow)
1164}
1165
1166async fn reload_dataflow(
1167    running_dataflows: &HashMap<Uuid, RunningDataflow>,
1168    dataflow_id: Uuid,
1169    node_id: NodeId,
1170    operator_id: Option<OperatorId>,
1171    daemon_connections: &mut DaemonConnections,
1172    timestamp: uhlc::Timestamp,
1173) -> eyre::Result<()> {
1174    let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
1175        bail!("No running dataflow found with UUID `{dataflow_id}`")
1176    };
1177    let message = serde_json::to_vec(&Timestamped {
1178        inner: DaemonCoordinatorEvent::ReloadDataflow {
1179            dataflow_id,
1180            node_id,
1181            operator_id,
1182        },
1183        timestamp,
1184    })?;
1185
1186    for machine_id in &dataflow.daemons {
1187        let daemon_connection = daemon_connections
1188            .get_mut(machine_id)
1189            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
1190        tcp_send(&mut daemon_connection.stream, &message)
1191            .await
1192            .wrap_err("failed to send reload message to daemon")?;
1193
1194        // wait for reply
1195        let reply_raw = tcp_receive(&mut daemon_connection.stream)
1196            .await
1197            .wrap_err("failed to receive reload reply from daemon")?;
1198        match serde_json::from_slice(&reply_raw)
1199            .wrap_err("failed to deserialize reload reply from daemon")?
1200        {
1201            DaemonCoordinatorReply::ReloadResult(result) => result
1202                .map_err(|e| eyre!(e))
1203                .wrap_err("failed to reload dataflow")?,
1204            other => bail!("unexpected reply after sending reload: {other:?}"),
1205        }
1206    }
1207    tracing::info!("successfully reloaded dataflow `{dataflow_id}`");
1208
1209    Ok(())
1210}
1211
/// Retrieves the log output of a single node of a (running or archived)
/// dataflow from the daemon the node is deployed on.
///
/// # Errors
///
/// Fails if the dataflow UUID is unknown, if the node is deployed on zero or
/// multiple machines, if no unique daemon connection matches the node's
/// machine, or on any communication/serialization error.
async fn retrieve_logs(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<Vec<u8>> {
    // Look up the dataflow's node set, checking archived dataflows first.
    let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else {
        bail!("No dataflow found with UUID `{dataflow_id}`")
    };

    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Logs {
            dataflow_id,
            node_id: node_id.clone(),
        },
        timestamp,
    })?;

    // Machine(s) this node is deployed on; `None` means the unnamed/default
    // daemon.
    let machine_ids: Vec<Option<String>> = nodes
        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
        .collect();

    // The node must be deployed on exactly one machine for its logs to be
    // retrievable.
    let machine_id = if let [machine_id] = &machine_ids[..] {
        machine_id
    } else if machine_ids.is_empty() {
        bail!("No machine contains {}/{}", dataflow_id, node_id)
    } else {
        bail!(
            "More than one machine contains {}/{}. However, it should only be present on one.",
            dataflow_id,
            node_id
        )
    };

    // Resolve the machine ID to a unique daemon connection.
    let daemon_ids: Vec<_> = match machine_id {
        None => daemon_connections.unnamed().collect(),
        Some(machine_id) => daemon_connections
            .get_matching_daemon_id(machine_id)
            .into_iter()
            .collect(),
    };
    let daemon_id = match &daemon_ids[..] {
        [id] => (*id).clone(),
        [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
        _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
    };
    let daemon_connection = daemon_connections
        .get_mut(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?;
    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err("failed to send logs message to daemon")?;

    // wait for reply
    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to retrieve logs reply from daemon")?;
    let reply_logs = match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize logs reply from daemon")?
    {
        DaemonCoordinatorReply::Logs(logs) => logs,
        other => bail!("unexpected reply after sending logs: {other:?}"),
    };
    tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");

    reply_logs.map_err(|err| eyre!(err))
}
1287
/// Triggers a distributed build of the given dataflow descriptor.
///
/// Nodes (and their git sources) are grouped by the machine they are deployed
/// on (`None` = unnamed/default daemon); each involved daemon receives one
/// `Build` message with its share of the work. The returned `RunningBuild`
/// tracks which daemons still have to report their build result.
///
/// # Errors
///
/// Fails if descriptor alias resolution fails or if triggering the build on
/// any machine fails.
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(daemon_connections, clock))]
async fn build_dataflow(
    build_id: BuildId,
    session_id: SessionId,
    dataflow: Descriptor,
    git_sources: BTreeMap<NodeId, GitSource>,
    prev_git_sources: BTreeMap<NodeId, GitSource>,
    local_working_dir: Option<PathBuf>,
    clock: &HLC,
    uv: bool,
    daemon_connections: &mut DaemonConnections,
) -> eyre::Result<RunningBuild> {
    let nodes = dataflow.resolve_aliases_and_set_defaults()?;

    // Group the git sources by the machine of the node they belong to.
    let mut git_sources_by_daemon = git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();
    let mut prev_git_sources_by_daemon = prev_git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();

    let nodes_by_daemon = nodes
        .values()
        .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));

    let mut daemons = BTreeSet::new();
    for (machine, nodes_on_machine) in &nodes_by_daemon {
        let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
        tracing::debug!(
            "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
        );

        // Build command carrying only this machine's share of the sources.
        let build_command = BuildDataflowNodes {
            build_id,
            session_id,
            local_working_dir: local_working_dir.clone(),
            git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
            prev_git_sources: prev_git_sources_by_daemon
                .remove(machine)
                .unwrap_or_default(),
            dataflow_descriptor: dataflow.clone(),
            nodes_on_machine,
            uv,
        };
        let message = serde_json::to_vec(&Timestamped {
            inner: DaemonCoordinatorEvent::Build(build_command),
            timestamp: clock.new_timestamp(),
        })?;

        let daemon_id =
            build_dataflow_on_machine(daemon_connections, machine.map(|s| s.as_str()), &message)
                .await
                .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
        daemons.insert(daemon_id);
    }

    tracing::info!("successfully triggered dataflow build `{build_id}`",);

    Ok(RunningBuild {
        errors: Vec::new(),
        build_result: CachedResult::default(),
        buffered_log_messages: Vec::new(),
        log_subscribers: Vec::new(),
        // the build completes once every daemon reported its result
        pending_build_results: daemons,
    })
}
1365
/// Sends a prepared build message to the daemon responsible for `machine`
/// and waits for its `TriggerBuildResult` acknowledgment.
///
/// `machine == None` selects the first unnamed (default) daemon connection.
/// Returns the ID of the daemon that accepted the build.
async fn build_dataflow_on_machine(
    daemon_connections: &mut DaemonConnections,
    machine: Option<&str>,
    message: &[u8],
) -> Result<DaemonId, eyre::ErrReport> {
    // Resolve the machine name (if any) to a daemon ID.
    let daemon_id = match machine {
        Some(machine) => daemon_connections
            .get_matching_daemon_id(machine)
            .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
            .clone(),
        None => daemon_connections
            .unnamed()
            .next()
            .wrap_err("no unnamed daemon connections")?
            .clone(),
    };

    let daemon_connection = daemon_connections
        .get_mut(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?;
    tcp_send(&mut daemon_connection.stream, message)
        .await
        .wrap_err("failed to send build message to daemon")?;

    // wait for the daemon's acknowledgment before returning
    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to receive build reply from daemon")?;
    match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize build reply from daemon")?
    {
        DaemonCoordinatorReply::TriggerBuildResult(result) => result
            .map_err(|e| eyre!(e))
            .wrap_err("daemon returned an error")?,
        _ => bail!("unexpected reply"),
    }
    Ok(daemon_id)
}
1403
1404#[allow(clippy::too_many_arguments)]
1405async fn start_dataflow(
1406    build_id: Option<BuildId>,
1407    session_id: SessionId,
1408    dataflow: Descriptor,
1409    local_working_dir: Option<PathBuf>,
1410    name: Option<String>,
1411    daemon_connections: &mut DaemonConnections,
1412    clock: &HLC,
1413    uv: bool,
1414) -> eyre::Result<RunningDataflow> {
1415    let SpawnedDataflow {
1416        uuid,
1417        daemons,
1418        nodes,
1419    } = spawn_dataflow(
1420        build_id,
1421        session_id,
1422        dataflow,
1423        local_working_dir,
1424        daemon_connections,
1425        clock,
1426        uv,
1427    )
1428    .await?;
1429    Ok(RunningDataflow {
1430        uuid,
1431        name,
1432        pending_daemons: if daemons.len() > 1 {
1433            daemons.clone()
1434        } else {
1435            BTreeSet::new()
1436        },
1437        exited_before_subscribe: Default::default(),
1438        daemons: daemons.clone(),
1439        nodes,
1440        spawn_result: CachedResult::default(),
1441        stop_reply_senders: Vec::new(),
1442        buffered_log_messages: Vec::new(),
1443        log_subscribers: Vec::new(),
1444        pending_spawn_results: daemons,
1445    })
1446}
1447
1448async fn destroy_daemon(
1449    daemon_id: DaemonId,
1450    mut daemon_connection: DaemonConnection,
1451
1452    timestamp: uhlc::Timestamp,
1453) -> Result<()> {
1454    let message = serde_json::to_vec(&Timestamped {
1455        inner: DaemonCoordinatorEvent::Destroy,
1456        timestamp,
1457    })?;
1458
1459    tcp_send(&mut daemon_connection.stream, &message)
1460        .await
1461        .wrap_err(format!(
1462            "failed to send destroy message to daemon `{daemon_id}`"
1463        ))?;
1464
1465    // wait for reply
1466    let reply_raw = tcp_receive(&mut daemon_connection.stream)
1467        .await
1468        .wrap_err("failed to receive destroy reply from daemon")?;
1469    match serde_json::from_slice(&reply_raw)
1470        .wrap_err("failed to deserialize destroy reply from daemon")?
1471    {
1472        DaemonCoordinatorReply::DestroyResult { result, .. } => result
1473            .map_err(|e| eyre!(e))
1474            .wrap_err("failed to destroy dataflow")?,
1475        other => bail!("unexpected reply after sending `destroy`: {other:?}"),
1476    }
1477
1478    tracing::info!("successfully destroyed daemon `{daemon_id}`");
1479    Ok(())
1480}
1481
1482async fn destroy_daemons(
1483    daemon_connections: &mut DaemonConnections,
1484    timestamp: uhlc::Timestamp,
1485) -> eyre::Result<()> {
1486    let futures = daemon_connections
1487        .drain()
1488        .map(|(daemon_id, daemon_connection)| {
1489            destroy_daemon(daemon_id, daemon_connection, timestamp)
1490        })
1491        .collect::<Vec<_>>();
1492    let results: Vec<std::result::Result<(), eyre::Error>> =
1493        join_all(futures).await.into_iter().collect::<Vec<_>>();
1494    for result in results {
1495        result?;
1496    }
1497    Ok(())
1498}
1499
1500#[derive(Debug)]
1501pub enum Event {
1502    NewDaemonConnection(TcpStream),
1503    DaemonConnectError(eyre::Report),
1504    DaemonHeartbeat {
1505        daemon_id: DaemonId,
1506    },
1507    Dataflow {
1508        uuid: Uuid,
1509        event: DataflowEvent,
1510    },
1511    Control(ControlEvent),
1512    Daemon(DaemonRequest),
1513    DaemonHeartbeatInterval,
1514    CtrlC,
1515    Log(LogMessage),
1516    DaemonExit {
1517        daemon_id: dora_message::common::DaemonId,
1518    },
1519    DataflowBuildResult {
1520        build_id: BuildId,
1521        daemon_id: DaemonId,
1522        result: eyre::Result<()>,
1523    },
1524    DataflowSpawnResult {
1525        dataflow_id: uuid::Uuid,
1526        daemon_id: DaemonId,
1527        result: eyre::Result<()>,
1528    },
1529}
1530
1531impl Event {
1532    /// Whether this event should be logged.
1533    #[allow(clippy::match_like_matches_macro)]
1534    pub fn log(&self) -> bool {
1535        match self {
1536            Event::DaemonHeartbeatInterval => false,
1537            _ => true,
1538        }
1539    }
1540
1541    fn kind(&self) -> &'static str {
1542        match self {
1543            Event::NewDaemonConnection(_) => "NewDaemonConnection",
1544            Event::DaemonConnectError(_) => "DaemonConnectError",
1545            Event::DaemonHeartbeat { .. } => "DaemonHeartbeat",
1546            Event::Dataflow { .. } => "Dataflow",
1547            Event::Control(_) => "Control",
1548            Event::Daemon(_) => "Daemon",
1549            Event::DaemonHeartbeatInterval => "DaemonHeartbeatInterval",
1550            Event::CtrlC => "CtrlC",
1551            Event::Log(_) => "Log",
1552            Event::DaemonExit { .. } => "DaemonExit",
1553            Event::DataflowBuildResult { .. } => "DataflowBuildResult",
1554            Event::DataflowSpawnResult { .. } => "DataflowSpawnResult",
1555        }
1556    }
1557}
1558
/// Per-dataflow events reported by the daemons a dataflow runs on.
#[derive(Debug)]
pub enum DataflowEvent {
    /// The dataflow finished on one daemon, with that daemon's result.
    DataflowFinishedOnDaemon {
        daemon_id: DaemonId,
        result: DataflowDaemonResult,
    },
    /// The dataflow is ready on one daemon.
    ReadyOnDaemon {
        daemon_id: DaemonId,
        // Per the field name: nodes that already exited before subscribing —
        // TODO(review): confirm exact semantics against the daemon side.
        exited_before_subscribe: Vec<NodeId>,
    },
}
1570
/// Requests a daemon sends to the coordinator over a fresh TCP connection.
#[derive(Debug)]
pub enum DaemonRequest {
    /// A daemon registers itself with the coordinator.
    Register {
        // `Err` carries a human-readable message; presumably a
        // daemon/coordinator version mismatch — confirm in the listener module.
        version_check_result: Result<(), String>,
        // Optional user-assigned machine identifier for this daemon.
        machine_id: Option<String>,
        // The TCP connection the registration arrived on.
        connection: TcpStream,
    },
}
1579
1580fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
1581    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
1582
1583    let mut ctrlc_sent = false;
1584    ctrlc::set_handler(move || {
1585        if ctrlc_sent {
1586            tracing::warn!("received second ctrlc signal -> aborting immediately");
1587            std::process::abort();
1588        } else {
1589            tracing::info!("received ctrlc signal");
1590            if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
1591                tracing::error!("failed to report ctrl-c event to dora-coordinator");
1592            }
1593
1594            ctrlc_sent = true;
1595        }
1596    })
1597    .wrap_err("failed to set ctrl-c handler")?;
1598
1599    Ok(ReceiverStream::new(ctrlc_rx))
1600}