term_guard/sources/
mod.rs

1//! Data source connectors for Term validation library.
2//!
3//! This module provides implementations for various data sources including
4//! file formats (CSV, Parquet, JSON) with support for compression and glob patterns.
5
6use crate::prelude::*;
7use async_trait::async_trait;
8use datafusion::arrow::datatypes::Schema;
9use datafusion::prelude::SessionContext;
10use std::fmt::Debug;
11use std::sync::Arc;
12
13mod csv;
14mod json;
15mod parquet;
16
17// #[cfg(feature = "database")]
18// mod database;
19
20#[cfg(feature = "cloud-storage")]
21mod cloud;
22
23pub use csv::{CsvOptions, CsvSource};
24pub use json::{JsonOptions, JsonSource};
25pub use parquet::{ParquetOptions, ParquetSource};
26
27// #[cfg(feature = "database")]
28// pub use database::{DatabaseConfig, DatabaseSource};
29
30// #[cfg(all(feature = "database", feature = "postgres"))]
31// pub use database::PostgresSource;
32
33// #[cfg(all(feature = "database", feature = "mysql"))]
34// pub use database::MySqlSource;
35
36// #[cfg(all(feature = "database", feature = "sqlite"))]
37// pub use database::SqliteSource;
38
39#[cfg(feature = "cloud-storage")]
40pub use cloud::{AzureConfig, GcsConfig, S3Config};
41
42#[cfg(all(feature = "cloud-storage", feature = "s3"))]
43pub use cloud::{S3Auth, S3Source};
44
45#[cfg(all(feature = "cloud-storage", feature = "gcs"))]
46pub use cloud::{GcsAuth, GcsSource};
47
48#[cfg(all(feature = "cloud-storage", feature = "azure"))]
49pub use cloud::{AzureAuth, AzureBlobSource};
50
/// A data source that can be registered with a DataFusion context.
///
/// This trait defines the interface for all data sources in the Term library.
/// Implementations should handle schema inference, compression detection, and
/// efficient data loading.
///
/// # Examples
///
/// ```rust,ignore
/// use term_guard::sources::{DataSource, CsvSource};
///
/// # async fn example() -> Result<()> {
/// let source = CsvSource::new("data/users.csv")?;
/// let ctx = SessionContext::new();
/// source.register(&ctx, "users").await?;
/// # Ok(())
/// # }
/// ```
#[async_trait]
pub trait DataSource: Debug + Send + Sync {
    /// Registers this data source with the given session context.
    ///
    /// Convenience wrapper that delegates to
    /// [`DataSource::register_with_telemetry`] with no telemetry attached.
    ///
    /// Implementations (via `register_with_telemetry`) should handle:
    /// - Schema inference if not explicitly provided
    /// - Compression detection and handling
    /// - Efficient data loading
    ///
    /// # Arguments
    ///
    /// * `ctx` - The DataFusion session context to register with
    /// * `table_name` - The name to register the table as
    ///
    /// # Returns
    ///
    /// A `Result` indicating success or failure
    async fn register(&self, ctx: &SessionContext, table_name: &str) -> Result<()> {
        self.register_with_telemetry(ctx, table_name, None).await
    }

    /// Registers this data source with telemetry support.
    ///
    /// This is the main implementation method that data sources should override.
    /// The default `register` method delegates to this with `None` telemetry.
    ///
    /// # Arguments
    ///
    /// * `ctx` - The DataFusion session context to register with
    /// * `table_name` - The name to register the table as
    /// * `telemetry` - Optional telemetry configuration for tracing data loading
    async fn register_with_telemetry(
        &self,
        ctx: &SessionContext,
        table_name: &str,
        telemetry: Option<&Arc<TermTelemetry>>,
    ) -> Result<()>;

    /// Returns the schema of this data source if known.
    ///
    /// This may return `None` if schema inference hasn't been performed yet.
    fn schema(&self) -> Option<&Arc<Schema>>;

    /// Returns a human-readable description of this data source.
    fn description(&self) -> String;
}
111
/// Common compression formats supported by file sources.
///
/// `Hash` is derived alongside the comparison traits so the type can be
/// used directly as a key in hash-based collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionType {
    /// No compression
    None,
    /// Gzip compression
    Gzip,
    /// Zstandard compression
    Zstd,
    /// Bzip2 compression
    Bzip2,
    /// LZ4 compression
    Lz4,
    /// Snappy compression
    Snappy,
    /// Automatic detection based on file extension
    Auto,
}

impl CompressionType {
    /// Detects compression type from file path extension.
    ///
    /// Matching is case-insensitive. Returns [`CompressionType::None`]
    /// when no known compression suffix is present; this function never
    /// returns [`CompressionType::Auto`] (`Auto` is what callers resolve
    /// *by calling* this function).
    pub fn from_path(path: &str) -> Self {
        // Known compression suffixes and the format each one indicates.
        const SUFFIXES: &[(&str, CompressionType)] = &[
            (".gz", CompressionType::Gzip),
            (".gzip", CompressionType::Gzip),
            (".zst", CompressionType::Zstd),
            (".zstd", CompressionType::Zstd),
            (".bz2", CompressionType::Bzip2),
            (".bzip2", CompressionType::Bzip2),
            (".lz4", CompressionType::Lz4),
            (".snappy", CompressionType::Snappy),
            (".sz", CompressionType::Snappy),
        ];

        let lower = path.to_lowercase();
        SUFFIXES
            .iter()
            .find(|(suffix, _)| lower.ends_with(suffix))
            .map_or(Self::None, |&(_, kind)| kind)
    }

    /// Returns the canonical file extension (including the leading dot)
    /// for this compression type.
    ///
    /// `None` and `Auto` have no extension and yield an empty string.
    pub fn extension(&self) -> &'static str {
        match self {
            Self::None | Self::Auto => "",
            Self::Gzip => ".gz",
            Self::Zstd => ".zst",
            Self::Bzip2 => ".bz2",
            Self::Lz4 => ".lz4",
            Self::Snappy => ".snappy",
        }
    }
}
163
164/// Utility function to expand glob patterns into file paths.
165pub(crate) async fn expand_globs(patterns: &[String]) -> Result<Vec<String>> {
166    use glob::glob;
167
168    let mut paths = Vec::new();
169    for pattern in patterns {
170        let matches = glob(pattern).map_err(|e| {
171            TermError::Configuration(format!("Invalid glob pattern '{pattern}': {e}"))
172        })?;
173
174        for entry in matches {
175            let path = entry
176                .map_err(|e| TermError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
177
178            if path.is_file() {
179                if let Some(path_str) = path.to_str() {
180                    paths.push(path_str.to_string());
181                }
182            }
183        }
184    }
185
186    if paths.is_empty() {
187        return Err(TermError::DataSource {
188            source_type: "file".to_string(),
189            message: "No files found matching glob patterns".to_string(),
190            source: None,
191        });
192    }
193
194    Ok(paths)
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_compression_detection() {
        // Each path maps to the expected compression type; the upper-case
        // entry exercises case-insensitive matching.
        let cases = [
            ("data.csv", CompressionType::None),
            ("data.csv.gz", CompressionType::Gzip),
            ("data.CSV.GZ", CompressionType::Gzip),
            ("data.csv.zst", CompressionType::Zstd),
            ("data.csv.bz2", CompressionType::Bzip2),
            ("data.csv.lz4", CompressionType::Lz4),
            ("data.csv.snappy", CompressionType::Snappy),
        ];
        for (path, expected) in cases {
            assert_eq!(CompressionType::from_path(path), expected);
        }
    }

    #[test]
    fn test_compression_extension() {
        // Each compression type reports its canonical file extension.
        let cases = [
            (CompressionType::None, ""),
            (CompressionType::Gzip, ".gz"),
            (CompressionType::Zstd, ".zst"),
            (CompressionType::Bzip2, ".bz2"),
            (CompressionType::Lz4, ".lz4"),
            (CompressionType::Snappy, ".snappy"),
        ];
        for (kind, expected) in cases {
            assert_eq!(kind.extension(), expected);
        }
    }
}