Skip to main content

dora_coordinator/
control.rs

1use crate::{
2    Event,
3    tcp_utils::{tcp_receive, tcp_send},
4};
5use dora_message::{BuildId, cli_to_coordinator::LegacyControlRequest};
6use eyre::{Context, eyre};
7use futures::{
8    FutureExt, Stream, StreamExt,
9    future::{self, Either},
10    stream::FuturesUnordered,
11};
12use futures_concurrency::future::Race;
13use std::{io::ErrorKind, net::SocketAddr};
14use tokio::{
15    net::{TcpListener, TcpStream},
16    sync::mpsc,
17    task::JoinHandle,
18};
19use tokio_stream::wrappers::ReceiverStream;
20use uuid::Uuid;
21
22pub(crate) async fn control_events(
23    control_listen_addr: SocketAddr,
24    tasks: &FuturesUnordered<JoinHandle<()>>,
25) -> eyre::Result<impl Stream<Item = Event> + use<>> {
26    let (tx, rx) = mpsc::channel(10);
27
28    let (finish_tx, mut finish_rx) = mpsc::channel(1);
29    tasks.push(tokio::spawn(listen(control_listen_addr, tx, finish_tx)));
30    tasks.push(tokio::spawn(async move {
31        while let Some(()) = finish_rx.recv().await {}
32    }));
33
34    Ok(ReceiverStream::new(rx).map(Event::Control))
35}
36
37async fn listen(
38    control_listen_addr: SocketAddr,
39    tx: mpsc::Sender<ControlEvent>,
40    _finish_tx: mpsc::Sender<()>,
41) {
42    let result = TcpListener::bind(control_listen_addr)
43        .await
44        .wrap_err("failed to listen for control messages");
45    let incoming = match result {
46        Ok(incoming) => incoming,
47        Err(err) => {
48            let _ = tx.send(err.into()).await;
49            return;
50        }
51    };
52
53    loop {
54        let new_connection = incoming.accept().map(Either::Left);
55        let coordinator_stop = tx.closed().map(Either::Right);
56        let connection = match (new_connection, coordinator_stop).race().await {
57            future::Either::Left(connection) => connection,
58            future::Either::Right(()) => {
59                // coordinator was stopped
60                break;
61            }
62        };
63        match connection.wrap_err("failed to connect") {
64            Ok((connection, _)) => {
65                let tx = tx.clone();
66                tokio::spawn(handle_requests(connection, tx, _finish_tx.clone()));
67            }
68            Err(err) => {
69                if tx.blocking_send(err.into()).is_err() {
70                    break;
71                }
72            }
73        }
74    }
75}
76
77async fn handle_requests(
78    mut connection: TcpStream,
79    tx: mpsc::Sender<ControlEvent>,
80    _finish_tx: mpsc::Sender<()>,
81) {
82    let _peer_addr = connection.peer_addr().ok();
83
84    let next_request = tcp_receive(&mut connection).map(Either::Left);
85    let coordinator_stopped = tx.closed().map(Either::Right);
86    let raw = match (next_request, coordinator_stopped).race().await {
87        Either::Right(()) => return,
88        Either::Left(request) => match request {
89            Ok(message) => message,
90            Err(err) => match err.kind() {
91                ErrorKind::UnexpectedEof => {
92                    tracing::trace!("Control connection closed");
93                    return;
94                }
95                err => {
96                    let err = eyre!(err).wrap_err("failed to receive incoming message");
97                    tracing::error!("{err}");
98                    return;
99                }
100            },
101        },
102    };
103
104    let request = serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message");
105
106    match request {
107        Ok(request) => match request {
108            LegacyControlRequest::LogSubscribe { dataflow_id, level } => {
109                let _ = tx
110                    .send(ControlEvent::LogSubscribe {
111                        dataflow_id,
112                        level,
113                        connection,
114                    })
115                    .await;
116            }
117            LegacyControlRequest::BuildLogSubscribe { build_id, level } => {
118                let _ = tx
119                    .send(ControlEvent::BuildLogSubscribe {
120                        build_id,
121                        level,
122                        connection,
123                    })
124                    .await;
125            }
126        },
127        Err(err) => {
128            tracing::warn!("failed to parse incoming control message: {:#?}", err);
129            let reply = format!("failed to parse message: {err:?}");
130            let serialized: Vec<u8> =
131                match serde_json::to_vec(&reply).wrap_err("failed to serialize message") {
132                    Ok(s) => s,
133                    Err(err) => {
134                        tracing::error!("{err:?}");
135                        return;
136                    }
137                };
138            match tcp_send(&mut connection, &serialized).await {
139                Ok(()) => {}
140                Err(err) => match err.kind() {
141                    ErrorKind::UnexpectedEof => {
142                        tracing::debug!("Control connection closed while trying to send reply");
143                    }
144                    err => {
145                        let err = eyre!(err).wrap_err("failed to send reply");
146                        tracing::error!("{err}");
147                    }
148                },
149            };
150        }
151    }
152}
153
154#[derive(Debug)]
155pub enum ControlEvent {
156    LogSubscribe {
157        dataflow_id: Uuid,
158        level: log::LevelFilter,
159        connection: TcpStream,
160    },
161    BuildLogSubscribe {
162        build_id: BuildId,
163        level: log::LevelFilter,
164        connection: TcpStream,
165    },
166    Error(eyre::Report),
167}
168
169impl From<eyre::Report> for ControlEvent {
170    fn from(err: eyre::Report) -> Self {
171        ControlEvent::Error(err)
172    }
173}