term_guard/sources/
mod.rs

//! Data source connectors for the Term validation library.
//!
//! This module provides implementations for various data sources, including
//! file formats (CSV, Parquet, JSON) with support for compression and glob patterns.

use crate::prelude::*;
use async_trait::async_trait;
use datafusion::arrow::datatypes::Schema;
use datafusion::prelude::SessionContext;
use std::fmt::Debug;
use std::sync::Arc;

mod csv;
mod joined;
mod json;
mod parquet;

#[cfg(feature = "database")]
mod database;

#[cfg(feature = "cloud-storage")]
mod cloud;

pub use csv::{CsvOptions, CsvSource};
pub use joined::{JoinCondition, JoinType, JoinedSource};
pub use json::{JsonOptions, JsonSource};
pub use parquet::{ParquetOptions, ParquetSource};

#[cfg(feature = "database")]
pub use database::{DatabaseConfig, DatabaseSource};

#[cfg(all(feature = "database", feature = "postgres"))]
pub use database::PostgresSource;

#[cfg(all(feature = "database", feature = "mysql"))]
pub use database::MySqlSource;

#[cfg(all(feature = "database", feature = "sqlite"))]
pub use database::SqliteSource;

#[cfg(feature = "cloud-storage")]
pub use cloud::{AzureConfig, GcsConfig, S3Config};

#[cfg(all(feature = "cloud-storage", feature = "s3"))]
pub use cloud::{S3Auth, S3Source};

#[cfg(all(feature = "cloud-storage", feature = "gcs"))]
pub use cloud::{GcsAuth, GcsSource};

#[cfg(all(feature = "cloud-storage", feature = "azure"))]
pub use cloud::{AzureAuth, AzureBlobSource};

/// A data source that can be registered with a DataFusion context.
///
/// This trait defines the interface for all data sources in the Term library.
/// Implementations should handle schema inference, compression detection, and
/// efficient data loading.
///
/// # Examples
///
/// ```rust,ignore
/// use datafusion::prelude::SessionContext;
/// use term_guard::sources::{DataSource, CsvSource};
///
/// # async fn example() -> Result<()> {
/// let source = CsvSource::new("data/users.csv")?;
/// let ctx = SessionContext::new();
/// source.register(&ctx, "users").await?;
/// # Ok(())
/// # }
/// ```
#[async_trait]
pub trait DataSource: Debug + Send + Sync {
    /// Registers this data source with the given session context.
    ///
    /// This method should handle:
    /// - Schema inference if not explicitly provided
    /// - Compression detection and handling
    /// - Efficient data loading
    ///
    /// The default implementation delegates to
    /// [`register_with_telemetry`](Self::register_with_telemetry) with no telemetry.
    ///
    /// # Arguments
    ///
    /// * `ctx` - The DataFusion session context to register with
    /// * `table_name` - The name to register the table as
    ///
    /// # Returns
    ///
    /// `Ok(())` on success, or an error if registration fails.
    async fn register(&self, ctx: &SessionContext, table_name: &str) -> Result<()> {
        self.register_with_telemetry(ctx, table_name, None).await
    }

    /// Registers this data source with telemetry support.
    ///
    /// This is the main implementation method that data sources should override.
    /// The default `register` method delegates to this with `None` telemetry, in
    /// which case implementations may skip emitting telemetry spans.
    async fn register_with_telemetry(
        &self,
        ctx: &SessionContext,
        table_name: &str,
        telemetry: Option<&Arc<TermTelemetry>>,
    ) -> Result<()>;

    /// Returns the schema of this data source if known.
    ///
    /// This may return `None` if schema inference hasn't been performed yet.
    fn schema(&self) -> Option<&Arc<Schema>>;

    /// Returns a human-readable description of this data source.
    fn description(&self) -> String;
}
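
// A minimal sketch of a custom `DataSource` implementation, kept under
// `#[cfg(test)]` so it only compiles with the test suite. It registers a
// hypothetical in-memory table via DataFusion's `MemTable`; real sources in
// this module additionally handle schema inference, compression, and
// telemetry spans. Illustrative only, not part of the public API, and the
// error mapping to `TermError::DataSource` is an assumption.
#[cfg(test)]
#[allow(dead_code)]
mod in_memory_sketch {
    use super::*;
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::datasource::MemTable;

    /// A source backed by record batches that are already in memory.
    #[derive(Debug)]
    struct InMemorySource {
        schema: Arc<Schema>,
        batches: Vec<RecordBatch>,
    }

    #[async_trait]
    impl DataSource for InMemorySource {
        async fn register_with_telemetry(
            &self,
            ctx: &SessionContext,
            table_name: &str,
            _telemetry: Option<&Arc<TermTelemetry>>,
        ) -> Result<()> {
            // Wrap the batches in a `MemTable` and register it under `table_name`.
            let table = MemTable::try_new(self.schema.clone(), vec![self.batches.clone()])
                .map_err(|e| TermError::DataSource {
                    source_type: "memory".to_string(),
                    message: e.to_string(),
                    source: None,
                })?;
            ctx.register_table(table_name, Arc::new(table))
                .map_err(|e| TermError::DataSource {
                    source_type: "memory".to_string(),
                    message: e.to_string(),
                    source: None,
                })?;
            Ok(())
        }

        fn schema(&self) -> Option<&Arc<Schema>> {
            Some(&self.schema)
        }

        fn description(&self) -> String {
            format!("in-memory source with {} batch(es)", self.batches.len())
        }
    }
}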

/// Common compression formats supported by file sources.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
    /// No compression
    None,
    /// Gzip compression
    Gzip,
    /// Zstandard compression
    Zstd,
    /// Bzip2 compression
    Bzip2,
    /// LZ4 compression
    Lz4,
    /// Snappy compression
    Snappy,
    /// Automatic detection based on file extension
    Auto,
}

impl CompressionType {
    /// Detects the compression type from a file path's extension.
    ///
    /// Returns [`CompressionType::None`] if the extension is not recognized.
    pub fn from_path(path: &str) -> Self {
        let lower = path.to_lowercase();
        if lower.ends_with(".gz") || lower.ends_with(".gzip") {
            Self::Gzip
        } else if lower.ends_with(".zst") || lower.ends_with(".zstd") {
            Self::Zstd
        } else if lower.ends_with(".bz2") || lower.ends_with(".bzip2") {
            Self::Bzip2
        } else if lower.ends_with(".lz4") {
            Self::Lz4
        } else if lower.ends_with(".snappy") || lower.ends_with(".sz") {
            Self::Snappy
        } else {
            Self::None
        }
    }

    /// Returns the file extension for this compression type.
    ///
    /// [`CompressionType::Auto`] has no fixed extension and returns an empty string.
    pub fn extension(&self) -> &'static str {
        match self {
            Self::None => "",
            Self::Gzip => ".gz",
            Self::Zstd => ".zst",
            Self::Bzip2 => ".bz2",
            Self::Lz4 => ".lz4",
            Self::Snappy => ".snappy",
            Self::Auto => "",
        }
    }
}
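
// Sketch of how a file source might resolve `CompressionType::Auto` against a
// concrete path before building its read options. The `resolve` helper is
// hypothetical and only illustrates the intended interplay between `Auto` and
// `from_path`; it is compiled with the test suite and not exported.
#[cfg(test)]
mod compression_resolution_sketch {
    use super::*;

    /// Resolves `Auto` to a concrete variant using the file extension;
    /// explicit choices are passed through unchanged.
    fn resolve(requested: CompressionType, path: &str) -> CompressionType {
        match requested {
            CompressionType::Auto => CompressionType::from_path(path),
            explicit => explicit,
        }
    }

    #[test]
    fn auto_resolves_from_extension() {
        assert_eq!(
            resolve(CompressionType::Auto, "events.json.gz"),
            CompressionType::Gzip
        );
        // An explicit choice wins even if the extension disagrees.
        assert_eq!(
            resolve(CompressionType::Zstd, "events.json.gz"),
            CompressionType::Zstd
        );
    }
}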

/// Expands glob patterns into a list of matching file paths.
///
/// Returns an error if a pattern is invalid or if no files match any pattern.
pub(crate) async fn expand_globs(patterns: &[String]) -> Result<Vec<String>> {
    use glob::glob;

    let mut paths = Vec::new();
    for pattern in patterns {
        let matches = glob(pattern).map_err(|e| {
            TermError::Configuration(format!("Invalid glob pattern '{pattern}': {e}"))
        })?;

        for entry in matches {
            let path = entry
                .map_err(|e| TermError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;

            if path.is_file() {
                if let Some(path_str) = path.to_str() {
                    paths.push(path_str.to_string());
                }
            }
        }
    }

    if paths.is_empty() {
        return Err(TermError::DataSource {
            source_type: "file".to_string(),
            message: "No files found matching glob patterns".to_string(),
            source: None,
        });
    }

    Ok(paths)
}
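
// A small sketch exercising `expand_globs` error handling, assuming `tokio` is
// available as a dev-dependency to drive the async test (a reasonable guess
// given the async/DataFusion stack, but still an assumption).
#[cfg(test)]
mod glob_sketch {
    use super::*;

    #[tokio::test]
    async fn no_matching_files_is_an_error() {
        // A syntactically valid pattern that should not match anything.
        let patterns = vec!["/term-guard-nonexistent-dir/*.csv".to_string()];
        let result = expand_globs(&patterns).await;
        assert!(matches!(result, Err(TermError::DataSource { .. })));
    }
}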

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_compression_detection() {
        assert_eq!(
            CompressionType::from_path("data.csv"),
            CompressionType::None
        );
        assert_eq!(
            CompressionType::from_path("data.csv.gz"),
            CompressionType::Gzip
        );
        assert_eq!(
            CompressionType::from_path("data.CSV.GZ"),
            CompressionType::Gzip
        );
        assert_eq!(
            CompressionType::from_path("data.csv.zst"),
            CompressionType::Zstd
        );
        assert_eq!(
            CompressionType::from_path("data.csv.bz2"),
            CompressionType::Bzip2
        );
        assert_eq!(
            CompressionType::from_path("data.csv.lz4"),
            CompressionType::Lz4
        );
        assert_eq!(
            CompressionType::from_path("data.csv.snappy"),
            CompressionType::Snappy
        );
    }

    #[test]
    fn test_compression_extension() {
        assert_eq!(CompressionType::None.extension(), "");
        assert_eq!(CompressionType::Gzip.extension(), ".gz");
        assert_eq!(CompressionType::Zstd.extension(), ".zst");
        assert_eq!(CompressionType::Bzip2.extension(), ".bz2");
        assert_eq!(CompressionType::Lz4.extension(), ".lz4");
        assert_eq!(CompressionType::Snappy.extension(), ".snappy");
    }
}