1#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
3#[serde(rename_all = "lowercase")]
4pub enum FileFormat {
5 Csv,
6 Json,
7 Jsonl,
8 Parquet,
9 #[serde(untagged)]
10 Unknown(String),
11}
12
13impl std::fmt::Display for FileFormat {
14 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15 match self {
16 Self::Csv => write!(f, "csv"),
17 Self::Json => write!(f, "json"),
18 Self::Jsonl => write!(f, "jsonl"),
19 Self::Parquet => write!(f, "parquet"),
20 Self::Unknown(s) => write!(f, "{}", s),
21 }
22 }
23}
24
25#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
27#[serde(rename_all = "lowercase")]
28pub enum QueryEngine {
29 Postgres,
30 MySql,
31 Sqlite,
32 Snowflake,
33 BigQuery,
34 #[serde(untagged)]
35 Custom(String),
36}
37
38impl std::fmt::Display for QueryEngine {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 match self {
41 Self::Postgres => write!(f, "postgres"),
42 Self::MySql => write!(f, "mysql"),
43 Self::Sqlite => write!(f, "sqlite"),
44 Self::Snowflake => write!(f, "snowflake"),
45 Self::BigQuery => write!(f, "bigquery"),
46 Self::Custom(s) => write!(f, "{}", s),
47 }
48 }
49}
50
51#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
53#[serde(rename_all = "lowercase")]
54pub enum DataFrameLibrary {
55 Pandas,
56 Polars,
57 PyArrow,
58 #[serde(untagged)]
59 Custom(String),
60}
61
62impl std::fmt::Display for DataFrameLibrary {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 match self {
65 Self::Pandas => write!(f, "pandas"),
66 Self::Polars => write!(f, "polars"),
67 Self::PyArrow => write!(f, "pyarrow"),
68 Self::Custom(s) => write!(f, "{}", s),
69 }
70 }
71}
72
73#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
75#[serde(rename_all = "lowercase")]
76pub enum StreamSourceSystem {
77 Kafka,
78 Kinesis,
79 Pulsar,
80 Http,
81 WebSocket,
82 #[serde(rename = "object_store")]
83 ObjectStore,
84 #[serde(rename = "message_queue")]
85 MessageQueue,
86 Database,
87 #[serde(untagged)]
88 Custom(String),
89}
90
91impl std::fmt::Display for StreamSourceSystem {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 match self {
94 Self::Kafka => write!(f, "kafka"),
95 Self::Kinesis => write!(f, "kinesis"),
96 Self::Pulsar => write!(f, "pulsar"),
97 Self::Http => write!(f, "http"),
98 Self::WebSocket => write!(f, "websocket"),
99 Self::ObjectStore => write!(f, "object_store"),
100 Self::MessageQueue => write!(f, "message_queue"),
101 Self::Database => write!(f, "database"),
102 Self::Custom(s) => write!(f, "{}", s),
103 }
104 }
105}
106
107#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
109pub struct ParquetMetadata {
110 pub num_row_groups: usize,
112 pub compression: String,
114 pub version: i32,
116 pub schema_summary: String,
118 pub compressed_size_bytes: u64,
120 pub uncompressed_size_bytes: Option<u64>,
122}
123
124#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
126#[serde(tag = "type", rename_all = "snake_case")]
127pub enum DataSource {
128 File {
130 path: String,
132 format: FileFormat,
134 size_bytes: u64,
136 #[serde(skip_serializing_if = "Option::is_none")]
138 modified_at: Option<String>,
139 #[serde(skip_serializing_if = "Option::is_none")]
141 parquet_metadata: Option<ParquetMetadata>,
142 },
143 Query {
145 engine: QueryEngine,
147 statement: String,
149 #[serde(skip_serializing_if = "Option::is_none")]
151 database: Option<String>,
152 #[serde(skip_serializing_if = "Option::is_none")]
154 execution_id: Option<String>,
155 },
156 #[serde(rename = "dataframe")]
158 DataFrame {
159 name: String,
161 source_library: DataFrameLibrary,
163 row_count: usize,
165 column_count: usize,
167 #[serde(skip_serializing_if = "Option::is_none")]
169 memory_bytes: Option<u64>,
170 },
171 Stream {
173 topic: String,
175 batch_id: String,
177 #[serde(skip_serializing_if = "Option::is_none")]
179 partition: Option<u32>,
180 #[serde(skip_serializing_if = "Option::is_none")]
182 consumer_group: Option<String>,
183 source_system: StreamSourceSystem,
185 #[serde(skip_serializing_if = "Option::is_none")]
187 session_id: Option<String>,
188 #[serde(skip_serializing_if = "Option::is_none")]
190 first_record_at: Option<String>,
191 #[serde(skip_serializing_if = "Option::is_none")]
193 last_record_at: Option<String>,
194 },
195}
196
197impl DataSource {
198 pub fn identifier(&self) -> String {
200 match self {
201 Self::File { path, .. } => path.clone(),
202 Self::Query {
203 engine, statement, ..
204 } => {
205 let truncated = if statement.chars().count() > 50 {
206 let mut prefix: String = statement.chars().take(47).collect();
207 prefix.push_str("...");
208 prefix
209 } else {
210 statement.clone()
211 };
212 format!("{}: {}", engine, truncated)
213 }
214 Self::DataFrame {
215 name,
216 source_library,
217 ..
218 } => format!("{}[{}]", source_library, name),
219 Self::Stream {
220 source_system,
221 topic,
222 batch_id,
223 ..
224 } => format!("{}[{}]-batch:{}", source_system, topic, batch_id),
225 }
226 }
227
228 pub fn size_mb(&self) -> Option<f64> {
230 match self {
231 Self::File { size_bytes, .. } => Some(*size_bytes as f64 / 1_048_576.0),
232 Self::DataFrame { memory_bytes, .. } => memory_bytes.map(|b| b as f64 / 1_048_576.0),
233 Self::Query { .. } | Self::Stream { .. } => None,
234 }
235 }
236
237 pub fn is_file(&self) -> bool {
239 matches!(self, Self::File { .. })
240 }
241
242 pub fn is_query(&self) -> bool {
244 matches!(self, Self::Query { .. })
245 }
246
247 pub fn is_dataframe(&self) -> bool {
249 matches!(self, Self::DataFrame { .. })
250 }
251
252 pub fn is_stream(&self) -> bool {
254 matches!(self, Self::Stream { .. })
255 }
256
257 pub fn file_path(&self) -> Option<&str> {
259 match self {
260 Self::File { path, .. } => Some(path),
261 _ => None,
262 }
263 }
264
265 pub fn stream_topic(&self) -> Option<&str> {
267 match self {
268 Self::Stream { topic, .. } => Some(topic),
269 _ => None,
270 }
271 }
272
273 pub fn batch_id(&self) -> Option<&str> {
275 match self {
276 Self::Stream { batch_id, .. } => Some(batch_id),
277 _ => None,
278 }
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Query` source in which only the statement varies.
    fn query_with(statement: &str) -> DataSource {
        DataSource::Query {
            engine: QueryEngine::Postgres,
            statement: statement.to_string(),
            database: None,
            execution_id: None,
        }
    }

    /// Builds a `DataFrame` source in which only the memory footprint varies.
    fn frame_with(memory_bytes: Option<u64>) -> DataSource {
        DataSource::DataFrame {
            name: "df".to_string(),
            source_library: DataFrameLibrary::Pandas,
            row_count: 3,
            column_count: 2,
            memory_bytes,
        }
    }

    #[test]
    fn test_data_source_file_identifier() {
        let ds = DataSource::File {
            path: "/path/to/data.csv".to_string(),
            format: FileFormat::Csv,
            size_bytes: 0,
            modified_at: None,
            parquet_metadata: None,
        };

        assert_eq!(ds.identifier(), "/path/to/data.csv");
        assert!(ds.is_file());
        assert!(!ds.is_query());
        assert!(!ds.is_dataframe());
        assert!(!ds.is_stream());
    }

    #[test]
    fn test_file_size_mb_and_path() {
        let ds = DataSource::File {
            path: "/path/to/data.csv".to_string(),
            format: FileFormat::Csv,
            size_bytes: 1_048_576,
            modified_at: None,
            parquet_metadata: None,
        };

        assert_eq!(ds.size_mb(), Some(1.0));
        assert_eq!(ds.file_path(), Some("/path/to/data.csv"));
        assert_eq!(ds.stream_topic(), None);
        assert_eq!(ds.batch_id(), None);
    }

    #[test]
    fn test_query_identifier_keeps_short_statement() {
        let ds = query_with("SELECT 1");

        assert_eq!(ds.identifier(), "postgres: SELECT 1");
        assert!(ds.is_query());
        assert!(!ds.is_file());
        assert!(ds.size_mb().is_none());
        assert!(ds.file_path().is_none());
    }

    #[test]
    fn test_query_identifier_truncation_boundary() {
        // Exactly 50 characters: shown in full.
        let at_limit = "x".repeat(50);
        assert_eq!(
            query_with(&at_limit).identifier(),
            format!("postgres: {}", at_limit)
        );

        // 51 characters: cut to 47 plus the ellipsis.
        let over_limit = "x".repeat(51);
        assert_eq!(
            query_with(&over_limit).identifier(),
            format!("postgres: {}...", "x".repeat(47))
        );
    }

    #[test]
    fn test_query_identifier_truncates_on_char_boundaries() {
        // Multi-byte characters are counted as characters, not bytes.
        let statement = "é".repeat(60);
        assert_eq!(
            query_with(&statement).identifier(),
            format!("postgres: {}...", "é".repeat(47))
        );
    }

    #[test]
    fn test_data_frame_identifier_and_size() {
        let sized = frame_with(Some(2_097_152));

        assert_eq!(sized.identifier(), "pandas[df]");
        assert_eq!(sized.size_mb(), Some(2.0));
        assert!(sized.is_dataframe());
        assert!(!sized.is_stream());
        assert!(frame_with(None).size_mb().is_none());
    }

    #[test]
    fn test_data_frame_json_tag_and_omitted_fields() {
        let json = serde_json::to_string(&frame_with(None)).unwrap();

        assert!(json.contains(r#""type":"dataframe""#));
        assert!(json.contains(r#""source_library":"pandas""#));
        // `None` values marked `skip_serializing_if` are left out entirely.
        assert!(!json.contains("memory_bytes"));
    }

    #[test]
    fn test_data_source_stream_identifier_and_helpers() {
        let ds = DataSource::Stream {
            topic: "events".to_string(),
            batch_id: "b1".to_string(),
            partition: Some(0),
            consumer_group: None,
            source_system: StreamSourceSystem::Kafka,
            session_id: None,
            first_record_at: None,
            last_record_at: None,
        };

        assert_eq!(ds.identifier(), "kafka[events]-batch:b1");
        assert!(ds.is_stream());
        assert_eq!(ds.stream_topic(), Some("events"));
        assert_eq!(ds.batch_id(), Some("b1"));
        assert!(!ds.is_file());
        assert!(!ds.is_query());
        assert!(ds.size_mb().is_none());
    }

    #[test]
    fn test_stream_json_serialization() {
        let ds = DataSource::Stream {
            topic: "sensor-data".to_string(),
            batch_id: "batch-789".to_string(),
            partition: Some(2),
            consumer_group: Some("processing-group".to_string()),
            source_system: StreamSourceSystem::Kinesis,
            session_id: Some("session-1".to_string()),
            first_record_at: Some("2023-01-01T10:00:00Z".to_string()),
            last_record_at: Some("2023-01-01T10:05:00Z".to_string()),
        };

        let json = serde_json::to_string(&ds).unwrap();
        assert!(json.contains(r#""type":"stream""#));
        assert!(json.contains(r#""source_system":"kinesis""#));
        assert!(json.contains(r#""topic":"sensor-data""#));

        let deserialized: DataSource = serde_json::from_str(&json).unwrap();
        assert!(deserialized.is_stream());
        assert_eq!(deserialized.stream_topic(), Some("sensor-data"));
    }

    #[test]
    fn test_stream_source_system_serialization_names() {
        let object_store = serde_json::to_string(&StreamSourceSystem::ObjectStore).unwrap();
        let message_queue = serde_json::to_string(&StreamSourceSystem::MessageQueue).unwrap();
        let database = serde_json::to_string(&StreamSourceSystem::Database).unwrap();

        assert_eq!(object_store, r#""object_store""#);
        assert_eq!(message_queue, r#""message_queue""#);
        assert_eq!(database, r#""database""#);

        let object_store: StreamSourceSystem = serde_json::from_str(r#""object_store""#).unwrap();
        let message_queue: StreamSourceSystem = serde_json::from_str(r#""message_queue""#).unwrap();
        let database: StreamSourceSystem = serde_json::from_str(r#""database""#).unwrap();

        assert_eq!(object_store, StreamSourceSystem::ObjectStore);
        assert_eq!(message_queue, StreamSourceSystem::MessageQueue);
        assert_eq!(database, StreamSourceSystem::Database);
    }

    #[test]
    fn test_unrecognized_names_use_catch_all_variants() {
        let format: FileFormat = serde_json::from_str(r#""avro""#).unwrap();
        assert_eq!(format, FileFormat::Unknown("avro".to_string()));
        assert_eq!(format.to_string(), "avro");
        assert_eq!(serde_json::to_string(&format).unwrap(), r#""avro""#);

        let system: StreamSourceSystem = serde_json::from_str(r#""nats""#).unwrap();
        assert_eq!(system, StreamSourceSystem::Custom("nats".to_string()));
        assert_eq!(system.to_string(), "nats");
        assert_eq!(serde_json::to_string(&system).unwrap(), r#""nats""#);
    }
}