dora_message/descriptor.rs
1#![warn(missing_docs)]
2
3use crate::{
4 config::{CommunicationConfig, Input, InputMapping, NodeRunConfig},
5 id::{DataId, NodeId, OperatorId},
6};
7use schemars::JsonSchema;
8use serde::{Deserialize, Serialize};
9use serde_with_expand_env::with_expand_envs;
10use std::{
11 collections::{BTreeMap, BTreeSet},
12 fmt,
13 path::PathBuf,
14};
15
/// Reserved `"shell"` source value, counterpart to [`DYNAMIC_SOURCE`].
// NOTE(review): presumably setting `Node::path` to this value makes the node run its `args`
// through a shell — confirm against the node-spawning code before documenting it publicly.
pub const SHELL_SOURCE: &str = "shell";
/// Set the [`Node::path`] field to this value to treat the node as a
/// [_dynamic node_](https://docs.rs/dora-node-api/latest/dora_node_api/).
pub const DYNAMIC_SOURCE: &str = "dynamic";
20
21/// # Dataflow Specification
22///
23/// The main configuration structure for defining a Dora dataflow. Dataflows are
24/// specified through YAML files that describe the nodes, their connections, and
25/// execution parameters.
26///
27/// ## Structure
28///
29/// A dataflow consists of:
30/// - **Nodes**: The computational units that process data
31/// - **Communication**: Optional communication configuration
32/// - **Deployment**: Optional deployment configuration (unstable)
33/// - **Debug options**: Optional development and debugging settings (unstable)
34///
35/// ## Example
36///
37/// ```yaml
38/// nodes:
39/// - id: webcam
40/// operator:
41/// python: webcam.py
42/// inputs:
43/// tick: dora/timer/millis/100
44/// outputs:
45/// - image
46/// - id: plot
47/// operator:
48/// python: plot.py
49/// inputs:
50/// image: webcam/image
51/// ```
52#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
53#[serde(deny_unknown_fields)]
54#[schemars(title = "dora-rs specification")]
55pub struct Descriptor {
56 /// List of nodes in the dataflow
57 ///
58 /// This is the most important field of the dataflow specification.
59 /// Each node must be identified by a unique `id`:
60 ///
61 /// ## Example
62 ///
63 /// ```yaml
64 /// nodes:
65 /// - id: foo
66 /// path: path/to/the/executable
67 /// # ... (see below)
68 /// - id: bar
69 /// path: path/to/another/executable
70 /// # ... (see below)
71 /// ```
72 ///
73 /// For each node, you need to specify the `path` of the executable or script that Dora should run when starting the node.
74 /// Most of the other node fields are optional, but you typically want to specify at least some `inputs` and/or `outputs`.
75 pub nodes: Vec<Node>,
76
77 /// Communication configuration (optional, uses defaults)
78 #[schemars(skip)]
79 #[serde(default)]
80 pub communication: CommunicationConfig,
81
82 /// Deployment configuration (optional, unstable)
83 #[schemars(skip)]
84 #[serde(rename = "_unstable_deploy")]
85 pub deploy: Option<Deploy>,
86
87 /// Debug options (optional, unstable)
88 #[schemars(skip)]
89 #[serde(default, rename = "_unstable_debug")]
90 pub debug: Debug,
91}
92
93/// Specifies when a node should be restarted.
94#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema)]
95#[serde(rename_all = "kebab-case")]
96pub enum RestartPolicy {
97 /// Never restart the node (default)
98 #[default]
99 Never,
100 /// Restart the node if it exits with a non-zero exit code.
101 OnFailure,
102 /// Always restart the node when it exits, regardless of exit code.
103 ///
104 /// The node will not be restarted on the following conditions:
105 ///
106 /// - The node was stopped by the user (e.g., via `dora stop`).
107 /// - All inputs to the node have been closed and the node finished with a non-zero exit code.
108 Always,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
112#[serde(deny_unknown_fields)]
113pub struct Deploy {
114 /// Target machine for deployment
115 pub machine: Option<String>,
116 /// Working directory for the deployment
117 pub working_dir: Option<PathBuf>,
118}
119
120#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
121pub struct Debug {
122 /// Whether to publish all messages to Zenoh for debugging
123 #[serde(default)]
124 pub publish_all_messages_to_zenoh: bool,
125}
126
127/// # Dora Node Configuration
128///
129/// A node represents a computational unit in a Dora dataflow. Each node runs as a
130/// separate process and can communicate with other nodes through inputs and outputs.
131#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
132#[serde(deny_unknown_fields)]
133pub struct Node {
134 /// Unique node identifier. Must not contain `/` characters.
135 ///
136 /// Node IDs can be arbitrary strings with the following limitations:
137 ///
138 /// - They must not contain any `/` characters (slashes).
139 /// - We do not recommend using whitespace characters (e.g. spaces) in IDs
140 ///
141 /// Each node must have an ID field.
142 ///
143 /// ## Example
144 ///
145 /// ```yaml
146 /// nodes:
147 /// - id: camera_node
148 /// - id: some_other_node
149 /// ```
150 pub id: NodeId,
151
152 /// Human-readable node name for documentation.
153 ///
154 /// This optional field can be used to define a more descriptive name in addition to a short
155 /// [`id`](Self::id).
156 ///
157 /// ## Example
158 ///
159 /// ```yaml
160 /// nodes:
161 /// - id: camera_node
162 /// name: "Camera Input Handler"
163 pub name: Option<String>,
164
165 /// Detailed description of the node's functionality.
166 ///
167 /// ## Example
168 ///
169 /// ```yaml
170 /// nodes:
171 /// - id: camera_node
172 /// description: "Captures video frames from webcam"
173 /// ```
174 pub description: Option<String>,
175
176 /// Path to executable or script that should be run.
177 ///
178 /// Specifies the path of the executable or script that Dora should run when starting the
179 /// dataflow.
180 /// This can point to a normal executable (e.g. when using a compiled language such as Rust) or
181 /// a Python script.
182 ///
183 /// Dora will automatically append a `.exe` extension on Windows systems when the specified
184 /// file name has no extension.
185 ///
186 /// ## Example
187 ///
188 /// ```yaml
189 /// nodes:
190 /// - id: rust-example
191 /// path: target/release/rust-node
192 /// - id: python-example
193 /// path: ./receive_data.py
194 /// ```
195 ///
196 /// ## URL as Path
197 ///
198 /// The `path` field can also point to a URL instead of a local path.
199 /// In this case, Dora will download the given file when starting the dataflow.
200 ///
201 /// Note that this is quite an old feature and using this functionality is **not recommended**
202 /// anymore. Instead, we recommend using a [`git`][Self::git] and/or [`build`](Self::build)
203 /// key.
204 #[serde(default, skip_serializing_if = "Option::is_none")]
205 pub path: Option<String>,
206
207 /// Command-line arguments passed to the executable.
208 ///
209 /// The command-line arguments that should be passed to the executable/script specified in `path`.
210 /// The arguments should be separated by space.
211 /// This field is optional and defaults to an empty argument list.
212 ///
213 /// ## Example
214 /// ```yaml
215 /// nodes:
216 /// - id: example
217 /// path: example-node
218 /// args: -v --some-flag foo
219 /// ```
220 #[serde(default, skip_serializing_if = "Option::is_none")]
221 pub args: Option<String>,
222
223 /// Environment variables for node builds and execution.
224 ///
225 /// Key-value map of environment variables that should be set for both the
226 /// [`build`](Self::build) operation and the node execution (i.e. when the node is spawned
227 /// through [`path`](Self::path)).
228 ///
229 /// Supports strings, numbers, and booleans.
230 ///
231 /// ## Example
232 ///
233 /// ```yaml
234 /// nodes:
235 /// - id: example-node
236 /// path: path/to/node
237 /// env:
238 /// DEBUG: true
239 /// PORT: 8080
240 /// API_KEY: "secret-key"
241 /// ```
242 pub env: Option<BTreeMap<String, EnvValue>>,
243
244 /// Multiple operators running in a shared runtime process.
245 ///
246 /// Operators are an experimental, lightweight alternative to nodes.
247 /// Instead of running as a separate process, operators are linked into a runtime process.
248 /// This allows running multiple operators to share a single address space (not supported for
249 /// Python currently).
250 ///
251 /// Operators are defined as part of the node list, as children of a runtime node.
252 /// A runtime node is a special node that specifies no [`path`](Self::path) field, but contains
253 /// an `operators` field instead.
254 ///
255 /// ## Example
256 ///
257 /// ```yaml
258 /// nodes:
259 /// - id: runtime-node
260 /// operators:
261 /// - id: processor
262 /// python: process.py
263 /// ```
264 #[serde(default, skip_serializing_if = "Option::is_none")]
265 pub operators: Option<RuntimeNode>,
266
267 /// Single operator configuration.
268 ///
269 /// This is a convenience field for defining runtime nodes that contain only a single operator.
270 /// This field is an alternative to the [`operators`](Self::operators) field, which can be used
271 /// if there is only a single operator defined for the runtime node.
272 ///
273 /// ## Example
274 ///
275 /// ```yaml
276 /// nodes:
277 /// - id: runtime-node
278 /// operator:
279 /// id: processor
280 /// python: script.py
281 /// outputs: [data]
282 /// ```
283 #[serde(default, skip_serializing_if = "Option::is_none")]
284 pub operator: Option<SingleOperatorDefinition>,
285
286 /// Legacy node configuration (deprecated).
287 ///
288 /// Please use the top-level [`path`](Self::path), [`args`](Self::args), etc. fields instead.
289 #[serde(default, skip_serializing_if = "Option::is_none")]
290 pub custom: Option<CustomNode>,
291
292 /// Output data identifiers produced by this node.
293 ///
294 /// List of output identifiers that the node sends.
295 /// Must contain all `output_id` values that the node uses when sending output, e.g. through the
296 /// [`send_output`](https://docs.rs/dora-node-api/latest/dora_node_api/struct.DoraNode.html#method.send_output)
297 /// function.
298 ///
299 /// ## Example
300 ///
301 /// ```yaml
302 /// nodes:
303 /// - id: example-node
304 /// outputs:
305 /// - processed_image
306 /// - metadata
307 /// ```
308 #[serde(default)]
309 pub outputs: BTreeSet<DataId>,
310
311 /// Input data connections from other nodes.
312 ///
313 /// Defines the inputs that this node is subscribing to.
314 ///
315 /// The `inputs` field should be a key-value map of the following format:
316 ///
317 /// `input_id: source_node_id/source_node_output_id`
318 ///
319 /// The components are defined as follows:
320 ///
321 /// - `input_id` is the local identifier that should be used for this input.
322 ///
323 /// This will map to the `id` field of
324 /// [`Event::Input`](https://docs.rs/dora-node-api/latest/dora_node_api/enum.Event.html#variant.Input)
325 /// events sent to the node event loop.
326 /// - `source_node_id` should be the `id` field of the node that sends the output that we want
327 /// to subscribe to
328 /// - `source_node_output_id` should be the identifier of the output that that we want
329 /// to subscribe to
330 ///
331 /// ## Example
332 ///
333 /// ```yaml
334 /// nodes:
335 /// - id: example-node
336 /// outputs:
337 /// - one
338 /// - two
339 /// - id: receiver
340 /// inputs:
341 /// my_input: example-node/two
342 /// ```
343 #[serde(default)]
344 pub inputs: BTreeMap<DataId, Input>,
345
346 /// Redirect stdout/stderr to a data output.
347 ///
348 /// This field can be used to send all stdout and stderr output of the node as a Dora output.
349 /// Each output line is sent as a separate message.
350 ///
351 ///
352 /// ## Example
353 ///
354 /// ```yaml
355 /// nodes:
356 /// - id: example
357 /// send_stdout_as: stdout_output
358 /// - id: logger
359 /// inputs:
360 /// example_output: example/stdout_output
361 /// ```
362 #[serde(skip_serializing_if = "Option::is_none")]
363 pub send_stdout_as: Option<String>,
364
365 /// Build commands executed during `dora build`. Each line runs separately.
366 ///
367 /// The `build` key specifies the command that should be invoked for building the node.
368 /// The key expects a single- or multi-line string.
369 ///
370 /// Each line is run as a separate command.
371 /// Spaces are used to separate arguments.
372 ///
373 /// Note that all the environment variables specified in the [`env`](Self::env) field are also
374 /// applied to the build commands.
375 ///
376 /// ## Special treatment of `pip`
377 ///
378 /// Build lines that start with `pip` or `pip3` are treated in a special way:
379 /// If the `--uv` argument is passed to the `dora build` command, all `pip`/`pip3` commands are
380 /// run through the [`uv` package manager](https://docs.astral.sh/uv/).
381 ///
382 /// ## Example
383 ///
384 /// ```yaml
385 /// nodes:
386 /// - id: build-example
387 /// build: cargo build -p receive_data --release
388 /// path: target/release/receive_data
389 /// - id: multi-line-example
390 /// build: |
391 /// pip install requirements.txt
392 /// pip install -e some/local/package
393 /// path: package
394 /// ```
395 ///
396 /// In the above example, the `pip` commands will be replaced by `uv pip` when run through
397 /// `dora build --uv`.
398 #[serde(default, skip_serializing_if = "Option::is_none")]
399 pub build: Option<String>,
400
401 /// Git repository URL for downloading nodes.
402 ///
403 /// The `git` key allows downloading nodes (i.e. their source code) from git repositories.
404 /// This can be especially useful for distributed dataflows.
405 ///
406 /// When a `git` key is specified, `dora build` automatically clones the specified repository
407 /// (or reuse an existing clone).
408 /// Then it checks out the specified [`branch`](Self::branch), [`tag`](Self::tag), or
409 /// [`rev`](Self::rev), or the default branch if none of them are specified.
410 /// Afterwards it runs the [`build`](Self::build) command if specified.
411 ///
412 /// Note that the git clone directory is set as working directory for both the
413 /// [`build`](Self::build) command and the specified [`path`](Self::path).
414 ///
415 /// ## Example
416 ///
417 /// ```yaml
418 /// nodes:
419 /// - id: rust-node
420 /// git: https://github.com/dora-rs/dora.git
421 /// build: cargo build -p rust-dataflow-example-node
422 /// path: target/debug/rust-dataflow-example-node
423 /// ```
424 ///
425 /// In the above example, `dora build` will first clone the specified `git` repository and then
426 /// run the specified `build` inside the local clone directory.
427 /// When `dora run` or `dora start` is invoked, the working directory will be the git clone
428 /// directory too. So a relative `path` will start from the clone directory.
429 #[serde(default, skip_serializing_if = "Option::is_none")]
430 pub git: Option<String>,
431
432 /// Git branch to checkout after cloning.
433 ///
434 /// The `branch` field is only allowed in combination with the [`git`](#git) field.
435 /// It specifies the branch that should be checked out after cloning.
436 /// Only one of `branch`, `tag`, or `rev` can be specified.
437 ///
438 /// ## Example
439 ///
440 /// ```yaml
441 /// nodes:
442 /// - id: rust-node
443 /// git: https://github.com/dora-rs/dora.git
444 /// branch: some-branch-name
445 /// ```
446 #[serde(default, skip_serializing_if = "Option::is_none")]
447 pub branch: Option<String>,
448
449 /// Git tag to checkout after cloning.
450 ///
451 /// The `tag` field is only allowed in combination with the [`git`](#git) field.
452 /// It specifies the git tag that should be checked out after cloning.
453 /// Only one of `branch`, `tag`, or `rev` can be specified.
454 ///
455 /// ## Example
456 ///
457 /// ```yaml
458 /// nodes:
459 /// - id: rust-node
460 /// git: https://github.com/dora-rs/dora.git
461 /// tag: v0.3.0
462 /// ```
463 #[serde(default, skip_serializing_if = "Option::is_none")]
464 pub tag: Option<String>,
465
466 /// Git revision (e.g. commit hash) to checkout after cloning.
467 ///
468 /// The `rev` field is only allowed in combination with the [`git`](#git) field.
469 /// It specifies the git revision (e.g. a commit hash) that should be checked out after cloning.
470 /// Only one of `branch`, `tag`, or `rev` can be specified.
471 ///
472 /// ## Example
473 ///
474 /// ```yaml
475 /// nodes:
476 /// - id: rust-node
477 /// git: https://github.com/dora-rs/dora.git
478 /// rev: 64ab0d7c
479 /// ```
480 #[serde(default, skip_serializing_if = "Option::is_none")]
481 pub rev: Option<String>,
482
483 /// Whether this node should be restarted on exit or error.
484 ///
485 /// Defaults to `RestartPolicy::Never`.
486 #[serde(default)]
487 pub restart_policy: RestartPolicy,
488
489 /// Unstable machine deployment configuration
490 #[schemars(skip)]
491 #[serde(rename = "_unstable_deploy")]
492 pub deploy: Option<Deploy>,
493}
494
495#[derive(Debug, Clone, Serialize, Deserialize)]
496pub struct ResolvedNode {
497 pub id: NodeId,
498 pub name: Option<String>,
499 pub description: Option<String>,
500 pub env: Option<BTreeMap<String, EnvValue>>,
501
502 #[serde(default)]
503 pub deploy: Option<Deploy>,
504
505 #[serde(flatten)]
506 pub kind: CoreNodeKind,
507}
508
509impl ResolvedNode {
510 pub fn has_git_source(&self) -> bool {
511 self.kind
512 .as_custom()
513 .map(|n| n.source.is_git())
514 .unwrap_or_default()
515 }
516}
517
518#[derive(Debug, Clone, Serialize, Deserialize)]
519#[serde(rename_all = "lowercase")]
520#[allow(clippy::large_enum_variant)]
521pub enum CoreNodeKind {
522 /// Dora runtime node
523 #[serde(rename = "operators")]
524 Runtime(RuntimeNode),
525 Custom(CustomNode),
526}
527
528impl CoreNodeKind {
529 pub fn as_custom(&self) -> Option<&CustomNode> {
530 match self {
531 CoreNodeKind::Runtime(_) => None,
532 CoreNodeKind::Custom(custom_node) => Some(custom_node),
533 }
534 }
535}
536
537#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
538#[serde(transparent)]
539pub struct RuntimeNode {
540 /// List of operators running in this runtime
541 pub operators: Vec<OperatorDefinition>,
542}
543
544#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
545pub struct OperatorDefinition {
546 /// Unique operator identifier within the runtime
547 pub id: OperatorId,
548 #[serde(flatten)]
549 pub config: OperatorConfig,
550}
551
552#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
553pub struct SingleOperatorDefinition {
554 /// Operator identifier (optional for single operators)
555 pub id: Option<OperatorId>,
556 #[serde(flatten)]
557 pub config: OperatorConfig,
558}
559
560#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
561pub struct OperatorConfig {
562 /// Human-readable operator name
563 pub name: Option<String>,
564 /// Detailed description of the operator
565 pub description: Option<String>,
566
567 /// Input data connections
568 #[serde(default)]
569 pub inputs: BTreeMap<DataId, Input>,
570 /// Output data identifiers
571 #[serde(default)]
572 pub outputs: BTreeSet<DataId>,
573
574 /// Operator source configuration (Python, shared library, etc.)
575 #[serde(flatten)]
576 pub source: OperatorSource,
577
578 /// Build commands for this operator
579 #[serde(default, skip_serializing_if = "Option::is_none")]
580 pub build: Option<String>,
581 /// Redirect stdout to data output
582 #[serde(skip_serializing_if = "Option::is_none")]
583 pub send_stdout_as: Option<String>,
584}
585
586#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
587#[serde(rename_all = "kebab-case")]
588pub enum OperatorSource {
589 SharedLibrary(String),
590 Python(PythonSource),
591 #[schemars(skip)]
592 Wasm(String),
593}
594#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
595#[serde(from = "PythonSourceDef", into = "PythonSourceDef")]
596pub struct PythonSource {
597 pub source: String,
598 pub conda_env: Option<String>,
599}
600
601#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
602#[serde(untagged)]
603pub enum PythonSourceDef {
604 SourceOnly(String),
605 WithOptions {
606 source: String,
607 conda_env: Option<String>,
608 },
609}
610
611impl From<PythonSource> for PythonSourceDef {
612 fn from(input: PythonSource) -> Self {
613 match input {
614 PythonSource {
615 source,
616 conda_env: None,
617 } => Self::SourceOnly(source),
618 PythonSource { source, conda_env } => Self::WithOptions { source, conda_env },
619 }
620 }
621}
622
623impl From<PythonSourceDef> for PythonSource {
624 fn from(value: PythonSourceDef) -> Self {
625 match value {
626 PythonSourceDef::SourceOnly(source) => Self {
627 source,
628 conda_env: None,
629 },
630 PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env },
631 }
632 }
633}
634
635#[derive(Debug, Serialize, Deserialize, Clone)]
636#[serde(deny_unknown_fields)]
637pub struct PythonOperatorConfig {
638 pub path: PathBuf,
639 #[serde(default)]
640 pub inputs: BTreeMap<DataId, InputMapping>,
641 #[serde(default)]
642 pub outputs: BTreeSet<DataId>,
643}
644
645#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
646pub struct CustomNode {
647 /// Path of the source code
648 ///
649 /// If you want to use a specific `conda` environment.
650 /// Provide the python path within the source.
651 ///
652 /// source: /home/peter/miniconda3/bin/python
653 ///
654 /// args: some_node.py
655 ///
656 /// Source can match any executable in PATH.
657 pub path: String,
658 pub source: NodeSource,
659 /// Args for the executable.
660 #[serde(default, skip_serializing_if = "Option::is_none")]
661 pub args: Option<String>,
662 /// Environment variables for the custom nodes
663 ///
664 /// Deprecated, use outer-level `env` field instead.
665 pub envs: Option<BTreeMap<String, EnvValue>>,
666 #[serde(default, skip_serializing_if = "Option::is_none")]
667 pub build: Option<String>,
668 /// Send stdout and stderr to another node
669 #[serde(skip_serializing_if = "Option::is_none")]
670 pub send_stdout_as: Option<String>,
671
672 #[serde(default)]
673 pub restart_policy: RestartPolicy,
674
675 #[serde(flatten)]
676 pub run_config: NodeRunConfig,
677}
678
679#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
680pub enum NodeSource {
681 Local,
682 GitBranch {
683 repo: String,
684 rev: Option<GitRepoRev>,
685 },
686}
687
688impl NodeSource {
689 pub fn is_git(&self) -> bool {
690 matches!(self, Self::GitBranch { .. })
691 }
692}
693
694#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
695pub enum ResolvedNodeSource {
696 Local,
697 GitCommit { repo: String, commit_hash: String },
698}
699
700#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
701pub enum GitRepoRev {
702 Branch(String),
703 Tag(String),
704 Rev(String),
705}
706
707#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
708#[serde(untagged)]
709pub enum EnvValue {
710 #[serde(deserialize_with = "with_expand_envs")]
711 Bool(bool),
712 #[serde(deserialize_with = "with_expand_envs")]
713 Integer(i64),
714 #[serde(deserialize_with = "with_expand_envs")]
715 Float(f64),
716 #[serde(deserialize_with = "with_expand_envs")]
717 String(String),
718}
719
720impl fmt::Display for EnvValue {
721 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
722 match self {
723 EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
724 EnvValue::Integer(i64) => fmt.write_str(&i64.to_string()),
725 EnvValue::Float(f64) => fmt.write_str(&f64.to_string()),
726 EnvValue::String(str) => fmt.write_str(str),
727 }
728 }
729}