//! Data source metadata types (from `dataprof_core/source.rs`).

1/// Supported file formats for data profiling
2#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
3#[serde(rename_all = "lowercase")]
4pub enum FileFormat {
5    Csv,
6    Json,
7    Jsonl,
8    Parquet,
9    #[serde(untagged)]
10    Unknown(String),
11}
12
13impl std::fmt::Display for FileFormat {
14    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
15        match self {
16            Self::Csv => write!(f, "csv"),
17            Self::Json => write!(f, "json"),
18            Self::Jsonl => write!(f, "jsonl"),
19            Self::Parquet => write!(f, "parquet"),
20            Self::Unknown(s) => write!(f, "{}", s),
21        }
22    }
23}
24
25/// Supported query engines for SQL-based profiling
26#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
27#[serde(rename_all = "lowercase")]
28pub enum QueryEngine {
29    Postgres,
30    MySql,
31    Sqlite,
32    Snowflake,
33    BigQuery,
34    #[serde(untagged)]
35    Custom(String),
36}
37
38impl std::fmt::Display for QueryEngine {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        match self {
41            Self::Postgres => write!(f, "postgres"),
42            Self::MySql => write!(f, "mysql"),
43            Self::Sqlite => write!(f, "sqlite"),
44            Self::Snowflake => write!(f, "snowflake"),
45            Self::BigQuery => write!(f, "bigquery"),
46            Self::Custom(s) => write!(f, "{}", s),
47        }
48    }
49}
50
51/// Source library for in-memory DataFrame profiling
52#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
53#[serde(rename_all = "lowercase")]
54pub enum DataFrameLibrary {
55    Pandas,
56    Polars,
57    PyArrow,
58    #[serde(untagged)]
59    Custom(String),
60}
61
62impl std::fmt::Display for DataFrameLibrary {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        match self {
65            Self::Pandas => write!(f, "pandas"),
66            Self::Polars => write!(f, "polars"),
67            Self::PyArrow => write!(f, "pyarrow"),
68            Self::Custom(s) => write!(f, "{}", s),
69        }
70    }
71}
72
73/// Supported stream source systems
74#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
75#[serde(rename_all = "lowercase")]
76pub enum StreamSourceSystem {
77    Kafka,
78    Kinesis,
79    Pulsar,
80    Http,
81    WebSocket,
82    #[serde(rename = "object_store")]
83    ObjectStore,
84    #[serde(rename = "message_queue")]
85    MessageQueue,
86    Database,
87    #[serde(untagged)]
88    Custom(String),
89}
90
91impl std::fmt::Display for StreamSourceSystem {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        match self {
94            Self::Kafka => write!(f, "kafka"),
95            Self::Kinesis => write!(f, "kinesis"),
96            Self::Pulsar => write!(f, "pulsar"),
97            Self::Http => write!(f, "http"),
98            Self::WebSocket => write!(f, "websocket"),
99            Self::ObjectStore => write!(f, "object_store"),
100            Self::MessageQueue => write!(f, "message_queue"),
101            Self::Database => write!(f, "database"),
102            Self::Custom(s) => write!(f, "{}", s),
103        }
104    }
105}
106
107/// Metadata specific to Parquet files
108#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
109pub struct ParquetMetadata {
110    /// Number of row groups in the Parquet file
111    pub num_row_groups: usize,
112    /// Compression codec used (e.g., "SNAPPY", "GZIP", "ZSTD", "UNCOMPRESSED")
113    pub compression: String,
114    /// Parquet file version (e.g., "1.0", "2.0")
115    pub version: i32,
116    /// Arrow schema as string representation
117    pub schema_summary: String,
118    /// Total compressed size in bytes
119    pub compressed_size_bytes: u64,
120    /// Estimated uncompressed size if available
121    pub uncompressed_size_bytes: Option<u64>,
122}
123
124/// Source-agnostic data source metadata.
125#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
126#[serde(tag = "type", rename_all = "snake_case")]
127pub enum DataSource {
128    /// File-based data source (CSV, JSON, Parquet, etc.)
129    File {
130        /// Absolute or relative path to the file
131        path: String,
132        /// Detected or specified file format
133        format: FileFormat,
134        /// File size in bytes
135        size_bytes: u64,
136        /// Last modification timestamp (ISO 8601 / RFC 3339)
137        #[serde(skip_serializing_if = "Option::is_none")]
138        modified_at: Option<String>,
139        /// Parquet-specific metadata (only present for Parquet files)
140        #[serde(skip_serializing_if = "Option::is_none")]
141        parquet_metadata: Option<ParquetMetadata>,
142    },
143    /// SQL query-based data source
144    Query {
145        /// Database engine used for the query
146        engine: QueryEngine,
147        /// SQL statement executed
148        statement: String,
149        /// Target database name (if applicable)
150        #[serde(skip_serializing_if = "Option::is_none")]
151        database: Option<String>,
152        /// Unique execution identifier for tracing
153        #[serde(skip_serializing_if = "Option::is_none")]
154        execution_id: Option<String>,
155    },
156    /// In-memory DataFrame source (pandas/polars via PyCapsule)
157    #[serde(rename = "dataframe")]
158    DataFrame {
159        /// User-provided name for identification
160        name: String,
161        /// Source library (pandas, polars, pyarrow)
162        source_library: DataFrameLibrary,
163        /// Number of rows at profiling time
164        row_count: usize,
165        /// Number of columns
166        column_count: usize,
167        /// Memory usage in bytes (if available)
168        #[serde(skip_serializing_if = "Option::is_none")]
169        memory_bytes: Option<u64>,
170    },
171    /// Streaming data source
172    Stream {
173        /// Stream identifier (e.g., Kafka topic, Kinesis stream name)
174        topic: String,
175        /// Batch identifier for ordering and deduplication
176        batch_id: String,
177        /// Partition for parallel processing (optional)
178        #[serde(skip_serializing_if = "Option::is_none")]
179        partition: Option<u32>,
180        /// Consumer group for Kafka-style coordination (optional)
181        #[serde(skip_serializing_if = "Option::is_none")]
182        consumer_group: Option<String>,
183        /// Source system identifier (kafka, kinesis, pulsar, http, etc.)
184        source_system: StreamSourceSystem,
185        /// Session ID for multi-tenant scenarios
186        #[serde(skip_serializing_if = "Option::is_none")]
187        session_id: Option<String>,
188        /// Timestamp of first record in batch (ISO 8601)
189        #[serde(skip_serializing_if = "Option::is_none")]
190        first_record_at: Option<String>,
191        /// Timestamp of last record in batch (ISO 8601)
192        #[serde(skip_serializing_if = "Option::is_none")]
193        last_record_at: Option<String>,
194    },
195}
196
197impl DataSource {
198    /// Get a human-readable identifier for this data source.
199    pub fn identifier(&self) -> String {
200        match self {
201            Self::File { path, .. } => path.clone(),
202            Self::Query {
203                engine, statement, ..
204            } => {
205                let truncated = if statement.chars().count() > 50 {
206                    let mut prefix: String = statement.chars().take(47).collect();
207                    prefix.push_str("...");
208                    prefix
209                } else {
210                    statement.clone()
211                };
212                format!("{}: {}", engine, truncated)
213            }
214            Self::DataFrame {
215                name,
216                source_library,
217                ..
218            } => format!("{}[{}]", source_library, name),
219            Self::Stream {
220                source_system,
221                topic,
222                batch_id,
223                ..
224            } => format!("{}[{}]-batch:{}", source_system, topic, batch_id),
225        }
226    }
227
228    /// Get file size in megabytes if this is a file-based source or dataframe.
229    pub fn size_mb(&self) -> Option<f64> {
230        match self {
231            Self::File { size_bytes, .. } => Some(*size_bytes as f64 / 1_048_576.0),
232            Self::DataFrame { memory_bytes, .. } => memory_bytes.map(|b| b as f64 / 1_048_576.0),
233            Self::Query { .. } | Self::Stream { .. } => None,
234        }
235    }
236
237    /// Check if this is a file-based source.
238    pub fn is_file(&self) -> bool {
239        matches!(self, Self::File { .. })
240    }
241
242    /// Check if this is a query-based source.
243    pub fn is_query(&self) -> bool {
244        matches!(self, Self::Query { .. })
245    }
246
247    /// Check if this is a DataFrame-based source.
248    pub fn is_dataframe(&self) -> bool {
249        matches!(self, Self::DataFrame { .. })
250    }
251
252    /// Check if this is a Stream-based source.
253    pub fn is_stream(&self) -> bool {
254        matches!(self, Self::Stream { .. })
255    }
256
257    /// Get the file path if this is a file-based source.
258    pub fn file_path(&self) -> Option<&str> {
259        match self {
260            Self::File { path, .. } => Some(path),
261            _ => None,
262        }
263    }
264
265    /// Get the stream topic if this is a stream-based source.
266    pub fn stream_topic(&self) -> Option<&str> {
267        match self {
268            Self::Stream { topic, .. } => Some(topic),
269            _ => None,
270        }
271    }
272
273    /// Get the batch ID if this is a stream-based source.
274    pub fn batch_id(&self) -> Option<&str> {
275        match self {
276            Self::Stream { batch_id, .. } => Some(batch_id),
277            _ => None,
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Query` source with only the fields `identifier()` reads.
    fn query_with_statement(statement: &str) -> DataSource {
        DataSource::Query {
            engine: QueryEngine::Postgres,
            statement: statement.to_string(),
            database: None,
            execution_id: None,
        }
    }

    #[test]
    fn test_data_source_file_identifier() {
        let ds = DataSource::File {
            path: "/path/to/data.csv".to_string(),
            format: FileFormat::Csv,
            size_bytes: 0,
            modified_at: None,
            parquet_metadata: None,
        };

        assert_eq!(ds.identifier(), "/path/to/data.csv");
        assert!(ds.is_file());
        assert!(!ds.is_query());
        assert!(!ds.is_dataframe());
        assert!(!ds.is_stream());
    }

    #[test]
    fn test_data_source_file_size_and_path() {
        let ds = DataSource::File {
            path: "data.parquet".to_string(),
            format: FileFormat::Parquet,
            size_bytes: 1_048_576,
            modified_at: None,
            parquet_metadata: None,
        };

        assert_eq!(ds.size_mb(), Some(1.0));
        assert_eq!(ds.file_path(), Some("data.parquet"));
        assert_eq!(ds.stream_topic(), None);
        assert_eq!(ds.batch_id(), None);
    }

    #[test]
    fn test_query_identifier_short_statement_is_kept() {
        // Exactly 50 characters: at the limit, so not truncated.
        let statement = "a".repeat(50);
        let ds = query_with_statement(&statement);

        assert_eq!(ds.identifier(), format!("postgres: {}", statement));
        assert!(ds.is_query());
        assert!(ds.size_mb().is_none());
        assert_eq!(ds.file_path(), None);
    }

    #[test]
    fn test_query_identifier_long_statement_is_truncated() {
        let ds = query_with_statement(&"a".repeat(51));

        assert_eq!(
            ds.identifier(),
            format!("postgres: {}...", "a".repeat(47))
        );
    }

    #[test]
    fn test_query_identifier_truncates_on_char_boundaries() {
        // Multi-byte characters must be counted and cut per character, not per byte.
        let ds = query_with_statement(&"é".repeat(60));

        assert_eq!(
            ds.identifier(),
            format!("postgres: {}...", "é".repeat(47))
        );
    }

    #[test]
    fn test_data_source_dataframe_identifier_and_size() {
        let ds = DataSource::DataFrame {
            name: "df".to_string(),
            source_library: DataFrameLibrary::Polars,
            row_count: 10,
            column_count: 2,
            memory_bytes: Some(2 * 1_048_576),
        };

        assert_eq!(ds.identifier(), "polars[df]");
        assert!(ds.is_dataframe());
        assert!(!ds.is_file());
        assert_eq!(ds.size_mb(), Some(2.0));

        let unknown_size = DataSource::DataFrame {
            name: "df".to_string(),
            source_library: DataFrameLibrary::Pandas,
            row_count: 0,
            column_count: 0,
            memory_bytes: None,
        };
        assert!(unknown_size.size_mb().is_none());
    }

    #[test]
    fn test_data_source_stream_identifier_and_helpers() {
        let ds = DataSource::Stream {
            topic: "events".to_string(),
            batch_id: "b1".to_string(),
            partition: Some(0),
            consumer_group: None,
            source_system: StreamSourceSystem::Kafka,
            session_id: None,
            first_record_at: None,
            last_record_at: None,
        };

        assert_eq!(ds.identifier(), "kafka[events]-batch:b1");
        assert!(ds.is_stream());
        assert_eq!(ds.stream_topic(), Some("events"));
        assert_eq!(ds.batch_id(), Some("b1"));
        assert!(!ds.is_file());
        assert!(!ds.is_query());
        assert!(ds.size_mb().is_none());
    }

    #[test]
    fn test_stream_json_serialization() {
        let ds = DataSource::Stream {
            topic: "sensor-data".to_string(),
            batch_id: "batch-789".to_string(),
            partition: Some(2),
            consumer_group: Some("processing-group".to_string()),
            source_system: StreamSourceSystem::Kinesis,
            session_id: Some("session-1".to_string()),
            first_record_at: Some("2023-01-01T10:00:00Z".to_string()),
            last_record_at: Some("2023-01-01T10:05:00Z".to_string()),
        };

        let json = serde_json::to_string(&ds).unwrap();
        assert!(json.contains(r#""type":"stream""#));
        assert!(json.contains(r#""source_system":"kinesis""#));
        assert!(json.contains(r#""topic":"sensor-data""#));

        let deserialized: DataSource = serde_json::from_str(&json).unwrap();
        assert!(deserialized.is_stream());
        assert_eq!(deserialized.stream_topic(), Some("sensor-data"));
    }

    #[test]
    fn test_stream_source_system_serialization_names() {
        let object_store = serde_json::to_string(&StreamSourceSystem::ObjectStore).unwrap();
        let message_queue = serde_json::to_string(&StreamSourceSystem::MessageQueue).unwrap();
        let database = serde_json::to_string(&StreamSourceSystem::Database).unwrap();

        assert_eq!(object_store, r#""object_store""#);
        assert_eq!(message_queue, r#""message_queue""#);
        assert_eq!(database, r#""database""#);

        let object_store: StreamSourceSystem = serde_json::from_str(r#""object_store""#).unwrap();
        let message_queue: StreamSourceSystem = serde_json::from_str(r#""message_queue""#).unwrap();
        let database: StreamSourceSystem = serde_json::from_str(r#""database""#).unwrap();

        assert_eq!(object_store, StreamSourceSystem::ObjectStore);
        assert_eq!(message_queue, StreamSourceSystem::MessageQueue);
        assert_eq!(database, StreamSourceSystem::Database);
    }

    #[test]
    fn test_custom_stream_source_system_display_and_round_trip() {
        let custom = StreamSourceSystem::Custom("nats".to_string());
        assert_eq!(custom.to_string(), "nats");

        // The untagged variant is written as a bare string and read back
        // through the same fallback when no known name matches.
        let json = serde_json::to_string(&custom).unwrap();
        assert_eq!(json, r#""nats""#);

        let round_tripped: StreamSourceSystem = serde_json::from_str(&json).unwrap();
        assert_eq!(round_tripped, custom);
    }
}