Crate rove

System for quality control of meteorological data.

Provides a modular system for scheduling QC tests on data, and for marshalling data and metadata into these tests. It can be used as a standalone gRPC service, or as a component within another service (for example, a data ingestor).

As a standalone service:

use rove::{
    start_server,
    data_switch::{DataSwitch, DataConnector},
    dev_utils::{TestDataSource, construct_hardcoded_dag},
};
use std::collections::HashMap;

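#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Register data sources by name; "test" is backed by the
    // TestDataSource from dev_utils.
    let data_switch = DataSwitch::new(HashMap::from([
        ("test", &TestDataSource{
            data_len_single: 3,
            data_len_series: 1000,
            data_len_spatial: 1000,
        } as &dyn DataConnector),
    ]));

    // Serve QC run requests over gRPC on the IPv6 loopback address, port 1337.
    start_server(
        "[::1]:1337".parse()?,
        data_switch,
        construct_hardcoded_dag(),
    )
    .await
}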

As a component:

use rove::{
    Scheduler,
    data_switch::{DataSwitch, DataConnector, Timestamp, Timerange},
    dev_utils::{TestDataSource, construct_hardcoded_dag},
};
use std::collections::HashMap;
use chrono::{Utc, TimeZone};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Register data sources by name; "test" is backed by the
    // TestDataSource from dev_utils.
    let data_switch = DataSwitch::new(HashMap::from([
        ("test", &TestDataSource{
            data_len_single: 3,
            data_len_series: 1000,
            data_len_spatial: 1000,
        } as &dyn DataConnector),
    ]));

    // Build the scheduler directly instead of starting a gRPC server.
    let rove_scheduler = Scheduler::new(construct_hardcoded_dag(), data_switch);

    // Request two tests on a series over a two-hour window.
    // The "test" prefix matches the key registered in the DataSwitch above.
    let mut rx = rove_scheduler.validate_series_direct(
        "test:single",
        &["dip_check", "step_check"],
        Timerange{
            start: Timestamp(
                Utc.with_ymd_and_hms(2023, 6, 26, 12, 0, 0)
                    .unwrap()
                    .timestamp(),
            ),
            end: Timestamp(
                Utc.with_ymd_and_hms(2023, 6, 26, 14, 0, 0)
                    .unwrap()
                    .timestamp(),
            ),
        },
    ).await?;

    // Responses arrive on the channel, one per message, until it closes.
    while let Some(response) = rx.recv().await {
        match response {
            Ok(inner) => {
                println!("\ntest name: {}\n", inner.test);
                for result in inner.results {
                    // `time` is optional in the response, hence the unwrap.
                    println!("timestamp: {}", result.time.unwrap().seconds);
                    println!("flag: {}", result.flag);
                }
            }
            Err(e) => println!("uh oh, got an error: {}", e),
        }
    }

    Ok(())
}
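
Both examples hand the scheduler a DAG built by construct_hardcoded_dag. As a rough illustration of why a DAG suits test scheduling, the standalone sketch below orders tests so that each one comes after the tests it depends on. It is not rove's implementation and uses none of rove's types: the function run_order, the map layout, and the "combined_check" test with its dependency edges are all hypothetical, and it assumes the DAG encodes "must run before" relationships between tests.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical helper, not part of rove.
/// `deps` maps a test name to the tests that must run before it.
/// Returns an order that respects those dependencies, or `None`
/// if the dependencies contain a cycle (that is, they are not a DAG).
fn run_order(deps: &BTreeMap<&str, Vec<&str>>) -> Option<Vec<String>> {
    // For each test: how many prerequisites have not run yet.
    let mut unmet: BTreeMap<&str, usize> = BTreeMap::new();
    // For each test: which tests are waiting on it.
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();

    for (&test, before) in deps {
        unmet.entry(test).or_insert(0);
        for &prerequisite in before {
            unmet.entry(prerequisite).or_insert(0);
            *unmet.entry(test).or_insert(0) += 1;
            dependents.entry(prerequisite).or_default().push(test);
        }
    }

    // Kahn's algorithm: start with tests that have no prerequisites.
    let mut ready: VecDeque<&str> = unmet
        .iter()
        .filter(|(_, n)| **n == 0)
        .map(|(t, _)| *t)
        .collect();

    let mut order = Vec::new();
    while let Some(test) = ready.pop_front() {
        order.push(test.to_string());
        if let Some(waiting) = dependents.get(test) {
            for &w in waiting {
                let n = unmet.get_mut(w).expect("every test was counted above");
                *n -= 1;
                if *n == 0 {
                    ready.push_back(w);
                }
            }
        }
    }

    // If some test was never released, its prerequisites form a cycle.
    if order.len() == unmet.len() {
        Some(order)
    } else {
        None
    }
}
```

Calling run_order with "combined_check" depending on both "dip_check" and "step_check" places "combined_check" last. A real scheduler could also run tests with no unmet prerequisites concurrently; this sketch only produces a sequential order.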

Modules

Structs

Functions

  • start_server: Starts up a gRPC server to process QC run requests