1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
//! Data processing units.
//!
//! RTRTR provides the means for flexible data processing through
//! interconnected entities called _units._ Each unit produces a constantly
//! updated data set. Other units can subscribe to updates from these sets.
//! Alternatively, they can produce their own data set from external input.
//! Different types of units exist that perform different tasks. They can
//! all be plugged together in all kinds of ways.
//!
//! This module contains all the units currently available. It provides
//! access to them via a grand enum `Unit` that contains all unit types as
//! variants.
//!
//! Units can be created from configuration via serde deserialization. They
//! are started by spawning them into an async runtime and then just keep
//! running there.

//------------ Sub-modules ---------------------------------------------------
//
// These contain all the actual unit types grouped by shared functionality.
mod combine;
mod json;
mod rtr;
mod slurm;

//------------ Unit ----------------------------------------------------------

use serde::Deserialize;
use crate::comms::Gate;
use crate::manager::Component;

/// The fundamental entity for data processing.
///
/// Each variant wraps one of the concrete unit types defined in this
/// module's sub-modules. The enum is internally tagged: when deserializing,
/// serde reads the `type` field to decide which variant to build and
/// deserializes the wrapped type from the same map.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
pub enum Unit {
    /// Selected by a `type` of `"any"`; wraps `combine::Any`.
    #[serde(rename = "any")]
    Any(combine::Any),

    /// Selected by a `type` of `"rtr"`; wraps `rtr::Tcp`.
    #[serde(rename = "rtr")]
    RtrTcp(rtr::Tcp),

    /// Selected by a `type` of `"rtr-tls"`; wraps `rtr::Tls`.
    #[serde(rename = "rtr-tls")]
    RtrTls(rtr::Tls),

    /// Selected by a `type` of `"json"`; wraps `json::Json`.
    #[serde(rename = "json")]
    Json(json::Json),

    /// Selected by a `type` of `"slurm"`; wraps `slurm::LocalExceptions`.
    #[serde(rename = "slurm")]
    Slurm(slurm::LocalExceptions),
}

impl Unit {
    pub async fn run(
        self, component: Component, gate: Gate
    )  {
        let _ = match self {
            Unit::Any(unit) => unit.run(component, gate).await,
            Unit::RtrTcp(unit) => unit.run(component, gate).await,
            Unit::RtrTls(unit) => unit.run(component, gate).await,
            Unit::Json(unit) => unit.run(component, gate).await,
            Unit::Slurm(unit) => unit.run(component, gate).await,
        };
    }
}