1use std::collections::HashSet;
2use std::fmt::Display;
3
4use crate::models::{Entity, EntityAccept, VisitEntity};
5use serde::Deserialize;
6use strum::{Display, EnumString};
7
8use super::data::{data_ty_to_literal, DataType};
9use super::default_mqtt_dependency;
10use super::dependency::{
11 default_amqp_dependency, default_avro_dependency, default_cql_dependency,
12 default_csv_dependency, default_dynamodb_dependency, default_json_dependency,
13 default_kafka_dependency, default_kube_dependency, default_mysql_dependency,
14 default_psql_dependency, default_redis_dependency, default_reqwest_dependency,
15 default_rocksdb_dependency, default_s3_dependency, default_sns_dependency,
16 default_sqs_dependency, default_warp_dependency, Dependency, UseCrate,
17};
18use super::meta::{meta_to_literal, meta_value_str, meta_value_usize, Meta};
19
20#[derive(Clone, Display, EnumString, PartialEq, Debug, Deserialize)]
21pub enum PipeType {
22 #[strum(to_string = "listener")]
23 Listener,
24 #[strum(to_string = "poller")]
25 Poller,
26 #[strum(to_string = "mapper")]
27 Mapper,
28 #[strum(to_string = "collector")]
29 Collector,
30 #[strum(to_string = "selector")]
31 Selector,
32 #[strum(to_string = "exporter")]
33 Exporter,
34 #[strum(to_string = "streamer")]
35 Streamer,
36}
37
38#[derive(Deserialize, Debug, Clone)]
39pub struct PipeConfig {
40 ty: String,
41 path: Option<String>,
42}
43
44impl PipeConfig {
45 pub fn get_path(&self) -> Option<&String> {
46 self.path.as_ref()
47 }
48 pub fn get_config_type(&self) -> &String {
49 &self.ty
50 }
51 pub fn resolve_pipe_ty(&self) -> Option<PipeType> {
52 match self.ty.as_str() {
53 "InMemoryBagCollectorConfig"
54 | "InMemorySetCollectorConfig"
55 | "InMemoryWindowCollectorConfig"
56 | "TextCollectorConfig" => Some(PipeType::Collector),
57 "TimerConfig" | "SqsMessageReceiverConfig" => Some(PipeType::Poller),
58 "LocalFilePathVisitorConfig"
59 | "WarpIngestionServerConfig"
60 | "KubeLogReaderConfig"
61 | "KubeEventReaderConfig"
62 | "RedisSubscriberConfig"
63 | "KafkaConsumerConfig"
64 | "MqttSubscriberConfig"
65 | "AmqpConsumerConfig" => Some(PipeType::Listener),
66 "PrinterConfig"
67 | "ReqwestPosterConfig"
68 | "CqlWriterConfig"
69 | "CqlPreparedWriterConfig"
70 | "PsqlWriterConfig"
71 | "PsqlPreparedWriterConfig"
72 | "RedisStringWriterConfig"
73 | "RedisStringBatchWriterConfig"
74 | "RedisPublisherConfig"
75 | "KafkaProducerConfig"
76 | "KafkaPartitionedProducerConfig"
77 | "S3WriterConfig"
78 | "MySQLWriterConfig"
79 | "MySQLPreparedWriterConfig"
80 | "DynamoDBWriterConfig"
81 | "SnsPublisherConfig"
82 | "MqttPublisherConfig"
83 | "AmqpPublisherConfig" => Some(PipeType::Exporter),
84 "AddAggregatorConfig"
85 | "ConversionConfig"
86 | "EchoConfig"
87 | "FieldVisitConfig"
88 | "FileReaderConfig"
89 | "FileWriterConfig"
90 | "FilterMapConfig"
91 | "OrderedGroupAddAggregatorConfig"
92 | "ProjectionConfig"
93 | "StringSplitterConfig"
94 | "TopAggregatorConfig"
95 | "UnorderedGroupAddAggregatorConfig"
96 | "AvroSerConfig"
97 | "AvroDeserConfig"
98 | "JsonSerConfig"
99 | "JsonDeserConfig"
100 | "JsonRecordSerConfig"
101 | "CsvSerConfig"
102 | "CsvDeserConfig"
103 | "RedisUnorderedGroupAddAggregatorConfig"
104 | "RocksDBUnorderedGroupAddAggregatorConfig"
105 | "ReqwestGetterConfig"
106 | "ReqwestQueryConfig" => Some(PipeType::Mapper),
107 "DefaultHashSelectorConfig" | "RandomSelectorConfig" | "RoundRobinSelectorConfig" => {
108 Some(PipeType::Selector)
109 }
110 "FileSplitReaderConfig" | "FileLineReaderConfig" | "IteratorReaderConfig" => {
111 Some(PipeType::Streamer)
112 }
113 _ => None,
114 }
115 }
116}
117
118impl Display for PipeConfig {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 match self.path {
121 Some(ref path) => write!(f, "{{ type: {}, path: {} }}", self.ty, path),
122 None => write!(f, "{{ type: {} }}", self.ty),
123 }
124 }
125}
126
127#[derive(Deserialize, Debug, Clone)]
128pub struct Pipe {
129 name: String,
130 ty: Option<PipeType>,
131 config: PipeConfig,
132 buffer: Option<usize>,
134 upstreams: Option<Vec<String>>,
136 output: Option<DataType>,
138}
139
140impl Pipe {
141 pub fn init(&mut self) {
142 if self.upstreams.is_none() {
143 self.upstreams = Some(Vec::new())
144 }
145 if !self.has_type() {
146 self.ty = self.config.resolve_pipe_ty()
148 }
149 }
150
151 pub fn has_type(&self) -> bool {
152 self.ty.is_some()
153 }
154
155 pub fn is_source(&self) -> bool {
156 let ty = self.ty.as_ref().unwrap();
157 matches!(ty, PipeType::Listener | PipeType::Poller)
158 }
159
160 pub fn is_sink(&self) -> bool {
161 let ty = self.ty.as_ref().unwrap();
162 matches!(ty, PipeType::Exporter)
163 }
164
165 fn get_name_meta(&self) -> Meta {
166 meta_value_str("name", &self.name, false)
167 }
168
169 fn get_type_meta(&self) -> Meta {
170 let ty = self.ty.as_ref().unwrap();
171 meta_value_str("ty", &ty.to_string(), false)
172 }
173
174 fn get_config_meta(&self) -> Meta {
175 let mut config_metas = vec![meta_value_str("ty", self.config.get_config_type(), false)];
176 if let Some(path) = self.config.get_path() {
177 config_metas.push(meta_value_str("path", path, false));
178 };
179 Meta::List {
180 name: "config".to_owned(),
181 metas: config_metas,
182 }
183 }
184
185 fn get_upstream_meta(&self) -> Option<Meta> {
186 let upstreams = self.upstreams.as_ref().expect("upstreams not inited");
187 if upstreams.is_empty() {
188 return None;
189 }
190 Some(meta_value_str("upstream", &upstreams.join(", "), false))
191 }
192
193 fn get_output_data_type_meta(&self) -> Option<Meta> {
194 let output = match self.output {
195 Some(ref output) => output,
196 None => return None,
197 };
198 Some(meta_value_str("output", &data_ty_to_literal(output), false))
199 }
200
201 pub(crate) fn get_output_data_type(&self) -> Option<&DataType> {
202 self.output.as_ref()
203 }
204
205 pub(crate) fn has_output(&self) -> bool {
206 self.output.is_some()
207 }
208
209 pub(crate) fn get_channel_buffer_meta(&self) -> Option<Meta> {
210 let buffer = match self.buffer {
211 Some(ref buffer) => buffer,
212 None => return None,
213 };
214 Some(meta_value_usize("buffer", buffer))
215 }
216
217 pub fn filter_upstreams(&mut self, pipe_id_filter: &HashSet<String>) {
218 let upstreams = match self.upstreams {
219 Some(ref upstreams) => upstreams,
220 None => return,
221 };
222 self.upstreams = Some(
223 upstreams
224 .iter()
225 .cloned()
226 .filter(|id| pipe_id_filter.contains(id))
227 .collect(),
228 )
229 }
230}
231
232impl Entity for Pipe {
233 fn get_id(&self) -> String {
234 self.name.to_owned()
235 }
236
237 fn list_dependency(&self) -> Vec<String> {
238 match self.upstreams {
239 Some(ref upstreams) => upstreams.to_owned(),
240 None => vec![],
241 }
242 }
243
244 fn to_literal(&self, indent: usize) -> String {
246 let mut metas: Vec<Meta> = vec![
247 self.get_name_meta(),
248 self.get_type_meta(),
249 self.get_config_meta(),
250 ];
251 if let Some(meta) = self.get_upstream_meta() {
252 metas.push(meta)
253 };
254 if let Some(meta) = self.get_output_data_type_meta() {
255 metas.push(meta)
256 };
257 if let Some(meta) = self.get_channel_buffer_meta() {
258 metas.push(meta)
259 };
260 let meta = Meta::List {
261 name: "pipe".to_owned(),
262 metas,
263 };
264 meta_to_literal(&meta, indent)
265 }
266}
267
268impl Display for Pipe {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 let ty = self.ty.as_ref().unwrap();
271 writeln!(f, "Name: {}", self.name)?;
272 writeln!(f, "Type: {}", ty)?;
273 writeln!(f, "Config: {}", self.config)?;
274 let upstream = match self.upstreams {
275 Some(ref upstreams) => upstreams.join(", "),
276 None => "".to_owned(),
277 };
278 writeln!(f, "Upstream: [{}]", upstream)
279 }
280}
281
282impl<V: VisitEntity<Pipe>> EntityAccept<V> for Pipe {}
283
284impl UseCrate for Pipe {
285 fn get_crate(&self) -> Option<Dependency> {
286 let config_ty = self.config.get_config_type().as_str();
287 match config_ty {
288 "AvroDeserConfig" | "AvroSerConfig" => Some(default_avro_dependency()),
289 "CqlPreparedWriterConfig" | "CqlWriterConfig" => Some(default_cql_dependency()),
290 "CsvDeserConfig" | "CsvSerConfig" => Some(default_csv_dependency()),
291 "JsonDeserConfig" | "JsonRecordSerConfig" | "JsonSerConfig" => {
292 Some(default_json_dependency())
293 }
294 "KafkaConsumerConfig" | "KafkaPartitionedProducerConfig" | "KafkaProducerConfig" => {
295 Some(default_kafka_dependency())
296 }
297 "KubeEventReaderConfig" | "KubeLogReaderConfig" => Some(default_kube_dependency()),
298 "MySQLPreparedWriterConfig" | "MySQLWriterConfig" => Some(default_mysql_dependency()),
299 "PsqlPreparedWriterConfig" | "PsqlWriterConfig" => Some(default_psql_dependency()),
300 "RedisPublisherConfig"
301 | "RedisStringBatchWriterConfig"
302 | "RedisStringWriterConfig"
303 | "RedisSubscriberConfig"
304 | "RedisUnorderedGroupAddAggregatorConfig" => Some(default_redis_dependency()),
305 "ReqwestGetterConfig" | "ReqwestPosterConfig" | "ReqwestQueryConfig" => {
306 Some(default_reqwest_dependency())
307 }
308 "RocksDBUnorderedGroupAddAggregatorConfig" => Some(default_rocksdb_dependency()),
309 "WarpIngestionServerConfig" => Some(default_warp_dependency()),
310 "DynamoDBWriterConfig" => Some(default_dynamodb_dependency()),
311 "S3WriterConfig" => Some(default_s3_dependency()),
312 "SnsPublisherConfig" => Some(default_sns_dependency()),
313 "SqsMessageReceiverConfig" => Some(default_sqs_dependency()),
314 "MqttPublisherConfig" | "MqttSubscriberConfig" => Some(default_mqtt_dependency()),
315 "AmqpPublisherConfig" | "AmqpConsumerConfig" => Some(default_amqp_dependency()),
316 _ => None,
317 }
318 }
319}