//! dora_coordinator/lib.rs — the dora coordinator library.

1use crate::{
2    run::spawn_dataflow,
3    tcp_utils::{tcp_receive, tcp_send},
4};
5pub use control::ControlEvent;
6use dora_core::{
7    config::{NodeId, OperatorId},
8    uhlc::{self, HLC},
9};
10use dora_message::{
11    cli_to_coordinator::ControlRequest,
12    common::DaemonId,
13    coordinator_to_cli::{
14        ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
15        DataflowStatus, LogLevel, LogMessage,
16    },
17    coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped},
18    daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
19    descriptor::{Descriptor, ResolvedNode},
20};
21use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
22use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
23use futures_concurrency::stream::Merge;
24use log_subscriber::LogSubscriber;
25use run::SpawnedDataflow;
26use std::{
27    collections::{BTreeMap, BTreeSet, HashMap},
28    net::SocketAddr,
29    path::PathBuf,
30    sync::Arc,
31    time::{Duration, Instant},
32};
33use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle};
34use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
35use uuid::Uuid;
36
37mod control;
38mod listener;
39mod log_subscriber;
40mod run;
41mod tcp_utils;
42
43pub async fn start(
44    bind: SocketAddr,
45    bind_control: SocketAddr,
46    external_events: impl Stream<Item = Event> + Unpin,
47) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
48    let listener = listener::create_listener(bind).await?;
49    let port = listener
50        .local_addr()
51        .wrap_err("failed to get local addr of listener")?
52        .port();
53    let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
54        c.map(Event::NewDaemonConnection)
55            .wrap_err("failed to open connection")
56            .unwrap_or_else(Event::DaemonConnectError)
57    });
58
59    let mut tasks = FuturesUnordered::new();
60    let control_events = control::control_events(bind_control, &tasks)
61        .await
62        .wrap_err("failed to create control events")?;
63
64    // Setup ctrl-c handler
65    let ctrlc_events = set_up_ctrlc_handler()?;
66
67    let events = (
68        external_events,
69        new_daemon_connections,
70        control_events,
71        ctrlc_events,
72    )
73        .merge();
74
75    let future = async move {
76        start_inner(events, &tasks).await?;
77
78        tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
79        while let Some(join_result) = tasks.next().await {
80            if let Err(err) = join_result {
81                tracing::error!("task panicked: {err}");
82            }
83        }
84        tracing::debug!("all spawned tasks finished, exiting..");
85        Ok(())
86    };
87    Ok((port, future))
88}
89
90// Resolve the dataflow name.
91fn resolve_name(
92    name: String,
93    running_dataflows: &HashMap<Uuid, RunningDataflow>,
94    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
95) -> eyre::Result<Uuid> {
96    let uuids: Vec<_> = running_dataflows
97        .iter()
98        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
99        .map(|(k, _)| k)
100        .copied()
101        .collect();
102    let archived_uuids: Vec<_> = archived_dataflows
103        .iter()
104        .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
105        .map(|(k, _)| k)
106        .copied()
107        .collect();
108
109    if uuids.is_empty() {
110        if archived_uuids.is_empty() {
111            bail!("no dataflow with name `{name}`");
112        } else if let [uuid] = archived_uuids.as_slice() {
113            Ok(*uuid)
114        } else {
115            // TODO: Index the archived dataflows in order to return logs based on the index.
116            bail!("multiple archived dataflows found with name `{name}`, Please provide the UUID instead.");
117        }
118    } else if let [uuid] = uuids.as_slice() {
119        Ok(*uuid)
120    } else {
121        bail!("multiple dataflows found with name `{name}`");
122    }
123}
124
/// All currently connected daemons, keyed by their coordinator-assigned ID.
#[derive(Default)]
struct DaemonConnections {
    // BTreeMap keeps iteration order deterministic (sorted by DaemonId).
    daemons: BTreeMap<DaemonId, DaemonConnection>,
}
129
130impl DaemonConnections {
131    fn add(&mut self, daemon_id: DaemonId, connection: DaemonConnection) {
132        let previous = self.daemons.insert(daemon_id.clone(), connection);
133        if previous.is_some() {
134            tracing::info!("closing previous connection `{daemon_id}` on new register");
135        }
136    }
137
138    fn get_mut(&mut self, id: &DaemonId) -> Option<&mut DaemonConnection> {
139        self.daemons.get_mut(id)
140    }
141
142    fn get_matching_daemon_id(&self, machine_id: &str) -> Option<&DaemonId> {
143        self.daemons
144            .keys()
145            .find(|id| id.matches_machine_id(machine_id))
146    }
147
148    fn drain(&mut self) -> impl Iterator<Item = (DaemonId, DaemonConnection)> {
149        std::mem::take(&mut self.daemons).into_iter()
150    }
151
152    fn is_empty(&self) -> bool {
153        self.daemons.is_empty()
154    }
155
156    fn keys(&self) -> impl Iterator<Item = &DaemonId> {
157        self.daemons.keys()
158    }
159
160    fn iter(&self) -> impl Iterator<Item = (&DaemonId, &DaemonConnection)> {
161        self.daemons.iter()
162    }
163
164    fn iter_mut(&mut self) -> impl Iterator<Item = (&DaemonId, &mut DaemonConnection)> {
165        self.daemons.iter_mut()
166    }
167
168    fn remove(&mut self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
169        self.daemons.remove(daemon_id)
170    }
171
172    fn unnamed(&self) -> impl Iterator<Item = &DaemonId> {
173        self.daemons.keys().filter(|id| id.machine_id().is_none())
174    }
175}
176
/// The coordinator's main event loop.
///
/// Consumes the merged event stream (external events, daemon connections,
/// control requests, ctrl-c, heartbeat ticks) and maintains the running /
/// archived dataflow state plus the set of connected daemons. Returns once
/// the event stream ends (e.g. after `dora destroy` aborts the abortable
/// sources and the daemon-events channel closes).
async fn start_inner(
    events: impl Stream<Item = Event> + Unpin,
    tasks: &FuturesUnordered<JoinHandle<()>>,
) -> eyre::Result<()> {
    // Hybrid logical clock used to timestamp every message sent to daemons.
    let clock = Arc::new(HLC::default());

    // Channel for events produced by per-daemon connection handlers.
    // The sender is wrapped in `Option` so `handle_destroy` can drop it,
    // which eventually ends this loop when all handlers finish.
    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2);
    let mut daemon_events_tx = Some(daemon_events_tx);
    let daemon_events = ReceiverStream::new(daemon_events);

    // Tick every 3s to drive the heartbeat/watchdog handling below.
    let daemon_heartbeat_interval =
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
            .map(|_| Event::DaemonHeartbeatInterval);

    // events that should be aborted on `dora destroy`
    let (abortable_events, abort_handle) =
        futures::stream::abortable((events, daemon_heartbeat_interval).merge());

    // Daemon events stay outside the abortable set so shutdown replies from
    // daemons are still processed after destroy.
    let mut events = (abortable_events, daemon_events).merge();

    // Coordinator state, owned exclusively by this loop (no locking needed).
    let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
    let mut dataflow_results: HashMap<Uuid, BTreeMap<DaemonId, DataflowDaemonResult>> =
        HashMap::new();
    let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
    let mut daemon_connections = DaemonConnections::default();

    while let Some(event) = events.next().await {
        if event.log() {
            tracing::trace!("Handling event {event:?}");
        }
        match event {
            // A daemon opened a TCP connection; spawn a handler task for it.
            Event::NewDaemonConnection(connection) => {
                connection.set_nodelay(true)?;
                let events_tx = daemon_events_tx.clone();
                if let Some(events_tx) = events_tx {
                    let task = tokio::spawn(listener::handle_connection(
                        connection,
                        events_tx,
                        clock.clone(),
                    ));
                    tasks.push(task);
                } else {
                    // Sender was dropped during destroy -> we are shutting down.
                    tracing::warn!(
                        "ignoring new daemon connection because events_tx was closed already"
                    );
                }
            }
            Event::DaemonConnectError(err) => {
                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
            }
            Event::Daemon(event) => match event {
                // Registration handshake: reply with an assigned DaemonId (or
                // an error), then store the connection on success.
                DaemonRequest::Register {
                    machine_id,
                    mut connection,
                    version_check_result,
                } => {
                    // Reject duplicates: at most one daemon per machine ID,
                    // and at most one unnamed daemon.
                    let existing = match &machine_id {
                        Some(id) => daemon_connections.get_matching_daemon_id(id),
                        None => daemon_connections.unnamed().next(),
                    };
                    let existing_result = if existing.is_some() {
                        Err(format!(
                            "There is already a connected daemon with machine ID `{machine_id:?}`"
                        ))
                    } else {
                        Ok(())
                    };

                    // assign a unique ID to the daemon
                    let daemon_id = DaemonId::new(machine_id);

                    let reply: Timestamped<RegisterResult> = Timestamped {
                        inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
                            Ok(_) => RegisterResult::Ok {
                                daemon_id: daemon_id.clone(),
                            },
                            Err(err) => RegisterResult::Err(err.clone()),
                        },
                        timestamp: clock.new_timestamp(),
                    };

                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
                        .await
                        .context("tcp send failed");
                    // NOTE(review): only `version_check_result` and the send
                    // result gate the insert below — `existing_result` does
                    // not. A daemon rejected as a duplicate (but with a valid
                    // version) is still added under its fresh ID; confirm
                    // this is intended.
                    match version_check_result.map_err(|e| eyre!(e)).and(send_result) {
                        Ok(()) => {
                            daemon_connections.add(
                                daemon_id.clone(),
                                DaemonConnection {
                                    stream: connection,
                                    last_heartbeat: Instant::now(),
                                },
                            );
                        }
                        Err(err) => {
                            tracing::warn!("failed to register daemon connection for daemon `{daemon_id}`: {err}");
                        }
                    }
                }
            },
            Event::Dataflow { uuid, event } => match event {
                // One daemon reports that its nodes are ready; once all
                // pending daemons reported, broadcast `AllNodesReady`.
                DataflowEvent::ReadyOnDaemon {
                    daemon_id,
                    exited_before_subscribe,
                } => {
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            let dataflow = entry.get_mut();
                            dataflow.pending_daemons.remove(&daemon_id);
                            // Collect nodes that already exited before they
                            // could subscribe, across all daemons.
                            dataflow
                                .exited_before_subscribe
                                .extend(exited_before_subscribe);
                            if dataflow.pending_daemons.is_empty() {
                                tracing::debug!("sending all nodes ready message to daemons");
                                let message = serde_json::to_vec(&Timestamped {
                                    inner: DaemonCoordinatorEvent::AllNodesReady {
                                        dataflow_id: uuid,
                                        exited_before_subscribe: dataflow
                                            .exited_before_subscribe
                                            .clone(),
                                    },
                                    timestamp: clock.new_timestamp(),
                                })
                                .wrap_err("failed to serialize AllNodesReady message")?;

                                // notify all machines that run parts of the dataflow
                                for daemon_id in &dataflow.daemons {
                                    let Some(connection) = daemon_connections.get_mut(daemon_id)
                                    else {
                                        tracing::warn!(
                                            "no daemon connection found for machine `{daemon_id}`"
                                        );
                                        continue;
                                    };
                                    tcp_send(&mut connection.stream, &message)
                                        .await
                                        .wrap_err_with(|| {
                                            format!(
                                                "failed to send AllNodesReady({uuid}) message \
                                            to machine {daemon_id}"
                                            )
                                        })?;
                                }
                            }
                        }
                        std::collections::hash_map::Entry::Vacant(_) => {
                            tracing::warn!("dataflow not running on ReadyOnMachine");
                        }
                    }
                }
                // One daemon finished its part of the dataflow; record the
                // result and, when the last daemon finished, archive the
                // dataflow and answer all pending stop requests.
                DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => {
                    tracing::debug!("coordinator received DataflowFinishedOnDaemon ({daemon_id:?}, result: {result:?})");
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            let dataflow = entry.get_mut();
                            dataflow.daemons.remove(&daemon_id);
                            tracing::info!(
                                "removed machine id: {daemon_id} from dataflow: {:#?}",
                                dataflow.uuid
                            );
                            dataflow_results
                                .entry(uuid)
                                .or_default()
                                .insert(daemon_id, result);

                            if dataflow.daemons.is_empty() {
                                // Archive finished dataflow
                                archived_dataflows
                                    .entry(uuid)
                                    .or_insert_with(|| ArchivedDataflow::from(entry.get()));
                                let mut finished_dataflow = entry.remove();
                                let dataflow_id = finished_dataflow.uuid;
                                // Tell log subscribers the dataflow is done.
                                send_log_message(
                                    &mut finished_dataflow,
                                    &LogMessage {
                                        dataflow_id,
                                        node_id: None,
                                        daemon_id: None,
                                        level: LogLevel::Info,
                                        target: Some("coordinator".into()),
                                        module_path: None,
                                        file: None,
                                        line: None,
                                        message: "dataflow finished".into(),
                                    },
                                )
                                .await;

                                // Reply to every pending `Stop`/`StopByName`
                                // request with the aggregated result.
                                let reply = ControlRequestReply::DataflowStopped {
                                    uuid,
                                    result: dataflow_results
                                        .get(&uuid)
                                        .map(|r| dataflow_result(r, uuid, &clock))
                                        .unwrap_or_else(|| {
                                            DataflowResult::ok_empty(uuid, clock.new_timestamp())
                                        }),
                                };
                                for sender in finished_dataflow.reply_senders {
                                    let _ = sender.send(Ok(reply.clone()));
                                }
                            }
                        }
                        std::collections::hash_map::Entry::Vacant(_) => {
                            tracing::warn!("dataflow not running on DataflowFinishedOnDaemon");
                        }
                    }
                }
            },

            // Requests arriving from the CLI over the control socket.
            Event::Control(event) => match event {
                ControlEvent::IncomingRequest {
                    request,
                    reply_sender,
                } => {
                    match request {
                        ControlRequest::Start {
                            dataflow,
                            name,
                            local_working_dir,
                            uv,
                        } => {
                            // Generate a random name if none was given.
                            let name = name.or_else(|| names::Generator::default().next());

                            let inner = async {
                                if let Some(name) = name.as_deref() {
                                    // check that name is unique
                                    if running_dataflows
                                        .values()
                                        .any(|d: &RunningDataflow| d.name.as_deref() == Some(name))
                                    {
                                        bail!("there is already a running dataflow with name `{name}`");
                                    }
                                }
                                let dataflow = start_dataflow(
                                    dataflow,
                                    local_working_dir,
                                    name,
                                    &mut daemon_connections,
                                    &clock,
                                    uv,
                                )
                                .await?;
                                Ok(dataflow)
                            };
                            let reply = inner.await.map(|dataflow| {
                                let uuid = dataflow.uuid;
                                running_dataflows.insert(uuid, dataflow);
                                ControlRequestReply::DataflowStarted { uuid }
                            });
                            let _ = reply_sender.send(reply);
                        }
                        // Report whether the given dataflow is still running.
                        ControlRequest::Check { dataflow_uuid } => {
                            let status = match &running_dataflows.get(&dataflow_uuid) {
                                Some(_) => ControlRequestReply::DataflowStarted {
                                    uuid: dataflow_uuid,
                                },
                                None => ControlRequestReply::DataflowStopped {
                                    uuid: dataflow_uuid,
                                    result: dataflow_results
                                        .get(&dataflow_uuid)
                                        .map(|r| dataflow_result(r, dataflow_uuid, &clock))
                                        .unwrap_or_else(|| {
                                            DataflowResult::ok_empty(
                                                dataflow_uuid,
                                                clock.new_timestamp(),
                                            )
                                        }),
                                },
                            };
                            let _ = reply_sender.send(Ok(status));
                        }
                        ControlRequest::Reload {
                            dataflow_id,
                            node_id,
                            operator_id,
                        } => {
                            let reload = async {
                                reload_dataflow(
                                    &running_dataflows,
                                    dataflow_id,
                                    node_id,
                                    operator_id,
                                    &mut daemon_connections,
                                    clock.new_timestamp(),
                                )
                                .await?;
                                Result::<_, eyre::Report>::Ok(())
                            };
                            let reply =
                                reload
                                    .await
                                    .map(|()| ControlRequestReply::DataflowReloaded {
                                        uuid: dataflow_id,
                                    });
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::Stop {
                            dataflow_uuid,
                            grace_duration,
                        } => {
                            // Already finished -> answer immediately with the
                            // recorded result.
                            if let Some(result) = dataflow_results.get(&dataflow_uuid) {
                                let reply = ControlRequestReply::DataflowStopped {
                                    uuid: dataflow_uuid,
                                    result: dataflow_result(result, dataflow_uuid, &clock),
                                };
                                let _ = reply_sender.send(Ok(reply));

                                continue;
                            }

                            let dataflow = stop_dataflow(
                                &mut running_dataflows,
                                dataflow_uuid,
                                &mut daemon_connections,
                                clock.new_timestamp(),
                                grace_duration,
                            )
                            .await;

                            match dataflow {
                                Ok(dataflow) => {
                                    // Reply is deferred until the dataflow
                                    // actually finished on all daemons (see
                                    // DataflowFinishedOnDaemon above).
                                    dataflow.reply_senders.push(reply_sender);
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        // Same as `Stop`, but addressed by name.
                        ControlRequest::StopByName {
                            name,
                            grace_duration,
                        } => match resolve_name(name, &running_dataflows, &archived_dataflows) {
                            Ok(dataflow_uuid) => {
                                if let Some(result) = dataflow_results.get(&dataflow_uuid) {
                                    let reply = ControlRequestReply::DataflowStopped {
                                        uuid: dataflow_uuid,
                                        result: dataflow_result(result, dataflow_uuid, &clock),
                                    };
                                    let _ = reply_sender.send(Ok(reply));

                                    continue;
                                }

                                let dataflow = stop_dataflow(
                                    &mut running_dataflows,
                                    dataflow_uuid,
                                    &mut daemon_connections,
                                    clock.new_timestamp(),
                                    grace_duration,
                                )
                                .await;

                                match dataflow {
                                    Ok(dataflow) => {
                                        dataflow.reply_senders.push(reply_sender);
                                    }
                                    Err(err) => {
                                        let _ = reply_sender.send(Err(err));
                                    }
                                }
                            }
                            Err(err) => {
                                let _ = reply_sender.send(Err(err));
                            }
                        },
                        // Fetch logs for a dataflow given by UUID or name.
                        ControlRequest::Logs { uuid, name, node } => {
                            let dataflow_uuid = if let Some(uuid) = uuid {
                                Ok(uuid)
                            } else if let Some(name) = name {
                                resolve_name(name, &running_dataflows, &archived_dataflows)
                            } else {
                                Err(eyre!("No uuid"))
                            };

                            match dataflow_uuid {
                                Ok(uuid) => {
                                    let reply = retrieve_logs(
                                        &running_dataflows,
                                        &archived_dataflows,
                                        uuid,
                                        node.into(),
                                        &mut daemon_connections,
                                        clock.new_timestamp(),
                                    )
                                    .await
                                    .map(ControlRequestReply::Logs);
                                    let _ = reply_sender.send(reply);
                                }
                                Err(err) => {
                                    let _ = reply_sender.send(Err(err));
                                }
                            }
                        }
                        // Tear down everything: stop dataflows, notify
                        // daemons, and abort the abortable event sources.
                        ControlRequest::Destroy => {
                            tracing::info!("Received destroy command");

                            let reply = handle_destroy(
                                &mut running_dataflows,
                                &mut daemon_connections,
                                &abort_handle,
                                &mut daemon_events_tx,
                                &clock,
                            )
                            .await
                            .map(|()| ControlRequestReply::DestroyOk);
                            let _ = reply_sender.send(reply);
                        }
                        // List running plus finished/failed dataflows.
                        ControlRequest::List => {
                            let mut dataflows: Vec<_> = running_dataflows.values().collect();
                            dataflows.sort_by_key(|d| (&d.name, d.uuid));

                            let running = dataflows.into_iter().map(|d| DataflowListEntry {
                                id: DataflowIdAndName {
                                    uuid: d.uuid,
                                    name: d.name.clone(),
                                },
                                status: DataflowStatus::Running,
                            });
                            let finished_failed =
                                dataflow_results.iter().map(|(&uuid, results)| {
                                    let name =
                                        archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
                                    let id = DataflowIdAndName { uuid, name };
                                    // Failed if any daemon reported an error.
                                    let status = if results.values().all(|r| r.is_ok()) {
                                        DataflowStatus::Finished
                                    } else {
                                        DataflowStatus::Failed
                                    };
                                    DataflowListEntry { id, status }
                                });

                            let reply = Ok(ControlRequestReply::DataflowList(DataflowList(
                                running.chain(finished_failed).collect(),
                            )));
                            let _ = reply_sender.send(reply);
                        }
                        ControlRequest::DaemonConnected => {
                            let running = !daemon_connections.is_empty();
                            let _ = reply_sender
                                .send(Ok(ControlRequestReply::DaemonConnected(running)));
                        }
                        ControlRequest::ConnectedMachines => {
                            let reply = Ok(ControlRequestReply::ConnectedDaemons(
                                daemon_connections.keys().cloned().collect(),
                            ));
                            let _ = reply_sender.send(reply);
                        }
                        // Handled via ControlEvent::LogSubscribe below.
                        ControlRequest::LogSubscribe { .. } => {
                            let _ = reply_sender.send(Err(eyre::eyre!(
                                "LogSubscribe request should be handled separately"
                            )));
                        }
                    }
                }
                ControlEvent::Error(err) => tracing::error!("{err:?}"),
                // A CLI client wants to stream log messages of a dataflow.
                ControlEvent::LogSubscribe {
                    dataflow_id,
                    level,
                    connection,
                } => {
                    if let Some(dataflow) = running_dataflows.get_mut(&dataflow_id) {
                        dataflow
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                    }
                }
            },
            // Watchdog tick: warn after 15s of silence, disconnect after 30s,
            // and probe each remaining daemon with a heartbeat message.
            Event::DaemonHeartbeatInterval => {
                let mut disconnected = BTreeSet::new();
                for (machine_id, connection) in daemon_connections.iter_mut() {
                    if connection.last_heartbeat.elapsed() > Duration::from_secs(15) {
                        tracing::warn!(
                            "no heartbeat message from machine `{machine_id}` since {:?}",
                            connection.last_heartbeat.elapsed()
                        )
                    }
                    if connection.last_heartbeat.elapsed() > Duration::from_secs(30) {
                        disconnected.insert(machine_id.clone());
                        continue;
                    }
                    let result: eyre::Result<()> = tokio::time::timeout(
                        Duration::from_millis(500),
                        send_heartbeat_message(&mut connection.stream, clock.new_timestamp()),
                    )
                    .await
                    .wrap_err("timeout")
                    .and_then(|r| r)
                    .wrap_err_with(|| {
                        format!("failed to send heartbeat message to daemon at `{machine_id}`")
                    });
                    if let Err(err) = result {
                        tracing::warn!("{err:?}");
                        disconnected.insert(machine_id.clone());
                    }
                }
                if !disconnected.is_empty() {
                    tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
                    for machine_id in disconnected {
                        daemon_connections.remove(&machine_id);
                    }
                }
            }
            // Ctrl-C behaves like a `Destroy` control request.
            Event::CtrlC => {
                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
                handle_destroy(
                    &mut running_dataflows,
                    &mut daemon_connections,
                    &abort_handle,
                    &mut daemon_events_tx,
                    &clock,
                )
                .await?;
            }
            Event::DaemonHeartbeat {
                daemon_id: machine_id,
            } => {
                if let Some(connection) = daemon_connections.get_mut(&machine_id) {
                    connection.last_heartbeat = Instant::now();
                }
            }
            // Forward a daemon log message to the dataflow's subscribers.
            Event::Log(message) => {
                if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) {
                    send_log_message(dataflow, &message).await;
                }
            }
            Event::DaemonExit { daemon_id } => {
                tracing::info!("Daemon `{daemon_id}` exited");
                daemon_connections.remove(&daemon_id);
            }
        }
    }

    tracing::info!("stopped");

    Ok(())
}
713
714async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage) {
715    for subscriber in &mut dataflow.log_subscribers {
716        let send_result =
717            tokio::time::timeout(Duration::from_millis(100), subscriber.send_message(message));
718
719        if send_result.await.is_err() {
720            subscriber.close();
721        }
722    }
723    dataflow.log_subscribers.retain(|s| !s.is_closed());
724}
725
726fn dataflow_result(
727    results: &BTreeMap<DaemonId, DataflowDaemonResult>,
728    dataflow_uuid: Uuid,
729    clock: &uhlc::HLC,
730) -> DataflowResult {
731    let mut node_results = BTreeMap::new();
732    for result in results.values() {
733        node_results.extend(result.node_results.clone());
734        if let Err(err) = clock.update_with_timestamp(&result.timestamp) {
735            tracing::warn!("failed to update HLC: {err}");
736        }
737    }
738
739    DataflowResult {
740        uuid: dataflow_uuid,
741        timestamp: clock.new_timestamp(),
742        node_results,
743    }
744}
745
/// An open TCP connection to a daemon, with liveness bookkeeping.
struct DaemonConnection {
    /// TCP stream used for coordinator <-> daemon messages.
    stream: TcpStream,
    /// Time of the last heartbeat received from this daemon; checked by the
    /// `DaemonHeartbeatInterval` handler to detect unresponsive daemons.
    last_heartbeat: Instant,
}
750
/// Tears down the coordinator: stops all running dataflows, then destroys all
/// connected daemons.
///
/// The abortable external event stream is aborted first so no new events are
/// processed during teardown. Note that an error while stopping a dataflow is
/// propagated immediately via `?`, in which case the daemons are not
/// destroyed and the events channel is not closed.
async fn handle_destroy(
    running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
    daemon_connections: &mut DaemonConnections,
    abortable_events: &futures::stream::AbortHandle,
    daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
    clock: &HLC,
) -> Result<(), eyre::ErrReport> {
    abortable_events.abort();
    // Collect the UUIDs first so that `running_dataflows` can be borrowed
    // mutably by `stop_dataflow` inside the loop.
    for dataflow_uuid in running_dataflows.keys().cloned().collect::<Vec<_>>() {
        let _ = stop_dataflow(
            running_dataflows,
            dataflow_uuid,
            daemon_connections,
            clock.new_timestamp(),
            None,
        )
        .await?;
    }

    let result = destroy_daemons(daemon_connections, clock.new_timestamp()).await;
    // Dropping the sender closes the daemon events channel.
    *daemon_events_tx = None;
    result
}
774
775async fn send_heartbeat_message(
776    connection: &mut TcpStream,
777    timestamp: uhlc::Timestamp,
778) -> eyre::Result<()> {
779    let message = serde_json::to_vec(&Timestamped {
780        inner: DaemonCoordinatorEvent::Heartbeat,
781        timestamp,
782    })
783    .context("Could not serialize heartbeat message")?;
784
785    tcp_send(connection, &message)
786        .await
787        .wrap_err("failed to send heartbeat message to daemon")
788}
789
/// Bookkeeping for a dataflow that is currently running.
struct RunningDataflow {
    /// Optional human-readable name of the dataflow.
    name: Option<String>,
    /// Unique ID of this dataflow run.
    uuid: Uuid,
    /// The IDs of the daemons that the dataflow is running on.
    daemons: BTreeSet<DaemonId>,
    /// IDs of daemons that are waiting until all nodes are started.
    pending_daemons: BTreeSet<DaemonId>,
    /// Nodes that exited before subscribing; reported by daemons in
    /// `DataflowEvent::ReadyOnDaemon` events.
    exited_before_subscribe: Vec<NodeId>,
    /// The resolved nodes of this dataflow, keyed by node ID.
    nodes: BTreeMap<NodeId, ResolvedNode>,

    /// Reply channels of pending control requests that should be answered
    /// with the dataflow's result.
    reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,

    /// Active log subscribers for this dataflow.
    log_subscribers: Vec<LogSubscriber>,
}
804
/// Subset of dataflow information kept after a dataflow finished, e.g. to
/// still allow retrieving node logs (see `retrieve_logs`).
struct ArchivedDataflow {
    /// Optional human-readable name of the dataflow.
    name: Option<String>,
    /// The resolved nodes of the finished dataflow, keyed by node ID.
    nodes: BTreeMap<NodeId, ResolvedNode>,
}
809
impl From<&RunningDataflow> for ArchivedDataflow {
    /// Archives a running dataflow by keeping only its name and nodes.
    fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
        ArchivedDataflow {
            name: dataflow.name.clone(),
            nodes: dataflow.nodes.clone(),
        }
    }
}
818
/// Equality considers only the identity-related fields (`name`, `uuid`,
/// `daemons`); runtime state such as reply senders and log subscribers is
/// deliberately ignored.
impl PartialEq for RunningDataflow {
    fn eq(&self, other: &Self) -> bool {
        self.name == other.name && self.uuid == other.uuid && self.daemons == other.daemons
    }
}

impl Eq for RunningDataflow {}
826
827async fn stop_dataflow<'a>(
828    running_dataflows: &'a mut HashMap<Uuid, RunningDataflow>,
829    dataflow_uuid: Uuid,
830    daemon_connections: &mut DaemonConnections,
831    timestamp: uhlc::Timestamp,
832    grace_duration: Option<Duration>,
833) -> eyre::Result<&'a mut RunningDataflow> {
834    let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
835        bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
836    };
837
838    let message = serde_json::to_vec(&Timestamped {
839        inner: DaemonCoordinatorEvent::StopDataflow {
840            dataflow_id: dataflow_uuid,
841            grace_duration,
842        },
843        timestamp,
844    })?;
845
846    for daemon_id in &dataflow.daemons {
847        let daemon_connection = daemon_connections
848            .get_mut(daemon_id)
849            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
850        tcp_send(&mut daemon_connection.stream, &message)
851            .await
852            .wrap_err("failed to send stop message to daemon")?;
853
854        // wait for reply
855        let reply_raw = tcp_receive(&mut daemon_connection.stream)
856            .await
857            .wrap_err("failed to receive stop reply from daemon")?;
858        match serde_json::from_slice(&reply_raw)
859            .wrap_err("failed to deserialize stop reply from daemon")?
860        {
861            DaemonCoordinatorReply::StopResult(result) => result
862                .map_err(|e| eyre!(e))
863                .wrap_err("failed to stop dataflow")?,
864            other => bail!("unexpected reply after sending stop: {other:?}"),
865        }
866    }
867
868    tracing::info!("successfully send stop dataflow `{dataflow_uuid}` to all daemons");
869
870    Ok(dataflow)
871}
872
873async fn reload_dataflow(
874    running_dataflows: &HashMap<Uuid, RunningDataflow>,
875    dataflow_id: Uuid,
876    node_id: NodeId,
877    operator_id: Option<OperatorId>,
878    daemon_connections: &mut DaemonConnections,
879    timestamp: uhlc::Timestamp,
880) -> eyre::Result<()> {
881    let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
882        bail!("No running dataflow found with UUID `{dataflow_id}`")
883    };
884    let message = serde_json::to_vec(&Timestamped {
885        inner: DaemonCoordinatorEvent::ReloadDataflow {
886            dataflow_id,
887            node_id,
888            operator_id,
889        },
890        timestamp,
891    })?;
892
893    for machine_id in &dataflow.daemons {
894        let daemon_connection = daemon_connections
895            .get_mut(machine_id)
896            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
897        tcp_send(&mut daemon_connection.stream, &message)
898            .await
899            .wrap_err("failed to send reload message to daemon")?;
900
901        // wait for reply
902        let reply_raw = tcp_receive(&mut daemon_connection.stream)
903            .await
904            .wrap_err("failed to receive reload reply from daemon")?;
905        match serde_json::from_slice(&reply_raw)
906            .wrap_err("failed to deserialize reload reply from daemon")?
907        {
908            DaemonCoordinatorReply::ReloadResult(result) => result
909                .map_err(|e| eyre!(e))
910                .wrap_err("failed to reload dataflow")?,
911            other => bail!("unexpected reply after sending reload: {other:?}"),
912        }
913    }
914    tracing::info!("successfully reloaded dataflow `{dataflow_id}`");
915
916    Ok(())
917}
918
/// Retrieves the logs of a single node by requesting them from the daemon
/// that the node is deployed on.
///
/// Works for archived (finished) dataflows as well as still-running ones.
/// The node's machine ID from the dataflow descriptor is used to pick the
/// matching daemon connection.
///
/// # Errors
///
/// Fails if the dataflow or node is unknown, if the node cannot be mapped to
/// exactly one machine and daemon, or if communication with the daemon fails.
async fn retrieve_logs(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    archived_dataflows: &HashMap<Uuid, ArchivedDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &mut DaemonConnections,
    timestamp: uhlc::Timestamp,
) -> eyre::Result<Vec<u8>> {
    // Prefer the archived entry; fall back to the still-running dataflow.
    let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else {
        bail!("No dataflow found with UUID `{dataflow_id}`")
    };

    let message = serde_json::to_vec(&Timestamped {
        inner: DaemonCoordinatorEvent::Logs {
            dataflow_id,
            node_id: node_id.clone(),
        },
        timestamp,
    })?;

    // Machine IDs of all nodes matching `node_id` (`None` = unnamed machine).
    let machine_ids: Vec<Option<String>> = nodes
        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.machine.clone())
        .collect();

    // The node must be deployed on exactly one machine.
    let machine_id = if let [machine_id] = &machine_ids[..] {
        machine_id
    } else if machine_ids.is_empty() {
        bail!("No machine contains {}/{}", dataflow_id, node_id)
    } else {
        bail!(
            "More than one machine contains {}/{}. However, it should only be present on one.",
            dataflow_id,
            node_id
        )
    };

    // Map the machine ID to a daemon connection; an unnamed machine matches
    // the unnamed daemon connections.
    let daemon_ids: Vec<_> = match machine_id {
        None => daemon_connections.unnamed().collect(),
        Some(machine_id) => daemon_connections
            .get_matching_daemon_id(machine_id)
            .into_iter()
            .collect(),
    };
    let daemon_id = match &daemon_ids[..] {
        [id] => (*id).clone(),
        [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
        _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
    };
    let daemon_connection = daemon_connections
        .get_mut(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?;
    tcp_send(&mut daemon_connection.stream, &message)
        .await
        .wrap_err("failed to send logs message to daemon")?;

    // wait for reply
    let reply_raw = tcp_receive(&mut daemon_connection.stream)
        .await
        .wrap_err("failed to retrieve logs reply from daemon")?;
    let reply_logs = match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize logs reply from daemon")?
    {
        DaemonCoordinatorReply::Logs(logs) => logs,
        other => bail!("unexpected reply after sending logs: {other:?}"),
    };
    tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");

    reply_logs.map_err(|err| eyre!(err))
}
994
995async fn start_dataflow(
996    dataflow: Descriptor,
997    working_dir: PathBuf,
998    name: Option<String>,
999    daemon_connections: &mut DaemonConnections,
1000    clock: &HLC,
1001    uv: bool,
1002) -> eyre::Result<RunningDataflow> {
1003    let SpawnedDataflow {
1004        uuid,
1005        daemons,
1006        nodes,
1007    } = spawn_dataflow(dataflow, working_dir, daemon_connections, clock, uv).await?;
1008    Ok(RunningDataflow {
1009        uuid,
1010        name,
1011        pending_daemons: if daemons.len() > 1 {
1012            daemons.clone()
1013        } else {
1014            BTreeSet::new()
1015        },
1016        exited_before_subscribe: Default::default(),
1017        daemons,
1018        nodes,
1019        reply_senders: Vec::new(),
1020        log_subscribers: Vec::new(),
1021    })
1022}
1023
1024async fn destroy_daemon(
1025    daemon_id: DaemonId,
1026    mut daemon_connection: DaemonConnection,
1027
1028    timestamp: uhlc::Timestamp,
1029) -> Result<()> {
1030    let message = serde_json::to_vec(&Timestamped {
1031        inner: DaemonCoordinatorEvent::Destroy,
1032        timestamp,
1033    })?;
1034
1035    tcp_send(&mut daemon_connection.stream, &message)
1036        .await
1037        .wrap_err(format!(
1038            "failed to send destroy message to daemon `{daemon_id}`"
1039        ))?;
1040
1041    // wait for reply
1042    let reply_raw = tcp_receive(&mut daemon_connection.stream)
1043        .await
1044        .wrap_err("failed to receive destroy reply from daemon")?;
1045    match serde_json::from_slice(&reply_raw)
1046        .wrap_err("failed to deserialize destroy reply from daemon")?
1047    {
1048        DaemonCoordinatorReply::DestroyResult { result, .. } => result
1049            .map_err(|e| eyre!(e))
1050            .wrap_err("failed to destroy dataflow")?,
1051        other => bail!("unexpected reply after sending `destroy`: {other:?}"),
1052    }
1053
1054    tracing::info!("successfully destroyed daemon `{daemon_id}`");
1055    Ok(())
1056}
1057
1058async fn destroy_daemons(
1059    daemon_connections: &mut DaemonConnections,
1060    timestamp: uhlc::Timestamp,
1061) -> eyre::Result<()> {
1062    let futures = daemon_connections
1063        .drain()
1064        .map(|(daemon_id, daemon_connection)| {
1065            destroy_daemon(daemon_id, daemon_connection, timestamp)
1066        })
1067        .collect::<Vec<_>>();
1068    let results: Vec<std::result::Result<(), eyre::Error>> =
1069        join_all(futures).await.into_iter().collect::<Vec<_>>();
1070    for result in results {
1071        result?;
1072    }
1073    Ok(())
1074}
1075
1076#[derive(Debug)]
1077pub enum Event {
1078    NewDaemonConnection(TcpStream),
1079    DaemonConnectError(eyre::Report),
1080    DaemonHeartbeat {
1081        daemon_id: DaemonId,
1082    },
1083    Dataflow {
1084        uuid: Uuid,
1085        event: DataflowEvent,
1086    },
1087    Control(ControlEvent),
1088    Daemon(DaemonRequest),
1089    DaemonHeartbeatInterval,
1090    CtrlC,
1091    Log(LogMessage),
1092    DaemonExit {
1093        daemon_id: dora_message::common::DaemonId,
1094    },
1095}
1096
1097impl Event {
1098    /// Whether this event should be logged.
1099    #[allow(clippy::match_like_matches_macro)]
1100    pub fn log(&self) -> bool {
1101        match self {
1102            Event::DaemonHeartbeatInterval => false,
1103            _ => true,
1104        }
1105    }
1106}
1107
/// Dataflow-related events reported by daemons.
#[derive(Debug)]
pub enum DataflowEvent {
    /// The dataflow finished on the given daemon.
    DataflowFinishedOnDaemon {
        daemon_id: DaemonId,
        /// The daemon's per-node results for this dataflow run.
        result: DataflowDaemonResult,
    },
    /// The given daemon reports that its part of the dataflow is ready.
    ReadyOnDaemon {
        daemon_id: DaemonId,
        /// Nodes that already exited before subscribing.
        exited_before_subscribe: Vec<NodeId>,
    },
}
1119
/// Requests sent by daemons to the coordinator.
#[derive(Debug)]
pub enum DaemonRequest {
    /// Registration of a newly connected daemon.
    Register {
        /// Result of the daemon's version compatibility check.
        version_check_result: Result<(), String>,
        /// Optional machine ID reported by the daemon.
        machine_id: Option<String>,
        /// The TCP connection to the daemon.
        connection: TcpStream,
    },
}
1128
1129fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
1130    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
1131
1132    let mut ctrlc_sent = false;
1133    ctrlc::set_handler(move || {
1134        if ctrlc_sent {
1135            tracing::warn!("received second ctrlc signal -> aborting immediately");
1136            std::process::abort();
1137        } else {
1138            tracing::info!("received ctrlc signal");
1139            if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
1140                tracing::error!("failed to report ctrl-c event to dora-coordinator");
1141            }
1142
1143            ctrlc_sent = true;
1144        }
1145    })
1146    .wrap_err("failed to set ctrl-c handler")?;
1147
1148    Ok(ReceiverStream::new(ctrlc_rx))
1149}