#![warn(missing_docs)]
use crate::{
config::{CommunicationConfig, Input, InputMapping, NodeRunConfig},
id::{DataId, NodeId, OperatorId},
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_with_expand_env::with_expand_envs;
use std::{
collections::{BTreeMap, BTreeSet},
fmt,
path::PathBuf,
};
/// The literal string `"shell"`.
///
/// NOTE(review): presumably a reserved marker value for a node's source/path
/// meaning "run as a shell command" — confirm against the code that reads it.
pub const SHELL_SOURCE: &str = "shell";
/// The literal string `"dynamic"`.
///
/// NOTE(review): presumably a reserved marker value for a node's source/path
/// meaning "dynamically attached node" — confirm against the code that reads it.
pub const DYNAMIC_SOURCE: &str = "dynamic";
/// Top-level descriptor; its generated JSON schema carries the title
/// "dora-rs specification".
///
/// Deserialization fails on keys that are not declared here
/// (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
#[schemars(title = "dora-rs specification")]
pub struct Descriptor {
/// The nodes declared by this descriptor. This key has no default and must be present.
pub nodes: Vec<Node>,
/// Optional map from name to [`EnvValue`]; left out of serialized output when `None`.
///
/// NOTE(review): presumably environment variables shared by the nodes — confirm
/// how this interacts with the per-node `env` map.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub env: Option<BTreeMap<String, EnvValue>>,
/// Communication settings; `CommunicationConfig::default()` is used when the key is absent.
///
/// Excluded from the generated JSON schema.
#[schemars(skip)]
#[serde(default)]
pub communication: CommunicationConfig,
/// Optional deployment settings, stored under the key `_unstable_deploy`.
///
/// Excluded from the generated JSON schema.
#[schemars(skip)]
#[serde(rename = "_unstable_deploy")]
pub deploy: Option<Deploy>,
/// Debug settings, stored under the key `_unstable_debug`; `Debug::default()`
/// is used when the key is absent.
///
/// Excluded from the generated JSON schema.
#[schemars(skip)]
#[serde(default, rename = "_unstable_debug")]
pub debug: Debug,
}
/// Restart policy attached to a [`Node`] / [`CustomNode`].
///
/// Variant names are (de)serialized in kebab-case: `never`, `on-failure`, `always`.
///
/// NOTE(review): the variant names suggest under which exit conditions a node is
/// restarted; the logic that interprets them is not in this file — confirm there.
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub enum RestartPolicy {
/// Serialized as `never`. This is the `Default` value.
#[default]
Never,
/// Serialized as `on-failure`.
OnFailure,
/// Serialized as `always`.
Always,
}
/// Deployment settings referenced by the `_unstable_deploy` key of
/// [`Descriptor`] and [`Node`].
///
/// Deserialization fails on keys that are not declared here.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Deploy {
/// Optional machine identifier.
///
/// NOTE(review): presumably names the machine the target should run on — confirm.
pub machine: Option<String>,
/// Optional working directory path.
///
/// NOTE(review): presumably the directory the target is started in — confirm.
pub working_dir: Option<PathBuf>,
}
/// Debug settings referenced by the `_unstable_debug` key of [`Descriptor`].
///
/// The derived `Default` leaves every flag `false`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
pub struct Debug {
/// Boolean flag; `false` when the key is absent.
///
/// NOTE(review): the name suggests it makes all messages be published to
/// zenoh — confirm against the code that consumes this flag.
#[serde(default)]
pub publish_all_messages_to_zenoh: bool,
}
/// One node entry of a [`Descriptor`].
///
/// Deserialization fails on keys that are not declared here. Apart from `id`,
/// every key is optional or has a default.
///
/// NOTE(review): `path`, `operators`, `operator` and `custom` look like
/// alternative ways of declaring what the node runs. This struct does not itself
/// enforce that only one of them is set — check the code that resolves a `Node`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Node {
/// Identifier of the node. This key has no default and must be present.
pub id: NodeId,
/// Optional name.
pub name: Option<String>,
/// Optional free-text description.
pub description: Option<String>,
/// Optional path string; left out of serialized output when `None`.
///
/// NOTE(review): presumably the executable/source to run for this node — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
/// Optional argument string; left out of serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
/// Optional map from name to [`EnvValue`].
///
/// NOTE(review): presumably environment variables for this node — confirm.
pub env: Option<BTreeMap<String, EnvValue>>,
/// Optional list of operator definitions (see [`RuntimeNode`]); left out of
/// serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub operators: Option<RuntimeNode>,
/// Optional single operator definition; left out of serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub operator: Option<SingleOperatorDefinition>,
/// Optional [`CustomNode`] definition; left out of serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub custom: Option<CustomNode>,
/// Set of output ids; empty when the key is absent.
#[serde(default)]
pub outputs: BTreeSet<DataId>,
/// Map from input id to its [`Input`] configuration; empty when the key is absent.
#[serde(default)]
pub inputs: BTreeMap<DataId, Input>,
/// Optional string; left out of serialized output when `None`.
///
/// NOTE(review): the name suggests an output id under which the node's stdout
/// is forwarded — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,
/// Optional build string; left out of serialized output when `None`.
///
/// NOTE(review): presumably a build command for the node — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
/// Optional git string; left out of serialized output when `None`.
///
/// NOTE(review): presumably a repository location, with `branch` / `tag` / `rev`
/// selecting the revision (compare [`NodeSource::GitBranch`]) — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub git: Option<String>,
/// Optional branch string; left out of serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub branch: Option<String>,
/// Optional tag string; left out of serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tag: Option<String>,
/// Optional revision string; left out of serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rev: Option<String>,
/// Restart policy; [`RestartPolicy::Never`] (the default) when the key is absent.
#[serde(default)]
pub restart_policy: RestartPolicy,
/// Optional deployment settings, stored under the key `_unstable_deploy`.
///
/// Excluded from the generated JSON schema.
#[schemars(skip)]
#[serde(rename = "_unstable_deploy")]
pub deploy: Option<Deploy>,
}
/// A node whose kind is expressed as a single [`CoreNodeKind`] value.
///
/// NOTE(review): presumably produced from a [`Node`] once its alternative
/// `path` / `operator` / `operators` / `custom` forms have been resolved — the
/// conversion is not in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResolvedNode {
/// Identifier of the node.
pub id: NodeId,
/// Optional name.
pub name: Option<String>,
/// Optional free-text description.
pub description: Option<String>,
/// Optional map from name to [`EnvValue`].
pub env: Option<BTreeMap<String, EnvValue>>,
/// Optional deployment settings; `None` when the key is absent.
#[serde(default)]
pub deploy: Option<Deploy>,
/// The node kind. Flattened, so the variant key (`operators` or `custom`)
/// sits directly next to the other keys of this struct.
#[serde(flatten)]
pub kind: CoreNodeKind,
}
impl ResolvedNode {
    /// Reports whether this node is a custom node whose `source` is a git source.
    ///
    /// Returns `false` for runtime (operator) nodes and for custom nodes with a
    /// non-git source.
    pub fn has_git_source(&self) -> bool {
        match self.kind.as_custom() {
            Some(custom) => custom.source.is_git(),
            None => false,
        }
    }
}
/// The kind of a [`ResolvedNode`]: a set of operators or a custom node.
///
/// Variant names are lowercased for (de)serialization unless renamed explicitly.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
// The variants differ in size; the lint is silenced rather than boxing a variant.
#[allow(clippy::large_enum_variant)]
pub enum CoreNodeKind {
/// A list of operators; (de)serialized under the key `operators`.
#[serde(rename = "operators")]
Runtime(RuntimeNode),
/// A custom node; (de)serialized under the key `custom`.
Custom(CustomNode),
}
impl CoreNodeKind {
pub fn as_custom(&self) -> Option<&CustomNode> {
match self {
CoreNodeKind::Runtime(_) => None,
CoreNodeKind::Custom(custom_node) => Some(custom_node),
}
}
}
/// A list of operator definitions.
///
/// `transparent`: (de)serialized exactly like the inner `Vec<OperatorDefinition>`,
/// i.e. as a bare list without an `operators` wrapper key.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(transparent)]
pub struct RuntimeNode {
/// The operator definitions, in declaration order.
pub operators: Vec<OperatorDefinition>,
}
/// An operator with a mandatory id, as listed in a [`RuntimeNode`].
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct OperatorDefinition {
/// Identifier of the operator. This key has no default and must be present.
pub id: OperatorId,
/// The operator's configuration. Flattened, so its keys sit next to `id`.
#[serde(flatten)]
pub config: OperatorConfig,
}
/// An operator whose id is optional, as used by the `operator` key of [`Node`].
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct SingleOperatorDefinition {
/// Optional identifier of the operator.
///
/// NOTE(review): what id is used when this is `None` is decided elsewhere — confirm.
pub id: Option<OperatorId>,
/// The operator's configuration. Flattened, so its keys sit next to `id`.
#[serde(flatten)]
pub config: OperatorConfig,
}
/// Configuration shared by [`OperatorDefinition`] and [`SingleOperatorDefinition`].
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct OperatorConfig {
/// Optional name.
pub name: Option<String>,
/// Optional free-text description.
pub description: Option<String>,
/// Map from input id to its [`Input`] configuration; empty when the key is absent.
#[serde(default)]
pub inputs: BTreeMap<DataId, Input>,
/// Set of output ids; empty when the key is absent.
#[serde(default)]
pub outputs: BTreeSet<DataId>,
/// Where the operator's code comes from. Flattened, so the variant key
/// (`shared-library`, `python` or `wasm`) sits directly among these keys.
#[serde(flatten)]
pub source: OperatorSource,
/// Optional build string; left out of serialized output when `None`.
///
/// NOTE(review): presumably a build command for the operator — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
/// Optional string; left out of serialized output when `None`.
///
/// NOTE(review): the name suggests an output id under which stdout is
/// forwarded — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,
}
/// The source of an operator.
///
/// Variant names are (de)serialized in kebab-case: `shared-library`, `python`, `wasm`.
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
/// Serialized as `shared-library`, carrying a string.
///
/// NOTE(review): presumably the path of the library — confirm.
SharedLibrary(String),
/// Serialized as `python`, carrying a [`PythonSource`].
Python(PythonSource),
/// Serialized as `wasm`, carrying a string. Excluded from the generated JSON schema.
#[schemars(skip)]
Wasm(String),
}
/// A Python operator source with an optional conda environment.
///
/// (De)serialization goes through [`PythonSourceDef`]: input is parsed as a
/// `PythonSourceDef` and converted with `From`, and output is produced by
/// converting into a `PythonSourceDef` first.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(from = "PythonSourceDef", into = "PythonSourceDef")]
pub struct PythonSource {
/// The source string.
///
/// NOTE(review): presumably a path or location of the Python file — confirm.
pub source: String,
/// Optional conda environment name.
pub conda_env: Option<String>,
}
/// Serialized representation of [`PythonSource`].
///
/// `untagged`: no variant name is written; on input the variants are tried in
/// declaration order, so a bare string matches `SourceOnly` and a map matches
/// `WithOptions`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum PythonSourceDef {
/// Short form: just the source string.
SourceOnly(String),
/// Long form: a map with `source` and an optional `conda_env`.
WithOptions {
/// The source string.
source: String,
/// Optional conda environment name.
conda_env: Option<String>,
},
}
/// Picks the most compact serialized form for a [`PythonSource`].
impl From<PythonSource> for PythonSourceDef {
    /// Yields `SourceOnly` when no conda environment is set, and `WithOptions`
    /// (carrying both fields) otherwise.
    fn from(input: PythonSource) -> Self {
        let PythonSource { source, conda_env } = input;
        if conda_env.is_none() {
            Self::SourceOnly(source)
        } else {
            Self::WithOptions { source, conda_env }
        }
    }
}
/// Normalizes either serialized form into a [`PythonSource`].
impl From<PythonSourceDef> for PythonSource {
    /// `SourceOnly` yields `conda_env: None`; `WithOptions` carries both fields over
    /// unchanged.
    fn from(value: PythonSourceDef) -> Self {
        let (source, conda_env) = match value {
            PythonSourceDef::SourceOnly(source) => (source, None),
            PythonSourceDef::WithOptions { source, conda_env } => (source, conda_env),
        };
        Self { source, conda_env }
    }
}
/// Configuration of a Python operator: a path plus its inputs and outputs.
///
/// Deserialization fails on keys that are not declared here.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct PythonOperatorConfig {
/// Path of the operator. This key has no default and must be present.
pub path: PathBuf,
/// Map from input id to its [`InputMapping`]; empty when the key is absent.
#[serde(default)]
pub inputs: BTreeMap<DataId, InputMapping>,
/// Set of output ids; empty when the key is absent.
#[serde(default)]
pub outputs: BTreeSet<DataId>,
}
/// Definition of a custom node, used by [`Node::custom`] and
/// [`CoreNodeKind::Custom`].
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct CustomNode {
/// Path string. This key has no default and must be present.
///
/// NOTE(review): presumably the executable/source to run — confirm.
pub path: String,
/// Whether the node comes from a local source or a git repository.
/// This key has no default and must be present.
pub source: NodeSource,
/// Optional argument string; left out of serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
/// Optional map from name to [`EnvValue`]. Note the key is `envs`, whereas
/// [`Node`] uses `env`.
pub envs: Option<BTreeMap<String, EnvValue>>,
/// Optional build string; left out of serialized output when `None`.
///
/// NOTE(review): presumably a build command for the node — confirm.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
/// Optional string; left out of serialized output when `None`.
///
/// NOTE(review): the name suggests an output id under which stdout is
/// forwarded — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,
/// Restart policy; [`RestartPolicy::Never`] (the default) when the key is absent.
#[serde(default)]
pub restart_policy: RestartPolicy,
/// Run configuration. Flattened, so the keys of [`NodeRunConfig`] sit directly
/// among the keys of this struct.
#[serde(flatten)]
pub run_config: NodeRunConfig,
}
/// Where a [`CustomNode`] comes from.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum NodeSource {
/// Not a git source; carries no data.
Local,
/// A git repository with an optional revision selector.
GitBranch {
/// The repository string.
///
/// NOTE(review): presumably a clone URL — confirm.
repo: String,
/// Optional branch, tag or revision to use.
///
/// NOTE(review): what is checked out when this is `None` is decided
/// elsewhere — confirm.
rev: Option<GitRepoRev>,
},
}
impl NodeSource {
    /// Reports whether this source is the `GitBranch` variant.
    pub fn is_git(&self) -> bool {
        match self {
            Self::GitBranch { .. } => true,
            Self::Local => false,
        }
    }
}
/// A node source in which a git source is pinned to a commit hash.
///
/// NOTE(review): presumably the counterpart of [`NodeSource`] after its optional
/// branch/tag/revision has been resolved — the resolution is not in this file.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum ResolvedNodeSource {
/// Not a git source; carries no data.
Local,
/// A git repository (`repo`) together with a commit hash (`commit_hash`).
GitCommit { repo: String, commit_hash: String },
}
/// Selects a revision of a git repository by branch, tag or revision string.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum GitRepoRev {
/// A branch name.
Branch(String),
/// A tag name.
Tag(String),
/// A revision string.
///
/// NOTE(review): presumably a commit hash or other git revision spec — confirm.
Rev(String),
}
/// A value stored in an `env` / `envs` map: a boolean, integer, float or string.
///
/// `untagged`: no variant name is written; on input the variants are tried in
/// declaration order (`Bool`, `Integer`, `Float`, `String`) and the first that
/// parses wins. Every variant is deserialized through
/// `serde_with_expand_env::with_expand_envs`.
///
/// NOTE(review): per that crate, `with_expand_envs` substitutes environment
/// variable references in string input before parsing — confirm the exact syntax
/// and failure behavior in its documentation.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum EnvValue {
/// A boolean value.
#[serde(deserialize_with = "with_expand_envs")]
Bool(bool),
/// A signed 64-bit integer value.
#[serde(deserialize_with = "with_expand_envs")]
Integer(i64),
/// A 64-bit floating point value.
#[serde(deserialize_with = "with_expand_envs")]
Float(f64),
/// A string value.
#[serde(deserialize_with = "with_expand_envs")]
String(String),
}
impl fmt::Display for EnvValue {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
EnvValue::Integer(i64) => fmt.write_str(&i64.to_string()),
EnvValue::Float(f64) => fmt.write_str(&f64.to_string()),
EnvValue::String(str) => fmt.write_str(str),
}
}
}