dora_coordinator/
control.rs

1use crate::{
2    tcp_utils::{tcp_receive, tcp_send},
3    Event,
4};
5use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
6use eyre::{eyre, Context};
7use futures::{
8    future::{self, Either},
9    stream::FuturesUnordered,
10    FutureExt, Stream, StreamExt,
11};
12use futures_concurrency::future::Race;
13use std::{io::ErrorKind, net::SocketAddr};
14use tokio::{
15    net::{TcpListener, TcpStream},
16    sync::{mpsc, oneshot},
17    task::JoinHandle,
18};
19use tokio_stream::wrappers::ReceiverStream;
20use uuid::Uuid;
21
22pub(crate) async fn control_events(
23    control_listen_addr: SocketAddr,
24    tasks: &FuturesUnordered<JoinHandle<()>>,
25) -> eyre::Result<impl Stream<Item = Event>> {
26    let (tx, rx) = mpsc::channel(10);
27
28    let (finish_tx, mut finish_rx) = mpsc::channel(1);
29    tasks.push(tokio::spawn(listen(control_listen_addr, tx, finish_tx)));
30    tasks.push(tokio::spawn(async move {
31        while let Some(()) = finish_rx.recv().await {}
32    }));
33
34    Ok(ReceiverStream::new(rx).map(Event::Control))
35}
36
37async fn listen(
38    control_listen_addr: SocketAddr,
39    tx: mpsc::Sender<ControlEvent>,
40    _finish_tx: mpsc::Sender<()>,
41) {
42    let result = TcpListener::bind(control_listen_addr)
43        .await
44        .wrap_err("failed to listen for control messages");
45    let incoming = match result {
46        Ok(incoming) => incoming,
47        Err(err) => {
48            let _ = tx.send(err.into()).await;
49            return;
50        }
51    };
52
53    loop {
54        let new_connection = incoming.accept().map(Either::Left);
55        let coordinator_stop = tx.closed().map(Either::Right);
56        let connection = match (new_connection, coordinator_stop).race().await {
57            future::Either::Left(connection) => connection,
58            future::Either::Right(()) => {
59                // coordinator was stopped
60                break;
61            }
62        };
63        match connection.wrap_err("failed to connect") {
64            Ok((connection, _)) => {
65                let tx = tx.clone();
66                tokio::spawn(handle_requests(connection, tx, _finish_tx.clone()));
67            }
68            Err(err) => {
69                if tx.blocking_send(err.into()).is_err() {
70                    break;
71                }
72            }
73        }
74    }
75}
76
77async fn handle_requests(
78    mut connection: TcpStream,
79    tx: mpsc::Sender<ControlEvent>,
80    _finish_tx: mpsc::Sender<()>,
81) {
82    loop {
83        let next_request = tcp_receive(&mut connection).map(Either::Left);
84        let coordinator_stopped = tx.closed().map(Either::Right);
85        let raw = match (next_request, coordinator_stopped).race().await {
86            Either::Right(()) => break,
87            Either::Left(request) => match request {
88                Ok(message) => message,
89                Err(err) => match err.kind() {
90                    ErrorKind::UnexpectedEof => {
91                        tracing::trace!("Control connection closed");
92                        break;
93                    }
94                    err => {
95                        let err = eyre!(err).wrap_err("failed to receive incoming message");
96                        tracing::error!("{err}");
97                        break;
98                    }
99                },
100            },
101        };
102
103        let request =
104            serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message");
105
106        if let Ok(ControlRequest::LogSubscribe { dataflow_id, level }) = request {
107            let _ = tx
108                .send(ControlEvent::LogSubscribe {
109                    dataflow_id,
110                    level,
111                    connection,
112                })
113                .await;
114            break;
115        }
116
117        let result = match request {
118            Ok(request) => handle_request(request, &tx).await,
119            Err(err) => Err(err),
120        };
121
122        let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err:?}")));
123        let serialized: Vec<u8> =
124            match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") {
125                Ok(s) => s,
126                Err(err) => {
127                    tracing::error!("{err:?}");
128                    break;
129                }
130            };
131        match tcp_send(&mut connection, &serialized).await {
132            Ok(()) => {}
133            Err(err) => match err.kind() {
134                ErrorKind::UnexpectedEof => {
135                    tracing::debug!("Control connection closed while trying to send reply");
136                    break;
137                }
138                err => {
139                    let err = eyre!(err).wrap_err("failed to send reply");
140                    tracing::error!("{err}");
141                    break;
142                }
143            },
144        }
145
146        if matches!(reply, ControlRequestReply::CoordinatorStopped) {
147            break;
148        }
149    }
150}
151
152async fn handle_request(
153    request: ControlRequest,
154    tx: &mpsc::Sender<ControlEvent>,
155) -> eyre::Result<ControlRequestReply> {
156    let (reply_tx, reply_rx) = oneshot::channel();
157    let event = ControlEvent::IncomingRequest {
158        request,
159        reply_sender: reply_tx,
160    };
161
162    if tx.send(event).await.is_err() {
163        return Ok(ControlRequestReply::CoordinatorStopped);
164    }
165
166    reply_rx
167        .await
168        .unwrap_or(Ok(ControlRequestReply::CoordinatorStopped))
169}
170
171#[derive(Debug)]
172pub enum ControlEvent {
173    IncomingRequest {
174        request: ControlRequest,
175        reply_sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
176    },
177    LogSubscribe {
178        dataflow_id: Uuid,
179        level: log::LevelFilter,
180        connection: TcpStream,
181    },
182    Error(eyre::Report),
183}
184
185impl From<eyre::Report> for ControlEvent {
186    fn from(err: eyre::Report) -> Self {
187        ControlEvent::Error(err)
188    }
189}