1use crate::{
2 config::{CommunicationConfig, Input, InputMapping, NodeRunConfig},
3 id::{DataId, NodeId, OperatorId},
4};
5use schemars::JsonSchema;
6use serde::{Deserialize, Serialize};
7use serde_with_expand_env::with_expand_envs;
8use std::{
9 collections::{BTreeMap, BTreeSet},
10 fmt,
11 path::PathBuf,
12};
13
14pub const SHELL_SOURCE: &str = "shell";
15pub const DYNAMIC_SOURCE: &str = "dynamic";
16
17#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
19#[serde(deny_unknown_fields)]
20#[schemars(title = "dora-rs specification")]
21pub struct Descriptor {
22 #[schemars(skip)]
23 #[serde(default)]
24 pub communication: CommunicationConfig,
25 #[schemars(skip)]
26 #[serde(rename = "_unstable_deploy")]
27 pub deploy: Option<Deploy>,
28 pub nodes: Vec<Node>,
29 #[schemars(skip)]
30 #[serde(default, rename = "_unstable_debug")]
31 pub debug: Debug,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
35#[serde(deny_unknown_fields)]
36pub struct Deploy {
37 pub machine: Option<String>,
38 pub working_dir: Option<PathBuf>,
39}
40
41#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
42pub struct Debug {
43 #[serde(default)]
44 pub publish_all_messages_to_zenoh: bool,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
49#[serde(deny_unknown_fields)]
50pub struct Node {
51 pub id: NodeId,
53 pub name: Option<String>,
55 pub description: Option<String>,
57 pub env: Option<BTreeMap<String, EnvValue>>,
59
60 #[schemars(skip)]
62 #[serde(rename = "_unstable_deploy")]
63 pub deploy: Option<Deploy>,
64
65 #[serde(default, skip_serializing_if = "Option::is_none")]
66 pub operators: Option<RuntimeNode>,
67 #[serde(default, skip_serializing_if = "Option::is_none")]
68 pub custom: Option<CustomNode>,
69 #[serde(default, skip_serializing_if = "Option::is_none")]
70 pub operator: Option<SingleOperatorDefinition>,
71
72 #[serde(default, skip_serializing_if = "Option::is_none")]
73 pub path: Option<String>,
74 #[serde(default, skip_serializing_if = "Option::is_none")]
75 pub git: Option<String>,
76 #[serde(default, skip_serializing_if = "Option::is_none")]
77 pub branch: Option<String>,
78 #[serde(default, skip_serializing_if = "Option::is_none")]
79 pub tag: Option<String>,
80 #[serde(default, skip_serializing_if = "Option::is_none")]
81 pub rev: Option<String>,
82
83 #[serde(default, skip_serializing_if = "Option::is_none")]
84 pub args: Option<String>,
85 #[serde(default, skip_serializing_if = "Option::is_none")]
86 pub build: Option<String>,
87 #[serde(skip_serializing_if = "Option::is_none")]
88 pub send_stdout_as: Option<String>,
89 #[serde(default)]
90 pub inputs: BTreeMap<DataId, Input>,
91 #[serde(default)]
92 pub outputs: BTreeSet<DataId>,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct ResolvedNode {
97 pub id: NodeId,
98 pub name: Option<String>,
99 pub description: Option<String>,
100 pub env: Option<BTreeMap<String, EnvValue>>,
101
102 #[serde(default)]
103 pub deploy: Option<Deploy>,
104
105 #[serde(flatten)]
106 pub kind: CoreNodeKind,
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize)]
110#[serde(rename_all = "lowercase")]
111pub enum CoreNodeKind {
112 #[serde(rename = "operators")]
114 Runtime(RuntimeNode),
115 Custom(CustomNode),
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
119#[serde(transparent)]
120pub struct RuntimeNode {
121 pub operators: Vec<OperatorDefinition>,
122}
123
124#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
125pub struct OperatorDefinition {
126 pub id: OperatorId,
127 #[serde(flatten)]
128 pub config: OperatorConfig,
129}
130
131#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
132pub struct SingleOperatorDefinition {
133 pub id: Option<OperatorId>,
135 #[serde(flatten)]
136 pub config: OperatorConfig,
137}
138
139#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
140pub struct OperatorConfig {
141 pub name: Option<String>,
142 pub description: Option<String>,
143
144 #[serde(default)]
145 pub inputs: BTreeMap<DataId, Input>,
146 #[serde(default)]
147 pub outputs: BTreeSet<DataId>,
148
149 #[serde(flatten)]
150 pub source: OperatorSource,
151
152 #[serde(default, skip_serializing_if = "Option::is_none")]
153 pub build: Option<String>,
154 #[serde(skip_serializing_if = "Option::is_none")]
155 pub send_stdout_as: Option<String>,
156}
157
158#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
159#[serde(rename_all = "kebab-case")]
160pub enum OperatorSource {
161 SharedLibrary(String),
162 Python(PythonSource),
163 #[schemars(skip)]
164 Wasm(String),
165}
166#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
167#[serde(from = "PythonSourceDef", into = "PythonSourceDef")]
168pub struct PythonSource {
169 pub source: String,
170 pub conda_env: Option<String>,
171}
172
173#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
174#[serde(untagged)]
175pub enum PythonSourceDef {
176 SourceOnly(String),
177 WithOptions {
178 source: String,
179 conda_env: Option<String>,
180 },
181}
182
183impl From<PythonSource> for PythonSourceDef {
184 fn from(input: PythonSource) -> Self {
185 match input {
186 PythonSource {
187 source,
188 conda_env: None,
189 } => Self::SourceOnly(source),
190 PythonSource { source, conda_env } => Self::WithOptions { source, conda_env },
191 }
192 }
193}
194
195impl From<PythonSourceDef> for PythonSource {
196 fn from(value: PythonSourceDef) -> Self {
197 match value {
198 PythonSourceDef::SourceOnly(source) => Self {
199 source,
200 conda_env: None,
201 },
202 PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env },
203 }
204 }
205}
206
207#[derive(Debug, Serialize, Deserialize, Clone)]
208#[serde(deny_unknown_fields)]
209pub struct PythonOperatorConfig {
210 pub path: PathBuf,
211 #[serde(default)]
212 pub inputs: BTreeMap<DataId, InputMapping>,
213 #[serde(default)]
214 pub outputs: BTreeSet<DataId>,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
218pub struct CustomNode {
219 pub path: String,
230 pub source: NodeSource,
231 #[serde(default, skip_serializing_if = "Option::is_none")]
233 pub args: Option<String>,
234 pub envs: Option<BTreeMap<String, EnvValue>>,
238 #[serde(default, skip_serializing_if = "Option::is_none")]
239 pub build: Option<String>,
240 #[serde(skip_serializing_if = "Option::is_none")]
242 pub send_stdout_as: Option<String>,
243
244 #[serde(flatten)]
245 pub run_config: NodeRunConfig,
246}
247
248#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
249pub enum NodeSource {
250 Local,
251 GitBranch {
252 repo: String,
253 rev: Option<GitRepoRev>,
254 },
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
258pub enum ResolvedNodeSource {
259 Local,
260 GitCommit { repo: String, commit_hash: String },
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
264pub enum GitRepoRev {
265 Branch(String),
266 Tag(String),
267 Rev(String),
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
271#[serde(untagged)]
272pub enum EnvValue {
273 #[serde(deserialize_with = "with_expand_envs")]
274 Bool(bool),
275 #[serde(deserialize_with = "with_expand_envs")]
276 Integer(i64),
277 #[serde(deserialize_with = "with_expand_envs")]
278 Float(f64),
279 #[serde(deserialize_with = "with_expand_envs")]
280 String(String),
281}
282
283impl fmt::Display for EnvValue {
284 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
285 match self {
286 EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
287 EnvValue::Integer(i64) => fmt.write_str(&i64.to_string()),
288 EnvValue::Float(f64) => fmt.write_str(&f64.to_string()),
289 EnvValue::String(str) => fmt.write_str(str),
290 }
291 }
292}