#![allow(clippy::module_name_repetitions)]
use super::{
docs::Docs, helper::Scope, node_id::BaseRef, raw::BaseExpr, CreationalWith, DefinitionalArgs,
DefinitionalArgsWith, NodeMeta,
};
use super::{node_id::NodeId, PipelineDefinition};
use super::{HashMap, Value};
use crate::{impl_expr, impl_expr_no_lt};
pub(crate) mod raw;
/// A deployment: configuration values, deploy statements, a scope and docs.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct Deploy<'script> {
/// Configuration values, keyed by name.
pub config: HashMap<String, Value<'script>>,
/// The deploy statements of this deployment (a `Vec` of `DeployStmt`).
pub stmts: DeployStmts<'script>,
/// The `Scope` attached to this deployment.
pub scope: Scope<'script>,
/// Documentation data; left out of serde output by `#[serde(skip)]`.
#[serde(skip)]
pub docs: Docs,
}
impl Deploy<'_> {
    /// Placeholder rendering of the deployment.
    ///
    /// Not implemented yet: regardless of the contents of `self`, the result
    /// is always the literal string `"todo"`.
    // NOTE(review): the name suggests a Graphviz "dot" rendering is intended —
    // confirm before relying on this.
    #[must_use]
    // `self` is deliberately unused while this is a stub.
    #[allow(clippy::unused_self)]
    pub fn dot(&self) -> String {
        String::from("todo")
    }
}
/// One statement of a deployment; every variant boxes its payload.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub enum DeployStmt<'script> {
/// A flow definition.
FlowDefinition(Box<FlowDefinition<'script>>),
/// A pipeline definition.
PipelineDefinition(Box<PipelineDefinition<'script>>),
/// A connector definition.
ConnectorDefinition(Box<ConnectorDefinition<'script>>),
/// A deploy-flow statement, carrying a `DeployFlow`.
DeployFlowStmt(Box<DeployFlow<'script>>),
}
impl<'script> BaseRef for DeployStmt<'script> {
#[must_use]
fn fqn(&self) -> String {
match self {
DeployStmt::FlowDefinition(stmt) => stmt.id.clone(),
DeployStmt::PipelineDefinition(stmt) => stmt.id.clone(),
DeployStmt::ConnectorDefinition(stmt) => stmt.id.clone(),
DeployStmt::DeployFlowStmt(stmt) => stmt.fqn(),
}
}
}
impl BaseExpr for DeployStmt<'_> {
    /// Returns the node metadata of whichever statement this variant wraps,
    /// by forwarding to the wrapped value's own `meta()`.
    fn meta(&self) -> &NodeMeta {
        match self {
            Self::FlowDefinition(flow) => flow.meta(),
            Self::PipelineDefinition(pipeline) => pipeline.meta(),
            Self::ConnectorDefinition(connector) => connector.meta(),
            Self::DeployFlowStmt(deploy) => deploy.meta(),
        }
    }
}
/// Definition of a connector.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct ConnectorDefinition<'script> {
/// Boxed node metadata (crate-visible only).
pub(crate) mid: Box<NodeMeta>,
/// Identifier of this definition.
pub id: String,
/// Arguments of the definition.
pub params: DefinitionalArgsWith<'script>,
/// NOTE(review): presumably the name of the built-in connector kind this
/// definition is based on — confirm against where the field is populated.
pub builtin_kind: String,
/// Configuration value of the connector.
pub config: Value<'script>,
/// Optional documentation text; left out of serde output by `#[serde(skip)]`.
#[serde(skip)]
pub docs: Option<String>,
}
// NOTE(review): `impl_expr!` is defined elsewhere in the crate; presumably it
// implements `BaseExpr` for the type on top of its `mid` field — confirm.
impl_expr!(ConnectorDefinition);
impl ConnectorDefinition<'_> {
    /// Parameter name `"codec"`.
    pub const CODEC: &'static str = "codec";
    /// Parameter name `"config"`.
    pub const CONFIG: &'static str = "config";
    /// Parameter name `"preprocessors"`.
    pub const PREPROCESSORS: &'static str = "preprocessors";
    /// Parameter name `"postprocessors"`.
    pub const POSTPROCESSORS: &'static str = "postprocessors";
    /// Parameter name `"metrics_interval_s"`.
    pub const METRICS_INTERVAL_S: &'static str = "metrics_interval_s";
    /// Parameter name `"reconnect"`.
    pub const RECONNECT: &'static str = "reconnect";

    /// Every parameter name declared above, listed alphabetically.
    // NOTE(review): not referenced in the visible part of this file;
    // presumably consumed by the `raw` submodule — confirm.
    const AVAILABLE_PARAMS: [&'static str; 6] = [
        Self::CODEC,
        Self::CONFIG,
        Self::METRICS_INTERVAL_S,
        Self::POSTPROCESSORS,
        Self::PREPROCESSORS,
        Self::RECONNECT,
    ];
}
/// A list of deploy statements.
type DeployStmts<'script> = Vec<DeployStmt<'script>>;
/// A connection between two endpoints. The variant names the kind of thing on
/// each side; all variants carry the same three fields.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub enum ConnectStmt {
/// Connection from a connector to a pipeline (per the variant name).
ConnectorToPipeline {
/// Boxed node metadata of this statement.
mid: Box<NodeMeta>,
/// Endpoint the connection starts at.
from: DeployEndpoint,
/// Endpoint the connection ends at.
to: DeployEndpoint,
},
/// Connection from a pipeline to a connector (per the variant name).
PipelineToConnector {
/// Boxed node metadata of this statement.
mid: Box<NodeMeta>,
/// Endpoint the connection starts at.
from: DeployEndpoint,
/// Endpoint the connection ends at.
to: DeployEndpoint,
},
/// Connection from a pipeline to another pipeline (per the variant name).
PipelineToPipeline {
/// Boxed node metadata of this statement.
mid: Box<NodeMeta>,
/// Endpoint the connection starts at.
from: DeployEndpoint,
/// Endpoint the connection ends at.
to: DeployEndpoint,
},
}
impl BaseExpr for ConnectStmt {
    /// Returns the node metadata stored in the `mid` field of the active
    /// variant; every variant has one.
    fn meta(&self) -> &NodeMeta {
        match self {
            Self::ConnectorToPipeline { mid: node_meta, .. }
            | Self::PipelineToConnector { mid: node_meta, .. }
            | Self::PipelineToPipeline { mid: node_meta, .. } => node_meta,
        }
    }
}
impl ConnectStmt {
#[allow(clippy::wrong_self_convention)]
pub(crate) fn from_mut(&mut self) -> &mut DeployEndpoint {
match self {
ConnectStmt::ConnectorToPipeline { from, .. }
| ConnectStmt::PipelineToConnector { from, .. }
| ConnectStmt::PipelineToPipeline { from, .. } => from,
}
}
pub(crate) fn to_mut(&mut self) -> &mut DeployEndpoint {
match self {
ConnectStmt::ConnectorToPipeline { to, .. }
| ConnectStmt::PipelineToConnector { to, .. }
| ConnectStmt::PipelineToPipeline { to, .. } => to,
}
}
}
/// An endpoint made of an alias and a port, together with node metadata.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize)]
pub struct DeployEndpoint {
/// Alias part of the endpoint.
alias: String,
/// Port part of the endpoint.
port: String,
/// Boxed node metadata; not serialized (`#[serde(skip_serializing)]`).
#[serde(skip_serializing)]
mid: Box<NodeMeta>,
}
// NOTE(review): `impl_expr_no_lt!` is defined elsewhere in the crate;
// presumably it implements `BaseExpr` for a type without a lifetime parameter
// on top of its `mid` field — confirm.
impl_expr_no_lt!(DeployEndpoint);
impl std::fmt::Display for DeployEndpoint {
    /// Writes the endpoint as `<alias>/<port>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{}/{}", self.alias, self.port))
    }
}
impl DeployEndpoint {
    /// Builds an endpoint from anything that can be turned into a string.
    ///
    /// `alias` and `port` are converted with `ToString`; `mid` is cloned and
    /// boxed, so the caller keeps ownership of all three arguments.
    pub fn new<A: ToString + ?Sized, P: ToString + ?Sized>(
        alias: &A,
        port: &P,
        mid: &NodeMeta,
    ) -> Self {
        let alias = alias.to_string();
        let port = port.to_string();
        let mid = Box::new(mid.clone());
        Self { alias, port, mid }
    }

    /// The alias part of the endpoint.
    #[must_use]
    pub fn alias(&self) -> &str {
        self.alias.as_str()
    }

    /// The port part of the endpoint.
    #[must_use]
    pub fn port(&self) -> &str {
        self.port.as_str()
    }
}
/// Definition of a flow: its arguments, the things it creates and the
/// connections between endpoints.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct FlowDefinition<'script> {
/// Boxed node metadata (crate-visible only).
pub(crate) mid: Box<NodeMeta>,
/// Identifier of this definition.
pub id: String,
/// Arguments of the definition.
pub params: DefinitionalArgs<'script>,
/// Connect statements of the flow.
pub connections: Vec<ConnectStmt>,
/// Create statements of the flow.
pub creates: Vec<CreateStmt<'script>>,
/// Optional documentation text; left out of serde output by `#[serde(skip)]`.
#[serde(skip)]
pub docs: Option<String>,
}
// NOTE(review): `impl_expr!` is defined elsewhere in the crate; presumably it
// implements `BaseExpr` for the type on top of its `mid` field — confirm.
impl_expr!(FlowDefinition);
/// The definition a `CreateStmt` carries in its `defn` field.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub enum CreateTargetDefinition<'script> {
/// A connector definition, stored inline.
Connector(ConnectorDefinition<'script>),
/// A pipeline definition, stored boxed.
Pipeline(Box<PipelineDefinition<'script>>),
}
/// A create statement: instantiates a connector or pipeline definition under
/// an alias.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct CreateStmt<'script> {
/// Boxed node metadata (crate-visible only).
pub(crate) mid: Box<NodeMeta>,
/// NOTE(review): presumably the id of the definition being instantiated —
/// confirm against where the field is populated.
pub from_target: NodeId,
/// Alias of the created instance.
pub instance_alias: String,
/// The `CreationalWith` data attached to this statement.
pub with: CreationalWith<'script>,
/// The connector or pipeline definition carried by this statement.
pub defn: CreateTargetDefinition<'script>,
}
// NOTE(review): `impl_expr!` is defined elsewhere in the crate; presumably it
// implements `BaseExpr` for the type on top of its `mid` field — confirm.
impl_expr!(CreateStmt);
impl crate::ast::node_id::BaseRef for CreateStmt<'_> {
    /// Returns an owned copy of the statement's `instance_alias`.
    fn fqn(&self) -> String {
        let Self { instance_alias, .. } = self;
        instance_alias.clone()
    }
}
/// A deploy-flow statement: a flow definition together with the alias of the
/// instance.
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct DeployFlow<'script> {
/// Boxed node metadata (crate-visible only).
pub(crate) mid: Box<NodeMeta>,
/// NOTE(review): presumably the id of the flow definition being deployed —
/// confirm against where the field is populated.
pub from_target: NodeId,
/// Alias of the deployed instance.
pub instance_alias: String,
/// The flow definition carried by this statement.
pub defn: FlowDefinition<'script>,
/// Optional documentation text; left out of serde output by `#[serde(skip)]`.
#[serde(skip)]
pub docs: Option<String>,
}
// NOTE(review): `impl_expr!` is defined elsewhere in the crate; presumably it
// implements `BaseExpr` for the type on top of its `mid` field — confirm.
impl_expr!(DeployFlow);
impl crate::ast::node_id::BaseRef for DeployFlow<'_> {
    /// Returns an owned copy of the statement's `instance_alias`.
    fn fqn(&self) -> String {
        let Self { instance_alias, .. } = self;
        instance_alias.clone()
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// `meta()` must hand back the statement's own `mid`, whichever variant
    /// of `ConnectStmt` is in use.
    #[test]
    fn connect_stmt_mid() {
        let mid = NodeMeta::dummy();
        // Alias and port are both set to `name`, as in every case below.
        let endpoint = |name: &str| DeployEndpoint::new(name, name, &mid);

        let stmts = [
            ConnectStmt::ConnectorToPipeline {
                mid: mid.clone(),
                from: endpoint("from"),
                to: endpoint("to"),
            },
            ConnectStmt::PipelineToConnector {
                mid: mid.clone(),
                from: endpoint("from"),
                to: endpoint("to"),
            },
            ConnectStmt::PipelineToPipeline {
                mid: mid.clone(),
                from: endpoint("from"),
                to: endpoint("to"),
            },
        ];

        for stmt in &stmts {
            assert_eq!(stmt.meta(), &*mid);
        }
    }
}