term_guard/sources/
mod.rs1use crate::prelude::*;
7use async_trait::async_trait;
8use datafusion::arrow::datatypes::Schema;
9use datafusion::prelude::SessionContext;
10use std::fmt::Debug;
11use std::sync::Arc;
12
13mod csv;
14mod joined;
15mod json;
16mod parquet;
17
18#[cfg(feature = "database")]
19mod database;
20
21#[cfg(feature = "cloud-storage")]
22mod cloud;
23
24pub use csv::{CsvOptions, CsvSource};
25pub use joined::{JoinCondition, JoinType, JoinedSource};
26pub use json::{JsonOptions, JsonSource};
27pub use parquet::{ParquetOptions, ParquetSource};
28
29#[cfg(feature = "database")]
30pub use database::{DatabaseConfig, DatabaseSource};
31
32#[cfg(all(feature = "database", feature = "postgres"))]
33pub use database::PostgresSource;
34
35#[cfg(all(feature = "database", feature = "mysql"))]
36pub use database::MySqlSource;
37
38#[cfg(all(feature = "database", feature = "sqlite"))]
39pub use database::SqliteSource;
40
41#[cfg(feature = "cloud-storage")]
42pub use cloud::{AzureConfig, GcsConfig, S3Config};
43
44#[cfg(all(feature = "cloud-storage", feature = "s3"))]
45pub use cloud::{S3Auth, S3Source};
46
47#[cfg(all(feature = "cloud-storage", feature = "gcs"))]
48pub use cloud::{GcsAuth, GcsSource};
49
50#[cfg(all(feature = "cloud-storage", feature = "azure"))]
51pub use cloud::{AzureAuth, AzureBlobSource};
52
53#[async_trait]
72pub trait DataSource: Debug + Send + Sync {
73 async fn register(&self, ctx: &SessionContext, table_name: &str) -> Result<()> {
91 self.register_with_telemetry(ctx, table_name, None).await
92 }
93
94 async fn register_with_telemetry(
99 &self,
100 ctx: &SessionContext,
101 table_name: &str,
102 telemetry: Option<&Arc<TermTelemetry>>,
103 ) -> Result<()>;
104
105 fn schema(&self) -> Option<&Arc<Schema>>;
109
110 fn description(&self) -> String;
112}
113
/// Compression codec applied to a data file.
///
/// Use [`CompressionType::from_path`] to infer the codec from a file name and
/// [`CompressionType::extension`] to obtain the canonical suffix of a codec.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
    /// Uncompressed data (no recognised compression suffix).
    None,
    /// Gzip; recognised suffixes `.gz` and `.gzip`.
    Gzip,
    /// Zstandard; recognised suffixes `.zst` and `.zstd`.
    Zstd,
    /// Bzip2; recognised suffixes `.bz2` and `.bzip2`.
    Bzip2,
    /// LZ4; recognised suffix `.lz4`.
    Lz4,
    /// Snappy; recognised suffixes `.snappy` and `.sz`.
    Snappy,
    /// Codec to be determined automatically. `from_path` never returns this
    /// variant; NOTE(review): presumably resolved by callers before use.
    Auto,
}

impl CompressionType {
    /// Recognised file-name suffixes, lowercase and including the leading dot.
    ///
    /// No entry is a suffix of another entry, so lookup order is irrelevant.
    const SUFFIXES: &'static [(&'static str, CompressionType)] = &[
        (".gz", CompressionType::Gzip),
        (".gzip", CompressionType::Gzip),
        (".zst", CompressionType::Zstd),
        (".zstd", CompressionType::Zstd),
        (".bz2", CompressionType::Bzip2),
        (".bzip2", CompressionType::Bzip2),
        (".lz4", CompressionType::Lz4),
        (".snappy", CompressionType::Snappy),
        (".sz", CompressionType::Snappy),
    ];

    /// Infers the compression codec from the extension of `path`.
    ///
    /// Matching is ASCII case-insensitive, so `data.CSV.GZ` is detected as
    /// [`CompressionType::Gzip`]. A path without a recognised suffix yields
    /// [`CompressionType::None`]; this function never returns
    /// [`CompressionType::Auto`].
    #[must_use]
    pub fn from_path(path: &str) -> Self {
        // Every suffix is ASCII, so comparing only the tail bytes is exact and
        // avoids allocating a lowercased copy of the whole path on each call.
        Self::SUFFIXES
            .iter()
            .find(|&&(suffix, _)| Self::ends_with_ignore_ascii_case(path, suffix))
            .map_or(Self::None, |&(_, codec)| codec)
    }

    /// Returns the canonical file suffix for this codec (leading dot
    /// included), or `""` for [`CompressionType::None`] and
    /// [`CompressionType::Auto`].
    #[must_use]
    pub const fn extension(&self) -> &'static str {
        match self {
            Self::None => "",
            Self::Gzip => ".gz",
            Self::Zstd => ".zst",
            Self::Bzip2 => ".bz2",
            Self::Lz4 => ".lz4",
            Self::Snappy => ".snappy",
            Self::Auto => "",
        }
    }

    /// Returns `true` if `haystack` ends with `suffix`, ignoring ASCII case.
    fn ends_with_ignore_ascii_case(haystack: &str, suffix: &str) -> bool {
        let (hay, suf) = (haystack.as_bytes(), suffix.as_bytes());
        // Byte-level slicing cannot split a char incorrectly here: a non-ASCII
        // byte never compares equal to the ASCII bytes of a suffix.
        hay.len() >= suf.len() && hay[hay.len() - suf.len()..].eq_ignore_ascii_case(suf)
    }
}
165
166pub(crate) async fn expand_globs(patterns: &[String]) -> Result<Vec<String>> {
168 use glob::glob;
169
170 let mut paths = Vec::new();
171 for pattern in patterns {
172 let matches = glob(pattern).map_err(|e| {
173 TermError::Configuration(format!("Invalid glob pattern '{pattern}': {e}"))
174 })?;
175
176 for entry in matches {
177 let path = entry
178 .map_err(|e| TermError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
179
180 if path.is_file() {
181 if let Some(path_str) = path.to_str() {
182 paths.push(path_str.to_string());
183 }
184 }
185 }
186 }
187
188 if paths.is_empty() {
189 return Err(TermError::DataSource {
190 source_type: "file".to_string(),
191 message: "No files found matching glob patterns".to_string(),
192 source: None,
193 });
194 }
195
196 Ok(paths)
197}
198
#[cfg(test)]
mod tests {
    use super::*;

    /// Canonical suffixes, including an uppercase variant, map to their codec.
    #[test]
    fn test_compression_detection() {
        let cases = [
            ("data.csv", CompressionType::None),
            ("data.csv.gz", CompressionType::Gzip),
            ("data.CSV.GZ", CompressionType::Gzip),
            ("data.csv.zst", CompressionType::Zstd),
            ("data.csv.bz2", CompressionType::Bzip2),
            ("data.csv.lz4", CompressionType::Lz4),
            ("data.csv.snappy", CompressionType::Snappy),
        ];
        for &(path, expected) in &cases {
            assert_eq!(CompressionType::from_path(path), expected, "path: {path}");
        }
    }

    /// Alias suffixes, mixed case on every codec, and inputs that must not match.
    #[test]
    fn test_compression_detection_aliases_and_edge_cases() {
        let cases = [
            ("data.csv.gzip", CompressionType::Gzip),
            ("data.csv.zstd", CompressionType::Zstd),
            ("data.csv.bzip2", CompressionType::Bzip2),
            ("data.csv.sz", CompressionType::Snappy),
            ("DATA.CSV.ZST", CompressionType::Zstd),
            ("DATA.CSV.BZ2", CompressionType::Bzip2),
            ("DATA.CSV.LZ4", CompressionType::Lz4),
            ("DATA.CSV.SNAPPY", CompressionType::Snappy),
            ("", CompressionType::None),
            // A bare name without the leading dot is not an extension.
            ("gz", CompressionType::None),
            // Only the final suffix counts.
            ("data.gz.csv", CompressionType::None),
        ];
        for &(path, expected) in &cases {
            assert_eq!(CompressionType::from_path(path), expected, "path: {path}");
        }
    }

    #[test]
    fn test_compression_extension() {
        let cases = [
            (CompressionType::None, ""),
            (CompressionType::Gzip, ".gz"),
            (CompressionType::Zstd, ".zst"),
            (CompressionType::Bzip2, ".bz2"),
            (CompressionType::Lz4, ".lz4"),
            (CompressionType::Snappy, ".snappy"),
            (CompressionType::Auto, ""),
        ];
        for &(codec, expected) in &cases {
            assert_eq!(codec.extension(), expected, "codec: {codec:?}");
        }
    }

    /// `extension` and `from_path` must agree for every concrete codec.
    /// `Auto` is excluded because `from_path` never produces it.
    #[test]
    fn test_extension_round_trips_through_from_path() {
        let codecs = [
            CompressionType::None,
            CompressionType::Gzip,
            CompressionType::Zstd,
            CompressionType::Bzip2,
            CompressionType::Lz4,
            CompressionType::Snappy,
        ];
        for &codec in &codecs {
            let file_name = format!("data.csv{}", codec.extension());
            assert_eq!(
                CompressionType::from_path(&file_name),
                codec,
                "file name: {file_name}"
            );
        }
    }
}