vegafusion_core/data/
tasks.rs

1use crate::proto::gen::tasks::data_url_task::Url;
2use crate::proto::gen::tasks::{DataSourceTask, DataUrlTask, DataValuesTask, SignalTask, Variable};
3use crate::task_graph::task::{InputVariable, TaskDependencies};
4use crate::transform::TransformDependencies;
5use itertools::sorted;
6use std::collections::HashSet;
7
8impl TaskDependencies for DataValuesTask {
9    fn input_vars(&self) -> Vec<InputVariable> {
10        let mut vars: HashSet<InputVariable> = Default::default();
11
12        // Collect transform input vars
13        if let Some(pipeline) = self.pipeline.as_ref() {
14            vars.extend(pipeline.input_vars());
15        }
16
17        // Return variables sorted for determinism
18        sorted(vars).collect()
19    }
20
21    fn output_vars(&self) -> Vec<Variable> {
22        self.pipeline
23            .as_ref()
24            .iter()
25            .flat_map(|p| p.output_vars())
26            .collect()
27    }
28}
29
30impl TaskDependencies for DataUrlTask {
31    fn input_vars(&self) -> Vec<InputVariable> {
32        let mut vars: HashSet<InputVariable> = Default::default();
33
34        // Collect input vars from URL signal
35        if let Url::Expr(expr) = self.url.as_ref().unwrap() {
36            vars.extend(expr.input_vars());
37        }
38
39        // Collect transform input vars
40        if let Some(pipeline) = self.pipeline.as_ref() {
41            vars.extend(pipeline.input_vars());
42        }
43
44        // Return variables sorted for determinism
45        sorted(vars).collect()
46    }
47
48    fn output_vars(&self) -> Vec<Variable> {
49        self.pipeline
50            .as_ref()
51            .iter()
52            .flat_map(|p| p.output_vars())
53            .collect()
54    }
55}
56
57impl TaskDependencies for DataSourceTask {
58    fn input_vars(&self) -> Vec<InputVariable> {
59        let mut vars: HashSet<InputVariable> = Default::default();
60
61        // Add input vars from source
62        vars.insert(InputVariable {
63            var: Variable::new_data(&self.source),
64            propagate: true,
65        });
66
67        // Collect transform input vars
68        if let Some(pipeline) = self.pipeline.as_ref() {
69            vars.extend(pipeline.input_vars());
70        }
71
72        // Return variables sorted for determinism
73        sorted(vars).collect()
74    }
75
76    fn output_vars(&self) -> Vec<Variable> {
77        self.pipeline
78            .as_ref()
79            .iter()
80            .flat_map(|p| p.output_vars())
81            .collect()
82    }
83}
84
85impl TaskDependencies for SignalTask {
86    fn input_vars(&self) -> Vec<InputVariable> {
87        let expr = self.expr.as_ref().unwrap();
88        expr.input_vars()
89    }
90}