dora_message/
descriptor.rs

1#![warn(missing_docs)]
2
3use crate::{
4    config::{CommunicationConfig, Input, InputMapping, NodeRunConfig},
5    id::{DataId, NodeId, OperatorId},
6};
7use schemars::JsonSchema;
8use serde::{Deserialize, Serialize};
9use serde_with_expand_env::with_expand_envs;
10use std::{
11    collections::{BTreeMap, BTreeSet},
12    fmt,
13    path::PathBuf,
14};
15
/// Special value for the [`Node::path`] field, identified by the string `"shell"`.
///
/// NOTE(review): how nodes with this path are executed is not visible in this file —
/// presumably the node is run as a shell command; confirm against the daemon.
pub const SHELL_SOURCE: &str = "shell";
/// Set the [`Node::path`] field to this value to treat the node as a
/// [_dynamic node_](https://docs.rs/dora-node-api/latest/dora_node_api/).
pub const DYNAMIC_SOURCE: &str = "dynamic";
20
21/// # Dataflow Specification
22///
23/// The main configuration structure for defining a Dora dataflow. Dataflows are
24/// specified through YAML files that describe the nodes, their connections, and
25/// execution parameters.
26///
27/// ## Structure
28///
29/// A dataflow consists of:
30/// - **Nodes**: The computational units that process data
31/// - **Communication**: Optional communication configuration
32/// - **Deployment**: Optional deployment configuration (unstable)
33/// - **Debug options**: Optional development and debugging settings (unstable)
34///
35/// ## Example
36///
37/// ```yaml
38/// nodes:
39///  - id: webcam
40///     operator:
41///       python: webcam.py
42///       inputs:
43///         tick: dora/timer/millis/100
44///       outputs:
45///         - image
46///   - id: plot
47///     operator:
48///       python: plot.py
49///       inputs:
50///         image: webcam/image
51/// ```
52#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
53#[serde(deny_unknown_fields)]
54#[schemars(title = "dora-rs specification")]
55pub struct Descriptor {
56    /// List of nodes in the dataflow
57    ///
58    /// This is the most important field of the dataflow specification.
59    /// Each node must be identified by a unique `id`:
60    ///
61    /// ## Example
62    ///
63    /// ```yaml
64    /// nodes:
65    ///   - id: foo
66    ///     path: path/to/the/executable
67    ///     # ... (see below)
68    ///   - id: bar
69    ///     path: path/to/another/executable
70    ///     # ... (see below)
71    /// ```
72    ///
73    /// For each node, you need to specify the `path` of the executable or script that Dora should run when starting the node.
74    /// Most of the other node fields are optional, but you typically want to specify at least some `inputs` and/or `outputs`.
75    pub nodes: Vec<Node>,
76
77    /// Communication configuration (optional, uses defaults)
78    #[schemars(skip)]
79    #[serde(default)]
80    pub communication: CommunicationConfig,
81
82    /// Deployment configuration (optional, unstable)
83    #[schemars(skip)]
84    #[serde(rename = "_unstable_deploy")]
85    pub deploy: Option<Deploy>,
86
87    /// Debug options (optional, unstable)
88    #[schemars(skip)]
89    #[serde(default, rename = "_unstable_debug")]
90    pub debug: Debug,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
94#[serde(deny_unknown_fields)]
95pub struct Deploy {
96    /// Target machine for deployment
97    pub machine: Option<String>,
98    /// Working directory for the deployment
99    pub working_dir: Option<PathBuf>,
100}
101
102#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
103pub struct Debug {
104    /// Whether to publish all messages to Zenoh for debugging
105    #[serde(default)]
106    pub publish_all_messages_to_zenoh: bool,
107}
108
109/// # Dora Node Configuration
110///
111/// A node represents a computational unit in a Dora dataflow. Each node runs as a
112/// separate process and can communicate with other nodes through inputs and outputs.
113#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
114#[serde(deny_unknown_fields)]
115pub struct Node {
116    /// Unique node identifier. Must not contain `/` characters.
117    ///
118    /// Node IDs can be arbitrary strings with the following limitations:
119    ///
120    /// - They must not contain any `/` characters (slashes).
121    /// - We do not recommend using whitespace characters (e.g. spaces) in IDs
122    ///
123    /// Each node must have an ID field.
124    ///
125    /// ## Example
126    ///
127    /// ```yaml
128    /// nodes:
129    ///   - id: camera_node
130    ///   - id: some_other_node
131    /// ```
132    pub id: NodeId,
133
134    /// Human-readable node name for documentation.
135    ///
136    /// This optional field can be used to define a more descriptive name in addition to a short
137    /// [`id`](Self::id).
138    ///
139    /// ## Example
140    ///
141    /// ```yaml
142    /// nodes:
143    ///   - id: camera_node
144    ///     name: "Camera Input Handler"
145    pub name: Option<String>,
146
147    /// Detailed description of the node's functionality.
148    ///
149    /// ## Example
150    ///
151    /// ```yaml
152    /// nodes:
153    ///   - id: camera_node
154    ///     description: "Captures video frames from webcam"
155    /// ```
156    pub description: Option<String>,
157
158    /// Path to executable or script that should be run.
159    ///
160    /// Specifies the path of the executable or script that Dora should run when starting the
161    /// dataflow.
162    /// This can point to a normal executable (e.g. when using a compiled language such as Rust) or
163    /// a Python script.
164    ///
165    /// Dora will automatically append a `.exe` extension on Windows systems when the specified
166    /// file name has no extension.
167    ///
168    /// ## Example
169    ///
170    /// ```yaml
171    /// nodes:
172    ///   - id: rust-example
173    ///     path: target/release/rust-node
174    ///   - id: python-example
175    ///     path: ./receive_data.py
176    /// ```
177    ///
178    /// ## URL as Path
179    ///
180    /// The `path` field can also point to a URL instead of a local path.
181    /// In this case, Dora will download the given file when starting the dataflow.
182    ///
183    /// Note that this is quite an old feature and using this functionality is **not recommended**
184    /// anymore. Instead, we recommend using a [`git`][Self::git] and/or [`build`](Self::build)
185    /// key.
186    #[serde(default, skip_serializing_if = "Option::is_none")]
187    pub path: Option<String>,
188
189    /// Command-line arguments passed to the executable.
190    ///
191    /// The command-line arguments that should be passed to the executable/script specified in `path`.
192    /// The arguments should be separated by space.
193    /// This field is optional and defaults to an empty argument list.
194    ///
195    /// ## Example
196    /// ```yaml
197    /// nodes:
198    ///   - id: example
199    ///     path: example-node
200    ///     args: -v --some-flag foo
201    /// ```
202    #[serde(default, skip_serializing_if = "Option::is_none")]
203    pub args: Option<String>,
204
205    /// Environment variables for node builds and execution.
206    ///
207    /// Key-value map of environment variables that should be set for both the
208    /// [`build`](Self::build) operation and the node execution (i.e. when the node is spawned
209    /// through [`path`](Self::path)).
210    ///
211    /// Supports strings, numbers, and booleans.
212    ///
213    /// ## Example
214    ///
215    /// ```yaml
216    /// nodes:
217    ///   - id: example-node
218    ///     path: path/to/node
219    ///     env:
220    ///       DEBUG: true
221    ///       PORT: 8080
222    ///       API_KEY: "secret-key"
223    /// ```
224    pub env: Option<BTreeMap<String, EnvValue>>,
225
226    /// Multiple operators running in a shared runtime process.
227    ///
228    /// Operators are an experimental, lightweight alternative to nodes.
229    /// Instead of running as a separate process, operators are linked into a runtime process.
230    /// This allows running multiple operators to share a single address space (not supported for
231    /// Python currently).
232    ///
233    /// Operators are defined as part of the node list, as children of a runtime node.
234    /// A runtime node is a special node that specifies no [`path`](Self::path) field, but contains
235    /// an `operators` field instead.
236    ///
237    /// ## Example
238    ///
239    /// ```yaml
240    /// nodes:
241    ///   - id: runtime-node
242    ///     operators:
243    ///       - id: processor
244    ///         python: process.py
245    /// ```
246    #[serde(default, skip_serializing_if = "Option::is_none")]
247    pub operators: Option<RuntimeNode>,
248
249    /// Single operator configuration.
250    ///
251    /// This is a convenience field for defining runtime nodes that contain only a single operator.
252    /// This field is an alternative to the [`operators`](Self::operators) field, which can be used
253    /// if there is only a single operator defined for the runtime node.
254    ///
255    /// ## Example
256    ///
257    /// ```yaml
258    /// nodes:
259    ///   - id: runtime-node
260    ///     operator:
261    ///       id: processor
262    ///       python: script.py
263    ///       outputs: [data]
264    /// ```
265    #[serde(default, skip_serializing_if = "Option::is_none")]
266    pub operator: Option<SingleOperatorDefinition>,
267
268    /// Legacy node configuration (deprecated).
269    ///
270    /// Please use the top-level [`path`](Self::path), [`args`](Self::args), etc. fields instead.
271    #[serde(default, skip_serializing_if = "Option::is_none")]
272    pub custom: Option<CustomNode>,
273
274    /// Output data identifiers produced by this node.
275    ///
276    /// List of output identifiers that the node sends.
277    /// Must contain all `output_id` values that the node uses when sending output, e.g. through the
278    /// [`send_output`](https://docs.rs/dora-node-api/latest/dora_node_api/struct.DoraNode.html#method.send_output)
279    /// function.
280    ///
281    /// ## Example
282    ///
283    /// ```yaml
284    /// nodes:
285    ///   - id: example-node
286    ///     outputs:
287    ///       - processed_image
288    ///       - metadata
289    /// ```
290    #[serde(default)]
291    pub outputs: BTreeSet<DataId>,
292
293    /// Input data connections from other nodes.
294    ///
295    /// Defines the inputs that this node is subscribing to.
296    ///
297    /// The `inputs` field should be a key-value map of the following format:
298    ///
299    /// `input_id: source_node_id/source_node_output_id`
300    ///
301    /// The components are defined as follows:
302    ///
303    ///   - `input_id` is the local identifier that should be used for this input.
304    ///
305    ///     This will map to the `id` field of
306    ///     [`Event::Input`](https://docs.rs/dora-node-api/latest/dora_node_api/enum.Event.html#variant.Input)
307    ///     events sent to the node event loop.
308    ///   - `source_node_id` should be the `id` field of the node that sends the output that we want
309    ///     to subscribe to
310    ///   - `source_node_output_id` should be the identifier of the output that that we want
311    ///     to subscribe to
312    ///
313    /// ## Example
314    ///
315    /// ```yaml
316    /// nodes:
317    ///   - id: example-node
318    ///     outputs:
319    ///       - one
320    ///       - two
321    ///   - id: receiver
322    ///     inputs:
323    ///         my_input: example-node/two
324    /// ```
325    #[serde(default)]
326    pub inputs: BTreeMap<DataId, Input>,
327
328    /// Redirect stdout/stderr to a data output.
329    ///
330    /// This field can be used to send all stdout and stderr output of the node as a Dora output.
331    /// Each output line is sent as a separate message.
332    ///
333    ///
334    /// ## Example
335    ///
336    /// ```yaml
337    /// nodes:
338    ///   - id: example
339    ///     send_stdout_as: stdout_output
340    ///   - id: logger
341    ///     inputs:
342    ///         example_output: example/stdout_output
343    /// ```
344    #[serde(skip_serializing_if = "Option::is_none")]
345    pub send_stdout_as: Option<String>,
346
347    /// Build commands executed during `dora build`. Each line runs separately.
348    ///
349    /// The `build` key specifies the command that should be invoked for building the node.
350    /// The key expects a single- or multi-line string.
351    ///
352    /// Each line is run as a separate command.
353    /// Spaces are used to separate arguments.
354    ///
355    /// Note that all the environment variables specified in the [`env`](Self::env) field are also
356    /// applied to the build commands.
357    ///
358    /// ## Special treatment of `pip`
359    ///
360    /// Build lines that start with `pip` or `pip3` are treated in a special way:
361    /// If the `--uv` argument is passed to the `dora build` command, all `pip`/`pip3` commands are
362    /// run through the [`uv` package manager](https://docs.astral.sh/uv/).
363    ///
364    /// ## Example
365    ///
366    /// ```yaml
367    /// nodes:
368    /// - id: build-example
369    ///   build: cargo build -p receive_data --release
370    ///   path: target/release/receive_data
371    /// - id: multi-line-example
372    ///   build: |
373    ///       pip install requirements.txt
374    ///       pip install -e some/local/package
375    ///   path: package
376    /// ```
377    ///
378    /// In the above example, the `pip` commands will be replaced by `uv pip` when run through
379    /// `dora build --uv`.
380    #[serde(default, skip_serializing_if = "Option::is_none")]
381    pub build: Option<String>,
382
383    /// Git repository URL for downloading nodes.
384    ///
385    /// The `git` key allows downloading nodes (i.e. their source code) from git repositories.
386    /// This can be especially useful for distributed dataflows.
387    ///
388    /// When a `git` key is specified, `dora build` automatically clones the specified repository
389    /// (or reuse an existing clone).
390    /// Then it checks out the specified [`branch`](Self::branch), [`tag`](Self::tag), or
391    /// [`rev`](Self::rev), or the default branch if none of them are specified.
392    /// Afterwards it runs the [`build`](Self::build) command if specified.
393    ///
394    /// Note that the git clone directory is set as working directory for both the
395    /// [`build`](Self::build) command and the specified [`path`](Self::path).
396    ///
397    /// ## Example
398    ///
399    /// ```yaml
400    /// nodes:
401    ///   - id: rust-node
402    ///     git: https://github.com/dora-rs/dora.git
403    ///     build: cargo build -p rust-dataflow-example-node
404    ///     path: target/debug/rust-dataflow-example-node
405    /// ```
406    ///
407    /// In the above example, `dora build` will first clone the specified `git` repository and then
408    /// run the specified `build` inside the local clone directory.
409    /// When `dora run` or `dora start` is invoked, the working directory will be the git clone
410    /// directory too. So a relative `path` will start from the clone directory.
411    #[serde(default, skip_serializing_if = "Option::is_none")]
412    pub git: Option<String>,
413
414    /// Git branch to checkout after cloning.
415    ///
416    /// The `branch` field is only allowed in combination with the [`git`](#git) field.
417    /// It specifies the branch that should be checked out after cloning.
418    /// Only one of `branch`, `tag`, or `rev` can be specified.
419    ///
420    /// ## Example
421    ///
422    /// ```yaml
423    /// nodes:
424    ///   - id: rust-node
425    ///     git: https://github.com/dora-rs/dora.git
426    ///     branch: some-branch-name
427    /// ```
428    #[serde(default, skip_serializing_if = "Option::is_none")]
429    pub branch: Option<String>,
430
431    /// Git tag to checkout after cloning.
432    ///
433    /// The `tag` field is only allowed in combination with the [`git`](#git) field.
434    /// It specifies the git tag that should be checked out after cloning.
435    /// Only one of `branch`, `tag`, or `rev` can be specified.
436    ///
437    /// ## Example
438    ///
439    /// ```yaml
440    /// nodes:
441    ///   - id: rust-node
442    ///     git: https://github.com/dora-rs/dora.git
443    ///     tag: v0.3.0
444    /// ```
445    #[serde(default, skip_serializing_if = "Option::is_none")]
446    pub tag: Option<String>,
447
448    /// Git revision (e.g. commit hash) to checkout after cloning.
449    ///
450    /// The `rev` field is only allowed in combination with the [`git`](#git) field.
451    /// It specifies the git revision (e.g. a commit hash) that should be checked out after cloning.
452    /// Only one of `branch`, `tag`, or `rev` can be specified.
453    ///
454    /// ## Example
455    ///
456    /// ```yaml
457    /// nodes:
458    ///   - id: rust-node
459    ///     git: https://github.com/dora-rs/dora.git
460    ///     rev: 64ab0d7c
461    /// ```
462    #[serde(default, skip_serializing_if = "Option::is_none")]
463    pub rev: Option<String>,
464
465    /// Unstable machine deployment configuration
466    #[schemars(skip)]
467    #[serde(rename = "_unstable_deploy")]
468    pub deploy: Option<Deploy>,
469}
470
471#[derive(Debug, Clone, Serialize, Deserialize)]
472pub struct ResolvedNode {
473    pub id: NodeId,
474    pub name: Option<String>,
475    pub description: Option<String>,
476    pub env: Option<BTreeMap<String, EnvValue>>,
477
478    #[serde(default)]
479    pub deploy: Option<Deploy>,
480
481    #[serde(flatten)]
482    pub kind: CoreNodeKind,
483}
484
485impl ResolvedNode {
486    pub fn has_git_source(&self) -> bool {
487        self.kind
488            .as_custom()
489            .map(|n| n.source.is_git())
490            .unwrap_or_default()
491    }
492}
493
494#[derive(Debug, Clone, Serialize, Deserialize)]
495#[serde(rename_all = "lowercase")]
496#[allow(clippy::large_enum_variant)]
497pub enum CoreNodeKind {
498    /// Dora runtime node
499    #[serde(rename = "operators")]
500    Runtime(RuntimeNode),
501    Custom(CustomNode),
502}
503
504impl CoreNodeKind {
505    pub fn as_custom(&self) -> Option<&CustomNode> {
506        match self {
507            CoreNodeKind::Runtime(_) => None,
508            CoreNodeKind::Custom(custom_node) => Some(custom_node),
509        }
510    }
511}
512
513#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
514#[serde(transparent)]
515pub struct RuntimeNode {
516    /// List of operators running in this runtime
517    pub operators: Vec<OperatorDefinition>,
518}
519
520#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
521pub struct OperatorDefinition {
522    /// Unique operator identifier within the runtime
523    pub id: OperatorId,
524    #[serde(flatten)]
525    pub config: OperatorConfig,
526}
527
528#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
529pub struct SingleOperatorDefinition {
530    /// Operator identifier (optional for single operators)
531    pub id: Option<OperatorId>,
532    #[serde(flatten)]
533    pub config: OperatorConfig,
534}
535
536#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
537pub struct OperatorConfig {
538    /// Human-readable operator name
539    pub name: Option<String>,
540    /// Detailed description of the operator
541    pub description: Option<String>,
542
543    /// Input data connections
544    #[serde(default)]
545    pub inputs: BTreeMap<DataId, Input>,
546    /// Output data identifiers
547    #[serde(default)]
548    pub outputs: BTreeSet<DataId>,
549
550    /// Operator source configuration (Python, shared library, etc.)
551    #[serde(flatten)]
552    pub source: OperatorSource,
553
554    /// Build commands for this operator
555    #[serde(default, skip_serializing_if = "Option::is_none")]
556    pub build: Option<String>,
557    /// Redirect stdout to data output
558    #[serde(skip_serializing_if = "Option::is_none")]
559    pub send_stdout_as: Option<String>,
560}
561
562#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
563#[serde(rename_all = "kebab-case")]
564pub enum OperatorSource {
565    SharedLibrary(String),
566    Python(PythonSource),
567    #[schemars(skip)]
568    Wasm(String),
569}
570#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
571#[serde(from = "PythonSourceDef", into = "PythonSourceDef")]
572pub struct PythonSource {
573    pub source: String,
574    pub conda_env: Option<String>,
575}
576
577#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
578#[serde(untagged)]
579pub enum PythonSourceDef {
580    SourceOnly(String),
581    WithOptions {
582        source: String,
583        conda_env: Option<String>,
584    },
585}
586
587impl From<PythonSource> for PythonSourceDef {
588    fn from(input: PythonSource) -> Self {
589        match input {
590            PythonSource {
591                source,
592                conda_env: None,
593            } => Self::SourceOnly(source),
594            PythonSource { source, conda_env } => Self::WithOptions { source, conda_env },
595        }
596    }
597}
598
599impl From<PythonSourceDef> for PythonSource {
600    fn from(value: PythonSourceDef) -> Self {
601        match value {
602            PythonSourceDef::SourceOnly(source) => Self {
603                source,
604                conda_env: None,
605            },
606            PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env },
607        }
608    }
609}
610
611#[derive(Debug, Serialize, Deserialize, Clone)]
612#[serde(deny_unknown_fields)]
613pub struct PythonOperatorConfig {
614    pub path: PathBuf,
615    #[serde(default)]
616    pub inputs: BTreeMap<DataId, InputMapping>,
617    #[serde(default)]
618    pub outputs: BTreeSet<DataId>,
619}
620
621#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
622pub struct CustomNode {
623    /// Path of the source code
624    ///
625    /// If you want to use a specific `conda` environment.
626    /// Provide the python path within the source.
627    ///
628    /// source: /home/peter/miniconda3/bin/python
629    ///
630    /// args: some_node.py
631    ///
632    /// Source can match any executable in PATH.
633    pub path: String,
634    pub source: NodeSource,
635    /// Args for the executable.
636    #[serde(default, skip_serializing_if = "Option::is_none")]
637    pub args: Option<String>,
638    /// Environment variables for the custom nodes
639    ///
640    /// Deprecated, use outer-level `env` field instead.
641    pub envs: Option<BTreeMap<String, EnvValue>>,
642    #[serde(default, skip_serializing_if = "Option::is_none")]
643    pub build: Option<String>,
644    /// Send stdout and stderr to another node
645    #[serde(skip_serializing_if = "Option::is_none")]
646    pub send_stdout_as: Option<String>,
647
648    #[serde(flatten)]
649    pub run_config: NodeRunConfig,
650}
651
652#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
653pub enum NodeSource {
654    Local,
655    GitBranch {
656        repo: String,
657        rev: Option<GitRepoRev>,
658    },
659}
660
661impl NodeSource {
662    pub fn is_git(&self) -> bool {
663        matches!(self, Self::GitBranch { .. })
664    }
665}
666
667#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
668pub enum ResolvedNodeSource {
669    Local,
670    GitCommit { repo: String, commit_hash: String },
671}
672
673#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
674pub enum GitRepoRev {
675    Branch(String),
676    Tag(String),
677    Rev(String),
678}
679
680#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
681#[serde(untagged)]
682pub enum EnvValue {
683    #[serde(deserialize_with = "with_expand_envs")]
684    Bool(bool),
685    #[serde(deserialize_with = "with_expand_envs")]
686    Integer(i64),
687    #[serde(deserialize_with = "with_expand_envs")]
688    Float(f64),
689    #[serde(deserialize_with = "with_expand_envs")]
690    String(String),
691}
692
693impl fmt::Display for EnvValue {
694    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
695        match self {
696            EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
697            EnvValue::Integer(i64) => fmt.write_str(&i64.to_string()),
698            EnvValue::Float(f64) => fmt.write_str(&f64.to_string()),
699            EnvValue::String(str) => fmt.write_str(str),
700        }
701    }
702}