vegafusion_core/planning/
watch.rs1use crate::planning::stitch::CommPlan;
2use crate::proto::gen::tasks::{Variable, VariableNamespace};
3use crate::task_graph::graph::ScopedVariable;
4use crate::task_graph::task_value::TaskValue;
5use datafusion_common::ScalarValue;
6use itertools::Itertools;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::convert::TryFrom;
10use vegafusion_common::data::scalar::ScalarValueHelpers;
11use vegafusion_common::data::table::VegaFusionTable;
12
13use vegafusion_common::error::{Result, VegaFusionError};
14
15#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
16#[serde(rename_all = "lowercase")]
17pub enum WatchNamespace {
18 Signal,
19 Data,
20}
21
22impl TryFrom<VariableNamespace> for WatchNamespace {
23 type Error = VegaFusionError;
24
25 fn try_from(value: VariableNamespace) -> Result<Self> {
26 match value {
27 VariableNamespace::Signal => Ok(Self::Signal),
28 VariableNamespace::Data => Ok(Self::Data),
29 _ => Err(VegaFusionError::internal("Scale namespace not supported")),
30 }
31 }
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
35pub struct Watch {
36 pub namespace: WatchNamespace,
37 pub name: String,
38 pub scope: Vec<u32>,
39}
40
41impl Watch {
42 pub fn to_scoped_variable(&self) -> ScopedVariable {
43 (
44 match self.namespace {
45 WatchNamespace::Signal => Variable::new_signal(&self.name),
46 WatchNamespace::Data => Variable::new_data(&self.name),
47 },
48 self.scope.clone(),
49 )
50 }
51}
52
53impl TryFrom<ScopedVariable> for Watch {
54 type Error = VegaFusionError;
55
56 fn try_from(value: ScopedVariable) -> Result<Self> {
57 let tmp = value.0.namespace();
58 let tmp = WatchNamespace::try_from(tmp)?;
59 Ok(Self {
60 namespace: tmp,
61 name: value.0.name.clone(),
62 scope: value.1,
63 })
64 }
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68pub struct WatchPlan {
69 pub server_to_client: Vec<Watch>,
70 pub client_to_server: Vec<Watch>,
71}
72
73impl From<CommPlan> for WatchPlan {
74 fn from(value: CommPlan) -> Self {
75 Self {
76 server_to_client: value
77 .server_to_client
78 .into_iter()
79 .map(|scoped_var| Watch::try_from(scoped_var).unwrap())
80 .sorted()
81 .collect(),
82 client_to_server: value
83 .client_to_server
84 .into_iter()
85 .map(|scoped_var| Watch::try_from(scoped_var).unwrap())
86 .sorted()
87 .collect(),
88 }
89 }
90}
91
92#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
93pub struct WatchValue {
94 pub watch: Watch,
95 pub value: Value,
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
99pub struct WatchValues {
100 pub values: Vec<WatchValue>,
101}
102
103#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
104#[serde(rename_all = "lowercase")]
105pub enum ExportUpdateNamespace {
106 Signal,
107 Data,
108}
109
110impl TryFrom<VariableNamespace> for ExportUpdateNamespace {
111 type Error = VegaFusionError;
112
113 fn try_from(value: VariableNamespace) -> Result<Self> {
114 match value {
115 VariableNamespace::Signal => Ok(Self::Signal),
116 VariableNamespace::Data => Ok(Self::Data),
117 _ => Err(VegaFusionError::internal("Scale namespace not supported")),
118 }
119 }
120}
121
122#[derive(Debug, Clone)]
123pub struct ExportUpdateArrow {
124 pub namespace: ExportUpdateNamespace,
125 pub name: String,
126 pub scope: Vec<u32>,
127 pub value: TaskValue,
128}
129
130impl ExportUpdateArrow {
131 pub fn to_json(&self) -> Result<ExportUpdateJSON> {
132 Ok(ExportUpdateJSON {
133 namespace: self.namespace.clone(),
134 name: self.name.clone(),
135 scope: self.scope.clone(),
136 value: self.value.to_json()?,
137 })
138 }
139}
140
141#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
142pub struct ExportUpdateJSON {
143 pub namespace: ExportUpdateNamespace,
144 pub name: String,
145 pub scope: Vec<u32>,
146 pub value: Value,
147}
148
149impl ExportUpdateJSON {
150 pub fn to_scoped_var(&self) -> ScopedVariable {
151 let namespace = match self.namespace {
152 ExportUpdateNamespace::Signal => VariableNamespace::Signal as i32,
153 ExportUpdateNamespace::Data => VariableNamespace::Data as i32,
154 };
155
156 (
157 Variable {
158 name: self.name.clone(),
159 namespace,
160 },
161 self.scope.clone(),
162 )
163 }
164
165 pub fn to_task_value(&self) -> TaskValue {
166 match self.namespace {
167 ExportUpdateNamespace::Signal => {
168 TaskValue::Scalar(ScalarValue::from_json(&self.value).unwrap())
169 }
170 ExportUpdateNamespace::Data => {
171 TaskValue::Table(VegaFusionTable::from_json(&self.value).unwrap())
172 }
173 }
174 }
175}
176
177pub type ExportUpdateBatch = Vec<ExportUpdateJSON>;