1pub mod prelude;
2pub mod features;
3mod helper_funcs;
4mod custom_error;
5mod normalizers;
6mod csvwrite;
7mod sqlbuilder;
8
9use regex::Regex;
11use datafusion::prelude::*;
12use futures::future::BoxFuture;
13use datafusion::datasource::MemTable;
14use std::sync::Arc;
15use arrow::datatypes::{DataType as ArrowDataType};
16use arrow::array::{ArrayRef, Array, Float64Array,Int64Array};
17
18use arrow::record_batch::RecordBatch;
19use arrow::csv::writer::WriterBuilder;
20
21use std::fs::{self, File, OpenOptions};
23use std::io::{Write, BufWriter};
24
25use datafusion::prelude::SessionContext;
27use datafusion::dataframe::{DataFrame,DataFrameWriteOptions};
28
29use serde_json::Value;
31use serde::{Deserialize, Serialize};
32use std::collections::{HashMap, HashSet};
33
34use deltalake::DeltaTableError;
36use std::result::Result;
37use std::path::Path as LocalPath;
38use deltalake::writer::WriteMode;
39use deltalake::open_table;
40
41use crate::features::delta::write_to_delta_impl;
43use crate::features::delta::DeltaPathManager;
44use crate::features::calendar::DateFormat;
45
46use std::fmt::Debug;
48use crate::custom_error::cust_error::ElusionError;
49use crate::custom_error::cust_error::ElusionResult;
50use crate::custom_error::cust_error::extract_column_from_duplicate_error;
51use crate::custom_error::cust_error::extract_missing_column;
52use crate::custom_error::cust_error::extract_column_from_projection_error;
53use crate::custom_error::cust_error::extract_table_from_join_error;
54use crate::custom_error::cust_error::extract_function_from_error;
55use crate::custom_error::cust_error::extract_column_from_agg_error;
56use crate::custom_error::cust_error::detect_function_usage_in_error;
57use crate::custom_error::cust_error::generate_enhanced_groupby_suggestion;
58use crate::custom_error::cust_error::extract_window_function_name;
59use crate::custom_error::cust_error::extract_window_function_columns;
60
61use arrow::compute;
63use arrow::array::StringArray;
64
65pub use features::dashboard::{ReportLayout, TableOptions};
67use crate::prelude::PlotlyPlot;
68
69use datafusion::common::ScalarValue;
71use std::io::BufReader;
72use serde_json::Deserializer;
73
74use chrono::{DateTime, Utc};
76use crate::features::cashandview::MATERIALIZED_VIEW_MANAGER;
77use crate::features::cashandview::QUERY_CACHE;
78use crate::features::cashandview::QueryCache;
79
80use chrono::Weekday;
82
83use crate::features::postgres::PostgresConnection;
85
86use crate::features::mysql::MySqlConnection;
88
89use crate::helper_funcs::registertable::register_df_as_table;
91use crate::helper_funcs::array_val_to_json::array_value_to_json;
92use crate::helper_funcs::build_rec_batch::build_record_batch;
93use crate::helper_funcs::infer_schema_json::infer_schema_from_json;
94
95use crate::normalizers::normalize::is_simple_column;
97use crate::normalizers::normalize::normalize_alias;
98use crate::normalizers::normalize::normalize_column_name;
99use crate::normalizers::normalize::lowercase_column_names;
100use crate::normalizers::normalize::normalize_condition;
101use crate::normalizers::normalize::normalize_expression;
102use crate::normalizers::normalize::normalize_condition_filter;
103use crate::normalizers::normalize::is_expression;
104use crate::normalizers::normalize::is_aggregate_expression;
105use crate::normalizers::normalize::normalize_alias_write;
106use crate::normalizers::normalize::normalize_window_function;
107use crate::normalizers::normalize::normalize_simple_expression;
108use crate::normalizers::normalize::resolve_alias_to_original;
109use crate::normalizers::normalize::is_groupable_column;
110use crate::normalizers::normalize::extract_base_column_name;
111
112use crate::features::csv::load_csv_with_type_handling;
114use crate::csvwrite::csvwriteops::CsvWriteOptions;
115
116use crate::sqlbuilder::sqlbuild::SqlBuilder;
118
119use datafusion::physical_plan::SendableRecordBatchStream;
121use futures::StreamExt;
122use crate::features::csv::load_csv_smart;
123use arrow::util::pretty::pretty_format_batches;
124
125use crate::features::redis::RedisCacheConnection;
127use crate::features::redis::RedisCacheStats;
128use crate::features::redis::clear_redis_cache_impl;
129use crate::features::redis::create_redis_cache_connection;
130use crate::features::redis::create_redis_cache_connection_with_config;
131use crate::features::redis::elusion_with_redis_cache_impl;
132use crate::features::redis::get_redis_cache_stats_impl;
133use crate::features::redis::invalidate_redis_cache_impl;
134
135use crate::features::xml::XmlProcessingMode;
137use crate::features::xml::load_xml_with_mode;
138
139#[derive(Debug, PartialEq, Clone)]
141pub enum DatabaseType {
142 MySQL,
143 PostgreSQL,
144 MongoDB,
145 SQLServer,
146 Unknown
147}
148
149#[derive(Clone, Debug)]
150pub struct Join {
151 dataframe: CustomDataFrame,
152 condition: String,
153 join_type: String,
154}
155
156#[derive(Debug)]
157pub struct CustomDataFrame {
158 df: DataFrame,
159 table_alias: String,
160 from_table: String,
161 selected_columns: Vec<String>,
162 pub alias_map: Vec<(String, String)>,
163 aggregations: Vec<String>,
164 group_by_columns: Vec<String>,
165 where_conditions: Vec<String>,
166 having_conditions: Vec<String>,
167 order_by_columns: Vec<(String, bool)>,
168 limit_count: Option<u64>,
169 joins: Vec<Join>,
170 window_functions: Vec<String>,
171 ctes: Vec<String>,
172 pub subquery_source: Option<String>,
173 set_operations: Vec<String>,
174 pub query: String,
175 pub aggregated_df: Option<DataFrame>,
176 union_tables: Option<Vec<(String, DataFrame, String)>>,
177 original_expressions: Vec<String>,
178
179 needs_normalization: bool,
180 raw_selected_columns: Vec<String>,
181 raw_group_by_columns: Vec<String>,
182 raw_where_conditions: Vec<String>,
183 raw_having_conditions: Vec<String>,
184 raw_join_conditions: Vec<String>,
185 raw_aggregations: Vec<String>,
186
187 uses_group_by_all: bool
188}
189
190impl Clone for CustomDataFrame {
191 fn clone(&self) -> Self {
192 Self {
193 df: self.df.clone(),
194 table_alias: self.table_alias.clone(),
195 from_table: self.from_table.clone(),
196 query: self.query.clone(),
197 selected_columns: self.selected_columns.clone(),
198 alias_map: self.alias_map.clone(),
199 aggregations: self.aggregations.clone(),
200 group_by_columns: self.group_by_columns.clone(),
201 where_conditions: self.where_conditions.clone(),
202 having_conditions: self.having_conditions.clone(),
203 order_by_columns: self.order_by_columns.clone(),
204 joins: self.joins.clone(),
205 window_functions: self.window_functions.clone(),
206 ctes: self.ctes.clone(),
207 set_operations: self.set_operations.clone(),
208 original_expressions: self.original_expressions.clone(),
209 limit_count: self.limit_count,
210 subquery_source: self.subquery_source.clone(),
211 aggregated_df: self.aggregated_df.clone(),
212 union_tables: self.union_tables.clone(),
213 needs_normalization: self.needs_normalization,
214 raw_selected_columns: self.raw_selected_columns.clone(),
215 raw_group_by_columns: self.raw_group_by_columns.clone(),
216 raw_where_conditions: self.raw_where_conditions.clone(),
217 raw_having_conditions: self.raw_having_conditions.clone(),
218 raw_join_conditions: self.raw_join_conditions.clone(),
219 raw_aggregations: self.raw_aggregations.clone(),
220
221 uses_group_by_all: self.uses_group_by_all
222 }
223 }
224}
225
226#[allow(dead_code)]
229#[derive(Deserialize, Serialize, Debug)]
230struct GenericJson {
231 #[serde(flatten)]
232 fields: HashMap<String, Value>,
233}
234
235#[derive(Debug, Default)]
237pub struct ColumnStats {
238 pub columns: Vec<ColumnStatistics>,
239}
240
241#[derive(Debug)]
242pub struct ColumnStatistics {
243 pub name: String,
244 pub total_count: i64,
245 pub non_null_count: i64,
246 pub mean: Option<f64>,
247 pub min_value: ScalarValue,
248 pub max_value: ScalarValue,
249 pub std_dev: Option<f64>,
250}
251
252
253#[derive(Debug)]
254pub struct NullAnalysis {
255 pub counts: Vec<NullCount>,
256}
257
258#[derive(Debug)]
259pub struct NullCount {
260 pub column_name: String,
261 pub total_rows: i64,
262 pub null_count: i64,
263 pub null_percentage: f64,
264}
265#[derive(Debug)]
267pub struct ColumnErrorContext {
268 context: String,
269 location: String,
270 suggestion: String,
271}
272
273pub struct AliasedDataFrame {
275 pub dataframe: DataFrame,
276 pub alias: String,
277}
278
279impl CustomDataFrame {
280
281 pub async fn new<'a>(
283 file_path: &'a str,
284 alias: &'a str,
285 ) -> ElusionResult<Self> {
286 let aliased_df = Self::load(file_path, alias).await?;
287
288 Ok(CustomDataFrame {
289 df: aliased_df.dataframe,
290 table_alias: aliased_df.alias,
291 from_table: alias.to_string(),
292 selected_columns: Vec::new(),
293 alias_map: Vec::new(),
294 aggregations: Vec::new(),
295 group_by_columns: Vec::new(),
296 where_conditions: Vec::new(),
297 having_conditions: Vec::new(),
298 order_by_columns: Vec::new(),
299 limit_count: None,
300 joins: Vec::new(),
301 window_functions: Vec::new(),
302 ctes: Vec::new(),
303 subquery_source: None,
304 set_operations: Vec::new(),
305 query: String::new(),
306 aggregated_df: None,
307 union_tables: None,
308 original_expressions: Vec::new(),
309 needs_normalization: false,
310 raw_selected_columns: Vec::new(),
311 raw_group_by_columns: Vec::new(),
312 raw_where_conditions: Vec::new(),
313 raw_having_conditions: Vec::new(),
314 raw_join_conditions: Vec::new(),
315 raw_aggregations: Vec::new(),
316 uses_group_by_all: false
317 })
318 }
319
320 pub async fn empty() -> ElusionResult<Self> {
323 let ctx = SessionContext::new();
325
326 let sql = "SELECT 1 as dummy";
327
328 let df = ctx.sql(sql).await
330 .map_err(|e| ElusionError::InvalidOperation {
331 operation: "Single Row Creation".to_string(),
332 reason: format!("Failed to create single-row DataFrame: {}", e),
333 suggestion: "💡 Verify SQL execution capabilities in context.".to_string()
334 })?;
335
336 Ok(CustomDataFrame {
338 df,
339 table_alias: "dummy_table".to_string(),
340 from_table: "dummy_table".to_string(),
341 selected_columns: Vec::new(),
342 alias_map: Vec::new(),
343 aggregations: Vec::new(),
344 group_by_columns: Vec::new(),
345 where_conditions: Vec::new(),
346 having_conditions: Vec::new(),
347 order_by_columns: Vec::new(),
348 limit_count: None,
349 joins: Vec::new(),
350 window_functions: Vec::new(),
351 ctes: Vec::new(),
352 subquery_source: None,
353 set_operations: Vec::new(),
354 query: String::new(),
355 aggregated_df: None,
356 union_tables: None,
357 original_expressions: Vec::new(),
358 needs_normalization: false,
359 raw_selected_columns: Vec::new(),
360 raw_group_by_columns: Vec::new(),
361 raw_where_conditions: Vec::new(),
362 raw_having_conditions: Vec::new(),
363 raw_join_conditions: Vec::new(),
364 raw_aggregations: Vec::new(),
365 uses_group_by_all: false
366 })
367 }
368
369 pub async fn create_date_range_table(
372 start_date: &str,
373 end_date: &str,
374 alias: &str
375 ) -> ElusionResult<Self> {
376 crate::features::calendar::create_date_range_table_impl(start_date, end_date, alias).await
377 }
378
379 pub async fn create_formatted_date_range_table(
381 start_date: &str,
382 end_date: &str,
383 alias: &str,
384 format_name: String,
385 format: DateFormat,
386 include_period_ranges: bool,
387 week_start_day: Weekday
388 ) -> ElusionResult<Self> {
389 crate::features::calendar::create_formatted_date_range_table_impl(
390 start_date,
391 end_date,
392 alias,
393 format_name,
394 format,
395 include_period_ranges,
396 week_start_day
397 ).await
398 }
399
400 pub async fn new_with_schema(
416 file_path: &str,
417 schema: crate::features::with_schema::FileSchema,
418 alias: &str,
419 ) -> ElusionResult<Self> {
420 let aliased_df = crate::features::with_schema::load_with_schema(file_path, schema, alias).await?;
421
422 Ok(CustomDataFrame {
423 df: aliased_df.dataframe,
424 table_alias: aliased_df.alias.clone(),
425 from_table: alias.to_string(),
426 selected_columns: Vec::new(),
427 alias_map: Vec::new(),
428 aggregations: Vec::new(),
429 group_by_columns: Vec::new(),
430 where_conditions: Vec::new(),
431 having_conditions: Vec::new(),
432 order_by_columns: Vec::new(),
433 limit_count: None,
434 joins: Vec::new(),
435 window_functions: Vec::new(),
436 ctes: Vec::new(),
437 subquery_source: None,
438 set_operations: Vec::new(),
439 query: String::new(),
440 aggregated_df: None,
441 union_tables: None,
442 original_expressions: Vec::new(),
443 needs_normalization: false,
444 raw_selected_columns: Vec::new(),
445 raw_group_by_columns: Vec::new(),
446 raw_where_conditions: Vec::new(),
447 raw_having_conditions: Vec::new(),
448 raw_join_conditions: Vec::new(),
449 raw_aggregations: Vec::new(),
450 uses_group_by_all: false
451 })
452 }
453
454 pub fn schema_from_json(json_spec: &str) -> ElusionResult<crate::features::with_schema::FileSchema> {
456 crate::features::with_schema::schema_from_json(json_spec)
457 }
458
459 pub fn schema_builder() -> crate::features::with_schema::SchemaBuilder {
461 crate::features::with_schema::FileSchema::builder()
462 }
463
464 pub async fn create_view(
466 &self,
467 view_name: &str,
468 ttl_seconds: Option<u64>,
469 ) -> ElusionResult<()> {
470 let sql = self.construct_sql();
472
473 let ctx = SessionContext::new();
475
476 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
478
479 for join in &self.joins {
480 register_df_as_table(&ctx, &join.dataframe.table_alias, &join.dataframe.df).await?;
481 }
482
483 let mut manager = MATERIALIZED_VIEW_MANAGER.lock().unwrap();
485 manager.create_view(&ctx, view_name, &sql, ttl_seconds).await
486 }
487
488 pub async fn from_view(view_name: &str) -> ElusionResult<Self> {
490 let ctx = SessionContext::new();
491 let manager = MATERIALIZED_VIEW_MANAGER.lock().unwrap();
492
493 let df = manager.get_view_as_dataframe(&ctx, view_name).await?;
494
495 Ok(CustomDataFrame {
496 df,
497 table_alias: view_name.to_string(),
498 from_table: view_name.to_string(),
499 selected_columns: Vec::new(),
500 alias_map: Vec::new(),
501 aggregations: Vec::new(),
502 group_by_columns: Vec::new(),
503 where_conditions: Vec::new(),
504 having_conditions: Vec::new(),
505 order_by_columns: Vec::new(),
506 limit_count: None,
507 joins: Vec::new(),
508 window_functions: Vec::new(),
509 ctes: Vec::new(),
510 subquery_source: None,
511 set_operations: Vec::new(),
512 query: String::new(),
513 aggregated_df: None,
514 union_tables: None,
515 original_expressions: Vec::new(),
516 needs_normalization: false,
517 raw_selected_columns: Vec::new(),
518 raw_group_by_columns: Vec::new(),
519 raw_where_conditions: Vec::new(),
520 raw_having_conditions: Vec::new(),
521 raw_join_conditions: Vec::new(),
522 raw_aggregations: Vec::new(),
523
524 uses_group_by_all: false
525 })
526 }
527
528 pub async fn refresh_view(view_name: &str) -> ElusionResult<()> {
530 let ctx = SessionContext::new();
531 let mut manager = MATERIALIZED_VIEW_MANAGER.lock().unwrap();
532 manager.refresh_view(&ctx, view_name).await
533 }
534
535 pub async fn drop_view(view_name: &str) -> ElusionResult<()> {
537 let mut manager = MATERIALIZED_VIEW_MANAGER.lock().unwrap();
538 manager.drop_view(view_name)
539 }
540
541 pub async fn list_views() -> Vec<(String, DateTime<Utc>, Option<u64>)> {
543 let manager = MATERIALIZED_VIEW_MANAGER.lock().unwrap();
544 let views = manager.list_views();
545
546 if views.is_empty() {
547 println!("There are no materialized views created.");
548 }
549
550 views
551 }
552
553 pub async fn elusion_with_cache(&self, alias: &str) -> ElusionResult<Self> {
555 let sql = self.construct_sql();
556
557 let mut cache = QUERY_CACHE.lock().unwrap();
559 if let Some(cached_result) = cache.get_cached_result(&sql) {
560 println!("✅ Using cached result for query");
561
562 let ctx = SessionContext::new();
564 let schema = cached_result[0].schema();
565
566 let mem_table = MemTable::try_new(schema.clone(), vec![cached_result])
567 .map_err(|e| ElusionError::Custom(format!("Failed to create memory table from cache: {}", e)))?;
568
569 ctx.register_table(alias, Arc::new(mem_table))
570 .map_err(|e| ElusionError::Custom(format!("Failed to register table from cache: {}", e)))?;
571
572 let df = ctx.table(alias).await
573 .map_err(|e| ElusionError::Custom(format!("Failed to create DataFrame from cache: {}", e)))?;
574
575 return Ok(CustomDataFrame {
576 df,
577 table_alias: alias.to_string(),
578 from_table: alias.to_string(),
579 selected_columns: Vec::new(),
580 alias_map: Vec::new(),
581 aggregations: Vec::new(),
582 group_by_columns: Vec::new(),
583 where_conditions: Vec::new(),
584 having_conditions: Vec::new(),
585 order_by_columns: Vec::new(),
586 limit_count: None,
587 joins: Vec::new(),
588 window_functions: Vec::new(),
589 ctes: Vec::new(),
590 subquery_source: None,
591 set_operations: Vec::new(),
592 query: sql,
593 aggregated_df: None,
594 union_tables: None,
595 original_expressions: self.original_expressions.clone(),
596 needs_normalization: false,
597 raw_selected_columns: Vec::new(),
598 raw_group_by_columns: Vec::new(),
599 raw_where_conditions: Vec::new(),
600 raw_having_conditions: Vec::new(),
601 raw_join_conditions: Vec::new(),
602 raw_aggregations: Vec::new(),
603
604 uses_group_by_all: false
605 });
606 }
607
608 let result = self.elusion(alias).await?;
610
611 let batches = result.df.clone().collect().await
613 .map_err(|e| ElusionError::Custom(format!("Failed to collect batches: {}", e)))?;
614
615 cache.cache_query(&sql, batches);
616
617 Ok(result)
618 }
619
620 pub fn invalidate_cache(table_names: &[String]) {
622 let mut cache = QUERY_CACHE.lock().unwrap();
623 cache.invalidate(table_names);
624 }
625
626 pub fn clear_cache() {
628 let mut cache = QUERY_CACHE.lock().unwrap();
629 let size_before = cache.cached_queries.len();
630 cache.clear();
631 println!("Cache cleared: {} queries removed from cache.", size_before);
632 }
633
634 pub fn configure_cache(max_size: usize, ttl_seconds: Option<u64>) {
636 *QUERY_CACHE.lock().unwrap() = QueryCache::new(max_size, ttl_seconds);
637 }
638
639
640 pub async fn elusion_with_redis_cache(
643 &self,
644 redis_conn: &RedisCacheConnection,
645 alias: &str,
646 ttl_seconds: Option<u64>
647 ) -> ElusionResult<Self> {
648 elusion_with_redis_cache_impl(self, redis_conn, alias, ttl_seconds).await
649 }
650
651 pub async fn clear_redis_cache(
653 redis_conn: &RedisCacheConnection,
654 pattern: Option<&str>
655 ) -> ElusionResult<()> {
656 clear_redis_cache_impl(redis_conn, pattern).await
657 }
658
659
660 pub async fn redis_cache_stats(
662 redis_conn: &RedisCacheConnection
663 ) -> ElusionResult<RedisCacheStats> {
664 get_redis_cache_stats_impl(redis_conn).await
665 }
666
667 pub async fn invalidate_redis_cache(
669 redis_conn: &RedisCacheConnection,
670 table_names: &[&str]
671 ) -> ElusionResult<()> {
672 invalidate_redis_cache_impl(redis_conn, table_names).await
673 }
674
675 pub async fn create_redis_cache_connection() -> ElusionResult<RedisCacheConnection> {
677 create_redis_cache_connection().await
678 }
679
680 pub async fn create_redis_cache_connection_with_config(
682 host: &str,
683 port: u16,
684 password: Option<&str>,
685 database: Option<u8>,
686 ) -> ElusionResult<RedisCacheConnection> {
687 create_redis_cache_connection_with_config(host, port, password, database).await
688 }
689
690
691 #[cfg(feature = "sharepoint")]
693 pub async fn load_from_sharepoint(
694 site_url: &str,
695 file_path: &str,
696 alias: &str,
697 ) -> ElusionResult<Self> {
698 crate::features::sharepoint::load_from_sharepoint_impl(
699 site_url, file_path, alias
700 ).await
701 }
702
703 #[cfg(not(feature = "sharepoint"))]
704 pub async fn load_from_sharepoint(
705 _site_url: &str,
706 _file_path: &str,
707 _alias: &str,
708 ) -> ElusionResult<Self> {
709 Err(ElusionError::InvalidOperation {
710 operation: "SharePoint File Loading".to_string(),
711 reason: "SharePoint feature not enabled".to_string(),
712 suggestion: "💡 Add 'sharepoint' to your features: features = [\"sharepoint\"]".to_string(),
713 })
714 }
715
716 #[cfg(feature = "sharepoint")]
717 pub async fn load_folder_from_sharepoint(
718 site_url: &str,
719 folder_path: &str,
720 file_extensions: Option<Vec<&str>>,
721 result_alias: &str,
722 ) -> ElusionResult<Self> {
723 crate::features::sharepoint::load_folder_from_sharepoint_impl(
724 site_url, folder_path, file_extensions, result_alias
725 ).await
726 }
727
728 #[cfg(not(feature = "sharepoint"))]
729 pub async fn load_folder_from_sharepoint(
730 _site_url: &str,
731 _folder_path: &str,
732 _file_extensions: Option<Vec<&str>>,
733 _result_alias: &str,
734 ) -> ElusionResult<Self> {
735 Err(ElusionError::InvalidOperation {
736 operation: "SharePoint Folder Loading".to_string(),
737 reason: "SharePoint feature not enabled".to_string(),
738 suggestion: "💡 Add 'sharepoint' to your features: features = [\"sharepoint\"]".to_string(),
739 })
740 }
741
742 #[cfg(feature = "sharepoint")]
743 pub async fn load_folder_from_sharepoint_with_filename_column(
744 site_url: &str,
745 folder_path: &str,
746 file_extensions: Option<Vec<&str>>,
747 result_alias: &str,
748 ) -> ElusionResult<Self> {
749 crate::features::sharepoint::load_folder_from_sharepoint_with_filename_column_impl(
750 site_url, folder_path, file_extensions, result_alias
751 ).await
752 }
753
754 #[cfg(not(feature = "sharepoint"))]
755 pub async fn load_folder_from_sharepoint_with_filename_column(
756 _site_url: &str,
757 _folder_path: &str,
758 _file_extensions: Option<Vec<&str>>,
759 _result_alias: &str,
760 ) -> ElusionResult<Self> {
761 Err(ElusionError::InvalidOperation {
762 operation: "SharePoint Folder Loading with Filename".to_string(),
763 reason: "SharePoint feature not enabled".to_string(),
764 suggestion: "💡 Add 'sharepoint' to your features: features = [\"sharepoint\"]".to_string(),
765 })
766 }
767
768 #[cfg(feature = "sharepoint")]
770 pub async fn load_from_sharepoint_with_service_principal(
771 tenant_id: &str,
772 client_id: &str,
773 client_secret: &str,
774 site_url: &str,
775 file_path: &str,
776 alias: &str,
777 ) -> ElusionResult<Self> {
778 crate::features::sharepoint::load_from_sharepoint_with_service_principal_impl(
779 tenant_id, client_id, client_secret, site_url, file_path, alias
780 ).await
781 }
782
783 #[cfg(not(feature = "sharepoint"))]
784 pub async fn load_from_sharepoint_with_service_principal(
785 _tenant_id: &str,
786 _client_id: &str,
787 _client_secret: &str,
788 _site_url: &str,
789 _file_path: &str,
790 _alias: &str,
791 ) -> ElusionResult<Self> {
792 Err(ElusionError::InvalidOperation {
793 operation: "SharePoint File Loading".to_string(),
794 reason: "SharePoint feature not enabled".to_string(),
795 suggestion: "💡 Add 'sharepoint' to your features: features = [\"sharepoint\"]".to_string(),
796 })
797 }
798
799 #[cfg(feature = "sharepoint")]
800 pub async fn load_folder_from_sharepoint_with_service_principal(
801 tenant_id: &str,
802 client_id: &str,
803 client_secret: &str,
804 site_url: &str,
805 folder_path: &str,
806 file_extensions: Option<Vec<&str>>,
807 result_alias: &str,
808 ) -> ElusionResult<Self> {
809 crate::features::sharepoint::load_folder_from_sharepoint_with_service_principal_impl(
810 tenant_id, client_id, client_secret, site_url, folder_path, file_extensions, result_alias
811 ).await
812 }
813
814 #[cfg(not(feature = "sharepoint"))]
815 pub async fn load_folder_from_sharepoint_with_service_principal(
816 _tenant_id: &str,
817 _client_id: &str,
818 _client_secret: &str,
819 _site_url: &str,
820 _folder_path: &str,
821 _file_extensions: Option<Vec<&str>>,
822 _result_alias: &str,
823 ) -> ElusionResult<Self> {
824 Err(ElusionError::InvalidOperation {
825 operation: "SharePoint Folder Loading".to_string(),
826 reason: "SharePoint feature not enabled".to_string(),
827 suggestion: "💡 Add 'sharepoint' to your features: features = [\"sharepoint\"]".to_string(),
828 })
829 }
830
831 #[cfg(feature = "sharepoint")]
832 pub async fn load_folder_from_sharepoint_with_filename_column_with_service_principal(
833 tenant_id: &str,
834 client_id: &str,
835 client_secret: &str,
836 site_url: &str,
837 folder_path: &str,
838 file_extensions: Option<Vec<&str>>,
839 result_alias: &str,
840 ) -> ElusionResult<Self> {
841 crate::features::sharepoint::load_folder_from_sharepoint_with_filename_column_with_service_principal_impl(
842 tenant_id, client_id, client_secret, site_url, folder_path, file_extensions, result_alias
843 ).await
844 }
845
846 #[cfg(not(feature = "sharepoint"))]
847 pub async fn load_folder_from_sharepoint_with_filename_column_with_service_principal(
848 _tenant_id: &str,
849 _client_id: &str,
850 _client_secret: &str,
851 _site_url: &str,
852 _folder_path: &str,
853 _file_extensions: Option<Vec<&str>>,
854 _result_alias: &str,
855 ) -> ElusionResult<Self> {
856 Err(ElusionError::InvalidOperation {
857 operation: "SharePoint Folder Loading".to_string(),
858 reason: "SharePoint feature not enabled".to_string(),
859 suggestion: "💡 Add 'sharepoint' to your features: features = [\"sharepoint\"]".to_string(),
860 })
861 }
862
863 #[cfg(feature = "postgres")]
866 pub async fn from_postgres(
867 conn: &PostgresConnection,
868 query: &str,
869 alias: &str
870 ) -> ElusionResult<Self> {
871 crate::features::postgres::from_postgres_impl(conn, query, alias).await
872 }
873
874 #[cfg(not(feature = "postgres"))]
875 pub async fn from_postgres(
876 _conn: &PostgresConnection,
877 _query: &str,
878 _alias: &str
879 ) -> ElusionResult<Self> {
880 Err(ElusionError::Custom("*** Warning ***: Postgres feature not enabled. Add feature under [dependencies]".to_string()))
881 }
882
883 #[cfg(feature = "mysql")]
885 pub async fn from_mysql(
886 conn: &MySqlConnection,
887 query: &str,
888 alias: &str
889 ) -> ElusionResult<Self> {
890 crate::features::mysql::from_mysql_impl(conn, query, alias).await
891 }
892
893 #[cfg(not(feature = "mysql"))]
894 pub async fn from_mysql(
895 _conn: &MySqlConnection,
896 _query: &str,
897 _alias: &str
898 ) -> ElusionResult<Self> {
899 Err(ElusionError::Custom("*** Warning ***: MySQL feature not enabled. Add feature = [\"mysql\"] under [dependencies]".to_string()))
900 }
901
902 pub fn join<const N: usize>(
906 mut self,
907 other: CustomDataFrame,
908 conditions: [&str; N],
909 join_type: &str
910 ) -> Self {
911 self.raw_join_conditions.extend(conditions.iter().map(|&s| s.to_string()));
913 self.needs_normalization = true;
914
915 let condition = conditions.iter()
916 .map(|&cond| normalize_condition(cond))
917 .collect::<Vec<_>>()
918 .join(" AND ");
919
920 self.joins.push(Join {
921 dataframe: other,
922 condition,
923 join_type: join_type.to_string(),
924 });
925
926 if self.should_warn_complexity() {
928 println!("⚠️ Complex query detected - consider calling .elusion() to materialize intermediate results for better performance");
929 }
930
931 self
932 }
933
934 pub fn join_many<const N: usize, const M: usize>(
937 self,
938 joins: [(CustomDataFrame, [&str; M], &str); N]
939 ) -> Self {
940 let join_inputs = joins.into_iter()
941 .map(|(df, conds, jt)| {
942 let condition = conds.iter()
943 .map(|&cond| normalize_condition(cond))
944 .collect::<Vec<_>>()
945 .join(" AND ");
946
947 Join {
948 dataframe: df,
949 condition,
950 join_type: jt.to_string(),
951 }
952 })
953 .collect::<Vec<_>>();
954 self.join_many_vec(join_inputs)
955 }
956
957 pub fn join_many_vec(mut self, joins: Vec<Join>) -> Self {
958 self.joins.extend(joins);
959 self
960 }
961
962 pub fn group_by<const N: usize>(self, group_columns: [&str; N]) -> Self {
964 self.group_by_vec(group_columns.to_vec())
965 }
966
967 pub fn group_by_vec(mut self, columns: Vec<&str>) -> Self {
968 self.raw_group_by_columns.extend(columns.iter().map(|&s| s.to_string()));
970 self.needs_normalization = true;
971
972 self.group_by_columns = columns
973 .into_iter()
974 .map(|s| {
975 let resolved_column = resolve_alias_to_original(s, &self.raw_selected_columns);
977
978 if is_simple_column(&resolved_column) {
979 normalize_column_name(&resolved_column)
980 } else if resolved_column.to_uppercase().contains(" AS ") {
981 let as_pattern = regex::Regex::new(r"(?i)\s+AS\s+").unwrap();
983 if let Some(as_match) = as_pattern.find(&resolved_column) {
984 let expr_part = resolved_column[..as_match.start()].trim();
985 normalize_expression(expr_part, &self.table_alias)
986 } else {
987 normalize_expression(&resolved_column, &self.table_alias)
988 }
989 } else {
990 normalize_expression(&resolved_column, &self.table_alias)
992 }
993 })
994 .collect();
995 self
996 }
997
998 pub fn group_by_all(mut self) -> Self {
1000 self.uses_group_by_all = true;
1001
1002 let mut all_group_by = Vec::new();
1003
1004 for col_expr in &self.raw_selected_columns {
1007 let base_expression = if col_expr.to_uppercase().contains(" AS ") {
1011 let as_pattern = regex::Regex::new(r"(?i)\s+AS\s+").unwrap();
1012 if let Some(as_match) = as_pattern.find(col_expr) {
1013 col_expr[..as_match.start()].trim()
1014 } else {
1015 col_expr.as_str()
1016 }
1017 } else {
1018 col_expr.as_str()
1019 };
1020
1021 if is_groupable_column(base_expression) {
1027 let normalized = if is_simple_column(base_expression) {
1028 normalize_column_name(base_expression)
1029 } else {
1030 normalize_expression(base_expression, &self.table_alias)
1031 };
1032
1033 if !all_group_by.contains(&normalized) {
1034 all_group_by.push(normalized.clone());
1035 }
1037 } else {
1038 }
1040 }
1041
1042 self.group_by_columns = all_group_by;
1045 self
1046 }
1047
1048 pub fn filter_many<const N: usize>(self, conditions: [&str; N]) -> Self {
1051 self.filter_vec(conditions.to_vec())
1052 }
1053
1054 pub fn filter_vec(mut self, conditions: Vec<&str>) -> Self {
1056 self.raw_where_conditions.extend(conditions.iter().map(|&s| s.to_string()));
1058 self.needs_normalization = true;
1059
1060 self.where_conditions.extend(
1061 conditions.into_iter().map(|c| normalize_condition_filter(c))
1062 );
1063 self
1064 }
1065 pub fn filter(mut self, condition: &str) -> Self {
1067 self.raw_where_conditions.push(condition.to_string());
1069 self.needs_normalization = true;
1070
1071 self.where_conditions.push(normalize_condition_filter(condition));
1072 self
1073 }
1074
1075 pub fn having_many<const N: usize>(self, conditions: [&str; N]) -> Self {
1078 self.having_conditions_vec(conditions.to_vec())
1079 }
1080
1081 pub fn having_conditions_vec(mut self, conditions: Vec<&str>) -> Self {
1083 self.raw_having_conditions.extend(conditions.iter().map(|&s| s.to_string()));
1085 self.needs_normalization = true;
1086
1087 self.having_conditions.extend(
1088 conditions.into_iter().map(|c| normalize_condition(c))
1089 );
1090 self
1091 }
1092
1093 pub fn having(mut self, condition: &str) -> Self {
1095 self.raw_having_conditions.push(condition.to_string());
1097 self.needs_normalization = true;
1098
1099 self.having_conditions.push(normalize_condition(condition));
1100 self
1101 }
1102 pub fn order_by_vec(mut self, columns: Vec<String>, ascending: Vec<bool>) -> Self {
1104 assert!(
1106 columns.len() == ascending.len(),
1107 "Columns and ascending flags must have the same length"
1108 );
1109
1110 self.order_by_columns = columns.into_iter()
1112 .zip(ascending.into_iter())
1113 .collect();
1114 self
1115 }
1116
1117 pub fn order_by_many_vec(mut self, orders: Vec<(String, bool)>) -> Self {
1119 self.order_by_columns = orders;
1120 self
1121 }
1122 pub fn order_by<const N: usize>(self, columns: [&str; N], directions: [&str; N]) -> Self {
1124 let ascending: Vec<bool> = directions.iter()
1125 .map(|&dir| match dir.to_uppercase().as_str() {
1126 "ASC" | "ASCENDING" => true,
1127 "DESC" | "DESCENDING" => false,
1128 _ => panic!("Invalid sort direction: '{}'. Use 'ASC' or 'DESC'", dir),
1129 })
1130 .collect();
1131
1132 let normalized_columns: Vec<String> = columns.iter()
1133 .map(|c| normalize_column_name(c))
1134 .collect();
1135
1136 self.order_by_vec(normalized_columns, ascending)
1137 }
1138
1139 pub fn order_by_many<const N: usize>(self, orders: [(&str, &str); N]) -> Self {
1141 let orderings = orders.into_iter()
1142 .map(|(col, dir)| {
1143 let ascending = match dir.to_uppercase().as_str() {
1144 "ASC" | "ASCENDING" => true,
1145 "DESC" | "DESCENDING" => false,
1146 _ => panic!("Invalid sort direction: '{}'. Use 'ASC' or 'DESC'", dir),
1147 };
1148 (normalize_column_name(col), ascending)
1149 })
1150 .collect::<Vec<_>>();
1151 self.order_by_many_vec(orderings)
1152 }
1153
1154 pub fn limit(mut self, count: u64) -> Self {
1156 self.limit_count = Some(count);
1157 self
1158 }
1159
1160 pub fn window(mut self, window_expr: &str) -> Self {
1162 let normalized = normalize_window_function(window_expr);
1163 self.window_functions.push(normalized);
1164 self
1165 }
1166
1167 pub fn with_ctes<const N: usize>(self, ctes: [&str; N]) -> Self {
1170 self.with_ctes_vec(ctes.to_vec())
1171 }
1172
1173 pub fn with_ctes_vec(mut self, ctes: Vec<&str>) -> Self {
1175 self.ctes.extend(ctes.into_iter().map(|c| c.to_string()));
1176 self
1177 }
1178
1179 pub fn with_cte_single(mut self, cte: &str) -> Self {
1181 self.ctes.push(cte.to_string());
1182 self
1183 }
1184
1185 pub fn set_operation(mut self, set_op: &str) -> Self {
1187 self.set_operations.push(set_op.to_string());
1188 self
1189 }
1190
1191 pub fn string_functions<const N: usize>(mut self, expressions: [&str; N]) -> Self {
1193
1194 self.needs_normalization = true;
1195
1196 for expr in expressions.iter() {
1197 self.selected_columns.push(normalize_expression(expr, &self.table_alias));
1199
1200 if !self.group_by_columns.is_empty() {
1201 let expr_part = expr.split(" AS ")
1202 .next()
1203 .unwrap_or(expr);
1204 self.group_by_columns.push(normalize_expression(expr_part, &self.table_alias));
1205 }
1206 }
1207 self
1208 }
1209
1210 pub fn datetime_functions<const N: usize>(mut self, expressions: [&str; N]) -> Self {
1212 self.needs_normalization = true;
1213
1214 for expr in expressions.iter() {
1215 self.selected_columns.push(normalize_expression(expr, &self.table_alias));
1216
1217 if !self.group_by_columns.is_empty() {
1218 let expr_part = expr.split(" AS ")
1219 .next()
1220 .unwrap_or(expr);
1221 self.group_by_columns.push(normalize_expression(expr_part, &self.table_alias));
1222 }
1223 }
1224 self
1225 }
1226
1227 pub fn agg<const N: usize>(self, aggregations: [&str; N]) -> Self {
1230 self.clone().agg_vec(
1231 aggregations.iter()
1232 .filter(|&expr| is_aggregate_expression(expr))
1233 .map(|s| normalize_expression(s, &self.table_alias))
1234 .collect()
1235 )
1236 }
1237
1238 pub fn agg_vec(mut self, aggregations: Vec<String>) -> Self {
1241 self.raw_aggregations.extend(aggregations.iter().cloned());
1243 self.needs_normalization = true;
1244
1245 let valid_aggs = aggregations.into_iter()
1246 .filter(|expr| is_aggregate_expression(expr))
1247 .collect::<Vec<_>>();
1248
1249 self.aggregations.extend(valid_aggs);
1250 self
1251 }
1252
1253 pub async fn append(self, other: CustomDataFrame) -> ElusionResult<Self> {
1255
1256 let ctx = Arc::new(SessionContext::new());
1265
1266 let mut batches_self = self.df.clone().collect().await
1267 .map_err(|e| ElusionError::InvalidOperation {
1268 operation: "Collecting batches from first dataframe".to_string(),
1269 reason: e.to_string(),
1270 suggestion: "💡 Check if the dataframe is valid and not empty".to_string(),
1271 })?;
1272
1273 let batches_other = other.df.clone().collect().await
1274 .map_err(|e| ElusionError::InvalidOperation {
1275 operation: "Collecting batches from second dataframe".to_string(),
1276 reason: e.to_string(),
1277 suggestion: "💡 Check if the dataframe is valid and not empty".to_string(),
1278 })?;
1279
1280 batches_self.extend(batches_other);
1281
1282 let mem_table = MemTable::try_new(self.df.schema().clone().into(), vec![batches_self])
1283 .map_err(|e| ElusionError::InvalidOperation {
1284 operation: "Creating memory table".to_string(),
1285 reason: e.to_string(),
1286 suggestion: "💡 Verify data consistency, number of columns or memory availability".to_string(),
1287 })?;
1288
1289 let alias = "append_result";
1290
1291 ctx.register_table(alias, Arc::new(mem_table))
1292 .map_err(|e| ElusionError::InvalidOperation {
1293 operation: "Registering table".to_string(),
1294 reason: e.to_string(),
1295 suggestion: "💡 Check if table name is unique in context".to_string(),
1296 })?;
1297
1298 let df = ctx.table(alias).await
1299 .map_err(|e| ElusionError::Custom(format!("Failed to create union DataFrame: {}", e)))?;
1300
1301 Ok(CustomDataFrame {
1302 df,
1303 table_alias: alias.to_string(),
1304 from_table: alias.to_string(),
1305 selected_columns: self.selected_columns.clone(),
1306 alias_map: self.alias_map.clone(),
1307 aggregations: Vec::new(),
1308 group_by_columns: Vec::new(),
1309 where_conditions: Vec::new(),
1310 having_conditions: Vec::new(),
1311 order_by_columns: Vec::new(),
1312 limit_count: None,
1313 joins: Vec::new(),
1314 window_functions: Vec::new(),
1315 ctes: Vec::new(),
1316 subquery_source: None,
1317 set_operations: Vec::new(),
1318 query: String::new(),
1319 aggregated_df: None,
1320 union_tables: None,
1321 original_expressions: self.original_expressions.clone(),
1322 needs_normalization: false,
1323 raw_selected_columns: Vec::new(),
1324 raw_group_by_columns: Vec::new(),
1325 raw_where_conditions: Vec::new(),
1326 raw_having_conditions: Vec::new(),
1327 raw_join_conditions: Vec::new(),
1328 raw_aggregations: Vec::new(),
1329 uses_group_by_all: false
1330 })
1331 }
1332 pub async fn append_many<const N: usize>(self, others: [CustomDataFrame; N]) -> ElusionResult<Self> {
1334
1335 if N == 0 {
1336 return Err(ElusionError::SetOperationError {
1337 operation: "APPEND MANY".to_string(),
1338 reason: "No dataframes provided for append operation".to_string(),
1339 suggestion: "💡 Provide at least one dataframe to append".to_string(),
1340 });
1341 }
1342
1343 let ctx = Arc::new(SessionContext::new());
1354
1355 let mut all_batches = self.df.clone().collect().await
1356 .map_err(|e| ElusionError::InvalidOperation {
1357 operation: "Collecting base dataframe".to_string(),
1358 reason: e.to_string(),
1359 suggestion: "💡 Check if the dataframe is valid and not empty".to_string(),
1360 })?;
1361
1362 for (i, other) in others.iter().enumerate() {
1363 let other_batches = other.df.clone().collect().await
1364 .map_err(|e| ElusionError::InvalidOperation {
1365 operation: format!("Collecting dataframe at index {}", i),
1366 reason: e.to_string(),
1367 suggestion: "💡 Check if the dataframe is valid and not empty".to_string(),
1368 })?;
1369 all_batches.extend(other_batches);
1370 }
1371
1372 let mem_table = MemTable::try_new(self.df.schema().clone().into(), vec![all_batches])
1373 .map_err(|e| ElusionError::InvalidOperation {
1374 operation: "Creating memory table".to_string(),
1375 reason: e.to_string(),
1376 suggestion: "💡 Verify data consistency and memory availability".to_string(),
1377 })?;
1378
1379 let alias = "union_many_result";
1380
1381 ctx.register_table(alias, Arc::new(mem_table))
1382 .map_err(|e| ElusionError::InvalidOperation {
1383 operation: "Registering result table".to_string(),
1384 reason: e.to_string(),
1385 suggestion: "💡 Check if table name is unique in context".to_string(),
1386 })?;
1387
1388 let df = ctx.table(alias).await
1389 .map_err(|e| ElusionError::SetOperationError {
1390 operation: "APPEND MANY".to_string(),
1391 reason: e.to_string(),
1392 suggestion: "💡 Verify final table creation".to_string(),
1393 })?;
1394
1395 Ok(CustomDataFrame {
1396 df,
1397 table_alias: alias.to_string(),
1398 from_table: alias.to_string(),
1399 selected_columns: self.selected_columns.clone(),
1400 alias_map: self.alias_map.clone(),
1401 aggregations: Vec::new(),
1402 group_by_columns: Vec::new(),
1403 where_conditions: Vec::new(),
1404 having_conditions: Vec::new(),
1405 order_by_columns: Vec::new(),
1406 limit_count: None,
1407 joins: Vec::new(),
1408 window_functions: Vec::new(),
1409 ctes: Vec::new(),
1410 subquery_source: None,
1411 set_operations: Vec::new(),
1412 query: String::new(),
1413 aggregated_df: None,
1414 union_tables: None,
1415 original_expressions: self.original_expressions.clone(),
1416 needs_normalization: false,
1417 raw_selected_columns: Vec::new(),
1418 raw_group_by_columns: Vec::new(),
1419 raw_where_conditions: Vec::new(),
1420 raw_having_conditions: Vec::new(),
1421 raw_join_conditions: Vec::new(),
1422 raw_aggregations: Vec::new(),
1423 uses_group_by_all: false
1424 })
1425 }
1426 pub async fn union(self, other: CustomDataFrame) -> ElusionResult<Self> {
1428
1429 let ctx = Arc::new(SessionContext::new());
1438
1439 register_df_as_table(&ctx, &self.table_alias, &self.df).await
1440 .map_err(|e| ElusionError::InvalidOperation {
1441 operation: "Registering first table".to_string(),
1442 reason: e.to_string(),
1443 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1444 })?;
1445
1446 register_df_as_table(&ctx, &other.table_alias, &other.df).await
1447 .map_err(|e| ElusionError::InvalidOperation {
1448 operation: "Registering second table".to_string(),
1449 reason: e.to_string(),
1450 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1451 })?;
1452
1453
1454 let sql = format!(
1455 "SELECT DISTINCT * FROM {} UNION SELECT DISTINCT * FROM {}",
1456 normalize_alias(&self.table_alias),
1457 normalize_alias(&other.table_alias)
1458 );
1459
1460 let df = ctx.sql(&sql).await
1461 .map_err(|e| ElusionError::SetOperationError {
1462 operation: "UNION".to_string(),
1463 reason: e.to_string(),
1464 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
1465 })?;
1466
1467 Ok(CustomDataFrame {
1468 df,
1469 table_alias: "union_result".to_string(),
1470 from_table: "union_result".to_string(),
1471 selected_columns: self.selected_columns.clone(),
1472 alias_map: self.alias_map.clone(),
1473 aggregations: Vec::new(),
1474 group_by_columns: Vec::new(),
1475 where_conditions: Vec::new(),
1476 having_conditions: Vec::new(),
1477 order_by_columns: Vec::new(),
1478 limit_count: None,
1479 joins: Vec::new(),
1480 window_functions: Vec::new(),
1481 ctes: Vec::new(),
1482 subquery_source: None,
1483 set_operations: Vec::new(),
1484 query: String::new(),
1485 aggregated_df: None,
1486 union_tables: None,
1487 original_expressions: self.original_expressions.clone(),
1488 needs_normalization: false,
1489 raw_selected_columns: Vec::new(),
1490 raw_group_by_columns: Vec::new(),
1491 raw_where_conditions: Vec::new(),
1492 raw_having_conditions: Vec::new(),
1493 raw_join_conditions: Vec::new(),
1494 raw_aggregations: Vec::new(),
1495 uses_group_by_all: false
1496 })
1497 }
1498 pub async fn union_many<const N: usize>(self, others: [CustomDataFrame; N]) -> ElusionResult<Self> {
1500
1501 if N == 0 {
1502 return Err(ElusionError::SetOperationError {
1503 operation: "UNION MANY".to_string(),
1504 reason: "No dataframes provided for union operation".to_string(),
1505 suggestion: "💡 Provide at least one dataframe to union with".to_string(),
1506 });
1507 }
1508
1509 let ctx = Arc::new(SessionContext::new());
1520
1521 register_df_as_table(&ctx, &self.table_alias, &self.df).await
1522 .map_err(|e| ElusionError::InvalidOperation {
1523 operation: "Registering base table".to_string(),
1524 reason: e.to_string(),
1525 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1526 })?;
1527
1528 for (i, other) in others.iter().enumerate() {
1529 let alias = format!("union_source_{}", i);
1530 register_df_as_table(&ctx, &alias, &other.df).await
1531 .map_err(|e| ElusionError::InvalidOperation {
1532 operation: format!("Registering table {}", i),
1533 reason: e.to_string(),
1534 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1535 })?;
1536 }
1537
1538 let mut sql = format!("SELECT DISTINCT * FROM {}", normalize_alias(&self.table_alias));
1539 for i in 0..N {
1540 sql.push_str(&format!(" UNION SELECT DISTINCT * FROM {}",
1541 normalize_alias(&format!("union_source_{}", i))));
1542 }
1543
1544 let df = ctx.sql(&sql).await
1545 .map_err(|e| ElusionError::SetOperationError {
1546 operation: "UNION MANY".to_string(),
1547 reason: e.to_string(),
1548 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
1549 })?;
1550
1551 Ok(CustomDataFrame {
1552 df,
1553 table_alias: "union_many_result".to_string(),
1554 from_table: "union_many_result".to_string(),
1555 selected_columns: self.selected_columns.clone(),
1556 alias_map: self.alias_map.clone(),
1557 aggregations: Vec::new(),
1558 group_by_columns: Vec::new(),
1559 where_conditions: Vec::new(),
1560 having_conditions: Vec::new(),
1561 order_by_columns: Vec::new(),
1562 limit_count: None,
1563 joins: Vec::new(),
1564 window_functions: Vec::new(),
1565 ctes: Vec::new(),
1566 subquery_source: None,
1567 set_operations: Vec::new(),
1568 query: String::new(),
1569 aggregated_df: None,
1570 union_tables: None,
1571 original_expressions: self.original_expressions.clone(),
1572 needs_normalization: false,
1573 raw_selected_columns: Vec::new(),
1574 raw_group_by_columns: Vec::new(),
1575 raw_where_conditions: Vec::new(),
1576 raw_having_conditions: Vec::new(),
1577 raw_join_conditions: Vec::new(),
1578 raw_aggregations: Vec::new(),
1579 uses_group_by_all: false
1580 })
1581 }
1582
1583 pub async fn union_all(self, other: CustomDataFrame) -> ElusionResult<Self> {
1585
1586 let ctx = Arc::new(SessionContext::new());
1595
1596 register_df_as_table(&ctx, &self.table_alias, &self.df).await
1597 .map_err(|e| ElusionError::InvalidOperation {
1598 operation: "Registering first table".to_string(),
1599 reason: e.to_string(),
1600 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1601 })?;
1602
1603 register_df_as_table(&ctx, &other.table_alias, &other.df).await
1604 .map_err(|e| ElusionError::InvalidOperation {
1605 operation: "Registering second table".to_string(),
1606 reason: e.to_string(),
1607 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1608 })?;
1609
1610 let sql = format!(
1611 "SELECT * FROM {} UNION ALL SELECT * FROM {}",
1612 normalize_alias(&self.table_alias),
1613 normalize_alias(&other.table_alias)
1614 );
1615
1616 let df = ctx.sql(&sql).await
1617 .map_err(|e| ElusionError::SetOperationError {
1618 operation: "UNION ALL".to_string(),
1619 reason: e.to_string(),
1620 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
1621 })?;
1622
1623 Ok(CustomDataFrame {
1624 df,
1625 table_alias: "union_all_result".to_string(),
1626 from_table: "union_all_result".to_string(),
1627 selected_columns: self.selected_columns.clone(),
1628 alias_map: self.alias_map.clone(),
1629 aggregations: Vec::new(),
1630 group_by_columns: Vec::new(),
1631 where_conditions: Vec::new(),
1632 having_conditions: Vec::new(),
1633 order_by_columns: Vec::new(),
1634 limit_count: None,
1635 joins: Vec::new(),
1636 window_functions: Vec::new(),
1637 ctes: Vec::new(),
1638 subquery_source: None,
1639 set_operations: Vec::new(),
1640 query: String::new(),
1641 aggregated_df: None,
1642 union_tables: None,
1643 original_expressions: self.original_expressions.clone(),
1644 needs_normalization: false,
1645 raw_selected_columns: Vec::new(),
1646 raw_group_by_columns: Vec::new(),
1647 raw_where_conditions: Vec::new(),
1648 raw_having_conditions: Vec::new(),
1649 raw_join_conditions: Vec::new(),
1650 raw_aggregations: Vec::new(),
1651 uses_group_by_all: false
1652 })
1653 }
1654 pub async fn union_all_many<const N: usize>(self, others: [CustomDataFrame; N]) -> ElusionResult<Self> {
1656
1657 if N == 0 {
1658 return Err(ElusionError::SetOperationError {
1659 operation: "UNION ALL MANY".to_string(),
1660 reason: "No dataframes provided for union operation".to_string(),
1661 suggestion: "💡 Provide at least one dataframe to union with".to_string(),
1662 });
1663 }
1664
1665 let ctx = Arc::new(SessionContext::new());
1676
1677 register_df_as_table(&ctx, &self.table_alias, &self.df).await
1678 .map_err(|e| ElusionError::InvalidOperation {
1679 operation: "Registering base table".to_string(),
1680 reason: e.to_string(),
1681 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1682 })?;
1683
1684 for (i, other) in others.iter().enumerate() {
1685 let alias = format!("union_all_source_{}", i);
1686 register_df_as_table(&ctx, &alias, &other.df).await
1687 .map_err(|e| ElusionError::InvalidOperation {
1688 operation: format!("Registering table {}", i),
1689 reason: e.to_string(),
1690 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1691 })?;
1692 }
1693
1694 let mut sql = format!("SELECT * FROM {}", normalize_alias(&self.table_alias));
1695 for i in 0..N {
1696 sql.push_str(&format!(" UNION ALL SELECT * FROM {}",
1697 normalize_alias(&format!("union_all_source_{}", i))));
1698 }
1699
1700 let df = ctx.sql(&sql).await
1701 .map_err(|e| ElusionError::SetOperationError {
1702 operation: "UNION ALL MANY".to_string(),
1703 reason: e.to_string(),
1704 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
1705 })?;
1706
1707 Ok(CustomDataFrame {
1708 df,
1709 table_alias: "union_all_many_result".to_string(),
1710 from_table: "union_all_many_result".to_string(),
1711 selected_columns: self.selected_columns.clone(),
1712 alias_map: self.alias_map.clone(),
1713 aggregations: Vec::new(),
1714 group_by_columns: Vec::new(),
1715 where_conditions: Vec::new(),
1716 having_conditions: Vec::new(),
1717 order_by_columns: Vec::new(),
1718 limit_count: None,
1719 joins: Vec::new(),
1720 window_functions: Vec::new(),
1721 ctes: Vec::new(),
1722 subquery_source: None,
1723 set_operations: Vec::new(),
1724 query: String::new(),
1725 aggregated_df: None,
1726 union_tables: None,
1727 original_expressions: self.original_expressions.clone(),
1728 needs_normalization: false,
1729 raw_selected_columns: Vec::new(),
1730 raw_group_by_columns: Vec::new(),
1731 raw_where_conditions: Vec::new(),
1732 raw_having_conditions: Vec::new(),
1733 raw_join_conditions: Vec::new(),
1734 raw_aggregations: Vec::new(),
1735 uses_group_by_all: false
1736 })
1737 }
1738 pub async fn except(self, other: CustomDataFrame) -> ElusionResult<Self> {
1740
1741 let ctx = Arc::new(SessionContext::new());
1750
1751 register_df_as_table(&ctx, &self.table_alias, &self.df).await
1752 .map_err(|e| ElusionError::InvalidOperation {
1753 operation: "Registering first table".to_string(),
1754 reason: e.to_string(),
1755 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1756 })?;
1757
1758 register_df_as_table(&ctx, &other.table_alias, &other.df).await
1759 .map_err(|e| ElusionError::InvalidOperation {
1760 operation: "Registering second table".to_string(),
1761 reason: e.to_string(),
1762 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1763 })?;
1764
1765 let sql = format!(
1766 "SELECT * FROM {} EXCEPT SELECT * FROM {}",
1767 normalize_alias(&self.table_alias),
1768 normalize_alias(&other.table_alias)
1769 );
1770
1771 let df = ctx.sql(&sql).await
1772 .map_err(|e| ElusionError::SetOperationError {
1773 operation: "EXCEPT".to_string(),
1774 reason: e.to_string(),
1775 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
1776 })?;
1777
1778 Ok(CustomDataFrame {
1779 df,
1780 table_alias: "except_result".to_string(),
1781 from_table: "except_result".to_string(),
1782 selected_columns: self.selected_columns.clone(),
1783 alias_map: self.alias_map.clone(),
1784 aggregations: Vec::new(),
1785 group_by_columns: Vec::new(),
1786 where_conditions: Vec::new(),
1787 having_conditions: Vec::new(),
1788 order_by_columns: Vec::new(),
1789 limit_count: None,
1790 joins: Vec::new(),
1791 window_functions: Vec::new(),
1792 ctes: Vec::new(),
1793 subquery_source: None,
1794 set_operations: Vec::new(),
1795 query: String::new(),
1796 aggregated_df: None,
1797 union_tables: None,
1798 original_expressions: self.original_expressions.clone(),
1799 needs_normalization: false,
1800 raw_selected_columns: Vec::new(),
1801 raw_group_by_columns: Vec::new(),
1802 raw_where_conditions: Vec::new(),
1803 raw_having_conditions: Vec::new(),
1804 raw_join_conditions: Vec::new(),
1805 raw_aggregations: Vec::new(),
1806 uses_group_by_all: false
1807 })
1808 }
1809
1810 pub async fn intersect(self, other: CustomDataFrame) -> ElusionResult<Self> {
1812
1813 let ctx = Arc::new(SessionContext::new());
1822
1823 register_df_as_table(&ctx, &self.table_alias, &self.df).await
1824 .map_err(|e| ElusionError::InvalidOperation {
1825 operation: "Registering first table".to_string(),
1826 reason: e.to_string(),
1827 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1828 })?;
1829
1830 register_df_as_table(&ctx, &other.table_alias, &other.df).await
1831 .map_err(|e| ElusionError::InvalidOperation {
1832 operation: "Registering second table".to_string(),
1833 reason: e.to_string(),
1834 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
1835 })?;
1836
1837 let sql = format!(
1838 "SELECT * FROM {} INTERSECT SELECT * FROM {}",
1839 normalize_alias(&self.table_alias),
1840 normalize_alias(&other.table_alias)
1841 );
1842
1843 let df = ctx.sql(&sql).await
1844 .map_err(|e| ElusionError::SetOperationError {
1845 operation: "INTERSECT".to_string(),
1846 reason: e.to_string(),
1847 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
1848 })?;
1849
1850 Ok(CustomDataFrame {
1851 df,
1852 table_alias: "intersect_result".to_string(),
1853 from_table: "intersect_result".to_string(),
1854 selected_columns: self.selected_columns.clone(),
1855 alias_map: self.alias_map.clone(),
1856 aggregations: Vec::new(),
1857 group_by_columns: Vec::new(),
1858 where_conditions: Vec::new(),
1859 having_conditions: Vec::new(),
1860 order_by_columns: Vec::new(),
1861 limit_count: None,
1862 joins: Vec::new(),
1863 window_functions: Vec::new(),
1864 ctes: Vec::new(),
1865 subquery_source: None,
1866 set_operations: Vec::new(),
1867 query: String::new(),
1868 aggregated_df: None,
1869 union_tables: None,
1870 original_expressions: self.original_expressions.clone(),
1871 needs_normalization: false,
1872 raw_selected_columns: Vec::new(),
1873 raw_group_by_columns: Vec::new(),
1874 raw_where_conditions: Vec::new(),
1875 raw_having_conditions: Vec::new(),
1876 raw_join_conditions: Vec::new(),
1877 raw_aggregations: Vec::new(),
1878 uses_group_by_all: false
1879 })
1880 }
1881
1882 pub async fn pivot<const N: usize>(
1884 mut self,
1885 row_keys: [&str; N],
1886 pivot_column: &str,
1887 value_column: &str,
1888 aggregate_func: &str,
1889 ) -> ElusionResult<Self> {
1890 let ctx = Arc::new(SessionContext::new());
1891
1892 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
1894
1895 let schema = self.df.schema();
1896 let exact_pivot_column = schema.fields().iter()
1901 .find(|f| f.name().to_uppercase() == pivot_column.to_uppercase())
1902 .ok_or_else(|| {
1903 let available = schema.fields().iter()
1904 .map(|f| f.name())
1905 .collect::<Vec<_>>();
1906 ElusionError::Custom(format!(
1907 "Column {} not found in current data. Available columns: {:?}",
1908 pivot_column, available
1909 ))
1910 })?
1911 .name();
1912
1913 let exact_value_column = schema.fields().iter()
1914 .find(|f| f.name().to_uppercase() == value_column.to_uppercase())
1915 .ok_or_else(|| {
1916 let available = schema.fields().iter()
1917 .map(|f| f.name())
1918 .collect::<Vec<_>>();
1919 ElusionError::Custom(format!(
1920 "Column {} not found in current data. Available columns: {:?}",
1921 value_column, available
1922 ))
1923 })?
1924 .name();
1925
1926 let distinct_query = format!(
1943 "SELECT DISTINCT \"{}\" \
1944 FROM \"{}\" AS {} \
1945 WHERE \"{}\" IS NOT NULL \
1946 AND \"{}\" IS NOT NULL \
1947 ORDER BY \"{}\"",
1948 exact_pivot_column,
1949 self.from_table,
1950 self.table_alias,
1951 exact_pivot_column,
1952 exact_value_column,
1953 exact_pivot_column
1954 );
1955
1956 let distinct_df = ctx.sql(&distinct_query).await
1957 .map_err(|e| ElusionError::Custom(format!("Failed to execute distinct query: {}", e)))?;
1958
1959 let distinct_batches = distinct_df.collect().await
1960 .map_err(|e| ElusionError::Custom(format!("Failed to collect distinct values: {}", e)))?;
1961
1962 let distinct_values: Vec<String> = distinct_batches
1964 .iter()
1965 .flat_map(|batch| {
1966 let array = batch.column(0);
1967 match array.data_type() {
1968 ArrowDataType::Utf8 => {
1969 let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
1970 (0..batch.num_rows())
1971 .map(|i| string_array.value(i).to_string())
1972 .collect::<Vec<_>>()
1973 },
1974 _ => {
1975 let string_array = compute::cast(array, &ArrowDataType::Utf8)
1977 .unwrap();
1978 let string_array = string_array.as_any().downcast_ref::<StringArray>().unwrap();
1979 (0..batch.num_rows())
1980 .map(|i| string_array.value(i).to_string())
1981 .collect::<Vec<_>>()
1982 }
1983 }
1984 })
1985 .collect();
1986
1987 let pivot_cols: Vec<String> = distinct_values
1989 .iter()
1990 .map(|val| {
1991 let value_expr = if schema.field_with_name(None, &exact_pivot_column)
1993 .map(|f| matches!(f.data_type(), ArrowDataType::Int32 | ArrowDataType::Int64 | ArrowDataType::Float32 | ArrowDataType::Float64))
1994 .unwrap_or(false) {
1995 format!(
1997 "COALESCE({}(CASE WHEN \"{}\" = '{}' THEN \"{}\" END), 0)",
1998 aggregate_func,
1999 exact_pivot_column,
2000 val,
2001 exact_value_column
2002 )
2003 } else {
2004 format!(
2006 "COALESCE({}(CASE WHEN \"{}\" = '{}' THEN \"{}\" END), 0)",
2007 aggregate_func,
2008 exact_pivot_column,
2009 val.replace("'", "''"), exact_value_column
2011 )
2012 };
2013
2014 format!(
2016 "{} AS \"{}_{}\"",
2017 value_expr,
2018 exact_pivot_column,
2019 val.replace("\"", "\"\"") )
2021 })
2022 .collect();
2023
2024 let row_keys_str = row_keys.iter()
2025 .map(|&key| {
2026 let exact_key = schema.fields().iter()
2027 .find(|f| f.name().to_uppercase() == key.to_uppercase())
2028 .map_or(key.to_string(), |f| f.name().to_string());
2029 format!("\"{}\"", exact_key)
2030 })
2031 .collect::<Vec<_>>()
2032 .join(", ");
2033
2034 let pivot_subquery = format!(
2036 "(SELECT {}, {} FROM \"{}\" AS {} GROUP BY {})",
2037 row_keys_str,
2038 pivot_cols.join(", "),
2039 self.from_table,
2040 self.table_alias,
2041 row_keys_str
2042 );
2043
2044 self.from_table = pivot_subquery;
2046 self.selected_columns.clear();
2047 self.group_by_columns.clear();
2048
2049 self.selected_columns.extend(row_keys.iter().map(|&s| s.to_string()));
2051
2052 for val in distinct_values {
2054 self.selected_columns.push(
2055 format!("{}_{}",
2056 normalize_column_name(pivot_column),
2057 normalize_column_name(&val)
2058 )
2059 );
2060 }
2061
2062
2063
2064 Ok(self)
2065 }
2066
2067 pub async fn unpivot<const N: usize, const M: usize>(
2069 mut self,
2070 id_columns: [&str; N],
2071 value_columns: [&str; M],
2072 name_column: &str,
2073 value_column: &str,
2074 ) -> ElusionResult<Self> {
2075 let ctx = Arc::new(SessionContext::new());
2076
2077 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
2079
2080 let schema = self.df.schema();
2081 let exact_id_columns: Vec<String> = id_columns.iter()
2086 .map(|&id| {
2087 schema.fields().iter()
2088 .find(|f| f.name().to_uppercase() == id.to_uppercase())
2089 .map(|f| f.name().to_string())
2090 .ok_or_else(|| {
2091 let available = schema.fields().iter()
2092 .map(|f| f.name())
2093 .collect::<Vec<_>>();
2094 ElusionError::Custom(format!(
2095 "ID column '{}' not found in current data. Available columns: {:?}",
2096 id, available
2097 ))
2098 })
2099 })
2100 .collect::<Result<Vec<_>, _>>()?;
2101
2102 let exact_value_columns: Vec<String> = value_columns.iter()
2104 .map(|&val| {
2105 schema.fields().iter()
2106 .find(|f| f.name().to_uppercase() == val.to_uppercase())
2107 .map(|f| f.name().to_string())
2108 .ok_or_else(|| {
2109 let available = schema.fields().iter()
2110 .map(|f| f.name())
2111 .collect::<Vec<_>>();
2112 ElusionError::Custom(format!(
2113 "Value column '{}' not found in current data. Available columns: {:?}",
2114 val, available
2115 ))
2116 })
2117 })
2118 .collect::<Result<Vec<_>, _>>()?;
2119
2120 let selects: Vec<String> = exact_value_columns.iter().map(|val_col| {
2122 let id_cols_str = exact_id_columns.iter()
2123 .map(|id| format!("\"{}\"", id))
2124 .collect::<Vec<_>>()
2125 .join(", ");
2126
2127 format!(
2135 "SELECT {}, '{}' AS \"{}\", \"{}\" AS \"{}\" FROM \"{}\" AS {}",
2136 id_cols_str,
2137 val_col,
2138 name_column.to_lowercase(),
2140 val_col,
2141 value_column.to_lowercase(),
2142 self.from_table,
2143 self.table_alias
2144 )
2145 }).collect();
2146
2147 let unpivot_subquery = format!(
2149 "({})",
2150 selects.join(" UNION ALL ")
2151 );
2152
2153 self.from_table = unpivot_subquery;
2155 self.selected_columns.clear();
2156
2157 self.selected_columns.extend(
2159 exact_id_columns.iter()
2160 .map(|id| format!("\"{}\"", id))
2161 );
2162 self.selected_columns.push(format!("\"{}\"", name_column));
2163 self.selected_columns.push(format!("\"{}\"", value_column));
2164
2165 Ok(self)
2166 }
2167
2168 pub fn fill_down<const N: usize>(
2170 mut self,
2171 columns: [&str; N]
2172 ) -> Self {
2173 let normalized_columns: Vec<String> = columns
2174 .iter()
2175 .map(|col| {
2176 col.trim().replace(" ", "_").to_lowercase()
2177 })
2178 .collect();
2179
2180 let operation = format!("FILL_DOWN:{}", normalized_columns.join(","));
2183 self.set_operations.push(operation);
2184 self
2185 }
2186
2187 pub fn fill_down_with_set_ops<const N: usize>(
2189 mut self,
2190 columns: [&str; N]
2191 ) -> Self {
2192 let operation = format!("FILL_DOWN:{}", columns.join(","));
2193 self.set_operations.push(operation);
2194 self
2195 }
2196
2197 pub async fn fill_down_now<const N: usize>(
2199 self,
2200 columns: [&str; N],
2201 alias: &str
2202 ) -> ElusionResult<CustomDataFrame> {
2203 self.fill_down_vec_now(columns.to_vec(), alias).await
2204 }
2205
2206 pub async fn fill_down_vec_now(
2207 self,
2208 columns: Vec<&str>,
2209 alias: &str
2210 ) -> ElusionResult<CustomDataFrame> {
2211 let ctx = SessionContext::new();
2212
2213 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
2215
2216 let all_data_sql = format!("SELECT * FROM {}", normalize_alias(&self.table_alias));
2218 let temp_df = ctx.sql(&all_data_sql).await?;
2219 let batches = temp_df.clone().collect().await?;
2220
2221 if batches.is_empty() {
2222 return Ok(CustomDataFrame {
2223 df: temp_df,
2224 table_alias: alias.to_string(),
2225 from_table: alias.to_string(),
2226 selected_columns: Vec::new(),
2227 alias_map: self.alias_map,
2228 aggregations: Vec::new(),
2229 group_by_columns: Vec::new(),
2230 where_conditions: Vec::new(),
2231 having_conditions: Vec::new(),
2232 order_by_columns: Vec::new(),
2233 limit_count: None,
2234 joins: Vec::new(),
2235 window_functions: Vec::new(),
2236 ctes: Vec::new(),
2237 subquery_source: None,
2238 set_operations: Vec::new(),
2239 query: all_data_sql,
2240 aggregated_df: None,
2241 union_tables: None,
2242 original_expressions: Vec::new(),
2243 needs_normalization: false,
2244 raw_selected_columns: Vec::new(),
2245 raw_group_by_columns: Vec::new(),
2246 raw_where_conditions: Vec::new(),
2247 raw_having_conditions: Vec::new(),
2248 raw_join_conditions: Vec::new(),
2249 raw_aggregations: Vec::new(),
2250 uses_group_by_all: false
2251 });
2252 }
2253
2254 let schema = batches[0].schema();
2256 let mut processed_batches = Vec::new();
2257
2258 let fill_column_indices: Vec<usize> = columns
2260 .iter()
2261 .filter_map(|col_name| {
2262 schema.fields().iter().position(|field| field.name() == col_name)
2263 })
2264 .collect();
2265
2266 for batch in batches {
2269 let mut new_columns = Vec::new();
2270 let mut fill_values: Vec<Option<String>> = vec![None; fill_column_indices.len()];
2271
2272 for (col_idx, _field) in schema.fields().iter().enumerate() {
2274 let array = batch.column(col_idx);
2275
2276 if let Some(fill_idx) = fill_column_indices.iter().position(|&idx| idx == col_idx) {
2277 let string_array = array.as_any().downcast_ref::<arrow::array::StringArray>()
2279 .ok_or_else(|| ElusionError::Custom("Expected string array".to_string()))?;
2280
2281 let mut new_values = Vec::new();
2282 for i in 0..string_array.len() {
2283 if string_array.is_null(i) {
2285 new_values.push(fill_values[fill_idx].clone());
2286 } else {
2287 let value = string_array.value(i);
2288 if !value.trim().is_empty() {
2289 fill_values[fill_idx] = Some(value.to_string());
2290 new_values.push(Some(value.to_string()));
2291 } else {
2292 new_values.push(fill_values[fill_idx].clone());
2293 }
2294 }
2295 }
2296
2297 let new_array = arrow::array::StringArray::from(new_values);
2298 new_columns.push(Arc::new(new_array) as ArrayRef);
2299 } else {
2300 new_columns.push(array.clone());
2302 }
2303 }
2304
2305 let new_batch = RecordBatch::try_new(schema.clone(), new_columns)
2307 .map_err(|e| ElusionError::Custom(format!("Failed to create record batch: {}", e)))?;
2308 processed_batches.push(new_batch);
2309 }
2310
2311 let result_mem_table = MemTable::try_new(schema.clone().into(), vec![processed_batches])
2313 .map_err(|e| ElusionError::Custom(format!("Failed to create mem table: {}", e)))?;
2314 ctx.register_table(alias, Arc::new(result_mem_table))
2315 .map_err(|e| ElusionError::Custom(format!("Failed to register table: {}", e)))?;
2316 let result_df = ctx.table(alias).await
2317 .map_err(|e| ElusionError::Custom(format!("Failed to get table: {}", e)))?;
2318
2319 Ok(CustomDataFrame {
2320 df: result_df,
2321 table_alias: alias.to_string(),
2322 from_table: alias.to_string(),
2323 selected_columns: Vec::new(),
2324 alias_map: self.alias_map,
2325 aggregations: Vec::new(),
2326 group_by_columns: Vec::new(),
2327 where_conditions: Vec::new(),
2328 having_conditions: Vec::new(),
2329 order_by_columns: Vec::new(),
2330 limit_count: None,
2331 joins: Vec::new(),
2332 window_functions: Vec::new(),
2333 ctes: Vec::new(),
2334 subquery_source: None,
2335 set_operations: Vec::new(),
2336 query: format!("-- Manual fill down processing for columns: {:?}", columns),
2337 aggregated_df: None,
2338 union_tables: None,
2339 original_expressions: Vec::new(),
2340 needs_normalization: false,
2341 raw_selected_columns: Vec::new(),
2342 raw_group_by_columns: Vec::new(),
2343 raw_where_conditions: Vec::new(),
2344 raw_having_conditions: Vec::new(),
2345 raw_join_conditions: Vec::new(),
2346 raw_aggregations: Vec::new(),
2347 uses_group_by_all: false
2348 })
2349 }
2350
2351 fn handle_set_operation(&self, operation: &str, base_sql: String) -> String {
2353 if let Some(columns_and_value) = operation.strip_prefix("FILL_NULL:") {
2354 self.handle_fill_null_operation(columns_and_value, base_sql)
2355 } else if let Some(columns_str) = operation.strip_prefix("DROP_NULL:") {
2356 self.handle_drop_null_operation(columns_str, base_sql)
2357 } else if let Some(columns_str) = operation.strip_prefix("FILL_DOWN:") {
2358 self.handle_fill_down_operation(columns_str, base_sql)
2359 } else if let Some(skip_count) = operation.strip_prefix("SKIP_ROWS:") {
2360 self.handle_skip_rows_operation(skip_count, base_sql)
2361 } else if operation.starts_with("UNION") {
2362 base_sql
2363 } else {
2364 base_sql
2365 }
2366 }
2367
2368 fn handle_fill_down_operation(&self, columns_str: &str, base_sql: String) -> String {
2370 let columns: Vec<&str> = columns_str.split(',').collect();
2371 let selected_cols = if self.selected_columns.is_empty() {
2374 self.df.schema()
2375 .fields()
2376 .iter()
2377 .map(|f| f.name().clone())
2378 .collect::<Vec<_>>()
2379 } else {
2380 self.selected_columns
2381 .iter()
2382 .map(|col| {
2383 let col_lower = col.to_lowercase();
2387
2388 let mut as_positions = Vec::new();
2390 let mut search_from = 0;
2391 while let Some(pos) = col_lower[search_from..].find(" as ") {
2392 as_positions.push(search_from + pos);
2393 search_from = search_from + pos + 4;
2394 }
2395
2396 for &as_pos in as_positions.iter().rev() {
2399 let before_as = &col[..as_pos];
2400
2401 let mut paren_depth = 0;
2403 for ch in before_as.chars() {
2404 match ch {
2405 '(' => paren_depth += 1,
2406 ')' => paren_depth -= 1,
2407 _ => {}
2408 }
2409 }
2410
2411 if paren_depth == 0 {
2413 let alias = &col[as_pos + 4..]; return alias.trim().trim_matches('"').to_string();
2415 }
2416 }
2417
2418 col.trim_matches('"')
2420 .split('.')
2421 .last()
2422 .unwrap_or(col)
2423 .trim_matches('"')
2424 .to_string()
2425 })
2426 .collect()
2427 };
2428
2429 let fill_expressions: Vec<String> = selected_cols
2434 .iter()
2435 .map(|col_name| {
2436 if columns.contains(&col_name.as_str()) {
2437 format!(
2441 r#"LAST_VALUE(
2442 CASE
2443 WHEN "{0}" IS NULL OR TRIM("{0}") = '' OR TRIM("{0}") = 'null' THEN NULL
2444 ELSE "{0}"
2445 END
2446 ) IGNORE NULLS OVER (
2447 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
2448 ) AS "{0}""#,
2449 col_name
2450 )
2451 } else {
2452 format!(r#""{0}""#, col_name)
2454 }
2455 })
2456 .collect();
2457
2458 let result_sql = format!(
2459 r#"WITH fill_down_base AS (
2460 {}
2461 )
2462 SELECT {}
2463 FROM fill_down_base"#,
2464 base_sql,
2465 fill_expressions.join(", ")
2466 );
2467
2468 result_sql
2470 }
2471
2472 pub fn skip_rows(mut self, n: u64) -> Self {
2474 if n == 0 {
2475 return self;
2477 }
2478
2479 let operation = format!("SKIP_ROWS:{}", n);
2480 self.set_operations.push(operation);
2481 self
2482 }
2483
2484 fn handle_skip_rows_operation(&self, skip_count_str: &str, base_sql: String) -> String {
2486 let skip_count = match skip_count_str.parse::<u64>() {
2487 Ok(n) => n,
2488 Err(_) => return base_sql, };
2490
2491 if skip_count == 0 {
2492 return base_sql; }
2494
2495 format!(
2496 r#"WITH skip_rows_base AS (
2497 {}
2498 ),
2499 skip_rows_numbered AS (
2500 SELECT *,
2501 ROW_NUMBER() OVER () as rn
2502 FROM skip_rows_base
2503 )
2504 SELECT * EXCEPT (rn)
2505 FROM skip_rows_numbered
2506 WHERE rn > {}"#,
2507 base_sql,
2508 skip_count
2509 )
2510 }
2511
2512 pub fn fill_null<const N: usize>(mut self, columns: [&str; N], fill_value: &str) -> Self {
2514 if N == 0 {
2515 return self;
2517 }
2518
2519 let columns_str = columns.join(",");
2521 let operation = format!("FILL_NULL:{}:{}", columns_str, fill_value);
2522 self.set_operations.push(operation);
2523 self
2524 }
2525
2526 pub fn drop_null<const N: usize>(mut self, columns: [&str; N]) -> Self {
2528 if N == 0 {
2529 return self;
2531 }
2532
2533 let columns_str = columns.join(",");
2534 let operation = format!("DROP_NULL:{}", columns_str);
2535 self.set_operations.push(operation);
2536 self
2537 }
2538
2539 fn handle_fill_null_operation(&self, columns_and_value: &str, base_sql: String) -> String {
2540 let parts: Vec<&str> = columns_and_value.split(':').collect();
2541 if parts.len() != 2 {
2542 return base_sql;
2543 }
2544
2545 let columns_str = parts[0];
2546 let fill_value = parts[1];
2547 let fill_columns: Vec<&str> = columns_str.split(',').collect();
2548
2549 let select_expressions: Vec<String> = if self.selected_columns.is_empty() {
2551 self.df.schema()
2552 .fields()
2553 .iter()
2554 .map(|f| {
2555 let col_name = f.name();
2556 if fill_columns.iter().any(|&c| c.eq_ignore_ascii_case(col_name)) {
2557 format!(
2558 r#"CASE
2559 WHEN "{0}" IS NULL OR
2560 TRIM("{0}") = '' OR
2561 UPPER(TRIM("{0}")) = 'NULL' OR
2562 UPPER(TRIM("{0}")) = 'NA' OR
2563 UPPER(TRIM("{0}")) = 'N/A' OR
2564 UPPER(TRIM("{0}")) = 'NONE' OR
2565 TRIM("{0}") = '-' OR
2566 TRIM("{0}") = '?' OR
2567 TRIM("{0}") = 'NaN' OR
2568 UPPER(TRIM("{0}")) = 'NAN'
2569 THEN '{1}'
2570 ELSE "{0}"
2571 END AS "{0}""#,
2572 col_name, fill_value
2573 )
2574 } else {
2575 format!("\"{}\"", col_name)
2576 }
2577 })
2578 .collect()
2579 } else {
2580 let column_names_and_aliases = self.selected_columns
2581 .iter()
2582 .map(|expr| {
2583 if let Some(as_pos) = Self::find_alias_position(expr) {
2585 let alias = expr[as_pos + 4..].trim().trim_matches('"').to_string();
2586 (alias.clone(), Some(alias))
2587 } else {
2588 let col_name = expr.trim_matches('"')
2590 .split('.')
2591 .last()
2592 .unwrap_or(expr)
2593 .to_string();
2594 (col_name, None)
2595 }
2596 })
2597 .collect::<Vec<_>>();
2598
2599 column_names_and_aliases
2600 .iter()
2601 .map(|(col_name, alias_opt)| {
2602 let target_name = alias_opt.as_ref().unwrap_or(col_name);
2603
2604 if fill_columns.iter().any(|&c| c.eq_ignore_ascii_case(target_name)) {
2606 format!(
2607 r#"CASE
2608 WHEN "{0}" IS NULL OR
2609 TRIM("{0}") = '' OR
2610 UPPER(TRIM("{0}")) = 'NULL' OR
2611 UPPER(TRIM("{0}")) = 'NA' OR
2612 UPPER(TRIM("{0}")) = 'N/A' OR
2613 UPPER(TRIM("{0}")) = 'NONE' OR
2614 TRIM("{0}") = '-' OR
2615 TRIM("{0}") = '?' OR
2616 TRIM("{0}") = 'NaN' OR
2617 UPPER(TRIM("{0}")) = 'NAN'
2618 THEN '{1}'
2619 ELSE "{0}"
2620 END AS "{0}""#,
2621 target_name, fill_value
2622 )
2623 } else {
2624 format!("\"{}\"", target_name)
2626 }
2627 })
2628 .collect()
2629 };
2630
2631 format!(
2632 r#"WITH fill_null_base AS (
2633 {}
2634 )
2635 SELECT {}
2636 FROM fill_null_base"#,
2637 base_sql,
2638 select_expressions.join(", ")
2639 )
2640 }
2641
2642 fn find_alias_position(expr: &str) -> Option<usize> {
2644 let expr_lower = expr.to_lowercase();
2645 let mut as_positions = Vec::new();
2646 let mut search_from = 0;
2647
2648 while let Some(pos) = expr_lower[search_from..].find(" as ") {
2649 as_positions.push(search_from + pos);
2650 search_from = search_from + pos + 4;
2651 }
2652
2653 for &as_pos in as_positions.iter().rev() {
2654 let before_as = &expr[..as_pos];
2655 let mut paren_depth = 0;
2656
2657 for ch in before_as.chars() {
2658 match ch {
2659 '(' => paren_depth += 1,
2660 ')' => paren_depth -= 1,
2661 _ => {}
2662 }
2663 }
2664
2665 if paren_depth == 0 {
2666 return Some(as_pos);
2667 }
2668 }
2669
2670 None
2671 }
2672
2673 fn handle_drop_null_operation(&self, columns_str: &str, base_sql: String) -> String {
2675 let columns: Vec<&str> = columns_str.split(',').collect();
2676
2677 let where_conditions: Vec<String> = columns
2678 .iter()
2679 .map(|&col| {
2680 let normalized_col = col.trim().replace(" ", "_").to_lowercase();
2681 let quoted_col = format!("\"{}\"", normalized_col);
2682 format!(
2683 r#"({0} IS NOT NULL AND
2684 TRIM({0}) != '' AND
2685 UPPER(TRIM({0})) != 'NULL' AND
2686 UPPER(TRIM({0})) != 'NA' AND
2687 UPPER(TRIM({0})) != 'N/A' AND
2688 UPPER(TRIM({0})) != 'NONE' AND
2689 TRIM({0}) != '-' AND
2690 TRIM({0}) != '?' AND
2691 TRIM({0}) != 'NaN' AND
2692 UPPER(TRIM({0})) != 'NAN')"#,
2693 quoted_col
2694 )
2695 })
2696 .collect();
2697
2698 format!(
2699 r#"WITH drop_null_base AS (
2700 {}
2701 )
2702 SELECT *
2703 FROM drop_null_base
2704 WHERE {}"#,
2705 base_sql,
2706 where_conditions.join(" AND ")
2707 )
2708 }
2709
2710 pub async fn head(&self, n: u64) -> ElusionResult<Self> {
2712 let limit = n;
2713
2714 if limit == 0 {
2715 return Err(ElusionError::LimitError {
2716 message: "Head limit cannot be zero".to_string(),
2717 value: 0,
2718 suggestion: "💡 Use a positive number for head() limit".to_string(),
2719 });
2720 }
2721
2722 let ctx = SessionContext::new();
2723
2724 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
2725
2726 let sql = format!(
2727 "SELECT * FROM {} LIMIT {}",
2728 normalize_alias(&self.table_alias),
2729 limit
2730 );
2731
2732 let head_df = ctx.sql(&sql).await
2733 .map_err(|e| ElusionError::InvalidOperation {
2734 operation: "Head Operation".to_string(),
2735 reason: format!("Failed to execute head query: {}", e),
2736 suggestion: "💡 Check if DataFrame contains valid data".to_string(),
2737 })?;
2738
2739 let result_alias = format!("{}_head", self.table_alias);
2740
2741 let batches = head_df.clone().collect().await
2742 .map_err(|e| ElusionError::InvalidOperation {
2743 operation: "Head Data Collection".to_string(),
2744 reason: format!("Failed to collect head results: {}", e),
2745 suggestion: "💡 Verify DataFrame contains data".to_string(),
2746 })?;
2747
2748 let mem_table = MemTable::try_new(head_df.schema().clone().into(), vec![batches])
2749 .map_err(|e| ElusionError::SchemaError {
2750 message: format!("Failed to create head result table: {}", e),
2751 schema: Some(head_df.schema().to_string()),
2752 suggestion: "💡 Check schema compatibility".to_string(),
2753 })?;
2754
2755 ctx.register_table(&result_alias, Arc::new(mem_table))
2756 .map_err(|e| ElusionError::InvalidOperation {
2757 operation: "Head Result Registration".to_string(),
2758 reason: format!("Failed to register head result: {}", e),
2759 suggestion: "💡 Try using a different result alias".to_string(),
2760 })?;
2761
2762 let result_df = ctx.table(&result_alias).await
2763 .map_err(|e| ElusionError::InvalidOperation {
2764 operation: "Head Result Creation".to_string(),
2765 reason: format!("Failed to create head result DataFrame: {}", e),
2766 suggestion: "💡 Verify table registration succeeded".to_string(),
2767 })?;
2768
2769 Ok(CustomDataFrame {
2770 df: result_df,
2771 table_alias: result_alias.clone(),
2772 from_table: result_alias.clone(),
2773 selected_columns: self.selected_columns.clone(),
2774 alias_map: self.alias_map.clone(),
2775 aggregations: Vec::new(),
2776 group_by_columns: Vec::new(),
2777 where_conditions: Vec::new(),
2778 having_conditions: Vec::new(),
2779 order_by_columns: Vec::new(),
2780 limit_count: Some(limit),
2781 joins: Vec::new(),
2782 window_functions: Vec::new(),
2783 ctes: Vec::new(),
2784 subquery_source: None,
2785 set_operations: Vec::new(),
2786 query: sql,
2787 aggregated_df: Some(head_df),
2788 union_tables: None,
2789 original_expressions: self.original_expressions.clone(),
2790 needs_normalization: false,
2791 raw_selected_columns: Vec::new(),
2792 raw_group_by_columns: Vec::new(),
2793 raw_where_conditions: Vec::new(),
2794 raw_having_conditions: Vec::new(),
2795 raw_join_conditions: Vec::new(),
2796 raw_aggregations: Vec::new(),
2797 uses_group_by_all: false
2798 })
2799 }
2800
2801 pub async fn tail(&self, n: u64) -> ElusionResult<Self> {
2803 let limit = n;
2804
2805 if limit == 0 {
2806 return Err(ElusionError::LimitError {
2807 message: "Tail limit cannot be zero".to_string(),
2808 value: 0,
2809 suggestion: "💡 Use a positive number for tail() limit".to_string(),
2810 });
2811 }
2812
2813 let ctx = SessionContext::new();
2814
2815 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
2816
2817 let count_sql = format!(
2818 "SELECT COUNT(*) as total_count FROM {}",
2819 normalize_alias(&self.table_alias)
2820 );
2821
2822 let count_df = ctx.sql(&count_sql).await
2823 .map_err(|e| ElusionError::InvalidOperation {
2824 operation: "Tail Count Operation".to_string(),
2825 reason: format!("Failed to count rows for tail: {}", e),
2826 suggestion: "💡 Check if DataFrame contains valid data".to_string(),
2827 })?;
2828
2829 let count_batches = count_df.collect().await
2830 .map_err(|e| ElusionError::InvalidOperation {
2831 operation: "Tail Count Collection".to_string(),
2832 reason: format!("Failed to collect count results: {}", e),
2833 suggestion: "💡 Verify DataFrame contains data".to_string(),
2834 })?;
2835
2836 if count_batches.is_empty() {
2837 return Err(ElusionError::InvalidOperation {
2838 operation: "Tail Operation".to_string(),
2839 reason: "No data found in DataFrame".to_string(),
2840 suggestion: "💡 Ensure DataFrame contains data before using tail()".to_string(),
2841 });
2842 }
2843
2844 let total_count = count_batches[0]
2845 .column(0)
2846 .as_any()
2847 .downcast_ref::<Int64Array>()
2848 .ok_or_else(|| ElusionError::InvalidOperation {
2849 operation: "Tail Count Extraction".to_string(),
2850 reason: "Failed to extract row count".to_string(),
2851 suggestion: "💡 This is an internal error, please report it".to_string(),
2852 })?
2853 .value(0);
2854
2855 if total_count == 0 {
2856 return Err(ElusionError::InvalidOperation {
2857 operation: "Tail Operation".to_string(),
2858 reason: "DataFrame is empty".to_string(),
2859 suggestion: "💡 Ensure DataFrame contains data before using tail()".to_string(),
2860 });
2861 }
2862
2863 let offset = if total_count <= limit as i64 {
2864 0 } else {
2866 total_count - limit as i64
2867 };
2868
2869 let sql = format!(
2870 "SELECT * FROM {} LIMIT {} OFFSET {}",
2871 normalize_alias(&self.table_alias),
2872 limit,
2873 offset
2874 );
2875
2876 let tail_df = ctx.sql(&sql).await
2877 .map_err(|e| ElusionError::InvalidOperation {
2878 operation: "Tail Operation".to_string(),
2879 reason: format!("Failed to execute tail query: {}", e),
2880 suggestion: "💡 Check if DataFrame contains valid data".to_string(),
2881 })?;
2882
2883 let result_alias = format!("{}_tail", self.table_alias);
2884
2885 let batches = tail_df.clone().collect().await
2886 .map_err(|e| ElusionError::InvalidOperation {
2887 operation: "Tail Data Collection".to_string(),
2888 reason: format!("Failed to collect tail results: {}", e),
2889 suggestion: "💡 Verify DataFrame contains data".to_string(),
2890 })?;
2891
2892 let mem_table = MemTable::try_new(tail_df.schema().clone().into(), vec![batches])
2893 .map_err(|e| ElusionError::SchemaError {
2894 message: format!("Failed to create tail result table: {}", e),
2895 schema: Some(tail_df.schema().to_string()),
2896 suggestion: "💡 Check schema compatibility".to_string(),
2897 })?;
2898
2899 ctx.register_table(&result_alias, Arc::new(mem_table))
2900 .map_err(|e| ElusionError::InvalidOperation {
2901 operation: "Tail Result Registration".to_string(),
2902 reason: format!("Failed to register tail result: {}", e),
2903 suggestion: "💡 Try using a different result alias".to_string(),
2904 })?;
2905
2906 let result_df = ctx.table(&result_alias).await
2907 .map_err(|e| ElusionError::InvalidOperation {
2908 operation: "Tail Result Creation".to_string(),
2909 reason: format!("Failed to create tail result DataFrame: {}", e),
2910 suggestion: "💡 Verify table registration succeeded".to_string(),
2911 })?;
2912
2913 Ok(CustomDataFrame {
2914 df: result_df,
2915 table_alias: result_alias.clone(),
2916 from_table: result_alias.clone(),
2917 selected_columns: self.selected_columns.clone(),
2918 alias_map: self.alias_map.clone(),
2919 aggregations: Vec::new(),
2920 group_by_columns: Vec::new(),
2921 where_conditions: Vec::new(),
2922 having_conditions: Vec::new(),
2923 order_by_columns: Vec::new(),
2924 limit_count: Some(limit),
2925 joins: Vec::new(),
2926 window_functions: Vec::new(),
2927 ctes: Vec::new(),
2928 subquery_source: None,
2929 set_operations: Vec::new(),
2930 query: sql,
2931 aggregated_df: Some(tail_df),
2932 union_tables: None,
2933 original_expressions: self.original_expressions.clone(),
2934 needs_normalization: false,
2935 raw_selected_columns: Vec::new(),
2936 raw_group_by_columns: Vec::new(),
2937 raw_where_conditions: Vec::new(),
2938 raw_having_conditions: Vec::new(),
2939 raw_join_conditions: Vec::new(),
2940 raw_aggregations: Vec::new(),
2941 uses_group_by_all: false
2942 })
2943 }
2944
2945 pub async fn show_head(&self, n: u64) -> ElusionResult<()> {
2947 let head_df = self.head(n).await?;
2948 head_df.display().await
2949 }
2950
2951 pub async fn show_tail(&self, n: u64) -> ElusionResult<()> {
2953 let tail_df = self.tail(n).await?;
2954 tail_df.display().await
2955 }
2956
2957 pub async fn peek(&self, n: u64) -> ElusionResult<()> {
2959 let limit = n;
2960
2961 println!("📊 DataFrame Overview:");
2962 println!("🔝 First {} rows:", limit);
2963 self.show_head(limit).await?;
2964
2965 println!("\n🔽 Last {} rows:", limit);
2966 self.show_tail(limit).await?;
2967
2968 Ok(())
2969 }
2970
2971 pub fn select<const N: usize>(self, columns: [&str; N]) -> Self {
2973 self.select_vec(columns.to_vec())
2974 }
2975
2976 pub fn select_vec(mut self, columns: Vec<&str>) -> Self {
2977 let has_star_expansion = columns.iter().any(|&col| col == "*" || col.ends_with(".*") || col.ends_with(". *"));
2978
2979 let final_columns = if has_star_expansion {
2980 let expanded = self.expand_star_columns(columns);
2982 self.remove_duplicate_columns(expanded)
2983 } else {
2984 columns.into_iter().map(|s| s.to_string()).collect()
2986 };
2987
2988 let column_refs: Vec<&str> = final_columns.iter().map(|s| s.as_str()).collect();
2989
2990 self.original_expressions = column_refs
2992 .iter()
2993 .filter(|&col| col.contains(" AS "))
2994 .map(|&s| s.to_string())
2995 .collect();
2996
2997 self.raw_selected_columns.extend(column_refs.iter().map(|&s| s.to_string()));
2999 self.needs_normalization = true;
3000
3001 let mut all_columns = self.selected_columns.clone();
3002
3003 if !self.group_by_columns.is_empty() {
3004 for col in column_refs {
3006 if is_expression(col) {
3007 if is_aggregate_expression(col) {
3008 all_columns.push(normalize_expression(col, &self.table_alias));
3010 } else {
3011 let expr_without_alias = if col.contains(" AS ") {
3013 col.split(" AS ").next().unwrap_or(col).trim()
3014 } else {
3015 col
3016 };
3017
3018 let group_by_expr = normalize_simple_expression(expr_without_alias, &self.table_alias);
3020 if !self.group_by_columns.contains(&group_by_expr) {
3021 self.group_by_columns.push(group_by_expr);
3022 }
3023
3024 all_columns.push(normalize_expression(col, &self.table_alias));
3026 }
3027 } else {
3028 let normalized_col = normalize_column_name(col);
3030 if !self.group_by_columns.contains(&normalized_col) {
3031 self.group_by_columns.push(normalized_col.clone());
3032 }
3033 all_columns.push(normalized_col);
3034 }
3035 }
3036 } else {
3037 let aggregate_aliases: Vec<String> = self
3039 .aggregations
3040 .iter()
3041 .filter_map(|agg| {
3042 agg.split(" AS ")
3043 .nth(1)
3044 .map(|alias| normalize_alias(alias))
3045 })
3046 .collect();
3047
3048 all_columns.extend(
3049 column_refs
3050 .into_iter()
3051 .filter(|col| !aggregate_aliases.contains(&normalize_alias(col)))
3052 .map(|s| {
3053 if is_expression(s) {
3054 normalize_expression(s, &self.table_alias)
3055 } else {
3056 normalize_column_name(s)
3057 }
3058 })
3059 );
3060 }
3061
3062 let mut seen = HashSet::new();
3064 self.selected_columns = all_columns
3065 .into_iter()
3066 .filter(|x| seen.insert(x.clone()))
3067 .collect();
3068
3069 self
3070 }
3071
3072 fn remove_duplicate_columns(&self, columns: Vec<String>) -> Vec<String> {
3074 let mut seen_base_names = HashSet::new();
3075 let mut result = Vec::new();
3076
3077 for col in columns {
3080 let base_name = extract_base_column_name(&col);
3081 if !seen_base_names.contains(&base_name) {
3084 seen_base_names.insert(base_name.clone());
3085 result.push(col.clone());
3086 } else {
3088 }
3090 }
3091
3092 result
3095 }
3096
3097 fn expand_star_columns(&self, columns: Vec<&str>) -> Vec<String> {
3099 let mut result = Vec::new();
3100
3101 for col in columns {
3102 match col {
3103 "*" => {
3104 let schema = self.df.schema();
3106 for field in schema.fields() {
3107 result.push(format!("{}.{}", self.table_alias, field.name()));
3108 }
3109
3110 for join in &self.joins {
3112 let join_schema = join.dataframe.df.schema();
3113 for field in join_schema.fields() {
3114 result.push(format!("{}.{}", join.dataframe.table_alias, field.name()));
3115 }
3116 }
3117 }
3118 table_star if table_star.ends_with(".*") => {
3119 let table_alias = &table_star[..table_star.len() - 2];
3120
3121 if table_alias == self.table_alias {
3123 let schema = self.df.schema();
3124 for field in schema.fields() {
3125 result.push(format!("{}.{}", table_alias, field.name()));
3126 }
3127 } else {
3128 for join in &self.joins {
3130 if join.dataframe.table_alias == table_alias {
3131 let schema = join.dataframe.df.schema();
3132 for field in schema.fields() {
3133 result.push(format!("{}.{}", table_alias, field.name()));
3134 }
3135 break;
3136 }
3137 }
3138 }
3139 }
3140 regular => {
3141 result.push(regular.to_string());
3142 }
3143 }
3144 }
3145
3146 result
3147 }
3148
3149 pub fn json<'a, const N: usize>(mut self, columns: [&'a str; N]) -> Self {
3151 let mut json_expressions = Vec::new();
3152
3153 for expr in columns.iter() {
3154 let re = Regex::new(r"(?i)\s+AS\s+").unwrap();
3156 let parts: Vec<&str> = re.split(expr).collect();
3157
3158 if parts.len() != 2 {
3159 continue; }
3161
3162 let path_part = parts[0].trim();
3163 let alias = parts[1].trim().to_lowercase();
3164
3165
3166 if !path_part.contains(".'$") {
3167 continue; }
3169
3170 let col_path_parts: Vec<&str> = path_part.split(".'$").collect();
3171 let column_name = col_path_parts[0].trim();
3172 let json_path = col_path_parts[1].trim_end_matches('\'');
3173
3174 let normalized_column = if column_name.contains('.') {
3176 normalize_column_name(column_name)
3178 } else {
3179 format!("\"{}\".\"{}\"",
3181 self.table_alias.to_lowercase(),
3182 column_name.to_lowercase())
3183 };
3184
3185 let search_pattern = format!("\"{}\":", json_path);
3186
3187 let sql_expr = format!(
3188 "CASE
3189 WHEN POSITION('{}' IN {}) > 0 THEN
3190 TRIM(BOTH '\"' FROM
3191 SUBSTRING(
3192 {},
3193 POSITION('{}' IN {}) + {},
3194 CASE
3195 WHEN POSITION(',\"' IN SUBSTRING({}, POSITION('{}' IN {}) + {})) > 0 THEN
3196 POSITION(',\"' IN SUBSTRING({}, POSITION('{}' IN {}) + {})) - 1
3197 WHEN POSITION('}}' IN SUBSTRING({}, POSITION('{}' IN {}) + {})) > 0 THEN
3198 POSITION('}}' IN SUBSTRING({}, POSITION('{}' IN {}) + {})) - 1
3199 ELSE 300 -- arbitrary large value
3200 END
3201 )
3202 )
3203 ELSE NULL
3204 END as \"{}\"",
3205 search_pattern, normalized_column,
3206 normalized_column,
3207 search_pattern, normalized_column, search_pattern.len(),
3208 normalized_column, search_pattern, normalized_column, search_pattern.len(),
3209 normalized_column, search_pattern, normalized_column, search_pattern.len(),
3210 normalized_column, search_pattern, normalized_column, search_pattern.len(),
3211 normalized_column, search_pattern, normalized_column, search_pattern.len(),
3212 alias
3213 );
3214
3215 json_expressions.push(sql_expr);
3216 }
3217
3218 self.selected_columns.extend(json_expressions);
3219
3220 self
3221 }
3222
3223 pub fn json_array<'a, const N: usize>(mut self, columns: [&'a str; N]) -> Self {
3225 let mut json_expressions = Vec::new();
3226
3227 for expr in columns.iter() {
3228 let re = Regex::new(r"(?i)\s+AS\s+").unwrap();
3230 let parts: Vec<&str> = re.split(expr).collect();
3231
3232 if parts.len() != 2 {
3233 continue;
3234 }
3235
3236 let path_part = parts[0].trim();
3237 let alias = parts[1].trim().to_lowercase();
3238
3239 if !path_part.contains(".'$") {
3240 continue;
3241 }
3242
3243 let col_path_parts: Vec<&str> = path_part.split(".'$").collect();
3244 let column_name = col_path_parts[0].trim();
3245 let filter_expr = col_path_parts[1].trim_end_matches('\'');
3246
3247 let normalized_column = if column_name.contains('.') {
3249 normalize_column_name(column_name)
3251 } else {
3252 format!("\"{}\".\"{}\"",
3254 self.table_alias.to_lowercase(),
3255 column_name.to_lowercase())
3256 };
3257
3258 let filter_parts: Vec<&str> = filter_expr.split(':').collect();
3259
3260 let sql_expr: String;
3261
3262 if filter_parts.len() == 2 {
3263 let value_field = filter_parts[0].trim();
3265 let condition = filter_parts[1].trim();
3266
3267 let condition_parts: Vec<&str> = condition.split('=').collect();
3268 if condition_parts.len() != 2 {
3269 continue;
3270 }
3271
3272 let id_field = condition_parts[0].trim();
3273 let id_value = condition_parts[1].trim();
3274
3275 sql_expr = format!(
3276 "CASE
3277 WHEN regexp_like({}, '\\{{\"{}\":\"{}\",[^\\}}]*\"{}\":(\"[^\"]*\"|[0-9.]+|true|false)', 'i') THEN
3278 CASE
3279 WHEN regexp_like(
3280 regexp_match(
3281 {},
3282 '\\{{\"{}\":\"{}\",[^\\}}]*\"{}\":(\"[^\"]*\")',
3283 'i'
3284 )[1],
3285 '\"[^\"]*\"'
3286 ) THEN
3287 -- Handle string values by removing quotes
3288 regexp_replace(
3289 regexp_match(
3290 {},
3291 '\\{{\"{}\":\"{}\",[^\\}}]*\"{}\":\"([^\"]*)\"',
3292 'i'
3293 )[1],
3294 '\"',
3295 ''
3296 )
3297 ELSE
3298 -- Handle numeric and boolean values
3299 regexp_match(
3300 {},
3301 '\\{{\"{}\":\"{}\",[^\\}}]*\"{}\":([0-9.]+|true|false)',
3302 'i'
3303 )[1]
3304 END
3305 ELSE NULL
3306 END as \"{}\"",
3307 normalized_column, id_field, id_value, value_field,
3308 normalized_column, id_field, id_value, value_field,
3309 normalized_column, id_field, id_value, value_field,
3310 normalized_column, id_field, id_value, value_field,
3311 alias
3312 );
3313 } else {
3314 continue;
3315 }
3316
3317 json_expressions.push(sql_expr);
3318 }
3319
3320 self.selected_columns.extend(json_expressions);
3321
3322 self
3323 }
3324
3325 fn should_warn_complexity(&self) -> bool {
3326 self.joins.len() > 3 ||
3327 self.selected_columns.len() > 15 ||
3328 self.where_conditions.len() > 8
3329 }
3330
3331 fn normalize_group_by_columns(&self) -> Vec<String> {
3333 self.group_by_columns.iter().map(|col| {
3334 if col.contains(" as \"") {
3336 if let Some(as_pos) = col.find(" as \"") {
3337 col[..as_pos].trim().to_string()
3338 } else {
3339 col.clone()
3340 }
3341 } else {
3342 col.clone()
3343 }
3344 }).collect()
3345 }
3346
3347 pub async fn drop_duplicates(
3349 &self,
3350 alias: &str,
3351 ) -> ElusionResult<Self> {
3352 let ctx = SessionContext::new();
3353
3354 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
3355
3356 let columns_to_check: Vec<String> = self.df.schema()
3357 .fields()
3358 .iter()
3359 .map(|f| f.name().clone())
3360 .collect();
3361
3362 let sql = Self::build_drop_duplicates_sql_first(&self.table_alias, &columns_to_check);
3363
3364 let result_df = ctx.sql(&sql).await
3365 .map_err(|e| ElusionError::InvalidOperation {
3366 operation: "drop_duplicates execution".to_string(),
3367 reason: format!("Failed to execute drop_duplicates: {}", e),
3368 suggestion: "💡 Check if the DataFrame contains valid data".to_string(),
3369 })?;
3370
3371 let batches = result_df.clone().collect().await
3372 .map_err(|e| ElusionError::InvalidOperation {
3373 operation: "drop_duplicates collection".to_string(),
3374 reason: format!("Failed to collect results: {}", e),
3375 suggestion: "💡 Check memory availability".to_string(),
3376 })?;
3377
3378 let mem_table = MemTable::try_new(
3379 result_df.schema().clone().into(),
3380 vec![batches]
3381 ).map_err(|e| ElusionError::SchemaError {
3382 message: format!("Failed to create result table: {}", e),
3383 schema: Some(result_df.schema().to_string()),
3384 suggestion: "💡 Check schema compatibility".to_string(),
3385 })?;
3386
3387 ctx.register_table(alias, Arc::new(mem_table))
3388 .map_err(|e| ElusionError::InvalidOperation {
3389 operation: "Result Registration".to_string(),
3390 reason: format!("Failed to register result table: {}", e),
3391 suggestion: "💡 Try using a different alias name".to_string(),
3392 })?;
3393
3394 let final_df = ctx.table(alias).await
3395 .map_err(|e| ElusionError::InvalidOperation {
3396 operation: "Result Retrieval".to_string(),
3397 reason: format!("Failed to retrieve final result: {}", e),
3398 suggestion: "💡 This might be an internal issue - try a different alias".to_string(),
3399 })?;
3400
3401 Ok(CustomDataFrame {
3402 df: final_df,
3403 table_alias: alias.to_string(),
3404 from_table: alias.to_string(),
3405 selected_columns: Vec::new(),
3406 alias_map: Vec::new(),
3407 aggregations: Vec::new(),
3408 group_by_columns: Vec::new(),
3409 where_conditions: Vec::new(),
3410 having_conditions: Vec::new(),
3411 order_by_columns: Vec::new(),
3412 limit_count: None,
3413 joins: Vec::new(),
3414 window_functions: Vec::new(),
3415 ctes: Vec::new(),
3416 subquery_source: None,
3417 set_operations: Vec::new(),
3418 query: sql,
3419 aggregated_df: Some(result_df),
3420 union_tables: None,
3421 original_expressions: Vec::new(),
3422 needs_normalization: false,
3423 raw_selected_columns: Vec::new(),
3424 raw_group_by_columns: Vec::new(),
3425 raw_where_conditions: Vec::new(),
3426 raw_having_conditions: Vec::new(),
3427 raw_join_conditions: Vec::new(),
3428 raw_aggregations: Vec::new(),
3429 uses_group_by_all: false,
3430 })
3431 }
3432
3433 pub async fn drop_duplicates_by_column(
3435 &self,
3436 columns: &[&str],
3437 alias: &str,
3438 ) -> ElusionResult<Self> {
3439 if columns.is_empty() {
3441 return Err(ElusionError::InvalidOperation {
3442 operation: "drop_duplicates_by_column".to_string(),
3443 reason: "No columns specified for duplicate detection".to_string(),
3444 suggestion: "💡 Provide at least one column name, or use drop_duplicates() to check all columns".to_string(),
3445 });
3446 }
3447
3448 let ctx = SessionContext::new();
3449
3450 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
3451
3452 let schema_fields: Vec<String> = self.df.schema()
3454 .fields()
3455 .iter()
3456 .map(|f| f.name().clone())
3457 .collect();
3458
3459 let columns_to_check: Vec<String> = columns.iter().map(|s| s.to_string()).collect();
3460
3461 for col in &columns_to_check {
3462 if !schema_fields.contains(col) {
3463 return Err(ElusionError::InvalidOperation {
3464 operation: "drop_duplicates_by_column".to_string(),
3465 reason: format!("Column '{}' does not exist in DataFrame", col),
3466 suggestion: format!("💡 Available columns: {}", schema_fields.join(", ")),
3467 });
3468 }
3469 }
3470
3471 let sql = Self::build_drop_duplicates_sql_first(&self.table_alias, &columns_to_check);
3472
3473 let result_df = ctx.sql(&sql).await
3474 .map_err(|e| ElusionError::InvalidOperation {
3475 operation: "drop_duplicates_by_column execution".to_string(),
3476 reason: format!("Failed to execute drop_duplicates_by_column: {}", e),
3477 suggestion: "💡 Check if the DataFrame contains valid data".to_string(),
3478 })?;
3479
3480 let batches = result_df.clone().collect().await
3481 .map_err(|e| ElusionError::InvalidOperation {
3482 operation: "drop_duplicates_by_column collection".to_string(),
3483 reason: format!("Failed to collect results: {}", e),
3484 suggestion: "💡 Check memory availability".to_string(),
3485 })?;
3486
3487 let mem_table = MemTable::try_new(
3488 result_df.schema().clone().into(),
3489 vec![batches]
3490 ).map_err(|e| ElusionError::SchemaError {
3491 message: format!("Failed to create result table: {}", e),
3492 schema: Some(result_df.schema().to_string()),
3493 suggestion: "💡 Check schema compatibility".to_string(),
3494 })?;
3495
3496 ctx.register_table(alias, Arc::new(mem_table))
3497 .map_err(|e| ElusionError::InvalidOperation {
3498 operation: "Result Registration".to_string(),
3499 reason: format!("Failed to register result table: {}", e),
3500 suggestion: "💡 Try using a different alias name".to_string(),
3501 })?;
3502
3503 let final_df = ctx.table(alias).await
3504 .map_err(|e| ElusionError::InvalidOperation {
3505 operation: "Result Retrieval".to_string(),
3506 reason: format!("Failed to retrieve final result: {}", e),
3507 suggestion: "💡 This might be an internal issue - try a different alias".to_string(),
3508 })?;
3509
3510 Ok(CustomDataFrame {
3511 df: final_df,
3512 table_alias: alias.to_string(),
3513 from_table: alias.to_string(),
3514 selected_columns: Vec::new(),
3515 alias_map: Vec::new(),
3516 aggregations: Vec::new(),
3517 group_by_columns: Vec::new(),
3518 where_conditions: Vec::new(),
3519 having_conditions: Vec::new(),
3520 order_by_columns: Vec::new(),
3521 limit_count: None,
3522 joins: Vec::new(),
3523 window_functions: Vec::new(),
3524 ctes: Vec::new(),
3525 subquery_source: None,
3526 set_operations: Vec::new(),
3527 query: sql,
3528 aggregated_df: Some(result_df),
3529 union_tables: None,
3530 original_expressions: Vec::new(),
3531 needs_normalization: false,
3532 raw_selected_columns: Vec::new(),
3533 raw_group_by_columns: Vec::new(),
3534 raw_where_conditions: Vec::new(),
3535 raw_having_conditions: Vec::new(),
3536 raw_join_conditions: Vec::new(),
3537 raw_aggregations: Vec::new(),
3538 uses_group_by_all: false,
3539 })
3540 }
3541
3542
3543 fn build_drop_duplicates_sql_first(table_alias: &str, columns: &[String]) -> String {
3545 let partition_cols = columns
3546 .iter()
3547 .map(|c| format!("\"{}\"", c))
3548 .collect::<Vec<_>>()
3549 .join(", ");
3550
3551 format!(
3552 r#"WITH ranked_rows AS (
3553 SELECT *,
3554 ROW_NUMBER() OVER (PARTITION BY {} ORDER BY 1) as rn
3555 FROM {}
3556 )
3557 SELECT * EXCEPT (rn)
3558 FROM ranked_rows
3559 WHERE rn = 1"#,
3560 partition_cols,
3561 normalize_alias(table_alias)
3562 )
3563 }
3564
3565
3566 fn construct_sql(&self) -> String {
3569
3570 let estimated_capacity = self.estimate_sql_size();
3574 let mut builder = SqlBuilder::with_capacity(estimated_capacity);
3575
3576 builder.with_ctes(&self.ctes);
3578
3579 let is_subquery = self.from_table.starts_with('(') && self.from_table.ends_with(')');
3581 let no_selected_columns = self.selected_columns.is_empty()
3582 && self.aggregations.is_empty()
3583 && self.window_functions.is_empty();
3584
3585 if is_subquery && no_selected_columns {
3586 return format!("{}{}",
3588 if self.ctes.is_empty() { "" } else { &builder.buffer },
3589 self.from_table
3590 );
3591 }
3592
3593 let select_parts = self.build_select_parts();
3595
3596 let normalized_group_by = if !self.group_by_columns.is_empty() {
3597 self.normalize_group_by_columns()
3598 } else {
3599 Vec::new()
3600 };
3601
3602 builder.select(&select_parts)
3604 .from_table(&self.from_table, Some(&self.table_alias))
3605 .joins(&self.joins)
3606 .where_clause(&self.where_conditions)
3607 .group_by(&normalized_group_by)
3608 .having(&self.having_conditions)
3609 .order_by(&self.order_by_columns)
3610 .limit(self.limit_count);
3611
3612 let mut final_query = builder.build();
3614 for operation in &self.set_operations {
3615 final_query = self.handle_set_operation(operation, final_query);
3616 }
3617
3618 final_query
3619 }
3620
3621 fn estimate_sql_size(&self) -> usize {
3623 let base_size = 200;
3624 let select_size = self.selected_columns.iter().map(|s| s.len()).sum::<usize>()
3625 + self.aggregations.iter().map(|s| s.len()).sum::<usize>()
3626 + self.window_functions.iter().map(|s| s.len()).sum::<usize>();
3627 let joins_size = self.joins.iter().map(|j| j.condition.len() + 50).sum::<usize>();
3628 let where_size = self.where_conditions.iter().map(|s| s.len()).sum::<usize>();
3629
3630 base_size + select_size + joins_size + where_size + self.from_table.len()
3631 }
3632
3633 fn build_select_parts(&self) -> Vec<String> {
3635 let mut select_parts = Vec::new();
3636
3637 select_parts.extend_from_slice(&self.aggregations);
3639
3640 if !self.group_by_columns.is_empty() {
3641 select_parts.extend_from_slice(&self.selected_columns);
3644 } else {
3645 select_parts.extend_from_slice(&self.selected_columns);
3647 }
3648
3649 select_parts.extend_from_slice(&self.window_functions);
3651
3652 let mut seen = HashSet::new();
3654 select_parts.retain(|x| seen.insert(x.clone()));
3655
3656 select_parts
3657 }
3658
3659 pub async fn elusion(&self, alias: &str) -> ElusionResult<Self> {
3661 if alias.trim().is_empty() {
3663 return Err(ElusionError::InvalidOperation {
3664 operation: "Elusion".to_string(),
3665 reason: "Alias cannot be empty".to_string(),
3666 suggestion: "💡 Provide a valid table alias".to_string()
3667 });
3668 }
3669
3670 let ctx = Arc::new(SessionContext::new());
3671
3672 if self.has_group_by_all() {
3674 if let Err(validation_error) = self.validate_group_by_all_compatibility() {
3675 return Err(validation_error);
3676 }
3677 }
3678
3679 let sql = if self.is_complex_query() {
3681 let self_clone = self.clone(); tokio::task::spawn_blocking(move || self_clone.construct_sql())
3684 .await
3685 .map_err(|e| ElusionError::Custom(format!("SQL construction task failed: {}", e)))?
3686 } else {
3687 self.construct_sql()
3688 };
3689
3690 self.register_all_tables(&ctx).await?;
3692
3693 let final_sql = if self.from_table.starts_with('(') && self.from_table.ends_with(')') {
3695 format!("SELECT * FROM {} AS {}", self.from_table, alias)
3696 } else {
3697 sql
3698 };
3699 let df = ctx.sql(&final_sql).await
3703 .map_err(|e| {
3704
3705 let error_msg = e.to_string();
3706
3707 if error_msg.contains("not found") || error_msg.contains("No field named") ||
3710 (error_msg.contains("could not be resolved") && !error_msg.contains("OVER")) {
3711
3712 let missing_col = Self::extract_missing_column_comprehensive(&error_msg)
3713 .unwrap_or_else(|| {
3714 "unknown".to_string()
3716 });
3717
3718 let available_columns = self.get_available_columns();
3721 let error_context = self.determine_missing_column_context(&missing_col, &error_msg);
3722
3723 return ElusionError::MissingColumnWithContext {
3724 column: missing_col,
3725 available_columns,
3726 context: error_context.context,
3727 location: error_context.location,
3728 suggestion: error_context.suggestion,
3729 };
3730 }
3731
3732 else if error_msg.contains("could not be resolved") &&
3734 (error_msg.to_uppercase().contains("OVER") ||
3735 error_msg.to_uppercase().contains("PARTITION BY") ||
3736 error_msg.to_uppercase().contains("ROW_NUMBER")) {
3737
3738 let missing_cols = extract_window_function_columns(&error_msg);
3739 let missing_col = missing_cols.first().unwrap_or(&"unknown".to_string()).clone();
3740
3741 if self.has_group_by_all() {
3743 let function_context = Some(format!("Window function references column '{}' not in SELECT", missing_col));
3744 return self.create_group_by_all_error(&missing_col, function_context, &error_msg);
3745 }
3746
3747 ElusionError::WindowFunctionError {
3749 message: format!("Window function references columns not in SELECT"),
3750 function: extract_window_function_name(&error_msg).unwrap_or("WINDOW_FUNCTION".to_string()),
3751 details: format!("Missing columns: {}", missing_cols.join(", ")),
3752 suggestion: format!(
3753 "💡 Window function error - missing columns from SELECT. 🔧 Solution: Add missing columns to .select(): {} ✅ Example fix: .select ([\"your_existing_cols\", \"{}\"])",
3754 missing_cols.iter().map(|col| format!("\"{}\"", col)).collect::<Vec<_>>().join(", "),
3755 missing_cols.join("\", \"")
3756 ),
3757 }
3758 }
3759 else if error_msg.contains("could not be resolved from available columns") {
3761 let missing_col = extract_missing_column(&error_msg).unwrap_or("unknown".to_string());
3762 let function_context = detect_function_usage_in_error(&error_msg, &missing_col);
3763
3764 if self.has_group_by_all() {
3766 return self.create_group_by_all_error(&missing_col, function_context, &error_msg);
3767 }
3768
3769 ElusionError::GroupByError {
3771 message: "Column in SELECT clause missing from GROUP BY".to_string(),
3772 invalid_columns: vec![missing_col.clone()],
3773 function_context: function_context.clone(),
3774 suggestion: generate_enhanced_groupby_suggestion(&missing_col, function_context.as_deref()),
3775 }
3776 }
3777 else if error_msg.contains("duplicate qualified field name") ||
3779 error_msg.contains("Schema contains duplicate qualified field name") {
3780 ElusionError::DuplicateColumn {
3781 column: extract_column_from_duplicate_error(&error_msg)
3782 .unwrap_or("unknown".to_string()),
3783 locations: vec!["result schema".to_string()],
3784 }
3785 }
3786 else if error_msg.contains("projections require unique expression names") ||
3788 error_msg.contains("have the same name") {
3789 ElusionError::DuplicateColumn {
3790 column: extract_column_from_projection_error(&error_msg)
3791 .unwrap_or("unknown".to_string()),
3792 locations: vec!["SELECT clause".to_string()],
3793 }
3794 }
3795 else if error_msg.contains("join") || error_msg.contains("JOIN") {
3797 ElusionError::JoinError {
3798 message: error_msg.clone(),
3799 left_table: self.table_alias.clone(),
3800 right_table: extract_table_from_join_error(&error_msg)
3801 .unwrap_or("unknown".to_string()),
3802 suggestion: "💡 Check JOIN conditions and ensure table aliases match those used in .join_many([...])".to_string(),
3803 }
3804 }
3805 else if error_msg.contains("aggregate") ||
3807 error_msg.contains("SUM") || error_msg.contains("AVG") ||
3808 error_msg.contains("COUNT") {
3809 ElusionError::AggregationError {
3810 message: error_msg.clone(),
3811 function: extract_function_from_error(&error_msg)
3812 .unwrap_or("unknown".to_string()),
3813 column: extract_column_from_agg_error(&error_msg)
3814 .unwrap_or("unknown".to_string()),
3815 suggestion: "💡 Check aggregation syntax in .agg([...]) and ensure columns exist in your tables".to_string(),
3816 }
3817 }
3818 else if error_msg.contains("having") || error_msg.contains("HAVING") {
3820 ElusionError::InvalidOperation {
3821 operation: "HAVING clause evaluation".to_string(),
3822 reason: error_msg.clone(),
3823 suggestion: "💡 HAVING conditions must reference aggregated columns or their aliases from .agg([...])".to_string(),
3824 }
3825 }
3826 else if error_msg.contains("not found") || error_msg.contains("No field named") {
3828 ElusionError::MissingColumn {
3829 column: extract_missing_column(&error_msg)
3830 .unwrap_or("unknown".to_string()),
3831 available_columns: self.get_available_columns(),
3832 }
3833 }
3834 else {
3836 ElusionError::InvalidOperation {
3837 operation: "SQL Execution".to_string(),
3838 reason: format!("Failed to execute SQL: {}", e),
3839 suggestion: self.get_contextual_suggestion(&error_msg),
3840 }
3841 }
3842 })?;
3843
3844 let (batches, schema) = if self.is_large_result_expected() {
3845 let df_clone = df.clone();
3846 tokio::task::spawn_blocking(move || {
3847 futures::executor::block_on(async {
3848 let batches = df_clone.clone().collect().await?;
3849 let schema = df_clone.schema().clone();
3850 Ok::<_, datafusion::error::DataFusionError>((batches, schema))
3851 })
3852 })
3853 .await
3854 .map_err(|e| ElusionError::Custom(format!("Data collection task failed: {}", e)))?
3855 .map_err(|e| ElusionError::InvalidOperation {
3856 operation: "Data Collection".to_string(),
3857 reason: format!("Failed to collect results: {}", e),
3858 suggestion: "💡 Query executed successfully but failed to collect results. Try reducing result size with .limit() or check for memory issues".to_string()
3859 })?
3860 } else {
3861 let batches = df.clone().collect().await
3862 .map_err(|e| ElusionError::InvalidOperation {
3863 operation: "Data Collection".to_string(),
3864 reason: format!("Failed to collect results: {}", e),
3865 suggestion: "💡 Query executed but data collection failed. Check for data type issues or memory constraints".to_string()
3866 })?;
3867 (batches, df.schema().clone())
3868 };
3869
3870 let result_mem_table = MemTable::try_new(schema.clone().into(), vec![batches])
3871 .map_err(|e| {
3872 let error_msg = e.to_string();
3873 if error_msg.contains("duplicate") || error_msg.contains("Schema") {
3874 ElusionError::SchemaError {
3875 message: format!("Schema registration failed: {}", e),
3876 schema: Some(schema.to_string()),
3877 suggestion: "💡 Result schema has conflicting column names. Use unique aliases in .select([...]) or avoid .elusion() for this query".to_string()
3878 }
3879 } else {
3880 ElusionError::SchemaError {
3881 message: format!("Failed to create result table: {}", e),
3882 schema: Some(schema.to_string()),
3883 suggestion: "💡 Verify result schema compatibility and data types".to_string()
3884 }
3885 }
3886 })?;
3887
3888 ctx.register_table(alias, Arc::new(result_mem_table))
3889 .map_err(|e| {
3890 let error_msg = e.to_string();
3891 if error_msg.contains("already exists") || error_msg.contains("duplicate") {
3892 ElusionError::InvalidOperation {
3893 operation: "Result Registration".to_string(),
3894 reason: format!("Table alias '{}' already exists", alias),
3895 suggestion: format!("💡 Choose a different alias name or use a unique identifier like '{}_v2'", alias)
3896 }
3897 } else {
3898 ElusionError::InvalidOperation {
3899 operation: "Result Registration".to_string(),
3900 reason: format!("Failed to register result table: {}", e),
3901 suggestion: "💡 Try using a different alias name or check for naming conflicts".to_string()
3902 }
3903 }
3904 })?;
3905
3906 let result_df = ctx.table(alias).await
3907 .map_err(|e| ElusionError::InvalidOperation {
3908 operation: "Result Retrieval".to_string(),
3909 reason: format!("Failed to retrieve final result: {}", e),
3910 suggestion: "💡 Table was registered but retrieval failed. This might be an internal issue - try a different alias".to_string()
3911 })?;
3912
3913 Ok(CustomDataFrame {
3914 df: result_df,
3915 table_alias: alias.to_string(),
3916 from_table: alias.to_string(),
3917 selected_columns: Vec::new(),
3918 alias_map: Vec::new(),
3919 aggregations: Vec::new(),
3920 group_by_columns: Vec::new(),
3921 where_conditions: Vec::new(),
3922 having_conditions: Vec::new(),
3923 order_by_columns: Vec::new(),
3924 limit_count: None,
3925 joins: Vec::new(),
3926 window_functions: Vec::new(),
3927 ctes: Vec::new(),
3928 subquery_source: None,
3929 set_operations: Vec::new(),
3930 query: final_sql,
3931 aggregated_df: Some(df),
3932 union_tables: None,
3933 original_expressions: self.original_expressions.clone(),
3934 needs_normalization: false,
3935 raw_selected_columns: Vec::new(),
3936 raw_group_by_columns: Vec::new(),
3937 raw_where_conditions: Vec::new(),
3938 raw_having_conditions: Vec::new(),
3939 raw_join_conditions: Vec::new(),
3940 raw_aggregations: Vec::new(),
3941 uses_group_by_all: false
3942 })
3943 }
3944
3945 fn has_group_by_all(&self) -> bool {
3948 self.uses_group_by_all
3949 }
3950
3951 fn validate_group_by_all_compatibility(&self) -> ElusionResult<()> {
3952 let mut missing_columns = Vec::new();
3953 let mut window_function_deps = Vec::new();
3954
3955 for window_func in &self.window_functions {
3957 let dependencies = self.extract_column_dependencies(window_func);
3958 for dep in dependencies {
3959 if !self.is_column_in_select(&dep) {
3960 missing_columns.push(dep.clone());
3961 window_function_deps.push((window_func.clone(), dep));
3962 }
3963 }
3964 }
3965
3966 for agg in &self.aggregations {
3968 let dependencies = self.extract_column_dependencies(agg);
3969 for dep in dependencies {
3970 if !self.is_column_in_select(&dep) && !missing_columns.contains(&dep) {
3971 missing_columns.push(dep);
3972 }
3973 }
3974 }
3975
3976 if !missing_columns.is_empty() {
3977 return Err(ElusionError::GroupByAllCompatibilityError {
3978 missing_columns: missing_columns.clone(),
3979 window_function_dependencies: window_function_deps,
3980 suggestion: self.generate_group_by_all_fix_suggestion(&missing_columns),
3981 });
3982 }
3983
3984 Ok(())
3985 }
3986
3987 fn create_group_by_all_error(
3988 &self,
3989 missing_col: &str,
3990 function_context: Option<String>,
3991 original_error: &str
3992 ) -> ElusionError {
3993 let error_upper = original_error.to_uppercase();
3994 let is_window_function = error_upper.contains("OVER") ||
3995 error_upper.contains("PARTITION BY") ||
3996 error_upper.contains("ORDER BY") ||
3997 error_upper.contains("ROW_NUMBER") ||
3998 error_upper.contains("RANK");
3999
4000 let current_select_columns = self.get_current_select_columns_formatted();
4001 let current_group_by_columns = self.get_current_group_by_columns_formatted();
4002
4003 if is_window_function {
4004 ElusionError::GroupByAllWindowError {
4005 missing_column: missing_col.to_string(),
4006 window_function_context: function_context.unwrap_or_else(||
4007 format!("Window function needs column '{}'", missing_col)
4008 ),
4009 suggestion: format!(
4010 "🪟 group_by_all() + Window Function Issue. Your window function needs column '{}' but it's not in .select([...]). 🔧 Quick Fix - Add '{}' to your .select(): .select([{}, \"{}\" ]) 🔧 Alternative - Use manual .group_by(): .group_by([{}])",
4011 missing_col,
4012 missing_col,
4013 current_select_columns,
4014 missing_col,
4015 current_group_by_columns
4016 ),
4017 }
4018 } else {
4019 ElusionError::GroupByAllDependencyError {
4020 missing_column: missing_col.to_string(),
4021 dependency_context: function_context.unwrap_or_else(||
4022 format!("Column '{}' is referenced but not in SELECT clause", missing_col)
4023 ),
4024 suggestion: format!(
4025 "🔧 group_by_all() Issue: Missing column '{}' referenced in query. The problem: Your query references '{}' but it's not in your .select([...]) clause. Since group_by_all() groups by ALL selected columns, it needs '{}' to be selected first. 💡 Solutions: [1] Add '{}' to your .select([...]) clause: .select([{},\"{}\" ]). [2] Use manual .group_by([...]) instead of .group_by_all(): .group_by([{}]). Only group by the columns you actually want to group by. [3] Remove the dependency on '{}' from your query",
4026 missing_col,
4027 missing_col,
4028 missing_col,
4029 missing_col,
4030 current_select_columns,
4031 missing_col,
4032 current_group_by_columns,
4033 missing_col
4034 ),
4035 }
4036 }
4037 }
4038
4039 fn extract_column_dependencies(&self, expression: &str) -> Vec<String> {
4040 let mut dependencies = Vec::new();
4041
4042 if let Some(caps) = regex::Regex::new(r"PARTITION BY\s+([a-zA-Z_][a-zA-Z0-9_]*)")
4044 .unwrap().captures(expression) {
4045 if let Some(col) = caps.get(1) {
4046 dependencies.push(col.as_str().to_string());
4047 }
4048 }
4049
4050 if let Some(caps) = regex::Regex::new(r"ORDER BY\s+([a-zA-Z_][a-zA-Z0-9_]*)")
4051 .unwrap().captures(expression) {
4052 if let Some(col) = caps.get(1) {
4053 dependencies.push(col.as_str().to_string());
4054 }
4055 }
4056
4057 if let Some(caps) = regex::Regex::new(r"\(([^)]+)\)").unwrap().captures(expression) {
4059 if let Some(args) = caps.get(1) {
4060 for arg in args.as_str().split(',') {
4061 let clean_arg = arg.trim();
4062 if regex::Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap().is_match(clean_arg) {
4063 dependencies.push(clean_arg.to_string());
4064 }
4065 }
4066 }
4067 }
4068
4069 dependencies
4070 }
4071
4072 pub fn extract_missing_column(error: &str) -> Option<String> {
4073 println!("🔍 DEBUG - Extracting column from: {}", error);
4074
4075 if let Some(cap) = regex::Regex::new(r"No field named ([a-zA-Z_][a-zA-Z0-9_]*)\.")
4076 .unwrap().captures(error) {
4077 let col = cap.get(1)?.as_str().to_string();
4078 println!("🔍 DEBUG - Found via 'No field named': {}", col);
4079 return Some(col);
4080 }
4081
4082 if let Some(cap) = regex::Regex::new(r"Expression ([a-zA-Z_][a-zA-Z0-9_]*) could not be resolved")
4083 .unwrap().captures(error) {
4084 let col = cap.get(1)?.as_str().to_string();
4085 println!("🔍 DEBUG - Found via 'Expression could not be resolved': {}", col);
4086 return Some(col);
4087 }
4088
4089 if let Some(cap) = regex::Regex::new(r"Expression [a-zA-Z_][a-zA-Z0-9_]*\.([a-zA-Z_][a-zA-Z0-9_]*) could not be resolved")
4090 .unwrap().captures(error) {
4091 let col = cap.get(1)?.as_str().to_string();
4092 println!("🔍 DEBUG - Found via 'Expression table.column': {}", col);
4093 return Some(col);
4094 }
4095
4096 if let Some(cap) = regex::Regex::new(r"(?:PARTITION BY|ORDER BY)\s+([a-zA-Z_][a-zA-Z0-9_]*)")
4097 .unwrap().captures(error) {
4098 let col = cap.get(1)?.as_str().to_string();
4099 println!("🔍 DEBUG - Found via window function: {}", col);
4100 return Some(col);
4101 }
4102
4103 println!("🔍 DEBUG - No column name extracted");
4104 None
4105 }
4106
4107 fn extract_missing_column_comprehensive(error: &str) -> Option<String> {
4108 if error.contains("Schema error:") && error.contains("No field named") {
4112 if let Some(cap) = regex::Regex::new(r"No field named ([a-zA-Z_][a-zA-Z0-9_]*)\.")
4114 .unwrap().captures(error) {
4115 let col = cap.get(1)?.as_str().to_string();
4116 return Some(col);
4118 }
4119 }
4120
4121 if error.contains("Expression") && error.contains("could not be resolved") {
4123 if let Some(start) = error.find("Expression ") {
4124 let remaining = &error[start + 11..];
4125 if let Some(end) = remaining.find(" could not be resolved") {
4126 let expr = remaining[..end].trim();
4127
4128 if let Some(dot_pos) = expr.rfind('.') {
4130 let col = &expr[dot_pos + 1..];
4131 return Some(col.to_string());
4133 } else {
4134 return Some(expr.to_string());
4136 }
4137 }
4138 }
4139 }
4140
4141 extract_missing_column(error)
4143 }
4144
4145 fn is_column_in_select(&self, column: &str) -> bool {
4146 self.raw_selected_columns.iter().any(|sel| {
4147 sel.to_lowercase().contains(&column.to_lowercase()) ||
4149 sel.split(" as ").next().unwrap_or(sel)
4150 .split('.').last().unwrap_or(sel)
4151 .to_lowercase() == column.to_lowercase()
4152 })
4153 }
4154
4155 fn is_column_used_in_window_functions(&self, missing_col: &str) -> bool {
4156 self.window_functions.iter().any(|window_func| {
4157 window_func.to_lowercase().contains(&missing_col.to_lowercase())
4158 })
4159 }
4160
4161 fn is_column_used_in_aggregations(&self, missing_col: &str) -> bool {
4162 self.aggregations.iter().any(|agg| {
4163 agg.to_lowercase().contains(&missing_col.to_lowercase())
4164 })
4165 }
4166
4167 fn is_column_used_in_where(&self, missing_col: &str) -> bool {
4168 self.where_conditions.iter().any(|condition| {
4169 condition.to_lowercase().contains(&missing_col.to_lowercase())
4170 })
4171 }
4172
4173 fn is_column_likely_in_select(&self, missing_col: &str) -> bool {
4174 if self.raw_selected_columns.iter().any(|col| {
4176 col.to_lowercase().contains(&missing_col.to_lowercase())
4177 }) {
4178 return true;
4179 }
4180
4181 if missing_col.contains("_") {
4183 let parts: Vec<&str> = missing_col.split('_').collect();
4184 if parts.len() >= 2 {
4185 let first_part = parts[0];
4186 let last_part = parts[parts.len() - 1];
4187
4188 for selected_col in &self.raw_selected_columns {
4190 if selected_col.to_lowercase().contains(&first_part.to_lowercase()) ||
4191 selected_col.to_lowercase().contains(&last_part.to_lowercase()) {
4192 return true;
4193 }
4194 }
4195 }
4196 }
4197
4198 false
4199 }
4200
4201 fn determine_missing_column_context(&self, missing_col: &str, _error_msg: &str) -> ColumnErrorContext {
4202
4203 if self.is_column_used_in_window_functions(missing_col) {
4204 return ColumnErrorContext {
4205 context: format!("Column '{}' is referenced in a window function", missing_col),
4206 location: "window() function".to_string(),
4207 suggestion: format!(
4208 "💡 Check your .window() function. Either add '{}' to .select([...]) or fix the column name in your window function",
4209 missing_col
4210 ),
4211 };
4212 }
4213
4214 if self.is_column_used_in_aggregations(missing_col) {
4215 return ColumnErrorContext {
4216 context: format!("Column '{}' is referenced in an aggregation", missing_col),
4217 location: "agg() function".to_string(),
4218 suggestion: format!(
4219 "💡 Check your .agg([...]) function. Either add '{}' to .select([...]) or fix the column name in your aggregation",
4220 missing_col
4221 ),
4222 };
4223 }
4224
4225 if self.is_column_used_in_where(missing_col) {
4226 return ColumnErrorContext {
4227 context: format!("Column '{}' is referenced in WHERE conditions", missing_col),
4228 location: "filter() or filter_many() function".to_string(),
4229 suggestion: format!(
4230 "💡 Check your .filter() or .filter_many() conditions. Column '{}' doesn't exist in the table",
4231 missing_col
4232 ),
4233 };
4234 }
4235
4236 if self.is_column_likely_in_select(missing_col) {
4237 return ColumnErrorContext {
4238 context: format!("Column '{}' is wrongly referenced, or is NOT referenced at all in SELECT([...]) clause", missing_col),
4239 location: "select([...]) function".to_string(),
4240 suggestion: format!(
4241 "💡 [1] Use .df_schema() to see available columns. [2] Check your .select([...]), .string_functions([...]) and .datetime_functions([...]). [3] Column '{}' doesn't exist in the table OR is missing in SELECT function if you are using GROUP_BY_ALL(). [4] Maybe try to use GROUP_BY([...]) function and specify all select([...]) columns, and aliased columns from string_functions([...]) and .datetime_functions([...]).",
4242 missing_col
4243 ),
4244 };
4245 }
4246
4247 ColumnErrorContext {
4248 context: format!("Column '{}' is referenced somewhere in your query", missing_col),
4249 location: "unknown location".to_string(),
4250 suggestion: format!(
4251 "Column '{}' doesn't exist. 💡 Check all your functions: .select(), .filter(), .window(), .agg(), .order_by()",
4252 missing_col
4253 ),
4254 }
4255 }
4256
4257 fn generate_group_by_all_fix_suggestion(&self, missing_columns: &[String]) -> String {
4258 let columns_list = missing_columns.iter()
4259 .map(|col| format!("\"{}\"", col))
4260 .collect::<Vec<_>>()
4261 .join(", ");
4262
4263 format!(
4264 "🔧 group_by_all() requires all referenced columns in SELECT. Missing columns: {}. 💡 Quick Fixes: [1] Add missing columns to .select(): .select([\"your_existing_columns\", {}]) [2] Use manual .group_by() instead: .group_by([{}]) // only the columns you actually want to group by. [3] Restructure your query to avoid hidden dependencies",
4265 missing_columns.join(", "),
4266 columns_list,
4267 self.get_likely_group_by_columns().join(", ")
4268 )
4269 }
4270
4271 fn get_likely_group_by_columns(&self) -> Vec<String> {
4272 self.raw_selected_columns
4273 .iter()
4274 .filter(|col| {
4275 let col_upper = col.to_uppercase();
4277 !col_upper.contains("SUM(") &&
4278 !col_upper.contains("COUNT(") &&
4279 !col_upper.contains("AVG(") &&
4280 !col_upper.contains("MIN(") &&
4281 !col_upper.contains("MAX(") &&
4282 !col_upper.contains("ROW_NUMBER(") &&
4283 !col_upper.contains("RANK(") &&
4284 !col_upper.contains("DENSE_RANK(")
4285 })
4286 .map(|col| {
4287 if let Some(alias_pos) = col.to_lowercase().find(" as ") {
4289 format!("\"{}\"", col[alias_pos + 4..].trim())
4290 } else if let Some(dot_pos) = col.rfind('.') {
4291 format!("\"{}\"", &col[dot_pos + 1..])
4292 } else {
4293 format!("\"{}\"", col.trim())
4294 }
4295 })
4296 .collect()
4297 }
4298
4299 fn get_current_select_columns_formatted(&self) -> String {
4300 if self.raw_selected_columns.is_empty() {
4301 " // No columns selected yet".to_string()
4302 } else {
4303 self.raw_selected_columns
4304 .iter()
4305 .map(|col| format!("\"{}\"", col))
4306 .collect::<Vec<_>>()
4307 .join(",\n")
4308 }
4309 }
4310
4311 fn get_current_group_by_columns_formatted(&self) -> String {
4312 if self.raw_selected_columns.is_empty() {
4313 "// No columns available".to_string()
4314 } else {
4315 let non_agg_columns: Vec<String> = self.raw_selected_columns
4317 .iter()
4318 .filter(|col| {
4319 let col_upper = col.to_uppercase();
4320 !col_upper.contains("SUM(") &&
4321 !col_upper.contains("COUNT(") &&
4322 !col_upper.contains("AVG(") &&
4323 !col_upper.contains("MIN(") &&
4324 !col_upper.contains("MAX(") &&
4325 !col_upper.contains("ROW_NUMBER(") &&
4326 !col_upper.contains("RANK(") &&
4327 !col_upper.contains("DENSE_RANK(")
4328 })
4329 .map(|col| {
4330 if let Some(alias_pos) = col.to_lowercase().find(" as ") {
4332 format!("\"{}\"", col[alias_pos + 4..].trim())
4333 } else {
4334 if let Some(dot_pos) = col.rfind('.') {
4336 format!("\"{}\"", &col[dot_pos + 1..])
4337 } else {
4338 format!("\"{}\"", col.trim())
4339 }
4340 }
4341 })
4342 .collect();
4343
4344 if non_agg_columns.is_empty() {
4345 "// All columns are aggregated - manual GROUP BY may not be needed".to_string()
4346 } else {
4347 non_agg_columns.join(", ")
4348 }
4349 }
4350 }
4351
4352 fn get_table_aliases_for_suggestion(&self) -> Vec<String> {
4353 let mut aliases = vec![self.table_alias.clone()];
4354
4355 for join in &self.joins {
4357 aliases.push(join.dataframe.table_alias.clone());
4358 }
4359
4360 aliases
4361 }
4362 fn get_contextual_suggestion(&self, error_msg: &str) -> String {
4363 let has_joins = !self.joins.is_empty();
4364 let has_aggs = !self.aggregations.is_empty();
4365 let has_group_by = !self.group_by_columns.is_empty();
4366 let has_star = self.raw_selected_columns.iter().any(|col| col.contains("*"));
4367 let has_window_functions = !self.window_functions.is_empty();
4368
4369 if has_joins && error_msg.contains("duplicate") {
4370 let table_aliases = self.get_table_aliases_for_suggestion();
4371 let example_column = self.get_example_column_for_alias_suggestion();
4372
4373 if table_aliases.len() >= 2 && !example_column.is_empty() {
4374 format!(
4375 "💡 JOIN detected with duplicate columns. Use aliases: .select([\"{}.{} AS {}_{}\", \"{}.{} AS {}_{}\"])) or star selection: .select([\"{}.*, {}.*/)",
4376 table_aliases[0], example_column, table_aliases[0], example_column,
4377 table_aliases[1], example_column, table_aliases[1], example_column,
4378 table_aliases[0], table_aliases[1]
4379 )
4380 } else {
4381 "💡 JOIN detected with duplicate columns. Use table aliases in your .select([...]) or use star selection for auto-deduplication".to_string()
4382 }
4383 } else if has_window_functions && error_msg.contains("resolved") {
4384 "💡 Window function error. Ensure all referenced columns are in .select([...]) or check your PARTITION BY/ORDER BY clauses".to_string()
4385 } else if has_aggs && !has_group_by {
4386 let non_agg_columns = self.get_non_aggregate_columns();
4387 if !non_agg_columns.is_empty() {
4388 format!(
4389 "💡 Aggregations detected without GROUP BY. Use .group_by_all() or specify .group_by([{}])",
4390 non_agg_columns.join(", ")
4391 )
4392 } else {
4393 "💡 Aggregations detected without GROUP BY. Use .group_by_all() or specify .group_by([...]) columns".to_string()
4394 }
4395 } else if has_aggs && has_group_by && error_msg.contains("resolved") {
4396 "💡 GROUP BY/SELECT mismatch. Ensure all non-aggregate SELECT columns are in GROUP BY, or use .group_by_all()".to_string()
4397 } else if has_star && error_msg.contains("duplicate") {
4398 "💡 Star selection with duplicate columns. This should auto-deduplicate - check for mixed star/explicit selection".to_string()
4399 } else {
4400 "💡 Check SQL syntax, column names, and table aliases. Use .df_schema() to see available columns".to_string()
4401 }
4402 }
4403
4404 fn get_example_column_for_alias_suggestion(&self) -> String {
4405 let common_names = ["id", "key", "name", "code", "date", "time", "value"];
4407
4408 for col in &self.raw_selected_columns {
4410 let col_lower = col.to_lowercase();
4411 for &common in &common_names {
4412 if col_lower.contains(common) {
4413 return common.to_string();
4414 }
4415 }
4416 }
4417
4418 if let Some(first_col) = self.raw_selected_columns.first() {
4420 if let Some(dot_pos) = first_col.rfind('.') {
4421 return first_col[dot_pos + 1..].to_string();
4422 } else if let Some(space_pos) = first_col.find(' ') {
4423 return first_col[..space_pos].to_string();
4424 } else {
4425 return first_col.clone();
4426 }
4427 }
4428
4429 "column".to_string()
4430 }
4431
4432 fn get_non_aggregate_columns(&self) -> Vec<String> {
4434 self.raw_selected_columns
4435 .iter()
4436 .filter(|col| {
4437 let col_upper = col.to_uppercase();
4438 !col_upper.contains("SUM(") &&
4439 !col_upper.contains("COUNT(") &&
4440 !col_upper.contains("AVG(") &&
4441 !col_upper.contains("MIN(") &&
4442 !col_upper.contains("MAX(") &&
4443 !col_upper.contains("ROW_NUMBER(") &&
4444 !col_upper.contains("RANK(")
4445 })
4446 .map(|col| {
4447 if let Some(alias_pos) = col.to_lowercase().find(" as ") {
4449 format!("\"{}\"", col[alias_pos + 4..].trim())
4450 } else if let Some(dot_pos) = col.rfind('.') {
4451 format!("\"{}\"", &col[dot_pos + 1..])
4452 } else {
4453 format!("\"{}\"", col.trim())
4454 }
4455 })
4456 .collect()
4457 }
4458
4459 fn get_available_columns(&self) -> Vec<String> {
4461 let mut columns = Vec::new();
4462
4463 let schema = self.df.schema();
4465 for field in schema.fields() {
4466 columns.push(format!("{}.{}", self.table_alias, field.name()));
4467 }
4468
4469 for join in &self.joins {
4471 let join_schema = join.dataframe.df.schema();
4472 for field in join_schema.fields() {
4473 columns.push(format!("{}.{}", join.dataframe.table_alias, field.name()));
4474 }
4475 }
4476
4477 columns
4478 }
4479
4480 async fn register_all_tables(&self, ctx: &SessionContext) -> ElusionResult<()> {
4484 let mut tables_to_register = Vec::new();
4485
4486 tables_to_register.push((&self.table_alias, &self.df));
4487
4488 if self.union_tables.is_none() {
4489 for join in &self.joins {
4490 tables_to_register.push((&join.dataframe.table_alias, &join.dataframe.df));
4491 }
4492 }
4493
4494 if let Some(tables) = &self.union_tables {
4495 for (table_alias, df, _) in tables {
4496 if ctx.table(table_alias).await.is_err() {
4497 tables_to_register.push((table_alias, df));
4498 }
4499 }
4500 }
4501 for (alias, df) in tables_to_register {
4503 register_df_as_table(ctx, alias, df).await
4504 .map_err(|e| ElusionError::SchemaError {
4505 message: format!("Failed to register table '{}': {}", alias, e),
4506 schema: Some(df.schema().to_string()),
4507 suggestion: "💡 Check table schema compatibility".to_string()
4508 })?;
4509 }
4510
4511 Ok(())
4512 }
4513
4514 fn is_complex_query(&self) -> bool {
4516 self.joins.len() > 3 ||
4517 self.selected_columns.len() + self.aggregations.len() > 20 ||
4518 self.where_conditions.len() > 10 ||
4519 !self.ctes.is_empty() ||
4520 !self.set_operations.is_empty()
4521 }
4522
4523 fn is_large_result_expected(&self) -> bool {
4525 self.joins.len() > 2 ||
4527 self.aggregations.len() > 5 ||
4528 self.limit_count.map_or(true, |limit| limit > 10000)
4529 }
4530
4531 pub async fn display(&self) -> ElusionResult<()> {
4533 self.df.clone().show().await.map_err(|e|
4534 ElusionError::Custom(format!("Failed to display DataFrame: {}", e))
4535 )
4536 }
4537 pub fn df_schema(&self) {
4539 let schema = self.df.schema();
4540
4541 println!("\n📋 Schema - table alias: '{}'", self.table_alias);
4542 println!("{}", "-".repeat(60));
4543
4544 for (index, field) in schema.fields().iter().enumerate() {
4545 let data_type = match field.data_type() {
4546 arrow::datatypes::DataType::Utf8 => "String",
4547 arrow::datatypes::DataType::Int32 => "Int32",
4548 arrow::datatypes::DataType::Int64 => "Int64",
4549 arrow::datatypes::DataType::Float32 => "Float32",
4550 arrow::datatypes::DataType::Float64 => "Float64",
4551 arrow::datatypes::DataType::Boolean => "Boolean",
4552 arrow::datatypes::DataType::Date32 => "Date",
4553 arrow::datatypes::DataType::Timestamp(_, _) => "Timestamp",
4554 _ => "Other"
4555 };
4556
4557 println!("{:2}. {} ({})",
4558 index + 1,
4559 field.name(),
4560 data_type
4561 );
4562 }
4563 println!();
4564 }
4565
4566 pub fn display_query(&self) {
4568 let final_query = self.construct_sql();
4569
4570 let formatted = final_query
4572 .replace("WITH ", "\nWITH ")
4573 .replace(") SELECT ", ")\n\nSELECT ")
4574 .replace(" SELECT ", "\nSELECT ")
4575 .replace(" FROM ", "\nFROM ")
4576 .replace(" WHERE ", "\nWHERE ")
4577 .replace(" GROUP BY ", "\nGROUP BY ")
4578 .replace(" HAVING ", "\nHAVING ")
4579 .replace(" ORDER BY ", "\nORDER BY ")
4580 .replace(" LIMIT ", "\nLIMIT ")
4581 .replace(" INNER JOIN ", "\n INNER JOIN ")
4582 .replace(" LEFT JOIN ", "\n LEFT JOIN ")
4583 .replace(" RIGHT JOIN ", "\n RIGHT JOIN ")
4584 .replace(" AS (", " AS (\n ")
4585 .replace("UNION ALL ", "\nUNION ALL\n")
4586 .replace("UNION ", "\nUNION\n")
4587 .replace("EXCEPT ", "\nEXCEPT\n")
4588 .replace("INTERSECT ", "\nINTERSECT\n");
4589
4590 println!("📋 Generated SQL Query:");
4591 println!("{}", "=".repeat(60));
4592 println!("{}", formatted);
4593 println!("{}", "=".repeat(60));
4594 }
4595
4596 pub fn display_query_with_info(&self) {
4598 let final_query = self.construct_sql();
4599
4600 println!("📋 Query Analysis:");
4601 println!("{}", "=".repeat(50));
4602 println!("🔍 SQL Query:");
4603
4604 let formatted = final_query
4606 .replace("WITH ", "\nWITH ")
4607 .replace(") SELECT ", ")\n\nSELECT ")
4608 .replace(" SELECT ", "\nSELECT ")
4609 .replace(" FROM ", "\nFROM ")
4610 .replace(" WHERE ", "\nWHERE ")
4611 .replace(" GROUP BY ", "\nGROUP BY ")
4612 .replace(" HAVING ", "\nHAVING ")
4613 .replace(" ORDER BY ", "\nORDER BY ")
4614 .replace(" LIMIT ", "\nLIMIT ")
4615 .replace(" INNER JOIN ", "\n INNER JOIN ")
4616 .replace(" LEFT JOIN ", "\n LEFT JOIN ")
4617 .replace(" RIGHT JOIN ", "\n RIGHT JOIN ");
4618
4619 println!("{}", formatted);
4620 println!();
4621
4622 let query_upper = final_query.to_uppercase();
4624 println!("📊 Query Info:");
4625 println!(" • Has CTEs: {}", query_upper.contains("WITH"));
4626 println!(" • Has JOINs: {}", query_upper.contains("JOIN"));
4627 println!(" • Has WHERE: {}", query_upper.contains("WHERE"));
4628 println!(" • Has GROUP BY: {}", query_upper.contains("GROUP BY"));
4629 println!(" • Has HAVING: {}", query_upper.contains("HAVING"));
4630 println!(" • Has ORDER BY: {}", query_upper.contains("ORDER BY"));
4631 println!(" • Has LIMIT: {}", query_upper.contains("LIMIT"));
4632 println!(" • Has UNION: {}", query_upper.contains("UNION"));
4633
4634 let cte_count = query_upper.matches("WITH").count();
4636 let join_count = query_upper.matches("JOIN").count();
4637 let function_count = final_query.matches('(').count();
4638 let union_count = query_upper.matches("UNION").count();
4639
4640 println!(" • CTE count: {}", cte_count);
4641 println!(" • Join count: {}", join_count);
4642 println!(" • Union count: {}", union_count);
4643 println!(" • Function calls: ~{}", function_count);
4644
4645 let complexity = match (cte_count, join_count, union_count, function_count) {
4647 (0, 0, 0, 0..=2) => "Simple",
4648 (0, 0..=2, 0, 3..=10) => "Moderate",
4649 (1, 0..=3, 0..=1, _) => "Moderate",
4650 (0..=1, 4..=5, 0..=2, _) => "Complex",
4651 _ => "Very Complex"
4652 };
4653 println!(" • Complexity: {}", complexity);
4654
4655 if cte_count > 0 {
4657 println!(" 💡 Query uses CTEs - good for readability and performance");
4658 }
4659 if join_count > 3 {
4660 println!(" ⚠️ Many JOINs detected - consider performance implications");
4661 }
4662 if function_count > 15 {
4663 println!(" ⚠️ Many functions detected - verify index usage");
4664 }
4665
4666 println!("{}", "=".repeat(50));
4667 }
4668
4669
4670 fn find_actual_column_name(&self, column: &str) -> Option<String> {
4674 self.df
4675 .schema()
4676 .fields()
4677 .iter()
4678 .find(|f| f.name().to_lowercase() == column.to_lowercase())
4679 .map(|f| f.name().to_string())
4680 }
4681 async fn compute_column_stats(&self, columns: &[&str]) -> ElusionResult<ColumnStats> {
4683 let mut stats = ColumnStats::default();
4684 let ctx = Arc::new(SessionContext::new());
4685
4686 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
4688
4689 for &column in columns {
4690 let actual_column = self.find_actual_column_name(column)
4692 .ok_or_else(|| ElusionError::Custom(
4693 format!("Column '{}' not found in schema", column)
4694 ))?;
4695
4696 let normalized_col = if actual_column.contains('.') {
4698 normalize_column_name(&actual_column)
4699 } else {
4700 normalize_column_name(&format!("{}.{}", self.table_alias, actual_column))
4701 };
4702
4703 let sql = format!(
4704 "SELECT
4705 COUNT(*) as total_count,
4706 COUNT({col}) as non_null_count,
4707 AVG({col}::float) as mean,
4708 MIN({col}) as min_value,
4709 MAX({col}) as max_value,
4710 STDDEV({col}::float) as std_dev
4711 FROM {}",
4712 normalize_alias(&self.table_alias),
4713 col = normalized_col
4714 );
4715
4716 let result_df = ctx.sql(&sql).await.map_err(|e| {
4717 ElusionError::Custom(format!(
4718 "Failed to compute statistics for column '{}': {}",
4719 column, e
4720 ))
4721 })?;
4722
4723 let batches = result_df.collect().await.map_err(ElusionError::DataFusion)?;
4724
4725 if let Some(batch) = batches.first() {
4726 let total_count = batch.column(0).as_any().downcast_ref::<Int64Array>()
4728 .ok_or_else(|| ElusionError::Custom("Failed to downcast total_count".to_string()))?
4729 .value(0);
4730
4731 let non_null_count = batch.column(1).as_any().downcast_ref::<Int64Array>()
4732 .ok_or_else(|| ElusionError::Custom("Failed to downcast non_null_count".to_string()))?
4733 .value(0);
4734
4735 let mean = batch.column(2).as_any().downcast_ref::<Float64Array>()
4736 .ok_or_else(|| ElusionError::Custom("Failed to downcast mean".to_string()))?
4737 .value(0);
4738
4739 let min_value = ScalarValue::try_from_array(batch.column(3), 0)?;
4740 let max_value = ScalarValue::try_from_array(batch.column(4), 0)?;
4741
4742 let std_dev = batch.column(5).as_any().downcast_ref::<Float64Array>()
4743 .ok_or_else(|| ElusionError::Custom("Failed to downcast std_dev".to_string()))?
4744 .value(0);
4745
4746 stats.columns.push(ColumnStatistics {
4747 name: column.to_string(),
4748 total_count,
4749 non_null_count,
4750 mean: Some(mean),
4751 min_value,
4752 max_value,
4753 std_dev: Some(std_dev),
4754 });
4755 }
4756 }
4757
4758 Ok(stats)
4759 }
4760
4761 async fn analyze_null_values(&self, columns: Option<&[&str]>) -> ElusionResult<NullAnalysis> {
4763 let ctx = Arc::new(SessionContext::new());
4764 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
4765
4766 let columns = match columns {
4767 Some(cols) => cols.to_vec(),
4768 None => {
4769 self.df
4770 .schema()
4771 .fields()
4772 .iter()
4773 .map(|f| f.name().as_str())
4774 .collect()
4775 }
4776 };
4777
4778 let mut null_counts = Vec::new();
4779 for column in columns {
4780 let actual_column = self.find_actual_column_name(column)
4782 .ok_or_else(|| ElusionError::Custom(
4783 format!("Column '{}' not found in schema", column)
4784 ))?;
4785
4786 let normalized_col = if actual_column.contains('.') {
4788 normalize_column_name(&actual_column)
4789 } else {
4790 normalize_column_name(&format!("{}.{}", self.table_alias, actual_column))
4791 };
4792
4793 let sql = format!(
4794 "SELECT
4795 '{}' as column_name,
4796 COUNT(*) as total_rows,
4797 COUNT(*) - COUNT({}) as null_count,
4798 (COUNT(*) - COUNT({})) * 100.0 / COUNT(*) as null_percentage
4799 FROM {}",
4800 column, normalized_col, normalized_col, normalize_alias(&self.table_alias)
4801 );
4802
4803 let result_df = ctx.sql(&sql).await.map_err(|e| {
4804 ElusionError::Custom(format!(
4805 "Failed to analyze null values for column '{}': {}",
4806 column, e
4807 ))
4808 })?;
4809
4810 let batches = result_df.collect().await.map_err(ElusionError::DataFusion)?;
4811
4812 if let Some(batch) = batches.first() {
4813 let column_name = batch.column(0).as_any().downcast_ref::<StringArray>()
4814 .ok_or_else(|| ElusionError::Custom("Failed to downcast column_name".to_string()))?
4815 .value(0);
4816
4817 let total_rows = batch.column(1).as_any().downcast_ref::<Int64Array>()
4818 .ok_or_else(|| ElusionError::Custom("Failed to downcast total_rows".to_string()))?
4819 .value(0);
4820
4821 let null_count = batch.column(2).as_any().downcast_ref::<Int64Array>()
4822 .ok_or_else(|| ElusionError::Custom("Failed to downcast null_count".to_string()))?
4823 .value(0);
4824
4825 let null_percentage = batch.column(3).as_any().downcast_ref::<Float64Array>()
4826 .ok_or_else(|| ElusionError::Custom("Failed to downcast null_percentage".to_string()))?
4827 .value(0);
4828
4829 null_counts.push(NullCount {
4830 column_name: column_name.to_string(),
4831 total_rows,
4832 null_count,
4833 null_percentage,
4834 });
4835 }
4836 }
4837
4838 Ok(NullAnalysis { counts: null_counts })
4839 }
4840
4841 async fn compute_correlation(&self, col1: &str, col2: &str) -> ElusionResult<f64> {
4843 let ctx = Arc::new(SessionContext::new());
4844 register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
4845
4846 let actual_col1 = self.find_actual_column_name(col1)
4847 .ok_or_else(|| ElusionError::Custom(
4848 format!("Column '{}' not found in schema", col1)
4849 ))?;
4850
4851 let actual_col2 = self.find_actual_column_name(col2)
4852 .ok_or_else(|| ElusionError::Custom(
4853 format!("Column '{}' not found in schema", col2)
4854 ))?;
4855
4856 let normalized_col1 = if actual_col1.contains('.') {
4858 normalize_column_name(&actual_col1)
4859 } else {
4860 normalize_column_name(&format!("{}.{}", self.table_alias, actual_col1))
4861 };
4862
4863 let normalized_col2 = if actual_col2.contains('.') {
4864 normalize_column_name(&actual_col2)
4865 } else {
4866 normalize_column_name(&format!("{}.{}", self.table_alias, actual_col2))
4867 };
4868
4869 let sql = format!(
4870 "SELECT corr({}::float, {}::float) as correlation
4871 FROM {}",
4872 normalized_col1, normalized_col2, normalize_alias(&self.table_alias)
4873 );
4874
4875 let result_df = ctx.sql(&sql).await.map_err(|e| {
4876 ElusionError::Custom(format!(
4877 "Failed to compute correlation between '{}' and '{}': {}",
4878 col1, col2, e
4879 ))
4880 })?;
4881
4882 let batches = result_df.collect().await.map_err(ElusionError::DataFusion)?;
4883
4884 if let Some(batch) = batches.first() {
4885 if let Some(array) = batch.column(0).as_any().downcast_ref::<Float64Array>() {
4886 if !array.is_null(0) {
4887 return Ok(array.value(0));
4888 }
4889 }
4890 }
4891
4892 Ok(0.0) }
4894
4895
4896 pub async fn display_stats(&self, columns: &[&str]) -> ElusionResult<()> {
4898 let stats = self.compute_column_stats(columns).await?;
4899
4900 println!("\n=== Column Statistics ===");
4901 println!("{:-<80}", "");
4902
4903 for col_stat in stats.columns {
4904 println!("Column: {}", col_stat.name);
4905 println!("{:-<80}", "");
4906 println!("| {:<20} | {:>15} | {:>15} | {:>15} |",
4907 "Metric", "Value", "Min", "Max");
4908 println!("{:-<80}", "");
4909
4910 println!("| {:<20} | {:>15} | {:<15} | {:<15} |",
4911 "Records",
4912 col_stat.total_count,
4913 "-",
4914 "-");
4915
4916 println!("| {:<20} | {:>15} | {:<15} | {:<15} |",
4917 "Non-null Records",
4918 col_stat.non_null_count,
4919 "-",
4920 "-");
4921
4922 if let Some(mean) = col_stat.mean {
4923 println!("| {:<20} | {:>15.2} | {:<15} | {:<15} |",
4924 "Mean",
4925 mean,
4926 "-",
4927 "-");
4928 }
4929
4930 if let Some(std_dev) = col_stat.std_dev {
4931 println!("| {:<20} | {:>15.2} | {:<15} | {:<15} |",
4932 "Standard Dev",
4933 std_dev,
4934 "-",
4935 "-");
4936 }
4937
4938 println!("| {:<20} | {:>15} | {:<15} | {:<15} |",
4939 "Value Range",
4940 "-",
4941 format!("{}", col_stat.min_value),
4942 format!("{}", col_stat.max_value));
4943
4944 println!("{:-<80}\n", "");
4945 }
4946 Ok(())
4947 }
4948
4949 pub async fn display_null_analysis(&self, columns: Option<&[&str]>) -> ElusionResult<()> {
4951 let analysis = self.analyze_null_values(columns).await?;
4952
4953 println!("\n=== Null Value Analysis ===");
4954 println!("{:-<90}", "");
4955 println!("| {:<30} | {:>15} | {:>15} | {:>15} |",
4956 "Column", "Total Rows", "Null Count", "Null Percentage");
4957 println!("{:-<90}", "");
4958
4959 for count in analysis.counts {
4960 println!("| {:<30} | {:>15} | {:>15} | {:>14.2}% |",
4961 count.column_name,
4962 count.total_rows,
4963 count.null_count,
4964 count.null_percentage);
4965 }
4966 println!("{:-<90}\n", "");
4967 Ok(())
4968 }
4969
4970 pub async fn display_correlation_matrix(&self, columns: &[&str]) -> ElusionResult<()> {
4972 println!("\n=== Correlation Matrix ===");
4973 let col_width = 20;
4974 let total_width = (columns.len() + 1) * (col_width + 3) + 1;
4975 println!("{:-<width$}", "", width = total_width);
4976
4977 print!("| {:<width$} |", "", width = col_width);
4979 for col in columns {
4980 let display_name = if col.len() > col_width {
4981 format!("{}...", &col[..12])
4983 } else {
4984 col.to_string()
4985 };
4986 print!(" {:<width$} |", display_name, width = col_width);
4987 }
4988 println!();
4989 println!("{:-<width$}", "", width = total_width);
4990
4991 for &col1 in columns {
4993 let display_name = if col1.len() > col_width {
4994 format!("{}...", &col1[..12])
4995 } else {
4996 col1.to_string()
4997 };
4998 print!("| {:<width$} |", display_name, width = col_width);
4999
5000 for &col2 in columns {
5001 let correlation = self.compute_correlation(col1, col2).await?;
5002 print!(" {:>width$.4} |", correlation, width = col_width); }
5004 println!();
5005 }
5006 println!("{:-<width$}\n", "", width = total_width);
5007 Ok(())
5008 }
5009
5010pub async fn write_to_json(
5014 &self,
5015 path: &str,
5016 pretty: bool,
5017 ) -> ElusionResult<()> {
5018
5019 if !path.ends_with(".json") {
5020 return Err(ElusionError::Custom(
5021 "❌ Invalid file extension. Json files must end with '.json'".to_string()
5022 ));
5023 }
5024
5025 if let Some(parent) = LocalPath::new(path).parent() {
5026 if !parent.exists() {
5027 std::fs::create_dir_all(parent).map_err(|e| ElusionError::WriteError {
5028 path: parent.display().to_string(),
5029 operation: "create_directory".to_string(),
5030 reason: e.to_string(),
5031 suggestion: "💡 Check if you have permissions to create directories".to_string(),
5032 })?;
5033 }
5034 }
5035
5036 if fs::metadata(path).is_ok() {
5037 fs::remove_file(path).or_else(|_| fs::remove_dir_all(path)).map_err(|e|
5038 ElusionError::WriteError {
5039 path: path.to_string(),
5040 operation: "overwrite".to_string(),
5041 reason: format!("❌ Failed to delete existing file: {}", e),
5042 suggestion: "💡 Check file permissions and ensure no other process is using the file".to_string(),
5043 }
5044 )?;
5045 }
5046
5047 let batches = self.df.clone().collect().await.map_err(|e|
5048 ElusionError::InvalidOperation {
5049 operation: "Data Collection".to_string(),
5050 reason: format!("Failed to collect DataFrame: {}", e),
5051 suggestion: "💡 Verify DataFrame is not empty and contains valid data".to_string(),
5052 }
5053 )?;
5054
5055 if batches.is_empty() {
5056 return Err(ElusionError::InvalidOperation {
5057 operation: "JSON Writing".to_string(),
5058 reason: "No data to write".to_string(),
5059 suggestion: "💡 Ensure DataFrame contains data before writing".to_string(),
5060 });
5061 }
5062
5063 let file = OpenOptions::new()
5064 .write(true)
5065 .create(true)
5066 .truncate(true)
5067 .open(path)
5068 .map_err(|e| ElusionError::WriteError {
5069 path: path.to_string(),
5070 operation: "file_create".to_string(),
5071 reason: e.to_string(),
5072 suggestion: "💡 Check file permissions and path validity".to_string(),
5073 })?;
5074
5075 let mut writer = BufWriter::new(file);
5076
5077 writeln!(writer, "[").map_err(|e| ElusionError::WriteError {
5079 path: path.to_string(),
5080 operation: "begin_json".to_string(),
5081 reason: e.to_string(),
5082 suggestion: "💡 Check disk space and write permissions".to_string(),
5083 })?;
5084
5085 let mut first_row = true;
5087 let mut rows_written = 0;
5088
5089 for batch in batches.iter() {
5090 let row_count = batch.num_rows();
5091 let column_count = batch.num_columns();
5092
5093 if row_count == 0 || column_count == 0 {
5095 continue;
5096 }
5097
5098 let schema = batch.schema();
5100 let column_names: Vec<&str> = schema.fields().iter()
5101 .map(|f| f.name().as_str())
5102 .collect();
5103
5104 for row_idx in 0..row_count {
5106 if !first_row {
5107 writeln!(writer, ",").map_err(|e| ElusionError::WriteError {
5108 path: path.to_string(),
5109 operation: "write_separator".to_string(),
5110 reason: e.to_string(),
5111 suggestion: "💡 Check disk space and write permissions".to_string(),
5112 })?;
5113 }
5114 first_row = false;
5115 rows_written += 1;
5116
5117 let mut row_obj = serde_json::Map::new();
5119
5120 for col_idx in 0..column_count {
5122 let col_name = column_names[col_idx];
5123 let array = batch.column(col_idx);
5124
5125 let json_value = array_value_to_json(array, row_idx)?;
5127 row_obj.insert(col_name.to_string(), json_value);
5128 }
5129
5130 let json_value = serde_json::Value::Object(row_obj);
5132
5133 if pretty {
5134 serde_json::to_writer_pretty(&mut writer, &json_value)
5135 .map_err(|e| ElusionError::WriteError {
5136 path: path.to_string(),
5137 operation: format!("write_row_{}", rows_written),
5138 reason: format!("JSON serialization error: {}", e),
5139 suggestion: "💡 Check if row contains valid JSON data".to_string(),
5140 })?;
5141 } else {
5142 serde_json::to_writer(&mut writer, &json_value)
5143 .map_err(|e| ElusionError::WriteError {
5144 path: path.to_string(),
5145 operation: format!("write_row_{}", rows_written),
5146 reason: format!("JSON serialization error: {}", e),
5147 suggestion: "💡 Check if row contains valid JSON data".to_string(),
5148 })?;
5149 }
5150 }
5151 }
5152
5153 writeln!(writer, "\n]").map_err(|e| ElusionError::WriteError {
5155 path: path.to_string(),
5156 operation: "end_json".to_string(),
5157 reason: e.to_string(),
5158 suggestion: "💡 Check disk space and write permissions".to_string(),
5159 })?;
5160
5161 writer.flush().map_err(|e| ElusionError::WriteError {
5163 path: path.to_string(),
5164 operation: "flush".to_string(),
5165 reason: e.to_string(),
5166 suggestion: "💡 Failed to flush data to file".to_string(),
5167 })?;
5168
5169 println!("✅ Data successfully written to '{}'", path);
5170
5171 if rows_written == 0 {
5172 println!("*** Warning ***: No rows were written to the file. Check if this is expected.");
5173 } else {
5174 println!("✅ Wrote {} rows to JSON file", rows_written);
5175 }
5176
5177 Ok(())
5178 }
5179
5180
5181 pub async fn write_to_parquet(
5183 &self,
5184 mode: &str,
5185 path: &str,
5186 options: Option<DataFrameWriteOptions>,
5187 ) -> ElusionResult<()> {
5188
5189 if !path.ends_with(".parquet") {
5190 return Err(ElusionError::Custom(
5191 "❌ Invalid file extension. Parquet files must end with '.parquet'".to_string()
5192 ));
5193 }
5194
5195 let write_options = options.unwrap_or_else(DataFrameWriteOptions::new);
5196
5197 if let Some(parent) = LocalPath::new(path).parent() {
5198 if !parent.exists() {
5199 std::fs::create_dir_all(parent).map_err(|e| ElusionError::WriteError {
5200 path: parent.display().to_string(),
5201 operation: "create_directory".to_string(),
5202 reason: e.to_string(),
5203 suggestion: "💡 Check if you have permissions to create directories".to_string(),
5204 })?;
5205 }
5206 }
5207 match mode {
5208 "overwrite" => {
5209 if fs::metadata(path).is_ok() {
5210 fs::remove_file(path).or_else(|_| fs::remove_dir_all(path)).map_err(|e| {
5211 ElusionError::WriteError {
5212 path: path.to_string(),
5213 operation: "overwrite".to_string(),
5214 reason: format!("❌ Failed to delete existing file/directory: {}", e),
5215 suggestion: "💡 Check file permissions and ensure no other process is using the file".to_string()
5216 }
5217 })?;
5218 }
5219
5220 self.df.clone().write_parquet(path, write_options, None).await
5221 .map_err(|e| ElusionError::WriteError {
5222 path: path.to_string(),
5223 operation: "overwrite".to_string(),
5224 reason: e.to_string(),
5225 suggestion: "💡 Check file permissions and path validity".to_string()
5226 })?;
5227 }
5228 "append" => {
5229 let ctx = SessionContext::new();
5230
5231 if !fs::metadata(path).is_ok() {
5232 self.df.clone().write_parquet(path, write_options, None).await
5233 .map_err(|e| ElusionError::WriteError {
5234 path: path.to_string(),
5235 operation: "append".to_string(),
5236 reason: format!("❌ Failed to create initial file: {}", e),
5237 suggestion: "💡 Check directory permissions and path validity".to_string()
5238 })?;
5239 return Ok(());
5240 }
5241
5242 let existing_df = ctx.read_parquet(path, ParquetReadOptions::default()).await
5244 .map_err(|e| ElusionError::WriteError {
5245 path: path.to_string(),
5246 operation: "read_existing".to_string(),
5247 reason: e.to_string(),
5248 suggestion: "💡 Verify the file is a valid Parquet file".to_string()
5249 })?;
5250
5251 ctx.register_table("existing_data", Arc::new(
5265 MemTable::try_new(
5266 existing_df.schema().clone().into(),
5267 vec![existing_df.clone().collect().await.map_err(|e| ElusionError::WriteError {
5268 path: path.to_string(),
5269 operation: "collect_existing".to_string(),
5270 reason: e.to_string(),
5271 suggestion: "💡 Failed to collect existing data".to_string()
5272 })?]
5273 ).map_err(|e| ElusionError::WriteError {
5274 path: path.to_string(),
5275 operation: "create_mem_table".to_string(),
5276 reason: e.to_string(),
5277 suggestion: "💡 Failed to create memory table".to_string()
5278 })?
5279 )).map_err(|e| ElusionError::WriteError {
5280 path: path.to_string(),
5281 operation: "register_existing".to_string(),
5282 reason: e.to_string(),
5283 suggestion: "💡 Failed to register existing data".to_string()
5284 })?;
5285
5286 ctx.register_table("new_data", Arc::new(
5288 MemTable::try_new(
5289 self.df.schema().clone().into(),
5290 vec![self.df.clone().collect().await.map_err(|e| ElusionError::WriteError {
5291 path: path.to_string(),
5292 operation: "collect_new".to_string(),
5293 reason: e.to_string(),
5294 suggestion: "💡 Failed to collect new data".to_string()
5295 })?]
5296 ).map_err(|e| ElusionError::WriteError {
5297 path: path.to_string(),
5298 operation: "create_mem_table".to_string(),
5299 reason: e.to_string(),
5300 suggestion: "💡 Failed to create memory table".to_string()
5301 })?
5302 )).map_err(|e| ElusionError::WriteError {
5303 path: path.to_string(),
5304 operation: "register_new".to_string(),
5305 reason: e.to_string(),
5306 suggestion: "💡 Failed to register new data".to_string()
5307 })?;
5308
5309 let column_list = existing_df.schema()
5311 .fields()
5312 .iter()
5313 .map(|f| format!("\"{}\"", f.name()))
5314 .collect::<Vec<_>>()
5315 .join(", ");
5316
5317 let sql = format!(
5319 "SELECT {} FROM existing_data UNION ALL SELECT {} FROM new_data",
5320 column_list, column_list
5321 );
5322 let combined_df = ctx.sql(&sql).await
5325 .map_err(|e| ElusionError::WriteError {
5326 path: path.to_string(),
5327 operation: "combine_data".to_string(),
5328 reason: e.to_string(),
5329 suggestion: "💡 Failed to combine existing and new data".to_string()
5330 })?;
5331
5332 let temp_path = format!("{}.temp", path);
5334
5335 combined_df.write_parquet(&temp_path, write_options, None).await
5337 .map_err(|e| ElusionError::WriteError {
5338 path: temp_path.clone(),
5339 operation: "write_combined".to_string(),
5340 reason: e.to_string(),
5341 suggestion: "💡 Failed to write combined data".to_string()
5342 })?;
5343
5344 fs::remove_file(path).map_err(|e| ElusionError::WriteError {
5346 path: path.to_string(),
5347 operation: "remove_original".to_string(),
5348 reason: format!("❌ Failed to remove original file: {}", e),
5349 suggestion: "💡 Check file permissions".to_string()
5350 })?;
5351
5352 fs::rename(&temp_path, path).map_err(|e| ElusionError::WriteError {
5354 path: path.to_string(),
5355 operation: "rename_temp".to_string(),
5356 reason: format!("❌ Failed to rename temporary file: {}", e),
5357 suggestion: "💡 Check file system permissions".to_string()
5358 })?;
5359 }
5360 _ => return Err(ElusionError::InvalidOperation {
5361 operation: mode.to_string(),
5362 reason: "Invalid write mode".to_string(),
5363 suggestion: "💡 Use 'overwrite' or 'append'".to_string()
5364 })
5365 }
5366
5367 match mode {
5368 "overwrite" => println!("✅ Data successfully overwritten to '{}'", path),
5369 "append" => println!("✅ Data successfully appended to '{}'", path),
5370 _ => unreachable!(),
5371 }
5372
5373 Ok(())
5374 }
5375
5376 pub async fn write_to_csv(
5378 &self,
5379 mode: &str,
5380 path: &str,
5381 csv_options: CsvWriteOptions,
5382 ) -> ElusionResult<()> {
5383
5384 if !path.ends_with(".csv") {
5385 return Err(ElusionError::Custom(
5386 "❌ Invalid file extension. CSV files must end with '.csv'".to_string()
5387 ));
5388 }
5389
5390 csv_options.validate()?;
5391
5392 if let Some(parent) = LocalPath::new(path).parent() {
5393 if !parent.exists() {
5394 std::fs::create_dir_all(parent).map_err(|e| ElusionError::WriteError {
5395 path: parent.display().to_string(),
5396 operation: "create_directory".to_string(),
5397 reason: e.to_string(),
5398 suggestion: "💡 Check if you have permissions to create directories".to_string(),
5399 })?;
5400 }
5401 }
5402
5403 match mode {
5404 "overwrite" => {
5405 if fs::metadata(path).is_ok() {
5407 fs::remove_file(path).or_else(|_| fs::remove_dir_all(path)).map_err(|e|
5408 ElusionError::WriteError {
5409 path: path.to_string(),
5410 operation: "overwrite".to_string(),
5411 reason: format!("Failed to delete existing file: {}", e),
5412 suggestion: "💡 Check file permissions and ensure no other process is using the file".to_string(),
5413 }
5414 )?;
5415 }
5416
5417 let batches = self.df.clone().collect().await.map_err(|e|
5418 ElusionError::InvalidOperation {
5419 operation: "Data Collection".to_string(),
5420 reason: format!("Failed to collect DataFrame: {}", e),
5421 suggestion: "💡 Verify DataFrame is not empty and contains valid data".to_string(),
5422 }
5423 )?;
5424
5425 if batches.is_empty() {
5426 return Err(ElusionError::InvalidOperation {
5427 operation: "CSV Writing".to_string(),
5428 reason: "No data to write".to_string(),
5429 suggestion: "💡 Ensure DataFrame contains data before writing".to_string(),
5430 });
5431 }
5432
5433 let file = OpenOptions::new()
5434 .write(true)
5435 .create(true)
5436 .truncate(true)
5437 .open(path)
5438 .map_err(|e| ElusionError::WriteError {
5439 path: path.to_string(),
5440 operation: "file_create".to_string(),
5441 reason: e.to_string(),
5442 suggestion: "💡 Check file permissions and path validity".to_string(),
5443 })?;
5444
5445 let writer = BufWriter::new(file);
5446 let mut csv_writer = WriterBuilder::new()
5447 .with_header(true)
5448 .with_delimiter(csv_options.delimiter)
5449 .with_escape(csv_options.escape)
5450 .with_quote(csv_options.quote)
5451 .with_double_quote(csv_options.double_quote)
5452 .with_null(csv_options.null_value.clone())
5453 .build(writer);
5454
5455 for batch in batches.iter() {
5456 csv_writer.write(batch).map_err(|e| ElusionError::WriteError {
5457 path: path.to_string(),
5458 operation: "write_data".to_string(),
5459 reason: e.to_string(),
5460 suggestion: "💡 Failed to write data batch".to_string(),
5461 })?;
5462 }
5463
5464 csv_writer.into_inner().flush().map_err(|e| ElusionError::WriteError {
5465 path: path.to_string(),
5466 operation: "flush".to_string(),
5467 reason: e.to_string(),
5468 suggestion: "💡 Failed to flush data to file".to_string(),
5469 })?;
5470 },
5471 "append" => {
5472 if !fs::metadata(path).is_ok() {
5473 let batches = self.df.clone().collect().await.map_err(|e|
5475 ElusionError::InvalidOperation {
5476 operation: "Data Collection".to_string(),
5477 reason: format!("Failed to collect DataFrame: {}", e),
5478 suggestion: "💡 Verify DataFrame is not empty and contains valid data".to_string(),
5479 }
5480 )?;
5481
5482 if batches.is_empty() {
5483 return Err(ElusionError::InvalidOperation {
5484 operation: "CSV Writing".to_string(),
5485 reason: "No data to write".to_string(),
5486 suggestion: "💡 Ensure DataFrame contains data before writing".to_string(),
5487 });
5488 }
5489
5490 let file = OpenOptions::new()
5491 .write(true)
5492 .create(true)
5493 .open(path)
5494 .map_err(|e| ElusionError::WriteError {
5495 path: path.to_string(),
5496 operation: "file_create".to_string(),
5497 reason: e.to_string(),
5498 suggestion: "💡 Check file permissions and path validity".to_string(),
5499 })?;
5500
5501 let writer = BufWriter::new(file);
5502 let mut csv_writer = WriterBuilder::new()
5503 .with_header(true)
5504 .with_delimiter(csv_options.delimiter)
5505 .with_escape(csv_options.escape)
5506 .with_quote(csv_options.quote)
5507 .with_double_quote(csv_options.double_quote)
5508 .with_null(csv_options.null_value.clone())
5509 .build(writer);
5510
5511 for batch in batches.iter() {
5512 csv_writer.write(batch).map_err(|e| ElusionError::WriteError {
5513 path: path.to_string(),
5514 operation: "write_data".to_string(),
5515 reason: e.to_string(),
5516 suggestion: "💡 Failed to write data batch".to_string(),
5517 })?;
5518 }
5519 csv_writer.into_inner().flush().map_err(|e| ElusionError::WriteError {
5520 path: path.to_string(),
5521 operation: "flush".to_string(),
5522 reason: e.to_string(),
5523 suggestion: "💡 Failed to flush data to file".to_string(),
5524 })?;
5525 } else {
5526 let ctx = SessionContext::new();
5527 let existing_df = ctx.read_csv(
5528 path,
5529 CsvReadOptions::new()
5530 .has_header(true)
5531 .schema_infer_max_records(1000),
5532 ).await?;
5533
5534 let existing_cols: HashSet<_> = existing_df.schema()
5536 .fields()
5537 .iter()
5538 .map(|f| f.name().to_string())
5539 .collect();
5540
5541 let new_cols: HashSet<_> = self.df.schema()
5542 .fields()
5543 .iter()
5544 .map(|f| f.name().to_string())
5545 .collect();
5546
5547 if existing_cols != new_cols {
5548 return Err(ElusionError::WriteError {
5549 path: path.to_string(),
5550 operation: "column_check".to_string(),
5551 reason: "Column mismatch between existing file and new data".to_string(),
5552 suggestion: "💡 Ensure both datasets have the same columns".to_string()
5553 });
5554 }
5555
5556 ctx.register_table("existing_data", Arc::new(
5557 MemTable::try_new(
5558 existing_df.schema().clone().into(),
5559 vec![existing_df.clone().collect().await.map_err(|e| ElusionError::WriteError {
5560 path: path.to_string(),
5561 operation: "collect_existing".to_string(),
5562 reason: e.to_string(),
5563 suggestion: "💡 Failed to collect existing data".to_string()
5564 })?]
5565 ).map_err(|e| ElusionError::WriteError {
5566 path: path.to_string(),
5567 operation: "create_mem_table".to_string(),
5568 reason: e.to_string(),
5569 suggestion: "💡 Failed to create memory table".to_string()
5570 })?
5571 )).map_err(|e| ElusionError::WriteError {
5572 path: path.to_string(),
5573 operation: "register_existing".to_string(),
5574 reason: e.to_string(),
5575 suggestion: "💡 Failed to register existing data".to_string()
5576 })?;
5577
5578 ctx.register_table("new_data", Arc::new(
5579 MemTable::try_new(
5580 self.df.schema().clone().into(),
5581 vec![self.df.clone().collect().await.map_err(|e| ElusionError::WriteError {
5582 path: path.to_string(),
5583 operation: "collect_new".to_string(),
5584 reason: e.to_string(),
5585 suggestion: "💡 Failed to collect new data".to_string()
5586 })?]
5587 ).map_err(|e| ElusionError::WriteError {
5588 path: path.to_string(),
5589 operation: "create_mem_table".to_string(),
5590 reason: e.to_string(),
5591 suggestion: "💡 Failed to create memory table".to_string()
5592 })?
5593 )).map_err(|e| ElusionError::WriteError {
5594 path: path.to_string(),
5595 operation: "register_new".to_string(),
5596 reason: e.to_string(),
5597 suggestion: "💡 Failed to register new data".to_string()
5598 })?;
5599
5600 let column_list = existing_df.schema()
5601 .fields()
5602 .iter()
5603 .map(|f| format!("\"{}\"", f.name()))
5604 .collect::<Vec<_>>()
5605 .join(", ");
5606
5607 let sql = format!(
5608 "SELECT {} FROM existing_data UNION ALL SELECT {} FROM new_data",
5609 column_list, column_list
5610 );
5611
5612 let combined_df = ctx.sql(&sql).await
5613 .map_err(|e| ElusionError::WriteError {
5614 path: path.to_string(),
5615 operation: "combine_data".to_string(),
5616 reason: e.to_string(),
5617 suggestion: "💡 Failed to combine existing and new data".to_string()
5618 })?;
5619
5620 let temp_path = format!("{}.temp", path);
5621
5622 if fs::metadata(&temp_path).is_ok() {
5624 fs::remove_file(&temp_path).map_err(|e| ElusionError::WriteError {
5625 path: temp_path.clone(),
5626 operation: "cleanup_temp".to_string(),
5627 reason: format!("Failed to delete temporary file: {}", e),
5628 suggestion: "💡 Check file permissions and ensure no other process is using the file".to_string(),
5629 })?;
5630 }
5631
5632 let batches = combined_df.collect().await.map_err(|e|
5633 ElusionError::InvalidOperation {
5634 operation: "Data Collection".to_string(),
5635 reason: format!("Failed to collect DataFrame: {}", e),
5636 suggestion: "💡 Verify DataFrame is not empty and contains valid data".to_string(),
5637 }
5638 )?;
5639
5640 if batches.is_empty() {
5641 return Err(ElusionError::InvalidOperation {
5642 operation: "CSV Writing".to_string(),
5643 reason: "No data to write".to_string(),
5644 suggestion: "💡 Ensure DataFrame contains data before writing".to_string(),
5645 });
5646 }
5647
5648 {
5650 let file = OpenOptions::new()
5651 .write(true)
5652 .create(true)
5653 .truncate(true)
5654 .open(&temp_path)
5655 .map_err(|e| ElusionError::WriteError {
5656 path: temp_path.clone(),
5657 operation: "file_open".to_string(),
5658 reason: e.to_string(),
5659 suggestion: "💡 Check file permissions and path validity".to_string(),
5660 })?;
5661
5662 let writer = BufWriter::new(file);
5663 let mut csv_writer = WriterBuilder::new()
5664 .with_header(true)
5665 .with_delimiter(csv_options.delimiter)
5666 .with_escape(csv_options.escape)
5667 .with_quote(csv_options.quote)
5668 .with_double_quote(csv_options.double_quote)
5669 .with_null(csv_options.null_value.clone())
5670 .build(writer);
5671
5672 for batch in batches.iter() {
5673 csv_writer.write(batch).map_err(|e| ElusionError::WriteError {
5674 path: temp_path.clone(),
5675 operation: "write_data".to_string(),
5676 reason: e.to_string(),
5677 suggestion: "💡 Failed to write data batch".to_string(),
5678 })?;
5679 }
5680
5681 csv_writer.into_inner().flush().map_err(|e| ElusionError::WriteError {
5682 path: temp_path.clone(),
5683 operation: "flush".to_string(),
5684 reason: e.to_string(),
5685 suggestion: "💡 Check disk space and write permissions".to_string(),
5686 })?;
5687 } if fs::metadata(path).is_ok() {
5691 fs::remove_file(path).map_err(|e| ElusionError::WriteError {
5692 path: path.to_string(),
5693 operation: "remove_original".to_string(),
5694 reason: format!("Failed to remove original file: {}", e),
5695 suggestion: "💡 Check file permissions".to_string()
5696 })?;
5697 }
5698
5699 fs::rename(&temp_path, path).map_err(|e| ElusionError::WriteError {
5701 path: path.to_string(),
5702 operation: "rename_temp".to_string(),
5703 reason: format!("Failed to rename temporary file: {}", e),
5704 suggestion: "💡 Check file system permissions".to_string()
5705 })?;
5706 }
5707 },
5708 _ => return Err(ElusionError::InvalidOperation {
5709 operation: mode.to_string(),
5710 reason: "Invalid write mode".to_string(),
5711 suggestion: "💡 Use 'overwrite' or 'append'".to_string()
5712 })
5713 }
5714
5715 match mode {
5716 "overwrite" => println!("✅ Data successfully overwritten to '{}'", path),
5717 "append" => println!("✅ Data successfully appended to '{}'", path),
5718 _ => unreachable!(),
5719 }
5720
5721 Ok(())
5722 }
5723
5724 pub async fn write_to_delta_table(
5726 &self,
5727 mode: &str,
5728 path: &str,
5729 partition_columns: Option<Vec<String>>,
5730 ) -> Result<(), DeltaTableError> {
5731 let (overwrite, write_mode) = match mode {
5733 "overwrite" => {
5734 (true, WriteMode::Default)
5735 }
5736 "append" => {
5737 (false, WriteMode::Default)
5738 }
5739 "merge" => {
5740 (false, WriteMode::MergeSchema)
5742 }
5743 "default" => {
5744 (false, WriteMode::Default)
5746 }
5747 other => {
5748 return Err(DeltaTableError::Generic(format!(
5749 "Unsupported write mode: {other}"
5750 )));
5751 }
5752 };
5753
5754 write_to_delta_impl(
5755 &self.df, path,
5757 partition_columns,
5758 overwrite,
5759 write_mode,
5760 )
5761 .await
5762 }
5763
5764 #[cfg(feature = "excel")]
5767 pub async fn write_to_excel(
5768 &self,
5769 path: &str,
5770 sheet_name: Option<&str>
5771 ) -> ElusionResult<()> {
5772 crate::features::excel::write_to_excel_impl(self, path, sheet_name).await
5773 }
5774
5775 #[cfg(not(feature = "excel"))]
5776 pub async fn write_to_excel(
5777 &self,
5778 _path: &str,
5779 _sheet_name: Option<&str>
5780 ) -> ElusionResult<()> {
5781 Err(ElusionError::Custom("*** Warning ***: Excel feature not enabled. Add feature excel under [dependencies]".to_string()))
5782 }
5783 #[cfg(feature = "azure")]
5785 pub async fn write_parquet_to_azure_with_sas(
5786 &self,
5787 mode: &str,
5788 url: &str,
5789 sas_token: &str,
5790 ) -> ElusionResult<()> {
5791 crate::features::azure::write_parquet_to_azure_with_sas_impl(self, mode, url, sas_token).await
5792 }
5793
5794 #[cfg(not(feature = "azure"))]
5795 pub async fn write_parquet_to_azure_with_sas(
5796 &self,
5797 _mode: &str,
5798 _url: &str,
5799 _sas_token: &str,
5800 ) -> ElusionResult<()> {
5801 Err(ElusionError::Custom("*** Warning ***: Azure feature not enabled. Add feature under [dependencies]".to_string()))
5802 }
5803
5804 #[cfg(feature = "azure")]
5805 pub async fn write_json_to_azure_with_sas(
5806 &self,
5807 url: &str,
5808 sas_token: &str,
5809 pretty: bool,
5810 ) -> ElusionResult<()> {
5811 crate::features::azure::write_json_to_azure_with_sas_impl(self, url, sas_token, pretty).await
5812 }
5813
5814 #[cfg(not(feature = "azure"))]
5815 pub async fn write_json_to_azure_with_sas(
5816 &self,
5817 _url: &str,
5818 _sas_token: &str,
5819 _pretty: bool,
5820 ) -> ElusionResult<()> {
5821 Err(ElusionError::Custom("*** Warning ***: Azure feature not enabled. Add feature under [dependencies]".to_string()))
5822 }
5823
5824 #[cfg(feature = "azure")]
5826 pub async fn from_azure_with_sas_token(
5827 url: &str,
5828 sas_token: &str,
5829 filter_keyword: Option<&str>,
5830 alias: &str,
5831 ) -> ElusionResult<Self> {
5832 crate::features::azure::from_azure_with_sas_token_impl(url, sas_token, filter_keyword, alias).await
5833 }
5834
5835 #[cfg(not(feature = "azure"))]
5836 pub async fn from_azure_with_sas_token(
5837 _url: &str,
5838 _sas_token: &str,
5839 _filter_keyword: Option<&str>,
5840 _alias: &str,
5841 ) -> ElusionResult<Self> {
5842 Err(ElusionError::Custom("*** Warning ***: Azure feature not enabled. Add feature under [dependencies]".to_string()))
5843 }
5844
5845 #[cfg(feature = "fabric")]
5848 pub async fn from_fabric(
5849 abfss_path: &str,
5850 file_path: &str,
5851 alias: &str,
5852 ) -> ElusionResult<CustomDataFrame> {
5853 crate::features::fabric::load_from_fabric_abfss_impl(
5854 abfss_path,
5855 file_path,
5856 alias,
5857 ).await
5858 }
5859
5860 #[cfg(not(feature = "fabric"))]
5861 pub async fn from_fabric(
5862 _abfss_path: &str,
5863 _file_path: &str,
5864 _alias: &str,
5865 ) -> ElusionResult<CustomDataFrame> {
5866 Err(ElusionError::Custom("*** Warning ***: fabric feature not enabled. Add 'fabric' feature under [dependencies]".to_string()))
5867 }
5868
5869 #[cfg(feature = "fabric")]
5870 pub async fn from_fabric_with_service_principal(
5871 tenant_id: &str,
5872 client_id: &str,
5873 client_secret: &str,
5874 abfss_path: &str,
5875 file_path: &str,
5876 alias: &str,
5877 ) -> ElusionResult<CustomDataFrame> {
5878 crate::features::fabric::load_from_fabric_abfss_with_service_principal_impl(
5879 tenant_id,
5880 client_id,
5881 client_secret,
5882 abfss_path,
5883 file_path,
5884 alias,
5885 ).await
5886 }
5887
5888 #[cfg(not(feature = "fabric"))]
5889 pub async fn from_fabric_with_service_principal(
5890 _tenant_id: &str,
5891 _client_id: &str,
5892 _client_secret: &str,
5893 _abfss_path: &str,
5894 _file_path: &str,
5895 _alias: &str,
5896 ) -> ElusionResult<CustomDataFrame> {
5897 Err(ElusionError::Custom("*** Warning ***: fabric feature not enabled. Add 'fabric' feature under [dependencies]".to_string()))
5898 }
5899
5900 #[cfg(feature = "fabric")]
5902 pub async fn write_parquet_to_fabric(
5903 &self,
5904 abfss_path: &str,
5905 file_path: &str,
5906 ) -> ElusionResult<()> {
5907 crate::features::fabric::write_parquet_to_fabric_abfss_impl(
5908 self,
5909 abfss_path,
5910 file_path,
5911 ).await
5912 }
5913
5914 #[cfg(not(feature = "fabric"))]
5915 pub async fn write_parquet_to_fabric(
5916 &self,
5917 _abfss_path: &str,
5918 _file_path: &str,
5919 ) -> ElusionResult<()> {
5920 Err(ElusionError::Custom("*** Warning ***: fabric feature not enabled. Add 'fabric' feature under [dependencies]".to_string()))
5921 }
5922
5923 #[cfg(feature = "fabric")]
5924 pub async fn write_parquet_to_fabric_with_service_principal(
5925 &self,
5926 tenant_id: &str,
5927 client_id: &str,
5928 client_secret: &str,
5929 abfss_path: &str,
5930 file_path: &str,
5931 ) -> ElusionResult<()> {
5932 crate::features::fabric::write_parquet_to_fabric_abfss_with_service_principal_impl(
5933 self,
5934 tenant_id,
5935 client_id,
5936 client_secret,
5937 abfss_path,
5938 file_path,
5939 ).await
5940 }
5941
5942 #[cfg(not(feature = "fabric"))]
5943 pub async fn write_parquet_to_fabric_with_service_principal(
5944 &self,
5945 _tenant_id: &str,
5946 _client_id: &str,
5947 _client_secret: &str,
5948 _abfss_path: &str,
5949 _file_path: &str,
5950 ) -> ElusionResult<()> {
5951 Err(ElusionError::Custom("*** Warning ***: fabric feature not enabled. Add 'fabric' feature under [dependencies]".to_string()))
5952 }
5953
5954 #[cfg(feature = "ftp")]
5956 pub async fn from_ftp(
5957 server: &str,
5958 username: &str,
5959 password: &str,
5960 remote_path: &str,
5961 alias: &str
5962 ) -> ElusionResult<Self> {
5963 crate::features::ftp::from_ftp_impl(
5964 server,
5965 username,
5966 password,
5967 remote_path,
5968 alias
5969 ).await
5970 }
5971
5972 #[cfg(not(feature = "ftp"))]
5973 pub async fn from_ftp(
5974 _server: &str,
5975 _username: &str,
5976 _password: &str,
5977 _remote_path: &str,
5978 _alias: &str
5979 ) -> ElusionResult<Self> {
5980 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
5981 }
5982
5983 #[cfg(feature = "ftp")]
5984 pub async fn from_ftps(
5985 server: &str,
5986 username: &str,
5987 password: &str,
5988 remote_path: &str,
5989 alias: &str
5990 ) -> ElusionResult<Self> {
5991 crate::features::ftp::from_ftps_impl(
5992 server,
5993 username,
5994 password,
5995 remote_path,
5996 alias
5997 ).await
5998 }
5999
6000 #[cfg(not(feature = "ftp"))]
6001 pub async fn from_ftps(
6002 _server: &str,
6003 _username: &str,
6004 _password: &str,
6005 _remote_path: &str,
6006 _alias: &str
6007 ) -> ElusionResult<Self> {
6008 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6009 }
6010
6011 #[cfg(feature = "ftp")]
6012 pub async fn from_ftp_with_port(
6013 server: &str,
6014 port: u16,
6015 username: &str,
6016 password: &str,
6017 remote_path: &str,
6018 alias: &str
6019 ) -> ElusionResult<Self> {
6020 crate::features::ftp::from_ftp_with_port_impl(
6021 server,
6022 port,
6023 username,
6024 password,
6025 remote_path,
6026 alias
6027 ).await
6028 }
6029
6030 #[cfg(not(feature = "ftp"))]
6031 pub async fn from_ftp_with_port(
6032 _server: &str,
6033 _port: u16,
6034 _username: &str,
6035 _password: &str,
6036 _remote_path: &str,
6037 _alias: &str
6038 ) -> ElusionResult<Self> {
6039 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6040 }
6041
6042 #[cfg(feature = "ftp")]
6043 pub async fn from_ftp_with_directory(
6044 server: &str,
6045 username: &str,
6046 password: &str,
6047 directory: &str,
6048 remote_path: &str,
6049 alias: &str
6050 ) -> ElusionResult<Self> {
6051 crate::features::ftp::from_ftp_with_directory_impl(
6052 server,
6053 username,
6054 password,
6055 directory,
6056 remote_path,
6057 alias
6058 ).await
6059 }
6060
6061 #[cfg(not(feature = "ftp"))]
6062 pub async fn from_ftp_with_directory(
6063 _server: &str,
6064 _username: &str,
6065 _password: &str,
6066 _directory: &str,
6067 _remote_path: &str,
6068 _alias: &str
6069 ) -> ElusionResult<Self> {
6070 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6071 }
6072
6073 #[cfg(feature = "ftp")]
6076 pub async fn from_ftp_folder(
6077 server: &str,
6078 username: &str,
6079 password: &str,
6080 port: Option<u16>,
6081 folder_path: &str,
6082 file_extensions: Option<Vec<&str>>,
6083 result_alias: &str,
6084 ) -> ElusionResult<Self> {
6085 crate::features::ftp::from_ftp_folder_impl(
6086 server,
6087 username,
6088 password,
6089 port,
6090 false,
6091 folder_path,
6092 file_extensions,
6093 result_alias
6094 ).await
6095 }
6096
6097 #[cfg(not(feature = "ftp"))]
6098 pub async fn from_ftp_folder(
6099 _server: &str,
6100 _username: &str,
6101 _password: &str,
6102 _port: Option<u16>,
6103 _folder_path: &str,
6104 _file_extensions: Option<Vec<&str>>,
6105 _result_alias: &str,
6106 ) -> ElusionResult<Self> {
6107 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6108 }
6109
6110 #[cfg(feature = "ftp")]
6113 pub async fn from_ftps_folder(
6114 server: &str,
6115 username: &str,
6116 password: &str,
6117 port: Option<u16>,
6118 folder_path: &str,
6119 file_extensions: Option<Vec<&str>>,
6120 result_alias: &str,
6121 ) -> ElusionResult<Self> {
6122 crate::features::ftp::from_ftp_folder_impl(
6123 server,
6124 username,
6125 password,
6126 port,
6127 true,
6128 folder_path,
6129 file_extensions,
6130 result_alias
6131 ).await
6132 }
6133
6134 #[cfg(not(feature = "ftp"))]
6135 pub async fn from_ftps_folder(
6136 _server: &str,
6137 _username: &str,
6138 _password: &str,
6139 _port: Option<u16>,
6140 _folder_path: &str,
6141 _file_extensions: Option<Vec<&str>>,
6142 _result_alias: &str,
6143 ) -> ElusionResult<Self> {
6144 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6145 }
6146
6147 #[cfg(feature = "ftp")]
6150 pub async fn from_ftp_folder_with_filename_column(
6151 server: &str,
6152 username: &str,
6153 password: &str,
6154 port: Option<u16>,
6155 folder_path: &str,
6156 file_extensions: Option<Vec<&str>>,
6157 result_alias: &str,
6158 ) -> ElusionResult<Self> {
6159 crate::features::ftp::from_ftp_folder_with_filename_column_impl(
6160 server,
6161 username,
6162 password,
6163 port,
6164 false,
6165 folder_path,
6166 file_extensions,
6167 result_alias
6168 ).await
6169 }
6170
6171 #[cfg(not(feature = "ftp"))]
6172 pub async fn from_ftp_folder_with_filename_column(
6173 _server: &str,
6174 _username: &str,
6175 _password: &str,
6176 _port: Option<u16>,
6177 _folder_path: &str,
6178 _file_extensions: Option<Vec<&str>>,
6179 _result_alias: &str,
6180 ) -> ElusionResult<Self> {
6181 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6182 }
6183
6184 #[cfg(feature = "ftp")]
6187 pub async fn from_ftps_folder_with_filename_column(
6188 server: &str,
6189 username: &str,
6190 password: &str,
6191 port: Option<u16>,
6192 folder_path: &str,
6193 file_extensions: Option<Vec<&str>>,
6194 result_alias: &str,
6195 ) -> ElusionResult<Self> {
6196 crate::features::ftp::from_ftp_folder_with_filename_column_impl(
6197 server,
6198 username,
6199 password,
6200 port,
6201 true,
6202 folder_path,
6203 file_extensions,
6204 result_alias
6205 ).await
6206 }
6207
6208 #[cfg(not(feature = "ftp"))]
6209 pub async fn from_ftps_folder_with_filename_column(
6210 _server: &str,
6211 _username: &str,
6212 _password: &str,
6213 _port: Option<u16>,
6214 _folder_path: &str,
6215 _file_extensions: Option<Vec<&str>>,
6216 _result_alias: &str,
6217 ) -> ElusionResult<Self> {
6218 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6219 }
6220
6221 #[cfg(feature = "ftp")]
6223 pub async fn write_csv_to_ftp(
6224 &self,
6225 server: &str,
6226 username: &str,
6227 password: &str,
6228 remote_path: &str
6229 ) -> ElusionResult<()> {
6230 crate::features::ftp::write_csv_to_ftp_impl(
6231 self,
6232 server,
6233 username,
6234 password,
6235 remote_path
6236 ).await
6237 }
6238
6239 #[cfg(not(feature = "ftp"))]
6240 pub async fn write_csv_to_ftp(
6241 &self,
6242 _server: &str,
6243 _username: &str,
6244 _password: &str,
6245 _remote_path: &str
6246 ) -> ElusionResult<()> {
6247 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6248 }
6249
6250 #[cfg(feature = "ftp")]
6252 pub async fn write_excel_to_ftp(
6253 &self,
6254 server: &str,
6255 username: &str,
6256 password: &str,
6257 remote_path: &str,
6258 sheet_name: Option<&str>
6259 ) -> ElusionResult<()> {
6260 crate::features::ftp::write_excel_to_ftp_impl(
6261 self,
6262 server,
6263 username,
6264 password,
6265 remote_path,
6266 sheet_name
6267 ).await
6268 }
6269
6270 #[cfg(not(feature = "ftp"))]
6271 pub async fn write_excel_to_ftp(
6272 &self,
6273 _server: &str,
6274 _username: &str,
6275 _password: &str,
6276 _remote_path: &str,
6277 _sheet_name: Option<&str>
6278 ) -> ElusionResult<()> {
6279 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6280 }
6281
6282 #[cfg(feature = "ftp")]
6284 pub async fn write_parquet_to_ftp(
6285 &self,
6286 server: &str,
6287 username: &str,
6288 password: &str,
6289 remote_path: &str
6290 ) -> ElusionResult<()> {
6291 crate::features::ftp::write_parquet_to_ftp_impl(
6292 self,
6293 server,
6294 username,
6295 password,
6296 remote_path
6297 ).await
6298 }
6299
6300 #[cfg(not(feature = "ftp"))]
6301 pub async fn write_parquet_to_ftp(
6302 &self,
6303 _server: &str,
6304 _username: &str,
6305 _password: &str,
6306 _remote_path: &str
6307 ) -> ElusionResult<()> {
6308 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6309 }
6310
6311 #[cfg(feature = "ftp")]
6313 pub async fn write_json_to_ftp(
6314 &self,
6315 server: &str,
6316 username: &str,
6317 password: &str,
6318 remote_path: &str,
6319 pretty: bool
6320 ) -> ElusionResult<()> {
6321 crate::features::ftp::write_json_to_ftp_impl(
6322 self,
6323 server,
6324 username,
6325 password,
6326 remote_path,
6327 pretty
6328 ).await
6329 }
6330
6331 #[cfg(not(feature = "ftp"))]
6332 pub async fn write_json_to_ftp(
6333 &self,
6334 _server: &str,
6335 _username: &str,
6336 _password: &str,
6337 _remote_path: &str,
6338 _pretty: bool
6339 ) -> ElusionResult<()> {
6340 Err(ElusionError::Custom("*** Warning ***: ftp feature not enabled. Add 'ftp' feature under [dependencies]".to_string()))
6341 }
6342
6343 pub fn load_xml<'a>(file_path: &'a str, alias: &'a str) -> BoxFuture<'a, ElusionResult<AliasedDataFrame>> {
6347 Box::pin(async move {
6348 load_xml_with_mode(file_path, alias, XmlProcessingMode::Auto).await
6349 })
6350 }
6351 pub async fn load_csv(file_path: &str, alias: &str) -> ElusionResult<AliasedDataFrame> {
6353 load_csv_with_type_handling(file_path, alias).await
6354 }
6355
6356
6357 pub fn load_parquet<'a>(
6359 file_path: &'a str,
6360 alias: &'a str,
6361 ) -> BoxFuture<'a, ElusionResult<AliasedDataFrame>> {
6362 Box::pin(async move {
6363 let ctx = SessionContext::new();
6364
6365 if !LocalPath::new(file_path).exists() {
6366 return Err(ElusionError::WriteError {
6367 path: file_path.to_string(),
6368 operation: "read".to_string(),
6369 reason: "File not found".to_string(),
6370 suggestion: "💡 Check if the file path is correct".to_string(),
6371 });
6372 }
6373
6374 println!("🔄 Starting Parquet loading process...");
6375
6376 if let Ok(metadata) = std::fs::metadata(file_path) {
6377 let file_size = metadata.len();
6378 println!("📏 Parquet file size: {} bytes ({:.2} MB)",
6379 file_size, file_size as f64 / 1024.0 / 1024.0);
6380 }
6381
6382 let read_start = std::time::Instant::now();
6383 let df = match ctx.read_parquet(file_path, ParquetReadOptions::default()).await {
6384 Ok(df) => {
6385 let read_elapsed = read_start.elapsed();
6386 let schema = df.schema();
6387 let column_count = schema.fields().len();
6388
6389 println!("✅ Parquet file read successfully in {:?}", read_elapsed);
6390 println!("📊 Schema detected: {} columns with native types", column_count);
6391
6392 df
6407 }
6408 Err(err) => {
6409 let read_elapsed = read_start.elapsed();
6410 println!("❌ Failed to read Parquet file after {:?}: {}", read_elapsed, err);
6411 return Err(ElusionError::DataFusion(err));
6412 }
6413 };
6414
6415 let batches = df.clone().collect().await.map_err(ElusionError::DataFusion)?;
6416 let schema = df.schema().clone();
6417 let mem_table = MemTable::try_new(schema.clone().into(), vec![batches])
6418 .map_err(|e| ElusionError::SchemaError {
6419 message: e.to_string(),
6420 schema: Some(schema.to_string()),
6421 suggestion: "💡 Check if the parquet file schema is valid".to_string(),
6422 })?;
6423
6424 let normalized_alias = normalize_alias_write(alias).into_owned();
6425
6426 ctx.register_table(&normalized_alias, Arc::new(mem_table))
6427 .map_err(|e| ElusionError::InvalidOperation {
6428 operation: "Table Registration".to_string(),
6429 reason: e.to_string(),
6430 suggestion: "💡 Try using a different alias name".to_string(),
6431 })?;
6432
6433 let aliased_df = ctx.table(alias).await
6434 .map_err(|_| ElusionError::InvalidOperation {
6435 operation: "Table Creation".to_string(),
6436 reason: format!("Failed to create table with alias '{}'", alias),
6437 suggestion: "💡 Check if the alias is valid and unique".to_string(),
6438 })?;
6439
6440 let total_elapsed = read_start.elapsed();
6441 println!("🎉 Parquet DataFrame loading completed successfully in {:?} for table alias: '{}'",
6442 total_elapsed, alias);
6443
6444 Ok(AliasedDataFrame {
6445 dataframe: aliased_df,
6446 alias: alias.to_string(),
6447 })
6448 })
6449 }
6450
6451 pub fn load_json<'a>(
6452 file_path: &'a str,
6453 alias: &'a str,
6454 ) -> BoxFuture<'a, ElusionResult<AliasedDataFrame>> {
6455 Box::pin(async move {
6456 println!("🔄 Processing JSON records...");
6457
6458 let file = File::open(file_path).map_err(|e| ElusionError::WriteError {
6459 path: file_path.to_string(),
6460 operation: "read".to_string(),
6461 reason: e.to_string(),
6462 suggestion: "💡 Check if the file exists and you have proper permissions".to_string(),
6463 })?;
6464
6465 let file_size = file.metadata().map_err(|e| ElusionError::WriteError {
6466 path: file_path.to_string(),
6467 operation: "metadata reading".to_string(),
6468 reason: e.to_string(),
6469 suggestion: "💡 Check file permissions and disk status".to_string(),
6470 })?.len();
6471
6472 println!("📏 File size: {} bytes ({:.2} MB)", file_size, file_size as f64 / 1024.0 / 1024.0);
6473 let reader = BufReader::with_capacity(128 * 1024, file);
6475 let stream = Deserializer::from_reader(reader).into_iter::<Value>();
6476
6477 let mut all_data: Vec<HashMap<String, Value>> = Vec::new();
6478 let mut processed_count = 0;
6479 let start_time = std::time::Instant::now();
6480
6481
6482 for (index, value) in stream.enumerate() {
6483
6484 if index % 500 == 0 && index > 0 {
6485 let elapsed = start_time.elapsed();
6486 let rate = index as f64 / elapsed.as_secs_f64();
6487 println!("Processed {} records in {:?} ({:.1} records/sec)", index, elapsed, rate);
6488 }
6489
6490 match value {
6491 Ok(json_value) => {
6492 match json_value {
6493 Value::Object(map) => {
6494 let hash_map: HashMap<String, Value> = map.into_iter().collect();
6496 all_data.push(hash_map);
6497 processed_count += 1;
6498 },
6499 Value::Array(array) => {
6500 let array_size = array.len();
6502 all_data.reserve(array_size);
6506
6507 let batch_size = 1000;
6509 for (batch_start, batch) in array.chunks(batch_size).enumerate() {
6510 if batch_start % 10 == 0 && batch_start > 0 {
6511 let items_processed = batch_start * batch_size;
6512 let progress = (items_processed as f64 / array_size as f64) * 100.0;
6513 println!("📦 Array progress: {}/{} items ({:.1}%)",
6514 items_processed, array_size, progress);
6515 }
6516
6517 for item in batch {
6518 if let Value::Object(map) = item {
6519 let hash_map: HashMap<String, Value> = map.clone().into_iter().collect();
6521 all_data.push(hash_map);
6522 processed_count += 1;
6523 }
6524 }
6525 }
6526
6527 println!("✅ Completed processing array with {} items", array_size);
6528 },
6529 _ => {
6530 println!("⚠️ Skipping non-object/non-array JSON value at index {}", index);
6531 continue;
6532 }
6533 }
6534 },
6535 Err(e) => {
6536 if e.is_eof() {
6537 println!("📄 Reached end of JSON file at record {}", index);
6538 break;
6539 } else {
6540 println!("❌ JSON parsing error at record {}: {}", index, e);
6541 continue;
6542 }
6543 }
6544 }
6545 }
6546
6547 let total_elapsed = start_time.elapsed();
6548 println!("✅ JSON parsing completed: {} records in {:?}", processed_count, total_elapsed);
6549
6550 if all_data.is_empty() {
6551 return Err(ElusionError::InvalidOperation {
6552 operation: "JSON processing".to_string(),
6553 reason: "No valid JSON data found".to_string(),
6554 suggestion: "💡 Check if the JSON file contains valid object data".to_string(),
6555 });
6556 }
6557
6558 let schema = infer_schema_from_json(&all_data);
6561 println!("🔧 Building record batch...");
6565 let batch_start = std::time::Instant::now();
6566 let record_batch = build_record_batch(&all_data, schema.clone())
6567 .map_err(|e| ElusionError::SchemaError {
6568 message: format!("Failed to build RecordBatch: {}", e),
6569 schema: Some(schema.to_string()),
6570 suggestion: "💡 Check if the JSON data structure is consistent".to_string(),
6571 })?;
6572 let batch_elapsed = batch_start.elapsed();
6573 println!("📊 Record batch created with {} rows in {:?}", record_batch.num_rows(), batch_elapsed);
6574
6575 let ctx = SessionContext::new();
6576 let mem_table = MemTable::try_new(schema.clone(), vec![vec![record_batch]])
6577 .map_err(|e| ElusionError::SchemaError {
6578 message: format!("Failed to create MemTable: {}", e),
6579 schema: Some(schema.to_string()),
6580 suggestion: "💡 Verify data types and schema compatibility".to_string(),
6581 })?;
6582
6583 ctx.register_table(alias, Arc::new(mem_table))
6584 .map_err(|e| ElusionError::InvalidOperation {
6585 operation: "Table registration".to_string(),
6586 reason: format!("Failed to register table: {}", e),
6587 suggestion: "💡 Try using a different alias or check table compatibility".to_string(),
6588 })?;
6589
6590 let df = ctx.table(alias).await.map_err(|e| ElusionError::InvalidOperation {
6591 operation: "Table creation".to_string(),
6592 reason: format!("Failed to create table: {}", e),
6593 suggestion: "💡 Verify table creation parameters and permissions".to_string(),
6594 })?;
6595
6596 let total_time = start_time.elapsed();
6597 println!("🎉 JSON DataFrame loading completed successfully in {:?} for table alias: {}", total_time, alias);
6598
6599 Ok(AliasedDataFrame {
6600 dataframe: df,
6601 alias: alias.to_string(),
6602 })
6603 })
6604 }
6605
6606 pub fn load_delta<'a>(
6608 file_path: &'a str,
6609 alias: &'a str,
6610 ) -> BoxFuture<'a, ElusionResult<AliasedDataFrame>> {
6611 Box::pin(async move {
6612 let ctx = SessionContext::new();
6613
6614 let path_manager = DeltaPathManager::new(file_path);
6616
6617 println!("🔄 Opening Delta table and reading metadata...");
6618 let table_start = std::time::Instant::now();
6619 let table = open_table(&path_manager.table_path())
6621 .await
6622 .map_err(|e| ElusionError::InvalidOperation {
6623 operation: "Delta Table Opening".to_string(),
6624 reason: e.to_string(),
6625 suggestion: "💡 Ensure the path points to a valid Delta table".to_string(),
6626 })?;
6627
6628 let table_elapsed = table_start.elapsed();
6629 println!("✅ Delta table opened successfully in {:?}", table_elapsed);
6630
6631 let version = table.version();
6632 println!("📊 Delta table version: {}", version);
6633
6634 println!("🔍 Discovering Delta table files...");
6635 let files_start = std::time::Instant::now();
6636
6637 let file_paths: Vec<String> = {
6638 let raw_uris = table.get_file_uris()
6639 .map_err(|e| ElusionError::InvalidOperation {
6640 operation: "Delta File Listing".to_string(),
6641 reason: e.to_string(),
6642 suggestion: "💡 Check Delta table permissions and integrity".to_string(),
6643 })?;
6644
6645 raw_uris.map(|uri| path_manager.normalize_uri(&uri))
6646 .collect()
6647 };
6648
6649 let files_elapsed = files_start.elapsed();
6650 let file_count = file_paths.len();
6651 println!("📁 Found {} Delta files in {:?}", file_count, files_elapsed);
6652
6653 println!("🔄 Reading Delta data files as Parquet...");
6654 let read_start = std::time::Instant::now();
6655 let parquet_options = ParquetReadOptions::new()
6657 .parquet_pruning(false)
6660 .skip_metadata(false);
6661
6662
6663
6664 let df = ctx.read_parquet(file_paths, parquet_options).await?;
6665
6666 let read_elapsed = read_start.elapsed();
6667 println!("✅ Delta data read successfully in {:?}", read_elapsed);
6668
6669 println!("🔄 Loading data into memory and building table...");
6670 let collect_start = std::time::Instant::now();
6671
6672 let batches = df.clone().collect().await?;
6673 let schema = df.schema().clone().into();
6678 let mem_table = MemTable::try_new(schema, vec![batches])?;
6680
6681 let collect_elapsed = collect_start.elapsed();
6682 println!("✅ Memory table created in {:?}", collect_elapsed);
6683
6684 let normalized_alias = normalize_alias_write(alias).into_owned();
6685
6686 ctx.register_table(&normalized_alias, Arc::new(mem_table))
6687 .map_err(|e| ElusionError::InvalidOperation {
6688 operation: "Table Registration".to_string(),
6689 reason: e.to_string(),
6690 suggestion: "💡 Try using a different alias name".to_string(),
6691 })?;
6692
6693 let aliased_df = ctx.table(&normalized_alias).await
6694 .map_err(|_| ElusionError::InvalidOperation {
6695 operation: "Table Creation".to_string(),
6696 reason: format!("Failed to create table with alias '{}'", alias),
6697 suggestion: "💡 Check if the alias is valid and unique".to_string(),
6698 })?;
6699
6700 let total_elapsed = table_start.elapsed();
6701
6702 println!("🎉 Delta table loading completed successfully in {:?} for table alias: '{}'",
6703 total_elapsed, alias);
6704
6705 Ok(AliasedDataFrame {
6706 dataframe: aliased_df,
6707 alias: alias.to_string(),
6708 })
6709 })
6710 }
6711
6712
6713 pub async fn load(
6717 file_path: &str,
6718 alias: &str,
6719 ) -> ElusionResult<AliasedDataFrame> {
6720 let path_manager = DeltaPathManager::new(file_path);
6721 if path_manager.is_delta_table() {
6722 let aliased_df = Self::load_delta(file_path, alias).await?;
6723 let df_lower = lowercase_column_names(aliased_df.dataframe).await?;
6725 return Ok(AliasedDataFrame {
6726 dataframe: df_lower,
6727 alias: alias.to_string(),
6728 });
6729 }
6730
6731 let ext = file_path
6732 .split('.')
6733 .last()
6734 .unwrap_or_default()
6735 .to_lowercase();
6736
6737 let aliased_df = match ext.as_str() {
6738 "csv" => Self::load_csv(file_path, alias).await?,
6739 "json" => Self::load_json(file_path, alias).await?,
6740 "parquet" => Self::load_parquet(file_path, alias).await?,
6741 "xml" => Self::load_xml(file_path, alias).await?,
6742 "xlsx" | "xls" => crate::features::excel::load_excel(file_path, alias).await?,
6743 "" => return Err(ElusionError::InvalidOperation {
6744 operation: "File Loading".to_string(),
6745 reason: format!("Directory is not a Delta table and has no recognized extension: {file_path}"),
6746 suggestion: "💡 Provide a file with a supported extension (.csv, .json, .parquet, .xlsx, .xls, .xml) or a valid Delta table directory".to_string(),
6747 }),
6748 other => return Err(ElusionError::InvalidOperation {
6749 operation: "File Loading".to_string(),
6750 reason: format!("Unsupported file extension: {other}"),
6751 suggestion: "💡 Use one of the supported file types: .csv, .json, .parquet, .xlsx, .xls, .xml or Delta table".to_string(),
6752 }),
6753 };
6754
6755 let df_lower = lowercase_column_names(aliased_df.dataframe).await?;
6756 Ok(AliasedDataFrame {
6757 dataframe: df_lower,
6758 alias: alias.to_string(),
6759 })
6760 }
6761
6762 pub async fn load_folder(
6766 folder_path: &str,
6767 file_extensions: Option<Vec<&str>>,
6768 result_alias: &str,
6769 ) -> ElusionResult<Self> {
6770 use std::fs;
6771 use std::path::Path;
6772
6773 let folder_path_obj = Path::new(folder_path);
6774 if !folder_path_obj.exists() {
6775 return Err(ElusionError::WriteError {
6776 path: folder_path.to_string(),
6777 operation: "read".to_string(),
6778 reason: "Folder not found".to_string(),
6779 suggestion: "💡 Check if the folder path is correct".to_string(),
6780 });
6781 }
6782
6783 if !folder_path_obj.is_dir() {
6784 return Err(ElusionError::InvalidOperation {
6785 operation: "Local Folder Loading".to_string(),
6786 reason: "Path is not a directory".to_string(),
6787 suggestion: "💡 Provide a valid directory path".to_string(),
6788 });
6789 }
6790
6791 let entries = fs::read_dir(folder_path)
6792 .map_err(|e| ElusionError::WriteError {
6793 path: folder_path.to_string(),
6794 operation: "read".to_string(),
6795 reason: format!("Failed to read directory: {}", e),
6796 suggestion: "💡 Check directory permissions".to_string(),
6797 })?;
6798
6799 let mut dataframes = Vec::new();
6800
6801 for entry in entries {
6802 let entry = entry.map_err(|e| ElusionError::WriteError {
6803 path: folder_path.to_string(),
6804 operation: "read".to_string(),
6805 reason: format!("Failed to read directory entry: {}", e),
6806 suggestion: "💡 Check directory permissions".to_string(),
6807 })?;
6808
6809 let file_path = entry.path();
6810
6811 if !file_path.is_file() {
6812 continue;
6813 }
6814
6815 let file_name = file_path.file_name()
6816 .and_then(|name| name.to_str())
6817 .unwrap_or("")
6818 .to_string();
6819
6820 if file_name.starts_with('.') {
6821 continue;
6822 }
6823
6824 if let Some(ref extensions) = file_extensions {
6826 let file_ext = file_name
6827 .split('.')
6828 .last()
6829 .unwrap_or("")
6830 .to_lowercase();
6831
6832 if !extensions.iter().any(|ext| ext.to_lowercase() == file_ext) {
6833 continue;
6834 }
6835 }
6836
6837 let file_path_str = file_path.to_str().ok_or_else(|| ElusionError::InvalidOperation {
6838 operation: "Local Folder Loading".to_string(),
6839 reason: format!("Invalid file path: {:?}", file_path),
6840 suggestion: "💡 Ensure file paths contain valid UTF-8 characters".to_string(),
6841 })?;
6842
6843 match file_name.split('.').last().unwrap_or("").to_lowercase().as_str() {
6844 "csv" => {
6845 let file_size = std::fs::metadata(file_path_str)
6846 .map(|m| m.len())
6847 .unwrap_or(0);
6848
6849 if file_size > 500_000_000 {
6850 println!("📊 Large CSV detected ({} bytes), using streaming loader", file_size);
6851
6852 match load_csv_smart(file_path_str, "local_data").await {
6853 Ok(aliased_df) => {
6854 println!("✅ Loaded large CSV: {}", file_name);
6855
6856 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
6858
6859 let df = CustomDataFrame {
6860 df: normalized_df,
6861 table_alias: "local_csv".to_string(),
6862 from_table: "local_csv".to_string(),
6863 selected_columns: Vec::new(),
6864 alias_map: Vec::new(),
6865 aggregations: Vec::new(),
6866 group_by_columns: Vec::new(),
6867 where_conditions: Vec::new(),
6868 having_conditions: Vec::new(),
6869 order_by_columns: Vec::new(),
6870 limit_count: None,
6871 joins: Vec::new(),
6872 window_functions: Vec::new(),
6873 ctes: Vec::new(),
6874 subquery_source: None,
6875 set_operations: Vec::new(),
6876 query: String::new(),
6877 aggregated_df: None,
6878 union_tables: None,
6879 original_expressions: Vec::new(),
6880 needs_normalization: false,
6881 raw_selected_columns: Vec::new(),
6882 raw_group_by_columns: Vec::new(),
6883 raw_where_conditions: Vec::new(),
6884 raw_having_conditions: Vec::new(),
6885 raw_join_conditions: Vec::new(),
6886 raw_aggregations: Vec::new(),
6887 uses_group_by_all: false
6888 };
6889 dataframes.push(df);
6890 },
6891 Err(e) => {
6892 eprintln!("⚠️ Failed to load large CSV file {}: {}", file_name, e);
6893 continue;
6894 }
6895 }
6896 } else {
6897 match Self::load_csv(file_path_str, "local_data").await {
6899 Ok(aliased_df) => {
6900 println!("✅ Loaded CSV: {}", file_name);
6901
6902 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
6904
6905 let df = CustomDataFrame {
6906 df: normalized_df,
6907 table_alias: "local_csv".to_string(),
6908 from_table: "local_csv".to_string(),
6909 selected_columns: Vec::new(),
6910 alias_map: Vec::new(),
6911 aggregations: Vec::new(),
6912 group_by_columns: Vec::new(),
6913 where_conditions: Vec::new(),
6914 having_conditions: Vec::new(),
6915 order_by_columns: Vec::new(),
6916 limit_count: None,
6917 joins: Vec::new(),
6918 window_functions: Vec::new(),
6919 ctes: Vec::new(),
6920 subquery_source: None,
6921 set_operations: Vec::new(),
6922 query: String::new(),
6923 aggregated_df: None,
6924 union_tables: None,
6925 original_expressions: Vec::new(),
6926 needs_normalization: false,
6927 raw_selected_columns: Vec::new(),
6928 raw_group_by_columns: Vec::new(),
6929 raw_where_conditions: Vec::new(),
6930 raw_having_conditions: Vec::new(),
6931 raw_join_conditions: Vec::new(),
6932 raw_aggregations: Vec::new(),
6933 uses_group_by_all: false
6934 };
6935 dataframes.push(df);
6936 },
6937 Err(e) => {
6938 eprintln!("⚠️ Failed to load CSV file {}: {}", file_name, e);
6939 continue;
6940 }
6941 }
6942 }
6943 },
6944 "xlsx" | "xls" => {
6945 match crate::features::excel::load_excel(file_path_str, "local_data").await {
6946 Ok(aliased_df) => {
6947 println!("✅ Loaded Excel: {}", file_name);
6948
6949 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
6951
6952 let df = CustomDataFrame {
6953 df: normalized_df,
6954 table_alias: "local_excel".to_string(),
6955 from_table: "local_excel".to_string(),
6956 selected_columns: Vec::new(),
6957 alias_map: Vec::new(),
6958 aggregations: Vec::new(),
6959 group_by_columns: Vec::new(),
6960 where_conditions: Vec::new(),
6961 having_conditions: Vec::new(),
6962 order_by_columns: Vec::new(),
6963 limit_count: None,
6964 joins: Vec::new(),
6965 window_functions: Vec::new(),
6966 ctes: Vec::new(),
6967 subquery_source: None,
6968 set_operations: Vec::new(),
6969 query: String::new(),
6970 aggregated_df: None,
6971 union_tables: None,
6972 original_expressions: Vec::new(),
6973 needs_normalization: false,
6974 raw_selected_columns: Vec::new(),
6975 raw_group_by_columns: Vec::new(),
6976 raw_where_conditions: Vec::new(),
6977 raw_having_conditions: Vec::new(),
6978 raw_join_conditions: Vec::new(),
6979 raw_aggregations: Vec::new(),
6980 uses_group_by_all: false
6981 };
6982 dataframes.push(df);
6983 },
6984 Err(e) => {
6985 eprintln!("⚠️ Failed to load Excel file {}: {}", file_name, e);
6986 continue;
6987 }
6988 }
6989 },
6990 "json" => {
6991 match Self::load_json(file_path_str, "local_data").await {
6992 Ok(aliased_df) => {
6993 println!("✅ Loaded JSON: {}", file_name);
6994
6995 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
6997
6998 let df = CustomDataFrame {
6999 df: normalized_df,
7000 table_alias: "local_json".to_string(),
7001 from_table: "local_json".to_string(),
7002 selected_columns: Vec::new(),
7003 alias_map: Vec::new(),
7004 aggregations: Vec::new(),
7005 group_by_columns: Vec::new(),
7006 where_conditions: Vec::new(),
7007 having_conditions: Vec::new(),
7008 order_by_columns: Vec::new(),
7009 limit_count: None,
7010 joins: Vec::new(),
7011 window_functions: Vec::new(),
7012 ctes: Vec::new(),
7013 subquery_source: None,
7014 set_operations: Vec::new(),
7015 query: String::new(),
7016 aggregated_df: None,
7017 union_tables: None,
7018 original_expressions: Vec::new(),
7019 needs_normalization: false,
7020 raw_selected_columns: Vec::new(),
7021 raw_group_by_columns: Vec::new(),
7022 raw_where_conditions: Vec::new(),
7023 raw_having_conditions: Vec::new(),
7024 raw_join_conditions: Vec::new(),
7025 raw_aggregations: Vec::new(),
7026 uses_group_by_all: false
7027 };
7028 dataframes.push(df);
7029 },
7030 Err(e) => {
7031 eprintln!("⚠️ Failed to load JSON file {}: {}", file_name, e);
7032 continue;
7033 }
7034 }
7035 },
7036 "parquet" => {
7037 match Self::load_parquet(file_path_str, "local_data").await {
7038 Ok(aliased_df) => {
7039 println!("✅ Loaded Parquet: {}", file_name);
7040
7041 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
7043
7044 let df = CustomDataFrame {
7045 df: normalized_df,
7046 table_alias: "local_parquet".to_string(),
7047 from_table: "local_parquet".to_string(),
7048 selected_columns: Vec::new(),
7049 alias_map: Vec::new(),
7050 aggregations: Vec::new(),
7051 group_by_columns: Vec::new(),
7052 where_conditions: Vec::new(),
7053 having_conditions: Vec::new(),
7054 order_by_columns: Vec::new(),
7055 limit_count: None,
7056 joins: Vec::new(),
7057 window_functions: Vec::new(),
7058 ctes: Vec::new(),
7059 subquery_source: None,
7060 set_operations: Vec::new(),
7061 query: String::new(),
7062 aggregated_df: None,
7063 union_tables: None,
7064 original_expressions: Vec::new(),
7065 needs_normalization: false,
7066 raw_selected_columns: Vec::new(),
7067 raw_group_by_columns: Vec::new(),
7068 raw_where_conditions: Vec::new(),
7069 raw_having_conditions: Vec::new(),
7070 raw_join_conditions: Vec::new(),
7071 raw_aggregations: Vec::new(),
7072 uses_group_by_all: false
7073 };
7074 dataframes.push(df);
7075 },
7076 Err(e) => {
7077 eprintln!("⚠️ Failed to load Parquet file {}: {}", file_name, e);
7078 continue;
7079 }
7080 }
7081 },
7082 "xml" => {
7083 match Self::load_xml(file_path_str, "local_data").await {
7084 Ok(aliased_df) => {
7085 println!("✅ Loaded XML: {}", file_name);
7086
7087 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
7088
7089 let df = CustomDataFrame {
7090 df: normalized_df,
7091 table_alias: "local_xml".to_string(),
7092 from_table: "local_xml".to_string(),
7093 selected_columns: Vec::new(),
7094 alias_map: Vec::new(),
7095 aggregations: Vec::new(),
7096 group_by_columns: Vec::new(),
7097 where_conditions: Vec::new(),
7098 having_conditions: Vec::new(),
7099 order_by_columns: Vec::new(),
7100 limit_count: None,
7101 joins: Vec::new(),
7102 window_functions: Vec::new(),
7103 ctes: Vec::new(),
7104 subquery_source: None,
7105 set_operations: Vec::new(),
7106 query: String::new(),
7107 aggregated_df: None,
7108 union_tables: None,
7109 original_expressions: Vec::new(),
7110 needs_normalization: false,
7111 raw_selected_columns: Vec::new(),
7112 raw_group_by_columns: Vec::new(),
7113 raw_where_conditions: Vec::new(),
7114 raw_having_conditions: Vec::new(),
7115 raw_join_conditions: Vec::new(),
7116 raw_aggregations: Vec::new(),
7117 uses_group_by_all: false
7118 };
7119 dataframes.push(df);
7120 },
7121 Err(e) => {
7122 eprintln!("⚠️ Failed to load XML file {}: {}", file_name, e);
7123 continue;
7124 }
7125 }
7126 },
7127 _ => {
7128 println!("⏭️ Skipping unsupported file type: {}", file_name);
7129 }
7130 }
7131 }
7132
7133 if dataframes.is_empty() {
7134 return Err(ElusionError::InvalidOperation {
7135 operation: "Local Folder Loading".to_string(),
7136 reason: "No supported files found or all files failed to load".to_string(),
7137 suggestion: "💡 Check folder path and ensure it contains CSV, Excel, JSON, or Parquet files".to_string(),
7138 });
7139 }
7140
7141 if dataframes.len() == 1 {
7143 println!("📄 Single file loaded, returning as-is");
7144 return dataframes.into_iter().next().unwrap().elusion(result_alias).await;
7145 }
7146
7147 println!("🔍 Checking schema compatibility for {} files (names + types)...", dataframes.len());
7149
7150 let first_schema = dataframes[0].df.schema();
7151 let mut compatible_schemas = true;
7152 let mut schema_issues = Vec::new();
7153
7154 println!("📋 File 1 schema:");
7156 for (i, field) in first_schema.fields().iter().enumerate() {
7157 println!(" Column {}: '{}' ({})", i + 1, field.name(), field.data_type());
7158 }
7159
7160 for (file_idx, df) in dataframes.iter().enumerate().skip(1) {
7161 let current_schema = df.df.schema();
7162
7163 println!("📋 File {} schema:", file_idx + 1);
7164 for (i, field) in current_schema.fields().iter().enumerate() {
7165 println!(" Column {}: '{}' ({})", i + 1, field.name(), field.data_type());
7166 }
7167
7168 if first_schema.fields().len() != current_schema.fields().len() {
7170 compatible_schemas = false;
7171 schema_issues.push(format!("File {} has {} columns, but first file has {}",
7172 file_idx + 1, current_schema.fields().len(), first_schema.fields().len()));
7173 continue;
7174 }
7175
7176 for (col_idx, first_field) in first_schema.fields().iter().enumerate() {
7178 if let Some(current_field) = current_schema.fields().get(col_idx) {
7179 if first_field.name().to_lowercase() != current_field.name().to_lowercase() {
7181 compatible_schemas = false;
7182 schema_issues.push(format!("File {} column {} name is '{}', but first file has '{}'",
7183 file_idx + 1, col_idx + 1, current_field.name(), first_field.name()));
7184 }
7185
7186 if first_field.data_type() != current_field.data_type() {
7188 compatible_schemas = false;
7189 schema_issues.push(format!("File {} column {} ('{}') type is {:?}, but first file has {:?}",
7190 file_idx + 1, col_idx + 1, current_field.name(),
7191 current_field.data_type(), first_field.data_type()));
7192 }
7193 }
7194 }
7195 }
7196
7197 if !compatible_schemas {
7198 println!("⚠️ Schema compatibility issues found:");
7199 for issue in &schema_issues {
7200 println!(" {}", issue);
7201 }
7202
7203 println!("🔧 Reordering columns by name to match first file...");
7204
7205 let first_file_columns: Vec<String> = first_schema.fields()
7207 .iter()
7208 .map(|field| field.name().clone())
7209 .collect();
7210
7211 println!("📋 Target column order: {:?}", first_file_columns);
7212
7213 let mut reordered_dataframes = Vec::new();
7214
7215 for (i, df) in dataframes.clone().into_iter().enumerate() {
7216 let column_refs: Vec<&str> = first_file_columns.iter().map(|s| s.as_str()).collect();
7218 let reordered_df = df.select_vec(column_refs);
7219
7220 let temp_alias = format!("reordered_file_{}", i + 1);
7222 match reordered_df.elusion(&temp_alias).await {
7223 Ok(standardized_df) => {
7224 println!("✅ Reordered file {} columns", i + 1);
7225 reordered_dataframes.push(standardized_df);
7226 },
7227 Err(e) => {
7228 eprintln!("⚠️ Failed to reorder file {} columns: {}", i + 1, e);
7229 continue;
7230 }
7231 }
7232 }
7233
7234 if reordered_dataframes.is_empty() {
7235 println!("📄 Column reordering failed, returning first file only");
7236 return dataframes.into_iter().next().unwrap().elusion(result_alias).await;
7237 }
7238
7239 dataframes = reordered_dataframes;
7240 println!("✅ All files reordered to match first file column order");
7241 } else {
7242 println!("✅ All schemas are compatible!");
7243 }
7244
7245 println!("🔗 Unioning {} files with compatible schemas...", dataframes.len());
7246
7247 let total_files = dataframes.len();
7248 let mut result = dataframes.clone().into_iter().next().unwrap();
7249
7250 for (i, df) in dataframes.into_iter().enumerate().skip(1) {
7252 result = result.union_all(df).await
7253 .map_err(|e| ElusionError::InvalidOperation {
7254 operation: "Local Folder Union All".to_string(),
7255 reason: format!("Failed to union file {}: {}", i + 1, e),
7256 suggestion: "💡 Check that all files have compatible schemas".to_string(),
7257 })?;
7258
7259 println!("✅ Unioned file {}/{}", i + 1, total_files - 1);
7260 }
7261
7262 println!("🎉 Successfully combined {} files using UNION ALL", total_files);
7263
7264 result.elusion(result_alias).await
7265 }
7266
7267 pub async fn load_folder_with_filename_column(
7270 folder_path: &str,
7271 file_extensions: Option<Vec<&str>>,
7272 result_alias: &str,
7273 ) -> ElusionResult<Self> {
7274 use std::fs;
7275 use std::path::Path;
7276
7277 let folder_path_obj = Path::new(folder_path);
7279 if !folder_path_obj.exists() {
7280 return Err(ElusionError::WriteError {
7281 path: folder_path.to_string(),
7282 operation: "read".to_string(),
7283 reason: "Folder not found".to_string(),
7284 suggestion: "💡 Check if the folder path is correct".to_string(),
7285 });
7286 }
7287
7288 if !folder_path_obj.is_dir() {
7289 return Err(ElusionError::InvalidOperation {
7290 operation: "Local Folder Loading with Filename".to_string(),
7291 reason: "Path is not a directory".to_string(),
7292 suggestion: "💡 Provide a valid directory path".to_string(),
7293 });
7294 }
7295
7296 let entries = fs::read_dir(folder_path)
7298 .map_err(|e| ElusionError::WriteError {
7299 path: folder_path.to_string(),
7300 operation: "read".to_string(),
7301 reason: format!("Failed to read directory: {}", e),
7302 suggestion: "💡 Check directory permissions".to_string(),
7303 })?;
7304
7305 let mut dataframes = Vec::new();
7306
7307 for entry in entries {
7308 let entry = entry.map_err(|e| ElusionError::WriteError {
7309 path: folder_path.to_string(),
7310 operation: "read".to_string(),
7311 reason: format!("Failed to read directory entry: {}", e),
7312 suggestion: "💡 Check directory permissions".to_string(),
7313 })?;
7314
7315 let file_path = entry.path();
7316
7317 if !file_path.is_file() {
7319 continue;
7320 }
7321
7322 let file_name = file_path.file_name()
7323 .and_then(|name| name.to_str())
7324 .unwrap_or("")
7325 .to_string();
7326
7327 if file_name.starts_with('.') {
7329 continue;
7330 }
7331
7332 if let Some(ref extensions) = file_extensions {
7334 let file_ext = file_name
7335 .split('.')
7336 .last()
7337 .unwrap_or("")
7338 .to_lowercase();
7339
7340 if !extensions.iter().any(|ext| ext.to_lowercase() == file_ext) {
7341 continue;
7342 }
7343 }
7344
7345 let file_path_str = file_path.to_str().ok_or_else(|| ElusionError::InvalidOperation {
7346 operation: "Local Folder Loading with Filename".to_string(),
7347 reason: format!("Invalid file path: {:?}", file_path),
7348 suggestion: "💡 Ensure file paths contain valid UTF-8 characters".to_string(),
7349 })?;
7350
7351 let mut loaded_df = None;
7353
7354 match file_name.split('.').last().unwrap_or("").to_lowercase().as_str() {
7355 "csv" => {
7356
7357 let file_size = std::fs::metadata(file_path_str)
7358 .map(|m| m.len())
7359 .unwrap_or(0);
7360
7361 if file_size > 500_000_000 {
7362 println!("📊 Large CSV detected ({}), using streaming loader", file_name);
7363
7364 match load_csv_smart(file_path_str, "local_data").await {
7365 Ok(aliased_df) => {
7366 println!("✅ Loaded large CSV: {}", file_name);
7367 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
7368
7369 let df = CustomDataFrame {
7370 df: normalized_df,
7371 table_alias: "local_csv".to_string(),
7372 from_table: "local_csv".to_string(),
7373 selected_columns: Vec::new(),
7374 alias_map: Vec::new(),
7375 aggregations: Vec::new(),
7376 group_by_columns: Vec::new(),
7377 where_conditions: Vec::new(),
7378 having_conditions: Vec::new(),
7379 order_by_columns: Vec::new(),
7380 limit_count: None,
7381 joins: Vec::new(),
7382 window_functions: Vec::new(),
7383 ctes: Vec::new(),
7384 subquery_source: None,
7385 set_operations: Vec::new(),
7386 query: String::new(),
7387 aggregated_df: None,
7388 union_tables: None,
7389 original_expressions: Vec::new(),
7390 needs_normalization: false,
7391 raw_selected_columns: Vec::new(),
7392 raw_group_by_columns: Vec::new(),
7393 raw_where_conditions: Vec::new(),
7394 raw_having_conditions: Vec::new(),
7395 raw_join_conditions: Vec::new(),
7396 raw_aggregations: Vec::new(),
7397 uses_group_by_all: false
7398 };
7399 loaded_df = Some(df);
7400 },
7401 Err(e) => {
7402 eprintln!("⚠️ Failed to load large CSV file {}: {}", file_name, e);
7403 continue;
7404 }
7405 }
7406 } else {
7407 match Self::load_csv(file_path_str, "local_data").await {
7408 Ok(aliased_df) => {
7409 println!("✅ Loaded CSV: {}", file_name);
7410
7411 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
7412
7413 let df = CustomDataFrame {
7414 df: normalized_df,
7415 table_alias: "local_csv".to_string(),
7416 from_table: "local_csv".to_string(),
7417 selected_columns: Vec::new(),
7418 alias_map: Vec::new(),
7419 aggregations: Vec::new(),
7420 group_by_columns: Vec::new(),
7421 where_conditions: Vec::new(),
7422 having_conditions: Vec::new(),
7423 order_by_columns: Vec::new(),
7424 limit_count: None,
7425 joins: Vec::new(),
7426 window_functions: Vec::new(),
7427 ctes: Vec::new(),
7428 subquery_source: None,
7429 set_operations: Vec::new(),
7430 query: String::new(),
7431 aggregated_df: None,
7432 union_tables: None,
7433 original_expressions: Vec::new(),
7434 needs_normalization: false,
7435 raw_selected_columns: Vec::new(),
7436 raw_group_by_columns: Vec::new(),
7437 raw_where_conditions: Vec::new(),
7438 raw_having_conditions: Vec::new(),
7439 raw_join_conditions: Vec::new(),
7440 raw_aggregations: Vec::new(),
7441 uses_group_by_all: false
7442 };
7443 loaded_df = Some(df);
7444 },
7445 Err(e) => {
7446 eprintln!("⚠️ Failed to load CSV file {}: {}", file_name, e);
7447 continue;
7448 }
7449 }
7450 }
7451 },
7452 "xlsx" | "xls" => {
7453 match crate::features::excel::load_excel(file_path_str, "local_data").await {
7454 Ok(aliased_df) => {
7455 println!("✅ Loaded Excel: {}", file_name);
7456
7457 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
7459
7460 let df = CustomDataFrame {
7461 df: normalized_df,
7462 table_alias: "local_excel".to_string(),
7463 from_table: "local_excel".to_string(),
7464 selected_columns: Vec::new(),
7465 alias_map: Vec::new(),
7466 aggregations: Vec::new(),
7467 group_by_columns: Vec::new(),
7468 where_conditions: Vec::new(),
7469 having_conditions: Vec::new(),
7470 order_by_columns: Vec::new(),
7471 limit_count: None,
7472 joins: Vec::new(),
7473 window_functions: Vec::new(),
7474 ctes: Vec::new(),
7475 subquery_source: None,
7476 set_operations: Vec::new(),
7477 query: String::new(),
7478 aggregated_df: None,
7479 union_tables: None,
7480 original_expressions: Vec::new(),
7481 needs_normalization: false,
7482 raw_selected_columns: Vec::new(),
7483 raw_group_by_columns: Vec::new(),
7484 raw_where_conditions: Vec::new(),
7485 raw_having_conditions: Vec::new(),
7486 raw_join_conditions: Vec::new(),
7487 raw_aggregations: Vec::new(),
7488 uses_group_by_all: false
7489 };
7490 loaded_df = Some(df);
7491 },
7492 Err(e) => {
7493 eprintln!("⚠️ Failed to load Excel file {}: {}", file_name, e);
7494 continue;
7495 }
7496 }
7497 },
7498 "json" => {
7499 match Self::load_json(file_path_str, "local_data").await {
7500 Ok(aliased_df) => {
7501 println!("✅ Loaded JSON: {}", file_name);
7502
7503 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
7505
7506 let df = CustomDataFrame {
7507 df: normalized_df,
7508 table_alias: "local_json".to_string(),
7509 from_table: "local_json".to_string(),
7510 selected_columns: Vec::new(),
7511 alias_map: Vec::new(),
7512 aggregations: Vec::new(),
7513 group_by_columns: Vec::new(),
7514 where_conditions: Vec::new(),
7515 having_conditions: Vec::new(),
7516 order_by_columns: Vec::new(),
7517 limit_count: None,
7518 joins: Vec::new(),
7519 window_functions: Vec::new(),
7520 ctes: Vec::new(),
7521 subquery_source: None,
7522 set_operations: Vec::new(),
7523 query: String::new(),
7524 aggregated_df: None,
7525 union_tables: None,
7526 original_expressions: Vec::new(),
7527 needs_normalization: false,
7528 raw_selected_columns: Vec::new(),
7529 raw_group_by_columns: Vec::new(),
7530 raw_where_conditions: Vec::new(),
7531 raw_having_conditions: Vec::new(),
7532 raw_join_conditions: Vec::new(),
7533 raw_aggregations: Vec::new(),
7534 uses_group_by_all: false
7535 };
7536 loaded_df = Some(df);
7537 },
7538 Err(e) => {
7539 eprintln!("⚠️ Failed to load JSON file {}: {}", file_name, e);
7540 continue;
7541 }
7542 }
7543 },
7544 "parquet" => {
7545 match Self::load_parquet(file_path_str, "local_data").await {
7546 Ok(aliased_df) => {
7547 println!("✅ Loaded Parquet: {}", file_name);
7548
7549 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
7551
7552 let df = CustomDataFrame {
7553 df: normalized_df,
7554 table_alias: "local_parquet".to_string(),
7555 from_table: "local_parquet".to_string(),
7556 selected_columns: Vec::new(),
7557 alias_map: Vec::new(),
7558 aggregations: Vec::new(),
7559 group_by_columns: Vec::new(),
7560 where_conditions: Vec::new(),
7561 having_conditions: Vec::new(),
7562 order_by_columns: Vec::new(),
7563 limit_count: None,
7564 joins: Vec::new(),
7565 window_functions: Vec::new(),
7566 ctes: Vec::new(),
7567 subquery_source: None,
7568 set_operations: Vec::new(),
7569 query: String::new(),
7570 aggregated_df: None,
7571 union_tables: None,
7572 original_expressions: Vec::new(),
7573 needs_normalization: false,
7574 raw_selected_columns: Vec::new(),
7575 raw_group_by_columns: Vec::new(),
7576 raw_where_conditions: Vec::new(),
7577 raw_having_conditions: Vec::new(),
7578 raw_join_conditions: Vec::new(),
7579 raw_aggregations: Vec::new(),
7580 uses_group_by_all: false
7581 };
7582 loaded_df = Some(df);
7583 },
7584 Err(e) => {
7585 eprintln!("⚠️ Failed to load Parquet file {}: {}", file_name, e);
7586 continue;
7587 }
7588 }
7589 },
7590 "xml" => {
7591 match Self::load_xml(file_path_str, "local_data").await {
7592 Ok(aliased_df) => {
7593 println!("✅ Loaded XML: {}", file_name);
7594
7595 let normalized_df = lowercase_column_names(aliased_df.dataframe).await?;
7597
7598 let df = CustomDataFrame {
7599 df: normalized_df,
7600 table_alias: "local_xml".to_string(),
7601 from_table: "local_xml".to_string(),
7602 selected_columns: Vec::new(),
7603 alias_map: Vec::new(),
7604 aggregations: Vec::new(),
7605 group_by_columns: Vec::new(),
7606 where_conditions: Vec::new(),
7607 having_conditions: Vec::new(),
7608 order_by_columns: Vec::new(),
7609 limit_count: None,
7610 joins: Vec::new(),
7611 window_functions: Vec::new(),
7612 ctes: Vec::new(),
7613 subquery_source: None,
7614 set_operations: Vec::new(),
7615 query: String::new(),
7616 aggregated_df: None,
7617 union_tables: None,
7618 original_expressions: Vec::new(),
7619 needs_normalization: false,
7620 raw_selected_columns: Vec::new(),
7621 raw_group_by_columns: Vec::new(),
7622 raw_where_conditions: Vec::new(),
7623 raw_having_conditions: Vec::new(),
7624 raw_join_conditions: Vec::new(),
7625 raw_aggregations: Vec::new(),
7626 uses_group_by_all: false
7627 };
7628 dataframes.push(df);
7629 },
7630 Err(e) => {
7631 eprintln!("⚠️ Failed to load XML file {}: {}", file_name, e);
7632 continue;
7633 }
7634 }
7635 },
7636 _ => {
7637 println!("⏭️ Skipping unsupported file type: {}", file_name);
7638 }
7639 }
7640
7641 if let Some(mut df) = loaded_df {
7643 df = df.select_vec(vec![
7645 &format!("'{}' AS filename_added", file_name),
7646 "*"
7647 ]);
7648
7649 let temp_alias = format!("file_with_filename_{}", dataframes.len());
7651 match df.elusion(&temp_alias).await {
7652 Ok(filename_df) => {
7653 println!("✅ Added filename column to {}", file_name);
7654 dataframes.push(filename_df);
7655 },
7656 Err(e) => {
7657 eprintln!("⚠️ Failed to add filename to {}: {}", file_name, e);
7658 continue;
7659 }
7660 }
7661 }
7662 }
7663
7664 if dataframes.is_empty() {
7665 return Err(ElusionError::InvalidOperation {
7666 operation: "Local Folder Loading with Filename".to_string(),
7667 reason: "No supported files found or all files failed to load".to_string(),
7668 suggestion: "💡 Check folder path and ensure it contains supported files".to_string(),
7669 });
7670 }
7671
7672 if dataframes.len() == 1 {
7674 println!("📄 Single file loaded with filename column");
7675 return dataframes.into_iter().next().unwrap().elusion(result_alias).await;
7676 }
7677
7678 println!("🔍 Checking schema compatibility for {} files with filename columns...", dataframes.len());
7680
7681 let first_schema = dataframes[0].df.schema();
7682 let mut compatible_schemas = true;
7683 let mut schema_issues = Vec::new();
7684
7685 for (file_idx, df) in dataframes.iter().enumerate().skip(1) {
7686 let current_schema = df.df.schema();
7687
7688 if first_schema.fields().len() != current_schema.fields().len() {
7690 compatible_schemas = false;
7691 schema_issues.push(format!("File {} has {} columns, but first file has {}",
7692 file_idx + 1, current_schema.fields().len(), first_schema.fields().len()));
7693 continue;
7694 }
7695
7696 for (col_idx, first_field) in first_schema.fields().iter().enumerate() {
7698 if let Some(current_field) = current_schema.fields().get(col_idx) {
7699 if first_field.name().to_lowercase() != current_field.name().to_lowercase() {
7700 compatible_schemas = false;
7701 schema_issues.push(format!("File {} column {} name is '{}', but first file has '{}'",
7702 file_idx + 1, col_idx + 1, current_field.name(), first_field.name()));
7703 }
7704 }
7705 }
7706 }
7707
7708 if !compatible_schemas {
7709 println!("⚠️ Schema compatibility issues found:");
7710 for issue in &schema_issues {
7711 println!(" {}", issue);
7712 }
7713
7714 println!("🔧 Reordering columns by name to match first file...");
7716
7717 let first_file_columns: Vec<String> = first_schema.fields()
7718 .iter()
7719 .map(|field| field.name().clone())
7720 .collect();
7721
7722 println!("📋 Target column order: {:?}", first_file_columns);
7723
7724 let mut reordered_dataframes = Vec::new();
7725
7726 for (i, df) in dataframes.clone().into_iter().enumerate() {
7727 let column_refs: Vec<&str> = first_file_columns.iter().map(|s| s.as_str()).collect();
7728 let reordered_df = df.select_vec(column_refs);
7729
7730 let temp_alias = format!("reordered_file_{}", i + 1);
7731 match reordered_df.elusion(&temp_alias).await {
7732 Ok(standardized_df) => {
7733 println!("✅ Reordered file {} columns", i + 1);
7734 reordered_dataframes.push(standardized_df);
7735 },
7736 Err(e) => {
7737 eprintln!("⚠️ Failed to reorder file {} columns: {}", i + 1, e);
7738 continue;
7739 }
7740 }
7741 }
7742
7743 if reordered_dataframes.is_empty() {
7744 println!("📄 Column reordering failed, returning first file only");
7745 return dataframes.into_iter().next().unwrap().elusion(result_alias).await;
7746 }
7747
7748 dataframes = reordered_dataframes;
7749 println!("✅ All files reordered to match first file column order");
7750 } else {
7751 println!("✅ All schemas are compatible!");
7752 }
7753
7754 println!("🔗 Unioning {} files with filename tracking...", dataframes.len());
7756
7757 let total_files = dataframes.len();
7758 let mut result = dataframes.clone().into_iter().next().unwrap();
7759
7760 for (i, df) in dataframes.into_iter().enumerate().skip(1) {
7762 result = result.union_all(df).await
7763 .map_err(|e| ElusionError::InvalidOperation {
7764 operation: "Local Folder Union with Filename".to_string(),
7765 reason: format!("Failed to union file {}: {}", i + 1, e),
7766 suggestion: "💡 Check that all files have compatible schemas".to_string(),
7767 })?;
7768
7769 println!("✅ Unioned file {}/{}", i + 1, total_files - 1);
7770 }
7771
7772 println!("🎉 Successfully combined {} files with filename tracking", total_files);
7773
7774 result.elusion(result_alias).await
7775 }
7776
7777 #[cfg(feature = "dashboard")]
7780 pub async fn plot_line(
7781 &self,
7782 date_col: &str,
7783 value_col: &str,
7784 show_markers: bool,
7785 title: Option<&str>,
7786 ) -> ElusionResult<PlotlyPlot> {
7787 crate::features::dashboard::plot_line_impl(self, date_col, value_col, show_markers, title).await
7788 }
7789
7790 #[cfg(not(feature = "dashboard"))]
7791 pub async fn plot_line(
7792 &self,
7793 _date_col: &str,
7794 _value_col: &str,
7795 _show_markers: bool,
7796 _title: Option<&str>,
7797 ) -> ElusionResult<PlotlyPlot> {
7798 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
7799 }
7800
7801 #[cfg(feature = "dashboard")]
7802 pub async fn plot_time_series(
7803 &self,
7804 date_col: &str,
7805 value_col: &str,
7806 show_markers: bool,
7807 title: Option<&str>,
7808 ) -> ElusionResult<PlotlyPlot> {
7809 crate::features::dashboard::plot_time_series_impl(self, date_col, value_col, show_markers, title).await
7810 }
7811
7812 #[cfg(not(feature = "dashboard"))]
7813 pub async fn plot_time_series(
7814 &self,
7815 _date_col: &str,
7816 _value_col: &str,
7817 _show_markers: bool,
7818 _title: Option<&str>,
7819 ) -> ElusionResult<PlotlyPlot> {
7820 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
7821 }
7822
7823 #[cfg(feature = "dashboard")]
7824 pub async fn plot_bar(
7825 &self,
7826 x_col: &str,
7827 y_col: &str,
7828 title: Option<&str>,
7829 ) -> ElusionResult<PlotlyPlot> {
7830 crate::features::dashboard::plot_bar_impl(self, x_col, y_col, title).await
7831 }
7832
7833 #[cfg(not(feature = "dashboard"))]
7834 pub async fn plot_bar(
7835 &self,
7836 _x_col: &str,
7837 _y_col: &str,
7838 _title: Option<&str>,
7839 ) -> ElusionResult<PlotlyPlot> {
7840 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
7841 }
7842
7843 #[cfg(feature = "dashboard")]
7844 pub async fn plot_scatter(
7845 &self,
7846 x_col: &str,
7847 y_col: &str,
7848 marker_size: Option<usize>,
7849 ) -> ElusionResult<PlotlyPlot> {
7850 crate::features::dashboard::plot_scatter_impl(self, x_col, y_col, marker_size).await
7851 }
7852
7853 #[cfg(not(feature = "dashboard"))]
7854 pub async fn plot_scatter(
7855 &self,
7856 _x_col: &str,
7857 _y_col: &str,
7858 _marker_size: Option<usize>,
7859 ) -> ElusionResult<PlotlyPlot> {
7860 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
7861 }
7862
7863 #[cfg(feature = "dashboard")]
7864 pub async fn plot_histogram(
7865 &self,
7866 col: &str,
7867 title: Option<&str>,
7868 ) -> ElusionResult<PlotlyPlot> {
7869 crate::features::dashboard::plot_histogram_impl(self, col, title).await
7870 }
7871
7872 #[cfg(not(feature = "dashboard"))]
7873 pub async fn plot_histogram(
7874 &self,
7875 _col: &str,
7876 _title: Option<&str>,
7877 ) -> ElusionResult<PlotlyPlot> {
7878 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
7879 }
7880
7881 #[cfg(feature = "dashboard")]
7882 pub async fn plot_box(
7883 &self,
7884 value_col: &str,
7885 group_by_col: Option<&str>,
7886 title: Option<&str>,
7887 ) -> ElusionResult<PlotlyPlot> {
7888 crate::features::dashboard::plot_box_impl(self, value_col, group_by_col, title).await
7889 }
7890
7891 #[cfg(not(feature = "dashboard"))]
7892 pub async fn plot_box(
7893 &self,
7894 _value_col: &str,
7895 _group_by_col: Option<&str>,
7896 _title: Option<&str>,
7897 ) -> ElusionResult<PlotlyPlot> {
7898 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
7899 }
7900
7901 #[cfg(feature = "dashboard")]
7902 pub async fn plot_pie(
7903 &self,
7904 label_col: &str,
7905 value_col: &str,
7906 title: Option<&str>,
7907 ) -> ElusionResult<PlotlyPlot> {
7908 crate::features::dashboard::plot_pie_impl(self, label_col, value_col, title).await
7909 }
7910
7911 #[cfg(not(feature = "dashboard"))]
7912 pub async fn plot_pie(
7913 &self,
7914 _label_col: &str,
7915 _value_col: &str,
7916 _title: Option<&str>,
7917 ) -> ElusionResult<PlotlyPlot> {
7918 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
7919 }
7920
7921 #[cfg(feature = "dashboard")]
7922 pub async fn plot_donut(
7923 &self,
7924 label_col: &str,
7925 value_col: &str,
7926 title: Option<&str>,
7927 ) -> ElusionResult<PlotlyPlot> {
7928 crate::features::dashboard::plot_donut_impl(self, label_col, value_col, title).await
7929 }
7930
7931 #[cfg(not(feature = "dashboard"))]
7932 pub async fn plot_donut(
7933 &self,
7934 _label_col: &str,
7935 _value_col: &str,
7936 _title: Option<&str>,
7937 ) -> ElusionResult<PlotlyPlot> {
7938 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
7939 }
7940
7941 #[cfg(feature = "dashboard")]
7943 pub async fn plot_waterfall(
7944 &self,
7945 x_col: &str,
7946 y_col: &str,
7947 title: Option<&str>,
7948 ) -> ElusionResult<PlotlyPlot> {
7949 crate::features::dashboard::plot_waterfall_impl(self, x_col, y_col, title).await
7950 }
7951
7952 #[cfg(not(feature = "dashboard"))]
7953 pub async fn plot_waterfall(
7954 &self,
7955 _x_col: &str,
7956 _y_col: &str,
7957 _title: Option<&str>,
7958 ) -> ElusionResult<PlotlyPlot> {
7959 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
7960 }
7961
7962 #[cfg(feature = "dashboard")]
7963 pub async fn create_report(
7964 plots: Option<&[(&PlotlyPlot, &str)]>,
7965 tables: Option<&[(&CustomDataFrame, &str)]>,
7966 report_title: &str,
7967 filename: &str,
7968 layout_config: Option<ReportLayout>,
7969 table_options: Option<TableOptions>,
7970 ) -> ElusionResult<()> {
7971 crate::features::dashboard::create_report_impl(
7972 plots, tables, report_title, filename, layout_config, table_options
7973 ).await
7974 }
7975
7976 #[cfg(not(feature = "dashboard"))]
7977 pub async fn create_report(
7978 _plots: Option<&[(&PlotlyPlot, &str)]>,
7979 _tables: Option<&[(&CustomDataFrame, &str)]>,
7980 _report_title: &str,
7981 _filename: &str,
7982 _layout_config: Option<ReportLayout>,
7983 _table_options: Option<TableOptions>,
7984 ) -> ElusionResult<()> {
7985 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled.".to_string()))
7986 }
7987
7988 #[cfg(feature = "dashboard")]
7990 pub async fn export_plot_to_png(
7991 plot: &PlotlyPlot,
7992 filename: &str,
7993 width: u32,
7994 height: u32,
7995 ) -> ElusionResult<()> {
7996 crate::features::dashboard::export_plot_to_png_impl(plot, filename, width, height).await
7997 }
7998
7999 #[cfg(not(feature = "dashboard"))]
8000 pub async fn export_plot_to_png(
8001 _plot: &PlotlyPlot,
8002 _filename: &str,
8003 _width: u32,
8004 _height: u32,
8005 ) -> ElusionResult<()> {
8006 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
8007 }
8008
8009 #[cfg(feature = "dashboard")]
8011 pub async fn export_report_to_pdf(
8012 plots: Option<&[(&PlotlyPlot, &str)]>,
8013 tables: Option<&[(&CustomDataFrame, &str)]>,
8014 report_title: &str,
8015 pdf_filename: &str,
8016 layout_config: Option<ReportLayout>,
8017 table_options: Option<TableOptions>,
8018 ) -> ElusionResult<()> {
8019 crate::features::dashboard::export_report_to_pdf_impl(
8020 plots, tables, report_title, pdf_filename, layout_config, table_options
8021 ).await
8022 }
8023
8024 #[cfg(not(feature = "dashboard"))]
8025 pub async fn export_report_to_pdf(
8026 _plots: Option<&[(&PlotlyPlot, &str)]>,
8027 _tables: Option<&[(&CustomDataFrame, &str)]>,
8028 _report_title: &str,
8029 _pdf_filename: &str,
8030 _layout_config: Option<ReportLayout>,
8031 _table_options: Option<TableOptions>,
8032 ) -> ElusionResult<()> {
8033 Err(ElusionError::Custom("*** Warning ***: Dashboard feature not enabled. Add feature dashboard under [dependencies]".to_string()))
8034 }
8035
8036
8037 pub async fn elusion_streaming(&self, alias: &str) -> ElusionResult<()> {
8042
8043 if alias.trim().is_empty() {
8044 return Err(ElusionError::InvalidOperation {
8045 operation: "Elusion Streaming".to_string(),
8046 reason: "Alias cannot be empty".to_string(),
8047 suggestion: "💡 Provide a valid table alias".to_string()
8048 });
8049 }
8050
8051 println!("🚀 Executing streaming query for '{}'...", alias);
8052
8053 let sql = self.construct_sql();
8054 let mut stream = match self.stream().await {
8057 Ok(stream) => stream,
8058 Err(e) => {
8059 println!("❌ Failed to create stream: {}", e);
8060 return Err(e);
8061 }
8062 };
8063
8064 let mut chunk_count = 0;
8065 let mut total_rows = 0;
8066 let start_time = std::time::Instant::now();
8067 let mut has_shown_sample = false;
8068
8069 println!("🔃 Starting to process stream...");
8070
8071 while let Some(batch_result) = stream.next().await {
8072 let batch = match batch_result {
8073 Ok(batch) => batch,
8074 Err(e) => {
8075 println!("❌ Error processing batch {}: {}", chunk_count + 1, e);
8076 return Err(ElusionError::InvalidOperation {
8077 operation: "Stream Processing".to_string(),
8078 reason: format!("Failed to process batch: {}", e),
8079 suggestion: "💡 Check query syntax and data integrity".to_string()
8080 });
8081 }
8082 };
8083
8084 chunk_count += 1;
8085 let batch_rows = batch.num_rows();
8086 total_rows += batch_rows;
8087
8088 if batch_rows > 0 {
8089 println!("📦 Chunk {}: {} rows", chunk_count, batch_rows);
8090 } else {
8091 println!("📦 Chunk {}: {} rows (empty)", chunk_count, batch_rows);
8092 }
8093
8094 if chunk_count <= 3 || chunk_count % 100 == 0 {
8096 let elapsed = start_time.elapsed();
8097 let rows_per_sec = if elapsed.as_secs() > 0 {
8098 total_rows as f64 / elapsed.as_secs_f64()
8099 } else {
8100 0.0
8101 };
8102 println!("🔁 Progress: {} chunks processed | {} total rows | {:.0} rows/sec",
8103 chunk_count, total_rows, rows_per_sec);
8104 }
8105
8106 if !has_shown_sample && batch_rows > 0 {
8107 println!("📋 Sample results ({} rows shown):", batch_rows.min(15));
8108 match self.display_sample(&batch) {
8109 Ok(_) => has_shown_sample = true,
8110 Err(e) => println!("⚠️ Could not display sample: {}", e)
8111 }
8112 }
8113
8114 if chunk_count % 1000 == 0 {
8116 println!("💤 Brief pause after {} chunks to prevent system overload", chunk_count);
8117 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
8118 }
8119 }
8120
8121 let total_time = start_time.elapsed();
8122
8123 if total_rows == 0 {
8124 println!("⚠️ Query completed but returned 0 rows");
8125 println!("💡 This could indicate:");
8126 println!(" - Filters eliminated all data");
8127 println!(" - Empty source dataset");
8128 println!(" - Query logic issue");
8129 println!("🔍 Generated SQL was: {}", sql);
8130 } else {
8131 println!("✅ Streaming complete: {} chunks, {} rows in {:?}",
8132 chunk_count, total_rows, total_time);
8133 }
8134
8135 Ok(())
8136 }
8137
8138 fn display_sample(&self, batch: &RecordBatch) -> ElusionResult<()> {
8140 if batch.num_rows() == 0 {
8141 println!("📋 Empty batch (0 rows)");
8142 return Ok(());
8143 }
8144
8145 let sample_size = if batch.num_rows() <= 20 {
8147 batch.num_rows()
8148 } else {
8149 15
8150 };
8151
8152 let sample_batch = batch.slice(0, sample_size);
8153
8154 let formatted = pretty_format_batches(&[sample_batch])
8155 .map_err(|e| ElusionError::Custom(format!("Display error: {}", e)))?;
8156
8157 if batch.num_rows() <= 20 {
8158 println!("📑 Complete result ({} rows):", batch.num_rows());
8159 } else {
8160 println!("📋 Sample result ({} of {} rows shown):", sample_size, batch.num_rows());
8161 }
8162
8163 println!("{}", formatted);
8164
8165 if batch.num_rows() > sample_size {
8166 println!("... ({} more rows in this chunk)", batch.num_rows() - sample_size);
8167 }
8168
8169 Ok(())
8170 }
8171
8172 pub async fn stream(&self) -> ElusionResult<SendableRecordBatchStream> {
8174 let ctx = SessionContext::new();
8175
8176 if let Err(e) = self.register_all_tables(&ctx).await {
8178 println!("❌ Failed to register tables: {}", e);
8179 return Err(e);
8180 }
8181
8182 let sql = self.construct_sql();
8183 let df = ctx.sql(&sql).await.map_err(|e| {
8187 println!("❌ SQL parsing failed: {}", e);
8188 ElusionError::InvalidOperation {
8189 operation: "SQL Parsing".to_string(),
8190 reason: format!("Failed to parse SQL: {}", e),
8191 suggestion: "💡 Check SQL syntax, table aliases, and column names".to_string()
8192 }
8193 })?;
8194
8195 println!("✅ Query parsed successfully, creating execution stream...");
8196
8197 df.execute_stream().await.map_err(|e| {
8199 println!("❌ Stream execution failed: {}", e);
8200 ElusionError::InvalidOperation {
8201 operation: "Stream Execution".to_string(),
8202 reason: format!("Failed to execute stream: {}", e),
8203 suggestion: "💡 Check data integrity and memory availability".to_string()
8204 }
8205 })
8206 }
8207
8208 pub async fn stream_process<F, Fut>(
8210 &self,
8211 mut processor: F
8212 ) -> ElusionResult<()>
8213 where
8214 F: FnMut(RecordBatch) -> Fut,
8215 Fut: std::future::Future<Output = ElusionResult<()>>,
8216 {
8217 let mut stream = self.stream().await?;
8218
8219 while let Some(batch_result) = stream.next().await {
8220 let batch = batch_result.map_err(|e| ElusionError::InvalidOperation {
8221 operation: "Stream Processing".to_string(),
8222 reason: format!("Failed to get batch from stream: {}", e),
8223 suggestion: "💡 Check data integrity and memory availability".to_string()
8224 })?;
8225
8226 processor(batch).await?;
8227 }
8228
8229 Ok(())
8230 }
8231
8232 pub async fn elusion_streaming_write(
8236 &self,
8237 alias: &str,
8238 output_path: &str,
8239 write_mode: &str ) -> ElusionResult<()> {
8241 if alias.trim().is_empty() {
8242 return Err(ElusionError::InvalidOperation {
8243 operation: "Elusion Streaming Write".to_string(),
8244 reason: "Alias cannot be empty".to_string(),
8245 suggestion: "💡 Provide a valid table alias".to_string()
8246 });
8247 }
8248
8249 println!("📄 Streaming query results to: {}", output_path);
8250
8251 let ext = output_path.split('.').last().unwrap_or_default().to_lowercase();
8253
8254 match ext.as_str() {
8255 "csv" => {
8256 self.stream_to_csv(alias, output_path, write_mode).await
8257 },
8258 "json" => {
8259 self.stream_to_json(alias, output_path, write_mode).await
8260 },
8261 "parquet" => {
8262 self.stream_to_parquet_batched(alias, output_path, write_mode).await
8264 },
8265 _ => {
8266 Err(ElusionError::InvalidOperation {
8267 operation: "Streaming Write".to_string(),
8268 reason: format!("Unsupported file extension: {}", ext),
8269 suggestion: "💡 Use .csv, .json, or .parquet extensions".to_string()
8270 })
8271 }
8272 }
8273 }
8274
8275 async fn stream_to_csv(&self, _alias: &str, output_path: &str, mode: &str) -> ElusionResult<()> {
8277 if mode == "overwrite" && std::fs::metadata(output_path).is_ok() {
8279 std::fs::remove_file(output_path).map_err(|e| ElusionError::WriteError {
8280 path: output_path.to_string(),
8281 operation: "overwrite".to_string(),
8282 reason: e.to_string(),
8283 suggestion: "💡 Check file permissions".to_string()
8284 })?;
8285 }
8286
8287 let file = std::fs::OpenOptions::new()
8288 .create(true)
8289 .write(true)
8290 .append(mode == "append")
8291 .truncate(mode == "overwrite")
8292 .open(output_path)
8293 .map_err(|e| ElusionError::WriteError {
8294 path: output_path.to_string(),
8295 operation: "file_create".to_string(),
8296 reason: e.to_string(),
8297 suggestion: "💡 Check path and permissions".to_string()
8298 })?;
8299
8300 let mut writer = std::io::BufWriter::new(file);
8301
8302 let mut stream = self.stream().await?;
8303
8304 let mut is_first_batch = true;
8305 let mut total_rows = 0;
8306 let mut chunk_count = 0;
8307
8308 while let Some(batch_result) = stream.next().await {
8309 let batch = batch_result.map_err(|e| ElusionError::DataFusion(e))?;
8310 chunk_count += 1;
8311 total_rows += batch.num_rows();
8312
8313 let write_header = is_first_batch && (mode == "overwrite" || !std::fs::metadata(output_path).is_ok());
8315
8316 let mut csv_writer = WriterBuilder::new()
8317 .with_header(write_header)
8318 .build(&mut writer);
8319
8320 csv_writer.write(&batch).map_err(|e| ElusionError::WriteError {
8321 path: output_path.to_string(),
8322 operation: "write_batch".to_string(),
8323 reason: e.to_string(),
8324 suggestion: "💡 Check disk space and permissions".to_string()
8325 })?;
8326
8327 if chunk_count % 100 == 0 {
8328 println!("📦 Written {} chunks ({} rows) to CSV", chunk_count, total_rows);
8329 }
8330
8331 is_first_batch = false;
8332 }
8333
8334 writer.flush().map_err(|e| ElusionError::WriteError {
8335 path: output_path.to_string(),
8336 operation: "flush".to_string(),
8337 reason: e.to_string(),
8338 suggestion: "💡 Failed to flush data".to_string()
8339 })?;
8340
8341 println!("✅ Streaming CSV write complete: {} rows in {} chunks", total_rows, chunk_count);
8342 Ok(())
8343 }
8344
8345 async fn stream_to_json(&self, _alias: &str, output_path: &str, mode: &str) -> ElusionResult<()> {
8347
8348 if mode == "overwrite" && std::fs::metadata(output_path).is_ok() {
8349 std::fs::remove_file(output_path).map_err(|e| ElusionError::WriteError {
8350 path: output_path.to_string(),
8351 operation: "overwrite".to_string(),
8352 reason: e.to_string(),
8353 suggestion: "💡 Check file permissions".to_string()
8354 })?;
8355 }
8356
8357 let file = std::fs::OpenOptions::new()
8358 .create(true)
8359 .write(true)
8360 .truncate(mode == "overwrite")
8361 .open(output_path)
8362 .map_err(|e| ElusionError::WriteError {
8363 path: output_path.to_string(),
8364 operation: "file_create".to_string(),
8365 reason: e.to_string(),
8366 suggestion: "💡 Check path and permissions".to_string()
8367 })?;
8368
8369 let mut writer = std::io::BufWriter::new(file);
8370 let mut stream = self.stream().await?;
8371 let mut is_first_row = true;
8372 let mut total_rows = 0;
8373
8374 writeln!(writer, "[").map_err(|e| ElusionError::WriteError {
8376 path: output_path.to_string(),
8377 operation: "write_start".to_string(),
8378 reason: e.to_string(),
8379 suggestion: "💡 Check disk space".to_string()
8380 })?;
8381
8382 while let Some(batch_result) = stream.next().await {
8383 let batch = batch_result.map_err(|e| ElusionError::DataFusion(e))?;
8384 total_rows += batch.num_rows();
8385
8386 for row_idx in 0..batch.num_rows() {
8388 if !is_first_row {
8389 writeln!(writer, ",").map_err(|e| ElusionError::WriteError {
8390 path: output_path.to_string(),
8391 operation: "write_separator".to_string(),
8392 reason: e.to_string(),
8393 suggestion: "💡 Check disk space".to_string()
8394 })?;
8395 }
8396 is_first_row = false;
8397
8398 let mut row_obj = serde_json::Map::new();
8400 for col_idx in 0..batch.num_columns() {
8401 let schema = batch.schema();
8402 let field = schema.field(col_idx);
8403 let array = batch.column(col_idx);
8404 let json_value = array_value_to_json(array, row_idx)?;
8405 row_obj.insert(field.name().clone(), json_value);
8406 }
8407
8408 let json_value = serde_json::Value::Object(row_obj);
8409 serde_json::to_writer(&mut writer, &json_value).map_err(|e| ElusionError::WriteError {
8410 path: output_path.to_string(),
8411 operation: "write_json".to_string(),
8412 reason: e.to_string(),
8413 suggestion: "💡 Check JSON serialization".to_string()
8414 })?;
8415 }
8416 }
8417
8418 writeln!(writer, "\n]").map_err(|e| ElusionError::WriteError {
8420 path: output_path.to_string(),
8421 operation: "write_end".to_string(),
8422 reason: e.to_string(),
8423 suggestion: "💡 Check disk space".to_string()
8424 })?;
8425
8426 writer.flush().map_err(|e| ElusionError::WriteError {
8427 path: output_path.to_string(),
8428 operation: "flush".to_string(),
8429 reason: e.to_string(),
8430 suggestion: "💡 Failed to flush data".to_string()
8431 })?;
8432
8433 println!("✅ Streaming JSON write complete: {} rows", total_rows);
8434 Ok(())
8435 }
8436
8437 async fn stream_to_parquet_batched(&self, _alias: &str, output_path: &str, mode: &str) -> ElusionResult<()> {
8439 println!("⚠️ Parquet streaming: collecting batches of 50k rows for efficiency");
8440
8441 let mut stream = self.stream().await?;
8442 let mut accumulated_batches = Vec::new();
8443 let mut total_rows = 0;
8444 let batch_limit = 50000; while let Some(batch_result) = stream.next().await {
8447 let batch = batch_result.map_err(|e| ElusionError::DataFusion(e))?;
8448 total_rows += batch.num_rows();
8449 accumulated_batches.push(batch);
8450
8451 if total_rows >= batch_limit {
8453 self.write_parquet_batches(&accumulated_batches, output_path, mode).await?;
8454 accumulated_batches.clear();
8455 total_rows = 0;
8456 println!("📦 Written batch to Parquet ({}k+ rows)", batch_limit / 1000);
8457 }
8458 }
8459
8460 if !accumulated_batches.is_empty() {
8462 self.write_parquet_batches(&accumulated_batches, output_path, mode).await?;
8463 }
8464
8465 println!("✅ Streaming Parquet write complete");
8466 Ok(())
8467 }
8468
8469 async fn write_parquet_batches(&self, batches: &[RecordBatch], output_path: &str, mode: &str) -> ElusionResult<()> {
8471
8472 let ctx = SessionContext::new();
8473 let schema = batches[0].schema();
8474 let mem_table = MemTable::try_new(schema.into(), vec![batches.to_vec()])
8475 .map_err(|e| ElusionError::Custom(format!("Failed to create temp table: {}", e)))?;
8476
8477 ctx.register_table("temp_batches", Arc::new(mem_table))
8478 .map_err(|e| ElusionError::DataFusion(e))?;
8479
8480 let temp_df = ctx.table("temp_batches").await.map_err(|e| ElusionError::DataFusion(e))?;
8481
8482 let temp_custom_df = CustomDataFrame {
8483 df: temp_df,
8484 table_alias: "temp".to_string(),
8485 from_table: "temp".to_string(),
8486 selected_columns: Vec::new(),
8487 alias_map: Vec::new(),
8488 aggregations: Vec::new(),
8489 group_by_columns: Vec::new(),
8490 where_conditions: Vec::new(),
8491 having_conditions: Vec::new(),
8492 order_by_columns: Vec::new(),
8493 limit_count: None,
8494 joins: Vec::new(),
8495 window_functions: Vec::new(),
8496 ctes: Vec::new(),
8497 subquery_source: None,
8498 set_operations: Vec::new(),
8499 query: String::new(),
8500 aggregated_df: None,
8501 union_tables: None,
8502 original_expressions: Vec::new(),
8503 needs_normalization: false,
8504 raw_selected_columns: Vec::new(),
8505 raw_group_by_columns: Vec::new(),
8506 raw_where_conditions: Vec::new(),
8507 raw_having_conditions: Vec::new(),
8508 raw_join_conditions: Vec::new(),
8509 raw_aggregations: Vec::new(),
8510 uses_group_by_all: false
8511 };
8512
8513 temp_custom_df.write_to_parquet(mode, output_path, None).await
8514 }
8515
8516}