dora_message/
descriptor.rs

1use crate::{
2    config::{CommunicationConfig, Input, InputMapping, NodeRunConfig},
3    id::{DataId, NodeId, OperatorId},
4};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use serde_with_expand_env::with_expand_envs;
8use std::{
9    collections::{BTreeMap, BTreeSet},
10    fmt,
11    path::PathBuf,
12};
13
/// The literal string `"shell"`.
///
/// NOTE(review): presumably a reserved node source value that marks a node
/// executed through a shell; the users of this constant are not in this
/// file — confirm.
pub const SHELL_SOURCE: &str = "shell";
/// The literal string `"dynamic"`.
///
/// NOTE(review): presumably a reserved node source value that marks a node
/// that attaches dynamically; the users of this constant are not in this
/// file — confirm.
pub const DYNAMIC_SOURCE: &str = "dynamic";
16
/// Dataflow description
//
// Root of a dataflow document. Keys that are not listed below are rejected
// during deserialization (`deny_unknown_fields`).
//
// The notes in this block are plain `//` comments on purpose: `///` docs on a
// `JsonSchema` type end up as descriptions in the generated schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
#[schemars(title = "dora-rs specification")]
pub struct Descriptor {
    // Communication settings; `CommunicationConfig::default()` is used when
    // the key is omitted. Excluded from the generated schema.
    #[schemars(skip)]
    #[serde(default)]
    pub communication: CommunicationConfig,
    // Dataflow-level deployment settings, stored under the `_unstable_deploy`
    // key. Defaults to `Deploy::default()` (no machine) when omitted.
    // Excluded from the generated schema.
    #[schemars(skip)]
    #[serde(default, rename = "_unstable_deploy")]
    pub deploy: Deploy,
    // Nodes of the dataflow. This is the only field without a serde default,
    // so the `nodes` key must be present.
    pub nodes: Vec<Node>,
    // Debug switches, stored under the `_unstable_debug` key. Defaults to
    // `Debug::default()` when omitted. Excluded from the generated schema.
    #[schemars(skip)]
    #[serde(default, rename = "_unstable_debug")]
    pub debug: Debug,
}
33
// Deployment settings. `Descriptor` and `Node` embed this under the
// `_unstable_deploy` key; `ResolvedNode` embeds it as plain `deploy`.
// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
//
// Plain `//` comments are used because `///` docs on a `JsonSchema` type end
// up as descriptions in the generated schema.
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Deploy {
    // Optional machine identifier; `None` in the `Default` value.
    // NOTE(review): presumably names the machine the node should be placed
    // on — confirm against the code that consumes it.
    pub machine: Option<String>,
}
39
// Debug switches, embedded in `Descriptor` under the `_unstable_debug` key.
// Unlike `Descriptor`, `Deploy` and `Node`, this struct does not set
// `deny_unknown_fields`.
//
// Plain `//` comments are used because `///` docs on a `JsonSchema` type end
// up as descriptions in the generated schema.
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
pub struct Debug {
    // `false` when the key is omitted.
    // NOTE(review): going by the name, enabling this publishes every message
    // to zenoh — confirm against the code that reads the flag.
    #[serde(default)]
    pub publish_all_messages_to_zenoh: bool,
}
45
/// Dora Node
//
// One entry of `Descriptor::nodes`. Unknown keys are rejected during
// deserialization (`deny_unknown_fields`).
//
// New notes in this block are plain `//` comments on purpose: `///` docs on a
// `JsonSchema` type end up as descriptions in the generated schema.
//
// NOTE(review): `operators`, `custom`, `operator` and the top-level `path`
// look like alternative ways of declaring what the node runs. Nothing in this
// struct makes them mutually exclusive — confirm where that is validated.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Node {
    /// Node identifier
    pub id: NodeId,
    /// Node name
    pub name: Option<String>,
    /// Description of the node
    pub description: Option<String>,
    /// Environment variables
    pub env: Option<BTreeMap<String, EnvValue>>,

    /// Unstable machine deployment configuration
    #[schemars(skip)]
    #[serde(default, rename = "_unstable_deploy")]
    pub deploy: Deploy,

    // The `Option` fields below are omitted from the serialized output when
    // they are `None`.

    // List of operators (`RuntimeNode` is a transparent wrapper around a
    // `Vec<OperatorDefinition>`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operators: Option<RuntimeNode>,
    // Custom node definition in the nested `custom:` form.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub custom: Option<CustomNode>,
    // Single operator definition; its `id` is optional.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operator: Option<SingleOperatorDefinition>,

    // NOTE(review): `path`, `args`, `build` and `send_stdout_as` mirror the
    // `source`, `args`, `build` and `send_stdout_as` fields of `CustomNode`;
    // presumably they are the flat way of declaring a custom node — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub args: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub build: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub send_stdout_as: Option<String>,
    // Inputs keyed by data id; empty when the key is omitted.
    #[serde(default)]
    pub inputs: BTreeMap<DataId, Input>,
    // Set of output data ids; empty when the key is omitted.
    #[serde(default)]
    pub outputs: BTreeSet<DataId>,
}
84
/// A node whose kind is stored as a single [`CoreNodeKind`] value instead of
/// the several optional fields found on [`Node`].
///
/// NOTE(review): presumably produced from a [`Node`] by a resolution step that
/// lives outside this file — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResolvedNode {
    /// Node identifier.
    pub id: NodeId,
    /// Optional node name.
    pub name: Option<String>,
    /// Optional description of the node.
    pub description: Option<String>,
    /// Optional environment variables, keyed by variable name.
    pub env: Option<BTreeMap<String, EnvValue>>,

    /// Deployment settings; `Deploy::default()` when the key is omitted.
    ///
    /// Note that the key here is plain `deploy`, not `_unstable_deploy` as on
    /// [`Node`].
    #[serde(default)]
    pub deploy: Deploy,

    /// Kind of the node. Flattened, so the variant key (`operators` or
    /// `custom`) sits next to the other fields instead of under a `kind` key.
    #[serde(flatten)]
    pub kind: CoreNodeKind,
}
98
/// The two kinds a [`ResolvedNode`] can have.
///
/// Variant keys are lowercased for (de)serialization, with an explicit
/// override on `Runtime`, so the keys are `operators` and `custom`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CoreNodeKind {
    /// Dora runtime node
    #[serde(rename = "operators")]
    Runtime(RuntimeNode),
    /// Custom node, stored under the `custom` key.
    Custom(CustomNode),
}
107
// A node made up of operators. `#[serde(transparent)]` makes this struct
// (de)serialize as the bare list of operator definitions, without a wrapping
// map.
//
// Plain `//` comments are used because `///` docs on a `JsonSchema` type end
// up as descriptions in the generated schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(transparent)]
pub struct RuntimeNode {
    // Operator definitions of this node.
    pub operators: Vec<OperatorDefinition>,
}
113
// An operator with a mandatory id. See `SingleOperatorDefinition` for the
// variant where the id may be left out.
//
// Plain `//` comments are used because `///` docs on a `JsonSchema` type end
// up as descriptions in the generated schema.
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct OperatorDefinition {
    // Operator identifier (required).
    pub id: OperatorId,
    // Remaining operator settings. Flattened, so the fields of
    // `OperatorConfig` sit next to `id` instead of under a `config` key.
    #[serde(flatten)]
    pub config: OperatorConfig,
}
120
// Same layout as `OperatorDefinition`, except that `id` is optional. Used by
// the `operator` field of `Node`.
//
// New notes are plain `//` comments because `///` docs on a `JsonSchema` type
// end up as descriptions in the generated schema.
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct SingleOperatorDefinition {
    /// ID is optional if there is only a single operator.
    pub id: Option<OperatorId>,
    // Remaining operator settings. Flattened, so the fields of
    // `OperatorConfig` sit next to `id` instead of under a `config` key.
    #[serde(flatten)]
    pub config: OperatorConfig,
}
128
// Settings shared by `OperatorDefinition` and `SingleOperatorDefinition`,
// which both flatten this struct into their own map.
//
// Plain `//` comments are used because `///` docs on a `JsonSchema` type end
// up as descriptions in the generated schema.
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct OperatorConfig {
    // Optional operator name.
    pub name: Option<String>,
    // Optional operator description.
    pub description: Option<String>,

    // Inputs keyed by data id; empty when the key is omitted.
    #[serde(default)]
    pub inputs: BTreeMap<DataId, Input>,
    // Set of output data ids; empty when the key is omitted.
    #[serde(default)]
    pub outputs: BTreeSet<DataId>,

    // Where the operator code comes from. Flattened, so the variant key of
    // `OperatorSource` (`shared-library`, `python` or `wasm`) sits directly
    // among the fields of this struct.
    #[serde(flatten)]
    pub source: OperatorSource,

    // Omitted from the serialized output when `None`.
    // NOTE(review): presumably a build command for the operator — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub build: Option<String>,
    // Omitted from the serialized output when `None`.
    // NOTE(review): presumably the output id under which the operator's
    // stdout is forwarded — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub send_stdout_as: Option<String>,
}
147
// Source of an operator. Variant keys are kebab-cased for (de)serialization:
// `shared-library`, `python` and `wasm`.
//
// Plain `//` comments are used because `///` docs on a `JsonSchema` type end
// up as descriptions in the generated schema.
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
    // Key `shared-library`; carries a single string.
    // NOTE(review): presumably the path of the library — confirm.
    SharedLibrary(String),
    // Key `python`; see `PythonSource` for the accepted shapes.
    Python(PythonSource),
    // Key `wasm`; carries a single string. Excluded from the generated schema.
    #[schemars(skip)]
    Wasm(String),
}
// Python operator source in its in-memory form.
//
// serde never (de)serializes this struct field by field: the `from`/`into`
// attributes route both directions through `PythonSourceDef`, using the
// `From` impls that follow these two types.
//
// Plain `//` comments are used because `///` docs on a `JsonSchema` type end
// up as descriptions in the generated schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(from = "PythonSourceDef", into = "PythonSourceDef")]
pub struct PythonSource {
    // Source string of the operator.
    // NOTE(review): presumably a path or URL of the Python file — confirm.
    pub source: String,
    // NOTE(review): presumably the name of a conda environment to run the
    // operator in — confirm.
    pub conda_env: Option<String>,
}
162
// Serialized shape of `PythonSource`. The enum is untagged, so the value is
// either a bare string or a map with a `source` key and an optional
// `conda_env` key; no variant name appears in the data.
//
// Plain `//` comments are used because `///` docs on a `JsonSchema` type end
// up as descriptions in the generated schema.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum PythonSourceDef {
    // Short form: only the source string.
    SourceOnly(String),
    // Long form: source string plus optional conda environment.
    WithOptions {
        source: String,
        conda_env: Option<String>,
    },
}
172
173impl From<PythonSource> for PythonSourceDef {
174    fn from(input: PythonSource) -> Self {
175        match input {
176            PythonSource {
177                source,
178                conda_env: None,
179            } => Self::SourceOnly(source),
180            PythonSource { source, conda_env } => Self::WithOptions { source, conda_env },
181        }
182    }
183}
184
185impl From<PythonSourceDef> for PythonSource {
186    fn from(value: PythonSourceDef) -> Self {
187        match value {
188            PythonSourceDef::SourceOnly(source) => Self {
189                source,
190                conda_env: None,
191            },
192            PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env },
193        }
194    }
195}
196
/// Configuration of a Python operator: a path plus input/output declarations.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
/// Unlike [`OperatorConfig`], inputs map to [`InputMapping`] rather than
/// [`Input`], and no `JsonSchema` is derived.
///
/// NOTE(review): nothing else in this file refers to this type — confirm
/// where it is used.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct PythonOperatorConfig {
    /// Path of the operator (required).
    pub path: PathBuf,
    /// Input mappings keyed by data id; empty when the key is omitted.
    #[serde(default)]
    pub inputs: BTreeMap<DataId, InputMapping>,
    /// Set of output data ids; empty when the key is omitted.
    #[serde(default)]
    pub outputs: BTreeSet<DataId>,
}
206
// A node that runs an executable. Used by the `custom` field of `Node` and
// by `CoreNodeKind::Custom`.
//
// New notes in this block are plain `//` comments on purpose: `///` docs on a
// `JsonSchema` type end up as descriptions in the generated schema, so the
// existing `///` texts are left untouched as well.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct CustomNode {
    /// Path of the source code
    ///
    /// If you want to use a specific `conda` environment.
    /// Provide the python path within the source.
    ///
    /// source: /home/peter/miniconda3/bin/python
    ///
    /// args: some_node.py
    ///
    /// Source can match any executable in PATH.
    pub source: String,
    /// Args for the executable.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub args: Option<String>,
    /// Environment variables for the custom nodes
    ///
    /// Deprecated, use outer-level `env` field instead.
    pub envs: Option<BTreeMap<String, EnvValue>>,
    // Omitted from the serialized output when `None`.
    // NOTE(review): presumably a build command for the node — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub build: Option<String>,
    /// Send stdout and stderr to another node
    #[serde(skip_serializing_if = "Option::is_none")]
    pub send_stdout_as: Option<String>,

    // Flattened, so the fields of `NodeRunConfig` sit directly among the
    // fields of this struct instead of under a `run_config` key.
    #[serde(flatten)]
    pub run_config: NodeRunConfig,
}
236
// Value of an environment variable entry. The enum is untagged, so the data
// holds only the value itself; deserialization tries the variants in
// declaration order (bool, integer, float, string) and keeps the first one
// that succeeds.
//
// Every variant is deserialized through `with_expand_envs` from the
// `serde_with_expand_env` crate.
// NOTE(review): presumably that helper expands environment-variable
// references in string input before parsing it into the target type —
// confirm against the crate documentation.
//
// Plain `//` comments are used because `///` docs on a `JsonSchema` type end
// up as descriptions in the generated schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum EnvValue {
    #[serde(deserialize_with = "with_expand_envs")]
    Bool(bool),
    #[serde(deserialize_with = "with_expand_envs")]
    Integer(i64),
    #[serde(deserialize_with = "with_expand_envs")]
    Float(f64),
    #[serde(deserialize_with = "with_expand_envs")]
    String(String),
}
249
250impl fmt::Display for EnvValue {
251    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
252        match self {
253            EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
254            EnvValue::Integer(i64) => fmt.write_str(&i64.to_string()),
255            EnvValue::Float(f64) => fmt.write_str(&f64.to_string()),
256            EnvValue::String(str) => fmt.write_str(str),
257        }
258    }
259}