dora_message/
descriptor.rs

1use crate::{
2    config::{CommunicationConfig, Input, InputMapping, NodeRunConfig},
3    id::{DataId, NodeId, OperatorId},
4};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use serde_with_expand_env::with_expand_envs;
8use std::{
9    collections::{BTreeMap, BTreeSet},
10    fmt,
11    path::PathBuf,
12};
13
// Reserved source keywords. Their consumers are outside this file —
// presumably they are special values for a node's `path`/source field
// (NOTE(review): confirm at the use sites).
/// The literal keyword `"shell"`.
pub const SHELL_SOURCE: &str = "shell";
/// The literal keyword `"dynamic"`.
pub const DYNAMIC_SOURCE: &str = "dynamic";
16
/// Dataflow description
// Top-level document. Unknown keys are rejected when deserializing
// (`deny_unknown_fields`); `nodes` is the only key that must be present.
// Plain `//` comments are used for the notes below on purpose: `///` text on a
// `JsonSchema` type is emitted as a description in the generated schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
#[schemars(title = "dora-rs specification")]
pub struct Descriptor {
    // Left out of the generated schema; falls back to
    // `CommunicationConfig::default()` when the key is absent.
    #[schemars(skip)]
    #[serde(default)]
    pub communication: CommunicationConfig,
    // Left out of the generated schema; read from / written to the
    // `_unstable_deploy` key.
    #[schemars(skip)]
    #[serde(rename = "_unstable_deploy")]
    pub deploy: Option<Deploy>,
    // The nodes that make up the dataflow.
    pub nodes: Vec<Node>,
    // Left out of the generated schema; read from / written to the
    // `_unstable_debug` key and falls back to `Debug::default()` when absent.
    #[schemars(skip)]
    #[serde(default, rename = "_unstable_debug")]
    pub debug: Debug,
}
33
// Deployment settings, used by both `Descriptor::deploy` and `Node::deploy`.
// Both keys are optional; unknown keys are rejected (`deny_unknown_fields`).
// NOTE(review): presumably `machine` names the target machine and
// `working_dir` the directory to run in — confirm against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Deploy {
    pub machine: Option<String>,
    pub working_dir: Option<PathBuf>,
}
40
// Debugging switches stored in `Descriptor::debug`. The derived `Default`
// leaves every switch off.
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
pub struct Debug {
    // `false` when the key is absent.
    #[serde(default)]
    pub publish_all_messages_to_zenoh: bool,
}
46
/// Dora Node
// One entry of `Descriptor::nodes`. Unknown keys are rejected when
// deserializing (`deny_unknown_fields`); only `id` is mandatory.
// Plain `//` comments are used for the notes below on purpose: `///` text on a
// `JsonSchema` type is emitted as a description in the generated schema.
// NOTE(review): `operators`, `custom`, `operator` and `path` look like
// alternative ways of stating what the node runs; nothing in this struct
// enforces that exactly one of them is set — confirm where that is validated.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Node {
    /// Node identifier
    pub id: NodeId,
    /// Node name
    pub name: Option<String>,
    /// Description of the node
    pub description: Option<String>,
    /// Environment variables
    pub env: Option<BTreeMap<String, EnvValue>>,

    /// Unstable machine deployment configuration
    #[schemars(skip)]
    #[serde(rename = "_unstable_deploy")]
    pub deploy: Option<Deploy>,

    // --- Node kind. Each key is optional and omitted from serialized output
    // when unset. ---
    // A list of operator definitions (see `RuntimeNode`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operators: Option<RuntimeNode>,
    // A fully spelled-out custom node (see `CustomNode`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub custom: Option<CustomNode>,
    // A single operator whose `id` may be left out.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub operator: Option<SingleOperatorDefinition>,

    // --- Source location. Each key is optional and omitted from serialized
    // output when unset. ---
    // NOTE(review): `git`, `branch`, `tag` and `rev` presumably select the
    // repository and revision that `path` is taken from — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub path: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub git: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub branch: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tag: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub rev: Option<String>,

    // --- Execution settings. The three optional keys are omitted from
    // serialized output when unset. ---
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub args: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub build: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub send_stdout_as: Option<String>,
    // Empty map when the key is absent.
    #[serde(default)]
    pub inputs: BTreeMap<DataId, Input>,
    // Empty set when the key is absent.
    #[serde(default)]
    pub outputs: BTreeSet<DataId>,
}
94
/// Node description that carries exactly one [`CoreNodeKind`] instead of the
/// separate optional kind fields (`operators`, `custom`, `operator`, `path`)
/// found on [`Node`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResolvedNode {
    /// Node identifier.
    pub id: NodeId,
    /// Optional node name.
    pub name: Option<String>,
    /// Optional description of the node.
    pub description: Option<String>,
    /// Optional environment variables, keyed by variable name.
    pub env: Option<BTreeMap<String, EnvValue>>,

    /// Optional deployment settings; `None` when the key is absent.
    ///
    /// Unlike [`Node::deploy`], this is (de)serialized under the plain
    /// `deploy` key.
    #[serde(default)]
    pub deploy: Option<Deploy>,

    /// What the node runs.
    ///
    /// Flattened: the variant key (`operators` or `custom`) sits next to the
    /// other fields instead of under a `kind` key.
    #[serde(flatten)]
    pub kind: CoreNodeKind,
}
108
/// The two kinds of node a [`ResolvedNode`] can hold.
///
/// Serialized with the variant name as key: `operators` for
/// [`CoreNodeKind::Runtime`] (explicit rename) and `custom` for
/// [`CoreNodeKind::Custom`] (lowercased variant name).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CoreNodeKind {
    /// Dora runtime node
    #[serde(rename = "operators")]
    Runtime(RuntimeNode),
    /// Custom node, described by a [`CustomNode`].
    Custom(CustomNode),
}
117
// A node made up of a list of operators.
// `transparent`: (de)serialized as the bare `operators` list itself, without
// a wrapping map.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(transparent)]
pub struct RuntimeNode {
    pub operators: Vec<OperatorDefinition>,
}
123
// One operator of a `RuntimeNode`: a mandatory `id` plus its configuration.
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct OperatorDefinition {
    pub id: OperatorId,
    // Flattened: the `OperatorConfig` keys sit at the same level as `id`.
    #[serde(flatten)]
    pub config: OperatorConfig,
}
130
// Same shape as `OperatorDefinition`, except that `id` may be left out.
// Used by `Node::operator`.
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct SingleOperatorDefinition {
    /// ID is optional if there is only a single operator.
    pub id: Option<OperatorId>,
    // Flattened: the `OperatorConfig` keys sit at the same level as `id`.
    #[serde(flatten)]
    pub config: OperatorConfig,
}
138
// Configuration shared by `OperatorDefinition` and `SingleOperatorDefinition`.
// Plain `//` comments are used on purpose: `///` text on a `JsonSchema` type
// is emitted as a description in the generated schema.
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct OperatorConfig {
    pub name: Option<String>,
    pub description: Option<String>,

    // Empty map when the key is absent.
    #[serde(default)]
    pub inputs: BTreeMap<DataId, Input>,
    // Empty set when the key is absent.
    #[serde(default)]
    pub outputs: BTreeSet<DataId>,

    // Flattened: the `OperatorSource` variant key (`shared-library`, `python`
    // or `wasm`) sits at the same level as the other keys.
    #[serde(flatten)]
    pub source: OperatorSource,

    // Both keys below are omitted from serialized output when unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub build: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub send_stdout_as: Option<String>,
}
157
// Where an operator's implementation comes from. Variant names are
// (de)serialized in kebab-case: `shared-library`, `python`, `wasm`.
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
    SharedLibrary(String),
    Python(PythonSource),
    // Accepted by serde but left out of the generated schema.
    #[schemars(skip)]
    Wasm(String),
}
// Python operator source with an optional conda environment.
// Never (de)serialized field-by-field: serde converts through
// `PythonSourceDef` in both directions (`from` / `into`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(from = "PythonSourceDef", into = "PythonSourceDef")]
pub struct PythonSource {
    pub source: String,
    pub conda_env: Option<String>,
}
172
// Serialized form of `PythonSource`.
// Untagged, so no variant name appears in the data: the value is either a
// bare string (`SourceOnly`) or a map with `source` and an optional
// `conda_env` (`WithOptions`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum PythonSourceDef {
    SourceOnly(String),
    WithOptions {
        source: String,
        conda_env: Option<String>,
    },
}
182
183impl From<PythonSource> for PythonSourceDef {
184    fn from(input: PythonSource) -> Self {
185        match input {
186            PythonSource {
187                source,
188                conda_env: None,
189            } => Self::SourceOnly(source),
190            PythonSource { source, conda_env } => Self::WithOptions { source, conda_env },
191        }
192    }
193}
194
195impl From<PythonSourceDef> for PythonSource {
196    fn from(value: PythonSourceDef) -> Self {
197        match value {
198            PythonSourceDef::SourceOnly(source) => Self {
199                source,
200                conda_env: None,
201            },
202            PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env },
203        }
204    }
205}
206
/// Operator configuration consisting of a file `path` plus input mappings and
/// output ids.
///
/// Unknown keys are rejected when deserializing (`deny_unknown_fields`).
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct PythonOperatorConfig {
    /// Path of the operator (presumably the Python source file — confirm at
    /// the use site).
    pub path: PathBuf,
    /// Input id to input mapping; empty when the key is absent.
    #[serde(default)]
    pub inputs: BTreeMap<DataId, InputMapping>,
    /// Output ids; empty when the key is absent.
    #[serde(default)]
    pub outputs: BTreeSet<DataId>,
}
216
217#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
218pub struct CustomNode {
219    /// Path of the source code
220    ///
221    /// If you want to use a specific `conda` environment.
222    /// Provide the python path within the source.
223    ///
224    /// source: /home/peter/miniconda3/bin/python
225    ///
226    /// args: some_node.py
227    ///
228    /// Source can match any executable in PATH.
229    pub path: String,
230    pub source: NodeSource,
231    /// Args for the executable.
232    #[serde(default, skip_serializing_if = "Option::is_none")]
233    pub args: Option<String>,
234    /// Environment variables for the custom nodes
235    ///
236    /// Deprecated, use outer-level `env` field instead.
237    pub envs: Option<BTreeMap<String, EnvValue>>,
238    #[serde(default, skip_serializing_if = "Option::is_none")]
239    pub build: Option<String>,
240    /// Send stdout and stderr to another node
241    #[serde(skip_serializing_if = "Option::is_none")]
242    pub send_stdout_as: Option<String>,
243
244    #[serde(flatten)]
245    pub run_config: NodeRunConfig,
246}
247
// Origin of a `CustomNode`: either local, or a git repository with an
// optional revision selector.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum NodeSource {
    Local,
    GitBranch {
        repo: String,
        // `None` means no branch, tag or rev was given.
        // NOTE(review): which revision is used in that case is decided
        // elsewhere — confirm.
        rev: Option<GitRepoRev>,
    },
}
256
// Counterpart of `NodeSource` in which the git case carries a concrete
// `commit_hash` string instead of an optional `GitRepoRev`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum ResolvedNodeSource {
    Local,
    GitCommit { repo: String, commit_hash: String },
}
262
// Revision selector for `NodeSource::GitBranch`: the wrapped string is
// interpreted as a branch name, a tag name, or a rev depending on the variant.
// NOTE(review): `Rev` presumably holds a commit hash — confirm at the use site.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum GitRepoRev {
    Branch(String),
    Tag(String),
    Rev(String),
}
269
// Value of one environment-variable entry (see `Node::env`).
// Untagged: the value is written without a variant name, and on input serde
// takes the first variant, in the order declared below, that deserializes
// successfully.
// Every variant is read through `with_expand_envs` from the
// `serde_with_expand_env` crate — presumably expanding environment-variable
// references in the raw text before parsing (NOTE(review): confirm against
// that crate's documentation).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum EnvValue {
    #[serde(deserialize_with = "with_expand_envs")]
    Bool(bool),
    #[serde(deserialize_with = "with_expand_envs")]
    Integer(i64),
    #[serde(deserialize_with = "with_expand_envs")]
    Float(f64),
    #[serde(deserialize_with = "with_expand_envs")]
    String(String),
}
282
283impl fmt::Display for EnvValue {
284    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
285        match self {
286            EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
287            EnvValue::Integer(i64) => fmt.write_str(&i64.to_string()),
288            EnvValue::Float(f64) => fmt.write_str(&f64.to_string()),
289            EnvValue::String(str) => fmt.write_str(str),
290        }
291    }
292}