term_guard/sources/
mod.rs1use crate::prelude::*;
7use async_trait::async_trait;
8use datafusion::arrow::datatypes::Schema;
9use datafusion::prelude::SessionContext;
10use std::fmt::Debug;
11use std::sync::Arc;
12
13mod csv;
14mod json;
15mod parquet;
16
17#[cfg(feature = "cloud-storage")]
21mod cloud;
22
23pub use csv::{CsvOptions, CsvSource};
24pub use json::{JsonOptions, JsonSource};
25pub use parquet::{ParquetOptions, ParquetSource};
26
27#[cfg(feature = "cloud-storage")]
40pub use cloud::{AzureConfig, GcsConfig, S3Config};
41
42#[cfg(all(feature = "cloud-storage", feature = "s3"))]
43pub use cloud::{S3Auth, S3Source};
44
45#[cfg(all(feature = "cloud-storage", feature = "gcs"))]
46pub use cloud::{GcsAuth, GcsSource};
47
48#[cfg(all(feature = "cloud-storage", feature = "azure"))]
49pub use cloud::{AzureAuth, AzureBlobSource};
50
51#[async_trait]
70pub trait DataSource: Debug + Send + Sync {
71 async fn register(&self, ctx: &SessionContext, table_name: &str) -> Result<()> {
89 self.register_with_telemetry(ctx, table_name, None).await
90 }
91
92 async fn register_with_telemetry(
97 &self,
98 ctx: &SessionContext,
99 table_name: &str,
100 telemetry: Option<&Arc<TermTelemetry>>,
101 ) -> Result<()>;
102
103 fn schema(&self) -> Option<&Arc<Schema>>;
107
108 fn description(&self) -> String;
110}
111
/// Compression codec applied to a data file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
    /// No compression.
    None,
    /// gzip — detected from `.gz` / `.gzip`.
    Gzip,
    /// Zstandard — detected from `.zst` / `.zstd`.
    Zstd,
    /// bzip2 — detected from `.bz2` / `.bzip2`.
    Bzip2,
    /// LZ4 — detected from `.lz4`.
    Lz4,
    /// Snappy — detected from `.snappy` / `.sz`.
    Snappy,
    /// Never returned by [`CompressionType::from_path`] and has no extension
    /// of its own. NOTE(review): presumably means "detect the codec later";
    /// confirm against the callers that construct it.
    Auto,
}

impl CompressionType {
    /// Infers the compression codec from the trailing extension of `path`.
    ///
    /// Matching is case-insensitive (`data.CSV.GZ` is gzip). Returns
    /// [`CompressionType::None`] when no known extension matches.
    pub fn from_path(path: &str) -> Self {
        // Suffixes checked in priority order; the first codec with a matching
        // suffix wins.
        const KNOWN_SUFFIXES: [(&[&str], CompressionType); 5] = [
            (&[".gz", ".gzip"], CompressionType::Gzip),
            (&[".zst", ".zstd"], CompressionType::Zstd),
            (&[".bz2", ".bzip2"], CompressionType::Bzip2),
            (&[".lz4"], CompressionType::Lz4),
            (&[".snappy", ".sz"], CompressionType::Snappy),
        ];

        let lowered = path.to_lowercase();
        KNOWN_SUFFIXES
            .iter()
            .find(|(suffixes, _)| suffixes.iter().any(|suffix| lowered.ends_with(*suffix)))
            .map_or(Self::None, |&(_, codec)| codec)
    }

    /// Returns the canonical file extension (including the leading dot) for
    /// this codec, or an empty string for `None` and `Auto`.
    pub fn extension(&self) -> &'static str {
        match *self {
            Self::Gzip => ".gz",
            Self::Zstd => ".zst",
            Self::Bzip2 => ".bz2",
            Self::Lz4 => ".lz4",
            Self::Snappy => ".snappy",
            Self::None | Self::Auto => "",
        }
    }
}
163
164pub(crate) async fn expand_globs(patterns: &[String]) -> Result<Vec<String>> {
166 use glob::glob;
167
168 let mut paths = Vec::new();
169 for pattern in patterns {
170 let matches = glob(pattern).map_err(|e| {
171 TermError::Configuration(format!("Invalid glob pattern '{pattern}': {e}"))
172 })?;
173
174 for entry in matches {
175 let path = entry
176 .map_err(|e| TermError::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
177
178 if path.is_file() {
179 if let Some(path_str) = path.to_str() {
180 paths.push(path_str.to_string());
181 }
182 }
183 }
184 }
185
186 if paths.is_empty() {
187 return Err(TermError::DataSource {
188 source_type: "file".to_string(),
189 message: "No files found matching glob patterns".to_string(),
190 source: None,
191 });
192 }
193
194 Ok(paths)
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// `from_path` maps each known extension (case-insensitively) to its codec.
    #[test]
    fn test_compression_detection() {
        let cases = [
            ("data.csv", CompressionType::None),
            ("data.csv.gz", CompressionType::Gzip),
            ("data.CSV.GZ", CompressionType::Gzip),
            ("data.csv.zst", CompressionType::Zstd),
            ("data.csv.bz2", CompressionType::Bzip2),
            ("data.csv.lz4", CompressionType::Lz4),
            ("data.csv.snappy", CompressionType::Snappy),
        ];

        for (path, expected) in cases {
            assert_eq!(CompressionType::from_path(path), expected, "path: {path}");
        }
    }

    /// `extension` returns the canonical dotted suffix for each codec.
    #[test]
    fn test_compression_extension() {
        let cases = [
            (CompressionType::None, ""),
            (CompressionType::Gzip, ".gz"),
            (CompressionType::Zstd, ".zst"),
            (CompressionType::Bzip2, ".bz2"),
            (CompressionType::Lz4, ".lz4"),
            (CompressionType::Snappy, ".snappy"),
        ];

        for (codec, expected) in cases {
            assert_eq!(codec.extension(), expected, "codec: {codec:?}");
        }
    }
}