pipegen/models/
pipe.rs

1use std::collections::HashSet;
2use std::fmt::Display;
3
4use crate::models::{Entity, EntityAccept, VisitEntity};
5use serde::Deserialize;
6use strum::{Display, EnumString};
7
8use super::data::{data_ty_to_literal, DataType};
9use super::default_mqtt_dependency;
10use super::dependency::{
11    default_amqp_dependency, default_avro_dependency, default_cql_dependency,
12    default_csv_dependency, default_dynamodb_dependency, default_json_dependency,
13    default_kafka_dependency, default_kube_dependency, default_mysql_dependency,
14    default_psql_dependency, default_redis_dependency, default_reqwest_dependency,
15    default_rocksdb_dependency, default_s3_dependency, default_sns_dependency,
16    default_sqs_dependency, default_warp_dependency, Dependency, UseCrate,
17};
18use super::meta::{meta_to_literal, meta_value_str, meta_value_usize, Meta};
19
20#[derive(Clone, Display, EnumString, PartialEq, Debug, Deserialize)]
21pub enum PipeType {
22    #[strum(to_string = "listener")]
23    Listener,
24    #[strum(to_string = "poller")]
25    Poller,
26    #[strum(to_string = "mapper")]
27    Mapper,
28    #[strum(to_string = "collector")]
29    Collector,
30    #[strum(to_string = "selector")]
31    Selector,
32    #[strum(to_string = "exporter")]
33    Exporter,
34    #[strum(to_string = "streamer")]
35    Streamer,
36}
37
38#[derive(Deserialize, Debug, Clone)]
39pub struct PipeConfig {
40    ty: String,
41    path: Option<String>,
42}
43
44impl PipeConfig {
45    pub fn get_path(&self) -> Option<&String> {
46        self.path.as_ref()
47    }
48    pub fn get_config_type(&self) -> &String {
49        &self.ty
50    }
51    pub fn resolve_pipe_ty(&self) -> Option<PipeType> {
52        match self.ty.as_str() {
53            "InMemoryBagCollectorConfig"
54            | "InMemorySetCollectorConfig"
55            | "InMemoryWindowCollectorConfig"
56            | "TextCollectorConfig" => Some(PipeType::Collector),
57            "TimerConfig" | "SqsMessageReceiverConfig" => Some(PipeType::Poller),
58            "LocalFilePathVisitorConfig"
59            | "WarpIngestionServerConfig"
60            | "KubeLogReaderConfig"
61            | "KubeEventReaderConfig"
62            | "RedisSubscriberConfig"
63            | "KafkaConsumerConfig"
64            | "MqttSubscriberConfig"
65            | "AmqpConsumerConfig" => Some(PipeType::Listener),
66            "PrinterConfig"
67            | "ReqwestPosterConfig"
68            | "CqlWriterConfig"
69            | "CqlPreparedWriterConfig"
70            | "PsqlWriterConfig"
71            | "PsqlPreparedWriterConfig"
72            | "RedisStringWriterConfig"
73            | "RedisStringBatchWriterConfig"
74            | "RedisPublisherConfig"
75            | "KafkaProducerConfig"
76            | "KafkaPartitionedProducerConfig"
77            | "S3WriterConfig"
78            | "MySQLWriterConfig"
79            | "MySQLPreparedWriterConfig"
80            | "DynamoDBWriterConfig"
81            | "SnsPublisherConfig"
82            | "MqttPublisherConfig"
83            | "AmqpPublisherConfig" => Some(PipeType::Exporter),
84            "AddAggregatorConfig"
85            | "ConversionConfig"
86            | "EchoConfig"
87            | "FieldVisitConfig"
88            | "FileReaderConfig"
89            | "FileWriterConfig"
90            | "FilterMapConfig"
91            | "OrderedGroupAddAggregatorConfig"
92            | "ProjectionConfig"
93            | "StringSplitterConfig"
94            | "TopAggregatorConfig"
95            | "UnorderedGroupAddAggregatorConfig"
96            | "AvroSerConfig"
97            | "AvroDeserConfig"
98            | "JsonSerConfig"
99            | "JsonDeserConfig"
100            | "JsonRecordSerConfig"
101            | "CsvSerConfig"
102            | "CsvDeserConfig"
103            | "RedisUnorderedGroupAddAggregatorConfig"
104            | "RocksDBUnorderedGroupAddAggregatorConfig"
105            | "ReqwestGetterConfig"
106            | "ReqwestQueryConfig" => Some(PipeType::Mapper),
107            "DefaultHashSelectorConfig" | "RandomSelectorConfig" | "RoundRobinSelectorConfig" => {
108                Some(PipeType::Selector)
109            }
110            "FileSplitReaderConfig" | "FileLineReaderConfig" | "IteratorReaderConfig" => {
111                Some(PipeType::Streamer)
112            }
113            _ => None,
114        }
115    }
116}
117
118impl Display for PipeConfig {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        match self.path {
121            Some(ref path) => write!(f, "{{ type: {}, path: {} }}", self.ty, path),
122            None => write!(f, "{{ type: {} }}", self.ty),
123        }
124    }
125}
126
127#[derive(Deserialize, Debug, Clone)]
128pub struct Pipe {
129    name: String,
130    ty: Option<PipeType>,
131    config: PipeConfig,
132    // pipe channel buffer
133    buffer: Option<usize>,
134    // upstream pipe names
135    upstreams: Option<Vec<String>>,
136    // output data type
137    output: Option<DataType>,
138}
139
140impl Pipe {
141    pub fn init(&mut self) {
142        if self.upstreams.is_none() {
143            self.upstreams = Some(Vec::new())
144        }
145        if !self.has_type() {
146            // resolve type based on pipe config
147            self.ty = self.config.resolve_pipe_ty()
148        }
149    }
150
151    pub fn has_type(&self) -> bool {
152        self.ty.is_some()
153    }
154
155    pub fn is_source(&self) -> bool {
156        let ty = self.ty.as_ref().unwrap();
157        matches!(ty, PipeType::Listener | PipeType::Poller)
158    }
159
160    pub fn is_sink(&self) -> bool {
161        let ty = self.ty.as_ref().unwrap();
162        matches!(ty, PipeType::Exporter)
163    }
164
165    fn get_name_meta(&self) -> Meta {
166        meta_value_str("name", &self.name, false)
167    }
168
169    fn get_type_meta(&self) -> Meta {
170        let ty = self.ty.as_ref().unwrap();
171        meta_value_str("ty", &ty.to_string(), false)
172    }
173
174    fn get_config_meta(&self) -> Meta {
175        let mut config_metas = vec![meta_value_str("ty", self.config.get_config_type(), false)];
176        if let Some(path) = self.config.get_path() {
177            config_metas.push(meta_value_str("path", path, false));
178        };
179        Meta::List {
180            name: "config".to_owned(),
181            metas: config_metas,
182        }
183    }
184
185    fn get_upstream_meta(&self) -> Option<Meta> {
186        let upstreams = self.upstreams.as_ref().expect("upstreams not inited");
187        if upstreams.is_empty() {
188            return None;
189        }
190        Some(meta_value_str("upstream", &upstreams.join(", "), false))
191    }
192
193    fn get_output_data_type_meta(&self) -> Option<Meta> {
194        let output = match self.output {
195            Some(ref output) => output,
196            None => return None,
197        };
198        Some(meta_value_str("output", &data_ty_to_literal(output), false))
199    }
200
201    pub(crate) fn get_output_data_type(&self) -> Option<&DataType> {
202        self.output.as_ref()
203    }
204
205    pub(crate) fn has_output(&self) -> bool {
206        self.output.is_some()
207    }
208
209    pub(crate) fn get_channel_buffer_meta(&self) -> Option<Meta> {
210        let buffer = match self.buffer {
211            Some(ref buffer) => buffer,
212            None => return None,
213        };
214        Some(meta_value_usize("buffer", buffer))
215    }
216
217    pub fn filter_upstreams(&mut self, pipe_id_filter: &HashSet<String>) {
218        let upstreams = match self.upstreams {
219            Some(ref upstreams) => upstreams,
220            None => return,
221        };
222        self.upstreams = Some(
223            upstreams
224                .iter()
225                .cloned()
226                .filter(|id| pipe_id_filter.contains(id))
227                .collect(),
228        )
229    }
230}
231
232impl Entity for Pipe {
233    fn get_id(&self) -> String {
234        self.name.to_owned()
235    }
236
237    fn list_dependency(&self) -> Vec<String> {
238        match self.upstreams {
239            Some(ref upstreams) => upstreams.to_owned(),
240            None => vec![],
241        }
242    }
243
244    // to pipe meta
245    fn to_literal(&self, indent: usize) -> String {
246        let mut metas: Vec<Meta> = vec![
247            self.get_name_meta(),
248            self.get_type_meta(),
249            self.get_config_meta(),
250        ];
251        if let Some(meta) = self.get_upstream_meta() {
252            metas.push(meta)
253        };
254        if let Some(meta) = self.get_output_data_type_meta() {
255            metas.push(meta)
256        };
257        if let Some(meta) = self.get_channel_buffer_meta() {
258            metas.push(meta)
259        };
260        let meta = Meta::List {
261            name: "pipe".to_owned(),
262            metas,
263        };
264        meta_to_literal(&meta, indent)
265    }
266}
267
268impl Display for Pipe {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        let ty = self.ty.as_ref().unwrap();
271        writeln!(f, "Name:   {}", self.name)?;
272        writeln!(f, "Type:   {}", ty)?;
273        writeln!(f, "Config: {}", self.config)?;
274        let upstream = match self.upstreams {
275            Some(ref upstreams) => upstreams.join(", "),
276            None => "".to_owned(),
277        };
278        writeln!(f, "Upstream: [{}]", upstream)
279    }
280}
281
282impl<V: VisitEntity<Pipe>> EntityAccept<V> for Pipe {}
283
284impl UseCrate for Pipe {
285    fn get_crate(&self) -> Option<Dependency> {
286        let config_ty = self.config.get_config_type().as_str();
287        match config_ty {
288            "AvroDeserConfig" | "AvroSerConfig" => Some(default_avro_dependency()),
289            "CqlPreparedWriterConfig" | "CqlWriterConfig" => Some(default_cql_dependency()),
290            "CsvDeserConfig" | "CsvSerConfig" => Some(default_csv_dependency()),
291            "JsonDeserConfig" | "JsonRecordSerConfig" | "JsonSerConfig" => {
292                Some(default_json_dependency())
293            }
294            "KafkaConsumerConfig" | "KafkaPartitionedProducerConfig" | "KafkaProducerConfig" => {
295                Some(default_kafka_dependency())
296            }
297            "KubeEventReaderConfig" | "KubeLogReaderConfig" => Some(default_kube_dependency()),
298            "MySQLPreparedWriterConfig" | "MySQLWriterConfig" => Some(default_mysql_dependency()),
299            "PsqlPreparedWriterConfig" | "PsqlWriterConfig" => Some(default_psql_dependency()),
300            "RedisPublisherConfig"
301            | "RedisStringBatchWriterConfig"
302            | "RedisStringWriterConfig"
303            | "RedisSubscriberConfig"
304            | "RedisUnorderedGroupAddAggregatorConfig" => Some(default_redis_dependency()),
305            "ReqwestGetterConfig" | "ReqwestPosterConfig" | "ReqwestQueryConfig" => {
306                Some(default_reqwest_dependency())
307            }
308            "RocksDBUnorderedGroupAddAggregatorConfig" => Some(default_rocksdb_dependency()),
309            "WarpIngestionServerConfig" => Some(default_warp_dependency()),
310            "DynamoDBWriterConfig" => Some(default_dynamodb_dependency()),
311            "S3WriterConfig" => Some(default_s3_dependency()),
312            "SnsPublisherConfig" => Some(default_sns_dependency()),
313            "SqsMessageReceiverConfig" => Some(default_sqs_dependency()),
314            "MqttPublisherConfig" | "MqttSubscriberConfig" => Some(default_mqtt_dependency()),
315            "AmqpPublisherConfig" | "AmqpConsumerConfig" => Some(default_amqp_dependency()),
316            _ => None,
317        }
318    }
319}