rove 0.3.0

System for real time spatial and timeseries quality control of weather data
Documentation
use core::future::Future;
use hyper_util::rt::TokioIo;
use pb::{rove_client::RoveClient, validate_request::SpaceSpec, Flag, ValidateRequest};
use rove::{
    data_switch::{DataConnector, DataSwitch},
    dev_utils::{construct_hardcoded_pipeline, TestDataSource},
    start_server_unix_listener, Pipeline,
};
use std::{collections::HashMap, sync::Arc};
use tempfile::NamedTempFile;
use tokio::net::{UnixListener, UnixStream};
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::{Channel, Endpoint};
use tower::service_fn;

mod pb {
    tonic::include_proto!("rove");
}

const DATA_LEN_SINGLE: usize = 3;
const DATA_LEN_SPATIAL: usize = 1000;

pub async fn set_up_rove(
    data_switch: DataSwitch,
    pipelines: HashMap<String, Pipeline>,
) -> (impl Future<Output = ()>, RoveClient<Channel>) {
    let coordinator_socket = NamedTempFile::new().unwrap();
    let coordinator_socket = Arc::new(coordinator_socket.into_temp_path());
    std::fs::remove_file(&*coordinator_socket).unwrap();
    let coordinator_uds = UnixListener::bind(&*coordinator_socket).unwrap();
    let coordinator_stream = UnixListenerStream::new(coordinator_uds);
    let coordinator_future = async {
        start_server_unix_listener(coordinator_stream, data_switch, pipelines)
            .await
            .unwrap();
    };

    let coordinator_channel =
        Endpoint::try_from("http://any.url")
            .unwrap()
            .connect_with_connector(service_fn(move |_: tonic::transport::Uri| {
                let socket = Arc::clone(&coordinator_socket);
                async move {
                    Ok::<_, std::io::Error>(TokioIo::new(UnixStream::connect(&*socket).await?))
                }
            }))
            .await
            .unwrap();
    let client = RoveClient::new(coordinator_channel);

    (coordinator_future, client)
}

// TODO: we should probably just use one of the sample pipelines here once we have the checks
// from olympian working
#[tokio::test]
async fn integration_test_hardcoded_pipeline() {
    let data_switch = DataSwitch::new(HashMap::from([(
        String::from("test"),
        Box::new(TestDataSource {
            data_len_single: DATA_LEN_SINGLE,
            data_len_series: 1,
            data_len_spatial: DATA_LEN_SPATIAL,
        }) as Box<dyn DataConnector + Send>,
    )]));

    let (coordinator_future, mut client) =
        set_up_rove(data_switch, construct_hardcoded_pipeline()).await;

    let requests_future = async {
        let response = client
            .validate(ValidateRequest {
                data_source: String::from("test"),
                backing_sources: vec![],
                start_time: Some(prost_types::Timestamp::default()),
                end_time: Some(prost_types::Timestamp::default()),
                time_resolution: String::from("PT5M"),
                space_spec: Some(SpaceSpec::All(())),
                pipeline: String::from("hardcoded"),
                extra_spec: None,
            })
            .await
            .unwrap()
            .into_inner();

        assert_eq!(response.results.len(), 4);
        assert!(response.results.iter().any(|c| c.check == "step_check"));
        assert!(response.results.iter().any(|c| c.check == "spike_check"));
        assert!(response.results.iter().any(|c| c.check == "buddy_check"));
        assert!(response.results.iter().any(|c| c.check == "sct"));
        println!("{:?}", response);
        assert!(response.results.iter().all(|c| {
            let flags: Vec<i32> = c
                .flag_series
                .iter()
                .map(|fs| *fs.flags.first().unwrap())
                .collect();
            flags == vec![Flag::Pass as i32; DATA_LEN_SPATIAL]
                || flags == vec![Flag::Isolated as i32; DATA_LEN_SPATIAL]
        }))
    };

    tokio::select! {
        _ = coordinator_future => panic!("coordinator returned first"),
        _ = requests_future => (),
    }
}