1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use crate::error::VegaFusionError;
use crate::proto::gen::tasks::Variable;
use crate::proto::gen::transforms::transform::TransformKind;
use crate::proto::gen::transforms::{
    Aggregate, Bin, Collect, Extent, Filter, Fold, Formula, Identifier, Impute, Pivot, Project,
    Sequence, Stack, TimeUnit,
};
use crate::proto::gen::transforms::{JoinAggregate, Transform, Window};
use crate::spec::transform::TransformSpec;
use crate::task_graph::task::InputVariable;
use std::convert::TryFrom;

pub mod aggregate;
pub mod bin;
pub mod collect;
pub mod extent;
pub mod filter;
pub mod fold;
pub mod formula;
pub mod identifier;
pub mod impute;
pub mod joinaggregate;
pub mod pipeline;
pub mod pivot;
pub mod project;
pub mod sequence;
pub mod stack;
pub mod timeunit;
pub mod window;

impl TryFrom<&TransformSpec> for TransformKind {
    type Error = VegaFusionError;

    fn try_from(value: &TransformSpec) -> std::result::Result<Self, Self::Error> {
        Ok(match value {
            TransformSpec::Extent(tx_spec) => Self::Extent(Extent::new(tx_spec)),
            TransformSpec::Filter(tx_spec) => Self::Filter(Filter::try_new(tx_spec)?),
            TransformSpec::Formula(tx_spec) => Self::Formula(Formula::try_new(tx_spec)?),
            TransformSpec::Bin(tx_spec) => Self::Bin(Bin::try_new(tx_spec)?),
            TransformSpec::Aggregate(tx_spec) => Self::Aggregate(Aggregate::new(tx_spec)),
            TransformSpec::Collect(tx_spec) => Self::Collect(Collect::try_new(tx_spec)?),
            TransformSpec::Timeunit(tx_spec) => Self::Timeunit(TimeUnit::try_new(tx_spec)?),
            TransformSpec::JoinAggregate(tx_spec) => {
                Self::Joinaggregate(JoinAggregate::new(tx_spec))
            }
            TransformSpec::Window(tx_spec) => Self::Window(Window::try_new(tx_spec)?),
            TransformSpec::Project(tx_spec) => Self::Project(Project::try_new(tx_spec)?),
            TransformSpec::Stack(tx_spec) => Self::Stack(Stack::try_new(tx_spec)?),
            TransformSpec::Impute(tx_spec) => Self::Impute(Impute::try_new(tx_spec)?),
            TransformSpec::Pivot(tx_spec) => Self::Pivot(Pivot::try_new(tx_spec)?),
            TransformSpec::Identifier(tx_spec) => Self::Identifier(Identifier::try_new(tx_spec)?),
            TransformSpec::Fold(tx_spec) => Self::Fold(Fold::try_new(tx_spec)?),
            TransformSpec::Sequence(tx_spec) => Self::Sequence(Sequence::try_new(tx_spec)?),
            _ => {
                return Err(VegaFusionError::parse(format!(
                    "Unsupported transform: {value:?}"
                )))
            }
        })
    }
}

impl TryFrom<&TransformSpec> for Transform {
    type Error = VegaFusionError;

    fn try_from(value: &TransformSpec) -> Result<Self, Self::Error> {
        Ok(Self {
            transform_kind: Some(TransformKind::try_from(value)?),
        })
    }
}

impl TransformKind {
    pub fn as_dependencies_trait(&self) -> &dyn TransformDependencies {
        match self {
            TransformKind::Filter(tx) => tx,
            TransformKind::Extent(tx) => tx,
            TransformKind::Formula(tx) => tx,
            TransformKind::Bin(tx) => tx,
            TransformKind::Aggregate(tx) => tx,
            TransformKind::Collect(tx) => tx,
            TransformKind::Timeunit(tx) => tx,
            TransformKind::Joinaggregate(tx) => tx,
            TransformKind::Window(tx) => tx,
            TransformKind::Project(tx) => tx,
            TransformKind::Stack(tx) => tx,
            TransformKind::Impute(tx) => tx,
            TransformKind::Pivot(tx) => tx,
            TransformKind::Identifier(tx) => tx,
            TransformKind::Fold(tx) => tx,
            TransformKind::Sequence(tx) => tx,
        }
    }
}

impl Transform {
    pub fn transform_kind(&self) -> &TransformKind {
        self.transform_kind.as_ref().unwrap()
    }
}

pub trait TransformDependencies: Send + Sync {
    fn input_vars(&self) -> Vec<InputVariable> {
        Vec::new()
    }

    fn output_vars(&self) -> Vec<Variable> {
        Vec::new()
    }
}

impl TransformDependencies for Transform {
    fn input_vars(&self) -> Vec<InputVariable> {
        self.transform_kind().as_dependencies_trait().input_vars()
    }

    fn output_vars(&self) -> Vec<Variable> {
        self.transform_kind().as_dependencies_trait().output_vars()
    }
}