dora_coordinator/
control.rs

1use crate::{
2    Event,
3    tcp_utils::{tcp_receive, tcp_send},
4};
5use dora_message::{
6    BuildId, cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply,
7};
8use eyre::{Context, eyre};
9use futures::{
10    FutureExt, Stream, StreamExt,
11    future::{self, Either},
12    stream::FuturesUnordered,
13};
14use futures_concurrency::future::Race;
15use std::{io::ErrorKind, net::SocketAddr};
16use tokio::{
17    net::{TcpListener, TcpStream},
18    sync::{mpsc, oneshot},
19    task::JoinHandle,
20};
21use tokio_stream::wrappers::ReceiverStream;
22use uuid::Uuid;
23
24pub(crate) async fn control_events(
25    control_listen_addr: SocketAddr,
26    tasks: &FuturesUnordered<JoinHandle<()>>,
27) -> eyre::Result<impl Stream<Item = Event> + use<>> {
28    let (tx, rx) = mpsc::channel(10);
29
30    let (finish_tx, mut finish_rx) = mpsc::channel(1);
31    tasks.push(tokio::spawn(listen(control_listen_addr, tx, finish_tx)));
32    tasks.push(tokio::spawn(async move {
33        while let Some(()) = finish_rx.recv().await {}
34    }));
35
36    Ok(ReceiverStream::new(rx).map(Event::Control))
37}
38
39async fn listen(
40    control_listen_addr: SocketAddr,
41    tx: mpsc::Sender<ControlEvent>,
42    _finish_tx: mpsc::Sender<()>,
43) {
44    let result = TcpListener::bind(control_listen_addr)
45        .await
46        .wrap_err("failed to listen for control messages");
47    let incoming = match result {
48        Ok(incoming) => incoming,
49        Err(err) => {
50            let _ = tx.send(err.into()).await;
51            return;
52        }
53    };
54
55    loop {
56        let new_connection = incoming.accept().map(Either::Left);
57        let coordinator_stop = tx.closed().map(Either::Right);
58        let connection = match (new_connection, coordinator_stop).race().await {
59            future::Either::Left(connection) => connection,
60            future::Either::Right(()) => {
61                // coordinator was stopped
62                break;
63            }
64        };
65        match connection.wrap_err("failed to connect") {
66            Ok((connection, _)) => {
67                let tx = tx.clone();
68                tokio::spawn(handle_requests(connection, tx, _finish_tx.clone()));
69            }
70            Err(err) => {
71                if tx.blocking_send(err.into()).is_err() {
72                    break;
73                }
74            }
75        }
76    }
77}
78
79async fn handle_requests(
80    mut connection: TcpStream,
81    tx: mpsc::Sender<ControlEvent>,
82    _finish_tx: mpsc::Sender<()>,
83) {
84    let peer_addr = connection.peer_addr().ok();
85    loop {
86        let next_request = tcp_receive(&mut connection).map(Either::Left);
87        let coordinator_stopped = tx.closed().map(Either::Right);
88        let raw = match (next_request, coordinator_stopped).race().await {
89            Either::Right(()) => break,
90            Either::Left(request) => match request {
91                Ok(message) => message,
92                Err(err) => match err.kind() {
93                    ErrorKind::UnexpectedEof => {
94                        tracing::trace!("Control connection closed");
95                        break;
96                    }
97                    err => {
98                        let err = eyre!(err).wrap_err("failed to receive incoming message");
99                        tracing::error!("{err}");
100                        break;
101                    }
102                },
103            },
104        };
105
106        let request =
107            serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message");
108
109        if let Ok(ControlRequest::LogSubscribe { dataflow_id, level }) = request {
110            let _ = tx
111                .send(ControlEvent::LogSubscribe {
112                    dataflow_id,
113                    level,
114                    connection,
115                })
116                .await;
117            break;
118        }
119
120        if let Ok(ControlRequest::BuildLogSubscribe { build_id, level }) = request {
121            let _ = tx
122                .send(ControlEvent::BuildLogSubscribe {
123                    build_id,
124                    level,
125                    connection,
126                })
127                .await;
128            break;
129        }
130
131        let mut result = match request {
132            Ok(request) => handle_request(request, &tx).await,
133            Err(err) => Err(err),
134        };
135
136        if let Ok(ControlRequestReply::CliAndDefaultDaemonIps { cli, .. }) = &mut result {
137            if cli.is_none() {
138                // fill cli IP address in reply
139                *cli = peer_addr.map(|s| s.ip());
140            }
141        }
142
143        let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err:?}")));
144        let serialized: Vec<u8> =
145            match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") {
146                Ok(s) => s,
147                Err(err) => {
148                    tracing::error!("{err:?}");
149                    break;
150                }
151            };
152        match tcp_send(&mut connection, &serialized).await {
153            Ok(()) => {}
154            Err(err) => match err.kind() {
155                ErrorKind::UnexpectedEof => {
156                    tracing::debug!("Control connection closed while trying to send reply");
157                    break;
158                }
159                err => {
160                    let err = eyre!(err).wrap_err("failed to send reply");
161                    tracing::error!("{err}");
162                    break;
163                }
164            },
165        }
166
167        if matches!(reply, ControlRequestReply::CoordinatorStopped) {
168            break;
169        }
170    }
171}
172
173async fn handle_request(
174    request: ControlRequest,
175    tx: &mpsc::Sender<ControlEvent>,
176) -> eyre::Result<ControlRequestReply> {
177    let (reply_tx, reply_rx) = oneshot::channel();
178    let event = ControlEvent::IncomingRequest {
179        request: request.clone(),
180        reply_sender: reply_tx,
181    };
182
183    if tx.send(event).await.is_err() {
184        return Ok(ControlRequestReply::CoordinatorStopped);
185    }
186
187    reply_rx
188        .await
189        .wrap_err_with(|| format!("no coordinator reply to {request:?}"))?
190}
191
192#[derive(Debug)]
193pub enum ControlEvent {
194    IncomingRequest {
195        request: ControlRequest,
196        reply_sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
197    },
198    LogSubscribe {
199        dataflow_id: Uuid,
200        level: log::LevelFilter,
201        connection: TcpStream,
202    },
203    BuildLogSubscribe {
204        build_id: BuildId,
205        level: log::LevelFilter,
206        connection: TcpStream,
207    },
208    Error(eyre::Report),
209}
210
211impl From<eyre::Report> for ControlEvent {
212    fn from(err: eyre::Report) -> Self {
213        ControlEvent::Error(err)
214    }
215}