// dora_coordinator — lib.rs

1use crate::{
2    run::spawn_dataflow,
3    tcp_utils::{tcp_receive, tcp_send},
4};
5pub use control::ControlEvent;
6use dora_core::{
7    config::{NodeId, OperatorId},
8    descriptor::DescriptorExt,
9    uhlc::{self, HLC},
10};
11use dora_message::{
12    cli_to_coordinator::ControlRequest,
13    common::{DaemonId, GitSource},
14    coordinator_to_cli::{
15        ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
16        DataflowStatus, LogLevel, LogMessage,
17    },
18    coordinator_to_daemon::{
19        BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped,
20    },
21    daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
22    descriptor::{Descriptor, ResolvedNode},
23    BuildId, DataflowId, SessionId,
24};
25use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
26use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
27use futures_concurrency::stream::Merge;
28use itertools::Itertools;
29use log_subscriber::LogSubscriber;
30use run::SpawnedDataflow;
31use std::{
32    collections::{BTreeMap, BTreeSet, HashMap},
33    net::SocketAddr,
34    path::PathBuf,
35    sync::Arc,
36    time::{Duration, Instant},
37};
38use tokio::{
39    net::TcpStream,
40    sync::{mpsc, oneshot},
41    task::JoinHandle,
42};
43use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
44use uuid::Uuid;
45
46mod control;
47mod listener;
48mod log_subscriber;
49mod run;
50mod tcp_utils;
51
52pub async fn start(
53    bind: SocketAddr,
54    bind_control: SocketAddr,
55    external_events: impl Stream<Item = Event> + Unpin,
56) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
57    let listener = listener::create_listener(bind).await?;
58    let port = listener
59        .local_addr()
60        .wrap_err("failed to get local addr of listener")?
61        .port();
62    let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
63        c.map(Event::NewDaemonConnection)
64            .wrap_err("failed to open connection")
65            .unwrap_or_else(Event::DaemonConnectError)
66    });
67
68    let mut tasks = FuturesUnordered::new();
69    let control_events = control::control_events(bind_control, &tasks)
70        .await
71        .wrap_err("failed to create control events")?;
72
73    // Setup ctrl-c handler
74    let ctrlc_events = set_up_ctrlc_handler()?;
75
76    let events = (
77        external_events,
78        new_daemon_connections,
79        control_events,
80        ctrlc_events,
81    )
82        .merge();
83
84    let future = async move {
85        start_inner(events, &tasks).await?;
86
87        tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
88        while let Some(join_result) = tasks.next().await {
89            if let Err(err) = join_result {
90                tracing::error!("task panicked: {err}");
91            }
92        }
93        tracing::debug!("all spawned tasks finished, exiting..");
94        Ok(())
95    };
96    Ok((port, future))
97}
98
99// Resolve the dataflow name.
100fn resolve_name(
101    name: String,
102    running_dataflows: &HashMap<Uuid, RunningDataflow>,
103    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
104) -> eyre::Result<Uuid> {
105    let uuids: Vec<_> = running_dataflows
106        .iter()
107        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
108        .map(|(k, _)| k)
109        .copied()
110        .collect();
111    let archived_uuids: Vec<_> = archived_dataflows
112        .iter()
113        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
114        .map(|(k, _)| k)
115        .copied()
116        .collect();
117
118    if uuids.is_empty() {
119        if archived_uuids.is_empty() {
120            bail!("no dataflow with name `{name}`");
121        } else if let [uuid] = archived_uuids.as_slice() {
122            Ok(*uuid)
123        } else {
124            // TODO: Index the archived dataflows in order to return logs based on the index.
125            bail!("multiple archived dataflows found with name `{name}`, Please provide the UUID instead.");
126        }
127    } else if let [uuid] = uuids.as_slice() {
128        Ok(*uuid)
129    } else {
130        bail!("multiple dataflows found with name `{name}`");
131    }
132}
133
/// Connections of all daemons currently registered with this coordinator,
/// keyed by their unique `DaemonId`.
#[derive(Default)]
struct DaemonConnections {
    // BTreeMap keeps iteration order (keys, iter_mut, drain) deterministic.
    daemons: BTreeMap<DaemonId, DaemonConnection>,
}
138
139impl DaemonConnections {
140    fn add(&mut self, daemon_id: DaemonId, connection: DaemonConnection) {
141        let previous = self.daemons.insert(daemon_id.clone(), connection);
142        if previous.is_some() {
143            tracing::info!("closing previous connection `{daemon_id}` on new register");
144        }
145    }
146
147    fn get(&self, id: &DaemonId) -> Option<&DaemonConnection> {
148        self.daemons.get(id)
149    }
150
151    fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
152        self.daemons.get_mut(id)
153    }
154
155    fn get_matching_daemon_id(&self, machine_id: &str) -> Option<&DaemonId> {
156        self.daemons
157            .keys()
158            .find(|id| id.matches_machine_id(machine_id))
159    }
160
161    fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
162        std::mem::take(&mut self.daemons).into_iter()
163    }
164
165    fn is_empty(&self) -> bool {
166        self.daemons.is_empty()
167    }
168
169    fn keys(&self) -> impl Iterator<Item = &DaemonId> {
170        self.daemons.keys()
171    }
172
173    fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
174        self.daemons.iter_mut()
175    }
176
177    fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
178        self.daemons.remove(daemon_id)
179    }
180
181    fn unnamed(&self) -> impl Iterator<Item = &DaemonId> {
182        self.daemons.keys().filter(|id| id.machine_id().is_none())
183    }
184}
185
186async fn start_inner(
187    events: impl Stream<Item = Event> + Unpin,
188    tasks: &FuturesUnordered<JoinHandle<()>>,
189) -> eyre::Result<()> {
190    let clock = Arc::new(HLC::default());
191
192    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2);
193    let mut daemon_events_tx = Some(daemon_events_tx);
194    let daemon_events = ReceiverStream::new(daemon_events);
195
196    let daemon_heartbeat_interval =
197        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
198            .map(|_| Event::DaemonHeartbeatInterval);
199
200    // events that should be aborted on `dora destroy`
201    let (abortable_events, abort_handle) =
202        futures::stream::abortable((events, daemon_heartbeat_interval).merge());
203
204    let mut events = (abortable_events, daemon_events).merge();
205
206    let mut running_builds: HashMap<BuildId, RunningBuild> = HashMap::new();
207    let mut finished_builds: HashMap<BuildId, CachedResult> = HashMap::new();
208
209    let mut running_dataflows: HashMap<DataflowId, RunningDataflow> = HashMap::new();
210    let mut dataflow_results: HashMap<DataflowId, BTreeMap<DaemonId, DataflowDaemonResult>> =
211        HashMap::new();
212    let mut archived_dataflows: HashMap<DataflowId, ArchivedDataflow> = HashMap::new();
213    let mut daemon_connections = DaemonConnections::default();
214
215    while let Some(event) = events.next().await {
216        // used below for measuring the event handling duration
217        let start = Instant::now();
218        let event_kind = event.kind();
219
220        if event.log() {
221            tracing::trace!("Handling event {event:?}");
222        }
223        match event {
224            Event::NewDaemonConnection(connection) => {
225                connection.set_nodelay(true)?;
226                let events_tx = daemon_events_tx.clone();
227                if let Some(events_tx) = events_tx {
228                    let task = tokio::spawn(listener::handle_connection(
229                        connection,
230                        events_tx,
231                        clock.clone(),
232                    ));
233                    tasks.push(task);
234                } else {
235                    tracing::warn!(
236                        "ignoring new daemon connection because events_tx was closed already"
237                    );
238                }
239            }
240            Event::DaemonConnectError(err) => {
241                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
242            }
243            Event::Daemon(event) => match event {
244                DaemonRequest::Register {
245                    machine_id,
246                    mut connection,
247                    version_check_result,
248                } => {
249                    let existing = match &machine_id {
250                        Some(id) => daemon_connections.get_matching_daemon_id(id),
251                        None => daemon_connections.unnamed().next(),
252                    };
253                    let existing_result = if existing.is_some() {
254                        Err(format!(
255                            "There is already a connected daemon with machine ID `{machine_id:?}`"
256                        ))
257                    } else {
258                        Ok(())
259                    };
260
261                    // assign a unique ID to the daemon
262                    let daemon_id = DaemonId::new(machine_id);
263
264                    let reply: Timestamped<RegisterResult> = Timestamped {
265                        inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
266                            Ok(_) => RegisterResult::Ok {
267                                daemon_id: daemon_id.clone(),
268                            },
269                            Err(err) => RegisterResult::Err(err.clone()),
270                        },
271                        timestamp: clock.new_timestamp(),
272                    };
273
274                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
275                        .await
276                        .context("tcp send failed");
277                    match version_check_result.map_err(|e| eyre!(e)).and(send_result) {
278                        Ok(()) => {
279                            daemon_connections.add(
280                                daemon_id.clone(),
281                                DaemonConnection {
282                                    stream: connection,
283                                    last_heartbeat: Instant::now(),
284                                },
285                            );
286                        }
287                        Err(err) => {
288                            tracing::warn!("failed to register daemon connection for daemon `{daemon_id}`: {err}");
289                        }
290                    }
291                }
292            },
293            Event::Dataflow { uuid, event } => match event {
294                DataflowEvent::ReadyOnDaemon {
295                    daemon_id,
296                    exited_before_subscribe,
297                } => {
298                    match running_dataflows.entry(uuid) {
299                        std::collections::hash_map::Entry::Occupied(mut entry) => {
300                            let dataflow = entry.get_mut();
301                            dataflow.pending_daemons.remove(&daemon_id);
302                            dataflow
303                                .exited_before_subscribe
304                                .extend(exited_before_subscribe);
305                            if dataflow.pending_daemons.is_empty() {
306                                tracing::debug!("sending all nodes ready message to daemons");
307                                let message = serde_json::to_vec(&Timestamped {
308                                    inner: DaemonCoordinatorEvent::AllNodesReady {
309                                        dataflow_id: uuid,
310                                        exited_before_subscribe: dataflow
311                                            .exited_before_subscribe
312                                            .clone(),
313                                    },
314                                    timestamp: clock.new_timestamp(),
315                                })
316                                .wrap_err("failed to serialize AllNodesReady message")?;
317
318                                // notify all machines that run parts of the dataflow
319                                for daemon_id in &dataflow.daemons {
320                                    let Some(connection) = daemon_connections.get_mut(daemon_id)
321                                    else {
322                                        tracing::warn!(
323                                            "no daemon connection found for machine `{daemon_id}`"
324                                        );
325                                        continue;
326                                    };
327                                    tcp_send(&mut connection.stream, &message)
328                                        .await
329                                        .wrap_err_with(|| {
330                                            format!(
331                                                "failed to send AllNodesReady({uuid}) message \
332                                            to machine {daemon_id}"
333                                            )
334                                        })?;
335                                }
336                            }
337                        }
338                        std::collections::hash_map::Entry::Vacant(_) => {
339                            tracing::warn!("dataflow not running on ReadyOnMachine");
340                        }
341                    }
342                }
343                DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => {
344                    tracing::debug!("coordinator received DataflowFinishedOnDaemon ({daemon_id:?}, result: {result:?})");
345                    match running_dataflows.entry(uuid) {
346                        std::collections::hash_map::Entry::Occupied(mut entry) => {
347                            let dataflow = entry.get_mut();
348                            dataflow.daemons.remove(&daemon_id);
349                            tracing::info!(
350                                "removed machine id: {daemon_id} from dataflow: {:#?}",
351                                dataflow.uuid
352                            );
353                            dataflow_results
354                                .entry(uuid)
355                                .or_default()
356                                .insert(daemon_id, result);
357
358                            if dataflow.daemons.is_empty() {
359                                // Archive finished dataflow
360                                archived_dataflows
361                                    .entry(uuid)
362                                    .or_insert_with(|| ArchivedDataflow::from(entry.get()));
363                                let mut finished_dataflow = entry.remove();
364                                let dataflow_id = finished_dataflow.uuid;
365                                send_log_message(
366                                    &mut finished_dataflow.log_subscribers,
367                                    &LogMessage {
368                                        build_id: None,
369                                        dataflow_id: Some(dataflow_id),
370                                        node_id: None,
371                                        daemon_id: None,
372                                        level: LogLevel::Info.into(),
373                                        target: Some("coordinator".into()),
374                                        module_path: None,
375                                        file: None,
376                                        line: None,
377                                        message: "dataflow finished".into(),
378                                    },
379                                )
380                                .await;
381
382                                let reply = ControlRequestReply::DataflowStopped {
383                                    uuid,
384                                    result: dataflow_results
385                                        .get(&uuid)
386                                        .map(|r| dataflow_result(r, uuid, &clock))
387                                        .unwrap_or_else(|| {
388                                            DataflowResult::ok_empty(uuid, clock.new_timestamp())
389                                        }),
390                                };
391                                for sender in finished_dataflow.stop_reply_senders {
392                                    let _ = sender.send(Ok(reply.clone()));
393                                }
394                                if !matches!(
395                                    finished_dataflow.spawn_result,
396                                    CachedResult::Cached { .. }
397                                ) {
398                                    log::error!("pending spawn result on dataflow finish");
399                                }
400                            }
401                        }
402                        std::collections::hash_map::Entry::Vacant(_) => {
403                            tracing::warn!("dataflow not running on DataflowFinishedOnDaemon");
404                        }
405                    }
406                }
407            },
408
409            Event::Control(event) => match event {
410                ControlEvent::IncomingRequest {
411                    request,
412                    reply_sender,
413                } => {
414                    match request {
415                        ControlRequest::Build {
416                            session_id,
417                            dataflow,
418                            git_sources,
419                            prev_git_sources,
420                            local_working_dir,
421                            uv,
422                        } => {
423                            // assign a random build id
424                            let build_id = BuildId::generate();
425
426                            let result = build_dataflow(
427                                build_id,
428                                session_id,
429                                dataflow,
430                                git_sources,
431                                prev_git_sources,
432                                local_working_dir,
433                                &clock,
434                                uv,
435                                &mut daemon_connections,
436                            )
437                            .await;
438                            match result {
439                                Ok(build) => {
440                                    running_builds.insert(build_id, build);
441                                    let _ = reply_sender.send(Ok(
442                                        ControlRequestReply::DataflowBuildTriggered { build_id },
443                                    ));
444                                }
445                                Err(err) => {
446                                    let _ = reply_sender.send(Err(err));
447                                }
448                            }
449                        }
450                        ControlRequest::WaitForBuild { build_id } => {
451                            if let Some(build) = running_builds.get_mut(&build_id) {
452                                build.build_result.register(reply_sender);
453                            } else if let Some(result) = finished_builds.get_mut(&build_id) {
454                                result.register(reply_sender);
455                            } else {
456                                let _ =
457                                    reply_sender.send(Err(eyre!("unknown build id {build_id}")));
458                            }
459                        }
460                        ControlRequest::Start {
461                            build_id,
462                            session_id,
463                            dataflow,
464                            name,
465                            local_working_dir,
466                            uv,
467                        } => {
468                            let name = name.or_else(|| names::Generator::default().next());
469
470                            let inner = async {
471                                if let Some(name) = name.as_deref() {
472                                    // check that name is unique
473                                    if running_dataflows
474                                        .values()
475                                        .any(|d: &RunningDataflow| d.name.as_deref() == Some(name))
476                                    {
477                                        bail!("there is already a running dataflow with name `{name}`");
478                                    }
479                                }
480                                let dataflow = start_dataflow(
481                                    build_id,
482                                    session_id,
483                                    dataflow,
484                                    local_working_dir,
485                                    name,
486                                    &mut daemon_connections,
487                                    &clock,
488                                    uv,
489                                )
490                                .await?;
491                                Ok(dataflow)
492                            };
493                            match inner.await {
494                                Ok(dataflow) => {
495                                    let uuid = dataflow.uuid;
496                                    running_dataflows.insert(uuid, dataflow);
497                                    let _ = reply_sender.send(Ok(
498                                        ControlRequestReply::DataflowStartTriggered { uuid },
499                                    ));
500                                }
501                                Err(err) => {
502                                    let _ = reply_sender.send(Err(err));
503                                }
504                            }
505                        }
506                        ControlRequest::WaitForSpawn { dataflow_id } => {
507                            if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
508                                dataflow.spawn_result.register(reply_sender);
509                            } else {
510                                let _ =
511                                    reply_sender.send(Err(eyre!("unknown dataflow {dataflow_id}")));
512                            }
513                        }
514                        ControlRequest::Check { dataflow_uuid } => {
515                            let status = match &running_dataflows.get(&dataflow_uuid) {
516                                Some(_) => ControlRequestReply::DataflowSpawned {
517                                    uuid: dataflow_uuid,
518                                },
519                                None => ControlRequestReply::DataflowStopped {
520                                    uuid: dataflow_uuid,
521                                    result: dataflow_results
522                                        .get(&dataflow_uuid)
523                                        .map(|r| dataflow_result(r, dataflow_uuid, &clock))
524                                        .unwrap_or_else(|| {
525                                            DataflowResult::ok_empty(
526                                                dataflow_uuid,
527                                                clock.new_timestamp(),
528                                            )
529                                        }),
530                                },
531                            };
532                            let _ = reply_sender.send(Ok(status));
533                        }
534                        ControlRequest::Reload {
535                            dataflow_id,
536                            node_id,
537                            operator_id,
538                        } => {
539                            let reload = async {
540                                reload_dataflow(
541                                    &running_dataflows,
542                                    dataflow_id,
543                                    node_id,
544                                    operator_id,
545                                    &mut daemon_connections,
546                                    clock.new_timestamp(),
547                                )
548                                .await?;
549                                Result::<_, eyre::Report>::Ok(())
550                            };
551                            let reply =
552                                reload
553                                    .await
554                                    .map(|()| ControlRequestReply::DataflowReloaded {
555                                        uuid: dataflow_id,
556                                    });
557                            let _ = reply_sender.send(reply);
558                        }
559                        ControlRequest::Stop {
560                            dataflow_uuid,
561                            grace_duration,
562                        } => {
563                            if let Some(result) = dataflow_results.get(&dataflow_uuid) {
564                                let reply = ControlRequestReply::DataflowStopped {
565                                    uuid: dataflow_uuid,
566                                    result: dataflow_result(result, dataflow_uuid, &clock),
567                                };
568                                let _ = reply_sender.send(Ok(reply));
569
570                                continue;
571                            }
572
573                            let dataflow = stop_dataflow(
574                                &mut running_dataflows,
575                                dataflow_uuid,
576                                &mut daemon_connections,
577                                clock.new_timestamp(),
578                                grace_duration,
579                            )
580                            .await;
581
582                            match dataflow {
583                                Ok(dataflow) => {
584                                    dataflow.stop_reply_senders.push(reply_sender);
585                                }
586                                Err(err) => {
587                                    let _ = reply_sender.send(Err(err));
588                                }
589                            }
590                        }
591                        ControlRequest::StopByName {
592                            name,
593                            grace_duration,
594                        } => match resolve_name(name, &running_dataflows, &archived_dataflows) {
595                            Ok(dataflow_uuid) => {
596                                if let Some(result) = dataflow_results.get(&dataflow_uuid) {
597                                    let reply = ControlRequestReply::DataflowStopped {
598                                        uuid: dataflow_uuid,
599                                        result: dataflow_result(result, dataflow_uuid, &clock),
600                                    };
601                                    let _ = reply_sender.send(Ok(reply));
602
603                                    continue;
604                                }
605
606                                let dataflow = stop_dataflow(
607                                    &mut running_dataflows,
608                                    dataflow_uuid,
609                                    &mut daemon_connections,
610                                    clock.new_timestamp(),
611                                    grace_duration,
612                                )
613                                .await;
614
615                                match dataflow {
616                                    Ok(dataflow) => {
617                                        dataflow.stop_reply_senders.push(reply_sender);
618                                    }
619                                    Err(err) => {
620                                        let _ = reply_sender.send(Err(err));
621                                    }
622                                }
623                            }
624                            Err(err) => {
625                                let _ = reply_sender.send(Err(err));
626                            }
627                        },
628                        ControlRequest::Logs { uuid, name, node } => {
629                            let dataflow_uuid = if let Some(uuid) = uuid {
630                                Ok(uuid)
631                            } else if let Some(name) = name {
632                                resolve_name(name, &running_dataflows, &archived_dataflows)
633                            } else {
634                                Err(eyre!("No uuid"))
635                            };
636
637                            match dataflow_uuid {
638                                Ok(uuid) => {
639                                    let reply = retrieve_logs(
640                                        &running_dataflows,
641                                        &archived_dataflows,
642                                        uuid,
643                                        node.into(),
644                                        &mut daemon_connections,
645                                        clock.new_timestamp(),
646                                    )
647                                    .await
648                                    .map(ControlRequestReply::Logs);
649                                    let _ = reply_sender.send(reply);
650                                }
651                                Err(err) => {
652                                    let _ = reply_sender.send(Err(err));
653                                }
654                            }
655                        }
656                        ControlRequest::Destroy => {
657                            tracing::info!("Received destroy command");
658
659                            let reply = handle_destroy(
660                                &mut running_dataflows,
661                                &mut daemon_connections,
662                                &abort_handle,
663                                &mut daemon_events_tx,
664                                &clock,
665                            )
666                            .await
667                            .map(|()| ControlRequestReply::DestroyOk);
668                            let _ = reply_sender.send(reply);
669                        }
670                        ControlRequest::List => {
671                            let mut dataflows: Vec<_> = running_dataflows.values().collect();
672                            dataflows.sort_by_key(|d| (&d.name, d.uuid));
673
674                            let running = dataflows.into_iter().map(|d| DataflowListEntry {
675                                id: DataflowIdAndName {
676                                    uuid: d.uuid,
677                                    name: d.name.clone(),
678                                },
679                                status: DataflowStatus::Running,
680                            });
681                            let finished_failed =
682                                dataflow_results.iter().map(|(&uuid, results)| {
683                                    let name =
684                                        archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
685                                    let id = DataflowIdAndName { uuid, name };
686                                    let status = if results.values().all(|r| r.is_ok()) {
687                                        DataflowStatus::Finished
688                                    } else {
689                                        DataflowStatus::Failed
690                                    };
691                                    DataflowListEntry { id, status }
692                                });
693
694                            let reply = Ok(ControlRequestReply::DataflowList(DataflowList(
695                                running.chain(finished_failed).collect(),
696                            )));
697                            let _ = reply_sender.send(reply);
698                        }
699                        ControlRequest::DaemonConnected => {
700                            let running = !daemon_connections.is_empty();
701                            let _ = reply_sender
702                                .send(Ok(ControlRequestReply::DaemonConnected(running)));
703                        }
704                        ControlRequest::ConnectedMachines => {
705                            let reply = Ok(ControlRequestReply::ConnectedDaemons(
706                                daemon_connections.keys().cloned().collect(),
707                            ));
708                            let _ = reply_sender.send(reply);
709                        }
710                        ControlRequest::LogSubscribe { .. } => {
711                            let _ = reply_sender.send(Err(eyre::eyre!(
712                                "LogSubscribe request should be handled separately"
713                            )));
714                        }
715                        ControlRequest::BuildLogSubscribe { .. } => {
716                            let _ = reply_sender.send(Err(eyre::eyre!(
717                                "BuildLogSubscribe request should be handled separately"
718                            )));
719                        }
720                        ControlRequest::CliAndDefaultDaemonOnSameMachine => {
721                            let mut default_daemon_ip = None;
722                            if let Some(default_id) = daemon_connections.unnamed().next() {
723                                if let Some(connection) = daemon_connections.get(default_id) {
724                                    if let Ok(addr) = connection.stream.peer_addr() {
725                                        default_daemon_ip = Some(addr.ip());
726                                    }
727                                }
728                            }
729                            let _ = reply_sender.send(Ok(
730                                ControlRequestReply::CliAndDefaultDaemonIps {
731                                    default_daemon: default_daemon_ip,
732                                    cli: None, // filled later
733                                },
734                            ));
735                        }
736                    }
737                }
738                ControlEvent::Error(err) => tracing::error!("{err:?}"),
739                ControlEvent::LogSubscribe {
740                    dataflow_id,
741                    level,
742                    connection,
743                } => {
744                    if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
745                        dataflow
746                            .log_subscribers
747                            .push(LogSubscriber::new(level, connection));
748                    }
749                }
750                ControlEvent::BuildLogSubscribe {
751                    build_id,
752                    level,
753                    connection,
754                } => {
755                    if let Some(build) = running_builds.get_mut(&build_id) {
756                        build
757                            .log_subscribers
758                            .push(LogSubscriber::new(level, connection));
759                    }
760                }
761            },
762            Event::DaemonHeartbeatInterval => {
763                let mut disconnected = BTreeSet::new();
764                for (machine_id, connection) in daemon_connections.iter_mut() {
765                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
766                        tracing::warn!(
767                            "no heartbeat message from machine `{machine_id}` since {:?}",
768                            connection.last_heartbeat.elapsed()
769                        )
770                    }
771                    if connection.last_heartbeat.elapsed() > Duration::from_secs(30) {
772                        disconnected.insert(machine_id.clone());
773                        continue;
774                    }
775                    let result: eyre::Result<()> = tokio::time::timeout(
776                        Duration::from_millis(500),
777                        send_heartbeat_message(&mut connection.stream, clock.new_timestamp()),
778                    )
779                    .await
780                    .wrap_err("timeout")
781                    .and_then(|r| r)
782                    .wrap_err_with(|| {
783                        format!("failed to send heartbeat message to daemon at `{machine_id}`")
784                    });
785                    if let Err(err) = result {
786                        tracing::warn!("{err:?}");
787                        disconnected.insert(machine_id.clone());
788                    }
789                }
790                if !disconnected.is_empty() {
791                    tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
792                    for machine_id in disconnected {
793                        daemon_connections.remove(&machine_id);
794                    }
795                }
796            }
797            Event::CtrlC => {
798                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
799                handle_destroy(
800                    &mut running_dataflows,
801                    &mut daemon_connections,
802                    &abort_handle,
803                    &mut daemon_events_tx,
804                    &clock,
805                )
806                .await?;
807            }
808            Event::DaemonHeartbeat {
809                daemon_id: machine_id,
810            } => {
811                if let Some(connection) = daemon_connections.get_mut(&machine_id) {
812                    connection.last_heartbeat = Instant::now();
813                }
814            }
815            Event::Log(message) => {
816                if let Some(dataflow_id) = &message.dataflow_id {
817                    if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
818                        send_log_message(&mut dataflow.log_subscribers, &message).await;
819                    }
820                }
821                if let Some(build_id) = message.build_id {
822                    if let Some(build) = running_builds.get_mut(&build_id) {
823                        send_log_message(&mut build.log_subscribers, &message).await;
824                    }
825                }
826            }
827            Event::DaemonExit { daemon_id } => {
828                tracing::info!("Daemon `{daemon_id}` exited");
829                daemon_connections.remove(&daemon_id);
830            }
831            Event::DataflowBuildResult {
832                build_id,
833                daemon_id,
834                result,
835            } => match running_builds.get_mut(&build_id) {
836                Some(build) => {
837                    build.pending_build_results.remove(&daemon_id);
838                    match result {
839                        Ok(()) => {}
840                        Err(err) => {
841                            build.errors.push(format!("{err:?}"));
842                        }
843                    };
844                    if build.pending_build_results.is_empty() {
845                        tracing::info!("dataflow build finished: `{build_id}`");
846                        let mut build = running_builds.remove(&build_id).unwrap();
847                        let result = if build.errors.is_empty() {
848                            Ok(())
849                        } else {
850                            Err(format!("build failed: {}", build.errors.join("\n\n")))
851                        };
852
853                        build.build_result.set_result(Ok(
854                            ControlRequestReply::DataflowBuildFinished { build_id, result },
855                        ));
856
857                        finished_builds.insert(build_id, build.build_result);
858                    }
859                }
860                None => {
861                    tracing::warn!("received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map");
862                }
863            },
864            Event::DataflowSpawnResult {
865                dataflow_id,
866                daemon_id,
867                result,
868            } => match running_dataflows.get_mut(&dataflow_id) {
869                Some(dataflow) => {
870                    dataflow.pending_spawn_results.remove(&daemon_id);
871                    match result {
872                        Ok(()) => {
873                            if dataflow.pending_spawn_results.is_empty() {
874                                tracing::info!("successfully spawned dataflow `{dataflow_id}`",);
875                                dataflow.spawn_result.set_result(Ok(
876                                    ControlRequestReply::DataflowSpawned { uuid: dataflow_id },
877                                ));
878                            }
879                        }
880                        Err(err) => {
881                            tracing::warn!("error while spawning dataflow `{dataflow_id}`");
882                            dataflow.spawn_result.set_result(Err(err));
883                        }
884                    };
885                }
886                None => {
887                    tracing::warn!("received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map");
888                }
889            },
890        }
891
892        // warn if event handling took too long -> the main loop should never be blocked for too long
893        let elapsed = start.elapsed();
894        if elapsed > Duration::from_millis(100) {
895            tracing::warn!(
896                "Coordinator took {}ms for handling event: {event_kind}",
897                elapsed.as_millis()
898            );
899        }
900    }
901
902    tracing::info!("stopped");
903
904    Ok(())
905}
906
907async fn send_log_message(log_subscribers: &mut Vec<LogSubscriber>, message: &LogMessage) {
908    for subscriber in log_subscribers.iter_mut() {
909        let send_result =
910            tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));
911
912        if send_result.await.is_err() {
913            subscriber.close();
914        }
915    }
916    log_subscribers.retain(|s| !s.is_closed());
917}
918
919fn dataflow_result(
920    results: &BTreeMap<DaemonId, DataflowDaemonResult>,
921    dataflow_uuid: Uuid,
922    clock: &uhlc::HLC,
923) -> DataflowResult {
924    let mut node_results = BTreeMap::new();
925    for result in results.values() {
926        node_results.extend(result.node_results.clone());
927        if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
928            tracing::warn!("failed to update HLC: {err}");
929        }
930    }
931
932    DataflowResult {
933        uuid: dataflow_uuid,
934        timestamp: clock.new_timestamp(),
935        node_results,
936    }
937}
938
/// An open coordinator-to-daemon TCP connection plus liveness bookkeeping.
struct DaemonConnection {
    /// TCP stream used for request/reply messaging with the daemon.
    stream: TcpStream,
    /// Time of the last heartbeat received from the daemon; checked by the
    /// heartbeat-interval watchdog to detect dead connections.
    last_heartbeat: Instant,
}
943
944async fn handle_destroy(
945    running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
946    daemon_connections: &mut DaemonConnections,
947    abortable_events: &futures::stream::AbortHandle,
948    daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
949    clock: &HLC,
950) -> Result<(), eyre::ErrReport> {
951    abortable_events.abort();
952    for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() {
953        let _ = stop_dataflow(
954            running_dataflows,
955            dataflow_uuid,
956            daemon_connections,
957            clock.new_timestamp(),
958            None,
959        )
960        .await?;
961    }
962
963    let result = destroy_daemons(daemon_connections, clock.new_timestamp()).await;
964    *daemon_events_tx = None;
965    result
966}
967
968async fn send_heartbeat_message(
969    connection: &mut TcpStream,
970    timestamp: uhlc::Timestamp,
971) -> eyre::Result<()> {
972    let message = serde_json::to_vec(&Timestamped {
973        inner: DaemonCoordinatorEvent::Heartbeat,
974        timestamp,
975    })
976    .context("Could not serialize heartbeat message")?;
977
978    tcp_send(connection, &message)
979        .await
980        .wrap_err("failed to send heartbeat message to daemon")
981}
982
/// State of a dataflow build that is still in progress.
struct RunningBuild {
    /// Error messages reported by daemons so far; the build is considered
    /// failed if this is non-empty once all daemons have reported back.
    errors: Vec<String>,
    /// Final build result; replies to waiting clients once set.
    build_result: CachedResult,

    /// Connected subscribers that receive build log messages.
    log_subscribers: Vec<LogSubscriber>,

    /// Daemons that have not yet reported their build result.
    pending_build_results: BTreeSet<DaemonId>,
}
991
/// State of a dataflow that is currently spawning or running.
struct RunningDataflow {
    /// Optional human-readable name of the dataflow.
    name: Option<String>,
    /// Unique ID of the dataflow.
    uuid: Uuid,
    /// The IDs of the daemons that the dataflow is running on.
    daemons: BTreeSet<DaemonId>,
    /// IDs of daemons that are waiting until all nodes are started.
    pending_daemons: BTreeSet<DaemonId>,
    /// Nodes that exited before their daemon subscribed to the dataflow.
    exited_before_subscribe: Vec<NodeId>,
    /// Resolved node definitions of the dataflow, keyed by node ID.
    nodes: BTreeMap<NodeId, ResolvedNode>,

    /// Result of spawning the dataflow; replies to waiting clients once set.
    spawn_result: CachedResult,
    /// Channels waiting for the reply to a stop request for this dataflow.
    stop_reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

    /// Connected subscribers that receive this dataflow's log messages.
    log_subscribers: Vec<LogSubscriber>,

    /// Daemons that have not yet reported their spawn result.
    pending_spawn_results: BTreeSet<DaemonId>,
}
1009
/// A result value that may not be available yet.
///
/// Interested parties can register a oneshot sender while the result is
/// still pending; once the result is set, it is delivered to all queued
/// senders and cached so later registrants receive it immediately.
pub enum CachedResult {
    /// No result yet; queued senders are notified once it arrives.
    Pending {
        result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
    },
    /// The result is known and replayed to every new registrant.
    Cached {
        result: eyre::Result<ControlRequestReply>,
    },
}
1018
1019impl Default for CachedResult {
1020    fn default() -> Self {
1021        Self::Pending {
1022            result_senders: Vec::new(),
1023        }
1024    }
1025}
1026
1027impl CachedResult {
1028    fn register(
1029        &mut self,
1030        reply_sender: tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>,
1031    ) {
1032        match self {
1033            CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
1034            CachedResult::Cached { result } => {
1035                Self::send_result_to(result, reply_sender);
1036            }
1037        }
1038    }
1039
1040    fn set_result(&mut self, result: eyre::Result<ControlRequestReply>) {
1041        match self {
1042            CachedResult::Pending { result_senders } => {
1043                for sender in result_senders.drain(..) {
1044                    Self::send_result_to(&result, sender);
1045                }
1046                *self = CachedResult::Cached { result };
1047            }
1048            CachedResult::Cached { .. } => {}
1049        }
1050    }
1051
1052    fn send_result_to(
1053        result: &eyre::Result<ControlRequestReply>,
1054        sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
1055    ) {
1056        let result = match result {
1057            Ok(r) => Ok(r.clone()),
1058            Err(err) => Err(eyre!("{err:?}")),
1059        };
1060        let _ = sender.send(result);
1061    }
1062}
1063
/// Minimal information kept about a dataflow after it finished, used e.g.
/// for log retrieval and for listing finished dataflows.
struct ArchivedDataflow {
    /// Optional human-readable name of the dataflow.
    name: Option<String>,
    /// Resolved node definitions of the dataflow, keyed by node ID.
    nodes: BTreeMap<NodeId, ResolvedNode>,
}
1068
1069impl From<&RunningDataflow> for ArchivedDataflow {
1070    fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
1071        ArchivedDataflow {
1072            name: dataflow.name.clone(),
1073            nodes: dataflow.nodes.clone(),
1074        }
1075    }
1076}
1077
impl PartialEq for RunningDataflow {
    // Equality deliberately considers only the identity fields (name, uuid,
    // daemons); transient state such as pending results or log subscribers
    // is ignored.
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
    }
}

impl Eq for RunningDataflow {}
1085
1086async fn stop_dataflow<'a>(
1087    running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
1088    dataflow_uuid: Uuid,
1089    daemon_connections: &mut DaemonConnections,
1090    timestamp: uhlc::Timestamp,
1091    grace_duration: Option<Duration>,
1092) -> eyre::Result<&'a mut RunningDataflow> {
1093    let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
1094        bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
1095    };
1096
1097    let message = serde_json::to_vec(&Timestamped {
1098        inner: DaemonCoordinatorEvent::StopDataflow {
1099            dataflow_id: dataflow_uuid,
1100            grace_duration,
1101        },
1102        timestamp,
1103    })?;
1104
1105    for daemon_id in &dataflow.daemons {
1106        let daemon_connection = daemon_connections
1107            .get_mut(daemon_id)
1108            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
1109        tcp_send(&mut daemon_connection.stream, &message)
1110            .await
1111            .wrap_err("failed to send stop message to daemon")?;
1112
1113        // wait for reply
1114        let reply_raw = tcp_receive(&mut daemon_connection.stream)
1115            .await
1116            .wrap_err("failed to receive stop reply from daemon")?;
1117        match serde_json::from_slice(&reply_raw)
1118            .wrap_err("failed to deserialize stop reply from daemon")?
1119        {
1120            DaemonCoordinatorReply::StopResult(result) => result
1121                .map_err(|e| eyre!(e))
1122                .wrap_err("failed to stop dataflow")?,
1123            other => bail!("unexpected reply after sending stop: {other:?}"),
1124        }
1125    }
1126
1127    tracing::info!("successfully send stop dataflow `{dataflow_uuid}` to all daemons");
1128
1129    Ok(dataflow)
1130}
1131
1132async fn reload_dataflow(
1133    running_dataflows: &HashMap<Uuid, RunningDataflow>,
1134    dataflow_id: Uuid,
1135    node_id: NodeId,
1136    operator_id: Option<OperatorId>,
1137    daemon_connections: &mut DaemonConnections,
1138    timestamp: uhlc::Timestamp,
1139) -> eyre::Result<()> {
1140    let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
1141        bail!("No running dataflow found with UUID `{dataflow_id}`")
1142    };
1143    let message = serde_json::to_vec(&Timestamped {
1144        inner: DaemonCoordinatorEvent::ReloadDataflow {
1145            dataflow_id,
1146            node_id,
1147            operator_id,
1148        },
1149        timestamp,
1150    })?;
1151
1152    for machine_id in &dataflow.daemons {
1153        let daemon_connection = daemon_connections
1154            .get_mut(machine_id)
1155            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
1156        tcp_send(&mut daemon_connection.stream, &message)
1157            .await
1158            .wrap_err("failed to send reload message to daemon")?;
1159
1160        // wait for reply
1161        let reply_raw = tcp_receive(&mut daemon_connection.stream)
1162            .await
1163            .wrap_err("failed to receive reload reply from daemon")?;
1164        match serde_json::from_slice(&reply_raw)
1165            .wrap_err("failed to deserialize reload reply from daemon")?
1166        {
1167            DaemonCoordinatorReply::ReloadResult(result) => result
1168                .map_err(|e| eyre!(e))
1169                .wrap_err("failed to reload dataflow")?,
1170            other => bail!("unexpected reply after sending reload: {other:?}"),
1171        }
1172    }
1173    tracing::info!("successfully reloaded dataflow `{dataflow_id}`");
1174
1175    Ok(())
1176}
1177
/// Retrieves the logs of a single node from the daemon it runs (or ran) on.
///
/// Works for both running and archived (finished) dataflows; an archived
/// entry takes precedence when both exist for the same UUID.
///
/// # Errors
///
/// Fails if the dataflow or node is unknown, the node is deployed on more
/// than one machine, no (or more than one) matching daemon connection
/// exists, or the daemon reports an error.
async fn retrieve_logs(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<Vec<u8>> {
    // Prefer the archived entry; fall back to the still-running dataflow.
    let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else {
        bail!("No dataflow found with UUID `{dataflow_id}`")
    };

    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Logs {
            dataflow_id,
            node_id: node_id.clone(),
        },
        timestamp,
    })?;

    // Machine(s) that the requested node is deployed on (`None` = unnamed).
    let machine_ids: Vec<Option<String>> = nodes
        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
        .collect();

    // The node must be deployed on exactly one machine.
    let machine_id = if let [machine_id] = &machine_ids[..] {
        machine_id
    } else if machine_ids.is_empty() {
        bail!("No machine contains {}/{}", dataflow_id, node_id)
    } else {
        bail!(
            "More than one machine contains {}/{}. However, it should only be present on one.",
            dataflow_id,
            node_id
        )
    };

    // Resolve the machine ID to exactly one daemon connection.
    let daemon_ids: Vec<_> = match machine_id {
        None => daemon_connections.unnamed().collect(),
        Some(machine_id) => daemon_connections
            .get_matching_daemon_id(machine_id)
            .into_iter()
            .collect(),
    };
    let daemon_id = match &daemon_ids[..] {
        [id] => (*id).clone(),
        [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
        _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
    };
    let daemon_connection = daemon_connections
        .get_mut(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?;
    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err("failed to send logs message to daemon")?;

    // wait for reply
    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to retrieve logs reply from daemon")?;
    let reply_logs = match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize logs reply from daemon")?
    {
        DaemonCoordinatorReply::Logs(logs) => logs,
        other => bail!("unexpected reply after sending logs: {other:?}"),
    };
    tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");

    reply_logs.map_err(|err| eyre!(err))
}
1253
/// Triggers a build of the given dataflow on all daemons that host one of
/// its nodes.
///
/// Nodes and their git sources are grouped by target machine and one build
/// command is sent to the daemon responsible for each machine. Returns the
/// bookkeeping state used to track the daemons' asynchronous build results.
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(daemon_connections, clock))]
async fn build_dataflow(
    build_id: BuildId,
    session_id: SessionId,
    dataflow: Descriptor,
    git_sources: BTreeMap<NodeId, GitSource>,
    prev_git_sources: BTreeMap<NodeId, GitSource>,
    local_working_dir: Option<PathBuf>,
    clock: &HLC,
    uv: bool,
    daemon_connections: &mut DaemonConnections,
) -> eyre::Result<RunningBuild> {
    let nodes = dataflow.resolve_aliases_and_set_defaults()?;

    // Group the (previous) git sources by the machine their node is
    // deployed on (`None` = unnamed machine).
    let mut git_sources_by_daemon = git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();
    let mut prev_git_sources_by_daemon = prev_git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();

    let nodes_by_daemon = nodes
        .values()
        .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));

    // Send one build command per machine and remember which daemons we
    // expect asynchronous build results from.
    let mut daemons = BTreeSet::new();
    for (machine, nodes_on_machine) in &nodes_by_daemon {
        let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
        tracing::debug!(
            "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
        );

        let build_command = BuildDataflowNodes {
            build_id,
            session_id,
            local_working_dir: local_working_dir.clone(),
            git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
            prev_git_sources: prev_git_sources_by_daemon
                .remove(machine)
                .unwrap_or_default(),
            dataflow_descriptor: dataflow.clone(),
            nodes_on_machine,
            uv,
        };
        let message = serde_json::to_vec(&Timestamped {
            inner: DaemonCoordinatorEvent::Build(build_command),
            timestamp: clock.new_timestamp(),
        })?;

        let daemon_id =
            build_dataflow_on_machine(daemon_connections, machine.map(|s| s.as_str()), &message)
                .await
                .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
        daemons.insert(daemon_id);
    }

    tracing::info!("successfully triggered dataflow build `{build_id}`",);

    Ok(RunningBuild {
        errors: Vec::new(),
        build_result: CachedResult::default(),
        log_subscribers: Vec::new(),
        pending_build_results: daemons,
    })
}
1330
1331async fn build_dataflow_on_machine(
1332    daemon_connections: &mut DaemonConnections,
1333    machine: Option<&str>,
1334    message: &[u8],
1335) -> Result<DaemonId, eyre::ErrReport> {
1336    let daemon_id = match machine {
1337        Some(machine) => daemon_connections
1338            .get_matching_daemon_id(machine)
1339            .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
1340            .clone(),
1341        None => daemon_connections
1342            .unnamed()
1343            .next()
1344            .wrap_err("no unnamed daemon connections")?
1345            .clone(),
1346    };
1347
1348    let daemon_connection = daemon_connections
1349        .get_mut(&daemon_id)
1350        .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?;
1351    tcp_send(&mut daemon_connection.stream, message)
1352        .await
1353        .wrap_err("failed to send build message to daemon")?;
1354
1355    let reply_raw = tcp_receive(&mut daemon_connection.stream)
1356        .await
1357        .wrap_err("failed to receive build reply from daemon")?;
1358    match serde_json::from_slice(&reply_raw)
1359        .wrap_err("failed to deserialize build reply from daemon")?
1360    {
1361        DaemonCoordinatorReply::TriggerBuildResult(result) => result
1362            .map_err(|e| eyre!(e))
1363            .wrap_err("daemon returned an error")?,
1364        _ => bail!("unexpected reply"),
1365    }
1366    Ok(daemon_id)
1367}
1368
1369#[allow(clippy::too_many_arguments)]
1370async fn start_dataflow(
1371    build_id: Option<BuildId>,
1372    session_id: SessionId,
1373    dataflow: Descriptor,
1374    local_working_dir: Option<PathBuf>,
1375    name: Option<String>,
1376    daemon_connections: &mut DaemonConnections,
1377    clock: &HLC,
1378    uv: bool,
1379) -> eyre::Result<RunningDataflow> {
1380    let SpawnedDataflow {
1381        uuid,
1382        daemons,
1383        nodes,
1384    } = spawn_dataflow(
1385        build_id,
1386        session_id,
1387        dataflow,
1388        local_working_dir,
1389        daemon_connections,
1390        clock,
1391        uv,
1392    )
1393    .await?;
1394    Ok(RunningDataflow {
1395        uuid,
1396        name,
1397        pending_daemons: if daemons.len() > 1 {
1398            daemons.clone()
1399        } else {
1400            BTreeSet::new()
1401        },
1402        exited_before_subscribe: Default::default(),
1403        daemons: daemons.clone(),
1404        nodes,
1405        spawn_result: CachedResult::default(),
1406        stop_reply_senders: Vec::new(),
1407        log_subscribers: Vec::new(),
1408        pending_spawn_results: daemons,
1409    })
1410}
1411
1412async fn destroy_daemon(
1413    daemon_id: DaemonId,
1414    mut daemon_connection: DaemonConnection,
1415
1416    timestamp: uhlc::Timestamp,
1417) -> Result<()> {
1418    let message = serde_json::to_vec(&Timestamped {
1419        inner: DaemonCoordinatorEvent::Destroy,
1420        timestamp,
1421    })?;
1422
1423    tcp_send(&mut daemon_connection.stream, &message)
1424        .await
1425        .wrap_err(format!(
1426            "failed to send destroy message to daemon `{daemon_id}`"
1427        ))?;
1428
1429    // wait for reply
1430    let reply_raw = tcp_receive(&mut daemon_connection.stream)
1431        .await
1432        .wrap_err("failed to receive destroy reply from daemon")?;
1433    match serde_json::from_slice(&reply_raw)
1434        .wrap_err("failed to deserialize destroy reply from daemon")?
1435    {
1436        DaemonCoordinatorReply::DestroyResult { result, .. } => result
1437            .map_err(|e| eyre!(e))
1438            .wrap_err("failed to destroy dataflow")?,
1439        other => bail!("unexpected reply after sending `destroy`: {other:?}"),
1440    }
1441
1442    tracing::info!("successfully destroyed daemon `{daemon_id}`");
1443    Ok(())
1444}
1445
1446async fn destroy_daemons(
1447    daemon_connections: &mut DaemonConnections,
1448    timestamp: uhlc::Timestamp,
1449) -> eyre::Result<()> {
1450    let futures = daemon_connections
1451        .drain()
1452        .map(|(daemon_id, daemon_connection)| {
1453            destroy_daemon(daemon_id, daemon_connection, timestamp)
1454        })
1455        .collect::<Vec<_>>();
1456    let results: Vec<std::result::Result<(), eyre::Error>> =
1457        join_all(futures).await.into_iter().collect::<Vec<_>>();
1458    for result in results {
1459        result?;
1460    }
1461    Ok(())
1462}
1463
1464#[derive(Debug)]
1465pub enum Event {
1466    NewDaemonConnection(TcpStream),
1467    DaemonConnectError(eyre::Report),
1468    DaemonHeartbeat {
1469        daemon_id: DaemonId,
1470    },
1471    Dataflow {
1472        uuid: Uuid,
1473        event: DataflowEvent,
1474    },
1475    Control(ControlEvent),
1476    Daemon(DaemonRequest),
1477    DaemonHeartbeatInterval,
1478    CtrlC,
1479    Log(LogMessage),
1480    DaemonExit {
1481        daemon_id: dora_message::common::DaemonId,
1482    },
1483    DataflowBuildResult {
1484        build_id: BuildId,
1485        daemon_id: DaemonId,
1486        result: eyre::Result<()>,
1487    },
1488    DataflowSpawnResult {
1489        dataflow_id: uuid::Uuid,
1490        daemon_id: DaemonId,
1491        result: eyre::Result<()>,
1492    },
1493}
1494
1495impl Event {
1496    /// Whether this event should be logged.
1497    #[allow(clippy::match_like_matches_macro)]
1498    pub fn log(&self) -> bool {
1499        match self {
1500            Event::DaemonHeartbeatInterval => false,
1501            _ => true,
1502        }
1503    }
1504
1505    fn kind(&self) -> &'static str {
1506        match self {
1507            Event::NewDaemonConnection(_) => "NewDaemonConnection",
1508            Event::DaemonConnectError(_) => "DaemonConnectError",
1509            Event::DaemonHeartbeat { .. } => "DaemonHeartbeat",
1510            Event::Dataflow { .. } => "Dataflow",
1511            Event::Control(_) => "Control",
1512            Event::Daemon(_) => "Daemon",
1513            Event::DaemonHeartbeatInterval => "DaemonHeartbeatInterval",
1514            Event::CtrlC => "CtrlC",
1515            Event::Log(_) => "Log",
1516            Event::DaemonExit { .. } => "DaemonExit",
1517            Event::DataflowBuildResult { .. } => "DataflowBuildResult",
1518            Event::DataflowSpawnResult { .. } => "DataflowSpawnResult",
1519        }
1520    }
1521}
1522
/// Events scoped to a single dataflow, reported by daemons.
#[derive(Debug)]
pub enum DataflowEvent {
    /// The dataflow's nodes running on the given daemon finished; `result`
    /// carries that daemon's outcome for the run.
    DataflowFinishedOnDaemon {
        daemon_id: DaemonId,
        result: DataflowDaemonResult,
    },
    /// The dataflow is ready on the given daemon.
    ///
    /// NOTE(review): `exited_before_subscribe` presumably lists nodes that
    /// exited before subscribing to their inputs — confirm against the daemon
    /// side that produces this event.
    ReadyOnDaemon {
        daemon_id: DaemonId,
        exited_before_subscribe: Vec<NodeId>,
    },
}
1534
/// Requests sent by a daemon over its coordinator connection.
#[derive(Debug)]
pub enum DaemonRequest {
    /// A daemon registers itself with the coordinator.
    Register {
        // `Err` carries a human-readable description of the version mismatch;
        // presumably produced by comparing daemon and coordinator versions —
        // the check itself happens before this event is constructed.
        version_check_result: Result<(), String>,
        // Optional machine identifier supplied by the daemon.
        machine_id: Option<String>,
        // The TCP connection the registration arrived on; kept so the
        // coordinator can keep talking to this daemon.
        connection: TcpStream,
    },
}
1543
1544fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
1545    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
1546
1547    let mut ctrlc_sent = false;
1548    ctrlc::set_handler(move || {
1549        if ctrlc_sent {
1550            tracing::warn!("received second ctrlc signal -> aborting immediately");
1551            std::process::abort();
1552        } else {
1553            tracing::info!("received ctrlc signal");
1554            if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
1555                tracing::error!("failed to report ctrl-c event to dora-coordinator");
1556            }
1557
1558            ctrlc_sent = true;
1559        }
1560    })
1561    .wrap_err("failed to set ctrl-c handler")?;
1562
1563    Ok(ReceiverStream::new(ctrlc_rx))
1564}