vegafusion_core/planning/
watch.rs

1use crate::planning::stitch::CommPlan;
2use crate::proto::gen::tasks::{Variable, VariableNamespace};
3use crate::task_graph::graph::ScopedVariable;
4use crate::task_graph::task_value::TaskValue;
5use datafusion_common::ScalarValue;
6use itertools::Itertools;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use std::convert::TryFrom;
10use vegafusion_common::data::scalar::ScalarValueHelpers;
11use vegafusion_common::data::table::VegaFusionTable;
12
13use vegafusion_common::error::{Result, VegaFusionError};
14
15#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
16#[serde(rename_all = "lowercase")]
17pub enum WatchNamespace {
18    Signal,
19    Data,
20}
21
22impl TryFrom<VariableNamespace> for WatchNamespace {
23    type Error = VegaFusionError;
24
25    fn try_from(value: VariableNamespace) -> Result<Self> {
26        match value {
27            VariableNamespace::Signal => Ok(Self::Signal),
28            VariableNamespace::Data => Ok(Self::Data),
29            _ => Err(VegaFusionError::internal("Scale namespace not supported")),
30        }
31    }
32}
33
34#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
35pub struct Watch {
36    pub namespace: WatchNamespace,
37    pub name: String,
38    pub scope: Vec<u32>,
39}
40
41impl Watch {
42    pub fn to_scoped_variable(&self) -> ScopedVariable {
43        (
44            match self.namespace {
45                WatchNamespace::Signal => Variable::new_signal(&self.name),
46                WatchNamespace::Data => Variable::new_data(&self.name),
47            },
48            self.scope.clone(),
49        )
50    }
51}
52
53impl TryFrom<ScopedVariable> for Watch {
54    type Error = VegaFusionError;
55
56    fn try_from(value: ScopedVariable) -> Result<Self> {
57        let tmp = value.0.namespace();
58        let tmp = WatchNamespace::try_from(tmp)?;
59        Ok(Self {
60            namespace: tmp,
61            name: value.0.name.clone(),
62            scope: value.1,
63        })
64    }
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
68pub struct WatchPlan {
69    pub server_to_client: Vec<Watch>,
70    pub client_to_server: Vec<Watch>,
71}
72
73impl From<CommPlan> for WatchPlan {
74    fn from(value: CommPlan) -> Self {
75        Self {
76            server_to_client: value
77                .server_to_client
78                .into_iter()
79                .map(|scoped_var| Watch::try_from(scoped_var).unwrap())
80                .sorted()
81                .collect(),
82            client_to_server: value
83                .client_to_server
84                .into_iter()
85                .map(|scoped_var| Watch::try_from(scoped_var).unwrap())
86                .sorted()
87                .collect(),
88        }
89    }
90}
91
92#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
93pub struct WatchValue {
94    pub watch: Watch,
95    pub value: Value,
96}
97
98#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
99pub struct WatchValues {
100    pub values: Vec<WatchValue>,
101}
102
103#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
104#[serde(rename_all = "lowercase")]
105pub enum ExportUpdateNamespace {
106    Signal,
107    Data,
108}
109
110impl TryFrom<VariableNamespace> for ExportUpdateNamespace {
111    type Error = VegaFusionError;
112
113    fn try_from(value: VariableNamespace) -> Result<Self> {
114        match value {
115            VariableNamespace::Signal => Ok(Self::Signal),
116            VariableNamespace::Data => Ok(Self::Data),
117            _ => Err(VegaFusionError::internal("Scale namespace not supported")),
118        }
119    }
120}
121
122#[derive(Debug, Clone)]
123pub struct ExportUpdateArrow {
124    pub namespace: ExportUpdateNamespace,
125    pub name: String,
126    pub scope: Vec<u32>,
127    pub value: TaskValue,
128}
129
130impl ExportUpdateArrow {
131    pub fn to_json(&self) -> Result<ExportUpdateJSON> {
132        Ok(ExportUpdateJSON {
133            namespace: self.namespace.clone(),
134            name: self.name.clone(),
135            scope: self.scope.clone(),
136            value: self.value.to_json()?,
137        })
138    }
139}
140
141#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
142pub struct ExportUpdateJSON {
143    pub namespace: ExportUpdateNamespace,
144    pub name: String,
145    pub scope: Vec<u32>,
146    pub value: Value,
147}
148
149impl ExportUpdateJSON {
150    pub fn to_scoped_var(&self) -> ScopedVariable {
151        let namespace = match self.namespace {
152            ExportUpdateNamespace::Signal => VariableNamespace::Signal as i32,
153            ExportUpdateNamespace::Data => VariableNamespace::Data as i32,
154        };
155
156        (
157            Variable {
158                name: self.name.clone(),
159                namespace,
160            },
161            self.scope.clone(),
162        )
163    }
164
165    pub fn to_task_value(&self) -> TaskValue {
166        match self.namespace {
167            ExportUpdateNamespace::Signal => {
168                TaskValue::Scalar(ScalarValue::from_json(&self.value).unwrap())
169            }
170            ExportUpdateNamespace::Data => {
171                TaskValue::Table(VegaFusionTable::from_json(&self.value).unwrap())
172            }
173        }
174    }
175}
176
177pub type ExportUpdateBatch = Vec<ExportUpdateJSON>;