use crate::error::{Error, Result};
use arrow::datatypes::Schema as ArrowSchema;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

/// A source of tabular data that can be materialized as Arrow record batches.
pub trait DataSource: std::fmt::Debug + Send + Sync {
    /// Loads the entire source, returning its schema and all record batches.
    fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)>;
}

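/// A CSV file source with builder-style configuration.
///
/// A minimal usage sketch; the file path is a placeholder, and the `my_crate::source`
/// import path stands in for this crate's actual name and module layout:
///
/// ```ignore
/// use my_crate::source::{CsvSource, DataSource};
///
/// // Semicolon-delimited CSV with a header row; the schema is inferred from the first rows.
/// let source = CsvSource::new("data/example.csv")
///     .with_header(true)
///     .with_delimiter(b';');
/// let (schema, batches) = source.load()?;
/// println!("loaded {} batches with schema {:?}", batches.len(), schema);
/// ```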
#[derive(Debug, Clone)]
pub struct CsvSource {
    path: String,
    has_header: bool,
    batch_size: usize,
    schema: Option<Arc<ArrowSchema>>,
    delimiter: u8,
}

impl CsvSource {
    pub fn new(path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            has_header: true,
            batch_size: 8192,
            schema: None,
            delimiter: b',',
        }
    }

    pub fn with_header(mut self, has_header: bool) -> Self {
        self.has_header = has_header;
        self
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
        self.schema = Some(schema);
        self
    }

    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }
}

impl DataSource for CsvSource {
    fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
        use arrow_csv::ReaderBuilder;

        let file = File::open(&self.path).map_err(|e| {
            Error::io(format!("Failed to open CSV file '{}': {}", self.path, e))
        })?;

        let format = arrow_csv::reader::Format::default()
            .with_header(self.has_header)
            .with_delimiter(self.delimiter);

        let reader = if let Some(schema) = &self.schema {
            ReaderBuilder::new(schema.clone())
                .with_format(format)
                .with_batch_size(self.batch_size)
                .build(file)
                .map_err(|e| {
                    Error::arrow(format!("Failed to create CSV reader: {}", e))
                })?
        } else {
            let buf_reader = BufReader::new(file);
            let (inferred_schema, _) = format.infer_schema(buf_reader, Some(100))
                .map_err(|e| {
                    Error::arrow(format!("Failed to infer CSV schema: {}", e))
                })?;

            let file = File::open(&self.path).map_err(|e| {
                Error::io(format!("Failed to re-open CSV file '{}': {}", self.path, e))
            })?;

            ReaderBuilder::new(Arc::new(inferred_schema))
                .with_format(format)
                .with_batch_size(self.batch_size)
                .build(file)
                .map_err(|e| {
                    Error::arrow(format!("Failed to create CSV reader: {}", e))
                })?
        };

        let schema = reader.schema();

        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| {
                Error::arrow(format!("Failed to read CSV batch: {}", e))
            })?;
            batches.push(batch);
        }

        if batches.is_empty() {
            return Err(Error::data(format!("CSV file '{}' is empty", self.path)));
        }

        Ok((schema, batches))
    }
}

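/// A Parquet file source.
///
/// A minimal usage sketch; the path and `my_crate::source` import are placeholders:
///
/// ```ignore
/// use my_crate::source::{DataSource, ParquetSource};
///
/// let source = ParquetSource::new("data/example.parquet").with_batch_size(4096);
/// let (schema, batches) = source.load()?;
/// ```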
#[derive(Debug, Clone)]
pub struct ParquetSource {
    path: String,
    batch_size: usize,
}

impl ParquetSource {
    pub fn new(path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            batch_size: 8192,
        }
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }
}

impl DataSource for ParquetSource {
    fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

        let file = File::open(&self.path).map_err(|e| {
            Error::io(format!("Failed to open Parquet file '{}': {}", self.path, e))
        })?;

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).map_err(|e| {
            Error::arrow(format!("Failed to create Parquet reader: {}", e))
        })?;

        let schema = builder.schema().clone();

        let reader = builder
            .with_batch_size(self.batch_size)
            .build()
            .map_err(|e| {
                Error::arrow(format!("Failed to build Parquet reader: {}", e))
            })?;

        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| {
                Error::arrow(format!("Failed to read Parquet batch: {}", e))
            })?;
            batches.push(batch);
        }

        if batches.is_empty() {
            return Err(Error::data(format!("Parquet file '{}' is empty", self.path)));
        }

        Ok((schema, batches))
    }
}

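/// A JSON file source (line-delimited JSON records, as expected by the arrow-json reader).
///
/// A minimal usage sketch; the path and `my_crate::source` import are placeholders. The
/// schema is inferred from the first rows unless one is supplied via `with_schema`:
///
/// ```ignore
/// use my_crate::source::{DataSource, JsonSource};
///
/// let source = JsonSource::new("data/example.json").with_batch_size(1024);
/// let (schema, batches) = source.load()?;
/// ```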
#[derive(Debug, Clone)]
pub struct JsonSource {
    path: String,
    batch_size: usize,
    schema: Option<Arc<ArrowSchema>>,
}

impl JsonSource {
    pub fn new(path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            batch_size: 8192,
            schema: None,
        }
    }

    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
        self.schema = Some(schema);
        self
    }
}

impl DataSource for JsonSource {
    fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
        use arrow_json::ReaderBuilder;

        let file = File::open(&self.path).map_err(|e| {
            Error::io(format!("Failed to open JSON file '{}': {}", self.path, e))
        })?;
        let buf_reader = BufReader::new(file);

        let reader = if let Some(schema) = &self.schema {
            ReaderBuilder::new(schema.clone())
                .with_batch_size(self.batch_size)
                .build(buf_reader)
                .map_err(|e| {
                    Error::arrow(format!("Failed to create JSON reader: {}", e))
                })?
        } else {
            let file_for_infer = File::open(&self.path).map_err(|e| {
                Error::io(format!("Failed to open JSON file for schema inference '{}': {}", self.path, e))
            })?;
            let buf_reader_infer = BufReader::new(file_for_infer);

            let inferred_result = arrow_json::reader::infer_json_schema(buf_reader_infer, Some(100))
                .map_err(|e| {
                    Error::arrow(format!("Failed to infer JSON schema: {}", e))
                })?;

            let inferred_schema = inferred_result.0;

            let file = File::open(&self.path).map_err(|e| {
                Error::io(format!("Failed to re-open JSON file '{}': {}", self.path, e))
            })?;
            let buf_reader = BufReader::new(file);

            ReaderBuilder::new(Arc::new(inferred_schema))
                .with_batch_size(self.batch_size)
                .build(buf_reader)
                .map_err(|e| {
                    Error::arrow(format!("Failed to create JSON reader: {}", e))
                })?
        };

        let schema = reader.schema();

        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| {
                Error::arrow(format!("Failed to read JSON batch: {}", e))
            })?;
            batches.push(batch);
        }

        if batches.is_empty() {
            return Err(Error::data(format!("JSON file '{}' is empty", self.path)));
        }

        Ok((schema, batches))
    }
}

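/// An in-memory source wrapping already-materialized Arrow record batches.
///
/// A minimal construction sketch; the column name and values are illustrative only:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::array::{ArrayRef, Int32Array};
/// use arrow::datatypes::{DataType, Field, Schema};
/// use arrow::record_batch::RecordBatch;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
/// let ids: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
/// let batch = RecordBatch::try_new(schema.clone(), vec![ids])?;
///
/// let source = RecordBatchSource::new(schema, vec![batch])?;
/// let (schema, batches) = source.load()?;
/// ```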
#[derive(Debug)]
pub struct RecordBatchSource {
    schema: Arc<ArrowSchema>,
    batches: Vec<RecordBatch>,
}

impl RecordBatchSource {
    pub fn new(schema: Arc<ArrowSchema>, batches: Vec<RecordBatch>) -> Result<Self> {
        if batches.is_empty() {
            return Err(Error::data("RecordBatchSource requires at least one batch"));
        }

        for batch in &batches {
            if batch.schema().as_ref() != schema.as_ref() {
                return Err(Error::schema(
                    "All RecordBatches must have the same schema as the provided schema"
                ));
            }
        }

        Ok(Self { schema, batches })
    }
}

impl DataSource for RecordBatchSource {
    fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
        Ok((self.schema.clone(), self.batches.clone()))
    }
}

#[cfg(feature = "database")]
pub mod database {
    use super::*;
    use arrow_odbc::OdbcReaderBuilder;
    use arrow_odbc::odbc_api::{Environment, ConnectionOptions};

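    /// A source that runs a SQL query over an ODBC connection and reads the result
    /// set as Arrow record batches via `arrow-odbc`.
    ///
    /// A minimal usage sketch; the connection string, query, and `my_crate::source`
    /// import are placeholders:
    ///
    /// ```ignore
    /// use my_crate::source::{DataSource, database::OdbcSource};
    ///
    /// let source = OdbcSource::new(
    ///     "Driver={PostgreSQL Unicode};Server=localhost;Database=sales;Uid=user;Pwd=secret;",
    ///     "SELECT id, amount FROM orders",
    /// )
    /// .with_max_bytes_per_batch(4 * 1024 * 1024);
    /// let (schema, batches) = source.load()?;
    /// ```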
    #[derive(Debug, Clone)]
    pub struct OdbcSource {
        connection_string: String,
        query: String,
        max_bytes_per_batch: usize,
        max_rows: Option<usize>,
    }

    impl OdbcSource {
        pub fn new(connection_string: impl Into<String>, query: impl Into<String>) -> Self {
            Self {
                connection_string: connection_string.into(),
                query: query.into(),
                max_bytes_per_batch: 8 * 1024 * 1024, // 8 MiB per batch by default
                max_rows: None,
            }
        }

        pub fn with_max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
            self.max_bytes_per_batch = max_bytes;
            self
        }

        pub fn with_max_rows(mut self, max_rows: usize) -> Self {
            self.max_rows = Some(max_rows);
            self
        }
    }

    impl DataSource for OdbcSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            let env = Environment::new().map_err(|e| {
                Error::data(format!("Failed to create ODBC environment: {}", e))
            })?;

            let conn = env.connect_with_connection_string(
                &self.connection_string,
                ConnectionOptions::default()
            ).map_err(|e| {
                Error::data(format!("Failed to connect to database: {}", e))
            })?;

            let cursor = match conn.execute(&self.query, (), self.max_rows).map_err(|e| {
                Error::data(format!("Failed to execute SQL query: {}", e))
            })? {
                Some(cursor) => cursor,
                None => {
                    return Err(Error::data(
                        "SQL query did not return a result set (cursor). Use SELECT statements for data loading.",
                    ));
                }
            };

            let reader = OdbcReaderBuilder::new()
                .with_max_bytes_per_batch(self.max_bytes_per_batch)
                .build(cursor)
                .map_err(|e| {
                    Error::data(format!("Failed to create ODBC reader: {}", e))
                })?;

            let schema = reader.schema();

            let mut batches = Vec::new();

            for batch_result in reader {
                let batch = batch_result.map_err(|e| {
                    Error::arrow(format!("Failed to read ODBC batch: {}", e))
                })?;
                batches.push(batch);
            }

            if batches.is_empty() {
                return Err(Error::data("ODBC query returned no results"));
            }

            Ok((schema, batches))
        }
    }

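    /// A convenience source for PostgreSQL that builds an ODBC connection string
    /// (using the "PostgreSQL Unicode" driver) and delegates to [`OdbcSource`].
    ///
    /// A minimal usage sketch; host, credentials, query, and the `my_crate::source`
    /// import are placeholders:
    ///
    /// ```ignore
    /// use my_crate::source::{DataSource, database::PostgresSource};
    ///
    /// let source = PostgresSource::new("localhost", "sales", "user", "secret")
    ///     .with_port(5432)
    ///     .with_query("SELECT id, amount FROM orders");
    /// let (schema, batches) = source.load()?;
    /// ```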
    #[derive(Debug, Clone)]
    pub struct PostgresSource {
        host: String,
        database: String,
        username: String,
        password: String,
        port: u16,
        query: String,
        max_bytes_per_batch: usize,
    }

    impl PostgresSource {
        pub fn new(
            host: impl Into<String>,
            database: impl Into<String>,
            username: impl Into<String>,
            password: impl Into<String>,
        ) -> Self {
            Self {
                host: host.into(),
                database: database.into(),
                username: username.into(),
                password: password.into(),
                port: 5432,
                query: String::new(),
                max_bytes_per_batch: 8 * 1024 * 1024, // 8 MiB per batch by default
            }
        }

        pub fn with_port(mut self, port: u16) -> Self {
            self.port = port;
            self
        }

        pub fn with_query(mut self, query: impl Into<String>) -> Self {
            self.query = query.into();
            self
        }

        pub fn with_max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
            self.max_bytes_per_batch = max_bytes;
            self
        }

        pub(crate) fn connection_string(&self) -> String {
            format!(
                "Driver={{PostgreSQL Unicode}};Server={};Port={};Database={};Uid={};Pwd={};",
                self.host, self.port, self.database, self.username, self.password
            )
        }
    }

    impl DataSource for PostgresSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            if self.query.is_empty() {
                return Err(Error::data("PostgreSQL query cannot be empty. Use with_query() to set it."));
            }

            let odbc_source = OdbcSource::new(self.connection_string(), &self.query)
                .with_max_bytes_per_batch(self.max_bytes_per_batch);

            odbc_source.load()
        }
    }

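    /// A convenience source for MySQL that builds an ODBC connection string
    /// (using the "MySQL ODBC 8.0 Unicode Driver") and delegates to [`OdbcSource`].
    ///
    /// A minimal usage sketch; host, credentials, query, and the `my_crate::source`
    /// import are placeholders:
    ///
    /// ```ignore
    /// use my_crate::source::{DataSource, database::MySqlSource};
    ///
    /// let source = MySqlSource::new("localhost", "sales", "user", "secret")
    ///     .with_port(3306)
    ///     .with_query("SELECT id, amount FROM orders");
    /// let (schema, batches) = source.load()?;
    /// ```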
    #[derive(Debug, Clone)]
    pub struct MySqlSource {
        host: String,
        database: String,
        username: String,
        password: String,
        port: u16,
        query: String,
        max_bytes_per_batch: usize,
    }

    impl MySqlSource {
        pub fn new(
            host: impl Into<String>,
            database: impl Into<String>,
            username: impl Into<String>,
            password: impl Into<String>,
        ) -> Self {
            Self {
                host: host.into(),
                database: database.into(),
                username: username.into(),
                password: password.into(),
                port: 3306,
                query: String::new(),
                max_bytes_per_batch: 8 * 1024 * 1024, // 8 MiB per batch by default
            }
        }

        pub fn with_port(mut self, port: u16) -> Self {
            self.port = port;
            self
        }

        pub fn with_query(mut self, query: impl Into<String>) -> Self {
            self.query = query.into();
            self
        }

        pub fn with_max_bytes_per_batch(mut self, max_bytes: usize) -> Self {
            self.max_bytes_per_batch = max_bytes;
            self
        }

        pub(crate) fn connection_string(&self) -> String {
            format!(
                "Driver={{MySQL ODBC 8.0 Unicode Driver}};Server={};Port={};Database={};Uid={};Pwd={};",
                self.host, self.port, self.database, self.username, self.password
            )
        }
    }

    impl DataSource for MySqlSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            if self.query.is_empty() {
                return Err(Error::data("MySQL query cannot be empty. Use with_query() to set it."));
            }

            let odbc_source = OdbcSource::new(self.connection_string(), &self.query)
                .with_max_bytes_per_batch(self.max_bytes_per_batch);

            odbc_source.load()
        }
    }
}

#[cfg(feature = "rest-api")]
pub mod rest {
    use super::*;
    use reqwest::blocking::Client;
    use std::collections::HashMap;
    use std::io::Cursor;

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum HttpMethod {
        Get,
        Post,
    }

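    /// A source that fetches JSON from an HTTP endpoint and parses the response body
    /// into Arrow record batches.
    ///
    /// A minimal usage sketch; the URL, header, query parameter, and `my_crate::source`
    /// import are placeholders, and the endpoint is assumed to return records the
    /// arrow-json reader can parse (line-delimited JSON):
    ///
    /// ```ignore
    /// use my_crate::source::rest::{HttpMethod, RestApiSource};
    /// use my_crate::source::DataSource;
    ///
    /// let source = RestApiSource::new("https://api.example.com/data")
    ///     .with_method(HttpMethod::Get)
    ///     .with_header("Authorization", "Bearer <token>")
    ///     .with_query_param("limit", "100")
    ///     .with_timeout_secs(60);
    /// let (schema, batches) = source.load()?;
    /// ```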
    #[derive(Debug, Clone)]
    pub struct RestApiSource {
        url: String,
        method: HttpMethod,
        headers: HashMap<String, String>,
        query_params: HashMap<String, String>,
        body: Option<String>,
        batch_size: usize,
        schema: Option<Arc<ArrowSchema>>,
        timeout_secs: u64,
    }

    impl RestApiSource {
        pub fn new(url: impl Into<String>) -> Self {
            Self {
                url: url.into(),
                method: HttpMethod::Get,
                headers: HashMap::new(),
                query_params: HashMap::new(),
                body: None,
                batch_size: 8192,
                schema: None,
                timeout_secs: 30,
            }
        }

        pub fn with_method(mut self, method: HttpMethod) -> Self {
            self.method = method;
            self
        }

        pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
            self.headers.insert(key.into(), value.into());
            self
        }

        pub fn with_query_param(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
            self.query_params.insert(key.into(), value.into());
            self
        }

        pub fn with_body(mut self, body: impl Into<String>) -> Self {
            self.body = Some(body.into());
            self
        }

        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
            self.schema = Some(schema);
            self
        }

        pub fn with_timeout_secs(mut self, timeout_secs: u64) -> Self {
            self.timeout_secs = timeout_secs;
            self
        }
    }

    impl DataSource for RestApiSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            use arrow_json::ReaderBuilder;

            let client = Client::builder()
                .timeout(std::time::Duration::from_secs(self.timeout_secs))
                .build()
                .map_err(|e| Error::io(format!("Failed to create HTTP client: {}", e)))?;

            let mut url = url::Url::parse(&self.url)
                .map_err(|e| Error::data(format!("Invalid URL '{}': {}", self.url, e)))?;

            for (key, value) in &self.query_params {
                url.query_pairs_mut().append_pair(key, value);
            }

            let mut request = match self.method {
                HttpMethod::Get => client.get(url.as_str()),
                HttpMethod::Post => {
                    let mut req = client.post(url.as_str());
                    if let Some(body) = &self.body {
                        req = req.body(body.clone());
                    }
                    req
                }
            };

            for (key, value) in &self.headers {
                request = request.header(key, value);
            }

            let response = request
                .send()
                .map_err(|e| Error::io(format!("HTTP request failed: {}", e)))?;

            if !response.status().is_success() {
                return Err(Error::data(format!(
                    "HTTP request failed with status {}: {}",
                    response.status(),
                    response.text().unwrap_or_default()
                )));
            }

            let response_bytes = response
                .bytes()
                .map_err(|e| Error::io(format!("Failed to read HTTP response: {}", e)))?;

            let cursor = Cursor::new(response_bytes.as_ref());

            let reader = if let Some(schema) = &self.schema {
                ReaderBuilder::new(schema.clone())
                    .with_batch_size(self.batch_size)
                    .build(cursor)
                    .map_err(|e| Error::arrow(format!("Failed to create JSON reader: {}", e)))?
            } else {
                let cursor_for_infer = Cursor::new(response_bytes.as_ref());
                let inferred_result = arrow_json::reader::infer_json_schema(cursor_for_infer, None)
                    .map_err(|e| Error::arrow(format!("Failed to infer JSON schema from API response: {}", e)))?;

                let inferred_schema = inferred_result.0;
                let cursor = Cursor::new(response_bytes.as_ref());

                ReaderBuilder::new(Arc::new(inferred_schema))
                    .with_batch_size(self.batch_size)
                    .build(cursor)
                    .map_err(|e| Error::arrow(format!("Failed to create JSON reader: {}", e)))?
            };

            let schema = reader.schema();

            let mut batches = Vec::new();
            for batch_result in reader {
                let batch = batch_result.map_err(|e| {
                    Error::arrow(format!("Failed to read JSON batch from API response: {}", e))
                })?;
                batches.push(batch);
            }

            if batches.is_empty() {
                return Err(Error::data(format!("API response from '{}' is empty", self.url)));
            }

            Ok((schema, batches))
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_csv_source_builder() {
        let source = CsvSource::new("test.csv")
            .with_header(true)
            .with_batch_size(1024)
            .with_delimiter(b';');

        assert_eq!(source.path, "test.csv");
        assert!(source.has_header);
        assert_eq!(source.batch_size, 1024);
        assert_eq!(source.delimiter, b';');
    }

    #[test]
    fn test_parquet_source_builder() {
        let source = ParquetSource::new("test.parquet")
            .with_batch_size(2048);

        assert_eq!(source.path, "test.parquet");
        assert_eq!(source.batch_size, 2048);
    }

    #[test]
    fn test_json_source_builder() {
        let source = JsonSource::new("test.json")
            .with_batch_size(512);

        assert_eq!(source.path, "test.json");
        assert_eq!(source.batch_size, 512);
    }

    #[cfg(feature = "database")]
    #[test]
    fn test_postgres_source_builder() {
        let source = database::PostgresSource::new("localhost", "testdb", "user", "pass")
            .with_port(5432)
            .with_query("SELECT * FROM test");

        assert_eq!(
            source.connection_string(),
            "Driver={PostgreSQL Unicode};Server=localhost;Port=5432;Database=testdb;Uid=user;Pwd=pass;"
        );
    }

    #[cfg(feature = "database")]
    #[test]
    fn test_mysql_source_builder() {
        let source = database::MySqlSource::new("localhost", "testdb", "user", "pass")
            .with_port(3306)
            .with_query("SELECT * FROM test");

        assert_eq!(
            source.connection_string(),
            "Driver={MySQL ODBC 8.0 Unicode Driver};Server=localhost;Port=3306;Database=testdb;Uid=user;Pwd=pass;"
        );
    }

    #[cfg(feature = "rest-api")]
    #[test]
    fn test_rest_api_source_builder() {
        // The builder chain running without panicking is the assertion here.
        let _source = rest::RestApiSource::new("https://api.example.com/data")
            .with_method(rest::HttpMethod::Get)
            .with_header("Authorization", "Bearer token")
            .with_query_param("limit", "100")
            .with_batch_size(512)
            .with_timeout_secs(60);
    }

    #[cfg(feature = "object-storage")]
    #[test]
    fn test_s3_source_builder() {
        use object_storage::{S3Source, StorageFileFormat};

        // Construction succeeding without panicking is the assertion here.
        let _source = S3Source::new("my-bucket", "data/sales.parquet")
            .with_region("us-west-2")
            .with_format(StorageFileFormat::Parquet)
            .with_batch_size(4096);
    }

    #[cfg(feature = "object-storage")]
    #[test]
    fn test_gcs_source_builder() {
        use object_storage::{GcsSource, StorageFileFormat};

        // Construction succeeding without panicking is the assertion here.
        let _source = GcsSource::new("my-bucket", "data/analytics.json")
            .with_format(StorageFileFormat::Json)
            .with_batch_size(8192);
    }

    #[cfg(feature = "object-storage")]
    #[test]
    fn test_azure_source_builder() {
        use object_storage::{AzureSource, StorageFileFormat};

        // Construction succeeding without panicking is the assertion here.
        let _source = AzureSource::new("myaccount", "mycontainer", "data/logs.csv")
            .with_format(StorageFileFormat::Csv)
            .with_batch_size(2048);
    }
}

#[cfg(feature = "object-storage")]
pub mod object_storage {
    use super::*;
    use bytes::Bytes;
    use object_store::{ObjectStore, path::Path as ObjectPath};
    use std::sync::Arc as StdArc;

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum StorageFileFormat {
        Parquet,
        Csv,
        Json,
    }

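    /// A generic source that downloads a single object from any [`ObjectStore`]
    /// implementation and decodes it as Parquet, CSV, or JSON.
    ///
    /// A minimal sketch using the in-memory store from the `object_store` crate; the
    /// object path and the `my_crate::source` import are placeholders, and the object
    /// is assumed to have been written to the store beforehand:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use object_store::memory::InMemory;
    /// use my_crate::source::object_storage::{ObjectStorageSource, StorageFileFormat};
    /// use my_crate::source::DataSource;
    ///
    /// let store = Arc::new(InMemory::new());
    /// // ... put "data/example.csv" into the store here ...
    /// let source = ObjectStorageSource::new(store, "data/example.csv")
    ///     .with_format(StorageFileFormat::Csv)
    ///     .with_csv_header(true);
    /// let (schema, batches) = source.load()?;
    /// ```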
    #[derive(Debug)]
    pub struct ObjectStorageSource {
        store: StdArc<dyn ObjectStore>,
        path: String,
        format: StorageFileFormat,
        batch_size: usize,
        schema: Option<Arc<ArrowSchema>>,
        csv_has_header: bool,
        csv_delimiter: u8,
    }

    impl ObjectStorageSource {
        pub fn new(store: StdArc<dyn ObjectStore>, path: impl Into<String>) -> Self {
            Self {
                store,
                path: path.into(),
                format: StorageFileFormat::Parquet,
                batch_size: 8192,
                schema: None,
                csv_has_header: true,
                csv_delimiter: b',',
            }
        }

        pub fn with_format(mut self, format: StorageFileFormat) -> Self {
            self.format = format;
            self
        }

        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
            self.schema = Some(schema);
            self
        }

        pub fn with_csv_header(mut self, has_header: bool) -> Self {
            self.csv_has_header = has_header;
            self
        }

        pub fn with_csv_delimiter(mut self, delimiter: u8) -> Self {
            self.csv_delimiter = delimiter;
            self
        }

        async fn download_file(&self) -> Result<Bytes> {
            let path = ObjectPath::from(self.path.as_str());

            let result = self.store.get(&path).await.map_err(|e| {
                Error::io(format!("Failed to download file '{}' from object storage: {}", self.path, e))
            })?;

            let bytes = result.bytes().await.map_err(|e| {
                Error::io(format!("Failed to read bytes from object storage: {}", e))
            })?;

            Ok(bytes)
        }
    }

    impl DataSource for ObjectStorageSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            let runtime = tokio::runtime::Runtime::new().map_err(|e| {
                Error::io(format!("Failed to create tokio runtime: {}", e))
            })?;

            runtime.block_on(async {
                let bytes = self.download_file().await?;

                match self.format {
                    StorageFileFormat::Parquet => {
                        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

                        let builder = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).map_err(|e| {
                            Error::arrow(format!("Failed to create Parquet reader: {}", e))
                        })?;

                        let schema = builder.schema().clone();
                        let reader = builder.with_batch_size(self.batch_size).build().map_err(|e| {
                            Error::arrow(format!("Failed to build Parquet reader: {}", e))
                        })?;

                        let mut batches = Vec::new();
                        for batch_result in reader {
                            let batch = batch_result.map_err(|e| {
                                Error::arrow(format!("Failed to read Parquet batch: {}", e))
                            })?;
                            batches.push(batch);
                        }

                        if batches.is_empty() {
                            return Err(Error::data(format!("Parquet file '{}' is empty", self.path)));
                        }

                        Ok((schema, batches))
                    }

                    StorageFileFormat::Csv => {
                        use arrow_csv::ReaderBuilder;
                        use std::io::Cursor;

                        let format = arrow_csv::reader::Format::default()
                            .with_header(self.csv_has_header)
                            .with_delimiter(self.csv_delimiter);

                        let reader = if let Some(schema) = &self.schema {
                            let cursor = Cursor::new(bytes);
                            ReaderBuilder::new(schema.clone())
                                .with_format(format)
                                .with_batch_size(self.batch_size)
                                .build(cursor)
                                .map_err(|e| Error::arrow(format!("Failed to create CSV reader: {}", e)))?
                        } else {
                            let cursor_for_infer = Cursor::new(bytes.clone());
                            let buf_reader = BufReader::new(cursor_for_infer);
                            let (inferred_schema, _) = format.infer_schema(buf_reader, Some(100))
                                .map_err(|e| Error::arrow(format!("Failed to infer CSV schema: {}", e)))?;

                            let cursor = Cursor::new(bytes);
                            ReaderBuilder::new(Arc::new(inferred_schema))
                                .with_format(format)
                                .with_batch_size(self.batch_size)
                                .build(cursor)
                                .map_err(|e| Error::arrow(format!("Failed to create CSV reader: {}", e)))?
                        };

                        let schema = reader.schema();
                        let mut batches = Vec::new();
                        for batch_result in reader {
                            let batch = batch_result.map_err(|e| {
                                Error::arrow(format!("Failed to read CSV batch: {}", e))
                            })?;
                            batches.push(batch);
                        }

                        if batches.is_empty() {
                            return Err(Error::data(format!("CSV file '{}' is empty", self.path)));
                        }

                        Ok((schema, batches))
                    }

                    StorageFileFormat::Json => {
                        use arrow_json::ReaderBuilder;
                        use std::io::Cursor;

                        let cursor = Cursor::new(bytes.clone());

                        let reader = if let Some(schema) = &self.schema {
                            ReaderBuilder::new(schema.clone())
                                .with_batch_size(self.batch_size)
                                .build(cursor)
                                .map_err(|e| Error::arrow(format!("Failed to create JSON reader: {}", e)))?
                        } else {
                            let cursor_for_infer = Cursor::new(bytes.clone());
                            let buf_reader = BufReader::new(cursor_for_infer);
                            let inferred_result = arrow_json::reader::infer_json_schema(buf_reader, Some(100))
                                .map_err(|e| Error::arrow(format!("Failed to infer JSON schema: {}", e)))?;

                            let inferred_schema = inferred_result.0;
                            let cursor = Cursor::new(bytes);
                            ReaderBuilder::new(Arc::new(inferred_schema))
                                .with_batch_size(self.batch_size)
                                .build(cursor)
                                .map_err(|e| Error::arrow(format!("Failed to create JSON reader: {}", e)))?
                        };

                        let schema = reader.schema();
                        let mut batches = Vec::new();
                        for batch_result in reader {
                            let batch = batch_result.map_err(|e| {
                                Error::arrow(format!("Failed to read JSON batch: {}", e))
                            })?;
                            batches.push(batch);
                        }

                        if batches.is_empty() {
                            return Err(Error::data(format!("JSON file '{}' is empty", self.path)));
                        }

                        Ok((schema, batches))
                    }
                }
            })
        }
    }

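    /// An Amazon S3 (or S3-compatible) source identified by bucket and object key.
    ///
    /// A minimal usage sketch; bucket, key, region, credentials, and the
    /// `my_crate::source` import are placeholders:
    ///
    /// ```ignore
    /// use my_crate::source::object_storage::{S3Source, StorageFileFormat};
    /// use my_crate::source::DataSource;
    ///
    /// let source = S3Source::new("my-bucket", "data/sales.parquet")
    ///     .with_region("us-west-2")
    ///     .with_access_key("access-key-id", "secret-access-key")
    ///     .with_format(StorageFileFormat::Parquet);
    /// let (schema, batches) = source.load()?;
    /// ```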
    #[derive(Debug, Clone)]
    pub struct S3Source {
        bucket: String,
        path: String,
        region: Option<String>,
        access_key_id: Option<String>,
        secret_access_key: Option<String>,
        endpoint: Option<String>,
        format: StorageFileFormat,
        batch_size: usize,
        schema: Option<Arc<ArrowSchema>>,
    }

    impl S3Source {
        pub fn new(bucket: impl Into<String>, path: impl Into<String>) -> Self {
            Self {
                bucket: bucket.into(),
                path: path.into(),
                region: None,
                access_key_id: None,
                secret_access_key: None,
                endpoint: None,
                format: StorageFileFormat::Parquet,
                batch_size: 8192,
                schema: None,
            }
        }

        pub fn with_region(mut self, region: impl Into<String>) -> Self {
            self.region = Some(region.into());
            self
        }

        pub fn with_access_key(
            mut self,
            access_key_id: impl Into<String>,
            secret_access_key: impl Into<String>,
        ) -> Self {
            self.access_key_id = Some(access_key_id.into());
            self.secret_access_key = Some(secret_access_key.into());
            self
        }

        pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
            self.endpoint = Some(endpoint.into());
            self
        }

        pub fn with_format(mut self, format: StorageFileFormat) -> Self {
            self.format = format;
            self
        }

        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
            self.schema = Some(schema);
            self
        }

        fn build_store(&self) -> Result<StdArc<dyn ObjectStore>> {
            use object_store::aws::AmazonS3Builder;

            let mut builder = AmazonS3Builder::new()
                .with_bucket_name(&self.bucket);

            if let Some(region) = &self.region {
                builder = builder.with_region(region);
            }

            if let Some(access_key_id) = &self.access_key_id {
                builder = builder.with_access_key_id(access_key_id);
            }

            if let Some(secret_access_key) = &self.secret_access_key {
                builder = builder.with_secret_access_key(secret_access_key);
            }

            if let Some(endpoint) = &self.endpoint {
                builder = builder.with_endpoint(endpoint);
            }

            let store = builder.build().map_err(|e| {
                Error::data(format!("Failed to build S3 store: {}", e))
            })?;

            Ok(StdArc::new(store))
        }
    }

    impl DataSource for S3Source {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            let store = self.build_store()?;

            let mut obj_source = ObjectStorageSource::new(store, &self.path)
                .with_format(self.format)
                .with_batch_size(self.batch_size);

            if let Some(schema) = &self.schema {
                obj_source = obj_source.with_schema(schema.clone());
            }

            obj_source.load()
        }
    }

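    /// A Google Cloud Storage source identified by bucket and object path.
    ///
    /// A minimal usage sketch; bucket, path, the service-account key file, and the
    /// `my_crate::source` import are placeholders:
    ///
    /// ```ignore
    /// use my_crate::source::object_storage::{GcsSource, StorageFileFormat};
    /// use my_crate::source::DataSource;
    ///
    /// let key_json = std::fs::read_to_string("service-account.json")?;
    /// let source = GcsSource::new("my-bucket", "data/analytics.json")
    ///     .with_service_account_key(key_json)
    ///     .with_format(StorageFileFormat::Json);
    /// let (schema, batches) = source.load()?;
    /// ```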
    #[derive(Debug, Clone)]
    pub struct GcsSource {
        bucket: String,
        path: String,
        service_account_key: Option<String>,
        format: StorageFileFormat,
        batch_size: usize,
        schema: Option<Arc<ArrowSchema>>,
    }

    impl GcsSource {
        pub fn new(bucket: impl Into<String>, path: impl Into<String>) -> Self {
            Self {
                bucket: bucket.into(),
                path: path.into(),
                service_account_key: None,
                format: StorageFileFormat::Parquet,
                batch_size: 8192,
                schema: None,
            }
        }

        pub fn with_service_account_key(mut self, key: impl Into<String>) -> Self {
            self.service_account_key = Some(key.into());
            self
        }

        pub fn with_format(mut self, format: StorageFileFormat) -> Self {
            self.format = format;
            self
        }

        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
            self.schema = Some(schema);
            self
        }

        fn build_store(&self) -> Result<StdArc<dyn ObjectStore>> {
            use object_store::gcp::GoogleCloudStorageBuilder;

            let mut builder = GoogleCloudStorageBuilder::new()
                .with_bucket_name(&self.bucket);

            if let Some(key) = &self.service_account_key {
                builder = builder.with_service_account_key(key);
            }

            let store = builder.build().map_err(|e| {
                Error::data(format!("Failed to build GCS store: {}", e))
            })?;

            Ok(StdArc::new(store))
        }
    }

    impl DataSource for GcsSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            let store = self.build_store()?;

            let mut obj_source = ObjectStorageSource::new(store, &self.path)
                .with_format(self.format)
                .with_batch_size(self.batch_size);

            if let Some(schema) = &self.schema {
                obj_source = obj_source.with_schema(schema.clone());
            }

            obj_source.load()
        }
    }

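    /// An Azure Blob Storage source identified by storage account, container, and blob path.
    ///
    /// A minimal usage sketch; account, container, path, key, and the `my_crate::source`
    /// import are placeholders:
    ///
    /// ```ignore
    /// use my_crate::source::object_storage::{AzureSource, StorageFileFormat};
    /// use my_crate::source::DataSource;
    ///
    /// let source = AzureSource::new("myaccount", "mycontainer", "data/logs.csv")
    ///     .with_access_key("base64-account-key")
    ///     .with_format(StorageFileFormat::Csv);
    /// let (schema, batches) = source.load()?;
    /// ```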
    #[derive(Debug, Clone)]
    pub struct AzureSource {
        account: String,
        container: String,
        path: String,
        access_key: Option<String>,
        sas_token: Option<String>,
        format: StorageFileFormat,
        batch_size: usize,
        schema: Option<Arc<ArrowSchema>>,
    }

    impl AzureSource {
        pub fn new(
            account: impl Into<String>,
            container: impl Into<String>,
            path: impl Into<String>,
        ) -> Self {
            Self {
                account: account.into(),
                container: container.into(),
                path: path.into(),
                access_key: None,
                sas_token: None,
                format: StorageFileFormat::Parquet,
                batch_size: 8192,
                schema: None,
            }
        }

        pub fn with_access_key(mut self, access_key: impl Into<String>) -> Self {
            self.access_key = Some(access_key.into());
            self
        }

        pub fn with_sas_token(mut self, sas_token: impl Into<String>) -> Self {
            self.sas_token = Some(sas_token.into());
            self
        }

        pub fn with_format(mut self, format: StorageFileFormat) -> Self {
            self.format = format;
            self
        }

        pub fn with_batch_size(mut self, batch_size: usize) -> Self {
            self.batch_size = batch_size;
            self
        }

        pub fn with_schema(mut self, schema: Arc<ArrowSchema>) -> Self {
            self.schema = Some(schema);
            self
        }

        fn build_store(&self) -> Result<StdArc<dyn ObjectStore>> {
            use object_store::azure::{MicrosoftAzureBuilder, AzureConfigKey};

            let mut builder = MicrosoftAzureBuilder::new()
                .with_account(&self.account)
                .with_container_name(&self.container);

            if let Some(access_key) = &self.access_key {
                builder = builder.with_access_key(access_key);
            }

            if let Some(sas_token) = &self.sas_token {
                builder = builder.with_config(AzureConfigKey::SasKey, sas_token);
            }

            let store = builder.build().map_err(|e| {
                Error::data(format!("Failed to build Azure store: {}", e))
            })?;

            Ok(StdArc::new(store))
        }
    }

    impl DataSource for AzureSource {
        fn load(&self) -> Result<(Arc<ArrowSchema>, Vec<RecordBatch>)> {
            let store = self.build_store()?;

            let mut obj_source = ObjectStorageSource::new(store, &self.path)
                .with_format(self.format)
                .with_batch_size(self.batch_size);

            if let Some(schema) = &self.schema {
                obj_source = obj_source.with_schema(schema.clone());
            }

            obj_source.load()
        }
    }
}