elasticube_core/sources.rs

//! Data source connectors for ElastiCube

use crate::error::{Error, Result};
use arrow::datatypes::Schema as ArrowSchema;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

/// Trait for data sources that can load data into a cube
///
/// Data sources must be Send + Sync to allow use in multi-threaded contexts,
/// particularly for Python bindings via PyO3.
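///
/// # Example
///
/// A minimal sketch of driving any `DataSource` through the trait (the CSV
/// path is hypothetical):
/// ```rust,ignore
/// let source: Box<dyn DataSource> = Box::new(CsvSource::new("data/sales.csv"));
/// let (schema, batches) = source.load()?;
/// println!("loaded {} batches with {} columns", batches.len(), schema.fields().len());
/// ```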
pub trait DataSource: std::fmt::Debug + Send + Sync {
    /// Load data from the source
    ///
    /// Returns a tuple of (Arrow schema, vector of RecordBatches)
    fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)>;
}

/// CSV data source configuration
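///
/// # Example
///
/// A sketch of typical builder usage (the path is hypothetical):
/// ```rust,ignore
/// let source = CsvSource::new("data/sales.csv")
///     .with_header(true)
///     .with_delimiter(b';')
///     .with_batch_size(4096);
/// let (schema, batches) = source.load()?;
/// ```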
#[derive(Debug, Clone)]
pub struct CsvSource {
    /// Path to the CSV file
    path: String,

    /// Whether the CSV has a header row
    has_header: bool,

    /// Batch size for reading (number of rows per batch)
    batch_size: usize,

    /// Optional schema (if None, will be inferred)
    schema: Option<Arc<ArrowSchema>>,

    /// Delimiter character (default: ',')
    delimiter: u8,
}

impl CsvSource {
    /// Create a new CSV source
    pub fn new(path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            has_header: true,
            batch_size: 8192,
            schema: None,
            delimiter: b',',
        }
    }

    /// Set whether the CSV has a header row
    pub fn with_header(mut self, has_header: bool) -> Self {
        self.has_header = has_header;
        self
    }

    /// Set the batch size for reading
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Set the expected schema
    pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Set the delimiter character
    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }
}

impl DataSource for CsvSource {
    fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
        use arrow_csv::ReaderBuilder;

        // Open the file
        let file = File::open(&self.path).map_err(|e| {
            Error::io(format!("Failed to open CSV file '{}': {}", self.path, e))
        })?;

        // Create format with delimiter
        let format = arrow_csv::reader::Format::default()
            .with_header(self.has_header)
            .with_delimiter(self.delimiter);

        // Build the CSV reader with or without schema
        let reader = if let Some(schema) = &self.schema {
            ReaderBuilder::new(schema.clone())
                .with_format(format)
                .with_batch_size(self.batch_size)
                .build(file)
                .map_err(|e| {
                    Error::arrow(format!("Failed to create CSV reader: {}", e))
                })?
        } else {
            // For schema inference, create a buffered reader first
            let buf_reader = BufReader::new(file);
            let (inferred_schema, _) = format.infer_schema(buf_reader, Some(100))
                .map_err(|e| {
                    Error::arrow(format!("Failed to infer CSV schema: {}", e))
                })?;

            // Re-open the file for reading
            let file = File::open(&self.path).map_err(|e| {
                Error::io(format!("Failed to re-open CSV file '{}': {}", self.path, e))
            })?;

            ReaderBuilder::new(Arc::new(inferred_schema))
                .with_format(format)
                .with_batch_size(self.batch_size)
                .build(file)
                .map_err(|e| {
                    Error::arrow(format!("Failed to create CSV reader: {}", e))
                })?
        };

        // Get the schema from the reader
        let schema = reader.schema();

        // Read all batches
        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| {
                Error::arrow(format!("Failed to read CSV batch: {}", e))
            })?;
            batches.push(batch);
        }

        if batches.is_empty() {
            return Err(Error::data(format!("CSV file '{}' is empty", self.path)));
        }

        Ok((schema, batches))
    }
}

/// Parquet data source configuration
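///
/// # Example
///
/// A sketch of typical usage (the path is hypothetical):
/// ```rust,ignore
/// let source = ParquetSource::new("data/sales.parquet")
///     .with_batch_size(4096);
/// let (schema, batches) = source.load()?;
/// ```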
#[derive(Debug, Clone)]
pub struct ParquetSource {
    /// Path to the Parquet file
    path: String,

    /// Batch size for reading
    batch_size: usize,
}

impl ParquetSource {
    /// Create a new Parquet source
    pub fn new(path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            batch_size: 8192,
        }
    }

    /// Set the batch size for reading
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}

impl DataSource for ParquetSource {
    fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        // Open the file
        let file = File::open(&self.path).map_err(|e| {
            Error::io(format!("Failed to open Parquet file '{}': {}", self.path, e))
        })?;

        // Create the Parquet reader
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
            Error::arrow(format!("Failed to create Parquet reader: {}", e))
        })?;

        let schema = builder.schema().clone();

        let reader = builder
            .with_batch_size(self.batch_size)
            .build()
            .map_err(|e| {
                Error::arrow(format!("Failed to build Parquet reader: {}", e))
            })?;

        // Read all batches
        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| {
                Error::arrow(format!("Failed to read Parquet batch: {}", e))
            })?;
            batches.push(batch);
        }

        if batches.is_empty() {
            return Err(Error::data(format!("Parquet file '{}' is empty", self.path)));
        }

        Ok((schema, batches))
    }
}

/// JSON data source configuration (expects newline-delimited JSON)
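///
/// # Example
///
/// A sketch of typical usage (the path is hypothetical):
/// ```rust,ignore
/// let source = JsonSource::new("data/events.ndjson")
///     .with_batch_size(4096);
/// let (schema, batches) = source.load()?;
/// ```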
#[derive(Debug, Clone)]
pub struct JsonSource {
    /// Path to the JSON file
    path: String,

    /// Batch size for reading
    batch_size: usize,

    /// Optional schema (if None, will be inferred)
    schema: Option<Arc<ArrowSchema>>,
}

impl JsonSource {
    /// Create a new JSON source
    pub fn new(path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            batch_size: 8192,
            schema: None,
        }
    }

    /// Set the batch size for reading
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Set the expected schema
    pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
        self.schema = Some(schema);
        self
    }
}

impl DataSource for JsonSource {
    fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
        use arrow_json::ReaderBuilder;

        // Open the file with a buffered reader
        let file = File::open(&self.path).map_err(|e| {
            Error::io(format!("Failed to open JSON file '{}': {}", self.path, e))
        })?;
        let buf_reader = BufReader::new(file);

        // Build the JSON reader
        let reader = if let Some(schema) = &self.schema {
            ReaderBuilder::new(schema.clone())
                .with_batch_size(self.batch_size)
                .build(buf_reader)
                .map_err(|e| {
                    Error::arrow(format!("Failed to create JSON reader: {}", e))
                })?
        } else {
            // For schema inference, read and infer first
            let file_for_infer = File::open(&self.path).map_err(|e| {
                Error::io(format!("Failed to open JSON file for schema inference '{}': {}", self.path, e))
            })?;
            let buf_reader_infer = BufReader::new(file_for_infer);

            let inferred_result = arrow_json::reader::infer_json_schema(buf_reader_infer, Some(100))
                .map_err(|e| {
                    Error::arrow(format!("Failed to infer JSON schema: {}", e))
                })?;

            // Extract the schema from the (schema, records_read) tuple
            let inferred_schema = inferred_result.0;

            // Re-open the file for reading data
            let file = File::open(&self.path).map_err(|e| {
                Error::io(format!("Failed to re-open JSON file '{}': {}", self.path, e))
            })?;
            let buf_reader = BufReader::new(file);

            ReaderBuilder::new(Arc::new(inferred_schema))
                .with_batch_size(self.batch_size)
                .build(buf_reader)
                .map_err(|e| {
                    Error::arrow(format!("Failed to create JSON reader: {}", e))
                })?
        };

        let schema = reader.schema();

        // Read all batches
        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| {
                Error::arrow(format!("Failed to read JSON batch: {}", e))
            })?;
            batches.push(batch);
        }

        if batches.is_empty() {
            return Err(Error::data(format!("JSON file '{}' is empty", self.path)));
        }

        Ok((schema, batches))
    }
}

/// In-memory data source from Arrow RecordBatches
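///
/// # Example
///
/// A sketch of wrapping batches that are already in memory (the field name and
/// values are illustrative):
/// ```rust,ignore
/// use arrow::array::{ArrayRef, Int32Array};
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
/// let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
/// let batch = RecordBatch::try_new(schema.clone(), vec![array])?;
/// let source = RecordBatchSource::new(schema, vec![batch])?;
/// ```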
#[derive(Debug)]
pub struct RecordBatchSource {
    schema: Arc<ArrowSchema>,
    batches: Vec<RecordBatch>,
}

impl RecordBatchSource {
    /// Create a new in-memory source from RecordBatches
    pub fn new(schema: Arc<ArrowSchema>, batches: Vec<RecordBatch>) -> Result<Self> {
        if batches.is_empty() {
            return Err(Error::data("RecordBatchSource requires at least one batch"));
        }

        // Validate that all batches have the same schema
        for batch in &batches {
            if batch.schema().as_ref() != schema.as_ref() {
                return Err(Error::schema(
                    "All RecordBatches must match the provided schema"
                ));
            }
        }

        Ok(Self { schema, batches })
    }
}

impl DataSource for RecordBatchSource {
    fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
        Ok((self.schema.clone(), self.batches.clone()))
    }
}

// ==============================================================================
// Database Sources (via ODBC)
// ==============================================================================

#[cfg(feature = "database")]
pub mod database {
    use super::*;
    use arrow_odbc::OdbcReaderBuilder;
    use arrow_odbc::odbc_api::{Environment, ConnectionOptions};

    /// Configuration for connecting to databases via ODBC
    ///
    /// Supports PostgreSQL, MySQL, SQL Server, SQLite, and any ODBC-compatible database.
    ///
    /// # Example Connection Strings
    ///
    /// **PostgreSQL**:
    /// ```text
    /// Driver={PostgreSQL Unicode};Server=localhost;Port=5432;Database=mydb;Uid=user;Pwd=pass;
    /// ```
    ///
    /// **MySQL**:
    /// ```text
    /// Driver={MySQL ODBC 8.0 Unicode Driver};Server=localhost;Port=3306;Database=mydb;Uid=user;Pwd=pass;
    /// ```
    ///
    /// **SQL Server**:
    /// ```text
    /// Driver={ODBC Driver 17 for SQL Server};Server=localhost;Database=mydb;Uid=user;Pwd=pass;
    /// ```
    #[derive(Debug, Clone)]
    pub struct OdbcSource {
        /// ODBC connection string
        connection_string: String,

        /// SQL query to execute
        query: String,

        /// Maximum bytes per batch (default: 8MB)
        max_bytes_per_batch: usize,

        /// Maximum number of rows to fetch (None = unlimited)
        max_rows: Option<usize>,
    }

    impl OdbcSource {
        /// Create a new ODBC data source
        ///
        /// # Arguments
        /// * `connection_string` - ODBC connection string
        /// * `query` - SQL query to execute
        ///
        /// # Example
        /// ```rust,ignore
        /// let source = OdbcSource::new(
        ///     "Driver={PostgreSQL Unicode};Server=localhost;Database=sales;Uid=user;Pwd=pass",
        ///     "SELECT * FROM transactions WHERE date >= '2025-01-01'"
        /// );
        /// ```
        pub fn new(connection_string: impl Into<String>, query: impl Into<String>) -> Self {
            Self {
                connection_string: connection_string.into(),
                query: query.into(),
                max_bytes_per_batch: 8 * 1024 * 1024, // 8MB default
                max_rows: None,
            }
        }

        /// Set the maximum bytes per batch
        pub fn with_max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
            self.max_bytes_per_batch = max_bytes;
            self
        }

        /// Set the maximum number of rows to fetch
        pub fn with_max_rows(mut self, max_rows: usize) -> Self {
            self.max_rows = Some(max_rows);
            self
        }
    }

    impl DataSource for OdbcSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            // Create the ODBC environment
            let env = Environment::new().map_err(|e| {
                Error::data(format!("Failed to create ODBC environment: {}", e))
            })?;

            // Connect to the database
            let conn = env.connect_with_connection_string(
                &self.connection_string,
                ConnectionOptions::default()
            ).map_err(|e| {
                Error::data(format!("Failed to connect to database: {}", e))
            })?;

            // Execute the query to get a cursor
            // Third parameter is max_rows (None = unlimited)
            let cursor = match conn.execute(&self.query, (), self.max_rows).map_err(|e| {
                Error::data(format!("Failed to execute SQL query: {}", e))
            })? {
                Some(cursor) => cursor,
                None => {
                    return Err(Error::data("SQL query did not return a result set. Use a SELECT statement for data loading."));
                }
            };

            // Build the ODBC reader
            let reader = OdbcReaderBuilder::new()
                .with_max_bytes_per_batch(self.max_bytes_per_batch)
                .build(cursor)
                .map_err(|e| {
                    Error::data(format!("Failed to create ODBC reader: {}", e))
                })?;

            let schema = reader.schema();

            // Read all batches
            // Note: max_rows is already handled by the execute() call above
            let mut batches = Vec::new();

            for batch_result in reader {
                let batch = batch_result.map_err(|e| {
                    Error::arrow(format!("Failed to read ODBC batch: {}", e))
                })?;
                batches.push(batch);
            }

            if batches.is_empty() {
                return Err(Error::data("ODBC query returned no results"));
            }

            Ok((schema, batches))
        }
    }

    /// Convenience wrapper for PostgreSQL connections
    ///
    /// # Example
    /// ```rust,ignore
    /// let source = PostgresSource::new("localhost", "mydb", "user", "pass")
    ///     .with_port(5432)
    ///     .with_query("SELECT * FROM sales");
    /// ```
    #[derive(Debug, Clone)]
    pub struct PostgresSource {
        host: String,
        database: String,
        username: String,
        password: String,
        port: u16,
        query: String,
        max_bytes_per_batch: usize,
    }

    impl PostgresSource {
        /// Create a new PostgreSQL data source
        pub fn new(
            host: impl Into<String>,
            database: impl Into<String>,
            username: impl Into<String>,
            password: impl Into<String>,
        ) -> Self {
            Self {
                host: host.into(),
                database: database.into(),
                username: username.into(),
                password: password.into(),
                port: 5432,
                query: String::new(),
                max_bytes_per_batch: 8 * 1024 * 1024, // 8MB default
            }
        }

        /// Set the port (default: 5432)
        pub fn with_port(mut self, port: u16) -> Self {
            self.port = port;
            self
        }

        /// Set the SQL query to execute
        pub fn with_query(mut self, query: impl Into<String>) -> Self {
            self.query = query.into();
            self
        }

        /// Set the maximum bytes per batch
        pub fn with_max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
            self.max_bytes_per_batch = max_bytes;
            self
        }

        /// Build the ODBC connection string
        pub(crate) fn connection_string(&self) -> String {
            format!(
                "Driver={{PostgreSQL Unicode}};Server={};Port={};Database={};Uid={};Pwd={};",
                self.host, self.port, self.database, self.username, self.password
            )
        }
    }

    impl DataSource for PostgresSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            if self.query.is_empty() {
                return Err(Error::data("PostgreSQL query cannot be empty. Use with_query() to set it."));
            }

            let odbc_source = OdbcSource::new(self.connection_string(), &self.query)
                .with_max_bytes_per_batch(self.max_bytes_per_batch);

            odbc_source.load()
        }
    }

    /// Convenience wrapper for MySQL connections
    ///
    /// # Example
    /// ```rust,ignore
    /// let source = MySqlSource::new("localhost", "mydb", "user", "pass")
    ///     .with_port(3306)
    ///     .with_query("SELECT * FROM orders");
    /// ```
    #[derive(Debug, Clone)]
    pub struct MySqlSource {
        host: String,
        database: String,
        username: String,
        password: String,
        port: u16,
        query: String,
        max_bytes_per_batch: usize,
    }

    impl MySqlSource {
        /// Create a new MySQL data source
        pub fn new(
            host: impl Into<String>,
            database: impl Into<String>,
            username: impl Into<String>,
            password: impl Into<String>,
        ) -> Self {
            Self {
                host: host.into(),
                database: database.into(),
                username: username.into(),
                password: password.into(),
                port: 3306,
                query: String::new(),
                max_bytes_per_batch: 8 * 1024 * 1024, // 8MB default
            }
        }

        /// Set the port (default: 3306)
        pub fn with_port(mut self, port: u16) -> Self {
            self.port = port;
            self
        }

        /// Set the SQL query to execute
        pub fn with_query(mut self, query: impl Into<String>) -> Self {
            self.query = query.into();
            self
        }

        /// Set the maximum bytes per batch
        pub fn with_max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
            self.max_bytes_per_batch = max_bytes;
            self
        }

        /// Build the ODBC connection string
        pub(crate) fn connection_string(&self) -> String {
            format!(
                "Driver={{MySQL ODBC 8.0 Unicode Driver}};Server={};Port={};Database={};Uid={};Pwd={};",
                self.host, self.port, self.database, self.username, self.password
            )
        }
    }

    impl DataSource for MySqlSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            if self.query.is_empty() {
                return Err(Error::data("MySQL query cannot be empty. Use with_query() to set it."));
            }

            let odbc_source = OdbcSource::new(self.connection_string(), &self.query)
                .with_max_bytes_per_batch(self.max_bytes_per_batch);

            odbc_source.load()
        }
    }
}

// ==============================================================================
// REST API Sources
// ==============================================================================

#[cfg(feature = "rest-api")]
pub mod rest {
    use super::*;
    use reqwest::blocking::Client;
    use std::collections::HashMap;
    use std::io::Cursor;

    /// HTTP method for REST API requests
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum HttpMethod {
        Get,
        Post,
    }

    /// REST API data source that fetches JSON data
    ///
    /// Supports GET and POST requests with optional headers and query parameters.
    /// The response must be JSON (either array of objects or single object).
    ///
    /// # Example
    /// ```rust,ignore
    /// let source = RestApiSource::new("https://api.example.com/sales")
    ///     .with_method(HttpMethod::Get)
    ///     .with_header("Authorization", "Bearer token123")
    ///     .with_query_param("date_from", "2024-01-01");
    /// ```
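    ///
    /// A POST variant is a sketch along the same lines (the endpoint and body
    /// are hypothetical):
    /// ```rust,ignore
    /// let source = RestApiSource::new("https://api.example.com/query")
    ///     .with_method(HttpMethod::Post)
    ///     .with_header("Content-Type", "application/json")
    ///     .with_body(r#"{"date_from": "2024-01-01"}"#);
    /// ```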
    #[derive(Debug, Clone)]
    pub struct RestApiSource {
        /// Base URL for the API endpoint
        url: String,

        /// HTTP method (GET or POST)
        method: HttpMethod,

        /// HTTP headers
        headers: HashMap<String, String>,

        /// Query parameters (for GET requests)
        query_params: HashMap<String, String>,

        /// Request body (for POST requests)
        body: Option<String>,

        /// Batch size for reading
        batch_size: usize,

        /// Optional schema (if None, will be inferred from JSON)
        schema: Option<Arc<ArrowSchema>>,

        /// Timeout in seconds (default: 30)
        timeout_secs: u64,
    }

    impl RestApiSource {
        /// Create a new REST API data source
        pub fn new(url: impl Into<String>) -> Self {
            Self {
                url: url.into(),
                method: HttpMethod::Get,
                headers: HashMap::new(),
                query_params: HashMap::new(),
                body: None,
                batch_size: 8192,
                schema: None,
                timeout_secs: 30,
            }
        }

        /// Set the HTTP method
        pub fn with_method(mut self, method: HttpMethod) -> Self {
            self.method = method;
            self
        }

        /// Add an HTTP header
        pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
            self.headers.insert(key.into(), value.into());
            self
        }

        /// Add a query parameter
        pub fn with_query_param(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
            self.query_params.insert(key.into(), value.into());
            self
        }

        /// Set the request body (for POST requests)
        pub fn with_body(mut self, body: impl Into<String>) -> Self {
            self.body = Some(body.into());
            self
        }

        /// Set the batch size
        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        /// Set the expected schema
        pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Set the request timeout in seconds
        pub fn with_timeout_secs(mut self, timeout_secs: u64) -> Self {
            self.timeout_secs = timeout_secs;
            self
        }
    }

    impl DataSource for RestApiSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            use arrow_json::ReaderBuilder;

            // Build the HTTP client
            let client = Client::builder()
                .timeout(std::time::Duration::from_secs(self.timeout_secs))
                .build()
                .map_err(|e| Error::io(format!("Failed to create HTTP client: {}", e)))?;

            // Build the URL with query parameters
            let mut url = url::Url::parse(&self.url)
                .map_err(|e| Error::data(format!("Invalid URL '{}': {}", self.url, e)))?;

            for (key, value) in &self.query_params {
                url.query_pairs_mut().append_pair(key, value);
            }

            // Build the request
            let mut request = match self.method {
                HttpMethod::Get => client.get(url.as_str()),
                HttpMethod::Post => {
                    let mut req = client.post(url.as_str());
                    if let Some(body) = &self.body {
                        req = req.body(body.clone());
                    }
                    req
                }
            };

            // Add headers
            for (key, value) in &self.headers {
                request = request.header(key, value);
            }

            // Execute the request
            let response = request
                .send()
                .map_err(|e| Error::io(format!("HTTP request failed: {}", e)))?;

            // Check the status code
            if !response.status().is_success() {
                return Err(Error::data(format!(
                    "HTTP request failed with status {}: {}",
                    response.status(),
                    response.text().unwrap_or_default()
                )));
            }

            // Get the response body as bytes
            let response_bytes = response
                .bytes()
                .map_err(|e| Error::io(format!("Failed to read HTTP response: {}", e)))?;

            // Parse JSON and convert to Arrow RecordBatches
            let cursor = Cursor::new(response_bytes.as_ref());

            // Build the JSON reader
            let reader = if let Some(schema) = &self.schema {
                ReaderBuilder::new(schema.clone())
                    .with_batch_size(self.batch_size)
                    .build(cursor)
                    .map_err(|e| Error::arrow(format!("Failed to create JSON reader: {}", e)))?
            } else {
                // Infer the schema from the JSON response
                let cursor_for_infer = Cursor::new(response_bytes.as_ref());
                let inferred_result = arrow_json::reader::infer_json_schema(cursor_for_infer, None)
                    .map_err(|e| Error::arrow(format!("Failed to infer JSON schema from API response: {}", e)))?;

                let inferred_schema = inferred_result.0;
                let cursor = Cursor::new(response_bytes.as_ref());

                ReaderBuilder::new(Arc::new(inferred_schema))
                    .with_batch_size(self.batch_size)
                    .build(cursor)
                    .map_err(|e| Error::arrow(format!("Failed to create JSON reader: {}", e)))?
            };

            let schema = reader.schema();

            // Read all batches
            let mut batches = Vec::new();
            for batch_result in reader {
                let batch = batch_result.map_err(|e| {
                    Error::arrow(format!("Failed to read JSON batch from API response: {}", e))
                })?;
                batches.push(batch);
            }

            if batches.is_empty() {
                return Err(Error::data(format!("API response from '{}' is empty", self.url)));
            }

            Ok((schema, batches))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_csv_source_builder() {
        let source = CsvSource::new("test.csv")
            .with_header(true)
            .with_batch_size(1024)
            .with_delimiter(b';');

        assert_eq!(source.path, "test.csv");
        assert!(source.has_header);
        assert_eq!(source.batch_size, 1024);
        assert_eq!(source.delimiter, b';');
    }

    #[test]
    fn test_parquet_source_builder() {
        let source = ParquetSource::new("test.parquet")
            .with_batch_size(2048);

        assert_eq!(source.path, "test.parquet");
        assert_eq!(source.batch_size, 2048);
    }

    #[test]
    fn test_json_source_builder() {
        let source = JsonSource::new("test.json")
            .with_batch_size(512);

        assert_eq!(source.path, "test.json");
        assert_eq!(source.batch_size, 512);
    }

    #[cfg(feature = "database")]
    #[test]
    fn test_postgres_source_builder() {
        let source = database::PostgresSource::new("localhost", "testdb", "user", "pass")
            .with_port(5432)
            .with_query("SELECT * FROM test");

        // Test connection string generation
        assert_eq!(source.connection_string(),
            "Driver={PostgreSQL Unicode};Server=localhost;Port=5432;Database=testdb;Uid=user;Pwd=pass;");
    }

    #[cfg(feature = "database")]
    #[test]
    fn test_mysql_source_builder() {
        let source = database::MySqlSource::new("localhost", "testdb", "user", "pass")
            .with_port(3306)
            .with_query("SELECT * FROM test");

        // Test connection string generation
        assert_eq!(source.connection_string(),
            "Driver={MySQL ODBC 8.0 Unicode Driver};Server=localhost;Port=3306;Database=testdb;Uid=user;Pwd=pass;");
    }

    #[cfg(feature = "rest-api")]
    #[test]
    fn test_rest_api_source_builder() {
        // Just test that the builder pattern works without errors
        let _source = rest::RestApiSource::new("https://api.example.com/data")
            .with_method(rest::HttpMethod::Get)
            .with_header("Authorization", "Bearer token")
            .with_query_param("limit", "100")
            .with_batch_size(512)
            .with_timeout_secs(60);
    }

    #[cfg(feature = "object-storage")]
    #[test]
    fn test_s3_source_builder() {
        use object_storage::{S3Source, StorageFileFormat};

        // Just test that the builder pattern works without errors
        let _source = S3Source::new("my-bucket", "data/sales.parquet")
            .with_region("us-west-2")
            .with_format(StorageFileFormat::Parquet)
            .with_batch_size(4096);
    }

    #[cfg(feature = "object-storage")]
    #[test]
    fn test_gcs_source_builder() {
        use object_storage::{GcsSource, StorageFileFormat};

        // Just test that the builder pattern works without errors
        let _source = GcsSource::new("my-bucket", "data/analytics.json")
            .with_format(StorageFileFormat::Json)
            .with_batch_size(8192);
    }

    #[cfg(feature = "object-storage")]
    #[test]
    fn test_azure_source_builder() {
        use object_storage::{AzureSource, StorageFileFormat};

        // Just test that the builder pattern works without errors
        let _source = AzureSource::new("myaccount", "mycontainer", "data/logs.csv")
            .with_format(StorageFileFormat::Csv)
            .with_batch_size(2048);
    }
}

// ==============================================================================
// Object Storage Sources (S3, GCS, Azure)
// ==============================================================================

#[cfg(feature = "object-storage")]
pub mod object_storage {
    use super::*;
    use bytes::Bytes;
    use object_store::{ObjectStore, path::Path as ObjectPath};
    use std::sync::Arc as StdArc;

    /// File format for object storage files
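    ///
    /// # Example
    ///
    /// A sketch of picking a format from a file extension (the match is
    /// illustrative, not part of the API):
    /// ```rust,ignore
    /// let format = match path.rsplit('.').next() {
    ///     Some("csv") => StorageFileFormat::Csv,
    ///     Some("json") | Some("ndjson") => StorageFileFormat::Json,
    ///     _ => StorageFileFormat::Parquet,
    /// };
    /// ```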
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum StorageFileFormat {
        /// Parquet format
        Parquet,
        /// CSV format
        Csv,
        /// JSON format (newline-delimited JSON)
        Json,
    }

    /// Generic object storage source that works with any ObjectStore backend
    ///
    /// This can be used with S3, GCS, Azure, or the local file system via object_store.
    ///
    /// # Example
    /// ```rust,ignore
    /// use object_store::aws::AmazonS3Builder;
    ///
    /// let store = AmazonS3Builder::new()
    ///     .with_bucket_name("my-bucket")
    ///     .with_region("us-west-2")
    ///     .build()?;
    ///
    /// let source = ObjectStorageSource::new(Arc::new(store), "data/sales.parquet")
    ///     .with_format(StorageFileFormat::Parquet);
    /// ```
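    ///
    /// A local-filesystem sketch useful for tests (the prefix path is
    /// hypothetical):
    /// ```rust,ignore
    /// use object_store::local::LocalFileSystem;
    ///
    /// let store = Arc::new(LocalFileSystem::new_with_prefix("/tmp/data")?);
    /// let source = ObjectStorageSource::new(store, "sales.csv")
    ///     .with_format(StorageFileFormat::Csv);
    /// ```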
    #[derive(Debug)]
    pub struct ObjectStorageSource {
        /// Object store instance
        store: StdArc<dyn ObjectStore>,

        /// Path to the file in the object store
        path: String,

        /// File format
        format: StorageFileFormat,

        /// Batch size for reading
        batch_size: usize,

        /// Optional schema for CSV/JSON
        schema: Option<Arc<ArrowSchema>>,

        /// CSV-specific: has header row
        csv_has_header: bool,

        /// CSV-specific: delimiter
        csv_delimiter: u8,
    }

    impl ObjectStorageSource {
        /// Create a new object storage source
        ///
        /// # Arguments
        /// * `store` - ObjectStore instance (S3, GCS, Azure, etc.)
        /// * `path` - Path to the file in the object store
        pub fn new(store: StdArc<dyn ObjectStore>, path: impl Into<String>) -> Self {
            Self {
                store,
                path: path.into(),
                format: StorageFileFormat::Parquet,
                batch_size: 8192,
                schema: None,
                csv_has_header: true,
                csv_delimiter: b',',
            }
        }

        /// Set the file format
        pub fn with_format(mut self, format: StorageFileFormat) -> Self {
            self.format = format;
            self
        }

        /// Set the batch size
        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        /// Set the schema (for CSV/JSON)
        pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Set the CSV header flag
        pub fn with_csv_header(mut self, has_header: bool) -> Self {
            self.csv_has_header = has_header;
            self
        }

        /// Set the CSV delimiter
        pub fn with_csv_delimiter(mut self, delimiter: u8) -> Self {
            self.csv_delimiter = delimiter;
            self
        }

        /// Download the file from object storage
        async fn download_file(&self) -> Result<Bytes> {
            let path = ObjectPath::from(self.path.as_str());

            // Use get() to fetch the entire object
            let result = self.store.get(&path).await.map_err(|e| {
                Error::io(format!("Failed to download file '{}' from object storage: {}", self.path, e))
            })?;

            // Read all bytes
            let bytes = result.bytes().await.map_err(|e| {
                Error::io(format!("Failed to read bytes from object storage: {}", e))
            })?;

            Ok(bytes)
        }
    }

    impl DataSource for ObjectStorageSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            // Spin up a tokio runtime to drive the async download. Note that
            // this means load() must not be called from within an existing
            // tokio runtime, or block_on will panic.
            let runtime = tokio::runtime::Runtime::new().map_err(|e| {
                Error::io(format!("Failed to create tokio runtime: {}", e))
            })?;

            runtime.block_on(async {
                // Download the file
                let bytes = self.download_file().await?;

                // Parse based on format
                match self.format {
                    StorageFileFormat::Parquet => {
                        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

                        // ParquetRecordBatchReaderBuilder requires a type that implements ChunkReader.
                        // Bytes implements ChunkReader directly, so no Cursor is needed.
                        let builder = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).map_err(|e| {
                            Error::arrow(format!("Failed to create Parquet reader: {}", e))
                        })?;

                        let schema = builder.schema().clone();
                        let reader = builder.with_batch_size(self.batch_size).build().map_err(|e| {
                            Error::arrow(format!("Failed to build Parquet reader: {}", e))
                        })?;

                        let mut batches = Vec::new();
                        for batch_result in reader {
                            let batch = batch_result.map_err(|e| {
                                Error::arrow(format!("Failed to read Parquet batch: {}", e))
                            })?;
                            batches.push(batch);
                        }

                        if batches.is_empty() {
                            return Err(Error::data(format!("Parquet file '{}' is empty", self.path)));
                        }

                        Ok((schema, batches))
                    }

                    StorageFileFormat::Csv => {
                        use arrow_csv::ReaderBuilder;
                        use std::io::Cursor;

                        let format = arrow_csv::reader::Format::default()
                            .with_header(self.csv_has_header)
                            .with_delimiter(self.csv_delimiter);

                        let reader = if let Some(schema) = &self.schema {
                            let cursor = Cursor::new(bytes);
                            ReaderBuilder::new(schema.clone())
                                .with_format(format)
                                .with_batch_size(self.batch_size)
                                .build(cursor)
                                .map_err(|e| Error::arrow(format!("Failed to create CSV reader: {}", e)))?
                        } else {
                            // Infer the schema
                            let cursor_for_infer = Cursor::new(bytes.clone());
                            let buf_reader = BufReader::new(cursor_for_infer);
                            let (inferred_schema, _) = format.infer_schema(buf_reader, Some(100))
                                .map_err(|e| Error::arrow(format!("Failed to infer CSV schema: {}", e)))?;

                            let cursor = Cursor::new(bytes);
                            ReaderBuilder::new(Arc::new(inferred_schema))
                                .with_format(format)
                                .with_batch_size(self.batch_size)
                                .build(cursor)
                                .map_err(|e| Error::arrow(format!("Failed to create CSV reader: {}", e)))?
                        };

                        let schema = reader.schema();
                        let mut batches = Vec::new();
                        for batch_result in reader {
                            let batch = batch_result.map_err(|e| {
                                Error::arrow(format!("Failed to read CSV batch: {}", e))
                            })?;
                            batches.push(batch);
                        }

                        if batches.is_empty() {
                            return Err(Error::data(format!("CSV file '{}' is empty", self.path)));
                        }

                        Ok((schema, batches))
                    }

                    StorageFileFormat::Json => {
                        use arrow_json::ReaderBuilder;
                        use std::io::Cursor;

                        let cursor = Cursor::new(bytes.clone());

                        let reader = if let Some(schema) = &self.schema {
                            ReaderBuilder::new(schema.clone())
                                .with_batch_size(self.batch_size)
                                .build(cursor)
                                .map_err(|e| Error::arrow(format!("Failed to create JSON reader: {}", e)))?
                        } else {
                            // Infer the schema
                            let cursor_for_infer = Cursor::new(bytes.clone());
                            let buf_reader = BufReader::new(cursor_for_infer);
                            let inferred_result = arrow_json::reader::infer_json_schema(buf_reader, Some(100))
                                .map_err(|e| Error::arrow(format!("Failed to infer JSON schema: {}", e)))?;

                            let inferred_schema = inferred_result.0;
                            let cursor = Cursor::new(bytes);
                            ReaderBuilder::new(Arc::new(inferred_schema))
                                .with_batch_size(self.batch_size)
                                .build(cursor)
                                .map_err(|e| Error::arrow(format!("Failed to create JSON reader: {}", e)))?
                        };

                        let schema = reader.schema();
                        let mut batches = Vec::new();
                        for batch_result in reader {
                            let batch = batch_result.map_err(|e| {
                                Error::arrow(format!("Failed to read JSON batch: {}", e))
                            })?;
                            batches.push(batch);
                        }

                        if batches.is_empty() {
                            return Err(Error::data(format!("JSON file '{}' is empty", self.path)));
                        }

                        Ok((schema, batches))
                    }
                }
            })
        }
    }

    /// AWS S3 data source
    ///
    /// # Example
    /// ```rust,ignore
    /// let source = S3Source::new("my-bucket", "data/sales.parquet")
    ///     .with_region("us-west-2")
    ///     .with_format(StorageFileFormat::Parquet);
    /// ```
    #[derive(Debug, Clone)]
    pub struct S3Source {
        bucket: String,
        path: String,
        region: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
        endpoint: Option<String>,
        format: StorageFileFormat,
        batch_size: usize,
        schema: Option<Arc<ArrowSchema>>,
    }

    impl S3Source {
        /// Create a new S3 data source
        ///
        /// # Arguments
        /// * `bucket` - S3 bucket name
        /// * `path` - Path to the file in the bucket (e.g., "data/sales.parquet")
        ///
        /// # Authentication
        /// By default, uses AWS credentials from environment variables or ~/.aws/credentials.
        /// Use `with_access_key()` to provide explicit credentials.
        pub fn new(bucket: impl Into<String>, path: impl Into<String>) -> Self {
            Self {
                bucket: bucket.into(),
                path: path.into(),
                region: None,
                access_key_id: None,
                secret_access_key: None,
                endpoint: None,
                format: StorageFileFormat::Parquet,
                batch_size: 8192,
                schema: None,
            }
        }

        /// Set the AWS region (e.g., "us-west-2")
        pub fn with_region(mut self, region: impl Into<String>) -> Self {
            self.region = Some(region.into());
            self
        }

        /// Set explicit AWS credentials
        pub fn with_access_key(
            mut self,
            access_key_id: impl Into<String>,
            secret_access_key: impl Into<String>,
        ) -> Self {
            self.access_key_id = Some(access_key_id.into());
            self.secret_access_key = Some(secret_access_key.into());
            self
        }

        /// Set a custom S3 endpoint (for S3-compatible services like MinIO)
        pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
            self.endpoint = Some(endpoint.into());
            self
        }

        /// Set the file format
        pub fn with_format(mut self, format: StorageFileFormat) -> Self {
            self.format = format;
            self
        }

        /// Set the batch size
        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        /// Set the schema (for CSV/JSON)
        pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Build the ObjectStore instance
        fn build_store(&self) -> Result<StdArc<dyn ObjectStore>> {
            use object_store::aws::AmazonS3Builder;

            let mut builder = AmazonS3Builder::new()
                .with_bucket_name(&self.bucket);

            if let Some(region) = &self.region {
                builder = builder.with_region(region);
            }

            if let Some(access_key_id) = &self.access_key_id {
                builder = builder.with_access_key_id(access_key_id);
            }

            if let Some(secret_access_key) = &self.secret_access_key {
                builder = builder.with_secret_access_key(secret_access_key);
            }

            if let Some(endpoint) = &self.endpoint {
                builder = builder.with_endpoint(endpoint);
            }

            let store = builder.build().map_err(|e| {
                Error::data(format!("Failed to build S3 store: {}", e))
            })?;

            Ok(StdArc::new(store))
        }
    }

    impl DataSource for S3Source {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            let store = self.build_store()?;

            let mut obj_source = ObjectStorageSource::new(store, &self.path)
                .with_format(self.format)
                .with_batch_size(self.batch_size);

            if let Some(schema) = &self.schema {
                obj_source = obj_source.with_schema(schema.clone());
            }

            obj_source.load()
        }
    }

    /// Google Cloud Storage (GCS) data source
    ///
    /// # Example
    /// ```rust,ignore
    /// let key = std::fs::read_to_string("path/to/key.json")?;
    /// let source = GcsSource::new("my-bucket", "data/sales.parquet")
    ///     .with_service_account_key(key)
    ///     .with_format(StorageFileFormat::Parquet);
    /// ```
    #[derive(Debug, Clone)]
    pub struct GcsSource {
        bucket: String,
        path: String,
        service_account_key: Option<String>,
        format: StorageFileFormat,
        batch_size: usize,
        schema: Option<Arc<ArrowSchema>>,
    }

    impl GcsSource {
        /// Create a new GCS data source
        ///
        /// # Arguments
        /// * `bucket` - GCS bucket name
        /// * `path` - Path to the file in the bucket
        ///
        /// # Authentication
        /// By default, uses Google Cloud credentials from the GOOGLE_APPLICATION_CREDENTIALS env var.
        /// Use `with_service_account_key()` to provide explicit credentials.
        pub fn new(bucket: impl Into<String>, path: impl Into<String>) -> Self {
            Self {
                bucket: bucket.into(),
                path: path.into(),
                service_account_key: None,
                format: StorageFileFormat::Parquet,
                batch_size: 8192,
                schema: None,
            }
        }

        /// Set the service account key (the JSON key contents, not a file path)
        pub fn with_service_account_key(mut self, key: impl Into<String>) -> Self {
            self.service_account_key = Some(key.into());
            self
        }

        /// Set the file format
        pub fn with_format(mut self, format: StorageFileFormat) -> Self {
            self.format = format;
            self
        }

        /// Set the batch size
        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        /// Set the schema (for CSV/JSON)
        pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Build the ObjectStore instance
        fn build_store(&self) -> Result<StdArc<dyn ObjectStore>> {
            use object_store::gcp::GoogleCloudStorageBuilder;

            let mut builder = GoogleCloudStorageBuilder::new()
                .with_bucket_name(&self.bucket);

            if let Some(key) = &self.service_account_key {
                builder = builder.with_service_account_key(key);
            }

            let store = builder.build().map_err(|e| {
                Error::data(format!("Failed to build GCS store: {}", e))
            })?;

            Ok(StdArc::new(store))
        }
    }

    impl DataSource for GcsSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            let store = self.build_store()?;

            let mut obj_source = ObjectStorageSource::new(store, &self.path)
                .with_format(self.format)
                .with_batch_size(self.batch_size);

            if let Some(schema) = &self.schema {
                obj_source = obj_source.with_schema(schema.clone());
            }

            obj_source.load()
        }
    }

    /// Azure Blob Storage data source
    ///
    /// # Example
    /// ```rust,ignore
    /// let source = AzureSource::new("myaccount", "mycontainer", "data/sales.parquet")
    ///     .with_access_key("access_key")
    ///     .with_format(StorageFileFormat::Parquet);
    /// ```
    #[derive(Debug, Clone)]
    pub struct AzureSource {
        account: String,
        container: String,
        path: String,
        access_key: Option<String>,
        sas_token: Option<String>,
        format: StorageFileFormat,
        batch_size: usize,
        schema: Option<Arc<ArrowSchema>>,
    }

    impl AzureSource {
        /// Create a new Azure Blob Storage data source
        ///
        /// # Arguments
        /// * `account` - Azure storage account name
        /// * `container` - Container name
        /// * `path` - Path to the file in the container
        ///
        /// # Authentication
        /// Use `with_access_key()` or `with_sas_token()` to provide credentials.
        pub fn new(
            account: impl Into<String>,
            container: impl Into<String>,
            path: impl Into<String>,
        ) -> Self {
            Self {
                account: account.into(),
                container: container.into(),
                path: path.into(),
                access_key: None,
                sas_token: None,
                format: StorageFileFormat::Parquet,
                batch_size: 8192,
                schema: None,
            }
        }

        /// Set the access key for authentication
        pub fn with_access_key(mut self, access_key: impl Into<String>) -> Self {
            self.access_key = Some(access_key.into());
            self
        }

        /// Set the SAS token for authentication
        pub fn with_sas_token(mut self, sas_token: impl Into<String>) -> Self {
            self.sas_token = Some(sas_token.into());
            self
        }

        /// Set the file format
        pub fn with_format(mut self, format: StorageFileFormat) -> Self {
            self.format = format;
            self
        }

        /// Set the batch size
        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        /// Set the schema (for CSV/JSON)
        pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
            self.schema = Some(schema);
            self
        }

        /// Build the ObjectStore instance
        fn build_store(&self) -> Result<StdArc<dyn ObjectStore>> {
            use object_store::azure::{MicrosoftAzureBuilder, AzureConfigKey};

            let mut builder = MicrosoftAzureBuilder::new()
                .with_account(&self.account)
                .with_container_name(&self.container);

            if let Some(access_key) = &self.access_key {
                builder = builder.with_access_key(access_key);
            }

            if let Some(sas_token) = &self.sas_token {
                // The SAS token is set via the generic with_config method
                builder = builder.with_config(AzureConfigKey::SasKey, sas_token);
            }

            let store = builder.build().map_err(|e| {
                Error::data(format!("Failed to build Azure store: {}", e))
            })?;

            Ok(StdArc::new(store))
        }
    }

    impl DataSource for AzureSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            let store = self.build_store()?;

            let mut obj_source = ObjectStorageSource::new(store, &self.path)
                .with_format(self.format)
                .with_batch_size(self.batch_size);

            if let Some(schema) = &self.schema {
                obj_source = obj_source.with_schema(schema.clone());
            }

            obj_source.load()
        }
    }
}