use crate::{
config::{CommunicationConfig, Input, InputMapping, NodeRunConfig},
id::{DataId, NodeId, OperatorId},
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_with_expand_env::with_expand_envs;
use std::{
collections::{BTreeMap, BTreeSet},
fmt,
path::PathBuf,
};
pub const SHELL_SOURCE: &str = "shell";
pub const DYNAMIC_SOURCE: &str = "dynamic";
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
#[schemars(title = "dora-rs specification")]
pub struct Descriptor {
#[schemars(skip)]
#[serde(default)]
pub communication: CommunicationConfig,
#[schemars(skip)]
#[serde(default, rename = "_unstable_deploy")]
pub deploy: Deploy,
pub nodes: Vec<Node>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Deploy {
pub machine: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Node {
pub id: NodeId,
pub name: Option<String>,
pub description: Option<String>,
pub env: Option<BTreeMap<String, EnvValue>>,
#[schemars(skip)]
#[serde(default, rename = "_unstable_deploy")]
pub deploy: Deploy,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub operators: Option<RuntimeNode>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub custom: Option<CustomNode>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub operator: Option<SingleOperatorDefinition>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,
#[serde(default)]
pub inputs: BTreeMap<DataId, Input>,
#[serde(default)]
pub outputs: BTreeSet<DataId>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResolvedNode {
pub id: NodeId,
pub name: Option<String>,
pub description: Option<String>,
pub env: Option<BTreeMap<String, EnvValue>>,
#[serde(default)]
pub deploy: ResolvedDeploy,
#[serde(flatten)]
pub kind: CoreNodeKind,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResolvedDeploy {
pub machine: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CoreNodeKind {
#[serde(rename = "operators")]
Runtime(RuntimeNode),
Custom(CustomNode),
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(transparent)]
pub struct RuntimeNode {
pub operators: Vec<OperatorDefinition>,
}
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct OperatorDefinition {
pub id: OperatorId,
#[serde(flatten)]
pub config: OperatorConfig,
}
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct SingleOperatorDefinition {
pub id: Option<OperatorId>,
#[serde(flatten)]
pub config: OperatorConfig,
}
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
pub struct OperatorConfig {
pub name: Option<String>,
pub description: Option<String>,
#[serde(default)]
pub inputs: BTreeMap<DataId, Input>,
#[serde(default)]
pub outputs: BTreeSet<DataId>,
#[serde(flatten)]
pub source: OperatorSource,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, JsonSchema, Clone)]
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
SharedLibrary(String),
Python(PythonSource),
#[schemars(skip)]
Wasm(String),
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(from = "PythonSourceDef", into = "PythonSourceDef")]
pub struct PythonSource {
pub source: String,
pub conda_env: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum PythonSourceDef {
SourceOnly(String),
WithOptions {
source: String,
conda_env: Option<String>,
},
}
impl From<PythonSource> for PythonSourceDef {
fn from(input: PythonSource) -> Self {
match input {
PythonSource {
source,
conda_env: None,
} => Self::SourceOnly(source),
PythonSource { source, conda_env } => Self::WithOptions { source, conda_env },
}
}
}
impl From<PythonSourceDef> for PythonSource {
fn from(value: PythonSourceDef) -> Self {
match value {
PythonSourceDef::SourceOnly(source) => Self {
source,
conda_env: None,
},
PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env },
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(deny_unknown_fields)]
pub struct PythonOperatorConfig {
pub path: PathBuf,
#[serde(default)]
pub inputs: BTreeMap<DataId, InputMapping>,
#[serde(default)]
pub outputs: BTreeSet<DataId>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct CustomNode {
pub source: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
pub envs: Option<BTreeMap<String, EnvValue>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,
#[serde(flatten)]
pub run_config: NodeRunConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum EnvValue {
#[serde(deserialize_with = "with_expand_envs")]
Bool(bool),
#[serde(deserialize_with = "with_expand_envs")]
Integer(u64),
#[serde(deserialize_with = "with_expand_envs")]
String(String),
}
impl fmt::Display for EnvValue {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match self {
EnvValue::Bool(bool) => fmt.write_str(&bool.to_string()),
EnvValue::Integer(u64) => fmt.write_str(&u64.to_string()),
EnvValue::String(str) => fmt.write_str(str),
}
}
}