dora-coordinator 0.5.0

`dora` goal is to be a low latency, composable, and distributed data flow.
Documentation
use crate::{
    Event,
    tcp_utils::{tcp_receive, tcp_send},
};
use dora_message::{BuildId, cli_to_coordinator::LegacyControlRequest};
use eyre::{Context, eyre};
use futures::{
    FutureExt, Stream, StreamExt,
    future::{self, Either},
    stream::FuturesUnordered,
};
use futures_concurrency::future::Race;
use std::{io::ErrorKind, net::SocketAddr};
use tokio::{
    net::{TcpListener, TcpStream},
    sync::mpsc,
    task::JoinHandle,
};
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;

pub(crate) async fn control_events(
    control_listen_addr: SocketAddr,
    tasks: &FuturesUnordered<JoinHandle<()>>,
) -> eyre::Result<impl Stream<Item = Event> + use<>> {
    let (tx, rx) = mpsc::channel(10);

    let (finish_tx, mut finish_rx) = mpsc::channel(1);
    tasks.push(tokio::spawn(listen(control_listen_addr, tx, finish_tx)));
    tasks.push(tokio::spawn(async move {
        while let Some(()) = finish_rx.recv().await {}
    }));

    Ok(ReceiverStream::new(rx).map(Event::Control))
}

async fn listen(
    control_listen_addr: SocketAddr,
    tx: mpsc::Sender<ControlEvent>,
    _finish_tx: mpsc::Sender<()>,
) {
    let result = TcpListener::bind(control_listen_addr)
        .await
        .wrap_err("failed to listen for control messages");
    let incoming = match result {
        Ok(incoming) => incoming,
        Err(err) => {
            let _ = tx.send(err.into()).await;
            return;
        }
    };

    loop {
        let new_connection = incoming.accept().map(Either::Left);
        let coordinator_stop = tx.closed().map(Either::Right);
        let connection = match (new_connection, coordinator_stop).race().await {
            future::Either::Left(connection) => connection,
            future::Either::Right(()) => {
                // coordinator was stopped
                break;
            }
        };
        match connection.wrap_err("failed to connect") {
            Ok((connection, _)) => {
                let tx = tx.clone();
                tokio::spawn(handle_requests(connection, tx, _finish_tx.clone()));
            }
            Err(err) => {
                if tx.blocking_send(err.into()).is_err() {
                    break;
                }
            }
        }
    }
}

async fn handle_requests(
    mut connection: TcpStream,
    tx: mpsc::Sender<ControlEvent>,
    _finish_tx: mpsc::Sender<()>,
) {
    let _peer_addr = connection.peer_addr().ok();

    let next_request = tcp_receive(&mut connection).map(Either::Left);
    let coordinator_stopped = tx.closed().map(Either::Right);
    let raw = match (next_request, coordinator_stopped).race().await {
        Either::Right(()) => return,
        Either::Left(request) => match request {
            Ok(message) => message,
            Err(err) => match err.kind() {
                ErrorKind::UnexpectedEof => {
                    tracing::trace!("Control connection closed");
                    return;
                }
                err => {
                    let err = eyre!(err).wrap_err("failed to receive incoming message");
                    tracing::error!("{err}");
                    return;
                }
            },
        },
    };

    let request = serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message");

    match request {
        Ok(request) => match request {
            LegacyControlRequest::LogSubscribe { dataflow_id, level } => {
                let _ = tx
                    .send(ControlEvent::LogSubscribe {
                        dataflow_id,
                        level,
                        connection,
                    })
                    .await;
            }
            LegacyControlRequest::BuildLogSubscribe { build_id, level } => {
                let _ = tx
                    .send(ControlEvent::BuildLogSubscribe {
                        build_id,
                        level,
                        connection,
                    })
                    .await;
            }
        },
        Err(err) => {
            tracing::warn!("failed to parse incoming control message: {:#?}", err);
            let reply = format!("failed to parse message: {err:?}");
            let serialized: Vec<u8> =
                match serde_json::to_vec(&reply).wrap_err("failed to serialize message") {
                    Ok(s) => s,
                    Err(err) => {
                        tracing::error!("{err:?}");
                        return;
                    }
                };
            match tcp_send(&mut connection, &serialized).await {
                Ok(()) => {}
                Err(err) => match err.kind() {
                    ErrorKind::UnexpectedEof => {
                        tracing::debug!("Control connection closed while trying to send reply");
                    }
                    err => {
                        let err = eyre!(err).wrap_err("failed to send reply");
                        tracing::error!("{err}");
                    }
                },
            };
        }
    }
}

#[derive(Debug)]
pub enum ControlEvent {
    LogSubscribe {
        dataflow_id: Uuid,
        level: log::LevelFilter,
        connection: TcpStream,
    },
    BuildLogSubscribe {
        build_id: BuildId,
        level: log::LevelFilter,
        connection: TcpStream,
    },
    Error(eyre::Report),
}

impl From<eyre::Report> for ControlEvent {
    fn from(err: eyre::Report) -> Self {
        ControlEvent::Error(err)
    }
}