// dora_message/descriptor.rs

#![warn(missing_docs)]

use crate::{
    config::{CommunicationConfig, Input, InputMapping, NodeRunConfig},
    id::{DataId, NodeId, OperatorId},
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_with_expand_env::with_expand_envs;
use std::{
    collections::{BTreeMap, BTreeSet},
    fmt,
    path::PathBuf,
};
15
/// Reserved source identifier with the value `"shell"`.
// NOTE(review): presumably a node whose `path` equals this value is run as a shell command;
// confirm against the node spawner before documenting that publicly.
pub const SHELL_SOURCE: &str = "shell";
/// Set the [`Node::path`] field to this value to treat the node as a
/// [_dynamic node_](https://docs.rs/dora-node-api/latest/dora_node_api/).
pub const DYNAMIC_SOURCE: &str = "dynamic";
20
21/// # Dataflow Specification
22///
23/// The main configuration structure for defining a Dora dataflow. Dataflows are
24/// specified through YAML files that describe the nodes, their connections, and
25/// execution parameters.
26///
27/// ## Structure
28///
29/// A dataflow consists of:
30/// - **Nodes**: The computational units that process data
31/// - **Communication**: Optional communication configuration
32/// - **Deployment**: Optional deployment configuration (unstable)
33/// - **Debug options**: Optional development and debugging settings (unstable)
34///
35/// ## Example
36///
37/// ```yaml
38/// nodes:
39///  - id: webcam
40///     operator:
41///       python: webcam.py
42///       inputs:
43///         tick: dora/timer/millis/100
44///       outputs:
45///         - image
46///   - id: plot
47///     operator:
48///       python: plot.py
49///       inputs:
50///         image: webcam/image
51/// ```
52#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
53#[serde(deny_unknown_fields)]
54#[schemars(title = "dora-rs specification")]
55pub struct Descriptor {
56    /// List of nodes in the dataflow
57    ///
58    /// This is the most important field of the dataflow specification.
59    /// Each node must be identified by a unique `id`:
60    ///
61    /// ## Example
62    ///
63    /// ```yaml
64    /// nodes:
65    ///   - id: foo
66    ///     path: path/to/the/executable
67    ///     # ... (see below)
68    ///   - id: bar
69    ///     path: path/to/another/executable
70    ///     # ... (see below)
71    /// ```
72    ///
73    /// For each node, you need to specify the `path` of the executable or script that Dora should run when starting the node.
74    /// Most of the other node fields are optional, but you typically want to specify at least some `inputs` and/or `outputs`.
75    pub nodes: Vec<Node>,
76
77    /// Communication configuration (optional, uses defaults)
78    #[schemars(skip)]
79    #[serde(default)]
80    pub communication: CommunicationConfig,
81
82    /// Deployment configuration (optional, unstable)
83    #[schemars(skip)]
84    #[serde(rename = "_unstable_deploy")]
85    pub deploy: Option<Deploy>,
86
87    /// Debug options (optional, unstable)
88    #[schemars(skip)]
89    #[serde(default, rename = "_unstable_debug")]
90    pub debug: Debug,
91}
92
93/// Specifies when a node should be restarted.
94#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema)]
95#[serde(rename_all = "kebab-case")]
96pub enum RestartPolicy {
97    /// Never restart the node (default)
98    #[default]
99    Never,
100    /// Restart the node if it exits with a non-zero exit code.
101    OnFailure,
102    /// Always restart the node when it exits, regardless of exit code.
103    ///
104    /// The node will not be restarted on the following conditions:
105    ///
106    /// - The node was stopped by the user (e.g., via `dora stop`).
107    /// - All inputs to the node have been closed and the node finished with a non-zero exit code.
108    Always,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
112#[serde(deny_unknown_fields)]
113pub struct Deploy {
114    /// Target machine for deployment
115    pub machine: Option<String>,
116    /// Working directory for the deployment
117    pub working_dir: Option<PathBuf>,
118}
119
120#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
121pub struct Debug {
122    /// Whether to publish all messages to Zenoh for debugging
123    #[serde(default)]
124    pub publish_all_messages_to_zenoh: bool,
125}
126
127/// # Dora Node Configuration
128///
129/// A node represents a computational unit in a Dora dataflow. Each node runs as a
130/// separate process and can communicate with other nodes through inputs and outputs.
131#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
132#[serde(deny_unknown_fields)]
133pub struct Node {
134    /// Unique node identifier. Must not contain `/` characters.
135    ///
136    /// Node IDs can be arbitrary strings with the following limitations:
137    ///
138    /// - They must not contain any `/` characters (slashes).
139    /// - We do not recommend using whitespace characters (e.g. spaces) in IDs
140    ///
141    /// Each node must have an ID field.
142    ///
143    /// ## Example
144    ///
145    /// ```yaml
146    /// nodes:
147    ///   - id: camera_node
148    ///   - id: some_other_node
149    /// ```
150    pub id: NodeId,
151
152    /// Human-readable node name for documentation.
153    ///
154    /// This optional field can be used to define a more descriptive name in addition to a short
155    /// [`id`](Self::id).
156    ///
157    /// ## Example
158    ///
159    /// ```yaml
160    /// nodes:
161    ///   - id: camera_node
162    ///     name: "Camera Input Handler"
163    pub name: Option<String>,
164
165    /// Detailed description of the node's functionality.
166    ///
167    /// ## Example
168    ///
169    /// ```yaml
170    /// nodes:
171    ///   - id: camera_node
172    ///     description: "Captures video frames from webcam"
173    /// ```
174    pub description: Option<String>,
175
176    /// Path to executable or script that should be run.
177    ///
178    /// Specifies the path of the executable or script that Dora should run when starting the
179    /// dataflow.
180    /// This can point to a normal executable (e.g. when using a compiled language such as Rust) or
181    /// a Python script.
182    ///
183    /// Dora will automatically append a `.exe` extension on Windows systems when the specified
184    /// file name has no extension.
185    ///
186    /// ## Example
187    ///
188    /// ```yaml
189    /// nodes:
190    ///   - id: rust-example
191    ///     path: target/release/rust-node
192    ///   - id: python-example
193    ///     path: ./receive_data.py
194    /// ```
195    ///
196    /// ## URL as Path
197    ///
198    /// The `path` field can also point to a URL instead of a local path.
199    /// In this case, Dora will download the given file when starting the dataflow.
200    ///
201    /// Note that this is quite an old feature and using this functionality is **not recommended**
202    /// anymore. Instead, we recommend using a [`git`][Self::git] and/or [`build`](Self::build)
203    /// key.
204    #[serde(default, skip_serializing_if = "Option::is_none")]
205    pub path: Option<String>,
206
207    /// Command-line arguments passed to the executable.
208    ///
209    /// The command-line arguments that should be passed to the executable/script specified in `path`.
210    /// The arguments should be separated by space.
211    /// This field is optional and defaults to an empty argument list.
212    ///
213    /// ## Example
214    /// ```yaml
215    /// nodes:
216    ///   - id: example
217    ///     path: example-node
218    ///     args: -v --some-flag foo
219    /// ```
220    #[serde(default, skip_serializing_if = "Option::is_none")]
221    pub args: Option<String>,
222
223    /// Environment variables for node builds and execution.
224    ///
225    /// Key-value map of environment variables that should be set for both the
226    /// [`build`](Self::build) operation and the node execution (i.e. when the node is spawned
227    /// through [`path`](Self::path)).
228    ///
229    /// Supports strings, numbers, and booleans.
230    ///
231    /// ## Example
232    ///
233    /// ```yaml
234    /// nodes:
235    ///   - id: example-node
236    ///     path: path/to/node
237    ///     env:
238    ///       DEBUG: true
239    ///       PORT: 8080
240    ///       API_KEY: "secret-key"
241    /// ```
242    pub env: Option<BTreeMap<String, EnvValue>>,
243
244    /// Multiple operators running in a shared runtime process.
245    ///
246    /// Operators are an experimental, lightweight alternative to nodes.
247    /// Instead of running as a separate process, operators are linked into a runtime process.
248    /// This allows running multiple operators to share a single address space (not supported for
249    /// Python currently).
250    ///
251    /// Operators are defined as part of the node list, as children of a runtime node.
252    /// A runtime node is a special node that specifies no [`path`](Self::path) field, but contains
253    /// an `operators` field instead.
254    ///
255    /// ## Example
256    ///
257    /// ```yaml
258    /// nodes:
259    ///   - id: runtime-node
260    ///     operators:
261    ///       - id: processor
262    ///         python: process.py
263    /// ```
264    #[serde(default, skip_serializing_if = "Option::is_none")]
265    pub operators: Option<RuntimeNode>,
266
267    /// Single operator configuration.
268    ///
269    /// This is a convenience field for defining runtime nodes that contain only a single operator.
270    /// This field is an alternative to the [`operators`](Self::operators) field, which can be used
271    /// if there is only a single operator defined for the runtime node.
272    ///
273    /// ## Example
274    ///
275    /// ```yaml
276    /// nodes:
277    ///   - id: runtime-node
278    ///     operator:
279    ///       id: processor
280    ///       python: script.py
281    ///       outputs: [data]
282    /// ```
283    #[serde(default, skip_serializing_if = "Option::is_none")]
284    pub operator: Option<SingleOperatorDefinition>,
285
286    /// Legacy node configuration (deprecated).
287    ///
288    /// Please use the top-level [`path`](Self::path), [`args`](Self::args), etc. fields instead.
289    #[serde(default, skip_serializing_if = "Option::is_none")]
290    pub custom: Option<CustomNode>,
291
292    /// Output data identifiers produced by this node.
293    ///
294    /// List of output identifiers that the node sends.
295    /// Must contain all `output_id` values that the node uses when sending output, e.g. through the
296    /// [`send_output`](https://docs.rs/dora-node-api/latest/dora_node_api/struct.DoraNode.html#method.send_output)
297    /// function.
298    ///
299    /// ## Example
300    ///
301    /// ```yaml
302    /// nodes:
303    ///   - id: example-node
304    ///     outputs:
305    ///       - processed_image
306    ///       - metadata
307    /// ```
308    #[serde(default)]
309    pub outputs: BTreeSet<DataId>,
310
311    /// Input data connections from other nodes.
312    ///
313    /// Defines the inputs that this node is subscribing to.
314    ///
315    /// The `inputs` field should be a key-value map of the following format:
316    ///
317    /// `input_id: source_node_id/source_node_output_id`
318    ///
319    /// The components are defined as follows:
320    ///
321    ///   - `input_id` is the local identifier that should be used for this input.
322    ///
323    ///     This will map to the `id` field of
324    ///     [`Event::Input`](https://docs.rs/dora-node-api/latest/dora_node_api/enum.Event.html#variant.Input)
325    ///     events sent to the node event loop.
326    ///   - `source_node_id` should be the `id` field of the node that sends the output that we want
327    ///     to subscribe to
328    ///   - `source_node_output_id` should be the identifier of the output that that we want
329    ///     to subscribe to
330    ///
331    /// ## Example
332    ///
333    /// ```yaml
334    /// nodes:
335    ///   - id: example-node
336    ///     outputs:
337    ///       - one
338    ///       - two
339    ///   - id: receiver
340    ///     inputs:
341    ///         my_input: example-node/two
342    /// ```
343    #[serde(default)]
344    pub inputs: BTreeMap<DataId, Input>,
345
346    /// Redirect stdout/stderr to a data output.
347    ///
348    /// This field can be used to send all stdout and stderr output of the node as a Dora output.
349    /// Each output line is sent as a separate message.
350    ///
351    ///
352    /// ## Example
353    ///
354    /// ```yaml
355    /// nodes:
356    ///   - id: example
357    ///     send_stdout_as: stdout_output
358    ///   - id: logger
359    ///     inputs:
360    ///         example_output: example/stdout_output
361    /// ```
362    #[serde(skip_serializing_if = "Option::is_none")]
363    pub send_stdout_as: Option<String>,
364
365    /// Build commands executed during `dora build`. Each line runs separately.
366    ///
367    /// The `build` key specifies the command that should be invoked for building the node.
368    /// The key expects a single- or multi-line string.
369    ///
370    /// Each line is run as a separate command.
371    /// Spaces are used to separate arguments.
372    ///
373    /// Note that all the environment variables specified in the [`env`](Self::env) field are also
374    /// applied to the build commands.
375    ///
376    /// ## Special treatment of `pip`
377    ///
378    /// Build lines that start with `pip` or `pip3` are treated in a special way:
379    /// If the `--uv` argument is passed to the `dora build` command, all `pip`/`pip3` commands are
380    /// run through the [`uv` package manager](https://docs.astral.sh/uv/).
381    ///
382    /// ## Example
383    ///
384    /// ```yaml
385    /// nodes:
386    /// - id: build-example
387    ///   build: cargo build -p receive_data --release
388    ///   path: target/release/receive_data
389    /// - id: multi-line-example
390    ///   build: |
391    ///       pip install requirements.txt
392    ///       pip install -e some/local/package
393    ///   path: package
394    /// ```
395    ///
396    /// In the above example, the `pip` commands will be replaced by `uv pip` when run through
397    /// `dora build --uv`.
398    #[serde(default, skip_serializing_if = "Option::is_none")]
399    pub build: Option<String>,
400
401    /// Git repository URL for downloading nodes.
402    ///
403    /// The `git` key allows downloading nodes (i.e. their source code) from git repositories.
404    /// This can be especially useful for distributed dataflows.
405    ///
406    /// When a `git` key is specified, `dora build` automatically clones the specified repository
407    /// (or reuse an existing clone).
408    /// Then it checks out the specified [`branch`](Self::branch), [`tag`](Self::tag), or
409    /// [`rev`](Self::rev), or the default branch if none of them are specified.
410    /// Afterwards it runs the [`build`](Self::build) command if specified.
411    ///
412    /// Note that the git clone directory is set as working directory for both the
413    /// [`build`](Self::build) command and the specified [`path`](Self::path).
414    ///
415    /// ## Example
416    ///
417    /// ```yaml
418    /// nodes:
419    ///   - id: rust-node
420    ///     git: https://github.com/dora-rs/dora.git
421    ///     build: cargo build -p rust-dataflow-example-node
422    ///     path: target/debug/rust-dataflow-example-node
423    /// ```
424    ///
425    /// In the above example, `dora build` will first clone the specified `git` repository and then
426    /// run the specified `build` inside the local clone directory.
427    /// When `dora run` or `dora start` is invoked, the working directory will be the git clone
428    /// directory too. So a relative `path` will start from the clone directory.
429    #[serde(default, skip_serializing_if = "Option::is_none")]
430    pub git: Option<String>,
431
432    /// Git branch to checkout after cloning.
433    ///
434    /// The `branch` field is only allowed in combination with the [`git`](#git) field.
435    /// It specifies the branch that should be checked out after cloning.
436    /// Only one of `branch`, `tag`, or `rev` can be specified.
437    ///
438    /// ## Example
439    ///
440    /// ```yaml
441    /// nodes:
442    ///   - id: rust-node
443    ///     git: https://github.com/dora-rs/dora.git
444    ///     branch: some-branch-name
445    /// ```
446    #[serde(default, skip_serializing_if = "Option::is_none")]
447    pub branch: Option<String>,
448
449    /// Git tag to checkout after cloning.
450    ///
451    /// The `tag` field is only allowed in combination with the [`git`](#git) field.
452    /// It specifies the git tag that should be checked out after cloning.
453    /// Only one of `branch`, `tag`, or `rev` can be specified.
454    ///
455    /// ## Example
456    ///
457    /// ```yaml
458    /// nodes:
459    ///   - id: rust-node
460    ///     git: https://github.com/dora-rs/dora.git
461    ///     tag: v0.3.0
462    /// ```
463    #[serde(default, skip_serializing_if = "Option::is_none")]
464    pub tag: Option<String>,
465
466    /// Git revision (e.g. commit hash) to checkout after cloning.
467    ///
468    /// The `rev` field is only allowed in combination with the [`git`](#git) field.
469    /// It specifies the git revision (e.g. a commit hash) that should be checked out after cloning.
470    /// Only one of `branch`, `tag`, or `rev` can be specified.
471    ///
472    /// ## Example
473    ///
474    /// ```yaml
475    /// nodes:
476    ///   - id: rust-node
477    ///     git: https://github.com/dora-rs/dora.git
478    ///     rev: 64ab0d7c
479    /// ```
480    #[serde(default, skip_serializing_if = "Option::is_none")]
481    pub rev: Option<String>,
482
483    /// Whether this node should be restarted on exit or error.
484    ///
485    /// Defaults to `RestartPolicy::Never`.
486    #[serde(default)]
487    pub restart_policy: RestartPolicy,
488
489    /// Unstable machine deployment configuration
490    #[schemars(skip)]
491    #[serde(rename = "_unstable_deploy")]
492    pub deploy: Option<Deploy>,
493}
494
495#[derive(Debug, Clone, Serialize, Deserialize)]
496pub struct ResolvedNode {
497    pub id: NodeId,
498    pub name: Option<String>,
499    pub description: Option<String>,
500    pub env: Option<BTreeMap<String, EnvValue>>,
501
502    #[serde(default)]
503    pub deploy: Option<Deploy>,
504
505    #[serde(flatten)]
506    pub kind: CoreNodeKind,
507}
508
509impl ResolvedNode {
510    pub fn has_git_source(&self) -> bool {
511        self.kind
512            .as_custom()
513            .map(|n| n.source.is_git())
514            .unwrap_or_default()
515    }
516}
517
518#[derive(Debug, Clone, Serialize, Deserialize)]
519#[serde(rename_all = "lowercase")]
520#[allow(clippy::large_enum_variant)]
521pub enum CoreNodeKind {
522    /// Dora runtime node
523    #[serde(rename = "operators")]
524    Runtime(RuntimeNode),
525    Custom(CustomNode),
526}
527
528impl CoreNodeKind {
529    pub fn as_custom(&self) -> Option<&CustomNode> {
530        match self {
531            CoreNodeKind::Runtime(_) => None,
532            CoreNodeKind::Custom(custom_node) => Some(custom_node),
533        }
534    }
535}
536
537#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
538#[serde(transparent)]
539pub struct RuntimeNode {
540    /// List of operators running in this runtime
541    pub operators: Vec<OperatorDefinition>,
542}
543
544#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
545pub struct OperatorDefinition {
546    /// Unique operator identifier within the runtime
547    pub id: OperatorId,
548    #[serde(flatten)]
549    pub config: OperatorConfig,
550}
551
552#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
553pub struct SingleOperatorDefinition {
554    /// Operator identifier (optional for single operators)
555    pub id: Option<OperatorId>,
556    #[serde(flatten)]
557    pub config: OperatorConfig,
558}
559
560#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
561pub struct OperatorConfig {
562    /// Human-readable operator name
563    pub name: Option<String>,
564    /// Detailed description of the operator
565    pub description: Option<String>,
566
567    /// Input data connections
568    #[serde(default)]
569    pub inputs: BTreeMap<DataId, Input>,
570    /// Output data identifiers
571    #[serde(default)]
572    pub outputs: BTreeSet<DataId>,
573
574    /// Operator source configuration (Python, shared library, etc.)
575    #[serde(flatten)]
576    pub source: OperatorSource,
577
578    /// Build commands for this operator
579    #[serde(default, skip_serializing_if = "Option::is_none")]
580    pub build: Option<String>,
581    /// Redirect stdout to data output
582    #[serde(skip_serializing_if = "Option::is_none")]
583    pub send_stdout_as: Option<String>,
584}
585
586#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
587#[serde(rename_all = "kebab-case")]
588pub enum OperatorSource {
589    SharedLibrary(String),
590    Python(PythonSource),
591    #[schemars(skip)]
592    Wasm(String),
593}
594#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
595#[serde(from = "PythonSourceDef", into = "PythonSourceDef")]
596pub struct PythonSource {
597    pub source: String,
598    pub conda_env: Option<String>,
599}
600
601#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
602#[serde(untagged)]
603pub enum PythonSourceDef {
604    SourceOnly(String),
605    WithOptions {
606        source: String,
607        conda_env: Option<String>,
608    },
609}
610
611impl From<PythonSource> for PythonSourceDef {
612    fn from(input: PythonSource) -> Self {
613        match input {
614            PythonSource {
615                source,
616                conda_env: None,
617            } => Self::SourceOnly(source),
618            PythonSource { source, conda_env } => Self::WithOptions { source, conda_env },
619        }
620    }
621}
622
623impl From<PythonSourceDef> for PythonSource {
624    fn from(value: PythonSourceDef) -> Self {
625        match value {
626            PythonSourceDef::SourceOnly(source) => Self {
627                source,
628                conda_env: None,
629            },
630            PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env },
631        }
632    }
633}
634
635#[derive(Debug, Serialize, Deserialize, Clone)]
636#[serde(deny_unknown_fields)]
637pub struct PythonOperatorConfig {
638    pub path: PathBuf,
639    #[serde(default)]
640    pub inputs: BTreeMap<DataId, InputMapping>,
641    #[serde(default)]
642    pub outputs: BTreeSet<DataId>,
643}
644
645#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
646pub struct CustomNode {
647    /// Path of the source code
648    ///
649    /// If you want to use a specific `conda` environment.
650    /// Provide the python path within the source.
651    ///
652    /// source: /home/peter/miniconda3/bin/python
653    ///
654    /// args: some_node.py
655    ///
656    /// Source can match any executable in PATH.
657    pub path: String,
658    pub source: NodeSource,
659    /// Args for the executable.
660    #[serde(default, skip_serializing_if = "Option::is_none")]
661    pub args: Option<String>,
662    /// Environment variables for the custom nodes
663    ///
664    /// Deprecated, use outer-level `env` field instead.
665    pub envs: Option<BTreeMap<String, EnvValue>>,
666    #[serde(default, skip_serializing_if = "Option::is_none")]
667    pub build: Option<String>,
668    /// Send stdout and stderr to another node
669    #[serde(skip_serializing_if = "Option::is_none")]
670    pub send_stdout_as: Option<String>,
671
672    #[serde(default)]
673    pub restart_policy: RestartPolicy,
674
675    #[serde(flatten)]
676    pub run_config: NodeRunConfig,
677}
678
679#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
680pub enum NodeSource {
681    Local,
682    GitBranch {
683        repo: String,
684        rev: Option<GitRepoRev>,
685    },
686}
687
688impl NodeSource {
689    pub fn is_git(&self) -> bool {
690        matches!(self, Self::GitBranch { .. })
691    }
692}
693
694#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
695pub enum ResolvedNodeSource {
696    Local,
697    GitCommit { repo: String, commit_hash: String },
698}
699
700#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
701pub enum GitRepoRev {
702    Branch(String),
703    Tag(String),
704    Rev(String),
705}
706
707#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
708#[serde(untagged)]
709pub enum EnvValue {
710    #[serde(deserialize_with = "with_expand_envs")]
711    Bool(bool),
712    #[serde(deserialize_with = "with_expand_envs")]
713    Integer(i64),
714    #[serde(deserialize_with = "with_expand_envs")]
715    Float(f64),
716    #[serde(deserialize_with = "with_expand_envs")]
717    String(String),
718}
719
720impl fmt::Display for EnvValue {
721    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
722        match self {
723            EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
724            EnvValue::Integer(i64) => fmt.write_str(&i64.to_string()),
725            EnvValue::Float(f64) => fmt.write_str(&f64.to_string()),
726            EnvValue::String(str) => fmt.write_str(str),
727        }
728    }
729}