dora_message/descriptor.rs
1#![warn(missing_docs)]
2
3use crate::{
4 config::{CommunicationConfig, Input, InputMapping, NodeRunConfig},
5 id::{DataId, NodeId, OperatorId},
6};
7use schemars::JsonSchema;
8use serde::{Deserialize, Serialize};
9use serde_with_expand_env::with_expand_envs;
10use std::{
11 collections::{BTreeMap, BTreeSet},
12 fmt,
13 path::PathBuf,
14};
15
/// Reserved value (`"shell"`) for the [`Node::path`] field.
// NOTE(review): how this value is interpreted is not visible in this file; spell out its
// exact semantics here once confirmed.
pub const SHELL_SOURCE: &str = "shell";

/// Set the [`Node::path`] field to this value to treat the node as a
/// [_dynamic node_](https://docs.rs/dora-node-api/latest/dora_node_api/).
pub const DYNAMIC_SOURCE: &str = "dynamic";
20
21/// # Dataflow Specification
22///
23/// The main configuration structure for defining a Dora dataflow. Dataflows are
24/// specified through YAML files that describe the nodes, their connections, and
25/// execution parameters.
26///
27/// ## Structure
28///
29/// A dataflow consists of:
30/// - **Nodes**: The computational units that process data
31/// - **Communication**: Optional communication configuration
32/// - **Deployment**: Optional deployment configuration (unstable)
33/// - **Debug options**: Optional development and debugging settings (unstable)
34///
35/// ## Example
36///
37/// ```yaml
38/// nodes:
39/// - id: webcam
40/// operator:
41/// python: webcam.py
42/// inputs:
43/// tick: dora/timer/millis/100
44/// outputs:
45/// - image
46/// - id: plot
47/// operator:
48/// python: plot.py
49/// inputs:
50/// image: webcam/image
51/// ```
52#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
53#[serde(deny_unknown_fields)]
54#[schemars(title = "dora-rs specification")]
55pub struct Descriptor {
56 /// List of nodes in the dataflow
57 ///
58 /// This is the most important field of the dataflow specification.
59 /// Each node must be identified by a unique `id`:
60 ///
61 /// ## Example
62 ///
63 /// ```yaml
64 /// nodes:
65 /// - id: foo
66 /// path: path/to/the/executable
67 /// # ... (see below)
68 /// - id: bar
69 /// path: path/to/another/executable
70 /// # ... (see below)
71 /// ```
72 ///
73 /// For each node, you need to specify the `path` of the executable or script that Dora should run when starting the node.
74 /// Most of the other node fields are optional, but you typically want to specify at least some `inputs` and/or `outputs`.
75 pub nodes: Vec<Node>,
76
77 /// Communication configuration (optional, uses defaults)
78 #[schemars(skip)]
79 #[serde(default)]
80 pub communication: CommunicationConfig,
81
82 /// Deployment configuration (optional, unstable)
83 #[schemars(skip)]
84 #[serde(rename = "_unstable_deploy")]
85 pub deploy: Option<Deploy>,
86
87 /// Debug options (optional, unstable)
88 #[schemars(skip)]
89 #[serde(default, rename = "_unstable_debug")]
90 pub debug: Debug,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
94#[serde(deny_unknown_fields)]
95pub struct Deploy {
96 /// Target machine for deployment
97 pub machine: Option<String>,
98 /// Working directory for the deployment
99 pub working_dir: Option<PathBuf>,
100}
101
102#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
103pub struct Debug {
104 /// Whether to publish all messages to Zenoh for debugging
105 #[serde(default)]
106 pub publish_all_messages_to_zenoh: bool,
107}
108
109/// # Dora Node Configuration
110///
111/// A node represents a computational unit in a Dora dataflow. Each node runs as a
112/// separate process and can communicate with other nodes through inputs and outputs.
113#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
114#[serde(deny_unknown_fields)]
115pub struct Node {
116 /// Unique node identifier. Must not contain `/` characters.
117 ///
118 /// Node IDs can be arbitrary strings with the following limitations:
119 ///
120 /// - They must not contain any `/` characters (slashes).
121 /// - We do not recommend using whitespace characters (e.g. spaces) in IDs
122 ///
123 /// Each node must have an ID field.
124 ///
125 /// ## Example
126 ///
127 /// ```yaml
128 /// nodes:
129 /// - id: camera_node
130 /// - id: some_other_node
131 /// ```
132 pub id: NodeId,
133
134 /// Human-readable node name for documentation.
135 ///
136 /// This optional field can be used to define a more descriptive name in addition to a short
137 /// [`id`](Self::id).
138 ///
139 /// ## Example
140 ///
141 /// ```yaml
142 /// nodes:
143 /// - id: camera_node
144 /// name: "Camera Input Handler"
145 pub name: Option<String>,
146
147 /// Detailed description of the node's functionality.
148 ///
149 /// ## Example
150 ///
151 /// ```yaml
152 /// nodes:
153 /// - id: camera_node
154 /// description: "Captures video frames from webcam"
155 /// ```
156 pub description: Option<String>,
157
158 /// Path to executable or script that should be run.
159 ///
160 /// Specifies the path of the executable or script that Dora should run when starting the
161 /// dataflow.
162 /// This can point to a normal executable (e.g. when using a compiled language such as Rust) or
163 /// a Python script.
164 ///
165 /// Dora will automatically append a `.exe` extension on Windows systems when the specified
166 /// file name has no extension.
167 ///
168 /// ## Example
169 ///
170 /// ```yaml
171 /// nodes:
172 /// - id: rust-example
173 /// path: target/release/rust-node
174 /// - id: python-example
175 /// path: ./receive_data.py
176 /// ```
177 ///
178 /// ## URL as Path
179 ///
180 /// The `path` field can also point to a URL instead of a local path.
181 /// In this case, Dora will download the given file when starting the dataflow.
182 ///
183 /// Note that this is quite an old feature and using this functionality is **not recommended**
184 /// anymore. Instead, we recommend using a [`git`][Self::git] and/or [`build`](Self::build)
185 /// key.
186 #[serde(default, skip_serializing_if = "Option::is_none")]
187 pub path: Option<String>,
188
189 /// Command-line arguments passed to the executable.
190 ///
191 /// The command-line arguments that should be passed to the executable/script specified in `path`.
192 /// The arguments should be separated by space.
193 /// This field is optional and defaults to an empty argument list.
194 ///
195 /// ## Example
196 /// ```yaml
197 /// nodes:
198 /// - id: example
199 /// path: example-node
200 /// args: -v --some-flag foo
201 /// ```
202 #[serde(default, skip_serializing_if = "Option::is_none")]
203 pub args: Option<String>,
204
205 /// Environment variables for node builds and execution.
206 ///
207 /// Key-value map of environment variables that should be set for both the
208 /// [`build`](Self::build) operation and the node execution (i.e. when the node is spawned
209 /// through [`path`](Self::path)).
210 ///
211 /// Supports strings, numbers, and booleans.
212 ///
213 /// ## Example
214 ///
215 /// ```yaml
216 /// nodes:
217 /// - id: example-node
218 /// path: path/to/node
219 /// env:
220 /// DEBUG: true
221 /// PORT: 8080
222 /// API_KEY: "secret-key"
223 /// ```
224 pub env: Option<BTreeMap<String, EnvValue>>,
225
226 /// Multiple operators running in a shared runtime process.
227 ///
228 /// Operators are an experimental, lightweight alternative to nodes.
229 /// Instead of running as a separate process, operators are linked into a runtime process.
230 /// This allows running multiple operators to share a single address space (not supported for
231 /// Python currently).
232 ///
233 /// Operators are defined as part of the node list, as children of a runtime node.
234 /// A runtime node is a special node that specifies no [`path`](Self::path) field, but contains
235 /// an `operators` field instead.
236 ///
237 /// ## Example
238 ///
239 /// ```yaml
240 /// nodes:
241 /// - id: runtime-node
242 /// operators:
243 /// - id: processor
244 /// python: process.py
245 /// ```
246 #[serde(default, skip_serializing_if = "Option::is_none")]
247 pub operators: Option<RuntimeNode>,
248
249 /// Single operator configuration.
250 ///
251 /// This is a convenience field for defining runtime nodes that contain only a single operator.
252 /// This field is an alternative to the [`operators`](Self::operators) field, which can be used
253 /// if there is only a single operator defined for the runtime node.
254 ///
255 /// ## Example
256 ///
257 /// ```yaml
258 /// nodes:
259 /// - id: runtime-node
260 /// operator:
261 /// id: processor
262 /// python: script.py
263 /// outputs: [data]
264 /// ```
265 #[serde(default, skip_serializing_if = "Option::is_none")]
266 pub operator: Option<SingleOperatorDefinition>,
267
268 /// Legacy node configuration (deprecated).
269 ///
270 /// Please use the top-level [`path`](Self::path), [`args`](Self::args), etc. fields instead.
271 #[serde(default, skip_serializing_if = "Option::is_none")]
272 pub custom: Option<CustomNode>,
273
274 /// Output data identifiers produced by this node.
275 ///
276 /// List of output identifiers that the node sends.
277 /// Must contain all `output_id` values that the node uses when sending output, e.g. through the
278 /// [`send_output`](https://docs.rs/dora-node-api/latest/dora_node_api/struct.DoraNode.html#method.send_output)
279 /// function.
280 ///
281 /// ## Example
282 ///
283 /// ```yaml
284 /// nodes:
285 /// - id: example-node
286 /// outputs:
287 /// - processed_image
288 /// - metadata
289 /// ```
290 #[serde(default)]
291 pub outputs: BTreeSet<DataId>,
292
293 /// Input data connections from other nodes.
294 ///
295 /// Defines the inputs that this node is subscribing to.
296 ///
297 /// The `inputs` field should be a key-value map of the following format:
298 ///
299 /// `input_id: source_node_id/source_node_output_id`
300 ///
301 /// The components are defined as follows:
302 ///
303 /// - `input_id` is the local identifier that should be used for this input.
304 ///
305 /// This will map to the `id` field of
306 /// [`Event::Input`](https://docs.rs/dora-node-api/latest/dora_node_api/enum.Event.html#variant.Input)
307 /// events sent to the node event loop.
308 /// - `source_node_id` should be the `id` field of the node that sends the output that we want
309 /// to subscribe to
310 /// - `source_node_output_id` should be the identifier of the output that that we want
311 /// to subscribe to
312 ///
313 /// ## Example
314 ///
315 /// ```yaml
316 /// nodes:
317 /// - id: example-node
318 /// outputs:
319 /// - one
320 /// - two
321 /// - id: receiver
322 /// inputs:
323 /// my_input: example-node/two
324 /// ```
325 #[serde(default)]
326 pub inputs: BTreeMap<DataId, Input>,
327
328 /// Redirect stdout/stderr to a data output.
329 ///
330 /// This field can be used to send all stdout and stderr output of the node as a Dora output.
331 /// Each output line is sent as a separate message.
332 ///
333 ///
334 /// ## Example
335 ///
336 /// ```yaml
337 /// nodes:
338 /// - id: example
339 /// send_stdout_as: stdout_output
340 /// - id: logger
341 /// inputs:
342 /// example_output: example/stdout_output
343 /// ```
344 #[serde(skip_serializing_if = "Option::is_none")]
345 pub send_stdout_as: Option<String>,
346
347 /// Build commands executed during `dora build`. Each line runs separately.
348 ///
349 /// The `build` key specifies the command that should be invoked for building the node.
350 /// The key expects a single- or multi-line string.
351 ///
352 /// Each line is run as a separate command.
353 /// Spaces are used to separate arguments.
354 ///
355 /// Note that all the environment variables specified in the [`env`](Self::env) field are also
356 /// applied to the build commands.
357 ///
358 /// ## Special treatment of `pip`
359 ///
360 /// Build lines that start with `pip` or `pip3` are treated in a special way:
361 /// If the `--uv` argument is passed to the `dora build` command, all `pip`/`pip3` commands are
362 /// run through the [`uv` package manager](https://docs.astral.sh/uv/).
363 ///
364 /// ## Example
365 ///
366 /// ```yaml
367 /// nodes:
368 /// - id: build-example
369 /// build: cargo build -p receive_data --release
370 /// path: target/release/receive_data
371 /// - id: multi-line-example
372 /// build: |
373 /// pip install requirements.txt
374 /// pip install -e some/local/package
375 /// path: package
376 /// ```
377 ///
378 /// In the above example, the `pip` commands will be replaced by `uv pip` when run through
379 /// `dora build --uv`.
380 #[serde(default, skip_serializing_if = "Option::is_none")]
381 pub build: Option<String>,
382
383 /// Git repository URL for downloading nodes.
384 ///
385 /// The `git` key allows downloading nodes (i.e. their source code) from git repositories.
386 /// This can be especially useful for distributed dataflows.
387 ///
388 /// When a `git` key is specified, `dora build` automatically clones the specified repository
389 /// (or reuse an existing clone).
390 /// Then it checks out the specified [`branch`](Self::branch), [`tag`](Self::tag), or
391 /// [`rev`](Self::rev), or the default branch if none of them are specified.
392 /// Afterwards it runs the [`build`](Self::build) command if specified.
393 ///
394 /// Note that the git clone directory is set as working directory for both the
395 /// [`build`](Self::build) command and the specified [`path`](Self::path).
396 ///
397 /// ## Example
398 ///
399 /// ```yaml
400 /// nodes:
401 /// - id: rust-node
402 /// git: https://github.com/dora-rs/dora.git
403 /// build: cargo build -p rust-dataflow-example-node
404 /// path: target/debug/rust-dataflow-example-node
405 /// ```
406 ///
407 /// In the above example, `dora build` will first clone the specified `git` repository and then
408 /// run the specified `build` inside the local clone directory.
409 /// When `dora run` or `dora start` is invoked, the working directory will be the git clone
410 /// directory too. So a relative `path` will start from the clone directory.
411 #[serde(default, skip_serializing_if = "Option::is_none")]
412 pub git: Option<String>,
413
414 /// Git branch to checkout after cloning.
415 ///
416 /// The `branch` field is only allowed in combination with the [`git`](#git) field.
417 /// It specifies the branch that should be checked out after cloning.
418 /// Only one of `branch`, `tag`, or `rev` can be specified.
419 ///
420 /// ## Example
421 ///
422 /// ```yaml
423 /// nodes:
424 /// - id: rust-node
425 /// git: https://github.com/dora-rs/dora.git
426 /// branch: some-branch-name
427 /// ```
428 #[serde(default, skip_serializing_if = "Option::is_none")]
429 pub branch: Option<String>,
430
431 /// Git tag to checkout after cloning.
432 ///
433 /// The `tag` field is only allowed in combination with the [`git`](#git) field.
434 /// It specifies the git tag that should be checked out after cloning.
435 /// Only one of `branch`, `tag`, or `rev` can be specified.
436 ///
437 /// ## Example
438 ///
439 /// ```yaml
440 /// nodes:
441 /// - id: rust-node
442 /// git: https://github.com/dora-rs/dora.git
443 /// tag: v0.3.0
444 /// ```
445 #[serde(default, skip_serializing_if = "Option::is_none")]
446 pub tag: Option<String>,
447
448 /// Git revision (e.g. commit hash) to checkout after cloning.
449 ///
450 /// The `rev` field is only allowed in combination with the [`git`](#git) field.
451 /// It specifies the git revision (e.g. a commit hash) that should be checked out after cloning.
452 /// Only one of `branch`, `tag`, or `rev` can be specified.
453 ///
454 /// ## Example
455 ///
456 /// ```yaml
457 /// nodes:
458 /// - id: rust-node
459 /// git: https://github.com/dora-rs/dora.git
460 /// rev: 64ab0d7c
461 /// ```
462 #[serde(default, skip_serializing_if = "Option::is_none")]
463 pub rev: Option<String>,
464
465 /// Unstable machine deployment configuration
466 #[schemars(skip)]
467 #[serde(rename = "_unstable_deploy")]
468 pub deploy: Option<Deploy>,
469}
470
471#[derive(Debug, Clone, Serialize, Deserialize)]
472pub struct ResolvedNode {
473 pub id: NodeId,
474 pub name: Option<String>,
475 pub description: Option<String>,
476 pub env: Option<BTreeMap<String, EnvValue>>,
477
478 #[serde(default)]
479 pub deploy: Option<Deploy>,
480
481 #[serde(flatten)]
482 pub kind: CoreNodeKind,
483}
484
485impl ResolvedNode {
486 pub fn has_git_source(&self) -> bool {
487 self.kind
488 .as_custom()
489 .map(|n| n.source.is_git())
490 .unwrap_or_default()
491 }
492}
493
494#[derive(Debug, Clone, Serialize, Deserialize)]
495#[serde(rename_all = "lowercase")]
496#[allow(clippy::large_enum_variant)]
497pub enum CoreNodeKind {
498 /// Dora runtime node
499 #[serde(rename = "operators")]
500 Runtime(RuntimeNode),
501 Custom(CustomNode),
502}
503
504impl CoreNodeKind {
505 pub fn as_custom(&self) -> Option<&CustomNode> {
506 match self {
507 CoreNodeKind::Runtime(_) => None,
508 CoreNodeKind::Custom(custom_node) => Some(custom_node),
509 }
510 }
511}
512
513#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
514#[serde(transparent)]
515pub struct RuntimeNode {
516 /// List of operators running in this runtime
517 pub operators: Vec<OperatorDefinition>,
518}
519
520#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
521pub struct OperatorDefinition {
522 /// Unique operator identifier within the runtime
523 pub id: OperatorId,
524 #[serde(flatten)]
525 pub config: OperatorConfig,
526}
527
528#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
529pub struct SingleOperatorDefinition {
530 /// Operator identifier (optional for single operators)
531 pub id: Option<OperatorId>,
532 #[serde(flatten)]
533 pub config: OperatorConfig,
534}
535
536#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
537pub struct OperatorConfig {
538 /// Human-readable operator name
539 pub name: Option<String>,
540 /// Detailed description of the operator
541 pub description: Option<String>,
542
543 /// Input data connections
544 #[serde(default)]
545 pub inputs: BTreeMap<DataId, Input>,
546 /// Output data identifiers
547 #[serde(default)]
548 pub outputs: BTreeSet<DataId>,
549
550 /// Operator source configuration (Python, shared library, etc.)
551 #[serde(flatten)]
552 pub source: OperatorSource,
553
554 /// Build commands for this operator
555 #[serde(default, skip_serializing_if = "Option::is_none")]
556 pub build: Option<String>,
557 /// Redirect stdout to data output
558 #[serde(skip_serializing_if = "Option::is_none")]
559 pub send_stdout_as: Option<String>,
560}
561
562#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
563#[serde(rename_all = "kebab-case")]
564pub enum OperatorSource {
565 SharedLibrary(String),
566 Python(PythonSource),
567 #[schemars(skip)]
568 Wasm(String),
569}
570#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
571#[serde(from = "PythonSourceDef", into = "PythonSourceDef")]
572pub struct PythonSource {
573 pub source: String,
574 pub conda_env: Option<String>,
575}
576
577#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
578#[serde(untagged)]
579pub enum PythonSourceDef {
580 SourceOnly(String),
581 WithOptions {
582 source: String,
583 conda_env: Option<String>,
584 },
585}
586
587impl From<PythonSource> for PythonSourceDef {
588 fn from(input: PythonSource) -> Self {
589 match input {
590 PythonSource {
591 source,
592 conda_env: None,
593 } => Self::SourceOnly(source),
594 PythonSource { source, conda_env } => Self::WithOptions { source, conda_env },
595 }
596 }
597}
598
599impl From<PythonSourceDef> for PythonSource {
600 fn from(value: PythonSourceDef) -> Self {
601 match value {
602 PythonSourceDef::SourceOnly(source) => Self {
603 source,
604 conda_env: None,
605 },
606 PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env },
607 }
608 }
609}
610
611#[derive(Debug, Serialize, Deserialize, Clone)]
612#[serde(deny_unknown_fields)]
613pub struct PythonOperatorConfig {
614 pub path: PathBuf,
615 #[serde(default)]
616 pub inputs: BTreeMap<DataId, InputMapping>,
617 #[serde(default)]
618 pub outputs: BTreeSet<DataId>,
619}
620
621#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
622pub struct CustomNode {
623 /// Path of the source code
624 ///
625 /// If you want to use a specific `conda` environment.
626 /// Provide the python path within the source.
627 ///
628 /// source: /home/peter/miniconda3/bin/python
629 ///
630 /// args: some_node.py
631 ///
632 /// Source can match any executable in PATH.
633 pub path: String,
634 pub source: NodeSource,
635 /// Args for the executable.
636 #[serde(default, skip_serializing_if = "Option::is_none")]
637 pub args: Option<String>,
638 /// Environment variables for the custom nodes
639 ///
640 /// Deprecated, use outer-level `env` field instead.
641 pub envs: Option<BTreeMap<String, EnvValue>>,
642 #[serde(default, skip_serializing_if = "Option::is_none")]
643 pub build: Option<String>,
644 /// Send stdout and stderr to another node
645 #[serde(skip_serializing_if = "Option::is_none")]
646 pub send_stdout_as: Option<String>,
647
648 #[serde(flatten)]
649 pub run_config: NodeRunConfig,
650}
651
652#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
653pub enum NodeSource {
654 Local,
655 GitBranch {
656 repo: String,
657 rev: Option<GitRepoRev>,
658 },
659}
660
661impl NodeSource {
662 pub fn is_git(&self) -> bool {
663 matches!(self, Self::GitBranch { .. })
664 }
665}
666
667#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
668pub enum ResolvedNodeSource {
669 Local,
670 GitCommit { repo: String, commit_hash: String },
671}
672
673#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
674pub enum GitRepoRev {
675 Branch(String),
676 Tag(String),
677 Rev(String),
678}
679
680#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
681#[serde(untagged)]
682pub enum EnvValue {
683 #[serde(deserialize_with = "with_expand_envs")]
684 Bool(bool),
685 #[serde(deserialize_with = "with_expand_envs")]
686 Integer(i64),
687 #[serde(deserialize_with = "with_expand_envs")]
688 Float(f64),
689 #[serde(deserialize_with = "with_expand_envs")]
690 String(String),
691}
692
693impl fmt::Display for EnvValue {
694 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
695 match self {
696 EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
697 EnvValue::Integer(i64) => fmt.write_str(&i64.to_string()),
698 EnvValue::Float(f64) => fmt.write_str(&f64.to_string()),
699 EnvValue::String(str) => fmt.write_str(str),
700 }
701 }
702}