Crate rove

System for quality control of meteorological data.

Provides a modular system for scheduling QC tests on data, and for marshalling data and metadata into those tests. It can be used as a standalone gRPC service, or as a component within another service (for example, a data ingestor).

As a standalone service:

use rove::{
    start_server,
    data_switch::{DataSwitch, DataConnector},
    dev_utils::{TestDataSource, construct_hardcoded_pipeline},
};
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Register a single test data source under the key "test".
    let data_switch = DataSwitch::new(HashMap::from([(
        String::from("test"),
        Box::new(TestDataSource{
            data_len_single: 3,
            data_len_series: 1000,
            data_len_spatial: 1000,
        }) as Box<dyn DataConnector + Send>
    )]));

    // Serve QC requests over gRPC on the IPv6 loopback address, port 1337.
    start_server(
        "[::1]:1337".parse()?,
        data_switch,
        construct_hardcoded_pipeline(),
    )
    .await
}

As a component:

use rove::{
    Scheduler,
    data_switch::{DataSwitch, DataConnector, Timestamp, TimeSpec, SpaceSpec},
    dev_utils::{TestDataSource, construct_hardcoded_pipeline},
};
use std::collections::HashMap;
use chrono::{Utc, TimeZone};
use chronoutil::RelativeDuration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let data_switch = DataSwitch::new(HashMap::from([(
        String::from("test"),
        Box::new(TestDataSource{
            data_len_single: 3,
            data_len_series: 1000,
            data_len_spatial: 1000,
        }) as Box<dyn DataConnector + Send>
    )]));

    // Build the scheduler directly instead of starting a gRPC server.
    let rove_scheduler = Scheduler::new(construct_hardcoded_pipeline(), data_switch);

    let response = rove_scheduler.validate_direct(
        "my_data_source",
        &vec!["my_backing_source"],
        &TimeSpec::new(
            Timestamp(
                Utc.with_ymd_and_hms(2023, 6, 26, 12, 0, 0)
                    .unwrap()
                    .timestamp(),
            ),
            Timestamp(
                Utc.with_ymd_and_hms(2023, 6, 26, 14, 0, 0)
                    .unwrap()
                    .timestamp(),
            ),
            RelativeDuration::minutes(5),
        ),
        &SpaceSpec::One(String::from("station_id")),
        "TA_PT1H",
        None,
    ).await?;

    // Print the name of each check and the flags it produced.
    for result in response {
        println!("check: {}", result.check);
        println!("flags: {:?}", result.results);
    }

    Ok(())
}
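
Both examples register a data source under a string key and cast the boxed connector to `Box<dyn DataConnector + Send>`. The following is a minimal, standard-library-only sketch of that registry pattern, not rove's implementation: `Connector`, `FixedSource`, and `Switch` are hypothetical stand-ins for illustration. It assumes the switch looks connectors up by the key they were registered under, so a request for an unregistered key fails. The explicit `as Box<dyn ...>` cast is what lets `HashMap::from` build a map of trait objects rather than a map of one concrete type.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a data connector trait; not rove's real trait.
trait Connector {
    fn fetch(&self, station: &str) -> Vec<f32>;
}

// Hypothetical connector that returns `len` copies of a fixed value.
struct FixedSource {
    len: usize,
}

impl Connector for FixedSource {
    fn fetch(&self, _station: &str) -> Vec<f32> {
        vec![1.0; self.len]
    }
}

// Hypothetical stand-in for a data switch: routes requests to a connector by name.
struct Switch {
    sources: HashMap<String, Box<dyn Connector + Send>>,
}

impl Switch {
    fn new(sources: HashMap<String, Box<dyn Connector + Send>>) -> Self {
        Switch { sources }
    }

    // Returns an error when no connector is registered under `source`.
    fn fetch(&self, source: &str, station: &str) -> Result<Vec<f32>, String> {
        self.sources
            .get(source)
            .map(|connector| connector.fetch(station))
            .ok_or_else(|| format!("unknown data source: {source}"))
    }
}

fn main() {
    let switch = Switch::new(HashMap::from([(
        String::from("test"),
        // Without this cast the map's value type would be inferred as Box<FixedSource>.
        Box::new(FixedSource { len: 3 }) as Box<dyn Connector + Send>,
    )]));

    // Registered key: the request is routed to the connector.
    println!("{:?}", switch.fetch("test", "station_id"));
    // Unregistered key: the lookup fails in this sketch.
    println!("{:?}", switch.fetch("my_data_source", "station_id"));
}
```

In this sketch, the name passed when requesting data has to match a key supplied when the switch was built; whether rove reports a mismatch in the same way is not shown here.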

Re-exports

pub use pipeline::load_pipelines;
pub use pipeline::Pipeline;
pub use scheduler::Scheduler;

Modules

data_switch
Utilities for creating and using DataConnectors
pipeline
Definitions and utilities for deserialising QC pipelines
scheduler
Utilities for scheduling QC checks

Enums

Flag
Flag indicating result of a QC test for a given data point

Functions

start_server
Starts up a gRPC server to process QC run requests