use std::collections::HashSet;
use std::fmt::Display;
use crate::models::{Entity, EntityAccept, VisitEntity};
use serde::Deserialize;
use strum::{Display, EnumString};
use super::data::{data_ty_to_literal, DataType};
use super::default_mqtt_dependency;
use super::dependency::{
default_amqp_dependency, default_avro_dependency, default_cql_dependency,
default_csv_dependency, default_dynamodb_dependency, default_json_dependency,
default_kafka_dependency, default_kube_dependency, default_mysql_dependency,
default_psql_dependency, default_redis_dependency, default_reqwest_dependency,
default_rocksdb_dependency, default_s3_dependency, default_sns_dependency,
default_sqs_dependency, default_warp_dependency, Dependency, UseCrate,
};
use super::meta::{meta_to_literal, meta_value_str, meta_value_usize, Meta};
#[derive(Clone, Display, EnumString, PartialEq, Debug, Deserialize)]
pub enum PipeType {
#[strum(to_string = "listener")]
Listener,
#[strum(to_string = "poller")]
Poller,
#[strum(to_string = "mapper")]
Mapper,
#[strum(to_string = "collector")]
Collector,
#[strum(to_string = "selector")]
Selector,
#[strum(to_string = "exporter")]
Exporter,
#[strum(to_string = "streamer")]
Streamer,
}
#[derive(Deserialize, Debug, Clone)]
pub struct PipeConfig {
ty: String,
path: Option<String>,
}
impl PipeConfig {
pub fn get_path(&self) -> Option<&String> {
self.path.as_ref()
}
pub fn get_config_type(&self) -> &String {
&self.ty
}
pub fn resolve_pipe_ty(&self) -> Option<PipeType> {
match self.ty.as_str() {
"InMemoryBagCollectorConfig"
| "InMemorySetCollectorConfig"
| "InMemoryWindowCollectorConfig"
| "TextCollectorConfig" => Some(PipeType::Collector),
"TimerConfig" | "SqsMessageReceiverConfig" => Some(PipeType::Poller),
"LocalFilePathVisitorConfig"
| "WarpIngestionServerConfig"
| "KubeLogReaderConfig"
| "KubeEventReaderConfig"
| "RedisSubscriberConfig"
| "KafkaConsumerConfig"
| "MqttSubscriberConfig"
| "AmqpConsumerConfig" => Some(PipeType::Listener),
"PrinterConfig"
| "ReqwestPosterConfig"
| "CqlWriterConfig"
| "CqlPreparedWriterConfig"
| "PsqlWriterConfig"
| "PsqlPreparedWriterConfig"
| "RedisStringWriterConfig"
| "RedisStringBatchWriterConfig"
| "RedisPublisherConfig"
| "KafkaProducerConfig"
| "KafkaPartitionedProducerConfig"
| "S3WriterConfig"
| "MySQLWriterConfig"
| "MySQLPreparedWriterConfig"
| "DynamoDBWriterConfig"
| "SnsPublisherConfig"
| "MqttPublisherConfig"
| "AmqpPublisherConfig" => Some(PipeType::Exporter),
"AddAggregatorConfig"
| "ConversionConfig"
| "EchoConfig"
| "FieldVisitConfig"
| "FileReaderConfig"
| "FileWriterConfig"
| "FilterMapConfig"
| "OrderedGroupAddAggregatorConfig"
| "ProjectionConfig"
| "StringSplitterConfig"
| "TopAggregatorConfig"
| "UnorderedGroupAddAggregatorConfig"
| "AvroSerConfig"
| "AvroDeserConfig"
| "JsonSerConfig"
| "JsonDeserConfig"
| "JsonRecordSerConfig"
| "CsvSerConfig"
| "CsvDeserConfig"
| "RedisUnorderedGroupAddAggregatorConfig"
| "RocksDBUnorderedGroupAddAggregatorConfig"
| "ReqwestGetterConfig"
| "ReqwestQueryConfig" => Some(PipeType::Mapper),
"DefaultHashSelectorConfig" | "RandomSelectorConfig" | "RoundRobinSelectorConfig" => {
Some(PipeType::Selector)
}
"FileSplitReaderConfig" | "FileLineReaderConfig" | "IteratorReaderConfig" => {
Some(PipeType::Streamer)
}
_ => None,
}
}
}
impl Display for PipeConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.path {
Some(ref path) => write!(f, "{{ type: {}, path: {} }}", self.ty, path),
None => write!(f, "{{ type: {} }}", self.ty),
}
}
}
#[derive(Deserialize, Debug, Clone)]
pub struct Pipe {
name: String,
ty: Option<PipeType>,
config: PipeConfig,
buffer: Option<usize>,
upstreams: Option<Vec<String>>,
output: Option<DataType>,
}
impl Pipe {
pub fn init(&mut self) {
if self.upstreams.is_none() {
self.upstreams = Some(Vec::new())
}
if !self.has_type() {
self.ty = self.config.resolve_pipe_ty()
}
}
pub fn has_type(&self) -> bool {
self.ty.is_some()
}
pub fn is_source(&self) -> bool {
let ty = self.ty.as_ref().unwrap();
matches!(ty, PipeType::Listener | PipeType::Poller)
}
pub fn is_sink(&self) -> bool {
let ty = self.ty.as_ref().unwrap();
matches!(ty, PipeType::Exporter)
}
fn get_name_meta(&self) -> Meta {
meta_value_str("name", &self.name, false)
}
fn get_type_meta(&self) -> Meta {
let ty = self.ty.as_ref().unwrap();
meta_value_str("ty", &ty.to_string(), false)
}
fn get_config_meta(&self) -> Meta {
let mut config_metas = vec![meta_value_str("ty", self.config.get_config_type(), false)];
if let Some(path) = self.config.get_path() {
config_metas.push(meta_value_str("path", path, false));
};
Meta::List {
name: "config".to_owned(),
metas: config_metas,
}
}
fn get_upstream_meta(&self) -> Option<Meta> {
let upstreams = self.upstreams.as_ref().expect("upstreams not inited");
if upstreams.is_empty() {
return None;
}
Some(meta_value_str("upstream", &upstreams.join(", "), false))
}
fn get_output_data_type_meta(&self) -> Option<Meta> {
let output = match self.output {
Some(ref output) => output,
None => return None,
};
Some(meta_value_str("output", &data_ty_to_literal(output), false))
}
pub(crate) fn get_output_data_type(&self) -> Option<&DataType> {
self.output.as_ref()
}
pub(crate) fn has_output(&self) -> bool {
self.output.is_some()
}
pub(crate) fn get_channel_buffer_meta(&self) -> Option<Meta> {
let buffer = match self.buffer {
Some(ref buffer) => buffer,
None => return None,
};
Some(meta_value_usize("buffer", buffer))
}
pub fn filter_upstreams(&mut self, pipe_id_filter: &HashSet<String>) {
let upstreams = match self.upstreams {
Some(ref upstreams) => upstreams,
None => return,
};
self.upstreams = Some(
upstreams
.iter()
.cloned()
.filter(|id| pipe_id_filter.contains(id))
.collect(),
)
}
}
impl Entity for Pipe {
fn get_id(&self) -> String {
self.name.to_owned()
}
fn list_dependency(&self) -> Vec<String> {
match self.upstreams {
Some(ref upstreams) => upstreams.to_owned(),
None => vec![],
}
}
fn to_literal(&self, indent: usize) -> String {
let mut metas: Vec<Meta> = vec![
self.get_name_meta(),
self.get_type_meta(),
self.get_config_meta(),
];
if let Some(meta) = self.get_upstream_meta() {
metas.push(meta)
};
if let Some(meta) = self.get_output_data_type_meta() {
metas.push(meta)
};
if let Some(meta) = self.get_channel_buffer_meta() {
metas.push(meta)
};
let meta = Meta::List {
name: "pipe".to_owned(),
metas,
};
meta_to_literal(&meta, indent)
}
}
impl Display for Pipe {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let ty = self.ty.as_ref().unwrap();
writeln!(f, "Name: {}", self.name)?;
writeln!(f, "Type: {}", ty)?;
writeln!(f, "Config: {}", self.config)?;
let upstream = match self.upstreams {
Some(ref upstreams) => upstreams.join(", "),
None => "".to_owned(),
};
writeln!(f, "Upstream: [{}]", upstream)
}
}
impl<V: VisitEntity<Pipe>> EntityAccept<V> for Pipe {}
impl UseCrate for Pipe {
fn get_crate(&self) -> Option<Dependency> {
let config_ty = self.config.get_config_type().as_str();
match config_ty {
"AvroDeserConfig" | "AvroSerConfig" => Some(default_avro_dependency()),
"CqlPreparedWriterConfig" | "CqlWriterConfig" => Some(default_cql_dependency()),
"CsvDeserConfig" | "CsvSerConfig" => Some(default_csv_dependency()),
"JsonDeserConfig" | "JsonRecordSerConfig" | "JsonSerConfig" => {
Some(default_json_dependency())
}
"KafkaConsumerConfig" | "KafkaPartitionedProducerConfig" | "KafkaProducerConfig" => {
Some(default_kafka_dependency())
}
"KubeEventReaderConfig" | "KubeLogReaderConfig" => Some(default_kube_dependency()),
"MySQLPreparedWriterConfig" | "MySQLWriterConfig" => Some(default_mysql_dependency()),
"PsqlPreparedWriterConfig" | "PsqlWriterConfig" => Some(default_psql_dependency()),
"RedisPublisherConfig"
| "RedisStringBatchWriterConfig"
| "RedisStringWriterConfig"
| "RedisSubscriberConfig"
| "RedisUnorderedGroupAddAggregatorConfig" => Some(default_redis_dependency()),
"ReqwestGetterConfig" | "ReqwestPosterConfig" | "ReqwestQueryConfig" => {
Some(default_reqwest_dependency())
}
"RocksDBUnorderedGroupAddAggregatorConfig" => Some(default_rocksdb_dependency()),
"WarpIngestionServerConfig" => Some(default_warp_dependency()),
"DynamoDBWriterConfig" => Some(default_dynamodb_dependency()),
"S3WriterConfig" => Some(default_s3_dependency()),
"SnsPublisherConfig" => Some(default_sns_dependency()),
"SqsMessageReceiverConfig" => Some(default_sqs_dependency()),
"MqttPublisherConfig" | "MqttSubscriberConfig" => Some(default_mqtt_dependency()),
"AmqpPublisherConfig" | "AmqpConsumerConfig" => Some(default_amqp_dependency()),
_ => None,
}
}
}