1pub mod prelude;
2
3use regex::Regex;
5use datafusion::prelude::*;
6use datafusion::error::DataFusionError;
7use futures::future::BoxFuture;
8use datafusion::datasource::MemTable;
9use std::sync::Arc;
10use arrow::datatypes::{Field, DataType as ArrowDataType, Schema, SchemaRef};
11use chrono::NaiveDate;
12use arrow::array::{StringBuilder, ArrayRef, ArrayBuilder, Float64Builder, Int64Builder, UInt64Builder};
13
14
15use arrow::record_batch::RecordBatch;
16use ArrowDataType::*;
17use arrow::csv::writer::WriterBuilder;
18
19use std::fs::{self, File, OpenOptions};
21use std::io::{Write, BufWriter};
22
23use datafusion::prelude::SessionContext;
25use datafusion::dataframe::{DataFrame,DataFrameWriteOptions};
26
27use serde_json::{json, Value};
29use serde::{Deserialize, Serialize};
30use std::collections::{HashMap, HashSet};
31use arrow::error::Result as ArrowResult;
32
33use datafusion::arrow::datatypes::TimeUnit;
34
35use std::result::Result;
37use std::path::{Path as LocalPath, PathBuf};
38use deltalake::operations::DeltaOps;
39use deltalake::writer::{RecordBatchWriter, WriteMode, DeltaWriter};
40use deltalake::{open_table, DeltaTableBuilder, DeltaTableError, ObjectStore, Path as DeltaPath};
41use deltalake::protocol::SaveMode;
42use deltalake::kernel::{DataType as DeltaType, Metadata, Protocol, StructType};
43use deltalake::kernel::StructField;
44use futures::StreamExt;
45use deltalake::storage::object_store::local::LocalFileSystem;
46use std::fmt::{self, Debug};
50use std::error::Error;
51
52use arrow::compute;
54use arrow::array::StringArray;
55
56use plotly::{Plot, Scatter, Bar, Histogram, BoxPlot, Pie};
58use plotly::common::{Mode, Line, Marker, Orientation};
59use plotly::layout::{Axis, Layout};
60use plotly::color::Rgb;
61use plotly::layout::update_menu::{Button,UpdateMenu,UpdateMenuDirection};
62use plotly::layout::{DragMode, RangeSlider};
63use arrow::array::{Array, Float64Array,Int64Array,Int32Array,TimestampNanosecondArray, Date64Array,Date32Array};
64use std::cmp::Ordering;
65
66use datafusion::common::ScalarValue;
68
69#[cfg(feature = "odbc")]
71use arrow_odbc::odbc_api::{Environment, ConnectionOptions};
72#[cfg(feature = "odbc")]
73use arrow_odbc::OdbcReaderBuilder;
74#[cfg(feature = "odbc")]
75use lazy_static::lazy_static;
76
77use azure_storage_blobs::prelude::*;
79use azure_storage::StorageCredentials;
80use azure_storage::CloudLocation;
81use futures::stream;
82use std::io::BufReader;
83use futures::pin_mut;
84use csv::ReaderBuilder;
85use csv::Trim::All;
86use serde_json::Deserializer;
87use azure_storage_blobs::blob::{BlockList, BlobBlockType};
89use bytes::Bytes;
90use datafusion::parquet::basic::Compression;
91use datafusion::parquet::file::properties::{WriterProperties, WriterVersion};
92use datafusion::parquet::arrow::ArrowWriter;
93use base64::engine::general_purpose::STANDARD;
94use base64::Engine;
95use futures::TryStreamExt;
96use tempfile::Builder;
97use std::future::Future;
99use tokio_cron_scheduler::{JobScheduler, Job};
100
101use reqwest::Client;
103
104use std::hash::{Hash, Hasher};
106use std::collections::hash_map::DefaultHasher;
107use chrono::{DateTime, Utc};
108use std::sync::Mutex;
109use lazy_static::lazy_static;
110
111pub struct MaterializedView {
113 pub(crate) name: String,
115 definition: String,
117 data: Vec<RecordBatch>,
119 refresh_time: DateTime<Utc>,
121 ttl: Option<u64>,
123}
124
125impl MaterializedView {
126 fn is_valid(&self) -> bool {
127 if let Some(ttl) = self.ttl {
128 let now = Utc::now();
129 let age = now.signed_duration_since(self.refresh_time).num_seconds();
130 return age < ttl as i64;
131 }
132 true
133 }
134
135 fn display_info(&self) -> String {
136 format!(
137 "View '{}' - Created: {}, TTL: {} seconds",
138 self.name,
139 self.refresh_time.format("%Y-%m-%d %H:%M:%S"),
140 self.ttl.map_or("None".to_string(), |ttl| ttl.to_string())
141 )
142 }
143}
144
145pub struct QueryCache {
147 cached_queries: HashMap<u64, (Vec<RecordBatch>, DateTime<Utc>)>,
148 max_cache_size: usize,
149 ttl_seconds: Option<u64>,
150}
151
152impl QueryCache {
153 pub fn new(max_cache_size: usize, ttl_seconds: Option<u64>) -> Self {
154 Self {
155 cached_queries: HashMap::new(),
156 max_cache_size,
157 ttl_seconds,
158 }
159 }
160
161 pub fn cache_query(&mut self, query: &str, result: Vec<RecordBatch>) {
162 if self.cached_queries.len() >= self.max_cache_size {
163 if let Some(oldest) = self.cached_queries
165 .iter()
166 .min_by_key(|(_, (_, time))| time) {
167 let key = *oldest.0;
168 self.cached_queries.remove(&key);
169 }
170 }
171
172 let mut hasher = DefaultHasher::new();
173 query.hash(&mut hasher);
174 let query_hash = hasher.finish();
175 self.cached_queries.insert(query_hash, (result, Utc::now()));
176 }
177
178 pub fn get_cached_result(&mut self, query: &str) -> Option<Vec<RecordBatch>> {
179 let mut hasher = DefaultHasher::new();
180 query.hash(&mut hasher);
181 let query_hash = hasher.finish();
182
183 if let Some((result, timestamp)) = self.cached_queries.get(&query_hash) {
184 if let Some(ttl) = self.ttl_seconds {
186 let now = Utc::now();
187 let age = now.signed_duration_since(*timestamp).num_seconds();
188 if age > ttl as i64 {
189 self.cached_queries.remove(&query_hash);
191 return None;
192 }
193 }
194 return Some(result.clone());
195 }
196 None
197 }
198
199 pub fn clear(&mut self) {
200 self.cached_queries.clear();
201 }
202
203 pub fn invalidate(&mut self, table_names: &[String]) {
204 if !table_names.is_empty() {
206 println!("Invalidating cache due to changes in tables: {:?}", table_names);
207 self.clear();
208 }
209 }
210}
211
212pub struct MaterializedViewManager {
214 views: HashMap<String, MaterializedView>,
215 max_views: usize,
216}
217
218impl MaterializedViewManager {
219 pub fn new(max_views: usize) -> Self {
220 Self {
221 views: HashMap::new(),
222 max_views,
223 }
224 }
225
226 pub async fn create_view(
227 &mut self,
228 ctx: &SessionContext,
229 name: &str,
230 query: &str,
231 ttl: Option<u64>,
232 ) -> ElusionResult<()> {
233 if self.views.len() >= self.max_views && !self.views.contains_key(name) {
235 return Err(ElusionError::Custom(
236 format!("Maximum number of materialized views ({}) reached", self.max_views)
237 ));
238 }
239
240 let df = ctx.sql(query).await.map_err(|e| ElusionError::Custom(
242 format!("Failed to execute query for materialized view: {}", e)
243 ))?;
244
245 let batches = df.collect().await.map_err(|e| ElusionError::Custom(
246 format!("Failed to collect results for materialized view: {}", e)
247 ))?;
248
249 let view = MaterializedView {
251 name: name.to_string(),
252 definition: query.to_string(),
253 data: batches,
254 refresh_time: Utc::now(),
255 ttl,
256 };
257
258 self.views.insert(name.to_string(), view);
259 Ok(())
260 }
261
262 pub async fn refresh_view(
263 &mut self,
264 ctx: &SessionContext,
265 name: &str,
266 ) -> ElusionResult<()> {
267 if let Some(view) = self.views.get(name) {
268 let query = view.definition.clone();
269 let ttl = view.ttl;
270 return self.create_view(ctx, name, &query, ttl).await;
271 }
272 Err(ElusionError::Custom(format!("View '{}' not found", name)))
273 }
274
275 pub async fn get_view_as_dataframe(
276 &self,
277 ctx: &SessionContext,
278 name: &str,
279 ) -> ElusionResult<DataFrame> {
280 if let Some(view) = self.views.get(name) {
281 if !view.is_valid() {
282 return Err(ElusionError::Custom(
283 format!("View '{}' has expired", name)
284 ));
285 }
286
287 let schema = match view.data.first() {
288 Some(batch) => batch.schema(),
289 None => return Err(ElusionError::Custom(
290 format!("View '{}' contains no data", name)
291 )),
292 };
293
294 let mem_table = MemTable::try_new(schema.clone(), vec![view.data.clone()])
295 .map_err(|e| ElusionError::Custom(
296 format!("Failed to create memory table from view: {}", e)
297 ))?;
298
299 let table_name = format!("view_{}", name);
300 ctx.register_table(&table_name, Arc::new(mem_table))
301 .map_err(|e| ElusionError::Custom(
302 format!("Failed to register table from view: {}", e)
303 ))?;
304
305 let df = ctx.table(&table_name).await
306 .map_err(|e| ElusionError::Custom(
307 format!("Failed to create DataFrame from view: {}", e)
308 ))?;
309
310 Ok(df)
311 } else {
312 Err(ElusionError::Custom(format!("View '{}' not found", name)))
313 }
314 }
315
316 pub fn drop_view(&mut self, name: &str) -> ElusionResult<()> {
317 if self.views.remove(name).is_some() {
318 println!("View '{}' droped.", name);
319 Ok(())
320 } else {
321 Err(ElusionError::Custom(format!("View '{}' not found", name)))
322 }
323 }
324
325 pub fn list_views(&self) -> Vec<(String, DateTime<Utc>, Option<u64>)> {
326 let mut result = Vec::new();
327
328 if self.views.is_empty() {
329 return result;
330 }
331
332 for (view_name, view) in &self.views {
333 println!("{}", view.display_info());
334 result.push((view_name.clone(), view.refresh_time, view.ttl));
335 }
336 result
337 }
338
339
340 pub fn get_view_metadata(&self, name: &str) -> Option<(String, DateTime<Utc>, Option<u64>)> {
341 self.views.get(name).map(|view| (
342 view.definition.clone(),
343 view.refresh_time,
344 view.ttl
345 ))
346 }
347}
348
349lazy_static! {
351 static ref QUERY_CACHE: Mutex<QueryCache> = Mutex::new(QueryCache::new(100, Some(3600))); static ref MATERIALIZED_VIEW_MANAGER: Mutex<MaterializedViewManager> = Mutex::new(MaterializedViewManager::new(50));
353}
354
355fn validate_azure_url(url: &str) -> ElusionResult<()> {
359 if !url.starts_with("https://") {
360 return Err(ElusionError::Custom("Bad url format. Expected format: https://{account}.{endpoint}.core.windows.net/{container}/{blob}".to_string()));
361 }
362
363 if !url.contains(".blob.core.windows.net/") && !url.contains(".dfs.core.windows.net/") {
364 return Err(ElusionError::Custom(
365 "URL must contain either '.blob.core.windows.net/' or '.dfs.core.windows.net/'".to_string()
366 ));
367 }
368
369 Ok(())
370}
371
372#[derive(Debug, Clone, Copy, PartialEq)]
374pub enum AzureWriteMode {
375 Overwrite,
376 Append,
377 ErrorIfExists,
378}
379
380fn process_json_content(content: &[u8]) -> ElusionResult<Vec<HashMap<String, Value>>> {
382 let reader = BufReader::new(content);
383 let stream = Deserializer::from_reader(reader).into_iter::<Value>();
384
385 let mut results = Vec::new();
386 let mut stream = stream.peekable();
387
388 match stream.peek() {
389 Some(Ok(Value::Array(_))) => {
390 for value in stream {
391 match value {
392 Ok(Value::Array(array)) => {
393 for item in array {
394 if let Value::Object(map) = item {
395 let mut base_map = map.clone();
396
397 if let Some(Value::Array(fields)) = base_map.remove("fields") {
398 for field in fields {
399 let mut row = base_map.clone();
400 if let Value::Object(field_obj) = field {
401 for (key, val) in field_obj {
402 row.insert(format!("field_{}", key), val);
403 }
404 }
405 results.push(row.into_iter().collect());
406 }
407 } else {
408 results.push(base_map.into_iter().collect());
409 }
410 }
411 }
412 }
413 Ok(_) => continue,
414 Err(e) => return Err(ElusionError::Custom(format!("JSON parsing error: {}", e))),
415 }
416 }
417 }
418 Some(Ok(Value::Object(_))) => {
419 for value in stream {
420 if let Ok(Value::Object(map)) = value {
421 let mut base_map = map.clone();
422 if let Some(Value::Array(fields)) = base_map.remove("fields") {
423 for field in fields {
424 let mut row = base_map.clone();
425 if let Value::Object(field_obj) = field {
426 for (key, val) in field_obj {
427 row.insert(format!("field_{}", key), val);
428 }
429 }
430 results.push(row.into_iter().collect());
431 }
432 } else {
433 results.push(base_map.into_iter().collect());
434 }
435 }
436 }
437 }
438 Some(Ok(Value::Null)) |
439 Some(Ok(Value::Bool(_))) |
440 Some(Ok(Value::Number(_))) |
441 Some(Ok(Value::String(_))) => {
442 return Err(ElusionError::Custom("JSON content must be an array or object".to_string()));
443 }
444 Some(Err(e)) => return Err(ElusionError::Custom(format!("JSON parsing error: {}", e))),
445 None => return Err(ElusionError::Custom("Empty JSON content".to_string())),
446 }
447
448 if results.is_empty() {
449 return Err(ElusionError::Custom("No valid JSON data found".to_string()));
450 }
451
452 Ok(results)
453}
454
455async fn process_csv_content(_name: &str, content: Vec<u8>) -> ElusionResult<Vec<HashMap<String, Value>>> {
456 let mut reader = ReaderBuilder::new()
458 .has_headers(true)
459 .flexible(true)
460 .trim(All)
461 .from_reader(content.as_slice());
462
463 let headers: Vec<String> = reader
465 .headers()
466 .map_err(|e| ElusionError::Custom(format!("Failed to read CSV headers: {}", e)))?
467 .iter()
468 .map(|h| h.trim().to_string())
469 .collect();
470
471 let estimated_rows = content.len() / (headers.len() * 20);
472 let mut results = Vec::with_capacity(estimated_rows);
473
474 for record in reader.records() {
475 match record {
476 Ok(record) => {
477 let mut map = HashMap::with_capacity(headers.len());
478 for (header, field) in headers.iter().zip(record.iter()) {
479 let value = if field.is_empty() {
480 Value::Null
481 } else if let Ok(num) = field.parse::<i64>() {
482 Value::Number(num.into())
483 } else if let Ok(num) = field.parse::<f64>() {
484 match serde_json::Number::from_f64(num) {
485 Some(n) => Value::Number(n),
486 None => Value::String(field.to_string())
487 }
488 } else if field.eq_ignore_ascii_case("true") {
489 Value::Bool(true)
490 } else if field.eq_ignore_ascii_case("false") {
491 Value::Bool(false)
492 } else {
493 Value::String(field.to_string())
494 };
495
496 map.insert(header.clone(), value);
497 }
498 results.push(map);
499 }
500 Err(e) => {
501 println!("Warning: Error reading CSV record: {}", e);
502 continue;
503 }
504 }
505 }
506
507 Ok(results)
508}
509
510#[cfg(feature = "odbc")]
512lazy_static!{
513 static ref DB_ENV: Environment = {
514 Environment::new().expect("Failed to create odbc environment")
515 };
516}
517
518#[derive(Debug, PartialEq, Clone)]
519pub enum DatabaseType {
520 MySQL,
521 PostgreSQL,
522 MongoDB,
523 SQLServer,
524 Unknown
525}
526#[cfg(feature = "odbc")]
527fn detect_database(connection_string: &str) -> DatabaseType {
529 if connection_string.contains("MySQL") {
530 DatabaseType::MySQL
531 } else if connection_string.contains("PostgreSQL") {
532 DatabaseType::PostgreSQL
533 } else if connection_string.contains("MongoDB") {
534 DatabaseType::MongoDB
535 } else if connection_string.contains("SQL Server") {
536 DatabaseType::SQLServer
537 } else {
538 DatabaseType::Unknown
539 }
540 }
541
542 #[cfg(feature = "odbc")]
543 fn extract_alias_from_sql(query: &str, db_type: DatabaseType) -> Option<String> {
544 let lower_query = query.to_lowercase();
545 if let Some(from_idx) = lower_query.find(" from ") {
546 let after_from = &query[from_idx + 6..];
547 let parts: Vec<&str> = after_from.split_whitespace().collect();
548
549 if parts.len() >= 3 && (parts[1].eq_ignore_ascii_case("as") || parts[1].eq_ignore_ascii_case("")) {
550 let alias = parts[2];
551 match db_type {
552 DatabaseType::SQLServer => {
553 Some(alias.to_string())
554 },
555 DatabaseType::MySQL => Some(alias.trim_matches('`').to_string()),
556 DatabaseType::PostgreSQL => Some(alias.trim_matches('"').to_string()),
557 DatabaseType::MongoDB => Some(alias.to_string()),
558 DatabaseType::Unknown => Some(alias.trim_matches(|c| c == '`' || c == '"').to_string()),
559 }
560 } else {
561 None
562 }
563 } else {
564 None
565 }
566}
567
568#[cfg(not(feature = "odbc"))]
570pub fn detect_database(_connection_string: &str) -> DatabaseType {
571 DatabaseType::Unknown
572}
573
574#[cfg(not(feature = "odbc"))]
575pub fn extract_alias_from_sql(_query: &str, _db_type: DatabaseType) -> Option<String> {
576 None
577}
578
579fn convert_to_f64_vec(array: &dyn Array) -> ElusionResult<Vec<f64>> {
582 match array.data_type() {
583 ArrowDataType::Float64 => {
584 let float_array = array.as_any()
585 .downcast_ref::<Float64Array>()
586 .ok_or_else(|| ElusionError::Custom("Failed to downcast to Float64Array".to_string()))?;
587 Ok(float_array.values().to_vec())
588 },
589 ArrowDataType::Int64 => {
590 let int_array = array.as_any()
591 .downcast_ref::<Int64Array>()
592 .ok_or_else(|| ElusionError::Custom("Failed to downcast to Int64Array".to_string()))?;
593 Ok(int_array.values().iter().map(|&x| x as f64).collect())
594 },
595 ArrowDataType::Date32 => {
596 let date_array = array.as_any()
597 .downcast_ref::<Date32Array>()
598 .ok_or_else(|| ElusionError::Custom("Failed to downcast to Date32Array".to_string()))?;
599 Ok(convert_date32_to_timestamps(date_array))
600 },
601 ArrowDataType::Utf8 => {
602 let string_array = array.as_any()
603 .downcast_ref::<StringArray>()
604 .ok_or_else(|| ElusionError::Custom("Failed to downcast to StringArray".to_string()))?;
605 let mut values = Vec::with_capacity(array.len());
606 for i in 0..array.len() {
607 let value = string_array.value(i).parse::<f64>().unwrap_or(0.0);
608 values.push(value);
609 }
610 Ok(values)
611 },
612 other_type => {
613 Err(ElusionError::Custom(format!("Unsupported data type for plotting: {:?}", other_type)))
614 }
615 }
616}
617
618fn convert_to_string_vec(array: &dyn Array) -> ElusionResult<Vec<String>> {
619 match array.data_type() {
620 ArrowDataType::Utf8 => {
621 let string_array = array.as_any()
622 .downcast_ref::<StringArray>()
623 .ok_or_else(|| ElusionError::Custom("Failed to downcast to StringArray".to_string()))?;
624
625 let mut values = Vec::with_capacity(array.len());
626 for i in 0..array.len() {
627 values.push(string_array.value(i).to_string());
628 }
629 Ok(values)
630 },
631 other_type => {
632 Err(ElusionError::Custom(format!("Expected string type but got: {:?}", other_type)))
633 }
634 }
635}
636
637fn convert_date32_to_timestamps(array: &Date32Array) -> Vec<f64> {
638 array.values()
639 .iter()
640 .map(|&days| {
641 let date = NaiveDate::from_num_days_from_ce_opt(days + 719163)
643 .unwrap_or(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap());
644 let datetime = date.and_hms_opt(0, 0, 0).unwrap();
645 datetime.and_utc().timestamp() as f64 * 1000.0 })
647 .collect()
648}
649
650fn sort_by_date(x_values: &[f64], y_values: &[f64]) -> (Vec<f64>, Vec<f64>) {
652 let mut pairs: Vec<(f64, f64)> = x_values.iter()
653 .cloned()
654 .zip(y_values.iter().cloned())
655 .collect();
656
657 pairs.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
659
660 pairs.into_iter().unzip()
662}
663
664fn parse_date_string(date_str: &str) -> Option<chrono::NaiveDateTime> {
666 let formats = [
668 "%Y-%m-%d", "%d.%m.%Y", "%d/%m/%Y", "%Y/%m/%d", "%d %b %Y", "%d %B %Y", "%b %d %Y", "%B %d %Y", "%Y-%m-%d %H:%M:%S", "%d.%m.%Y %H:%M:%S", "%m/%d/%Y", "%Y.%m.%d", ];
688
689 for format in formats {
690 if let Ok(date) = chrono::NaiveDateTime::parse_from_str(date_str, format) {
691 return Some(date);
692 } else if let Ok(date) = chrono::NaiveDate::parse_from_str(date_str, format) {
693 return Some(date.and_hms_opt(0, 0, 0).unwrap_or_default());
694 }
695 }
696
697 None
698}
699
700#[derive(Debug)]
702pub enum ElusionError {
703 MissingColumn {
704 column: String,
705 available_columns: Vec<String>,
706 },
707 InvalidDataType {
708 column: String,
709 expected: String,
710 found: String,
711 },
712 DuplicateColumn {
713 column: String,
714 locations: Vec<String>,
715 },
716 InvalidOperation {
717 operation: String,
718 reason: String,
719 suggestion: String,
720 },
721 SchemaError {
722 message: String,
723 schema: Option<String>,
724 suggestion: String,
725 },
726 JoinError {
727 message: String,
728 left_table: String,
729 right_table: String,
730 suggestion: String,
731 },
732 GroupByError {
733 message: String,
734 invalid_columns: Vec<String>,
735 suggestion: String,
736 },
737 WriteError {
738 path: String,
739 operation: String,
740 reason: String,
741 suggestion: String,
742 },
743 PartitionError {
744 message: String,
745 partition_columns: Vec<String>,
746 suggestion: String,
747 },
748 AggregationError {
749 message: String,
750 function: String,
751 column: String,
752 suggestion: String,
753 },
754 OrderByError {
755 message: String,
756 columns: Vec<String>,
757 suggestion: String,
758 },
759 WindowFunctionError {
760 message: String,
761 function: String,
762 details: String,
763 suggestion: String,
764 },
765 LimitError {
766 message: String,
767 value: u64,
768 suggestion: String,
769 },
770 SetOperationError {
771 operation: String,
772 reason: String,
773 suggestion: String,
774 },
775 DataFusion(DataFusionError),
776 Io(std::io::Error),
777 Custom(String),
778}
779
780impl fmt::Display for ElusionError {
781 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
782 match self {
783 ElusionError::MissingColumn { column, available_columns } => {
784 let suggestion = suggest_similar_column(column, available_columns);
785 write!(
786 f,
787 "🔍 Column Not Found: '{}'\n\
788 📋 Available columns are: {}\n\
789 💡 Did you mean '{}'?\n\
790 🔧 Check for typos or use .display_schema() to see all available columns.",
791 column,
792 available_columns.join(", "),
793 suggestion
794 )
795 },
796 ElusionError::InvalidDataType { column, expected, found } => write!(
797 f,
798 "📊 Type Mismatch in column '{}'\n\
799 ❌ Found: {}\n\
800 ✅ Expected: {}\n\
801 💡 Try: .with_column(\"{}\", cast(\"{}\", {}));",
802 column, found, expected, column, column, expected
803 ),
804 ElusionError::DuplicateColumn { column, locations } => write!(
805 f,
806 "🔄 Duplicate Column: '{}'\n\
807 📍 Found in: {}\n\
808 💡 Try using table aliases or renaming columns:\n\
809 .select([\"table1.{} as table1_{}\", \"table2.{} as table2_{}\"])",
810 column,
811 locations.join(", "),
812 column, column, column, column
813 ),
814 ElusionError::InvalidOperation { operation, reason, suggestion } => write!(
815 f,
816 "⚠️ Invalid Operation: {}\n\
817 ❌ Problem: {}\n\
818 💡 Suggestion: {}",
819 operation, reason, suggestion
820 ),
821 ElusionError::SchemaError { message, schema, suggestion } => {
822 let schema_info = schema.as_ref().map_or(
823 String::new(),
824 |s| format!("\n📋 Current Schema:\n{}", s)
825 );
826 write!(
827 f,
828 "🏗️ Schema Error: {}{}\n\
829 💡 Suggestion: {}",
830 message, schema_info, suggestion
831 )
832 },
833 ElusionError::JoinError { message, left_table, right_table, suggestion } => write!(
834 f,
835 "🤝 Join Error:\n\
836 ❌ {}\n\
837 📌 Left Table: {}\n\
838 📌 Right Table: {}\n\
839 💡 Suggestion: {}",
840 message, left_table, right_table, suggestion
841 ),
842 ElusionError::GroupByError { message, invalid_columns, suggestion } => write!(
843 f,
844 "📊 Group By Error: {}\n\
845 ❌ Invalid columns: {}\n\
846 💡 Suggestion: {}",
847 message,
848 invalid_columns.join(", "),
849 suggestion
850 ),
851 ElusionError::WriteError { path, operation, reason, suggestion } => write!(
852 f,
853 "💾 Write Error during {} operation\n\
854 📍 Path: {}\n\
855 ❌ Problem: {}\n\
856 💡 Suggestion: {}",
857 operation, path, reason, suggestion
858 ),
859 ElusionError::DataFusion(err) => write!(
860 f,
861 "⚡ DataFusion Error: {}\n\
862 💡 Don't worry! Here's what you can try:\n\
863 1. Check your column names and types\n\
864 2. Verify your SQL syntax\n\
865 3. Use .display_schema() to see available columns\n\
866 4. Try breaking down complex operations into smaller steps",
867 err
868 ),
869 ElusionError::Io(err) => write!(
870 f,
871 "📁 I/O Error: {}\n\
872 💡 Quick fixes to try:\n\
873 1. Check if the file/directory exists\n\
874 2. Verify your permissions\n\
875 3. Ensure the path is correct\n\
876 4. Close any programs using the file",
877 err
878 ),
879 ElusionError::PartitionError { message, partition_columns, suggestion } => write!(
880 f,
881 "📦 Partition Error: {}\n\
882 ❌ Affected partition columns: {}\n\
883 💡 Suggestion: {}",
884 message,
885 partition_columns.join(", "),
886 suggestion
887 ),
888 ElusionError::AggregationError { message, function, column, suggestion } => write!(
889 f,
890 "📊 Aggregation Error in function '{}'\n\
891 ❌ Problem with column '{}': {}\n\
892 💡 Suggestion: {}",
893 function, column, message, suggestion
894 ),
895 ElusionError::OrderByError { message, columns, suggestion } => write!(
896 f,
897 "🔄 Order By Error: {}\n\
898 ❌ Problem with columns: {}\n\
899 💡 Suggestion: {}",
900 message,
901 columns.join(", "),
902 suggestion
903 ),
904 ElusionError::WindowFunctionError { message, function, details, suggestion } => write!(
905 f,
906 "🪟 Window Function Error in '{}'\n\
907 ❌ Problem: {}\n\
908 📝 Details: {}\n\
909 💡 Suggestion: {}",
910 function, message, details, suggestion
911 ),
912 ElusionError::LimitError { message, value, suggestion } => write!(
913 f,
914 "🔢 Limit Error: {}\n\
915 ❌ Invalid limit value: {}\n\
916 💡 Suggestion: {}",
917 message, value, suggestion
918 ),
919 ElusionError::SetOperationError { operation, reason, suggestion } => write!(
920 f,
921 "🔄 Set Operation Error in '{}'\n\
922 ❌ Problem: {}\n\
923 💡 Suggestion: {}",
924 operation, reason, suggestion
925 ),
926 ElusionError::Custom(err) => write!(f, "💫 {}", err),
927 }
928 }
929}
930
931impl From<DataFusionError> for ElusionError {
932 fn from(err: DataFusionError) -> Self {
933 match &err {
934 DataFusionError::SchemaError(schema_err, _context) => {
935 let error_msg = schema_err.to_string();
936
937 if error_msg.contains("Column") && error_msg.contains("not found") {
938 if let Some(col_name) = extract_column_name_from_error(&error_msg) {
939 return ElusionError::MissingColumn {
940 column: col_name,
941 available_columns: extract_available_columns_from_error(&error_msg),
942 };
943 }
944 }
945
946 if error_msg.contains("Cannot cast") {
947 if let Some((col, expected, found)) = extract_type_info_from_error(&error_msg) {
948 return ElusionError::InvalidDataType {
949 column: col,
950 expected,
951 found,
952 };
953 }
954 }
955
956 if error_msg.contains("Schema") {
957 return ElusionError::SchemaError {
958 message: error_msg,
959 schema: None,
960 suggestion: "💡 Check column names and data types in your schema".to_string(),
961 };
962 }
963
964 ElusionError::DataFusion(err)
965 },
966 DataFusionError::Plan(plan_err) => {
967 let error_msg = plan_err.to_string();
968
969 if error_msg.contains("Duplicate column") {
970 if let Some((col, locs)) = extract_duplicate_column_info(&error_msg) {
971 return ElusionError::DuplicateColumn {
972 column: col,
973 locations: locs,
974 };
975 }
976 }
977
978 if error_msg.contains("JOIN") {
979 return ElusionError::JoinError {
980 message: error_msg.clone(),
981 left_table: "unknown".to_string(),
982 right_table: "unknown".to_string(),
983 suggestion: "💡 Check join conditions and table names".to_string(),
984 };
985 }
986
987 ElusionError::DataFusion(err)
988 },
989 DataFusionError::Execution(exec_err) => {
990 let error_msg = exec_err.to_string();
991
992 if error_msg.contains("aggregate") || error_msg.contains("SUM") ||
993 error_msg.contains("AVG") || error_msg.contains("COUNT") {
994 if let Some((func, col)) = extract_aggregation_error(&error_msg) {
995 return ElusionError::AggregationError {
996 message: error_msg.clone(),
997 function: func,
998 column: col,
999 suggestion: "💡 Verify aggregation function syntax and column data types".to_string(),
1000 };
1001 }
1002 }
1003 if error_msg.contains("GROUP BY") {
1004 return ElusionError::GroupByError {
1005 message: error_msg.clone(),
1006 invalid_columns: Vec::new(),
1007 suggestion: "💡 Ensure all non-aggregated columns are included in GROUP BY".to_string(),
1008 };
1009 }
1010
1011 if error_msg.contains("PARTITION BY") {
1012 return ElusionError::PartitionError {
1013 message: error_msg.clone(),
1014 partition_columns: Vec::new(),
1015 suggestion: "💡 Check partition column names and data types".to_string(),
1016 };
1017 }
1018
1019 if error_msg.contains("ORDER BY") {
1020 return ElusionError::OrderByError {
1021 message: error_msg.clone(),
1022 columns: Vec::new(),
1023 suggestion: "💡 Verify column names and sort directions".to_string(),
1024 };
1025 }
1026
1027 if error_msg.contains("OVER") || error_msg.contains("window") {
1028 if let Some((func, details)) = extract_window_function_error(&error_msg) {
1029 return ElusionError::WindowFunctionError {
1030 message: error_msg.clone(),
1031 function: func,
1032 details,
1033 suggestion: "💡 Check window function syntax and parameters".to_string(),
1034 };
1035 }
1036 }
1037
1038 if error_msg.contains("LIMIT") {
1039 return ElusionError::LimitError {
1040 message: error_msg.clone(),
1041 value: 0,
1042 suggestion: "💡 Ensure limit value is a positive integer".to_string(),
1043 };
1044 }
1045
1046 if error_msg.contains("UNION") || error_msg.contains("INTERSECT") || error_msg.contains("EXCEPT") {
1047 return ElusionError::SetOperationError {
1048 operation: "Set Operation".to_string(),
1049 reason: error_msg.clone(),
1050 suggestion: "💡 Ensure both sides of the operation have compatible schemas".to_string(),
1051 };
1052 }
1053
1054 ElusionError::DataFusion(err)
1055 },
1056 DataFusionError::NotImplemented(msg) => {
1057 ElusionError::InvalidOperation {
1058 operation: "Operation not supported".to_string(),
1059 reason: msg.clone(),
1060 suggestion: "💡 Try using an alternative approach or check documentation for supported features".to_string(),
1061 }
1062 },
1063 DataFusionError::Internal(msg) => {
1064 ElusionError::Custom(format!("Internal error: {}. Please report this issue.", msg))
1065 },
1066 _ => ElusionError::DataFusion(err)
1067 }
1068 }
1069}
1070
1071fn extract_window_function_error(err: &str) -> Option<(String, String)> {
1072 let re = Regex::new(r"Window function '([^']+)' error: (.+)").ok()?;
1073 let caps = re.captures(err)?;
1074 Some((
1075 caps.get(1)?.as_str().to_string(),
1076 caps.get(2)?.as_str().to_string(),
1077 ))
1078}
1079
1080fn extract_aggregation_error(err: &str) -> Option<(String, String)> {
1081 let re = Regex::new(r"Aggregate function '([^']+)' error on column '([^']+)'").ok()?;
1082 let caps = re.captures(err)?;
1083 Some((
1084 caps.get(1)?.as_str().to_string(),
1085 caps.get(2)?.as_str().to_string(),
1086 ))
1087}
1088fn extract_column_name_from_error(err: &str) -> Option<String> {
1090 let re = Regex::new(r"Column '([^']+)'").ok()?;
1091 re.captures(err)?.get(1).map(|m| m.as_str().to_string())
1092}
1093
1094fn extract_available_columns_from_error(err: &str) -> Vec<String> {
1095 if let Some(re) = Regex::new(r"Available fields are: \[(.*?)\]").ok() {
1096 if let Some(caps) = re.captures(err) {
1097 if let Some(fields) = caps.get(1) {
1098 return fields.as_str()
1099 .split(',')
1100 .map(|s| s.trim().trim_matches('\'').to_string())
1101 .collect();
1102 }
1103 }
1104 }
1105 Vec::new()
1106}
1107
1108fn extract_type_info_from_error(err: &str) -> Option<(String, String, String)> {
1109 let re = Regex::new(r"Cannot cast column '([^']+)' from ([^ ]+) to ([^ ]+)").ok()?;
1110 let caps = re.captures(err)?;
1111 Some((
1112 caps.get(1)?.as_str().to_string(),
1113 caps.get(3)?.as_str().to_string(),
1114 caps.get(2)?.as_str().to_string(),
1115 ))
1116}
1117
1118fn extract_duplicate_column_info(err: &str) -> Option<(String, Vec<String>)> {
1119 let re = Regex::new(r"Duplicate column '([^']+)' in schema: \[(.*?)\]").ok()?;
1120 let caps = re.captures(err)?;
1121 Some((
1122 caps.get(1)?.as_str().to_string(),
1123 caps.get(2)?
1124 .as_str()
1125 .split(',')
1126 .map(|s| s.trim().to_string())
1127 .collect()
1128 ))
1129}
1130
1131fn suggest_similar_column(target: &str, available: &[String]) -> String {
1133 available
1134 .iter()
1135 .map(|col| (col, string_similarity(target, col)))
1136 .max_by(|a, b| a.1.partial_cmp(&b.1).unwrap())
1137 .map(|(col, _)| col.clone())
1138 .unwrap_or_else(|| "".to_string())
1139}
1140
1141fn string_similarity(s1: &str, s2: &str) -> f64 {
1143 let s1_lower = s1.to_lowercase();
1144 let s2_lower = s2.to_lowercase();
1145
1146 if s1_lower.starts_with(&s2_lower) || s2_lower.starts_with(&s1_lower) {
1148 return 0.9;
1149 }
1150
1151 let common_len = s1_lower.chars()
1153 .zip(s2_lower.chars())
1154 .take_while(|(c1, c2)| c1 == c2)
1155 .count() as f64;
1156
1157 if common_len > 0.0 {
1158 return common_len / s1_lower.len().max(s2_lower.len()) as f64;
1159 }
1160
1161 let max_len = s1_lower.len().max(s2_lower.len()) as f64;
1163 let common_chars = s1_lower.chars()
1164 .filter(|c| s2_lower.contains(*c))
1165 .count() as f64;
1166
1167 common_chars / max_len
1168}
1169
1170impl Error for ElusionError {}
1171
1172impl From<std::io::Error> for ElusionError {
1173 fn from(err: std::io::Error) -> Self {
1174 ElusionError::Io(err)
1175 }
1176}
1177
1178pub type ElusionResult<T> = Result<T, ElusionError>;
1179
1180#[derive(Clone, Debug)]
1181pub struct Join {
1182 dataframe: CustomDataFrame,
1183 condition: String,
1184 join_type: String,
1185}
1186
1187#[derive(Clone, Debug)]
1188pub struct CustomDataFrame {
1189 df: DataFrame,
1190 table_alias: String,
1191 from_table: String,
1192 selected_columns: Vec<String>,
1193 pub alias_map: Vec<(String, String)>,
1194 aggregations: Vec<String>,
1195 group_by_columns: Vec<String>,
1196 where_conditions: Vec<String>,
1197 having_conditions: Vec<String>,
1198 order_by_columns: Vec<(String, bool)>,
1199 limit_count: Option<u64>,
1200 joins: Vec<Join>,
1201 window_functions: Vec<String>,
1202 ctes: Vec<String>,
1203 pub subquery_source: Option<String>,
1204 set_operations: Vec<String>,
1205 pub query: String,
1206 pub aggregated_df: Option<DataFrame>,
1207 union_tables: Option<Vec<(String, DataFrame, String)>>,
1208 original_expressions: Vec<String>,
1209}
1210
1211#[derive(Deserialize, Serialize, Debug)]
1214struct GenericJson {
1215 #[serde(flatten)]
1216 fields: HashMap<String, Value>,
1217}
1218fn infer_schema_from_json(rows: &[HashMap<String, Value>]) -> SchemaRef {
1220 let mut fields_map: HashMap<String, ArrowDataType> = HashMap::new();
1221 let mut keys_set: HashSet<String> = HashSet::new();
1222
1223 for row in rows {
1224 for (k, v) in row {
1225 keys_set.insert(k.clone());
1226 let inferred_type = infer_arrow_type(v);
1227 fields_map
1229 .entry(k.clone())
1230 .and_modify(|existing_type| {
1231 *existing_type = promote_types(existing_type.clone(), inferred_type.clone());
1232 })
1233 .or_insert(inferred_type);
1234 }
1235 }
1236
1237 let fields: Vec<Field> = keys_set.into_iter().map(|k| {
1238 let data_type = fields_map.get(&k).unwrap_or(&ArrowDataType::Utf8).clone();
1239 Field::new(&k, data_type, true)
1240 }).collect();
1241
1242 Arc::new(Schema::new(fields))
1243}
1244
1245fn infer_arrow_type(value: &Value) -> ArrowDataType {
1246 match value {
1247 Value::Null => ArrowDataType::Utf8, Value::Bool(_) => ArrowDataType::Utf8, Value::Number(n) => {
1250 if n.is_i64() {
1251 ArrowDataType::Int64
1252 } else if n.is_u64() {
1253 ArrowDataType::UInt64
1254 } else if let Some(f) = n.as_f64() {
1255 if f.is_finite() {
1256 ArrowDataType::Float64
1257 } else {
1258 ArrowDataType::Utf8 }
1260 } else {
1261 ArrowDataType::Utf8 }
1263 },
1264 Value::String(_) => ArrowDataType::Utf8,
1265 Value::Array(_) => ArrowDataType::Utf8, Value::Object(_) => ArrowDataType::Utf8, }
1268}
1269fn promote_types(a: ArrowDataType, b: ArrowDataType) -> ArrowDataType {
1271 match (a, b) {
1272 (Utf8, _) | (_, Utf8) => Utf8,
1274
1275 (Int64, Int64) => Int64,
1277 (UInt64, UInt64) => UInt64,
1278 (Float64, Float64) => Float64,
1279
1280 _ => Utf8,
1282 }
1283}
1284fn build_record_batch(
1286 rows: &[HashMap<String, Value>],
1287 schema: Arc<Schema>
1288) -> ArrowResult<RecordBatch> {
1289 let mut builders: Vec<Box<dyn ArrayBuilder>> = Vec::new();
1290
1291 for field in schema.fields() {
1293 let builder: Box<dyn ArrayBuilder> = match field.data_type() {
1294 ArrowDataType::Int64 => Box::new(Int64Builder::new()),
1295 ArrowDataType::UInt64 => Box::new(UInt64Builder::new()),
1296 ArrowDataType::Float64 => Box::new(Float64Builder::new()),
1297 _ => Box::new(StringBuilder::new()), };
1299 builders.push(builder);
1300 }
1301
1302 for row in rows {
1303 for (i, field) in schema.fields().iter().enumerate() {
1304 let key = field.name();
1305 let value = row.get(key);
1306
1307 match field.data_type() {
1308 ArrowDataType::Int64 => {
1309 let builder = builders[i]
1310 .as_any_mut()
1311 .downcast_mut::<Int64Builder>()
1312 .expect("Expected Int64Builder");
1313
1314 match value {
1315 Some(Value::Number(n)) => {
1316 if let Some(i) = n.as_i64() {
1318 builder.append_value(i);
1319 } else {
1320 builder.append_null();
1321 }
1322 },
1323 _ => builder.append_null(),
1325 }
1326 },
1327 ArrowDataType::UInt64 => {
1328 let builder = builders[i]
1329 .as_any_mut()
1330 .downcast_mut::<UInt64Builder>()
1331 .expect("Expected UInt64Builder");
1332
1333 match value {
1334 Some(Value::Number(n)) => {
1335 if let Some(u) = n.as_u64() {
1337 builder.append_value(u);
1338 } else {
1339 builder.append_null();
1340 }
1341 },
1342 _ => builder.append_null(),
1343 }
1344 },
1345 ArrowDataType::Float64 => {
1346 let builder = builders[i]
1347 .as_any_mut()
1348 .downcast_mut::<Float64Builder>()
1349 .expect("Expected Float64Builder");
1350
1351 match value {
1352 Some(Value::Number(n)) => {
1353 if let Some(f) = n.as_f64() {
1355 builder.append_value(f);
1356 } else {
1357 builder.append_null();
1358 }
1359 },
1360 _ => builder.append_null(),
1361 }
1362 },
1363 _ => {
1364 let builder = builders[i]
1366 .as_any_mut()
1367 .downcast_mut::<StringBuilder>()
1368 .expect("Expected StringBuilder");
1369
1370 match value {
1371 Some(v) => {
1372 let string_val = match v {
1374 Value::Null => "null".to_string(),
1375 Value::Bool(b) => b.to_string(),
1376 Value::Number(n) => {
1377 if n.is_f64() {
1378 if let Some(f) = n.as_f64() {
1380 if f.is_nan() {
1381 "NaN".to_string()
1382 } else if f.is_infinite() {
1383 if f.is_sign_positive() {
1384 "Infinity".to_string()
1385 } else {
1386 "-Infinity".to_string()
1387 }
1388 } else {
1389 f.to_string()
1390 }
1391 } else {
1392 n.to_string()
1393 }
1394 } else {
1395 n.to_string()
1396 }
1397 },
1398 Value::String(s) => {
1399 s.chars()
1401 .map(|c| if c.is_control() {
1402 format!("\\u{:04x}", c as u32)
1403 } else {
1404 c.to_string()
1405 })
1406 .collect()
1407 },
1408 Value::Array(arr) => {
1409 serde_json::to_string(arr)
1411 .unwrap_or_else(|_| "[]".to_string())
1412 },
1413 Value::Object(obj) => {
1414 serde_json::to_string(obj)
1416 .unwrap_or_else(|_| "{}".to_string())
1417 },
1418 };
1419 builder.append_value(&string_val);
1421 },
1422 None => builder.append_null(),
1423 }
1424 },
1425 }
1426 }
1427 }
1428
1429 let arrays: Vec<ArrayRef> = builders.into_iter().map(|mut b| b.finish()).collect();
1430 RecordBatch::try_new(schema.clone(), arrays)
1431}
1432
1433#[derive(Debug, Default)]
1435pub struct ColumnStats {
1436 pub columns: Vec<ColumnStatistics>,
1437}
1438
1439#[derive(Debug)]
1440pub struct ColumnStatistics {
1441 pub name: String,
1442 pub total_count: i64,
1443 pub non_null_count: i64,
1444 pub mean: Option<f64>,
1445 pub min_value: ScalarValue,
1446 pub max_value: ScalarValue,
1447 pub std_dev: Option<f64>,
1448}
1449
1450
1451#[derive(Debug)]
1452pub struct NullAnalysis {
1453 pub counts: Vec<NullCount>,
1454}
1455
1456#[derive(Debug)]
1457pub struct NullCount {
1458 pub column_name: String,
1459 pub total_rows: i64,
1460 pub null_count: i64,
1461 pub null_percentage: f64,
1462}
1463#[derive(Debug, Clone)]
1466pub struct CsvWriteOptions {
1467 pub delimiter: u8,
1468 pub escape: u8,
1469 pub quote: u8,
1470 pub double_quote: bool,
1471 pub null_value: String,
1476}
1477
1478impl Default for CsvWriteOptions {
1479 fn default() -> Self {
1480 Self {
1481 delimiter: b',',
1482 escape: b'\\',
1483 quote: b'"',
1484 double_quote: true,
1485 null_value: "NULL".to_string(),
1490 }
1491 }
1492}
1493
1494impl CsvWriteOptions {
1495 pub fn validate(&self) -> Result<(), ElusionError> {
1496
1497 if !self.delimiter.is_ascii() {
1499 return Err(ElusionError::InvalidOperation {
1500 operation: "CSV Write".to_string(),
1501 reason: format!("Delimiter '{}' is not a valid ASCII character",
1502 self.delimiter as char),
1503 suggestion: "💡 Use an ASCII character for delimiter".to_string()
1504 });
1505 }
1506
1507 if !self.escape.is_ascii() {
1509 return Err(ElusionError::Custom(format!(
1510 "Escape character '{}' is not a valid ASCII character.",
1511 self.escape as char
1512 )));
1513 }
1514
1515 if !self.quote.is_ascii() {
1517 return Err(ElusionError::Custom(format!(
1518 "Quote character '{}' is not a valid ASCII character.",
1519 self.quote as char
1520 )));
1521 }
1522
1523 if self.null_value.trim().is_empty() {
1525 return Err(ElusionError::Custom("Null value representation cannot be empty.".to_string()));
1526 }
1527
1528 let delimiter_char = self.delimiter as char;
1530 let quote_char = self.quote as char;
1531
1532 if self.null_value.contains(delimiter_char) {
1533 return Err(ElusionError::Custom(format!(
1534 "Null value '{}' cannot contain the delimiter '{}'.",
1535 self.null_value, delimiter_char
1536 )));
1537 }
1538
1539 if self.null_value.contains(quote_char) {
1540 return Err(ElusionError::Custom(format!(
1541 "Null value '{}' cannot contain the quote character '{}'.",
1542 self.null_value, quote_char
1543 )));
1544 }
1545
1546 Ok(())
1547 }
1548}
1549async fn lowercase_column_names(df: DataFrame) -> ElusionResult<DataFrame> {
1552 let schema = df.schema();
1553
1554 let columns: Vec<String> = schema.fields()
1556 .iter()
1557 .map(|f| format!("\"{}\" as \"{}\"", f.name(), f.name().trim().replace(" ", "_").to_lowercase()))
1558 .collect();
1559
1560 let ctx = SessionContext::new();
1561
1562 let batches = df.clone().collect().await?;
1564 let mem_table = MemTable::try_new(schema.clone().into(), vec![batches])?;
1565 ctx.register_table("temp_table", Arc::new(mem_table))?;
1566
1567 let sql = format!("SELECT {} FROM temp_table", columns.join(", "));
1569 ctx.sql(&sql).await.map_err(|e| ElusionError::Custom(format!("Failed to lowercase column names: {}", e)))
1570}
1571
1572fn normalize_alias_write(alias: &str) -> String {
1574 alias.trim().to_lowercase()
1575}
1576
1577fn normalize_column_name(name: &str) -> String {
1579 let name_upper = name.to_uppercase();
1581 if name_upper.contains(" AS ") {
1582
1583 let pattern = match regex::Regex::new(r"(?i)\s+AS\s+") {
1587 Ok(re) => re,
1588 Err(e) => {
1589 eprintln!("Column parsing error in SELECT() function: {}", e);
1591 return name.to_string();
1592 }
1593 };
1594
1595 let parts: Vec<&str> = pattern.split(name).collect();
1596
1597 if parts.len() >= 2 {
1598 let column = parts[0].trim();
1599 let alias = parts[1].trim();
1600
1601 if let Some(pos) = column.find('.') {
1602 let table = &column[..pos];
1603 let col = &column[pos + 1..];
1604 format!("\"{}\".\"{}\" AS \"{}\"",
1605 table.trim().to_lowercase(),
1606 col.trim().to_lowercase(),
1607 alias.to_lowercase())
1608 } else {
1609 format!("\"{}\" AS \"{}\"",
1610 column.trim().to_lowercase(),
1611 alias.to_lowercase())
1612 }
1613 } else {
1614
1615 if let Some(pos) = name.find('.') {
1616 let table = &name[..pos];
1617 let column = &name[pos + 1..];
1618 format!("\"{}\".\"{}\"",
1619 table.trim().to_lowercase(),
1620 column.trim().replace(" ", "_").to_lowercase())
1621 } else {
1622 format!("\"{}\"",
1623 name.trim().replace(" ", "_").to_lowercase())
1624 }
1625 }
1626 } else {
1627
1628 if let Some(pos) = name.find('.') {
1629 let table = &name[..pos];
1630 let column = &name[pos + 1..];
1631 format!("\"{}\".\"{}\"",
1632 table.trim().to_lowercase(),
1633 column.trim().replace(" ", "_").to_lowercase())
1634 } else {
1635 format!("\"{}\"",
1636 name.trim().replace(" ", "_").to_lowercase())
1637 }
1638 }
1639}
1640fn normalize_alias(alias: &str) -> String {
1642 format!("\"{}\"", alias.trim().to_lowercase())
1644}
1645
1646fn normalize_condition(condition: &str) -> String {
1648 let re = Regex::new(r"\b([A-Za-z_][A-Za-z0-9_]*)\.([A-Za-z_][A-Za-z0-9_]*)\b").unwrap();
1650
1651 re.replace_all(condition.trim(), "\"$1\".\"$2\"").to_string().to_lowercase()
1652}
1653
1654fn normalize_condition_filter(condition: &str) -> String {
1655 let re = Regex::new(r"\b([A-Za-z_][A-Za-z0-9_]*)\.([A-Za-z_][A-Za-z0-9_]*)\b").unwrap();
1657
1658 re.replace_all(condition.trim(), "\"$1\".\"$2\"").to_string()
1659}
1660
1661fn normalize_expression(expr: &str, table_alias: &str) -> String {
1666 let parts: Vec<&str> = expr.splitn(2, " AS ").collect();
1667
1668 if parts.len() == 2 {
1669 let expr_part = parts[0].trim();
1670 let alias_part = parts[1].trim();
1671
1672 let normalized_expr = if is_aggregate_expression(expr_part) {
1673 normalize_aggregate_expression(expr_part, table_alias)
1674 } else if is_datetime_expression(expr_part) {
1675 normalize_datetime_expression(expr_part)
1676 } else {
1677 normalize_simple_expression(expr_part, table_alias)
1678 };
1679
1680 format!("{} AS \"{}\"",
1681 normalized_expr.to_lowercase(),
1682 alias_part.replace(" ", "_").to_lowercase())
1683 } else {
1684 if is_aggregate_expression(expr) {
1685 normalize_aggregate_expression(expr, table_alias).to_lowercase()
1686 } else if is_datetime_expression(expr) {
1687 normalize_datetime_expression(expr).to_lowercase()
1688 } else {
1689 normalize_simple_expression(expr, table_alias).to_lowercase()
1690 }
1691 }
1692}
1693
1694fn normalize_aggregate_expression(expr: &str, table_alias: &str) -> String {
1695 let re = Regex::new(r"^([A-Za-z_][A-Za-z0-9_]*)\s*\((.*)\)$").unwrap();
1696 if let Some(caps) = re.captures(expr.trim()) {
1697 let func_name = &caps[1];
1698 let args = &caps[2];
1699 let normalized_args = args.split(',')
1700 .map(|arg| normalize_simple_expression(arg.trim(), table_alias))
1701 .collect::<Vec<_>>()
1702 .join(", ");
1703 format!("{}({})", func_name.to_lowercase(), normalized_args.to_lowercase())
1704 } else {
1705 expr.to_lowercase()
1706 }
1707}
1708
1709fn normalize_simple_expression(expr: &str, table_alias: &str) -> String {
1710 let col_re = Regex::new(r"(?P<alias>[A-Za-z_][A-Za-z0-9_]*)\.(?P<column>[A-Za-z_][A-Za-z0-9_]*)").unwrap();
1711 let func_re = Regex::new(r"^([A-Za-z_][A-Za-z0-9_]*)\s*\((.*)\)$").unwrap();
1712 let operator_re = Regex::new(r"([\+\-\*\/])").unwrap();
1713
1714 if let Some(caps) = func_re.captures(expr) {
1715 let func_name = &caps[1];
1716 let args = &caps[2];
1717
1718 let normalized_args = args.split(',')
1719 .map(|arg| normalize_simple_expression(arg.trim(), table_alias))
1720 .collect::<Vec<_>>()
1721 .join(", ");
1722
1723 format!("{}({})", func_name.to_lowercase(), normalized_args.to_lowercase())
1724 } else if operator_re.is_match(expr) {
1725 let mut result = String::new();
1726 let mut parts = operator_re.split(expr).peekable();
1727
1728 while let Some(part) = parts.next() {
1729 let trimmed = part.trim();
1730 if col_re.is_match(trimmed) {
1731 result.push_str(&trimmed.to_lowercase());
1732 } else if is_simple_column(trimmed) {
1733 result.push_str(&format!("\"{}\".\"{}\"",
1734 table_alias.to_lowercase(),
1735 trimmed.to_lowercase()));
1736 } else {
1737 result.push_str(&trimmed.to_lowercase());
1738 }
1739
1740 if parts.peek().is_some() {
1741 if let Some(op) = expr.chars().skip_while(|c| !"+-*/%".contains(*c)).next() {
1742 result.push_str(&format!(" {} ", op));
1743 }
1744 }
1745 }
1746 result
1747 } else if col_re.is_match(expr) {
1748 col_re.replace_all(expr, "\"$1\".\"$2\"")
1749 .to_string()
1750 .to_lowercase()
1751 } else if is_simple_column(expr) {
1752 format!("\"{}\".\"{}\"",
1753 table_alias.to_lowercase(),
1754 expr.trim().replace(" ", "_").to_lowercase())
1755 } else {
1756 expr.to_lowercase()
1757 }
1758}
1759
1760fn is_expression(s: &str) -> bool {
1762 let operators = ['+', '-', '*', '/', '%' ,'(', ')', ',', '.'];
1764 let has_operator = s.chars().any(|c| operators.contains(&c));
1765 let has_function = Regex::new(r"\b[A-Za-z_][A-Za-z0-9_]*\s*\(").unwrap().is_match(s);
1766 has_operator || has_function
1767}
1768
1769fn is_simple_column(s: &str) -> bool {
1771 let re = Regex::new(r"^[A-Za-z_][A-Za-z0-9_]*$").unwrap();
1772 re.is_match(s)
1773}
1774
1775fn is_aggregate_expression(expr: &str) -> bool {
1777 let aggregate_functions = [
1778 "SUM", "AVG", "MAX", "MIN", "MEAN", "MEDIAN","COUNT", "LAST_VALUE", "FIRST_VALUE",
1780 "GROUPING", "STRING_AGG", "ARRAY_AGG","VAR", "VAR_POP", "VAR_POPULATION", "VAR_SAMP", "VAR_SAMPLE",
1781 "BIT_AND", "BIT_OR", "BIT_XOR", "BOOL_AND", "BOOL_OR",
1782 "ABS", "FLOOR", "CEIL", "SQRT", "ISNAN", "ISZERO", "PI", "POW", "POWER", "RADIANS", "RANDOM", "ROUND",
1784 "FACTORIAL", "ACOS", "ACOSH", "ASIN", "ASINH", "COS", "COSH", "COT", "DEGREES", "EXP","SIN", "SINH", "TAN", "TANH", "TRUNC", "CBRT", "ATAN", "ATAN2", "ATANH", "GCD", "LCM", "LN", "LOG", "LOG10", "LOG2", "NANVL", "SIGNUM"
1785 ];
1786
1787 aggregate_functions.iter().any(|&func| expr.to_uppercase().starts_with(func))
1788
1789}
1790
1791fn is_datetime_expression(expr: &str) -> bool {
1792 let datetime_functions = [
1794 "CURRENT_DATE", "CURRENT_TIME", "CURRENT_TIMESTAMP", "DATE_BIN", "DATE_FORMAT",
1795 "DATE_PART", "DATE_TRUNC", "DATEPART", "DATETRUNC", "FROM_UNIXTIME", "MAKE_DATE",
1796 "NOW", "TO_CHAR", "TO_DATE", "TO_LOCAL_TIME", "TO_TIMESTAMP", "TO_TIMESTAMP_MICROS",
1797 "TO_TIMESTAMP_MILLIS", "TO_TIMESTAMP_NANOS", "TO_TIMESTAMP_SECONDS", "TO_UNIXTIME", "TODAY"
1798 ];
1799
1800 datetime_functions.iter().any(|&func| expr.to_uppercase().starts_with(func))
1801}
1802
1803fn normalize_datetime_expression(expr: &str) -> String {
1805 let re = Regex::new(r"\b([A-Za-z_][A-Za-z0-9_]*)\.(?P<column>[A-Za-z_][A-Za-z0-9_]*)\b").unwrap();
1806
1807 let expr_with_columns = re.replace_all(expr, |caps: ®ex::Captures| {
1808 format!("\"{}\"", caps["column"].to_lowercase())
1809 }).to_string();
1810
1811 expr_with_columns.to_lowercase()
1812}
1813
1814fn normalize_window_function(expression: &str) -> String {
1816 let parts: Vec<&str> = expression.splitn(2, " OVER ").collect();
1817 if parts.len() != 2 {
1818 return expression.to_lowercase();
1819 }
1820
1821 let function_part = parts[0].trim();
1822 let over_part = parts[1].trim();
1823
1824 let func_regex = Regex::new(r"^(\w+)\((.*)\)$").unwrap();
1825
1826 let (normalized_function, maybe_args) = if let Some(caps) = func_regex.captures(function_part) {
1827 let func_name = &caps[1];
1828 let arg_list_str = &caps[2];
1829
1830 let raw_args: Vec<&str> = arg_list_str.split(',').map(|s| s.trim()).collect();
1831
1832 let normalized_args: Vec<String> = raw_args
1833 .iter()
1834 .map(|arg| normalize_function_arg(arg))
1835 .collect();
1836
1837 (func_name.to_lowercase(), Some(normalized_args))
1838 } else {
1839 (function_part.to_lowercase(), None)
1840 };
1841
1842 let rebuilt_function = if let Some(args) = maybe_args {
1843 format!("{}({})", normalized_function, args.join(", ").to_lowercase())
1844 } else {
1845 normalized_function
1846 };
1847
1848 let re_cols = Regex::new(r"\b([A-Za-z_][A-Za-z0-9_]*)\.([A-Za-z_][A-Za-z0-9_]*)\b").unwrap();
1849 let normalized_over = re_cols.replace_all(over_part, "\"$1\".\"$2\"")
1850 .to_string()
1851 .to_lowercase();
1852
1853 format!("{} OVER {}", rebuilt_function, normalized_over)
1854}
1855
1856fn normalize_function_arg(arg: &str) -> String {
1858 let re_table_col = Regex::new(r"^([A-Za-z_][A-Za-z0-9_]*)\.([A-Za-z_][A-Za-z0-9_]*)$").unwrap();
1859
1860 if let Some(caps) = re_table_col.captures(arg) {
1861 let table = &caps[1];
1862 let col = &caps[2];
1863 format!("\"{}\".\"{}\"",
1864 table.to_lowercase(),
1865 col.to_lowercase())
1866 } else {
1867 arg.to_lowercase()
1868 }
1869}
1870async fn glean_arrow_schema(df: &DataFrame) -> ElusionResult<SchemaRef> {
1875
1876 let limited_df = df.clone().limit(0, Some(1))
1877 .map_err(|e| ElusionError::InvalidOperation {
1878 operation: "Schema Inference".to_string(),
1879 reason: format!("Failed to limit DataFrame: {}", e),
1880 suggestion: "💡 Check if the DataFrame is valid".to_string()
1881 })?;
1882
1883 let batches = limited_df.collect().await
1884 .map_err(|e| ElusionError::SchemaError {
1885 message: format!("Failed to collect sample batch: {}", e),
1886 schema: None,
1887 suggestion: "💡 Verify DataFrame contains valid data".to_string()
1888 })?;
1889
1890 if let Some(first_batch) = batches.get(0) {
1891 Ok(first_batch.schema())
1892 } else {
1893 let empty_fields: Vec<Field> = vec![];
1894 let empty_schema = Schema::new(empty_fields);
1895 Ok(Arc::new(empty_schema))
1896 }
1897}
1898
1899fn arrow_to_delta_type(arrow_type: &ArrowDataType) -> DeltaType {
1901
1902 match arrow_type {
1903 ArrowDataType::Boolean => DeltaType::BOOLEAN,
1904 ArrowDataType::Int8 => DeltaType::BYTE,
1905 ArrowDataType::Int16 => DeltaType::SHORT,
1906 ArrowDataType::Int32 => DeltaType::INTEGER,
1907 ArrowDataType::Int64 => DeltaType::LONG,
1908 ArrowDataType::Float32 => DeltaType::FLOAT,
1909 ArrowDataType::Float64 => DeltaType::DOUBLE,
1910 ArrowDataType::Utf8 => DeltaType::STRING,
1911 ArrowDataType::Date32 => DeltaType::DATE,
1912 ArrowDataType::Date64 => DeltaType::DATE,
1913 ArrowDataType::Timestamp(TimeUnit::Second, _) => DeltaType::TIMESTAMP,
1914 ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => DeltaType::TIMESTAMP,
1915 ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => DeltaType::TIMESTAMP,
1916 ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => DeltaType::TIMESTAMP,
1917 ArrowDataType::Binary => DeltaType::BINARY,
1918 _ => DeltaType::STRING, }
1920}
1921
1922#[derive(Clone)]
1924struct DeltaPathManager {
1925 base_path: PathBuf,
1926}
1927
1928impl DeltaPathManager {
1929 pub fn new<P: AsRef<LocalPath>>(path: P) -> Self {
1931 let normalized = path
1932 .as_ref()
1933 .to_string_lossy()
1934 .replace('\\', "/")
1935 .trim_end_matches('/')
1936 .to_string();
1937
1938 Self {
1939 base_path: PathBuf::from(normalized),
1940 }
1941 }
1942
1943 pub fn base_path_str(&self) -> String {
1945 self.base_path.to_string_lossy().replace('\\', "/")
1946 }
1947
1948 pub fn delta_log_path(&self) -> DeltaPath {
1950 let base = self.base_path_str();
1951 DeltaPath::from(format!("{base}/_delta_log"))
1952 }
1953
1954 pub fn table_path(&self) -> String {
1961 self.base_path_str()
1962 }
1963 pub fn drive_prefix(&self) -> String {
1965 let base_path = self.base_path_str();
1966 if let Some(colon_pos) = base_path.find(':') {
1967 base_path[..colon_pos + 2].to_string() } else {
1969 "/".to_string() }
1971 }
1972
1973 pub fn normalize_uri(&self, uri: &str) -> String {
1975 let drive_prefix = self.drive_prefix();
1976
1977 let path = uri.trim_start_matches(|c| c != '/' && c != '\\')
1979 .trim_start_matches(['/', '\\']);
1980
1981 format!("{}{}", drive_prefix, path).replace('\\', "/")
1983 }
1984
1985 pub fn is_delta_table(&self) -> bool {
1986 let delta_log = self.base_path.join("_delta_log");
1987 let delta_log_exists = delta_log.is_dir();
1988
1989 if delta_log_exists {
1990 if let Ok(entries) = fs::read_dir(&delta_log) {
1992 for entry in entries {
1993 if let Ok(entry) = entry {
1994 if let Some(ext) = entry.path().extension() {
1995 if ext == "json" {
1996 return true;
1997 }
1998 }
1999 }
2000 }
2001 }
2002 }
2003 false
2004 }
2005}
2006
2007async fn append_protocol_action(
2009 store: &Arc<dyn ObjectStore>,
2010 delta_log_path: &DeltaPath,
2011 protocol_action: Value,
2012) -> Result<(), DeltaTableError> {
2013
2014 let latest_version = get_latest_version(store, delta_log_path).await?;
2015 let next_version = latest_version + 1;
2016 let protocol_file = format!("{:020}.json", next_version);
2017
2018 let child_path = delta_log_path.child(&*protocol_file);
2019
2020 let protocol_file_path = DeltaPath::from(child_path);
2021
2022 let action_str = serde_json::to_string(&protocol_action)
2023 .map_err(|e| DeltaTableError::Generic(format!("Failed to serialize Protocol action: {e}")))?;
2024
2025 store
2026 .put(&protocol_file_path, action_str.into_bytes().into())
2027 .await
2028 .map_err(|e| DeltaTableError::Generic(format!("Failed to write Protocol action to Delta log: {e}")))?;
2029
2030 Ok(())
2031}
2032
2033async fn get_latest_version(
2035 store: &Arc<dyn ObjectStore>,
2036 delta_log_path: &DeltaPath,
2037) -> Result<i64, DeltaTableError> {
2038 let mut versions = Vec::new();
2039
2040 let mut stream = store.list(Some(delta_log_path));
2041
2042 while let Some(res) = stream.next().await {
2043 let metadata = res.map_err(|e| DeltaTableError::Generic(format!("Failed to list Delta log files: {e}")))?;
2044 let path_str = metadata.location.as_ref();
2046
2047 if let Some(file_name) = path_str.split('/').last() {
2048 println!("Detected log file: {}", file_name);
2049 if let Some(version_str) = file_name.strip_suffix(".json") {
2050 if let Ok(version) = version_str.parse::<i64>() {
2051 println!("Parsed version: {}", version);
2052 versions.push(version);
2053 }
2054 }
2055 }
2056 }
2057
2058 let latest = versions.into_iter().max().unwrap_or(-1);
2059 println!("Latest version detected: {}", latest);
2060 Ok(latest)
2061}
2062
2063async fn write_to_delta_impl(
2065 df: &DataFrame,
2066 path: &str,
2067 partition_columns: Option<Vec<String>>,
2068 overwrite: bool,
2069 write_mode: WriteMode,
2070) -> Result<(), DeltaTableError> {
2071 let path_manager = DeltaPathManager::new(path);
2072
2073 let arrow_schema_ref = glean_arrow_schema(df)
2075 .await
2076 .map_err(|e| DeltaTableError::Generic(format!("Could not glean Arrow schema: {e}")))?;
2077
2078 let delta_fields: Vec<StructField> = arrow_schema_ref
2080 .fields()
2081 .iter()
2082 .map(|field| {
2083 let nullable = field.is_nullable();
2084 let name = field.name().clone();
2085 let data_type = arrow_to_delta_type(field.data_type());
2086 StructField::new(name, data_type, nullable)
2087 })
2088 .collect();
2089
2090 let mut config: HashMap<String, Option<String>> = HashMap::new();
2092 config.insert("delta.minWriterVersion".to_string(), Some("7".to_string()));
2093 config.insert("delta.minReaderVersion".to_string(), Some("3".to_string()));
2094
2095 if overwrite {
2096 if let Err(e) = fs::remove_dir_all(&path_manager.base_path) {
2098 if e.kind() != std::io::ErrorKind::NotFound {
2099 return Err(DeltaTableError::Generic(format!(
2100 "Failed to remove existing directory at '{}': {e}",
2101 path
2102 )));
2103 }
2104 }
2105
2106 fs::create_dir_all(&path_manager.base_path)
2108 .map_err(|e| DeltaTableError::Generic(format!("Failed to create directory structure: {e}")))?;
2109
2110 let metadata = Metadata::try_new(
2112 StructType::new(delta_fields.clone()),
2113 partition_columns.clone().unwrap_or_default(),
2114 HashMap::new()
2115 )?;
2116
2117 let metadata_action = json!({
2119 "metaData": {
2120 "id": metadata.id,
2121 "name": metadata.name,
2122 "description": metadata.description,
2123 "format": {
2124 "provider": "parquet",
2125 "options": {}
2126 },
2127 "schemaString": metadata.schema_string,
2128 "partitionColumns": metadata.partition_columns,
2129 "configuration": {
2130 "delta.minReaderVersion": "3",
2131 "delta.minWriterVersion": "7"
2132 },
2133 "created_time": metadata.created_time
2134 }
2135 });
2136
2137 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
2139 let delta_log_path = path_manager.delta_log_path();
2140 let protocol = Protocol::new(3, 7);
2141
2142 let protocol_action = json!({
2143 "protocol": {
2144 "minReaderVersion": protocol.min_reader_version,
2145 "minWriterVersion": protocol.min_writer_version,
2146 "readerFeatures": [],
2147 "writerFeatures": []
2148 }
2149 });
2150 append_protocol_action(&store, &delta_log_path, protocol_action).await?;
2151 append_protocol_action(&store, &delta_log_path, metadata_action).await?;
2152
2153 let _ = DeltaOps::try_from_uri(&path_manager.table_path())
2155 .await
2156 .map_err(|e| DeltaTableError::Generic(format!("Failed to init DeltaOps: {e}")))?
2157 .create()
2158 .with_columns(delta_fields.clone())
2159 .with_partition_columns(partition_columns.clone().unwrap_or_default())
2160 .with_save_mode(SaveMode::Overwrite)
2161 .with_configuration(config.clone())
2162 .await?;
2163 } else {
2164 if !DeltaTableBuilder::from_uri(&path_manager.table_path()).build().is_ok() {
2166 fs::create_dir_all(&path_manager.base_path)
2168 .map_err(|e| DeltaTableError::Generic(format!("Failed to create directory structure: {e}")))?;
2169
2170 let metadata = Metadata::try_new(
2172 StructType::new(delta_fields.clone()),
2173 partition_columns.clone().unwrap_or_default(),
2174 HashMap::new()
2175 )?;
2176
2177 let metadata_action = json!({
2179 "metaData": {
2180 "id": metadata.id,
2181 "name": metadata.name,
2182 "description": metadata.description,
2183 "format": {
2184 "provider": "parquet",
2185 "options": {}
2186 },
2187 "schemaString": metadata.schema_string,
2188 "partitionColumns": metadata.partition_columns,
2189 "configuration": {
2190 "delta.minReaderVersion": "3",
2191 "delta.minWriterVersion": "7"
2192 },
2193 "created_time": metadata.created_time
2194 }
2195 });
2196
2197 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
2199 let delta_log_path = path_manager.delta_log_path();
2200 let protocol = Protocol::new(3, 7);
2201
2202 let protocol_action = json!({
2203 "protocol": {
2204 "minReaderVersion": protocol.min_reader_version,
2205 "minWriterVersion": protocol.min_writer_version,
2206 "readerFeatures": [],
2207 "writerFeatures": []
2208 }
2209 });
2210
2211 append_protocol_action(&store, &delta_log_path, protocol_action).await?;
2212 append_protocol_action(&store, &delta_log_path, metadata_action).await?;
2213
2214 let _ = DeltaOps::try_from_uri(&path_manager.table_path())
2216 .await
2217 .map_err(|e| DeltaTableError::Generic(format!("Failed to init DeltaOps: {e}")))?
2218 .create()
2219 .with_columns(delta_fields.clone())
2220 .with_partition_columns(partition_columns.clone().unwrap_or_default())
2221 .with_save_mode(SaveMode::Append)
2222 .with_configuration(config.clone())
2223 .await?;
2224 }
2225 }
2226
2227 let mut table = DeltaTableBuilder::from_uri(&path_manager.table_path())
2229 .build()
2230 .map_err(|e| DeltaTableError::Generic(format!("Failed to build Delta table: {e}")))?;
2231
2232 table.load()
2234 .await
2235 .map_err(|e| DeltaTableError::Generic(format!("Failed to load table: {e}")))?;
2236
2237 let batches = df
2239 .clone()
2240 .collect()
2241 .await
2242 .map_err(|e| DeltaTableError::Generic(format!("DataFusion collect error: {e}")))?;
2243
2244 let mut writer_config = HashMap::new();
2245 writer_config.insert("delta.protocol.minWriterVersion".to_string(), "7".to_string());
2246 writer_config.insert("delta.protocol.minReaderVersion".to_string(), "3".to_string());
2247
2248 let mut writer = RecordBatchWriter::try_new(
2249 &path_manager.table_path(),
2250 arrow_schema_ref,
2251 partition_columns,
2252 Some(writer_config)
2253 )?;
2254
2255 for batch in batches {
2256 writer.write_with_mode(batch, write_mode).await?;
2257 }
2258
2259 let version = writer
2260 .flush_and_commit(&mut table)
2261 .await
2262 .map_err(|e| DeltaTableError::Generic(format!("Failed to flush and commit: {e}")))?;
2263
2264 println!("Wrote data to Delta table at version: {version}");
2265 Ok(())
2266}
2267
2268
2269pub struct AliasedDataFrame {
2271 dataframe: DataFrame,
2272 alias: String,
2273}
2274
2275impl CustomDataFrame {
2276
2277 pub async fn create_view(
2279 &self,
2280 view_name: &str,
2281 ttl_seconds: Option<u64>,
2282 ) -> ElusionResult<()> {
2283 let sql = self.construct_sql();
2285
2286 let ctx = SessionContext::new();
2288
2289 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
2291
2292 for join in &self.joins {
2293 Self::register_df_as_table(&ctx, &join.dataframe.table_alias, &join.dataframe.df).await?;
2294 }
2295
2296 let mut manager = MATERIALIZED_VIEW_MANAGER.lock().unwrap();
2298 manager.create_view(&ctx, view_name, &sql, ttl_seconds).await
2299 }
2300
2301 pub async fn from_view(view_name: &str) -> ElusionResult<Self> {
2303 let ctx = SessionContext::new();
2304 let manager = MATERIALIZED_VIEW_MANAGER.lock().unwrap();
2305
2306 let df = manager.get_view_as_dataframe(&ctx, view_name).await?;
2307
2308 Ok(CustomDataFrame {
2309 df,
2310 table_alias: view_name.to_string(),
2311 from_table: view_name.to_string(),
2312 selected_columns: Vec::new(),
2313 alias_map: Vec::new(),
2314 aggregations: Vec::new(),
2315 group_by_columns: Vec::new(),
2316 where_conditions: Vec::new(),
2317 having_conditions: Vec::new(),
2318 order_by_columns: Vec::new(),
2319 limit_count: None,
2320 joins: Vec::new(),
2321 window_functions: Vec::new(),
2322 ctes: Vec::new(),
2323 subquery_source: None,
2324 set_operations: Vec::new(),
2325 query: String::new(),
2326 aggregated_df: None,
2327 union_tables: None,
2328 original_expressions: Vec::new(),
2329 })
2330 }
2331
2332 pub async fn refresh_view(view_name: &str) -> ElusionResult<()> {
2334 let ctx = SessionContext::new();
2335 let mut manager = MATERIALIZED_VIEW_MANAGER.lock().unwrap();
2336 manager.refresh_view(&ctx, view_name).await
2337 }
2338
2339 pub async fn drop_view(view_name: &str) -> ElusionResult<()> {
2341 let mut manager = MATERIALIZED_VIEW_MANAGER.lock().unwrap();
2342 manager.drop_view(view_name)
2343 }
2344
2345 pub async fn list_views() -> Vec<(String, DateTime<Utc>, Option<u64>)> {
2347 let manager = MATERIALIZED_VIEW_MANAGER.lock().unwrap();
2348 let views = manager.list_views();
2349
2350 if views.is_empty() {
2351 println!("There are no materialized views created.");
2352 }
2353
2354 views
2355 }
2356
2357 pub async fn elusion_with_cache(&self, alias: &str) -> ElusionResult<Self> {
2359 let sql = self.construct_sql();
2360
2361 let mut cache = QUERY_CACHE.lock().unwrap();
2363 if let Some(cached_result) = cache.get_cached_result(&sql) {
2364 println!("✅ Using cached result for query");
2365
2366 let ctx = SessionContext::new();
2368 let schema = cached_result[0].schema();
2369
2370 let mem_table = MemTable::try_new(schema.clone(), vec![cached_result])
2371 .map_err(|e| ElusionError::Custom(format!("Failed to create memory table from cache: {}", e)))?;
2372
2373 ctx.register_table(alias, Arc::new(mem_table))
2374 .map_err(|e| ElusionError::Custom(format!("Failed to register table from cache: {}", e)))?;
2375
2376 let df = ctx.table(alias).await
2377 .map_err(|e| ElusionError::Custom(format!("Failed to create DataFrame from cache: {}", e)))?;
2378
2379 return Ok(CustomDataFrame {
2380 df,
2381 table_alias: alias.to_string(),
2382 from_table: alias.to_string(),
2383 selected_columns: Vec::new(),
2384 alias_map: Vec::new(),
2385 aggregations: Vec::new(),
2386 group_by_columns: Vec::new(),
2387 where_conditions: Vec::new(),
2388 having_conditions: Vec::new(),
2389 order_by_columns: Vec::new(),
2390 limit_count: None,
2391 joins: Vec::new(),
2392 window_functions: Vec::new(),
2393 ctes: Vec::new(),
2394 subquery_source: None,
2395 set_operations: Vec::new(),
2396 query: sql,
2397 aggregated_df: None,
2398 union_tables: None,
2399 original_expressions: self.original_expressions.clone(),
2400 });
2401 }
2402
2403 let result = self.elusion(alias).await?;
2405
2406 let batches = result.df.clone().collect().await
2408 .map_err(|e| ElusionError::Custom(format!("Failed to collect batches: {}", e)))?;
2409
2410 cache.cache_query(&sql, batches);
2411
2412 Ok(result)
2413 }
2414
2415 pub fn invalidate_cache(table_names: &[String]) {
2417 let mut cache = QUERY_CACHE.lock().unwrap();
2418 cache.invalidate(table_names);
2419 }
2420
2421 pub fn clear_cache() {
2423 let mut cache = QUERY_CACHE.lock().unwrap();
2424 let size_before = cache.cached_queries.len();
2425 cache.clear();
2426 println!("Cache cleared: {} queries removed from cache.", size_before);
2427 }
2428
2429 pub fn configure_cache(max_size: usize, ttl_seconds: Option<u64>) {
2431 *QUERY_CACHE.lock().unwrap() = QueryCache::new(max_size, ttl_seconds);
2432 }
2433
2434 async fn register_df_as_table(
2436 ctx: &SessionContext,
2437 table_name: &str,
2438 df: &DataFrame,
2439 ) -> ElusionResult<()> {
2440 let batches = df.clone().collect().await
2441 .map_err(|e| ElusionError::InvalidOperation {
2442 operation: "Data Collection".to_string(),
2443 reason: format!("Failed to collect DataFrame: {}", e),
2444 suggestion: "💡 Check if DataFrame contains valid data".to_string()
2445 })?;
2446
2447 let schema = df.schema();
2448
2449 let mem_table = MemTable::try_new(schema.clone().into(), vec![batches])
2450 .map_err(|e| ElusionError::SchemaError {
2451 message: format!("Failed to create in-memory table: {}", e),
2452 schema: Some(schema.to_string()),
2453 suggestion: "💡 Verify schema compatibility and data types".to_string()
2454 })?;
2455
2456 ctx.register_table(table_name, Arc::new(mem_table))
2457 .map_err(|e| ElusionError::InvalidOperation {
2458 operation: "Table Registration".to_string(),
2459 reason: format!("Failed to register table '{}': {}", table_name, e),
2460 suggestion: "💡 Check if table name is unique and valid".to_string()
2461 })?;
2462
2463 Ok(())
2464 }
2465
2466 pub async fn new<'a>(
2468 file_path: &'a str,
2469 alias: &'a str,
2470 ) -> ElusionResult<Self> {
2471 let aliased_df = Self::load(file_path, alias).await?;
2472
2473 Ok(CustomDataFrame {
2474 df: aliased_df.dataframe,
2475 table_alias: aliased_df.alias,
2476 from_table: alias.to_string(),
2477 selected_columns: Vec::new(),
2478 alias_map: Vec::new(),
2479 aggregations: Vec::new(),
2480 group_by_columns: Vec::new(),
2481 where_conditions: Vec::new(),
2482 having_conditions: Vec::new(),
2483 order_by_columns: Vec::new(),
2484 limit_count: None,
2485 joins: Vec::new(),
2486 window_functions: Vec::new(),
2487 ctes: Vec::new(),
2488 subquery_source: None,
2489 set_operations: Vec::new(),
2490 query: String::new(),
2491 aggregated_df: None,
2492 union_tables: None,
2493 original_expressions: Vec::new(),
2494 })
2495 }
2496
2497 pub fn join<const N: usize>(
2501 mut self,
2502 other: CustomDataFrame,
2503 conditions: [&str; N], join_type: &str
2505 ) -> Self {
2506 let condition = conditions.iter()
2507 .map(|&cond| normalize_condition(cond)) .collect::<Vec<_>>()
2509 .join(" AND ");
2510
2511 self.joins.push(Join {
2512 dataframe: other,
2513 condition,
2514 join_type: join_type.to_string(),
2515 });
2516 self
2517 }
2518 pub fn join_many<const N: usize, const M: usize>(
2521 self,
2522 joins: [(CustomDataFrame, [&str; M], &str); N]
2523 ) -> Self {
2524 let join_inputs = joins.into_iter()
2525 .map(|(df, conds, jt)| {
2526 let condition = conds.iter()
2527 .map(|&cond| normalize_condition(cond))
2528 .collect::<Vec<_>>()
2529 .join(" AND ");
2530
2531 Join {
2532 dataframe: df,
2533 condition,
2534 join_type: jt.to_string(),
2535 }
2536 })
2537 .collect::<Vec<_>>();
2538 self.join_many_vec(join_inputs)
2539 }
2540
2541 pub fn join_many_vec(mut self, joins: Vec<Join>) -> Self {
2542 self.joins.extend(joins);
2543 self
2544 }
2545
2546 pub fn group_by<const N: usize>(self, group_columns: [&str; N]) -> Self {
2548 self.group_by_vec(group_columns.to_vec())
2549 }
2550 pub fn group_by_vec(mut self, columns: Vec<&str>) -> Self {
2551 self.group_by_columns = columns
2552 .into_iter()
2553 .map(|s| {
2554 if is_simple_column(s) {
2555 normalize_column_name(s)
2556 } else if s.contains(" AS ") {
2557 let expr_part = s.split(" AS ")
2559 .next()
2560 .unwrap_or(s);
2561 normalize_expression(expr_part, &self.table_alias)
2562 } else {
2563 normalize_expression(s, &self.table_alias)
2565 }
2566 })
2567 .collect();
2568 self
2569 }
2570
2571 pub fn group_by_all(mut self) -> Self {
2573 let mut all_group_by = Vec::new();
2574
2575 for col in &self.selected_columns {
2577 if is_simple_column(&col) {
2578 all_group_by.push(col.clone());
2580 } else if is_expression(&col) {
2581 if col.contains(" AS ") {
2583 if let Some(expr_part) = col.split(" AS ").next() {
2584 let expr = expr_part.trim().to_string();
2585 if !all_group_by.contains(&expr) {
2586 all_group_by.push(expr);
2587 }
2588 }
2589 } else {
2590 if !all_group_by.contains(col) {
2592 all_group_by.push(col.clone());
2593 }
2594 }
2595 } else {
2596 all_group_by.push(col.clone());
2598 }
2599 }
2600
2601 self.group_by_columns = all_group_by;
2602 self
2603 }
2604
2605 pub fn filter_many<const N: usize>(self, conditions: [&str; N]) -> Self {
2608 self.filter_vec(conditions.to_vec())
2609 }
2610
2611 pub fn filter_vec(mut self, conditions: Vec<&str>) -> Self {
2613 self.where_conditions.extend(conditions.into_iter().map(|c| normalize_condition_filter(c)));
2614 self
2615 }
2616
2617 pub fn filter(mut self, condition: &str) -> Self {
2619 self.where_conditions.push(normalize_condition_filter(condition));
2620 self
2621 }
2622
2623 pub fn having_many<const N: usize>(self, conditions: [&str; N]) -> Self {
2626 self.having_conditions_vec(conditions.to_vec())
2627 }
2628
2629 pub fn having_conditions_vec(mut self, conditions: Vec<&str>) -> Self {
2631 self.having_conditions.extend(conditions.into_iter().map(|c| normalize_condition(c)));
2632 self
2633 }
2634
2635 pub fn having(mut self, condition: &str) -> Self {
2637 self.having_conditions.push(normalize_condition(condition));
2638 self
2639 }
2640
2641 pub fn order_by<const N: usize>(self, columns: [&str; N], ascending: [bool; N]) -> Self {
2644 let normalized_columns: Vec<String> = columns.iter()
2645 .map(|c| normalize_column_name(c))
2646 .collect();
2647 self.order_by_vec(normalized_columns, ascending.to_vec())
2648 }
2649
2650 pub fn order_by_vec(mut self, columns: Vec<String>, ascending: Vec<bool>) -> Self {
2652 assert!(
2654 columns.len() == ascending.len(),
2655 "Columns and ascending flags must have the same length"
2656 );
2657
2658 self.order_by_columns = columns.into_iter()
2660 .zip(ascending.into_iter())
2661 .collect();
2662 self
2663 }
2664
2665 pub fn order_by_many<const N: usize>(self, orders: [( &str, bool ); N]) -> Self {
2668 let orderings = orders.into_iter()
2669 .map(|(col, asc)| (normalize_column_name(col), asc))
2670 .collect::<Vec<_>>();
2671 self.order_by_many_vec(orderings)
2672 }
2673
2674 pub fn order_by_many_vec(mut self, orders: Vec<(String, bool)>) -> Self {
2676 self.order_by_columns = orders;
2677 self
2678 }
2679
2680 pub fn limit(mut self, count: u64) -> Self {
2682 self.limit_count = Some(count);
2683 self
2684 }
2685
2686 pub fn window(mut self, window_expr: &str) -> Self {
2688 let normalized = normalize_window_function(window_expr);
2689 self.window_functions.push(normalized);
2690 self
2691 }
2692
2693 pub fn with_ctes<const N: usize>(self, ctes: [&str; N]) -> Self {
2696 self.with_ctes_vec(ctes.to_vec())
2697 }
2698
2699 pub fn with_ctes_vec(mut self, ctes: Vec<&str>) -> Self {
2701 self.ctes.extend(ctes.into_iter().map(|c| c.to_string()));
2702 self
2703 }
2704
2705 pub fn with_cte_single(mut self, cte: &str) -> Self {
2707 self.ctes.push(cte.to_string());
2708 self
2709 }
2710
2711 pub fn set_operation(mut self, set_op: &str) -> Self {
2713 self.set_operations.push(set_op.to_string());
2714 self
2715 }
2716
2717 pub fn string_functions<const N: usize>(mut self, expressions: [&str; N]) -> Self {
2719 for expr in expressions.iter() {
2720 self.selected_columns.push(normalize_expression(expr, &self.table_alias));
2722
2723 if !self.group_by_columns.is_empty() {
2726 let expr_part = expr.split(" AS ")
2727 .next()
2728 .unwrap_or(expr);
2729 self.group_by_columns.push(normalize_expression(expr_part, &self.table_alias));
2730 }
2731 }
2732 self
2733 }
2734
2735 pub fn datetime_functions<const N: usize>(mut self, expressions: [&str; N]) -> Self {
2740 for expr in expressions.iter() {
2741 self.selected_columns.push(normalize_expression(expr, &self.table_alias));
2743
2744 if !self.group_by_columns.is_empty() {
2746 let expr_part = expr.split(" AS ")
2747 .next()
2748 .unwrap_or(expr);
2749
2750 self.group_by_columns.push(normalize_expression(expr_part, &self.table_alias));
2751 }
2752 }
2753 self
2754 }
2755
2756 pub fn agg<const N: usize>(self, aggregations: [&str; N]) -> Self {
2759 self.clone().agg_vec(
2760 aggregations.iter()
2761 .filter(|&expr| is_aggregate_expression(expr))
2762 .map(|s| normalize_expression(s, &self.table_alias))
2763 .collect()
2764 )
2765 }
2766
2767 pub fn agg_vec(mut self, aggregations: Vec<String>) -> Self {
2770 let valid_aggs = aggregations.into_iter()
2771 .filter(|expr| is_aggregate_expression(expr))
2772 .collect::<Vec<_>>();
2773
2774 self.aggregations.extend(valid_aggs);
2775 self
2776 }
2777
2778 pub async fn append(self, other: CustomDataFrame) -> ElusionResult<Self> {
2780
2781 let ctx = Arc::new(SessionContext::new());
2790
2791 let mut batches_self = self.df.clone().collect().await
2792 .map_err(|e| ElusionError::InvalidOperation {
2793 operation: "Collecting batches from first dataframe".to_string(),
2794 reason: e.to_string(),
2795 suggestion: "💡 Check if the dataframe is valid and not empty".to_string(),
2796 })?;
2797
2798 let batches_other = other.df.clone().collect().await
2799 .map_err(|e| ElusionError::InvalidOperation {
2800 operation: "Collecting batches from second dataframe".to_string(),
2801 reason: e.to_string(),
2802 suggestion: "💡 Check if the dataframe is valid and not empty".to_string(),
2803 })?;
2804
2805 batches_self.extend(batches_other);
2806
2807 let mem_table = MemTable::try_new(self.df.schema().clone().into(), vec![batches_self])
2808 .map_err(|e| ElusionError::InvalidOperation {
2809 operation: "Creating memory table".to_string(),
2810 reason: e.to_string(),
2811 suggestion: "💡 Verify data consistency, number of columns or memory availability".to_string(),
2812 })?;
2813
2814 let alias = "append_result";
2815
2816 ctx.register_table(alias, Arc::new(mem_table))
2817 .map_err(|e| ElusionError::InvalidOperation {
2818 operation: "Registering table".to_string(),
2819 reason: e.to_string(),
2820 suggestion: "💡 Check if table name is unique in context".to_string(),
2821 })?;
2822
2823 let df = ctx.table(alias).await
2824 .map_err(|e| ElusionError::Custom(format!("Failed to create union DataFrame: {}", e)))?;
2825
2826 Ok(CustomDataFrame {
2827 df,
2828 table_alias: alias.to_string(),
2829 from_table: alias.to_string(),
2830 selected_columns: self.selected_columns.clone(),
2831 alias_map: self.alias_map.clone(),
2832 aggregations: Vec::new(),
2833 group_by_columns: Vec::new(),
2834 where_conditions: Vec::new(),
2835 having_conditions: Vec::new(),
2836 order_by_columns: Vec::new(),
2837 limit_count: None,
2838 joins: Vec::new(),
2839 window_functions: Vec::new(),
2840 ctes: Vec::new(),
2841 subquery_source: None,
2842 set_operations: Vec::new(),
2843 query: String::new(),
2844 aggregated_df: None,
2845 union_tables: None,
2846 original_expressions: self.original_expressions.clone(),
2847 })
2848 }
2849 pub async fn append_many<const N: usize>(self, others: [CustomDataFrame; N]) -> ElusionResult<Self> {
2851
2852 if N == 0 {
2853 return Err(ElusionError::SetOperationError {
2854 operation: "APPEND MANY".to_string(),
2855 reason: "No dataframes provided for append operation".to_string(),
2856 suggestion: "💡 Provide at least one dataframe to append".to_string(),
2857 });
2858 }
2859
2860 let ctx = Arc::new(SessionContext::new());
2871
2872 let mut all_batches = self.df.clone().collect().await
2873 .map_err(|e| ElusionError::InvalidOperation {
2874 operation: "Collecting base dataframe".to_string(),
2875 reason: e.to_string(),
2876 suggestion: "💡 Check if the dataframe is valid and not empty".to_string(),
2877 })?;
2878
2879 for (i, other) in others.iter().enumerate() {
2880 let other_batches = other.df.clone().collect().await
2881 .map_err(|e| ElusionError::InvalidOperation {
2882 operation: format!("Collecting dataframe at index {}", i),
2883 reason: e.to_string(),
2884 suggestion: "💡 Check if the dataframe is valid and not empty".to_string(),
2885 })?;
2886 all_batches.extend(other_batches);
2887 }
2888
2889 let mem_table = MemTable::try_new(self.df.schema().clone().into(), vec![all_batches])
2890 .map_err(|e| ElusionError::InvalidOperation {
2891 operation: "Creating memory table".to_string(),
2892 reason: e.to_string(),
2893 suggestion: "💡 Verify data consistency and memory availability".to_string(),
2894 })?;
2895
2896 let alias = "union_many_result";
2897
2898 ctx.register_table(alias, Arc::new(mem_table))
2899 .map_err(|e| ElusionError::InvalidOperation {
2900 operation: "Registering result table".to_string(),
2901 reason: e.to_string(),
2902 suggestion: "💡 Check if table name is unique in context".to_string(),
2903 })?;
2904
2905 let df = ctx.table(alias).await
2906 .map_err(|e| ElusionError::SetOperationError {
2907 operation: "APPEND MANY".to_string(),
2908 reason: e.to_string(),
2909 suggestion: "💡 Verify final table creation".to_string(),
2910 })?;
2911
2912 Ok(CustomDataFrame {
2913 df,
2914 table_alias: alias.to_string(),
2915 from_table: alias.to_string(),
2916 selected_columns: self.selected_columns.clone(),
2917 alias_map: self.alias_map.clone(),
2918 aggregations: Vec::new(),
2919 group_by_columns: Vec::new(),
2920 where_conditions: Vec::new(),
2921 having_conditions: Vec::new(),
2922 order_by_columns: Vec::new(),
2923 limit_count: None,
2924 joins: Vec::new(),
2925 window_functions: Vec::new(),
2926 ctes: Vec::new(),
2927 subquery_source: None,
2928 set_operations: Vec::new(),
2929 query: String::new(),
2930 aggregated_df: None,
2931 union_tables: None,
2932 original_expressions: self.original_expressions.clone(),
2933 })
2934 }
2935 pub async fn union(self, other: CustomDataFrame) -> ElusionResult<Self> {
2937
2938 let ctx = Arc::new(SessionContext::new());
2947
2948 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await
2949 .map_err(|e| ElusionError::InvalidOperation {
2950 operation: "Registering first table".to_string(),
2951 reason: e.to_string(),
2952 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
2953 })?;
2954
2955 Self::register_df_as_table(&ctx, &other.table_alias, &other.df).await
2956 .map_err(|e| ElusionError::InvalidOperation {
2957 operation: "Registering second table".to_string(),
2958 reason: e.to_string(),
2959 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
2960 })?;
2961
2962
2963 let sql = format!(
2964 "SELECT DISTINCT * FROM {} UNION SELECT DISTINCT * FROM {}",
2965 normalize_alias(&self.table_alias),
2966 normalize_alias(&other.table_alias)
2967 );
2968
2969 let df = ctx.sql(&sql).await
2970 .map_err(|e| ElusionError::SetOperationError {
2971 operation: "UNION".to_string(),
2972 reason: e.to_string(),
2973 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
2974 })?;
2975
2976 Ok(CustomDataFrame {
2977 df,
2978 table_alias: "union_result".to_string(),
2979 from_table: "union_result".to_string(),
2980 selected_columns: self.selected_columns.clone(),
2981 alias_map: self.alias_map.clone(),
2982 aggregations: Vec::new(),
2983 group_by_columns: Vec::new(),
2984 where_conditions: Vec::new(),
2985 having_conditions: Vec::new(),
2986 order_by_columns: Vec::new(),
2987 limit_count: None,
2988 joins: Vec::new(),
2989 window_functions: Vec::new(),
2990 ctes: Vec::new(),
2991 subquery_source: None,
2992 set_operations: Vec::new(),
2993 query: String::new(),
2994 aggregated_df: None,
2995 union_tables: None,
2996 original_expressions: self.original_expressions.clone(),
2997 })
2998 }
2999 pub async fn union_many<const N: usize>(self, others: [CustomDataFrame; N]) -> ElusionResult<Self> {
3001
3002 if N == 0 {
3003 return Err(ElusionError::SetOperationError {
3004 operation: "UNION MANY".to_string(),
3005 reason: "No dataframes provided for union operation".to_string(),
3006 suggestion: "💡 Provide at least one dataframe to union with".to_string(),
3007 });
3008 }
3009
3010 let ctx = Arc::new(SessionContext::new());
3021
3022 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await
3023 .map_err(|e| ElusionError::InvalidOperation {
3024 operation: "Registering base table".to_string(),
3025 reason: e.to_string(),
3026 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
3027 })?;
3028
3029 for (i, other) in others.iter().enumerate() {
3030 let alias = format!("union_source_{}", i);
3031 Self::register_df_as_table(&ctx, &alias, &other.df).await
3032 .map_err(|e| ElusionError::InvalidOperation {
3033 operation: format!("Registering table {}", i),
3034 reason: e.to_string(),
3035 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
3036 })?;
3037 }
3038
3039 let mut sql = format!("SELECT DISTINCT * FROM {}", normalize_alias(&self.table_alias));
3040 for i in 0..N {
3041 sql.push_str(&format!(" UNION SELECT DISTINCT * FROM {}",
3042 normalize_alias(&format!("union_source_{}", i))));
3043 }
3044
3045 let df = ctx.sql(&sql).await
3046 .map_err(|e| ElusionError::SetOperationError {
3047 operation: "UNION MANY".to_string(),
3048 reason: e.to_string(),
3049 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
3050 })?;
3051
3052 Ok(CustomDataFrame {
3053 df,
3054 table_alias: "union_many_result".to_string(),
3055 from_table: "union_many_result".to_string(),
3056 selected_columns: self.selected_columns.clone(),
3057 alias_map: self.alias_map.clone(),
3058 aggregations: Vec::new(),
3059 group_by_columns: Vec::new(),
3060 where_conditions: Vec::new(),
3061 having_conditions: Vec::new(),
3062 order_by_columns: Vec::new(),
3063 limit_count: None,
3064 joins: Vec::new(),
3065 window_functions: Vec::new(),
3066 ctes: Vec::new(),
3067 subquery_source: None,
3068 set_operations: Vec::new(),
3069 query: String::new(),
3070 aggregated_df: None,
3071 union_tables: None,
3072 original_expressions: self.original_expressions.clone(),
3073 })
3074 }
3075
3076 pub async fn union_all(self, other: CustomDataFrame) -> ElusionResult<Self> {
3078
3079 let ctx = Arc::new(SessionContext::new());
3088
3089 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await
3090 .map_err(|e| ElusionError::InvalidOperation {
3091 operation: "Registering first table".to_string(),
3092 reason: e.to_string(),
3093 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
3094 })?;
3095
3096 Self::register_df_as_table(&ctx, &other.table_alias, &other.df).await
3097 .map_err(|e| ElusionError::InvalidOperation {
3098 operation: "Registering second table".to_string(),
3099 reason: e.to_string(),
3100 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
3101 })?;
3102
3103 let sql = format!(
3104 "SELECT * FROM {} UNION ALL SELECT * FROM {}",
3105 normalize_alias(&self.table_alias),
3106 normalize_alias(&other.table_alias)
3107 );
3108
3109 let df = ctx.sql(&sql).await
3110 .map_err(|e| ElusionError::SetOperationError {
3111 operation: "UNION ALL".to_string(),
3112 reason: e.to_string(),
3113 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
3114 })?;
3115
3116 Ok(CustomDataFrame {
3117 df,
3118 table_alias: "union_all_result".to_string(),
3119 from_table: "union_all_result".to_string(),
3120 selected_columns: self.selected_columns.clone(),
3121 alias_map: self.alias_map.clone(),
3122 aggregations: Vec::new(),
3123 group_by_columns: Vec::new(),
3124 where_conditions: Vec::new(),
3125 having_conditions: Vec::new(),
3126 order_by_columns: Vec::new(),
3127 limit_count: None,
3128 joins: Vec::new(),
3129 window_functions: Vec::new(),
3130 ctes: Vec::new(),
3131 subquery_source: None,
3132 set_operations: Vec::new(),
3133 query: String::new(),
3134 aggregated_df: None,
3135 union_tables: None,
3136 original_expressions: self.original_expressions.clone(),
3137 })
3138 }
3139 pub async fn union_all_many<const N: usize>(self, others: [CustomDataFrame; N]) -> ElusionResult<Self> {
3141
3142 if N == 0 {
3143 return Err(ElusionError::SetOperationError {
3144 operation: "UNION ALL MANY".to_string(),
3145 reason: "No dataframes provided for union operation".to_string(),
3146 suggestion: "💡 Provide at least one dataframe to union with".to_string(),
3147 });
3148 }
3149
3150 let ctx = Arc::new(SessionContext::new());
3161
3162 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await
3163 .map_err(|e| ElusionError::InvalidOperation {
3164 operation: "Registering base table".to_string(),
3165 reason: e.to_string(),
3166 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
3167 })?;
3168
3169 for (i, other) in others.iter().enumerate() {
3170 let alias = format!("union_all_source_{}", i);
3171 Self::register_df_as_table(&ctx, &alias, &other.df).await
3172 .map_err(|e| ElusionError::InvalidOperation {
3173 operation: format!("Registering table {}", i),
3174 reason: e.to_string(),
3175 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
3176 })?;
3177 }
3178
3179 let mut sql = format!("SELECT * FROM {}", normalize_alias(&self.table_alias));
3180 for i in 0..N {
3181 sql.push_str(&format!(" UNION ALL SELECT * FROM {}",
3182 normalize_alias(&format!("union_all_source_{}", i))));
3183 }
3184
3185 let df = ctx.sql(&sql).await
3186 .map_err(|e| ElusionError::SetOperationError {
3187 operation: "UNION ALL MANY".to_string(),
3188 reason: e.to_string(),
3189 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
3190 })?;
3191
3192 Ok(CustomDataFrame {
3193 df,
3194 table_alias: "union_all_many_result".to_string(),
3195 from_table: "union_all_many_result".to_string(),
3196 selected_columns: self.selected_columns.clone(),
3197 alias_map: self.alias_map.clone(),
3198 aggregations: Vec::new(),
3199 group_by_columns: Vec::new(),
3200 where_conditions: Vec::new(),
3201 having_conditions: Vec::new(),
3202 order_by_columns: Vec::new(),
3203 limit_count: None,
3204 joins: Vec::new(),
3205 window_functions: Vec::new(),
3206 ctes: Vec::new(),
3207 subquery_source: None,
3208 set_operations: Vec::new(),
3209 query: String::new(),
3210 aggregated_df: None,
3211 union_tables: None,
3212 original_expressions: self.original_expressions.clone(),
3213 })
3214 }
3215 pub async fn except(self, other: CustomDataFrame) -> ElusionResult<Self> {
3217
3218 let ctx = Arc::new(SessionContext::new());
3227
3228 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await
3229 .map_err(|e| ElusionError::InvalidOperation {
3230 operation: "Registering first table".to_string(),
3231 reason: e.to_string(),
3232 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
3233 })?;
3234
3235 Self::register_df_as_table(&ctx, &other.table_alias, &other.df).await
3236 .map_err(|e| ElusionError::InvalidOperation {
3237 operation: "Registering second table".to_string(),
3238 reason: e.to_string(),
3239 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
3240 })?;
3241
3242 let sql = format!(
3243 "SELECT * FROM {} EXCEPT SELECT * FROM {}",
3244 normalize_alias(&self.table_alias),
3245 normalize_alias(&other.table_alias)
3246 );
3247
3248 let df = ctx.sql(&sql).await
3249 .map_err(|e| ElusionError::SetOperationError {
3250 operation: "EXCEPT".to_string(),
3251 reason: e.to_string(),
3252 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
3253 })?;
3254
3255 Ok(CustomDataFrame {
3256 df,
3257 table_alias: "except_result".to_string(),
3258 from_table: "except_result".to_string(),
3259 selected_columns: self.selected_columns.clone(),
3260 alias_map: self.alias_map.clone(),
3261 aggregations: Vec::new(),
3262 group_by_columns: Vec::new(),
3263 where_conditions: Vec::new(),
3264 having_conditions: Vec::new(),
3265 order_by_columns: Vec::new(),
3266 limit_count: None,
3267 joins: Vec::new(),
3268 window_functions: Vec::new(),
3269 ctes: Vec::new(),
3270 subquery_source: None,
3271 set_operations: Vec::new(),
3272 query: String::new(),
3273 aggregated_df: None,
3274 union_tables: None,
3275 original_expressions: self.original_expressions.clone(),
3276 })
3277 }
3278
3279 pub async fn intersect(self, other: CustomDataFrame) -> ElusionResult<Self> {
3327
3328 let ctx = Arc::new(SessionContext::new());
3337
3338 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await
3339 .map_err(|e| ElusionError::InvalidOperation {
3340 operation: "Registering first table".to_string(),
3341 reason: e.to_string(),
3342 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
3343 })?;
3344
3345 Self::register_df_as_table(&ctx, &other.table_alias, &other.df).await
3346 .map_err(|e| ElusionError::InvalidOperation {
3347 operation: "Registering second table".to_string(),
3348 reason: e.to_string(),
3349 suggestion: "💡 Check if table name is unique and data is valid".to_string(),
3350 })?;
3351
3352 let sql = format!(
3353 "SELECT * FROM {} INTERSECT SELECT * FROM {}",
3354 normalize_alias(&self.table_alias),
3355 normalize_alias(&other.table_alias)
3356 );
3357
3358 let df = ctx.sql(&sql).await
3359 .map_err(|e| ElusionError::SetOperationError {
3360 operation: "INTERSECT".to_string(),
3361 reason: e.to_string(),
3362 suggestion: "💡 Verify SQL syntax and schema compatibility".to_string(),
3363 })?;
3364
3365 Ok(CustomDataFrame {
3366 df,
3367 table_alias: "intersect_result".to_string(),
3368 from_table: "intersect_result".to_string(),
3369 selected_columns: self.selected_columns.clone(),
3370 alias_map: self.alias_map.clone(),
3371 aggregations: Vec::new(),
3372 group_by_columns: Vec::new(),
3373 where_conditions: Vec::new(),
3374 having_conditions: Vec::new(),
3375 order_by_columns: Vec::new(),
3376 limit_count: None,
3377 joins: Vec::new(),
3378 window_functions: Vec::new(),
3379 ctes: Vec::new(),
3380 subquery_source: None,
3381 set_operations: Vec::new(),
3382 query: String::new(),
3383 aggregated_df: None,
3384 union_tables: None,
3385 original_expressions: self.original_expressions.clone(),
3386 })
3387 }
3388
3389 pub async fn pivot<const N: usize>(
3439 mut self,
3440 row_keys: [&str; N],
3441 pivot_column: &str,
3442 value_column: &str,
3443 aggregate_func: &str,
3444 ) -> ElusionResult<Self> {
3445 let ctx = Arc::new(SessionContext::new());
3446
3447 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
3449
3450 let schema = self.df.schema();
3451 let exact_pivot_column = schema.fields().iter()
3456 .find(|f| f.name().to_uppercase() == pivot_column.to_uppercase())
3457 .ok_or_else(|| {
3458 let available = schema.fields().iter()
3459 .map(|f| f.name())
3460 .collect::<Vec<_>>();
3461 ElusionError::Custom(format!(
3462 "Column {} not found in current data. Available columns: {:?}",
3463 pivot_column, available
3464 ))
3465 })?
3466 .name();
3467
3468 let exact_value_column = schema.fields().iter()
3469 .find(|f| f.name().to_uppercase() == value_column.to_uppercase())
3470 .ok_or_else(|| {
3471 let available = schema.fields().iter()
3472 .map(|f| f.name())
3473 .collect::<Vec<_>>();
3474 ElusionError::Custom(format!(
3475 "Column {} not found in current data. Available columns: {:?}",
3476 value_column, available
3477 ))
3478 })?
3479 .name();
3480
3481 let distinct_query = format!(
3498 "SELECT DISTINCT \"{}\" \
3499 FROM \"{}\" AS {} \
3500 WHERE \"{}\" IS NOT NULL \
3501 AND \"{}\" IS NOT NULL \
3502 ORDER BY \"{}\"",
3503 exact_pivot_column,
3504 self.from_table,
3505 self.table_alias,
3506 exact_pivot_column,
3507 exact_value_column,
3508 exact_pivot_column
3509 );
3510
3511 let distinct_df = ctx.sql(&distinct_query).await
3512 .map_err(|e| ElusionError::Custom(format!("Failed to execute distinct query: {}", e)))?;
3513
3514 let distinct_batches = distinct_df.collect().await
3515 .map_err(|e| ElusionError::Custom(format!("Failed to collect distinct values: {}", e)))?;
3516
3517 let distinct_values: Vec<String> = distinct_batches
3519 .iter()
3520 .flat_map(|batch| {
3521 let array = batch.column(0);
3522 match array.data_type() {
3523 ArrowDataType::Utf8 => {
3524 let string_array = array.as_any().downcast_ref::<StringArray>().unwrap();
3525 (0..batch.num_rows())
3526 .map(|i| string_array.value(i).to_string())
3527 .collect::<Vec<_>>()
3528 },
3529 _ => {
3530 let string_array = compute::cast(array, &ArrowDataType::Utf8)
3532 .unwrap();
3533 let string_array = string_array.as_any().downcast_ref::<StringArray>().unwrap();
3534 (0..batch.num_rows())
3535 .map(|i| string_array.value(i).to_string())
3536 .collect::<Vec<_>>()
3537 }
3538 }
3539 })
3540 .collect();
3541
3542 let pivot_cols: Vec<String> = distinct_values
3544 .iter()
3545 .map(|val| {
3546 let value_expr = if schema.field_with_name(None, &exact_pivot_column)
3548 .map(|f| matches!(f.data_type(), ArrowDataType::Int32 | ArrowDataType::Int64 | ArrowDataType::Float32 | ArrowDataType::Float64))
3549 .unwrap_or(false) {
3550 format!(
3552 "COALESCE({}(CASE WHEN \"{}\" = '{}' THEN \"{}\" END), 0)",
3553 aggregate_func,
3554 exact_pivot_column,
3555 val,
3556 exact_value_column
3557 )
3558 } else {
3559 format!(
3561 "COALESCE({}(CASE WHEN \"{}\" = '{}' THEN \"{}\" END), 0)",
3562 aggregate_func,
3563 exact_pivot_column,
3564 val.replace("'", "''"), exact_value_column
3566 )
3567 };
3568
3569 format!(
3571 "{} AS \"{}_{}\"",
3572 value_expr,
3573 exact_pivot_column,
3574 val.replace("\"", "\"\"") )
3576 })
3577 .collect();
3578
3579 let row_keys_str = row_keys.iter()
3580 .map(|&key| {
3581 let exact_key = schema.fields().iter()
3582 .find(|f| f.name().to_uppercase() == key.to_uppercase())
3583 .map_or(key.to_string(), |f| f.name().to_string());
3584 format!("\"{}\"", exact_key)
3585 })
3586 .collect::<Vec<_>>()
3587 .join(", ");
3588
3589 let pivot_subquery = format!(
3591 "(SELECT {}, {} FROM \"{}\" AS {} GROUP BY {})",
3592 row_keys_str,
3593 pivot_cols.join(", "),
3594 self.from_table,
3595 self.table_alias,
3596 row_keys_str
3597 );
3598
3599 self.from_table = pivot_subquery;
3601 self.selected_columns.clear();
3602 self.group_by_columns.clear();
3603
3604 self.selected_columns.extend(row_keys.iter().map(|&s| s.to_string()));
3606
3607 for val in distinct_values {
3609 self.selected_columns.push(
3610 format!("{}_{}",
3611 normalize_column_name(pivot_column),
3612 normalize_column_name(&val)
3613 )
3614 );
3615 }
3616
3617
3618
3619 Ok(self)
3620 }
3621
3622 pub async fn unpivot<const N: usize, const M: usize>(
3624 mut self,
3625 id_columns: [&str; N],
3626 value_columns: [&str; M],
3627 name_column: &str,
3628 value_column: &str,
3629 ) -> ElusionResult<Self> {
3630 let ctx = Arc::new(SessionContext::new());
3631
3632 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
3634
3635 let schema = self.df.schema();
3636 let exact_id_columns: Vec<String> = id_columns.iter()
3641 .map(|&id| {
3642 schema.fields().iter()
3643 .find(|f| f.name().to_uppercase() == id.to_uppercase())
3644 .map(|f| f.name().to_string())
3645 .ok_or_else(|| {
3646 let available = schema.fields().iter()
3647 .map(|f| f.name())
3648 .collect::<Vec<_>>();
3649 ElusionError::Custom(format!(
3650 "ID column '{}' not found in current data. Available columns: {:?}",
3651 id, available
3652 ))
3653 })
3654 })
3655 .collect::<Result<Vec<_>, _>>()?;
3656
3657 let exact_value_columns: Vec<String> = value_columns.iter()
3659 .map(|&val| {
3660 schema.fields().iter()
3661 .find(|f| f.name().to_uppercase() == val.to_uppercase())
3662 .map(|f| f.name().to_string())
3663 .ok_or_else(|| {
3664 let available = schema.fields().iter()
3665 .map(|f| f.name())
3666 .collect::<Vec<_>>();
3667 ElusionError::Custom(format!(
3668 "Value column '{}' not found in current data. Available columns: {:?}",
3669 val, available
3670 ))
3671 })
3672 })
3673 .collect::<Result<Vec<_>, _>>()?;
3674
3675 let selects: Vec<String> = exact_value_columns.iter().map(|val_col| {
3677 let id_cols_str = exact_id_columns.iter()
3678 .map(|id| format!("\"{}\"", id))
3679 .collect::<Vec<_>>()
3680 .join(", ");
3681
3682 format!(
3690 "SELECT {}, '{}' AS \"{}\", \"{}\" AS \"{}\" FROM \"{}\" AS {}",
3691 id_cols_str,
3692 val_col,
3693 name_column,
3695 val_col,
3696 value_column,
3697 self.from_table,
3698 self.table_alias
3699 )
3700 }).collect();
3701
3702 let unpivot_subquery = format!(
3704 "({})",
3705 selects.join(" UNION ALL ")
3706 );
3707
3708 self.from_table = unpivot_subquery;
3710 self.selected_columns.clear();
3711
3712 self.selected_columns.extend(
3714 exact_id_columns.iter()
3715 .map(|id| format!("\"{}\"", id))
3716 );
3717 self.selected_columns.push(format!("\"{}\"", name_column));
3718 self.selected_columns.push(format!("\"{}\"", value_column));
3719
3720 Ok(self)
3721 }
3722
3723 pub fn select<const N: usize>(self, columns: [&str; N]) -> Self {
3725 self.select_vec(columns.to_vec())
3726 }
3727
3728 pub fn select_vec(mut self, columns: Vec<&str>) -> Self {
3730 self.original_expressions = columns
3732 .iter()
3733 .filter(|&col| col.contains(" AS "))
3734 .map(|&s| s.to_string())
3735 .collect();
3736
3737 let mut all_columns = self.selected_columns.clone();
3739
3740 if !self.group_by_columns.is_empty() {
3741 for col in columns {
3742 if is_expression(col) {
3743 if is_aggregate_expression(col) {
3744 all_columns.push(normalize_expression(col, &self.table_alias));
3745 } else {
3746 self.group_by_columns.push(col.to_string());
3747 all_columns.push(normalize_expression(col, &self.table_alias));
3748 }
3749 } else {
3750 let normalized_col = normalize_column_name(col);
3751 if self.group_by_columns.contains(&normalized_col) {
3752 all_columns.push(normalized_col);
3753 } else {
3754 self.group_by_columns.push(normalized_col.clone());
3755 all_columns.push(normalized_col);
3756 }
3757 }
3758 }
3759 } else {
3760 let aggregate_aliases: Vec<String> = self
3762 .aggregations
3763 .iter()
3764 .filter_map(|agg| {
3765 agg.split(" AS ")
3766 .nth(1)
3767 .map(|alias| normalize_alias(alias))
3768 })
3769 .collect();
3770
3771 all_columns.extend(
3772 columns
3773 .into_iter()
3774 .filter(|col| !aggregate_aliases.contains(&normalize_alias(col)))
3775 .map(|s| {
3776 if is_expression(s) {
3777 normalize_expression(s, &self.table_alias)
3778 } else {
3779 normalize_column_name(s)
3780 }
3781 })
3782 );
3783 }
3784
3785 let mut seen = HashSet::new();
3787 self.selected_columns = all_columns
3788 .into_iter()
3789 .filter(|x| seen.insert(x.clone()))
3790 .collect();
3791
3792 self
3793 }
3794
3795 fn construct_sql(&self) -> String {
3797 let mut query = String::new();
3798
3799 if !self.ctes.is_empty() {
3801 query.push_str("WITH ");
3802 query.push_str(&self.ctes.join(", "));
3803 query.push_str(" ");
3804 }
3805
3806 let is_subquery = self.from_table.starts_with('(') && self.from_table.ends_with(')');
3808 let no_selected_columns = self.selected_columns.is_empty() && self.aggregations.is_empty() && self.window_functions.is_empty();
3809
3810 if is_subquery && no_selected_columns {
3811 query.push_str(&format!("{}", self.from_table));
3813 } else {
3814 query.push_str("SELECT ");
3816 let mut select_parts = Vec::new();
3817
3818 if !self.group_by_columns.is_empty() {
3819 select_parts.extend(self.aggregations.clone());
3821
3822 for col in &self.selected_columns {
3824 if !select_parts.contains(col) {
3825 select_parts.push(col.clone());
3826 }
3827 }
3828 } else {
3829 select_parts.extend(self.aggregations.clone());
3831 select_parts.extend(self.selected_columns.clone());
3832 }
3833
3834 select_parts.extend(self.window_functions.clone());
3836
3837 if select_parts.is_empty() {
3838 query.push_str("*");
3839 } else {
3840 query.push_str(&select_parts.join(", "));
3841 }
3842
3843 query.push_str(" FROM ");
3845 if is_subquery {
3846 query.push_str(&format!("{}", self.from_table));
3848 } else {
3849 query.push_str(&format!(
3851 "\"{}\" AS {}",
3852 self.from_table.trim(),
3853 self.table_alias
3854 ));
3855 }
3856
3857 for join in &self.joins {
3859 query.push_str(&format!(
3860 " {} JOIN \"{}\" AS {} ON {}",
3861 join.join_type,
3862 join.dataframe.from_table,
3863 join.dataframe.table_alias,
3864 join.condition
3865 ));
3866 }
3867
3868 if !self.where_conditions.is_empty() {
3870 query.push_str(" WHERE ");
3871 query.push_str(&self.where_conditions.join(" AND "));
3872 }
3873
3874 if !self.group_by_columns.is_empty() {
3876 query.push_str(" GROUP BY ");
3877 query.push_str(&self.group_by_columns.join(", "));
3878 }
3879
3880 if !self.having_conditions.is_empty() {
3882 query.push_str(" HAVING ");
3883 query.push_str(&self.having_conditions.join(" AND "));
3884 }
3885
3886 if !self.order_by_columns.is_empty() {
3888 query.push_str(" ORDER BY ");
3889 let orderings: Vec<String> = self.order_by_columns.iter()
3890 .map(|(col, asc)| format!("{} {}", col, if *asc { "ASC" } else { "DESC" }))
3891 .collect();
3892 query.push_str(&orderings.join(", "));
3893 }
3894
3895 if let Some(limit) = self.limit_count {
3897 query.push_str(&format!(" LIMIT {}", limit));
3898 }
3899 }
3900
3901 query
3902 }
3903
3904
3905 pub async fn elusion(&self, alias: &str) -> ElusionResult<Self> {
3907 let ctx = Arc::new(SessionContext::new());
3908
3909 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await
3911 .map_err(|e| ElusionError::SchemaError {
3912 message: format!("Failed to register base table: {}", e),
3913 schema: Some(self.df.schema().to_string()),
3914 suggestion: "💡 Check table schema compatibility".to_string()
3915 })?;
3916
3917 if self.union_tables.is_none() {
3919 for join in &self.joins {
3920 Self::register_df_as_table(&ctx, &join.dataframe.table_alias, &join.dataframe.df).await
3921 .map_err(|e| ElusionError::JoinError {
3922 message: format!("Failed to register joined table: {}", e),
3923 left_table: self.table_alias.clone(),
3924 right_table: join.dataframe.table_alias.clone(),
3925 suggestion: "💡 Verify join table schemas are compatible".to_string()
3926 })?;
3927 }
3928 }
3929
3930 if let Some(tables) = &self.union_tables {
3932 for (table_alias, df, _) in tables {
3933 if ctx.table(table_alias).await.is_ok() {
3934 continue;
3935 }
3936 Self::register_df_as_table(&ctx, table_alias, df).await
3937 .map_err(|e| ElusionError::InvalidOperation {
3938 operation: "Union Table Registration".to_string(),
3939 reason: format!("Failed to register union table '{}': {}", table_alias, e),
3940 suggestion: "💡 Check union table schema compatibility".to_string()
3941 })?;
3942 }
3943 }
3944
3945 let sql = if self.from_table.starts_with('(') && self.from_table.ends_with(')') {
3946 format!("SELECT * FROM {} AS {}", self.from_table, alias)
3947 } else {
3948 self.construct_sql()
3949 };
3950
3951 let df = ctx.sql(&sql).await
3955 .map_err(|e| ElusionError::InvalidOperation {
3956 operation: "SQL Execution".to_string(),
3957 reason: format!("Failed to execute SQL: {}", e),
3958 suggestion: "💡 Verify SQL syntax and table/column references".to_string()
3959 })?;
3960
3961 let batches = df.clone().collect().await
3963 .map_err(|e| ElusionError::InvalidOperation {
3964 operation: "Data Collection".to_string(),
3965 reason: format!("Failed to collect results: {}", e),
3966 suggestion: "💡 Check if query returns valid data".to_string()
3967 })?;
3968
3969 let result_mem_table = MemTable::try_new(df.schema().clone().into(), vec![batches])
3971 .map_err(|e| ElusionError::SchemaError {
3972 message: format!("Failed to create result table: {}", e),
3973 schema: Some(df.schema().to_string()),
3974 suggestion: "💡 Verify result schema compatibility".to_string()
3975 })?;
3976
3977 ctx.register_table(alias, Arc::new(result_mem_table))
3979 .map_err(|e| ElusionError::InvalidOperation {
3980 operation: "Result Registration".to_string(),
3981 reason: format!("Failed to register result table: {}", e),
3982 suggestion: "💡 Try using a different alias name".to_string()
3983 })?;
3984
3985 let result_df = ctx.table(alias).await
3987 .map_err(|e| ElusionError::InvalidOperation {
3988 operation: "Result Retrieval".to_string(),
3989 reason: format!("Failed to retrieve final result: {}", e),
3990 suggestion: "💡 Check if result table was properly registered".to_string()
3991 })?;
3992 Ok(CustomDataFrame {
3994 df: result_df,
3995 table_alias: alias.to_string(),
3996 from_table: alias.to_string(),
3997 selected_columns: Vec::new(),
3998 alias_map: Vec::new(),
3999 aggregations: Vec::new(),
4000 group_by_columns: Vec::new(),
4001 where_conditions: Vec::new(),
4002 having_conditions: Vec::new(),
4003 order_by_columns: Vec::new(),
4004 limit_count: None,
4005 joins: Vec::new(),
4006 window_functions: Vec::new(),
4007 ctes: Vec::new(),
4008 subquery_source: None,
4009 set_operations: Vec::new(),
4010 query: sql,
4011 aggregated_df: Some(df.clone()),
4012 union_tables: None,
4013 original_expressions: self.original_expressions.clone(),
4014 })
4015 }
4016
4017 pub async fn display(&self) -> ElusionResult<()> {
4019 self.df.clone().show().await.map_err(|e|
4020 ElusionError::Custom(format!("Failed to display DataFrame: {}", e))
4021 )
4022 }
4023 fn find_actual_column_name(&self, column: &str) -> Option<String> {
4045 self.df
4046 .schema()
4047 .fields()
4048 .iter()
4049 .find(|f| f.name().to_lowercase() == column.to_lowercase())
4050 .map(|f| f.name().to_string())
4051 }
4052 async fn compute_column_stats(&self, columns: &[&str]) -> ElusionResult<ColumnStats> {
4054 let mut stats = ColumnStats::default();
4055 let ctx = Arc::new(SessionContext::new());
4056
4057 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
4059
4060 for &column in columns {
4061 let actual_column = self.find_actual_column_name(column)
4063 .ok_or_else(|| ElusionError::Custom(
4064 format!("Column '{}' not found in schema", column)
4065 ))?;
4066
4067 let normalized_col = if actual_column.contains('.') {
4069 normalize_column_name(&actual_column)
4070 } else {
4071 normalize_column_name(&format!("{}.{}", self.table_alias, actual_column))
4072 };
4073
4074 let sql = format!(
4075 "SELECT
4076 COUNT(*) as total_count,
4077 COUNT({col}) as non_null_count,
4078 AVG({col}::float) as mean,
4079 MIN({col}) as min_value,
4080 MAX({col}) as max_value,
4081 STDDEV({col}::float) as std_dev
4082 FROM {}",
4083 normalize_alias(&self.table_alias),
4084 col = normalized_col
4085 );
4086
4087 let result_df = ctx.sql(&sql).await.map_err(|e| {
4088 ElusionError::Custom(format!(
4089 "Failed to compute statistics for column '{}': {}",
4090 column, e
4091 ))
4092 })?;
4093
4094 let batches = result_df.collect().await.map_err(ElusionError::DataFusion)?;
4095
4096 if let Some(batch) = batches.first() {
4097 let total_count = batch.column(0).as_any().downcast_ref::<Int64Array>()
4099 .ok_or_else(|| ElusionError::Custom("Failed to downcast total_count".to_string()))?
4100 .value(0);
4101
4102 let non_null_count = batch.column(1).as_any().downcast_ref::<Int64Array>()
4103 .ok_or_else(|| ElusionError::Custom("Failed to downcast non_null_count".to_string()))?
4104 .value(0);
4105
4106 let mean = batch.column(2).as_any().downcast_ref::<Float64Array>()
4107 .ok_or_else(|| ElusionError::Custom("Failed to downcast mean".to_string()))?
4108 .value(0);
4109
4110 let min_value = ScalarValue::try_from_array(batch.column(3), 0)?;
4111 let max_value = ScalarValue::try_from_array(batch.column(4), 0)?;
4112
4113 let std_dev = batch.column(5).as_any().downcast_ref::<Float64Array>()
4114 .ok_or_else(|| ElusionError::Custom("Failed to downcast std_dev".to_string()))?
4115 .value(0);
4116
4117 stats.columns.push(ColumnStatistics {
4118 name: column.to_string(),
4119 total_count,
4120 non_null_count,
4121 mean: Some(mean),
4122 min_value,
4123 max_value,
4124 std_dev: Some(std_dev),
4125 });
4126 }
4127 }
4128
4129 Ok(stats)
4130 }
4131
4132 async fn analyze_null_values(&self, columns: Option<&[&str]>) -> ElusionResult<NullAnalysis> {
4134 let ctx = Arc::new(SessionContext::new());
4135 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
4136
4137 let columns = match columns {
4138 Some(cols) => cols.to_vec(),
4139 None => {
4140 self.df
4141 .schema()
4142 .fields()
4143 .iter()
4144 .map(|f| f.name().as_str())
4145 .collect()
4146 }
4147 };
4148
4149 let mut null_counts = Vec::new();
4150 for column in columns {
4151 let actual_column = self.find_actual_column_name(column)
4153 .ok_or_else(|| ElusionError::Custom(
4154 format!("Column '{}' not found in schema", column)
4155 ))?;
4156
4157 let normalized_col = if actual_column.contains('.') {
4159 normalize_column_name(&actual_column)
4160 } else {
4161 normalize_column_name(&format!("{}.{}", self.table_alias, actual_column))
4162 };
4163
4164 let sql = format!(
4165 "SELECT
4166 '{}' as column_name,
4167 COUNT(*) as total_rows,
4168 COUNT(*) - COUNT({}) as null_count,
4169 (COUNT(*) - COUNT({})) * 100.0 / COUNT(*) as null_percentage
4170 FROM {}",
4171 column, normalized_col, normalized_col, normalize_alias(&self.table_alias)
4172 );
4173
4174 let result_df = ctx.sql(&sql).await.map_err(|e| {
4175 ElusionError::Custom(format!(
4176 "Failed to analyze null values for column '{}': {}",
4177 column, e
4178 ))
4179 })?;
4180
4181 let batches = result_df.collect().await.map_err(ElusionError::DataFusion)?;
4182
4183 if let Some(batch) = batches.first() {
4184 let column_name = batch.column(0).as_any().downcast_ref::<StringArray>()
4185 .ok_or_else(|| ElusionError::Custom("Failed to downcast column_name".to_string()))?
4186 .value(0);
4187
4188 let total_rows = batch.column(1).as_any().downcast_ref::<Int64Array>()
4189 .ok_or_else(|| ElusionError::Custom("Failed to downcast total_rows".to_string()))?
4190 .value(0);
4191
4192 let null_count = batch.column(2).as_any().downcast_ref::<Int64Array>()
4193 .ok_or_else(|| ElusionError::Custom("Failed to downcast null_count".to_string()))?
4194 .value(0);
4195
4196 let null_percentage = batch.column(3).as_any().downcast_ref::<Float64Array>()
4197 .ok_or_else(|| ElusionError::Custom("Failed to downcast null_percentage".to_string()))?
4198 .value(0);
4199
4200 null_counts.push(NullCount {
4201 column_name: column_name.to_string(),
4202 total_rows,
4203 null_count,
4204 null_percentage,
4205 });
4206 }
4207 }
4208
4209 Ok(NullAnalysis { counts: null_counts })
4210 }
4211
4212 async fn compute_correlation(&self, col1: &str, col2: &str) -> ElusionResult<f64> {
4214 let ctx = Arc::new(SessionContext::new());
4215 Self::register_df_as_table(&ctx, &self.table_alias, &self.df).await?;
4216
4217 let actual_col1 = self.find_actual_column_name(col1)
4218 .ok_or_else(|| ElusionError::Custom(
4219 format!("Column '{}' not found in schema", col1)
4220 ))?;
4221
4222 let actual_col2 = self.find_actual_column_name(col2)
4223 .ok_or_else(|| ElusionError::Custom(
4224 format!("Column '{}' not found in schema", col2)
4225 ))?;
4226
4227 let normalized_col1 = if actual_col1.contains('.') {
4229 normalize_column_name(&actual_col1)
4230 } else {
4231 normalize_column_name(&format!("{}.{}", self.table_alias, actual_col1))
4232 };
4233
4234 let normalized_col2 = if actual_col2.contains('.') {
4235 normalize_column_name(&actual_col2)
4236 } else {
4237 normalize_column_name(&format!("{}.{}", self.table_alias, actual_col2))
4238 };
4239
4240 let sql = format!(
4241 "SELECT corr({}::float, {}::float) as correlation
4242 FROM {}",
4243 normalized_col1, normalized_col2, normalize_alias(&self.table_alias)
4244 );
4245
4246 let result_df = ctx.sql(&sql).await.map_err(|e| {
4247 ElusionError::Custom(format!(
4248 "Failed to compute correlation between '{}' and '{}': {}",
4249 col1, col2, e
4250 ))
4251 })?;
4252
4253 let batches = result_df.collect().await.map_err(ElusionError::DataFusion)?;
4254
4255 if let Some(batch) = batches.first() {
4256 if let Some(array) = batch.column(0).as_any().downcast_ref::<Float64Array>() {
4257 if !array.is_null(0) {
4258 return Ok(array.value(0));
4259 }
4260 }
4261 }
4262
4263 Ok(0.0) }
4265
4266
4267 pub async fn display_stats(&self, columns: &[&str]) -> ElusionResult<()> {
4269 let stats = self.compute_column_stats(columns).await?;
4270
4271 println!("\n=== Column Statistics ===");
4272 println!("{:-<80}", "");
4273
4274 for col_stat in stats.columns {
4275 println!("Column: {}", col_stat.name);
4276 println!("{:-<80}", "");
4277 println!("| {:<20} | {:>15} | {:>15} | {:>15} |",
4278 "Metric", "Value", "Min", "Max");
4279 println!("{:-<80}", "");
4280
4281 println!("| {:<20} | {:>15} | {:<15} | {:<15} |",
4282 "Records",
4283 col_stat.total_count,
4284 "-",
4285 "-");
4286
4287 println!("| {:<20} | {:>15} | {:<15} | {:<15} |",
4288 "Non-null Records",
4289 col_stat.non_null_count,
4290 "-",
4291 "-");
4292
4293 if let Some(mean) = col_stat.mean {
4294 println!("| {:<20} | {:>15.2} | {:<15} | {:<15} |",
4295 "Mean",
4296 mean,
4297 "-",
4298 "-");
4299 }
4300
4301 if let Some(std_dev) = col_stat.std_dev {
4302 println!("| {:<20} | {:>15.2} | {:<15} | {:<15} |",
4303 "Standard Dev",
4304 std_dev,
4305 "-",
4306 "-");
4307 }
4308
4309 println!("| {:<20} | {:>15} | {:<15} | {:<15} |",
4310 "Value Range",
4311 "-",
4312 format!("{}", col_stat.min_value),
4313 format!("{}", col_stat.max_value));
4314
4315 println!("{:-<80}\n", "");
4316 }
4317 Ok(())
4318 }
4319
4320 pub async fn display_null_analysis(&self, columns: Option<&[&str]>) -> ElusionResult<()> {
4322 let analysis = self.analyze_null_values(columns).await?;
4323
4324 println!("\n=== Null Value Analysis ===");
4325 println!("{:-<90}", "");
4326 println!("| {:<30} | {:>15} | {:>15} | {:>15} |",
4327 "Column", "Total Rows", "Null Count", "Null Percentage");
4328 println!("{:-<90}", "");
4329
4330 for count in analysis.counts {
4331 println!("| {:<30} | {:>15} | {:>15} | {:>14.2}% |",
4332 count.column_name,
4333 count.total_rows,
4334 count.null_count,
4335 count.null_percentage);
4336 }
4337 println!("{:-<90}\n", "");
4338 Ok(())
4339 }
4340
4341 pub async fn display_correlation_matrix(&self, columns: &[&str]) -> ElusionResult<()> {
4343 println!("\n=== Correlation Matrix ===");
4344 let col_width = 20;
4345 let total_width = (columns.len() + 1) * (col_width + 3) + 1;
4346 println!("{:-<width$}", "", width = total_width);
4347
4348 print!("| {:<width$} |", "", width = col_width);
4350 for col in columns {
4351 let display_name = if col.len() > col_width {
4352 format!("{}...", &col[..12])
4354 } else {
4355 col.to_string()
4356 };
4357 print!(" {:<width$} |", display_name, width = col_width);
4358 }
4359 println!();
4360 println!("{:-<width$}", "", width = total_width);
4361
4362 for &col1 in columns {
4364 let display_name = if col1.len() > col_width {
4365 format!("{}...", &col1[..12])
4366 } else {
4367 col1.to_string()
4368 };
4369 print!("| {:<width$} |", display_name, width = col_width);
4370
4371 for &col2 in columns {
4372 let correlation = self.compute_correlation(col1, col2).await?;
4373 print!(" {:>width$.4} |", correlation, width = col_width); }
4375 println!();
4376 }
4377 println!("{:-<width$}\n", "", width = total_width);
4378 Ok(())
4379 }
4380
4381pub async fn write_to_parquet(
4385 &self,
4386 mode: &str,
4387 path: &str,
4388 options: Option<DataFrameWriteOptions>,
4389) -> ElusionResult<()> {
4390 let write_options = options.unwrap_or_else(DataFrameWriteOptions::new);
4391
4392 if let Some(parent) = LocalPath::new(path).parent() {
4393 if !parent.exists() {
4394 std::fs::create_dir_all(parent).map_err(|e| ElusionError::WriteError {
4395 path: parent.display().to_string(),
4396 operation: "create_directory".to_string(),
4397 reason: e.to_string(),
4398 suggestion: "💡 Check if you have permissions to create directories".to_string(),
4399 })?;
4400 }
4401 }
4402 match mode {
4403 "overwrite" => {
4404 if fs::metadata(path).is_ok() {
4405 fs::remove_file(path).or_else(|_| fs::remove_dir_all(path)).map_err(|e| {
4406 ElusionError::WriteError {
4407 path: path.to_string(),
4408 operation: "overwrite".to_string(),
4409 reason: format!("❌ Failed to delete existing file/directory: {}", e),
4410 suggestion: "💡 Check file permissions and ensure no other process is using the file".to_string()
4411 }
4412 })?;
4413 }
4414
4415 self.df.clone().write_parquet(path, write_options, None).await
4416 .map_err(|e| ElusionError::WriteError {
4417 path: path.to_string(),
4418 operation: "overwrite".to_string(),
4419 reason: e.to_string(),
4420 suggestion: "💡 Check file permissions and path validity".to_string()
4421 })?;
4422 }
4423 "append" => {
4424 let ctx = SessionContext::new();
4425
4426 if !fs::metadata(path).is_ok() {
4427 self.df.clone().write_parquet(path, write_options, None).await
4428 .map_err(|e| ElusionError::WriteError {
4429 path: path.to_string(),
4430 operation: "append".to_string(),
4431 reason: format!("❌ Failed to create initial file: {}", e),
4432 suggestion: "💡 Check directory permissions and path validity".to_string()
4433 })?;
4434 return Ok(());
4435 }
4436
4437 let existing_df = ctx.read_parquet(path, ParquetReadOptions::default()).await
4439 .map_err(|e| ElusionError::WriteError {
4440 path: path.to_string(),
4441 operation: "read_existing".to_string(),
4442 reason: e.to_string(),
4443 suggestion: "💡 Verify the file is a valid Parquet file".to_string()
4444 })?;
4445
4446 ctx.register_table("existing_data", Arc::new(
4460 MemTable::try_new(
4461 existing_df.schema().clone().into(),
4462 vec![existing_df.clone().collect().await.map_err(|e| ElusionError::WriteError {
4463 path: path.to_string(),
4464 operation: "collect_existing".to_string(),
4465 reason: e.to_string(),
4466 suggestion: "💡 Failed to collect existing data".to_string()
4467 })?]
4468 ).map_err(|e| ElusionError::WriteError {
4469 path: path.to_string(),
4470 operation: "create_mem_table".to_string(),
4471 reason: e.to_string(),
4472 suggestion: "💡 Failed to create memory table".to_string()
4473 })?
4474 )).map_err(|e| ElusionError::WriteError {
4475 path: path.to_string(),
4476 operation: "register_existing".to_string(),
4477 reason: e.to_string(),
4478 suggestion: "💡 Failed to register existing data".to_string()
4479 })?;
4480
4481 ctx.register_table("new_data", Arc::new(
4483 MemTable::try_new(
4484 self.df.schema().clone().into(),
4485 vec![self.df.clone().collect().await.map_err(|e| ElusionError::WriteError {
4486 path: path.to_string(),
4487 operation: "collect_new".to_string(),
4488 reason: e.to_string(),
4489 suggestion: "💡 Failed to collect new data".to_string()
4490 })?]
4491 ).map_err(|e| ElusionError::WriteError {
4492 path: path.to_string(),
4493 operation: "create_mem_table".to_string(),
4494 reason: e.to_string(),
4495 suggestion: "💡 Failed to create memory table".to_string()
4496 })?
4497 )).map_err(|e| ElusionError::WriteError {
4498 path: path.to_string(),
4499 operation: "register_new".to_string(),
4500 reason: e.to_string(),
4501 suggestion: "💡 Failed to register new data".to_string()
4502 })?;
4503
4504 let column_list = existing_df.schema()
4506 .fields()
4507 .iter()
4508 .map(|f| format!("\"{}\"", f.name()))
4509 .collect::<Vec<_>>()
4510 .join(", ");
4511
4512 let sql = format!(
4514 "SELECT {} FROM existing_data UNION ALL SELECT {} FROM new_data",
4515 column_list, column_list
4516 );
4517 let combined_df = ctx.sql(&sql).await
4520 .map_err(|e| ElusionError::WriteError {
4521 path: path.to_string(),
4522 operation: "combine_data".to_string(),
4523 reason: e.to_string(),
4524 suggestion: "💡 Failed to combine existing and new data".to_string()
4525 })?;
4526
4527 let temp_path = format!("{}.temp", path);
4529
4530 combined_df.write_parquet(&temp_path, write_options, None).await
4532 .map_err(|e| ElusionError::WriteError {
4533 path: temp_path.clone(),
4534 operation: "write_combined".to_string(),
4535 reason: e.to_string(),
4536 suggestion: "💡 Failed to write combined data".to_string()
4537 })?;
4538
4539 fs::remove_file(path).map_err(|e| ElusionError::WriteError {
4541 path: path.to_string(),
4542 operation: "remove_original".to_string(),
4543 reason: format!("❌ Failed to remove original file: {}", e),
4544 suggestion: "💡 Check file permissions".to_string()
4545 })?;
4546
4547 fs::rename(&temp_path, path).map_err(|e| ElusionError::WriteError {
4549 path: path.to_string(),
4550 operation: "rename_temp".to_string(),
4551 reason: format!("❌ Failed to rename temporary file: {}", e),
4552 suggestion: "💡 Check file system permissions".to_string()
4553 })?;
4554 }
4555 _ => return Err(ElusionError::InvalidOperation {
4556 operation: mode.to_string(),
4557 reason: "Invalid write mode".to_string(),
4558 suggestion: "💡 Use 'overwrite' or 'append'".to_string()
4559 })
4560 }
4561
4562 match mode {
4563 "overwrite" => println!("✅ Data successfully overwritten to '{}'", path),
4564 "append" => println!("✅ Data successfully appended to '{}'", path),
4565 _ => unreachable!(),
4566 }
4567
4568 Ok(())
4569}
4570
4571pub async fn write_to_csv(
4573 &self,
4574 mode: &str,
4575 path: &str,
4576 csv_options: CsvWriteOptions,
4577) -> ElusionResult<()> {
4578 csv_options.validate()?;
4579
4580 if let Some(parent) = LocalPath::new(path).parent() {
4581 if !parent.exists() {
4582 std::fs::create_dir_all(parent).map_err(|e| ElusionError::WriteError {
4583 path: parent.display().to_string(),
4584 operation: "create_directory".to_string(),
4585 reason: e.to_string(),
4586 suggestion: "💡 Check if you have permissions to create directories".to_string(),
4587 })?;
4588 }
4589 }
4590
4591 match mode {
4592 "overwrite" => {
4593 if fs::metadata(path).is_ok() {
4595 fs::remove_file(path).or_else(|_| fs::remove_dir_all(path)).map_err(|e|
4596 ElusionError::WriteError {
4597 path: path.to_string(),
4598 operation: "overwrite".to_string(),
4599 reason: format!("Failed to delete existing file: {}", e),
4600 suggestion: "💡 Check file permissions and ensure no other process is using the file".to_string(),
4601 }
4602 )?;
4603 }
4604
4605 let batches = self.df.clone().collect().await.map_err(|e|
4606 ElusionError::InvalidOperation {
4607 operation: "Data Collection".to_string(),
4608 reason: format!("Failed to collect DataFrame: {}", e),
4609 suggestion: "💡 Verify DataFrame is not empty and contains valid data".to_string(),
4610 }
4611 )?;
4612
4613 if batches.is_empty() {
4614 return Err(ElusionError::InvalidOperation {
4615 operation: "CSV Writing".to_string(),
4616 reason: "No data to write".to_string(),
4617 suggestion: "💡 Ensure DataFrame contains data before writing".to_string(),
4618 });
4619 }
4620
4621 let file = OpenOptions::new()
4622 .write(true)
4623 .create(true)
4624 .truncate(true)
4625 .open(path)
4626 .map_err(|e| ElusionError::WriteError {
4627 path: path.to_string(),
4628 operation: "file_create".to_string(),
4629 reason: e.to_string(),
4630 suggestion: "💡 Check file permissions and path validity".to_string(),
4631 })?;
4632
4633 let writer = BufWriter::new(file);
4634 let mut csv_writer = WriterBuilder::new()
4635 .with_header(true)
4636 .with_delimiter(csv_options.delimiter)
4637 .with_escape(csv_options.escape)
4638 .with_quote(csv_options.quote)
4639 .with_double_quote(csv_options.double_quote)
4640 .with_null(csv_options.null_value.clone())
4641 .build(writer);
4642
4643 for batch in batches.iter() {
4644 csv_writer.write(batch).map_err(|e| ElusionError::WriteError {
4645 path: path.to_string(),
4646 operation: "write_data".to_string(),
4647 reason: e.to_string(),
4648 suggestion: "💡 Failed to write data batch".to_string(),
4649 })?;
4650 }
4651
4652 csv_writer.into_inner().flush().map_err(|e| ElusionError::WriteError {
4653 path: path.to_string(),
4654 operation: "flush".to_string(),
4655 reason: e.to_string(),
4656 suggestion: "💡 Failed to flush data to file".to_string(),
4657 })?;
4658 },
4659 "append" => {
4660 if !fs::metadata(path).is_ok() {
4661 let batches = self.df.clone().collect().await.map_err(|e|
4663 ElusionError::InvalidOperation {
4664 operation: "Data Collection".to_string(),
4665 reason: format!("Failed to collect DataFrame: {}", e),
4666 suggestion: "💡 Verify DataFrame is not empty and contains valid data".to_string(),
4667 }
4668 )?;
4669
4670 if batches.is_empty() {
4671 return Err(ElusionError::InvalidOperation {
4672 operation: "CSV Writing".to_string(),
4673 reason: "No data to write".to_string(),
4674 suggestion: "💡 Ensure DataFrame contains data before writing".to_string(),
4675 });
4676 }
4677
4678 let file = OpenOptions::new()
4679 .write(true)
4680 .create(true)
4681 .open(path)
4682 .map_err(|e| ElusionError::WriteError {
4683 path: path.to_string(),
4684 operation: "file_create".to_string(),
4685 reason: e.to_string(),
4686 suggestion: "💡 Check file permissions and path validity".to_string(),
4687 })?;
4688
4689 let writer = BufWriter::new(file);
4690 let mut csv_writer = WriterBuilder::new()
4691 .with_header(true)
4692 .with_delimiter(csv_options.delimiter)
4693 .with_escape(csv_options.escape)
4694 .with_quote(csv_options.quote)
4695 .with_double_quote(csv_options.double_quote)
4696 .with_null(csv_options.null_value.clone())
4697 .build(writer);
4698
4699 for batch in batches.iter() {
4700 csv_writer.write(batch).map_err(|e| ElusionError::WriteError {
4701 path: path.to_string(),
4702 operation: "write_data".to_string(),
4703 reason: e.to_string(),
4704 suggestion: "💡 Failed to write data batch".to_string(),
4705 })?;
4706 }
4707 csv_writer.into_inner().flush().map_err(|e| ElusionError::WriteError {
4708 path: path.to_string(),
4709 operation: "flush".to_string(),
4710 reason: e.to_string(),
4711 suggestion: "💡 Failed to flush data to file".to_string(),
4712 })?;
4713 } else {
4714 let ctx = SessionContext::new();
4715 let existing_df = ctx.read_csv(
4716 path,
4717 CsvReadOptions::new()
4718 .has_header(true)
4719 .schema_infer_max_records(1000),
4720 ).await?;
4721
4722 let existing_cols: HashSet<_> = existing_df.schema()
4724 .fields()
4725 .iter()
4726 .map(|f| f.name().to_string())
4727 .collect();
4728
4729 let new_cols: HashSet<_> = self.df.schema()
4730 .fields()
4731 .iter()
4732 .map(|f| f.name().to_string())
4733 .collect();
4734
4735 if existing_cols != new_cols {
4736 return Err(ElusionError::WriteError {
4737 path: path.to_string(),
4738 operation: "column_check".to_string(),
4739 reason: "Column mismatch between existing file and new data".to_string(),
4740 suggestion: "💡 Ensure both datasets have the same columns".to_string()
4741 });
4742 }
4743
4744 ctx.register_table("existing_data", Arc::new(
4745 MemTable::try_new(
4746 existing_df.schema().clone().into(),
4747 vec![existing_df.clone().collect().await.map_err(|e| ElusionError::WriteError {
4748 path: path.to_string(),
4749 operation: "collect_existing".to_string(),
4750 reason: e.to_string(),
4751 suggestion: "💡 Failed to collect existing data".to_string()
4752 })?]
4753 ).map_err(|e| ElusionError::WriteError {
4754 path: path.to_string(),
4755 operation: "create_mem_table".to_string(),
4756 reason: e.to_string(),
4757 suggestion: "💡 Failed to create memory table".to_string()
4758 })?
4759 )).map_err(|e| ElusionError::WriteError {
4760 path: path.to_string(),
4761 operation: "register_existing".to_string(),
4762 reason: e.to_string(),
4763 suggestion: "💡 Failed to register existing data".to_string()
4764 })?;
4765
4766 ctx.register_table("new_data", Arc::new(
4767 MemTable::try_new(
4768 self.df.schema().clone().into(),
4769 vec![self.df.clone().collect().await.map_err(|e| ElusionError::WriteError {
4770 path: path.to_string(),
4771 operation: "collect_new".to_string(),
4772 reason: e.to_string(),
4773 suggestion: "💡 Failed to collect new data".to_string()
4774 })?]
4775 ).map_err(|e| ElusionError::WriteError {
4776 path: path.to_string(),
4777 operation: "create_mem_table".to_string(),
4778 reason: e.to_string(),
4779 suggestion: "💡 Failed to create memory table".to_string()
4780 })?
4781 )).map_err(|e| ElusionError::WriteError {
4782 path: path.to_string(),
4783 operation: "register_new".to_string(),
4784 reason: e.to_string(),
4785 suggestion: "💡 Failed to register new data".to_string()
4786 })?;
4787
4788 let column_list = existing_df.schema()
4789 .fields()
4790 .iter()
4791 .map(|f| format!("\"{}\"", f.name()))
4792 .collect::<Vec<_>>()
4793 .join(", ");
4794
4795 let sql = format!(
4796 "SELECT {} FROM existing_data UNION ALL SELECT {} FROM new_data",
4797 column_list, column_list
4798 );
4799
4800 let combined_df = ctx.sql(&sql).await
4801 .map_err(|e| ElusionError::WriteError {
4802 path: path.to_string(),
4803 operation: "combine_data".to_string(),
4804 reason: e.to_string(),
4805 suggestion: "💡 Failed to combine existing and new data".to_string()
4806 })?;
4807
4808 let temp_path = format!("{}.temp", path);
4809
4810 if fs::metadata(&temp_path).is_ok() {
4812 fs::remove_file(&temp_path).map_err(|e| ElusionError::WriteError {
4813 path: temp_path.clone(),
4814 operation: "cleanup_temp".to_string(),
4815 reason: format!("Failed to delete temporary file: {}", e),
4816 suggestion: "💡 Check file permissions and ensure no other process is using the file".to_string(),
4817 })?;
4818 }
4819
4820 let batches = combined_df.collect().await.map_err(|e|
4821 ElusionError::InvalidOperation {
4822 operation: "Data Collection".to_string(),
4823 reason: format!("Failed to collect DataFrame: {}", e),
4824 suggestion: "💡 Verify DataFrame is not empty and contains valid data".to_string(),
4825 }
4826 )?;
4827
4828 if batches.is_empty() {
4829 return Err(ElusionError::InvalidOperation {
4830 operation: "CSV Writing".to_string(),
4831 reason: "No data to write".to_string(),
4832 suggestion: "💡 Ensure DataFrame contains data before writing".to_string(),
4833 });
4834 }
4835
4836 {
4838 let file = OpenOptions::new()
4839 .write(true)
4840 .create(true)
4841 .truncate(true)
4842 .open(&temp_path)
4843 .map_err(|e| ElusionError::WriteError {
4844 path: temp_path.clone(),
4845 operation: "file_open".to_string(),
4846 reason: e.to_string(),
4847 suggestion: "💡 Check file permissions and path validity".to_string(),
4848 })?;
4849
4850 let writer = BufWriter::new(file);
4851 let mut csv_writer = WriterBuilder::new()
4852 .with_header(true)
4853 .with_delimiter(csv_options.delimiter)
4854 .with_escape(csv_options.escape)
4855 .with_quote(csv_options.quote)
4856 .with_double_quote(csv_options.double_quote)
4857 .with_null(csv_options.null_value.clone())
4858 .build(writer);
4859
4860 for batch in batches.iter() {
4861 csv_writer.write(batch).map_err(|e| ElusionError::WriteError {
4862 path: temp_path.clone(),
4863 operation: "write_data".to_string(),
4864 reason: e.to_string(),
4865 suggestion: "💡 Failed to write data batch".to_string(),
4866 })?;
4867 }
4868
4869 csv_writer.into_inner().flush().map_err(|e| ElusionError::WriteError {
4870 path: temp_path.clone(),
4871 operation: "flush".to_string(),
4872 reason: e.to_string(),
4873 suggestion: "💡 Check disk space and write permissions".to_string(),
4874 })?;
4875 } if fs::metadata(path).is_ok() {
4879 fs::remove_file(path).map_err(|e| ElusionError::WriteError {
4880 path: path.to_string(),
4881 operation: "remove_original".to_string(),
4882 reason: format!("Failed to remove original file: {}", e),
4883 suggestion: "💡 Check file permissions".to_string()
4884 })?;
4885 }
4886
4887 fs::rename(&temp_path, path).map_err(|e| ElusionError::WriteError {
4889 path: path.to_string(),
4890 operation: "rename_temp".to_string(),
4891 reason: format!("Failed to rename temporary file: {}", e),
4892 suggestion: "💡 Check file system permissions".to_string()
4893 })?;
4894 }
4895 },
4896 _ => return Err(ElusionError::InvalidOperation {
4897 operation: mode.to_string(),
4898 reason: "Invalid write mode".to_string(),
4899 suggestion: "💡 Use 'overwrite' or 'append'".to_string()
4900 })
4901 }
4902
4903 match mode {
4904 "overwrite" => println!("✅ Data successfully overwritten to '{}'", path),
4905 "append" => println!("✅ Data successfully appended to '{}'", path),
4906 _ => unreachable!(),
4907 }
4908
4909 Ok(())
4910}
4911
4912pub async fn write_to_delta_table(
4914 &self,
4915 mode: &str,
4916 path: &str,
4917 partition_columns: Option<Vec<String>>,
4918) -> Result<(), DeltaTableError> {
4919 let (overwrite, write_mode) = match mode {
4921 "overwrite" => {
4922 (true, WriteMode::Default)
4923 }
4924 "append" => {
4925 (false, WriteMode::Default)
4926 }
4927 "merge" => {
4928 (false, WriteMode::MergeSchema)
4930 }
4931 "default" => {
4932 (false, WriteMode::Default)
4934 }
4935 other => {
4936 return Err(DeltaTableError::Generic(format!(
4937 "Unsupported write mode: {other}"
4938 )));
4939 }
4940 };
4941
4942 write_to_delta_impl(
4943 &self.df, path,
4945 partition_columns,
4946 overwrite,
4947 write_mode,
4948 )
4949 .await
4950}
4951
4952fn setup_azure_client(&self, url: &str, sas_token: &str) -> ElusionResult<(ContainerClient, String)> {
4954 let url_parts: Vec<&str> = url.split('/').collect();
4956 if url_parts.len() < 5 {
4957 return Err(ElusionError::Custom(
4958 "Invalid URL format. Expected format: https://{account}.{endpoint}.core.windows.net/{container}/{blob}".to_string()
4959 ));
4960 }
4961
4962 let (account, endpoint_type) = url_parts[2]
4963 .split('.')
4964 .next()
4965 .map(|acc| {
4966 if url.contains(".dfs.") {
4967 (acc, "dfs")
4968 } else {
4969 (acc, "blob")
4970 }
4971 })
4972 .ok_or_else(|| ElusionError::Custom("Invalid URL format: cannot extract account name".to_string()))?;
4973
4974 let container = url_parts[3].to_string();
4976 if container.is_empty() {
4977 return Err(ElusionError::Custom("Container name cannot be empty".to_string()));
4978 }
4979
4980 let blob_name = url_parts[4..].join("/");
4981 if blob_name.is_empty() {
4982 return Err(ElusionError::Custom("Blob name cannot be empty".to_string()));
4983 }
4984
4985 if let Some(expiry_param) = sas_token.split('&').find(|¶m| param.starts_with("se=")) {
4987 let expiry = expiry_param.trim_start_matches("se=");
4988 if let Ok(expiry_time) = chrono::DateTime::parse_from_rfc3339(expiry) {
4990 let now = chrono::Utc::now();
4991 if expiry_time < now {
4992 return Err(ElusionError::Custom("SAS token has expired".to_string()));
4993 }
4994 }
4995 }
4996
4997 let credentials = StorageCredentials::sas_token(sas_token.to_string())
4999 .map_err(|e| ElusionError::Custom(format!("Invalid SAS token: {}", e)))?;
5000
5001 let client = if endpoint_type == "dfs" {
5003 let cloud_location = CloudLocation::Public {
5004 account: account.to_string(),
5005 };
5006 ClientBuilder::with_location(cloud_location, credentials)
5007 .blob_service_client()
5008 .container_client(container)
5009 } else {
5010 ClientBuilder::new(account.to_string(), credentials)
5011 .blob_service_client()
5012 .container_client(container)
5013 };
5014
5015 Ok((client, blob_name))
5016}
5017
5018pub async fn write_parquet_to_azure_with_sas(
5020 &self,
5021 mode: &str,
5022 url: &str,
5023 sas_token: &str,
5024) -> ElusionResult<()> {
5025 validate_azure_url(url)?;
5026
5027 let (client, blob_name) = self.setup_azure_client(url, sas_token)?;
5028 let blob_client = client.blob_client(&blob_name);
5029
5030 match mode {
5031 "overwrite" => {
5032 let batches: Vec<RecordBatch> = self.clone().df.collect().await
5034 .map_err(|e| ElusionError::Custom(format!("Failed to collect DataFrame: {}", e)))?;
5035
5036 let props = WriterProperties::builder()
5037 .set_writer_version(WriterVersion::PARQUET_2_0)
5038 .set_compression(Compression::SNAPPY)
5039 .set_created_by("Elusion".to_string())
5040 .build();
5041
5042 let mut buffer = Vec::new();
5043 {
5044 let schema = self.df.schema();
5045 let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone().into(), Some(props))
5046 .map_err(|e| ElusionError::Custom(format!("Failed to create Parquet writer: {}", e)))?;
5047
5048 for batch in batches {
5049 writer.write(&batch)
5050 .map_err(|e| ElusionError::Custom(format!("Failed to write batch to Parquet: {}", e)))?;
5051 }
5052 writer.close()
5053 .map_err(|e| ElusionError::Custom(format!("Failed to close Parquet writer: {}", e)))?;
5054 }
5055
5056 self.upload_to_azure(&blob_client, buffer).await?;
5057 println!("✅ Successfully overwrote parquet data to Azure blob: {}", url);
5058 }
5059 "append" => {
5060 let ctx = SessionContext::new();
5061
5062 match blob_client.get().into_stream().try_collect::<Vec<_>>().await {
5064 Ok(chunks) => {
5065 let mut existing_data = Vec::new();
5067 for chunk in chunks {
5068 let data = chunk.data.collect().await
5069 .map_err(|e| ElusionError::Custom(format!("Failed to collect chunk data: {}", e)))?;
5070 existing_data.extend(data);
5071 }
5072
5073 let temp_file = Builder::new()
5075 .prefix("azure_parquet_")
5076 .suffix(".parquet")
5077 .tempfile()
5078 .map_err(|e| ElusionError::Custom(format!("Failed to create temp file: {}", e)))?;
5079
5080 std::fs::write(&temp_file, existing_data)
5081 .map_err(|e| ElusionError::Custom(format!("Failed to write to temp file: {}", e)))?;
5082
5083 let existing_df = ctx.read_parquet(
5084 temp_file.path().to_str().unwrap(),
5085 ParquetReadOptions::default()
5086 ).await.map_err(|e| ElusionError::Custom(
5087 format!("Failed to read existing parquet: {}", e)
5088 ))?;
5089
5090 ctx.register_table(
5092 "existing_data",
5093 Arc::new(MemTable::try_new(
5094 existing_df.schema().clone().into(),
5095 vec![existing_df.clone().collect().await.map_err(|e|
5096 ElusionError::Custom(format!("Failed to collect existing data: {}", e)))?]
5097 ).map_err(|e| ElusionError::Custom(
5098 format!("Failed to create memory table: {}", e)
5099 ))?)
5100 ).map_err(|e| ElusionError::Custom(
5101 format!("Failed to register existing data: {}", e)
5102 ))?;
5103
5104 ctx.register_table(
5106 "new_data",
5107 Arc::new(MemTable::try_new(
5108 self.df.schema().clone().into(),
5109 vec![self.df.clone().collect().await.map_err(|e|
5110 ElusionError::Custom(format!("Failed to collect new data: {}", e)))?]
5111 ).map_err(|e| ElusionError::Custom(
5112 format!("Failed to create memory table: {}", e)
5113 ))?)
5114 ).map_err(|e| ElusionError::Custom(
5115 format!("Failed to register new data: {}", e)
5116 ))?;
5117
5118 let column_list = existing_df.schema()
5120 .fields()
5121 .iter()
5122 .map(|f| format!("\"{}\"", f.name()))
5123 .collect::<Vec<_>>()
5124 .join(", ");
5125
5126 let sql = format!(
5128 "SELECT {} FROM existing_data UNION ALL SELECT {} FROM new_data",
5129 column_list, column_list
5130 );
5131
5132 let combined_df = ctx.sql(&sql).await
5133 .map_err(|e| ElusionError::Custom(
5134 format!("Failed to combine data: {}", e)
5135 ))?;
5136
5137 let batches = combined_df.clone().collect().await
5139 .map_err(|e| ElusionError::Custom(format!("Failed to collect combined data: {}", e)))?;
5140
5141 let props = WriterProperties::builder()
5143 .set_writer_version(WriterVersion::PARQUET_2_0)
5144 .set_compression(Compression::SNAPPY)
5145 .set_created_by("Elusion".to_string())
5146 .build();
5147
5148 let mut buffer = Vec::new();
5149 {
5150 let schema = combined_df.schema();
5151 let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone().into(), Some(props))
5152 .map_err(|e| ElusionError::Custom(format!("Failed to create Parquet writer: {}", e)))?;
5153
5154 for batch in batches {
5155 writer.write(&batch)
5156 .map_err(|e| ElusionError::Custom(format!("Failed to write batch to Parquet: {}", e)))?;
5157 }
5158 writer.close()
5159 .map_err(|e| ElusionError::Custom(format!("Failed to close Parquet writer: {}", e)))?;
5160 }
5161
5162 self.upload_to_azure(&blob_client, buffer).await?;
5164 println!("✅ Successfully appended parquet data to Azure blob: {}", url);
5165 }
5166 Err(_) => {
5167 let batches: Vec<RecordBatch> = self.clone().df.collect().await
5169 .map_err(|e| ElusionError::Custom(format!("Failed to collect DataFrame: {}", e)))?;
5170
5171 let props = WriterProperties::builder()
5172 .set_writer_version(WriterVersion::PARQUET_2_0)
5173 .set_compression(Compression::SNAPPY)
5174 .set_created_by("Elusion".to_string())
5175 .build();
5176
5177 let mut buffer = Vec::new();
5178 {
5179 let schema = self.df.schema();
5180 let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone().into(), Some(props))
5181 .map_err(|e| ElusionError::Custom(format!("Failed to create Parquet writer: {}", e)))?;
5182
5183 for batch in batches {
5184 writer.write(&batch)
5185 .map_err(|e| ElusionError::Custom(format!("Failed to write batch to Parquet: {}", e)))?;
5186 }
5187 writer.close()
5188 .map_err(|e| ElusionError::Custom(format!("Failed to close Parquet writer: {}", e)))?;
5189 }
5190
5191 self.upload_to_azure(&blob_client, buffer).await?;
5192 println!("✅ Successfully created initial parquet data in Azure blob: {}", url);
5193 }
5194 }
5195 }
5196 _ => return Err(ElusionError::InvalidOperation {
5197 operation: mode.to_string(),
5198 reason: "Invalid write mode".to_string(),
5199 suggestion: "💡 Use 'overwrite' or 'append'".to_string()
5200 })
5201 }
5202
5203 Ok(())
5204}
5205
5206async fn upload_to_azure(&self, blob_client: &BlobClient, buffer: Vec<u8>) -> ElusionResult<()> {
5208 let content = Bytes::from(buffer);
5209 let content_length = content.len();
5210
5211 if content_length > 1_073_741_824 { let block_id = STANDARD.encode(format!("{:0>20}", 1));
5213
5214 blob_client
5215 .put_block(block_id.clone(), content)
5216 .await
5217 .map_err(|e| ElusionError::Custom(format!("Failed to upload block to Azure: {}", e)))?;
5218
5219 let block_list = BlockList {
5220 blocks: vec![BlobBlockType::Uncommitted(block_id.into_bytes().into())],
5221 };
5222
5223 blob_client
5224 .put_block_list(block_list)
5225 .content_type("application/parquet")
5226 .await
5227 .map_err(|e| ElusionError::Custom(format!("Failed to commit block list: {}", e)))?;
5228 } else {
5229 blob_client
5230 .put_block_blob(content)
5231 .content_type("application/parquet")
5232 .await
5233 .map_err(|e| ElusionError::Custom(format!("Failed to upload blob to Azure: {}", e)))?;
5234 }
5235
5236 Ok(())
5237}
5238
5239pub async fn load_csv(file_path: &str, alias: &str) -> ElusionResult<AliasedDataFrame> {
5242 let ctx = SessionContext::new();
5243
5244 if !LocalPath::new(file_path).exists() {
5245 return Err(ElusionError::WriteError {
5246 path: file_path.to_string(),
5247 operation: "read".to_string(),
5248 reason: "File not found".to_string(),
5249 suggestion: "💡 Check if the file path is correct".to_string()
5250 });
5251 }
5252
5253 let df = match ctx
5254 .read_csv(
5255 file_path,
5256 CsvReadOptions::new()
5257 .has_header(true)
5258 .schema_infer_max_records(1000),
5259 )
5260 .await
5261 {
5262 Ok(df) => df,
5263 Err(err) => {
5264 eprintln!(
5265 "Error reading CSV file '{}': {}. Ensure the file is UTF-8 encoded and free of corrupt data.",
5266 file_path, err
5267 );
5268 return Err(ElusionError::DataFusion(err));
5269 }
5270 };
5271
5272 Ok(AliasedDataFrame {
5273 dataframe: df,
5274 alias: alias.to_string(),
5275 })
5276}
5277
5278pub fn load_parquet<'a>(
5280 file_path: &'a str,
5281 alias: &'a str,
5282) -> BoxFuture<'a, ElusionResult<AliasedDataFrame>> {
5283 Box::pin(async move {
5284 let ctx = SessionContext::new();
5285
5286 if !LocalPath::new(file_path).exists() {
5287 return Err(ElusionError::WriteError {
5288 path: file_path.to_string(),
5289 operation: "read".to_string(),
5290 reason: "File not found".to_string(),
5291 suggestion: "💡 Check if the file path is correct".to_string(),
5292 });
5293 }
5294
5295 let df = match ctx.read_parquet(file_path, ParquetReadOptions::default()).await {
5296 Ok(df) => {
5297 df
5298 }
5299 Err(err) => {
5300 return Err(ElusionError::DataFusion(err));
5301 }
5302 };
5303
5304
5305 let batches = df.clone().collect().await.map_err(ElusionError::DataFusion)?;
5306 let schema = df.schema().clone();
5307 let mem_table = MemTable::try_new(schema.clone().into(), vec![batches])
5308 .map_err(|e| ElusionError::SchemaError {
5309 message: e.to_string(),
5310 schema: Some(schema.to_string()),
5311 suggestion: "💡 Check if the parquet file schema is valid".to_string(),
5312 })?;
5313
5314 let normalized_alias = normalize_alias_write(alias);
5315 ctx.register_table(&normalized_alias, Arc::new(mem_table))
5316 .map_err(|e| ElusionError::InvalidOperation {
5317 operation: "Table Registration".to_string(),
5318 reason: e.to_string(),
5319 suggestion: "💡 Try using a different alias name".to_string(),
5320 })?;
5321
5322 let aliased_df = ctx.table(alias).await
5323 .map_err(|_| ElusionError::InvalidOperation {
5324 operation: "Table Creation".to_string(),
5325 reason: format!("Failed to create table with alias '{}'", alias),
5326 suggestion: "💡 Check if the alias is valid and unique".to_string(),
5327 })?;
5328
5329 Ok(AliasedDataFrame {
5330 dataframe: aliased_df,
5331 alias: alias.to_string(),
5332 })
5333 })
5334}
5335
5336pub fn load_json<'a>(
5338 file_path: &'a str,
5339 alias: &'a str,
5340) -> BoxFuture<'a, ElusionResult<AliasedDataFrame>> {
5341 Box::pin(async move {
5342 let file = File::open(file_path).map_err(|e| ElusionError::WriteError {
5344 path: file_path.to_string(),
5345 operation: "read".to_string(),
5346 reason: e.to_string(),
5347 suggestion: "💡 heck if the file exists and you have proper permissions".to_string(),
5348 })?;
5349
5350 let file_size = file.metadata().map_err(|e| ElusionError::WriteError {
5351 path: file_path.to_string(),
5352 operation: "metadata reading".to_string(),
5353 reason: e.to_string(),
5354 suggestion: "💡 Check file permissions and disk status".to_string(),
5355 })?.len() as usize;
5356
5357 let reader = BufReader::with_capacity(32 * 1024, file); let stream = serde_json::Deserializer::from_reader(reader).into_iter::<Value>();
5359
5360 let mut all_data = Vec::with_capacity(file_size / 3); let mut stream = stream.peekable();
5364 match stream.peek() {
5365 Some(Ok(Value::Array(_))) => {
5366 for value in stream {
5367 match value {
5368 Ok(Value::Array(array)) => {
5369 for item in array {
5370 if let Value::Object(map) = item {
5371 let mut base_map = map.clone();
5372
5373 if let Some(Value::Array(fields)) = base_map.remove("fields") {
5374 for field in fields {
5375 let mut row = base_map.clone();
5376 if let Value::Object(field_obj) = field {
5377 for (key, val) in field_obj {
5378 row.insert(format!("field_{}", key), val);
5379 }
5380 }
5381 all_data.push(row.into_iter().collect());
5382 }
5383 } else {
5384 all_data.push(base_map.into_iter().collect());
5385 }
5386 }
5387 }
5388 }
5389 Ok(_) => continue,
5390 Err(e) => return Err(ElusionError::InvalidOperation {
5391 operation: "JSON parsing".to_string(),
5392 reason: format!("Failed to parse JSON array: {}", e),
5393 suggestion: "💡 Ensure the JSON file is properly formatted and contains valid data".to_string(),
5394 }),
5395 }
5396 }
5397 }
5398 Some(Ok(Value::Object(_))) => {
5399 for value in stream {
5400 if let Ok(Value::Object(map)) = value {
5401 let mut base_map = map.clone();
5402 if let Some(Value::Array(fields)) = base_map.remove("fields") {
5403 for field in fields {
5404 let mut row = base_map.clone();
5405 if let Value::Object(field_obj) = field {
5406 for (key, val) in field_obj {
5407 row.insert(format!("field_{}", key), val);
5408 }
5409 }
5410 all_data.push(row.into_iter().collect());
5411 }
5412 } else {
5413 all_data.push(base_map.into_iter().collect());
5414 }
5415 }
5416 }
5417 }
5418 Some(Err(e)) => return Err(ElusionError::InvalidOperation {
5419 operation: "JSON parsing".to_string(),
5420 reason: format!("Invalid JSON format: {}", e),
5421 suggestion: "💡 Check if the JSON file is well-formed and valid".to_string(),
5422 }),
5423 _ => return Err(ElusionError::InvalidOperation {
5424 operation: "JSON reading".to_string(),
5425 reason: "Empty or invalid JSON file".to_string(),
5426 suggestion: "💡 Ensure the JSON file contains valid data in either array or object format".to_string(),
5427 }),
5428 }
5429
5430 if all_data.is_empty() {
5431 return Err(ElusionError::InvalidOperation {
5432 operation: "JSON processing".to_string(),
5433 reason: "No valid JSON data found".to_string(),
5434 suggestion: "💡 Check if the JSON file contains the expected data structure".to_string(),
5435 });
5436 }
5437
5438 let schema = infer_schema_from_json(&all_data);
5439 let record_batch = build_record_batch(&all_data, schema.clone())
5440 .map_err(|e| ElusionError::SchemaError {
5441 message: format!("Failed to build RecordBatch: {}", e),
5442 schema: Some(schema.to_string()),
5443 suggestion: "💡 Check if the JSON data structure is consistent".to_string(),
5444 })?;
5445
5446 let ctx = SessionContext::new();
5447 let mem_table = MemTable::try_new(schema.clone(), vec![vec![record_batch]])
5448 .map_err(|e| ElusionError::SchemaError {
5449 message: format!("Failed to create MemTable: {}", e),
5450 schema: Some(schema.to_string()),
5451 suggestion: "💡 Verify data types and schema compatibility".to_string(),
5452 })?;
5453
5454 ctx.register_table(alias, Arc::new(mem_table))
5455 .map_err(|e| ElusionError::InvalidOperation {
5456 operation: "Table registration".to_string(),
5457 reason: format!("Failed to register table: {}", e),
5458 suggestion: "💡 Try using a different alias or check table compatibility".to_string(),
5459 })?;
5460
5461 let df = ctx.table(alias).await.map_err(|e| ElusionError::InvalidOperation {
5462 operation: "Table creation".to_string(),
5463 reason: format!("Failed to create table: {}", e),
5464 suggestion: "💡 Verify table creation parameters and permissions".to_string(),
5465 })?;
5466
5467 Ok(AliasedDataFrame {
5468 dataframe: df,
5469 alias: alias.to_string(),
5470 })
5471 })
5472}
5473
5474pub fn load_delta<'a>(
5476 file_path: &'a str,
5477 alias: &'a str,
5478) -> BoxFuture<'a, ElusionResult<AliasedDataFrame>> {
5479 Box::pin(async move {
5480 let ctx = SessionContext::new();
5481
5482 let path_manager = DeltaPathManager::new(file_path);
5484
5485 let table = open_table(&path_manager.table_path())
5487 .await
5488 .map_err(|e| ElusionError::InvalidOperation {
5489 operation: "Delta Table Opening".to_string(),
5490 reason: e.to_string(),
5491 suggestion: "💡 Ensure the path points to a valid Delta table".to_string(),
5492 })?;
5493
5494
5495 let file_paths: Vec<String> = {
5496 let raw_uris = table.get_file_uris()
5497 .map_err(|e| ElusionError::InvalidOperation {
5498 operation: "Delta File Listing".to_string(),
5499 reason: e.to_string(),
5500 suggestion: "💡 Check Delta table permissions and integrity".to_string(),
5501 })?;
5502
5503 raw_uris.map(|uri| path_manager.normalize_uri(&uri))
5504 .collect()
5505 };
5506
5507 let parquet_options = ParquetReadOptions::new()
5509 .parquet_pruning(false)
5512 .skip_metadata(false);
5513
5514 let df = ctx.read_parquet(file_paths, parquet_options).await?;
5515
5516
5517 let batches = df.clone().collect().await?;
5518 let schema = df.schema().clone().into();
5523 let mem_table = MemTable::try_new(schema, vec![batches])?;
5525 let normalized_alias = normalize_alias_write(alias);
5526 ctx.register_table(&normalized_alias, Arc::new(mem_table))?;
5527
5528 let aliased_df = ctx.table(&normalized_alias).await?;
5530
5531 Ok(AliasedDataFrame {
5536 dataframe: aliased_df,
5537 alias: alias.to_string(),
5538 })
5539 })
5540}
5541
5542#[cfg(not(feature = "odbc"))]
5544pub async fn load_db(
5545 _connection_string: &str,
5546 _query: &str,
5547 _alias: &str,
5548) -> ElusionResult<AliasedDataFrame> {
5549 Err(ElusionError::InvalidOperation {
5550 operation: "Database Connection".to_string(),
5551 reason: "ODBC support is not compiled. Recompile with --features odbc".to_string(),
5552 suggestion: "Compile with ODBC feature enabled to use database loading".to_string(),
5553 })
5554}
5555
5556#[cfg(not(feature = "odbc"))]
5557pub async fn from_db(
5558 _connection_string: &str,
5559 _query: &str
5560) -> ElusionResult<Self> {
5561 Err(ElusionError::InvalidOperation {
5562 operation: "Database Connection".to_string(),
5563 reason: "ODBC support is not compiled. Recompile with --features odbc".to_string(),
5564 suggestion: "Compile with ODBC feature enabled to use database loading".to_string(),
5565 })
5566}
5567
5568#[cfg(feature = "odbc")]
5569pub async fn load_db(
5570 connection_string: &str,
5571 query: &str,
5572 alias: &str,
5573) -> ElusionResult<AliasedDataFrame> {
5574 let connection = DB_ENV
5577 .connect_with_connection_string(connection_string, ConnectionOptions::default())
5578 .map_err(|e| ElusionError::InvalidOperation {
5579 operation: "Database Connection".to_string(),
5580 reason: format!("Failed to connect to database: {}", e),
5581 suggestion: "💡 Check your connection string and ensure database is accessible".to_string()
5582 })?;
5583
5584 let owned_cursor = connection
5586 .into_cursor(query, ())
5587 .map_err(|e| e.error)
5588 .map_err(|e| ElusionError::InvalidOperation {
5589 operation: "Query Execution".to_string(),
5590 reason: format!("Query execution failed: {}", e),
5591 suggestion: "💡 Verify SQL syntax and table permissions".to_string()
5592 })?
5593 .ok_or_else(|| ElusionError::InvalidOperation {
5594 operation: "Query Result".to_string(),
5595 reason: "Query did not produce a result set".to_string(),
5596 suggestion: "💡 Ensure query returns data (e.g., SELECT statement)".to_string()
5597 })?;
5598
5599 let reader = OdbcReaderBuilder::new()
5601 .with_max_num_rows_per_batch(50000)
5602 .with_max_bytes_per_batch(1024 * 1024 * 1024)
5603 .with_fallibale_allocations(true)
5604 .build(owned_cursor)
5605 .map_err(|e| ElusionError::InvalidOperation {
5606 operation: "ODBC Reader Setup".to_string(),
5607 reason: format!("Failed to create ODBC reader: {}", e),
5608 suggestion: "💡 Check ODBC driver configuration and memory settings".to_string()
5609 })?;
5610
5611 let concurrent_reader = reader.into_concurrent()
5613 .map_err(|e| ElusionError::InvalidOperation {
5614 operation: "Concurrent Reader Creation".to_string(),
5615 reason: format!("Failed to create concurrent reader: {}", e),
5616 suggestion: "💡 Try reducing batch size or memory allocation".to_string()
5617 })?;
5618
5619 let mut all_batches = Vec::new();
5621 for batch_result in concurrent_reader {
5622 let batch = batch_result.map_err(|e| ElusionError::InvalidOperation {
5623 operation: "Batch Reading".to_string(),
5624 reason: format!("Failed to read data batch: {}", e),
5625 suggestion: "💡 Check for data type mismatches or invalid values".to_string()
5626 })?;
5627 all_batches.push(batch);
5628 }
5629 let ctx = SessionContext::new();
5631 if let Some(first_batch) = all_batches.first() {
5632 let schema = first_batch.schema();
5633 let mem_table = MemTable::try_new(schema.clone(), vec![all_batches])
5634 .map_err(|e| ElusionError::SchemaError {
5635 message: format!("Failed to create in-memory table: {}", e),
5636 schema: Some(schema.to_string()),
5637 suggestion: "💡 Validate data types and schema compatibility".to_string()
5638 })?;
5639
5640 let normalized_alias = normalize_alias_write(alias);
5641 ctx.register_table(&normalized_alias, Arc::new(mem_table))
5642 .map_err(|e| ElusionError::InvalidOperation {
5643 operation: "Table Registration".to_string(),
5644 reason: format!("Failed to register table: {}", e),
5645 suggestion: "💡 Try using a different alias or check table name validity".to_string()
5646 })?;
5647
5648 let df = ctx.table(&normalized_alias).await
5649 .map_err(|e| ElusionError::InvalidOperation {
5650 operation: "DataFrame Creation".to_string(),
5651 reason: format!("Failed to create DataFrame: {}", e),
5652 suggestion: "💡 Verify table registration and schema".to_string()
5653 })?;
5654
5655 let df = lowercase_column_names(df).await
5656 .map_err(|e| ElusionError::InvalidOperation {
5657 operation: "Column Normalization".to_string(),
5658 reason: format!("Failed to normalize column names: {}", e),
5659 suggestion: "💡 Check column names and schema compatibility".to_string()
5660 })?;
5661
5662 Ok(AliasedDataFrame {
5663 dataframe: df,
5664 alias: alias.to_string(),
5665 })
5666 } else {
5667 Err(ElusionError::InvalidOperation {
5668 operation: "Data Retrieval".to_string(),
5669 reason: "No data returned from query".to_string(),
5670 suggestion: "💡 Check if query returns any rows or modify WHERE conditions".to_string()
5671 })
5672 }
5673}
5674
5675#[cfg(feature = "odbc")]
5676pub async fn from_db(
5678 connection_string: &str,
5679 query: &str
5680) -> ElusionResult<Self> {
5681 let db_type = detect_database(connection_string);
5682 let db_name = connection_string
5683 .split(';')
5684 .find(|s| s.trim().starts_with("Database="))
5685 .and_then(|s| s.split('=').nth(1).map(str::trim))
5686 .unwrap_or("default");
5687
5688 let table_alias = if db_type == DatabaseType::SQLServer {
5690 "SQLServerTable".to_string()
5691 } else {
5692 extract_alias_from_sql(query, db_type.clone())
5693 .unwrap_or_else(|| db_name.to_string())
5694 };
5695
5696 let aliased_df = Self::load_db(connection_string, query, &table_alias).await?;
5697
5698 if aliased_df.dataframe.schema().fields().is_empty() {
5699 return Err(ElusionError::SchemaError {
5700 message: "Query returned empty schema".to_string(),
5701 schema: None,
5702 suggestion: "💡 Verify query returns expected columns".to_string()
5703 });
5704 }
5705
5706 Ok(CustomDataFrame {
5707 df: aliased_df.dataframe,
5708 table_alias: aliased_df.alias.clone(),
5709 from_table: aliased_df.alias,
5710 selected_columns: Vec::new(),
5711 alias_map: Vec::new(),
5712 aggregations: Vec::new(),
5713 group_by_columns: Vec::new(),
5714 where_conditions: Vec::new(),
5715 having_conditions: Vec::new(),
5716 order_by_columns: Vec::new(),
5717 limit_count: None,
5718 joins: Vec::new(),
5719 window_functions: Vec::new(),
5720 ctes: Vec::new(),
5721 subquery_source: None,
5722 set_operations: Vec::new(),
5723 query: String::new(),
5724 aggregated_df: None,
5725 union_tables: None,
5726 original_expressions: Vec::new(),
5727 })
5728}
5729
5730pub async fn from_azure_with_sas_token(
5733 url: &str,
5734 sas_token: &str,
5735 filter_keyword: Option<&str>,
5736 alias: &str,
5737) -> ElusionResult<Self> {
5738
5739 validate_azure_url(url)?;
5742
5743 println!("Starting from_azure_with_sas_token with url={}, alias={}", url, alias);
5744 let url_parts: Vec<&str> = url.split('/').collect();
5746 let (account, endpoint_type) = url_parts[2]
5747 .split('.')
5748 .next()
5749 .map(|acc| {
5750 if url.contains(".dfs.") {
5751 (acc, "dfs")
5752 } else {
5753 (acc, "blob")
5754 }
5755 })
5756 .ok_or_else(|| ElusionError::Custom("Invalid URL format".to_string()))?;
5757
5758
5759 let container = url_parts.last()
5760 .ok_or_else(|| ElusionError::Custom("Invalid URL format".to_string()))?
5761 .to_string();
5762
5763 let credentials = StorageCredentials::sas_token(sas_token.to_string())
5766 .map_err(|e| ElusionError::Custom(format!("Invalid SAS token: {}", e)))?;
5767
5768 let client = if endpoint_type == "dfs" {
5771 let cloud_location = CloudLocation::Public {
5773 account: account.to_string(),
5774 };
5775 ClientBuilder::with_location(cloud_location, credentials)
5776 .blob_service_client()
5777 .container_client(container)
5778 } else {
5779 ClientBuilder::new(account.to_string(), credentials)
5780 .blob_service_client()
5781 .container_client(container)
5782 };
5783
5784 let mut blobs = Vec::new();
5785 let mut total_size = 0;
5786 let mut stream = client.list_blobs().into_stream();
5787 while let Some(response) = stream.next().await {
5790 let response = response.map_err(|e|
5791 ElusionError::Custom(format!("Failed to list blobs: {}", e)))?;
5792
5793 for blob in response.blobs.blobs() {
5794 if (blob.name.ends_with(".json") || blob.name.ends_with(".csv")) &&
5795 filter_keyword.map_or(true, |keyword| blob.name.contains(keyword)) {
5797 println!("Adding blob '{}' to the download list", blob.name);
5798 total_size += blob.properties.content_length as usize;
5799 blobs.push(blob.name.clone());
5800 }
5801 }
5802 }
5803
5804 println!("Total number of blobs to process: {}", blobs.len());
5815 println!("Total size of blobs: {} bytes", total_size);
5816
5817 let mut all_data = Vec::new();
5818
5819 let concurrency_limit = num_cpus::get() * 16;
5820 let client_ref = &client;
5821 let results = stream::iter(blobs.iter())
5822 .map(|blob_name| async move {
5823 let blob_client = client_ref.blob_client(blob_name);
5824 let content = blob_client
5825 .get_content()
5826 .await
5827 .map_err(|e| ElusionError::Custom(format!("Failed to get blob content: {}", e)))?;
5828
5829 println!("Got content for blob: {} ({} bytes)", blob_name, content.len());
5830
5831 if blob_name.ends_with(".json") {
5832 process_json_content(&content)
5833 } else {
5834 process_csv_content(blob_name, content).await
5835 }
5836 })
5837 .buffer_unordered(concurrency_limit);
5838
5839 pin_mut!(results);
5840 while let Some(result) = results.next().await {
5841 let mut blob_data = result?;
5842 all_data.append(&mut blob_data);
5843 }
5844
5845 println!("Total records after reading all blobs: {}", all_data.len());
5846
5847 if all_data.is_empty() {
5848 return Err(ElusionError::Custom(format!(
5849 "No valid JSON files found{} (size > 2KB)",
5850 filter_keyword.map_or("".to_string(), |k| format!(" containing keyword: {}", k))
5851 )));
5852 }
5853
5854 let schema = infer_schema_from_json(&all_data);
5855 let batch = build_record_batch(&all_data, schema.clone())
5856 .map_err(|e| ElusionError::Custom(format!("Failed to build RecordBatch: {}", e)))?;
5857
5858 let ctx = SessionContext::new();
5859 let mem_table = MemTable::try_new(schema, vec![vec![batch]])
5860 .map_err(|e| ElusionError::Custom(format!("Failed to create MemTable: {}", e)))?;
5861
5862 ctx.register_table(alias, Arc::new(mem_table))
5863 .map_err(|e| ElusionError::Custom(format!("Failed to register table: {}", e)))?;
5864
5865 let df = ctx.table(alias)
5866 .await
5867 .map_err(|e| ElusionError::Custom(format!("Failed to create DataFrame: {}", e)))?;
5868
5869 let df = lowercase_column_names(df).await?;
5870
5871 println!("✅ Successfully created and registered in-memory table with alias '{}'", alias);
5872 Ok(CustomDataFrame {
5874 df,
5875 table_alias: alias.to_string(),
5876 from_table: alias.to_string(),
5877 selected_columns: Vec::new(),
5878 alias_map: Vec::new(),
5879 aggregations: Vec::new(),
5880 group_by_columns: Vec::new(),
5881 where_conditions: Vec::new(),
5882 having_conditions: Vec::new(),
5883 order_by_columns: Vec::new(),
5884 limit_count: None,
5885 joins: Vec::new(),
5886 window_functions: Vec::new(),
5887 ctes: Vec::new(),
5888 subquery_source: None,
5889 set_operations: Vec::new(),
5890 query: String::new(),
5891 aggregated_df: None,
5892 union_tables: None,
5893 original_expressions: Vec::new(),
5894 })
5895}
5896
5897pub async fn load(
5899 file_path: &str,
5900 alias: &str,
5901) -> ElusionResult<AliasedDataFrame> {
5902 let path_manager = DeltaPathManager::new(file_path);
5903 if path_manager.is_delta_table() {
5904 let aliased_df = Self::load_delta(file_path, alias).await?;
5905 let df_lower = lowercase_column_names(aliased_df.dataframe).await?;
5907 return Ok(AliasedDataFrame {
5908 dataframe: df_lower,
5909 alias: alias.to_string(),
5910 });
5911 }
5912
5913 let ext = file_path
5914 .split('.')
5915 .last()
5916 .unwrap_or_default()
5917 .to_lowercase();
5918
5919 let aliased_df = match ext.as_str() {
5920 "csv" => Self::load_csv(file_path, alias).await?,
5921 "json" => Self::load_json(file_path, alias).await?,
5922 "parquet" => Self::load_parquet(file_path, alias).await?,
5923 "" => return Err(ElusionError::InvalidOperation {
5924 operation: "File Loading".to_string(),
5925 reason: format!("Directory is not a Delta table and has no recognized extension: {file_path}"),
5926 suggestion: "💡 Provide a file with a supported extension (.csv, .json, .parquet) or a valid Delta table directory".to_string(),
5927 }),
5928 other => return Err(ElusionError::InvalidOperation {
5929 operation: "File Loading".to_string(),
5930 reason: format!("Unsupported file extension: {other}"),
5931 suggestion: "💡 Use one of the supported file types: .csv, .json, .parquet, or Delta table".to_string(),
5932 }),
5933 };
5934
5935 let df_lower = lowercase_column_names(aliased_df.dataframe).await?;
5936 Ok(AliasedDataFrame {
5937 dataframe: df_lower,
5938 alias: alias.to_string(),
5939 })
5940}
5941
5942pub async fn plot_linee(
5945 &self,
5946 x_col: &str,
5947 y_col: &str,
5948 show_markers: bool,
5949 title: Option<&str>
5950 ) -> ElusionResult<Plot> {
5951 let batches = self.df.clone().collect().await.map_err(ElusionError::DataFusion)?;
5952 let batch = &batches[0];
5953
5954 let x_idx = batch.schema().index_of(x_col)
5955 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", x_col, e)))?;
5956 let y_idx = batch.schema().index_of(y_col)
5957 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", y_col, e)))?;
5958
5959 let x_values: Vec<f64> = convert_to_f64_vec(batch.column(x_idx))?;
5961 let y_values: Vec<f64> = convert_to_f64_vec(batch.column(y_idx))?;
5962
5963 let (sorted_x, sorted_y) = sort_by_date(&x_values, &y_values);
5965
5966 let trace = if show_markers {
5968 Scatter::new(sorted_x, sorted_y)
5969 .mode(Mode::LinesMarkers)
5970 .name(&format!("{} vs {}", y_col, x_col))
5971 .line(Line::new()
5972 .color(Rgb::new(55, 128, 191))
5973 .width(2.0))
5974 .marker(Marker::new()
5975 .color(Rgb::new(55, 128, 191))
5976 .size(8))
5977 } else {
5978 Scatter::new(sorted_x, sorted_y)
5979 .mode(Mode::Lines)
5980 .name(&format!("{} vs {}", y_col, x_col))
5981 .line(Line::new()
5982 .color(Rgb::new(55, 128, 191))
5983 .width(2.0))
5984 };
5985
5986 let mut plot = Plot::new();
5987 plot.add_trace(trace);
5988
5989 let x_axis = if matches!(batch.column(x_idx).data_type(), ArrowDataType::Date32) {
5991 Axis::new()
5992 .title(x_col.to_string())
5993 .grid_color(Rgb::new(229, 229, 229))
5994 .show_grid(true)
5995 .type_(plotly::layout::AxisType::Date)
5996 } else {
5997 Axis::new()
5998 .title(x_col.to_string())
5999 .grid_color(Rgb::new(229, 229, 229))
6000 .show_grid(true)
6001 };
6002
6003 let layout = Layout::new()
6004 .title(title.unwrap_or(&format!("{} vs {}", y_col, x_col)))
6005 .x_axis(x_axis)
6006 .y_axis(Axis::new()
6007 .title(y_col.to_string())
6008 .grid_color(Rgb::new(229, 229, 229))
6009 .show_grid(true));
6010
6011 plot.set_layout(layout);
6012 Ok(plot)
6013 }
6014
6015 pub async fn plot_time_seriess(
6017 &self,
6018 date_col: &str,
6019 value_col: &str,
6020 show_markers: bool,
6021 title: Option<&str>,
6022 ) -> ElusionResult<Plot> {
6023 let batches = self.df.clone().collect().await.map_err(ElusionError::DataFusion)?;
6024 let batch = &batches[0];
6025
6026 let x_idx = batch.schema().index_of(date_col)
6027 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", date_col, e)))?;
6028 let y_idx = batch.schema().index_of(value_col)
6029 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", value_col, e)))?;
6030
6031 if !matches!(batch.column(x_idx).data_type(), ArrowDataType::Date32) {
6033 return Err(ElusionError::Custom(
6034 format!("Column {} must be a Date32 type for time series plot", date_col)
6035 ));
6036 }
6037
6038 let x_values = convert_to_f64_vec(batch.column(x_idx))?;
6039 let y_values = convert_to_f64_vec(batch.column(y_idx))?;
6040
6041 let (sorted_x, sorted_y) = sort_by_date(&x_values, &y_values);
6043
6044 let trace = if show_markers {
6045 Scatter::new(sorted_x, sorted_y)
6046 .mode(Mode::LinesMarkers)
6047 .name(value_col)
6048 .line(Line::new()
6049 .color(Rgb::new(55, 128, 191))
6050 .width(2.0))
6051 .marker(Marker::new()
6052 .color(Rgb::new(55, 128, 191))
6053 .size(8))
6054 } else {
6055 Scatter::new(sorted_x, sorted_y)
6056 .mode(Mode::Lines)
6057 .name(value_col)
6058 .line(Line::new()
6059 .color(Rgb::new(55, 128, 191))
6060 .width(2.0))
6061 };
6062
6063 let mut plot = Plot::new();
6064 plot.add_trace(trace);
6065
6066 let layout = Layout::new()
6067 .title(title.unwrap_or(&format!("{} over Time", value_col)))
6068 .x_axis(Axis::new()
6069 .title(date_col.to_string())
6070 .grid_color(Rgb::new(229, 229, 229))
6071 .show_grid(true)
6072 .type_(plotly::layout::AxisType::Date))
6073 .y_axis(Axis::new()
6074 .title(value_col.to_string())
6075 .grid_color(Rgb::new(229, 229, 229))
6076 .show_grid(true));
6077
6078 plot.set_layout(layout);
6079 Ok(plot)
6080 }
6081
6082 pub async fn plot_scatterr(
6084 &self,
6085 x_col: &str,
6086 y_col: &str,
6087 marker_size: Option<usize>,
6088 ) -> ElusionResult<Plot> {
6089 let batches = self.df.clone().collect().await.map_err(ElusionError::DataFusion)?;
6090 let batch = &batches[0];
6091
6092 let x_idx = batch.schema().index_of(x_col)
6093 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", x_col, e)))?;
6094 let y_idx = batch.schema().index_of(y_col)
6095 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", y_col, e)))?;
6096
6097 let x_values: Vec<f64> = convert_to_f64_vec(batch.column(x_idx))?;
6098 let y_values: Vec<f64> = convert_to_f64_vec(batch.column(y_idx))?;
6099
6100 let trace = Scatter::new(x_values, y_values)
6101 .mode(Mode::Markers)
6102 .name(&format!("{} vs {}", y_col, x_col))
6103 .marker(Marker::new()
6104 .color(Rgb::new(55, 128, 191))
6105 .size(marker_size.unwrap_or(8)));
6106
6107 let mut plot = Plot::new();
6108 plot.add_trace(trace);
6109
6110 let layout = Layout::new()
6111 .title(format!("Scatter Plot: {} vs {}", y_col, x_col))
6112 .x_axis(Axis::new().title(x_col.to_string()))
6113 .y_axis(Axis::new().title(y_col.to_string()));
6114
6115 plot.set_layout(layout);
6116 Ok(plot)
6117 }
6118
6119 pub async fn plot_barr(
6121 &self,
6122 x_col: &str,
6123 y_col: &str,
6124 orientation: Option<&str>,
6125 title: Option<&str>,
6126 ) -> ElusionResult<Plot> {
6127 let batches = self.df.clone().collect().await.map_err(ElusionError::DataFusion)?;
6128 let batch = &batches[0];
6129
6130 let x_idx = batch.schema().index_of(x_col)
6131 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", x_col, e)))?;
6132 let y_idx = batch.schema().index_of(y_col)
6133 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", y_col, e)))?;
6134
6135 let (x_values, y_values) = if batch.column(x_idx).data_type() == &ArrowDataType::Utf8 {
6136 (convert_to_string_vec(batch.column(x_idx))?, convert_to_f64_vec(batch.column(y_idx))?)
6137 } else {
6138 (convert_to_string_vec(batch.column(y_idx))?, convert_to_f64_vec(batch.column(x_idx))?)
6139 };
6140
6141 let trace = match orientation.unwrap_or("v") {
6142 "h" => {
6143 Bar::new(x_values.clone(), y_values.clone())
6144 .orientation(Orientation::Horizontal)
6145 .name(&format!("{} by {}", y_col, x_col))
6146 },
6147 _ => {
6148 Bar::new(x_values, y_values)
6149 .orientation(Orientation::Vertical)
6150 .name(&format!("{} by {}", y_col, x_col))
6151 }
6152 };
6153
6154 let mut plot = Plot::new();
6155 plot.add_trace(trace);
6156
6157 let layout = Layout::new()
6158 .title(title.unwrap_or(&format!("Bar Chart: {} by {}", y_col, x_col)))
6159 .x_axis(Axis::new().title(x_col.to_string()))
6160 .y_axis(Axis::new().title(y_col.to_string()));
6161
6162 plot.set_layout(layout);
6163 Ok(plot)
6164 }
6165
6166 pub async fn plot_histogramm(
6168 &self,
6169 col: &str,
6170 bins: Option<usize>,
6171 title: Option<&str>
6172 ) -> ElusionResult<Plot> {
6173 let batches = self.df.clone().collect().await.map_err(ElusionError::DataFusion)?;
6174 let batch = &batches[0];
6175
6176 let idx = batch.schema().index_of(col)
6177 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", col, e)))?;
6178
6179 let values = convert_to_f64_vec(batch.column(idx))?;
6180
6181 let trace = Histogram::new(values)
6182 .name(col)
6183 .n_bins_x(bins.unwrap_or(30));
6184
6185 let mut plot = Plot::new();
6186 plot.add_trace(trace);
6187
6188 let layout = Layout::new()
6189 .title(title.unwrap_or(&format!("Histogram of {}", col)))
6190 .x_axis(Axis::new().title(col.to_string()))
6191 .y_axis(Axis::new().title("Count".to_string()));
6192
6193 plot.set_layout(layout);
6194 Ok(plot)
6195 }
6196
6197 pub async fn plot_boxx(
6199 &self,
6200 value_col: &str,
6201 group_by_col: Option<&str>,
6202 title: Option<&str>,
6203 ) -> ElusionResult<Plot> {
6204 let batches = self.df.clone().collect().await.map_err(ElusionError::DataFusion)?;
6205 let batch = &batches[0];
6206
6207 let value_idx = batch.schema().index_of(value_col)
6209 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", value_col, e)))?;
6210
6211 let values = convert_to_f64_vec(batch.column(value_idx))?;
6213
6214 let trace = if let Some(group_col) = group_by_col {
6215 let group_idx = batch.schema().index_of(group_col)
6217 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", group_col, e)))?;
6218
6219 let groups = convert_to_f64_vec(batch.column(group_idx))?;
6221
6222 BoxPlot::new(values)
6223 .x(groups) .name(value_col)
6225 } else {
6226 BoxPlot::new(values)
6227 .name(value_col)
6228 };
6229
6230 let mut plot = Plot::new();
6231 plot.add_trace(trace);
6232
6233 let layout = Layout::new()
6234 .title(title.unwrap_or(&format!("Distribution of {}", value_col)))
6235 .y_axis(Axis::new()
6236 .title(value_col.to_string())
6237 .grid_color(Rgb::new(229, 229, 229))
6238 .show_grid(true))
6239 .x_axis(Axis::new()
6240 .title(group_by_col.unwrap_or("").to_string())
6241 .grid_color(Rgb::new(229, 229, 229))
6242 .show_grid(true));
6243
6244 plot.set_layout(layout);
6245 Ok(plot)
6246 }
6247
6248 pub async fn plot_piee(
6250 &self,
6251 label_col: &str,
6252 value_col: &str,
6253 title: Option<&str>,
6254 ) -> ElusionResult<Plot> {
6255 let batches = self.df.clone().collect().await.map_err(ElusionError::DataFusion)?;
6256 let batch = &batches[0];
6257
6258 let label_idx = batch.schema().index_of(label_col)
6260 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", label_col, e)))?;
6261 let value_idx = batch.schema().index_of(value_col)
6262 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", value_col, e)))?;
6263
6264 let labels = convert_to_string_vec(batch.column(label_idx))?;
6266 let values = convert_to_f64_vec(batch.column(value_idx))?;
6267
6268 let trace = Pie::new(values)
6270 .labels(labels)
6271 .name(value_col)
6272 .hole(0.0);
6273
6274 let mut plot = Plot::new();
6275 plot.add_trace(trace);
6276
6277 let layout = Layout::new()
6279 .title(title.unwrap_or(&format!("Distribution of {}", value_col)))
6280 .show_legend(true);
6281
6282 plot.set_layout(layout);
6283 Ok(plot)
6284 }
6285
6286 pub async fn plot_donutt(
6288 &self,
6289 label_col: &str,
6290 value_col: &str,
6291 title: Option<&str>,
6292 hole_size: Option<f64>, ) -> ElusionResult<Plot> {
6294 let batches = self.df.clone().collect().await.map_err(ElusionError::DataFusion)?;
6295 let batch = &batches[0];
6296
6297 let label_idx = batch.schema().index_of(label_col)
6298 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", label_col, e)))?;
6299 let value_idx = batch.schema().index_of(value_col)
6300 .map_err(|e| ElusionError::Custom(format!("Column {} not found: {}", value_col, e)))?;
6301
6302 let labels = convert_to_string_vec(batch.column(label_idx))?;
6303 let values = convert_to_f64_vec(batch.column(value_idx))?;
6304
6305 let hole_size = hole_size.unwrap_or(0.5).max(0.0).min(1.0);
6307
6308 let trace = Pie::new(values)
6309 .labels(labels)
6310 .name(value_col)
6311 .hole(hole_size);
6312
6313 let mut plot = Plot::new();
6314 plot.add_trace(trace);
6315
6316 let layout = Layout::new()
6317 .title(title.unwrap_or(&format!("Distribution of {}", value_col)))
6318 .show_legend(true);
6319
6320 plot.set_layout(layout);
6321 Ok(plot)
6322 }
6323
6324 pub async fn plot_line(
6326 &self,
6327 date_col: &str,
6328 value_col: &str,
6329 show_markers: bool,
6330 title: Option<&str>,
6331 ) -> ElusionResult<Plot> {
6332 let mut plot = self.plot_linee(date_col, value_col, show_markers, title).await?;
6333
6334 let buttons = vec![
6336 Button::new()
6337 .name("1m")
6338 .args(json!({
6339 "xaxis.range": ["now-1month", "now"]
6340 }))
6341 .label("1m"),
6342 Button::new()
6343 .name("6m")
6344 .args(json!({
6345 "xaxis.range": ["now-6months", "now"]
6346 }))
6347 .label("6m"),
6348 Button::new()
6349 .name("1y")
6350 .args(json!({
6351 "xaxis.range": ["now-1year", "now"]
6352 }))
6353 .label("1y"),
6354 Button::new()
6355 .name("YTD")
6356 .args(json!({
6357 "xaxis.range": ["now-ytd", "now"]
6358 }))
6359 .label("YTD"),
6360 Button::new()
6361 .name("all")
6362 .args(json!({
6363 "xaxis.autorange": true
6364 }))
6365 .label("All")
6366 ];
6367
6368 let layout = plot.layout().clone()
6370 .x_axis(Axis::new()
6371 .title(date_col.to_string())
6372 .grid_color(Rgb::new(229, 229, 229))
6373 .show_grid(true)
6374 .type_(plotly::layout::AxisType::Date)
6375 .range_slider(RangeSlider::new().visible(true)))
6376 .update_menus(vec![
6377 UpdateMenu::new()
6378 .buttons(buttons)
6379 .direction(UpdateMenuDirection::Down)
6380 .show_active(true)
6381 ])
6382 .drag_mode(DragMode::Zoom);
6383
6384 plot.set_layout(layout);
6385 Ok(plot)
6386 }
6387 pub async fn plot_time_series(
6389 &self,
6390 date_col: &str,
6391 value_col: &str,
6392 show_markers: bool,
6393 title: Option<&str>,
6394 ) -> ElusionResult<Plot> {
6395 let mut plot = self.plot_time_seriess(date_col, value_col, show_markers, title).await?;
6396
6397 let buttons = vec![
6399 Button::new()
6400 .name("1m")
6401 .args(json!({
6402 "xaxis.range": ["now-1month", "now"]
6403 }))
6404 .label("1m"),
6405 Button::new()
6406 .name("6m")
6407 .args(json!({
6408 "xaxis.range": ["now-6months", "now"]
6409 }))
6410 .label("6m"),
6411 Button::new()
6412 .name("1y")
6413 .args(json!({
6414 "xaxis.range": ["now-1year", "now"]
6415 }))
6416 .label("1y"),
6417 Button::new()
6418 .name("YTD")
6419 .args(json!({
6420 "xaxis.range": ["now-ytd", "now"]
6421 }))
6422 .label("YTD"),
6423 Button::new()
6424 .name("all")
6425 .args(json!({
6426 "xaxis.autorange": true
6427 }))
6428 .label("All")
6429 ];
6430
6431 let layout = plot.layout().clone()
6433 .x_axis(Axis::new()
6434 .title(date_col.to_string())
6435 .grid_color(Rgb::new(229, 229, 229))
6436 .show_grid(true)
6437 .type_(plotly::layout::AxisType::Date)
6438 .range_slider(RangeSlider::new().visible(true)))
6439 .update_menus(vec![
6440 UpdateMenu::new()
6441 .buttons(buttons)
6442 .direction(UpdateMenuDirection::Down)
6443 .show_active(true)
6444 ])
6445 .drag_mode(DragMode::Zoom);
6446
6447 plot.set_layout(layout);
6448 Ok(plot)
6449 }
6450
6451 pub async fn plot_bar(
6453 &self,
6454 x_col: &str,
6455 y_col: &str,
6456 title: Option<&str>,
6457 ) -> ElusionResult<Plot> {
6458 let mut plot = self.plot_barr(x_col, y_col, None, title).await?;
6459
6460 let update_menu_buttons = vec![
6462 Button::new()
6463 .name("reset")
6464 .args(json!({
6465 "xaxis.type": "category",
6466 "xaxis.categoryorder": "trace"
6467 }))
6468 .label("Reset"),
6469 Button::new()
6470 .name("ascending")
6471 .args(json!({
6472 "xaxis.type": "category",
6473 "xaxis.categoryorder": "total ascending"
6474 }))
6475 .label("Sort Ascending"),
6476 Button::new()
6477 .name("descending")
6478 .args(json!({
6479 "xaxis.type": "category",
6480 "xaxis.categoryorder": "total descending"
6481 }))
6482 .label("Sort Descending")
6483 ];
6484
6485 let layout = plot.layout().clone()
6487 .show_legend(true)
6488 .update_menus(vec![
6489 UpdateMenu::new()
6490 .buttons(update_menu_buttons)
6491 .direction(UpdateMenuDirection::Down)
6492 .show_active(true)
6493 ]);
6494
6495 plot.set_layout(layout);
6496 Ok(plot)
6497 }
6498
6499 pub async fn plot_scatter(
6501 &self,
6502 x_col: &str,
6503 y_col: &str,
6504 marker_size: Option<usize>,
6505 ) -> ElusionResult<Plot> {
6506 let mut plot = self.plot_scatterr(x_col, y_col, marker_size).await?;
6507
6508 let mode_buttons = vec![
6510 Button::new()
6511 .name("zoom")
6512 .args(json!({
6513 "dragmode": "zoom"
6514 }))
6515 .label("Zoom"),
6516 Button::new()
6517 .name("select")
6518 .args(json!({
6519 "dragmode": "select"
6520 }))
6521 .label("Select"),
6522 Button::new()
6523 .name("pan")
6524 .args(json!({
6525 "dragmode": "pan"
6526 }))
6527 .label("Pan")
6528 ];
6529
6530 let layout = plot.layout().clone()
6532 .show_legend(true)
6533 .drag_mode(DragMode::Zoom)
6534 .update_menus(vec![
6535 UpdateMenu::new()
6536 .buttons(mode_buttons)
6537 .direction(UpdateMenuDirection::Down)
6538 .show_active(true)
6539 ]);
6540
6541 plot.set_layout(layout);
6542 Ok(plot)
6543 }
6544 pub async fn plot_histogram(
6546 &self,
6547 col: &str,
6548 title: Option<&str>,
6549 ) -> ElusionResult<Plot> {
6550 let mut plot = self.plot_histogramm(col, None, title).await?;
6551
6552 let bin_buttons = vec![
6554 Button::new()
6555 .name("bins10")
6556 .args(json!({
6557 "xbins.size": 10
6558 }))
6559 .label("10 Bins"),
6560 Button::new()
6561 .name("bins20")
6562 .args(json!({
6563 "xbins.size": 20
6564 }))
6565 .label("20 Bins"),
6566 Button::new()
6567 .name("bins30")
6568 .args(json!({
6569 "xbins.size": 30
6570 }))
6571 .label("30 Bins")
6572 ];
6573
6574 let layout = plot.layout().clone()
6575 .show_legend(true)
6576 .update_menus(vec![
6577 UpdateMenu::new()
6578 .buttons(bin_buttons)
6579 .direction(UpdateMenuDirection::Down)
6580 .show_active(true)
6581 ]);
6582
6583 plot.set_layout(layout);
6584 Ok(plot)
6585 }
6586 pub async fn plot_box(
6588 &self,
6589 value_col: &str,
6590 group_by_col: Option<&str>,
6591 title: Option<&str>,
6592 ) -> ElusionResult<Plot> {
6593 let mut plot = self.plot_boxx(value_col, group_by_col, title).await?;
6594
6595 let outlier_buttons = vec![
6597 Button::new()
6598 .name("show_outliers")
6599 .args(json!({
6600 "boxpoints": "outliers"
6601 }))
6602 .label("Show Outliers"),
6603 Button::new()
6604 .name("hide_outliers")
6605 .args(json!({
6606 "boxpoints": false
6607 }))
6608 .label("Hide Outliers")
6609 ];
6610
6611 let layout = plot.layout().clone()
6612 .show_legend(true)
6613 .update_menus(vec![
6614 UpdateMenu::new()
6615 .buttons(outlier_buttons)
6616 .direction(UpdateMenuDirection::Down)
6617 .show_active(true)
6618 ]);
6619
6620 plot.set_layout(layout);
6621 Ok(plot)
6622 }
6623 pub async fn plot_pie(
6625 &self,
6626 label_col: &str,
6627 value_col: &str,
6628 title: Option<&str>,
6629 ) -> ElusionResult<Plot> {
6630 let mut plot = self.plot_piee(label_col, value_col, title).await?;
6631
6632 let display_buttons = vec![
6634 Button::new()
6635 .name("percentage")
6636 .args(json!({
6637 "textinfo": "percent"
6638 }))
6639 .label("Show Percentages"),
6640 Button::new()
6641 .name("values")
6642 .args(json!({
6643 "textinfo": "value"
6644 }))
6645 .label("Show Values"),
6646 Button::new()
6647 .name("both")
6648 .args(json!({
6649 "textinfo": "value+percent"
6650 }))
6651 .label("Show Both")
6652 ];
6653
6654 let layout = plot.layout().clone()
6655 .show_legend(true)
6656 .update_menus(vec![
6657 UpdateMenu::new()
6658 .buttons(display_buttons)
6659 .direction(UpdateMenuDirection::Down)
6660 .show_active(true)
6661 ]);
6662
6663 plot.set_layout(layout);
6664 Ok(plot)
6665 }
6666 pub async fn plot_donut(
6668 &self,
6669 label_col: &str,
6670 value_col: &str,
6671 title: Option<&str>,
6672 ) -> ElusionResult<Plot> {
6673 let mut plot = self.plot_donutt(label_col, value_col, title, Some(0.5)).await?;
6674
6675 let hole_buttons = vec![
6677 Button::new()
6678 .name("small")
6679 .args(json!({
6680 "hole": 0.3
6681 }))
6682 .label("Small Hole"),
6683 Button::new()
6684 .name("medium")
6685 .args(json!({
6686 "hole": 0.5
6687 }))
6688 .label("Medium Hole"),
6689 Button::new()
6690 .name("large")
6691 .args(json!({
6692 "hole": 0.7
6693 }))
6694 .label("Large Hole")
6695 ];
6696
6697 let layout = plot.layout().clone()
6698 .show_legend(true)
6699 .update_menus(vec![
6700 UpdateMenu::new()
6701 .buttons(hole_buttons)
6702 .direction(UpdateMenuDirection::Down)
6703 .show_active(true)
6704 ]);
6705
6706 plot.set_layout(layout);
6707 Ok(plot)
6708 }
6709 pub async fn create_report(
6711 plots: Option<&[(&Plot, &str)]>,
6712 tables: Option<&[(&CustomDataFrame, &str)]>,
6713 report_title: &str,
6714 filename: &str, layout_config: Option<ReportLayout>,
6716 table_options: Option<TableOptions>,
6717 ) -> ElusionResult<()> {
6718
6719 if let Some(parent) = LocalPath::new(filename).parent() {
6720 if !parent.exists() {
6721 fs::create_dir_all(parent)?;
6722 }
6723 }
6724
6725 let file_path_str = LocalPath::new(filename).to_str()
6726 .ok_or_else(|| ElusionError::Custom("Invalid path".to_string()))?;
6727
6728 let layout = layout_config.unwrap_or_default();
6730
6731 let plot_containers = plots.map(|plots| {
6733 plots.iter().enumerate()
6734 .map(|(i, (plot, title))| format!(
6735 r#"<div class="plot-container"
6736 data-plot-data='{}'
6737 data-plot-layout='{}'>
6738 <div class="plot-title">{}</div>
6739 <div id="plot_{}" style="width:100%;height:{}px;"></div>
6740 </div>"#,
6741 serde_json::to_string(plot.data()).unwrap(),
6742 serde_json::to_string(plot.layout()).unwrap(),
6743 title,
6744 i,
6745 layout.plot_height
6746 ))
6747 .collect::<Vec<_>>()
6748 .join("\n")
6749 }).unwrap_or_default();
6750
6751 let table_containers = if let Some(tables) = tables {
6753
6754 let table_op = TableOptions::default();
6755 let table_opts = table_options.as_ref().unwrap_or(&table_op);
6756
6757 let mut containers = Vec::new();
6758 for (i, (df, title)) in tables.iter().enumerate() {
6759 let batches = df.df.clone().collect().await?;
6761 let schema = df.df.schema();
6762 let columns = schema.fields().iter()
6763 .map(|f| {
6764 let base_def = format!(
6765 r#"{{
6766 field: "{}",
6767 headerName: "{}",
6768 sortable: true,
6769 filter: true,
6770 resizable: true"#,
6771 f.name(),
6772 f.name()
6773 );
6774
6775 let column_def = match f.data_type() {
6777 ArrowDataType::Date32 | ArrowDataType::Date64 | ArrowDataType::Timestamp(_, _) => {
6778 format!(
6779 r#"{},
6780 filter: 'agDateColumnFilter',
6781 filterParams: {{
6782 browserDatePicker: true,
6783 minValidYear: 1000,
6784 maxValidYear: 9999
6785 }}"#,
6786 base_def
6787 )
6788 },
6789 ArrowDataType::Utf8 if f.name().to_lowercase().contains("date") ||
6790 f.name().to_lowercase().contains("time") => {
6791 format!(
6792 r#"{},
6793 filter: 'agDateColumnFilter',
6794 filterParams: {{
6795 browserDatePicker: true,
6796 minValidYear: 1000,
6797 maxValidYear: 9999,
6798 comparator: (filterValue, cellValue) => {{
6799 try {{
6800 const filterDate = new Date(filterValue);
6801 const cellDate = new Date(cellValue);
6802 if (!isNaN(filterDate) && !isNaN(cellDate)) {{
6803 return cellDate - filterDate;
6804 }}
6805 }} catch (e) {{}}
6806 return 0;
6807 }}
6808 }}"#,
6809 base_def
6810 )
6811 },
6812 _ => base_def,
6813 };
6814
6815 format!("{}}}", column_def)
6817 })
6818 .collect::<Vec<_>>()
6819 .join(",");
6820
6821 let mut rows = Vec::new();
6823 for batch in &batches {
6824 for row_idx in 0..batch.num_rows() {
6825 let mut row = serde_json::Map::new();
6826 for (col_idx, field) in batch.schema().fields().iter().enumerate() {
6827 let col = batch.column(col_idx);
6828 let value = match col.data_type() {
6829 ArrowDataType::Int32 => {
6830 let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
6831 if array.is_null(row_idx) {
6832 serde_json::Value::Null
6833 } else {
6834 serde_json::Value::Number(array.value(row_idx).into())
6835 }
6836 },
6837 ArrowDataType::Int64 => {
6838 let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
6839 if array.is_null(row_idx) {
6840 serde_json::Value::Null
6841 } else {
6842 serde_json::Value::Number(array.value(row_idx).into())
6843 }
6844 },
6845 ArrowDataType::Float64 => {
6846 let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
6847 if array.is_null(row_idx) {
6848 serde_json::Value::Null
6849 } else {
6850 let num = array.value(row_idx);
6851 if num.is_finite() {
6852 serde_json::Number::from_f64(num)
6853 .map(serde_json::Value::Number)
6854 .unwrap_or(serde_json::Value::Null)
6855 } else {
6856 serde_json::Value::Null
6857 }
6858 }
6859 },
6860 ArrowDataType::Date32 => {
6861 let array = col.as_any().downcast_ref::<Date32Array>().unwrap();
6862 if array.is_null(row_idx) {
6863 serde_json::Value::Null
6864 } else {
6865 let days = array.value(row_idx);
6866 let date = chrono::NaiveDate::from_num_days_from_ce_opt(days + 719163)
6868 .unwrap_or(chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap());
6869 let datetime = date.and_hms_opt(0, 0, 0).unwrap();
6870 serde_json::Value::String(datetime.format("%Y-%m-%d").to_string())
6871 }
6872 },
6873 ArrowDataType::Date64 => {
6874 let array = col.as_any().downcast_ref::<Date64Array>().unwrap();
6875 if array.is_null(row_idx) {
6876 serde_json::Value::Null
6877 } else {
6878 let ms = array.value(row_idx);
6879 let datetime = chrono::DateTime::from_timestamp_millis(ms)
6880 .unwrap_or_default()
6881 .naive_utc();
6882 serde_json::Value::String(datetime.format("%Y-%m-%d %H:%M:%S").to_string())
6883 }
6884 },
6885 ArrowDataType::Timestamp(time_unit, None) => {
6886 let array = col.as_any().downcast_ref::<TimestampNanosecondArray>().unwrap();
6887 if array.is_null(row_idx) {
6888 serde_json::Value::Null
6889 } else {
6890 let ts = array.value(row_idx);
6891 let datetime = match time_unit {
6892 TimeUnit::Second => chrono::DateTime::from_timestamp(ts, 0),
6893 TimeUnit::Millisecond => chrono::DateTime::from_timestamp_millis(ts),
6894 TimeUnit::Microsecond => chrono::DateTime::from_timestamp_micros(ts),
6895 TimeUnit::Nanosecond => chrono::DateTime::from_timestamp(
6896 ts / 1_000_000_000,
6897 (ts % 1_000_000_000) as u32
6898 ),
6899 }.unwrap_or_default().naive_utc();
6900 serde_json::Value::String(datetime.format("%Y-%m-%d %H:%M:%S").to_string())
6901 }
6902 },
6903 ArrowDataType::Utf8 => {
6904 let array = col.as_any().downcast_ref::<StringArray>().unwrap();
6905 if array.is_null(row_idx) {
6906 serde_json::Value::Null
6907 } else {
6908 let value = array.value(row_idx);
6909 if field.name().to_lowercase().contains("date") ||
6911 field.name().to_lowercase().contains("time") {
6912 if let Some(datetime) = parse_date_string(value) {
6913 serde_json::Value::String(
6914 datetime.format("%Y-%m-%d %H:%M:%S").to_string()
6915 )
6916 } else {
6917 serde_json::Value::String(value.to_string())
6918 }
6919 } else {
6920 serde_json::Value::String(value.to_string())
6921 }
6922 }
6923 },
6924 _ => serde_json::Value::Null,
6925 };
6926 row.insert(field.name().clone(), value);
6927 }
6928 rows.push(serde_json::Value::Object(row));
6929 }
6930 }
6931
6932 let container = format!(
6933 r#"<div class="table-container">
6934 <div class="table-title">{0}</div>
6935 <div id="grid_{1}" class="{2}" style="width:100%;height:{3}px;">
6936 <!-- AG Grid will be rendered here -->
6937 </div>
6938 <script>
6939 (function() {{
6940 console.log('Initializing grid_{1}');
6941
6942 // Column definitions with more detailed configuration
6943 const columnDefs = [{4}];
6944 console.log('Column definitions:', columnDefs);
6945
6946 const rowData = {5};
6947 console.log('Row data:', rowData);
6948
6949 // Grid options with more features
6950 const gridOptions = {{
6951 columnDefs: columnDefs,
6952 rowData: rowData,
6953 pagination: {6},
6954 paginationPageSize: {7},
6955 defaultColDef: {{
6956 flex: 1,
6957 minWidth: 100,
6958 sortable: {8},
6959 filter: {9},
6960 floatingFilter: true,
6961 resizable: true,
6962 cellClass: 'ag-cell-font-size'
6963 }},
6964 onGridReady: function(params) {{
6965 console.log('Grid Ready event fired for grid_{1}');
6966 params.api.sizeColumnsToFit();
6967 const event = new CustomEvent('gridReady');
6968 gridDiv.dispatchEvent(event);
6969 }},
6970 enableRangeSelection: true,
6971 enableCharts: true,
6972 popupParent: document.body,
6973 // Add styling options
6974 headerClass: "ag-header-cell",
6975 rowClass: "ag-row-font-size",
6976 sideBar: {{
6977 toolPanels: ['columns', 'filters'],
6978 defaultToolPanel: '',
6979 hiddenByDefault: {10}
6980 }}
6981 }};
6982
6983 // Initialize AG Grid
6984 const gridDiv = document.querySelector('#grid_{1}');
6985 console.log('Grid container:', gridDiv);
6986 console.log('AG Grid loaded:', typeof agGrid !== 'undefined');
6987
6988 if (!gridDiv) {{
6989 console.error('Grid container not found for grid_{1}');
6990 return;
6991 }}
6992
6993 try {{
6994 new agGrid.Grid(gridDiv, gridOptions);
6995 gridDiv.gridOptions = gridOptions;
6996 }} catch (error) {{
6997 console.error('Error initializing AG Grid:', error);
6998 }}
6999 }})();
7000 </script>
7001 </div>"#,
7002 title, i, table_opts.theme, layout.table_height, columns, serde_json::to_string(&rows).unwrap_or_default(), table_opts.pagination, table_opts.page_size, table_opts.enable_sorting, table_opts.enable_filtering, !table_opts.enable_column_menu );
7014 containers.push(container);
7015 }
7016 containers.join("\n")
7017 } else {
7018 String::new()
7019 };
7020
7021 let html_content = format!(
7022 r#"<!DOCTYPE html>
7023 <html>
7024 <head>
7025 <title>{0}</title>
7026 {1}
7027 {2}
7028 <style>
7029 body {{
7030 font-family: Arial, sans-serif;
7031 margin: 0;
7032 padding: 20px;
7033 background-color: #f5f5f5;
7034 }}
7035 .container {{
7036 max-width: {3}px;
7037 margin: 0 auto;
7038 background-color: white;
7039 padding: 20px;
7040 border-radius: 8px;
7041 box-shadow: 0 2px 4px rgba(0,0,0,0.1);
7042 }}
7043 h1 {{
7044 color: #333;
7045 text-align: center;
7046 margin-bottom: 30px;
7047 }}
7048 .controls {{
7049 margin-bottom: 20px;
7050 padding: 15px;
7051 background: #f8f9fa;
7052 border-radius: 8px;
7053 display: flex;
7054 gap: 10px;
7055 justify-content: center;
7056 }}
7057 .controls button {{
7058 padding: 8px 16px;
7059 border: none;
7060 border-radius: 4px;
7061 background: #007bff;
7062 color: white;
7063 cursor: pointer;
7064 transition: background 0.2s;
7065 }}
7066 .controls button:hover {{
7067 background: #0056b3;
7068 }}
7069 .controls button {{
7070 padding: 8px 16px;
7071 border: none;
7072 border-radius: 4px;
7073 background: #007bff;
7074 color: white;
7075 cursor: pointer;
7076 transition: background 0.2s;
7077 }}
7078 .controls button:hover {{
7079 background: #0056b3;
7080 }}
7081 .controls button.export-button {{
7082 background: #28a745;
7083 }}
7084 .controls button.export-button:hover {{
7085 background: #218838;
7086 }}
7087 .grid {{
7088 display: grid;
7089 grid-template-columns: repeat({4}, 1fr);
7090 gap: {5}px;
7091 }}
7092 .plot-container, .table-container {{
7093 background: white;
7094 padding: 15px;
7095 border-radius: 8px;
7096 box-shadow: 0 1px 3px rgba(0,0,0,0.1);
7097 }}
7098 .plot-title, .table-title {{
7099 font-size: 18px;
7100 font-weight: bold;
7101 margin-bottom: 10px;
7102 color: #444;
7103 }}
7104 @media (max-width: 768px) {{
7105 .grid {{
7106 grid-template-columns: 1fr;
7107 }}
7108 }}
7109 .loading {{
7110 display: none;
7111 position: fixed;
7112 top: 50%;
7113 left: 50%;
7114 transform: translate(-50%, -50%);
7115 background: rgba(255,255,255,0.9);
7116 padding: 20px;
7117 border-radius: 8px;
7118 box-shadow: 0 2px 4px rgba(0,0,0,0.1);
7119 }}
7120 .ag-cell-font-size {{
7121 font-size: 17px;
7122 }}
7123
7124 .ag-cell-bold {{
7125 font-weight: bold;
7126 }}
7127
7128 .ag-header-cell {{
7129 font-weight: bold;
7130 border-bottom: 1px solid #3fdb59;
7131 }}
7132
7133 .align-right {{
7134 text-align: left;
7135 }}
7136
7137 .ag-theme-alpine {{
7138 --ag-font-size: 17px;
7139 --ag-header-height: 40px;
7140 }}
7141 </style>
7142 </head>
7143 <body>
7144 <div class="container">
7145 <h1>{0}</h1>
7146 <div class="controls">
7147 {6}
7148 </div>
7149 <div id="loading" class="loading">Processing...</div>
7150 {7}
7151 {8}
7152 </div>
7153 <script>
7154 {9}
7155 </script>
7156 </body>
7157 </html>"#,
7158 report_title, if plots.is_some() { r#"<script src="https://cdn.plot.ly/plotly-latest.min.js"></script>"#
7161 } else { "" },
7162 if tables.is_some() { r#"
7164 <script>
7165 // Check if AG Grid is already loaded
7166 console.log('AG Grid script loading status:', typeof agGrid !== 'undefined');
7167 </script>
7168 <script src="https://cdn.jsdelivr.net/npm/ag-grid-community@31.0.1/dist/ag-grid-community.min.js"></script>
7169 <script src="https://cdnjs.cloudflare.com/ajax/libs/xlsx/0.18.5/xlsx.full.min.js"></script>
7170 <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/ag-grid-community@31.0.1/styles/ag-grid.css">
7171 <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/ag-grid-community@31.0.1/styles/ag-theme-alpine.css">
7172 <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/ag-grid-community@31.0.1/styles/ag-theme-quartz.css">
7173 <script>
7174 // Verify AG Grid loaded correctly
7175 document.addEventListener('DOMContentLoaded', function() {
7176 console.log('AG Grid loaded check:', typeof agGrid !== 'undefined');
7177 console.log('XLSX loaded check:', typeof XLSX !== 'undefined');
7178 });
7179 </script>
7180 "#
7181 } else { "" },
7182 layout.max_width, layout.grid_columns, layout.grid_gap, generate_controls(plots.is_some(), tables.is_some()), if !plot_containers.is_empty() { format!(r#"<div class="grid">{}</div>"#, plot_containers)
7188 } else { String::new() },
7189 if !table_containers.is_empty() { format!(r#"<div class="tables">{}</div>"#, table_containers)
7191 } else { String::new() },
7192 generate_javascript(plots.is_some(), tables.is_some(), layout.grid_columns) );
7194
7195 std::fs::write(file_path_str, html_content)?;
7196 println!("✅ Interactive Dashboard created at {}", file_path_str);
7197 Ok(())
7198 }
7199
7200}
7201#[derive(Clone)]
7203pub struct PipelineScheduler {
7204 scheduler: JobScheduler,
7205}
7206
7207#[derive(Debug)]
7208pub enum SchedulerError {
7209 InvalidTime(String),
7210 InvalidFrequency(String),
7211 JobFailed(String),
7212}
7213
7214impl PipelineScheduler {
7215 pub async fn new<F, Fut>(frequency: &str, job: F) -> ElusionResult<Self>
7217 where
7218 F: Fn() -> Fut + Send + Sync + 'static,
7219 Fut: Future<Output = ElusionResult<()>> + Send + 'static
7220{
7221 println!("Initializing JobScheduler");
7222
7223 let scheduler = JobScheduler::new().await
7224 .map_err(|e| ElusionError::Custom(format!("Scheduler init failed: {}", e)))?;
7225 println!("Jobs are scheduled, and will run with frequency: '{}'", frequency);
7226
7227 let cron = Self::parse_schedule(frequency)?;
7228 let job_fn = Arc::new(job);
7231
7232 let job = Job::new_async(&cron, move |uuid, mut l| {
7233 let job_fn = job_fn.clone();
7234 Box::pin(async move {
7235 let future = job_fn();
7236 future.await.unwrap_or_else(|e| eprintln!("❌ Job execution failed: {}", e));
7237
7238 let next_tick = l.next_tick_for_job(uuid).await;
7239 match next_tick {
7240 Ok(Some(ts)) => println!("Next job execution: {:?} UTC Time", ts),
7241 _ => println!("Could not determine next job execution"),
7242 }
7243 })
7244 }).map_err(|e| ElusionError::Custom(format!("❌ Job creation failed: {}", e)))?;
7245
7246
7247 scheduler.add(job).await
7248 .map_err(|e| ElusionError::Custom(format!("❌ Job scheduling failed: {}", e)))?;
7249
7250 scheduler.start().await
7251 .map_err(|e| ElusionError::Custom(format!("❌ Scheduler start failed: {}", e)))?;
7252
7253 println!("JobScheduler successfully initialized and started.");
7254
7255 Ok(Self { scheduler })
7256 }
7257
7258 fn parse_schedule(frequency: &str) -> ElusionResult<String> {
7259 let cron = match frequency.to_lowercase().as_str() {
7260 "1min" => "0 */1 * * * *".to_string(),
7261 "2min" => "0 */2 * * * *".to_string(),
7262 "5min" => "0 */5 * * * *".to_string(),
7263 "10min" => "0 */10 * * * *".to_string(),
7264 "15min" => "0 */15 * * * *".to_string(),
7265 "30min" => "0 */30 * * * *".to_string(),
7266 "1h" => "0 0 * * * *".to_string(),
7267 "2h" => "0 0 */2 * * *".to_string(),
7268 "3h" => "0 0 */3 * * *".to_string(),
7269 "4h" => "0 0 */4 * * *".to_string(),
7270 "5h" => "0 0 */5 * * *".to_string(),
7271 "6h" => "0 0 */6 * * *".to_string(),
7272 "7h" => "0 0 */7 * * *".to_string(),
7273 "8h" => "0 0 */8 * * *".to_string(),
7274 "9h" => "0 0 */9 * * *".to_string(),
7275 "10h" => "0 0 */10 * * *".to_string(),
7276 "11h" => "0 0 */11 * * *".to_string(),
7277 "12h" => "0 0 */12 * * *".to_string(),
7278 "24h" => "0 0 0 * * *".to_string(),
7279 "2days" => "0 0 0 */2 * *".to_string(),
7280 "3days" => "0 0 0 */3 * *".to_string(),
7281 "4days" => "0 0 0 */4 * *".to_string(),
7282 "5days" => "0 0 0 */5 * *".to_string(),
7283 "6days" => "0 0 0 */6 * *".to_string(),
7284 "7days" => "0 0 0 */7 * *".to_string(),
7285 "14days" => "0 0 0 */14 * *".to_string(),
7286 "30days" => "0 0 1 */1 * *".to_string(),
7287 _ => return Err(ElusionError::Custom(
7288 "Invalid frequency. Use: 1min,2min,5min,10min,15min,30min,
7289 1h,2h,3h,4h,5h,6h,7h,8h,9h,10h,11h,12h,24h,
7290 2days,3days,4days,5days,6days,7days,14days,30days".into()
7291 ))
7292 };
7293
7294 Ok(cron)
7295 }
7296 pub async fn shutdown(mut self) -> ElusionResult<()> {
7298 println!("Shutdown is ready if needed with -> Ctr+C");
7299 tokio::signal::ctrl_c().await
7300 .map_err(|e| ElusionError::Custom(format!("❌ Ctrl+C handler failed: {}", e)))?;
7301 self.scheduler.shutdown().await
7302 .map_err(|e| ElusionError::Custom(format!("❌ Shutdown failed: {}", e)))
7303 }
7304}
7305
7306
7307#[derive(Clone)]
7309pub struct ElusionApi;
7310
7311enum JsonType {
7312 Array,
7313 Object,
7314}
7315
7316fn validate_https_url(url: &str) -> ElusionResult<()> {
7317 if !url.starts_with("https://") {
7318 return Err(ElusionError::Custom("URL must start with 'https://'".to_string()));
7319 }
7320 Ok(())
7321}
7322
7323impl ElusionApi{
7324
7325 pub fn new () -> Self {
7326 Self
7327 }
7328
7329pub async fn from_api(
7331 &self,
7332 url: &str,
7333 file_path: &str
7334) -> ElusionResult<()> {
7335 validate_https_url(url)?;
7336 let client = Client::new();
7337 let response = client.get(url)
7338 .send()
7339 .await
7340 .map_err(|e| ElusionError::Custom(format!("❌ HTTP request failed: {}", e)))?;
7341
7342 let content = response.bytes()
7343 .await
7344 .map_err(|e| ElusionError::Custom(format!("Failed to get response content: {}", e)))?;
7345
7346 println!("Generated URL: {}", url);
7347
7348 Self::save_json_to_file(content, file_path).await
7349}
7350
7351pub async fn from_api_with_headers(
7353 &self,
7354 url: &str,
7355 headers: HashMap<String, String>,
7356 file_path: &str
7357) -> ElusionResult<()> {
7358 validate_https_url(url)?;
7359 let client = Client::new();
7360 let mut request = client.get(url);
7361
7362 for (key, value) in headers {
7363 request = request.header(&key, value);
7364 }
7365
7366 let response = request
7367 .send()
7368 .await
7369 .map_err(|e| ElusionError::Custom(format!("❌ HTTP request failed: {}", e)))?;
7370
7371 let content = response.bytes()
7372 .await
7373 .map_err(|e| ElusionError::Custom(format!("Failed to get response content: {}", e)))?;
7374 println!("Generated URL: {}", url);
7375
7376 Self::save_json_to_file(content, file_path).await
7377}
7378
7379pub async fn from_api_with_params(
7381 &self,
7382 base_url: &str,
7383 params: HashMap<&str, &str>,
7384 file_path: &str
7385) -> ElusionResult<()> {
7386 validate_https_url(base_url)?;
7387
7388 if params.is_empty() {
7389 return Self::from_api( &self, base_url, file_path).await;
7390 }
7391
7392 let query_string: String = params
7393 .iter()
7394 .map(|(k, v)| {
7395 if v.contains(' ') {
7397 format!("{}={}", k, v)
7398 } else {
7399 format!("{}={}", urlencoding::encode(k), urlencoding::encode(v))
7400 }
7401 })
7402 .collect::<Vec<String>>()
7403 .join("&");
7404
7405 let url = format!("{}?{}", base_url, query_string);
7406
7407 Self::from_api( &self, &url, file_path).await
7408}
7409
7410pub async fn from_api_with_params_and_headers(
7412 &self,
7413 base_url: &str,
7414 params: HashMap<&str, &str>,
7415 headers: HashMap<String, String>,
7416 file_path: &str
7417) -> ElusionResult<()> {
7418 if params.is_empty() {
7419 return Self::from_api_with_headers( &self, base_url, headers, file_path).await;
7420 }
7421
7422 let query_string: String = params
7423 .iter()
7424 .map(|(k, v)| {
7425 if !k.contains(' ') && !v.contains(' ') {
7427 format!("{}={}",
7428 urlencoding::encode(k),
7429 urlencoding::encode(v)
7430 )
7431 } else {
7432 format!("{}={}", k, v)
7433 }
7434 })
7435 .collect::<Vec<String>>()
7436 .join("&");
7437
7438 let url = format!("{}?{}", base_url, query_string);
7439
7440 Self::from_api_with_headers( &self, &url, headers, file_path).await
7441}
7442
7443pub async fn from_api_with_dates(
7445 &self,
7446 base_url: &str,
7447 from_date: &str,
7448 to_date: &str,
7449 file_path: &str
7450) -> ElusionResult<()> {
7451 let url = format!("{}?from={}&to={}",
7452 base_url,
7453 if from_date.contains(' ') { from_date.to_string() } else { urlencoding::encode(from_date).to_string() },
7455 if to_date.contains(' ') { to_date.to_string() } else { urlencoding::encode(to_date).to_string() }
7456 );
7457
7458 Self::from_api( &self, &url, file_path).await
7459}
7460
7461pub async fn from_api_with_pagination(
7463 &self,
7464 base_url: &str,
7465 page: u32,
7466 per_page: u32,
7467 file_path: &str
7468) -> ElusionResult<()> {
7469 let url = format!("{}?page={}&per_page={}", base_url, page, per_page);
7470
7471 Self::from_api( &self, &url, file_path).await
7472}
7473
7474pub async fn from_api_with_sort(
7476 &self,
7477 base_url: &str,
7478 sort_field: &str,
7479 order: &str,
7480 file_path: &str
7481) -> ElusionResult<()> {
7482 let url = format!("{}?sort={}&order={}",
7483 base_url,
7484 if sort_field.contains(' ') { sort_field.to_string() } else { urlencoding::encode(sort_field).to_string() },
7485 if order.contains(' ') { order.to_string() } else { urlencoding::encode(order).to_string() }
7486 );
7487
7488 Self::from_api( &self, &url, file_path).await
7489}
7490
7491pub async fn from_api_with_headers_and_sort(
7493 &self,
7494 base_url: &str,
7495 headers: HashMap<String, String>,
7496 sort_field: &str,
7497 order: &str,
7498 file_path: &str
7499) -> ElusionResult<()> {
7500 validate_https_url(base_url)?;
7501
7502 let url = format!("{}?sort={}&order={}",
7503 base_url,
7504 if sort_field.contains(' ') { sort_field.to_string() } else { urlencoding::encode(sort_field).to_string() },
7505 if order.contains(' ') { order.to_string() } else { urlencoding::encode(order).to_string() }
7506 );
7507
7508 Self::from_api_with_headers(&self, &url, headers, file_path).await
7509}
7510
7511async fn save_json_to_file(content: Bytes, file_path: &str) -> ElusionResult<()> {
7513
7514 if content.is_empty() {
7515 return Err(ElusionError::InvalidOperation {
7516 operation: "JSON Processing".to_string(),
7517 reason: "Empty content provided".to_string(),
7518 suggestion: "💡 Ensure API response contains data".to_string(),
7519 });
7520 }
7521
7522 let reader = std::io::BufReader::new(content.as_ref());
7523 let stream = serde_json::Deserializer::from_reader(reader).into_iter::<Value>();
7524 let mut stream = stream.peekable();
7525
7526 let json_type = match stream.peek() {
7527 Some(Ok(Value::Array(_))) => JsonType::Array,
7528 Some(Ok(Value::Object(_))) => JsonType::Object,
7529 Some(Ok(_)) => return Err(ElusionError::InvalidOperation {
7530 operation: "JSON Validation".to_string(),
7531 reason: "Invalid JSON structure".to_string(),
7532 suggestion: "💡 JSON must be either an array or object at root level".to_string(),
7533 }),
7534 Some(Err(e)) => return Err(ElusionError::InvalidOperation {
7535 operation: "JSON Parsing".to_string(),
7536 reason: format!("JSON syntax error: {}", e),
7537 suggestion: "💡 Check if the JSON content is well-formed".to_string(),
7538 }),
7539 None => return Err(ElusionError::InvalidOperation {
7540 operation: "JSON Reading".to_string(),
7541 reason: "Empty or invalid JSON content".to_string(),
7542 suggestion: "💡 Verify the API response contains valid JSON data".to_string(),
7543 }),
7544 };
7545
7546 let path = LocalPath::new(file_path);
7548 if let Some(parent) = path.parent() {
7549 fs::create_dir_all(parent).map_err(|e| ElusionError::WriteError {
7550 path: parent.display().to_string(),
7551 operation: "Directory Creation".to_string(),
7552 reason: e.to_string(),
7553 suggestion: "💡 Check directory permissions and path validity".to_string(),
7554 })?;
7555 }
7556
7557 let file = OpenOptions::new()
7559 .create(true)
7560 .write(true)
7561 .truncate(true)
7562 .open(file_path)
7563 .map_err(|e| ElusionError::WriteError {
7564 path: file_path.to_string(),
7565 operation: "File Creation".to_string(),
7566 reason: e.to_string(),
7567 suggestion: "💡 Verify file path and write permissions".to_string(),
7568 })?;
7569
7570 let mut writer = std::io::BufWriter::new(file);
7571 let mut first = true;
7572 let mut items_written = 0;
7573
7574 match json_type {
7575 JsonType::Array => {
7576 writeln!(writer, "[").map_err(|e| ElusionError::WriteError {
7577 path: file_path.to_string(),
7578 operation: "Array Start".to_string(),
7579 reason: e.to_string(),
7580 suggestion: "💡 Check disk space and write permissions".to_string(),
7581 })?;
7582
7583 for value in stream {
7584 match value {
7585 Ok(Value::Array(array)) => {
7586 for item in array {
7587 if !first {
7588 writeln!(writer, ",").map_err(|e| ElusionError::WriteError {
7589 path: file_path.to_string(),
7590 operation: "Array Separator".to_string(),
7591 reason: e.to_string(),
7592 suggestion: "💡 Check disk space and write permissions".to_string(),
7593 })?;
7594 }
7595 first = false;
7596 items_written += 1;
7597
7598 serde_json::to_writer_pretty(&mut writer, &item)
7599 .map_err(|e| ElusionError::WriteError {
7600 path: file_path.to_string(),
7601 operation: format!("Write Array Item {}", items_written),
7602 reason: format!("JSON serialization error: {}", e),
7603 suggestion: "💡 Check if item contains valid JSON data".to_string(),
7604 })?;
7605 }
7606 }
7607 Ok(_) => continue,
7608 Err(e) => return Err(ElusionError::InvalidOperation {
7609 operation: "Array Processing".to_string(),
7610 reason: format!("Failed to parse array item: {}", e),
7611 suggestion: "💡 Verify JSON array structure is valid".to_string(),
7612 }),
7613 }
7614 }
7615 writeln!(writer, "\n]").map_err(|e| ElusionError::WriteError {
7616 path: file_path.to_string(),
7617 operation: "Array End".to_string(),
7618 reason: e.to_string(),
7619 suggestion: "💡 Check disk space and write permissions".to_string(),
7620 })?;
7621 }
7622 JsonType::Object => {
7623 for value in stream {
7624 match value {
7625 Ok(Value::Object(map)) => {
7626 items_written += 1;
7627 serde_json::to_writer_pretty(&mut writer, &Value::Object(map))
7628 .map_err(|e| ElusionError::WriteError {
7629 path: file_path.to_string(),
7630 operation: format!("Write Object {}", items_written),
7631 reason: format!("JSON serialization error: {}", e),
7632 suggestion: "💡 Check if object contains valid JSON data".to_string(),
7633 })?;
7634 }
7635 Ok(_) => return Err(ElusionError::InvalidOperation {
7636 operation: "Object Processing".to_string(),
7637 reason: "Non-object value in object stream".to_string(),
7638 suggestion: "💡 Ensure all items are valid JSON objects".to_string(),
7639 }),
7640 Err(e) => return Err(ElusionError::InvalidOperation {
7641 operation: "Object Processing".to_string(),
7642 reason: format!("Failed to parse object: {}", e),
7643 suggestion: "💡 Verify JSON object structure is valid".to_string(),
7644 }),
7645 }
7646 }
7647 }
7648 }
7649
7650 writer.flush().map_err(|e| ElusionError::WriteError {
7651 path: file_path.to_string(),
7652 operation: "File Finalization".to_string(),
7653 reason: e.to_string(),
7654 suggestion: "💡 Check disk space and write permissions".to_string(),
7655 })?;
7656
7657 println!("✅ Successfully created {} with {} items", file_path, items_written);
7658
7659 if items_written == 0 {
7660 println!("⚠️ Warning: No items were written to the file. Check if this is expected.");
7661 }
7662
7663 Ok(())
7664}
7665
7666}
7667
7668#[derive(Debug, Clone)]
7669pub struct ReportLayout {
7670 pub grid_columns: usize, pub grid_gap: usize, pub max_width: usize, pub plot_height: usize, pub table_height: usize, }
7676
7677impl Default for ReportLayout {
7678 fn default() -> Self {
7679 Self {
7680 grid_columns: 2,
7681 grid_gap: 20,
7682 max_width: 1200,
7683 plot_height: 400,
7684 table_height: 400,
7685 }
7686 }
7687}
7688
7689#[derive(Debug, Clone)]
7690pub struct TableOptions {
7691 pub pagination: bool,
7692 pub page_size: usize,
7693 pub enable_sorting: bool,
7694 pub enable_filtering: bool,
7695 pub enable_column_menu: bool,
7696 pub theme: String, }
7698
7699impl Default for TableOptions {
7700 fn default() -> Self {
7701 Self {
7702 pagination: true,
7703 page_size: 10,
7704 enable_sorting: true,
7705 enable_filtering: true,
7706 enable_column_menu: true,
7707 theme: "ag-theme-alpine".to_string(),
7708 }
7709 }
7710}
7711
7712fn generate_controls(has_plots: bool, has_tables: bool) -> String {
7713 let mut controls = Vec::new();
7714
7715 if has_plots {
7716 controls.extend_from_slice(&[
7717 r#"<button onclick="toggleGrid()">Toggle Layout</button>"#
7718 ]);
7719 }
7720
7721 if has_tables {
7722 controls.extend_from_slice(&[
7723 r#"<button onclick="exportAllTables()" class="export-button">Export to CSV</button>"#,
7724 r#"<button onclick="exportToExcel()" class="export-button">Export to Excel</button>"#
7725 ]);
7726 }
7727
7728 controls.join("\n")
7729}
7730
7731fn generate_javascript(has_plots: bool, has_tables: bool, grid_columns: usize) -> String {
7732 let mut js = String::new();
7733
7734 js.push_str(r#"
7736 document.addEventListener('DOMContentLoaded', function() {
7737 console.log('DOMContentLoaded event fired');
7738 showLoading();
7739
7740 const promises = [];
7741
7742 // Wait for plots if they exist
7743 const plotContainers = document.querySelectorAll('.plot-container');
7744 console.log('Found plot containers:', plotContainers.length);
7745
7746 if (plotContainers.length > 0) {
7747 promises.push(...Array.from(plotContainers).map(container =>
7748 new Promise(resolve => {
7749 const observer = new MutationObserver((mutations, obs) => {
7750 if (container.querySelector('.js-plotly-plot')) {
7751 obs.disconnect();
7752 resolve();
7753 }
7754 });
7755 observer.observe(container, { childList: true, subtree: true });
7756 })
7757 ));
7758 }
7759
7760 // Wait for grids if they exist
7761 const gridContainers = document.querySelectorAll('[id^="grid_"]');
7762 console.log('Found grid containers:', gridContainers.length);
7763
7764 if (gridContainers.length > 0) {
7765 promises.push(...Array.from(gridContainers).map(container =>
7766 new Promise(resolve => {
7767 container.addEventListener('gridReady', () => {
7768 console.log('Grid ready event received for:', container.id);
7769 resolve();
7770 }, { once: true });
7771 // Add a timeout to prevent infinite waiting
7772 setTimeout(() => {
7773 console.log('Grid timeout for:', container.id);
7774 resolve();
7775 }, 5000);
7776 })
7777 ));
7778 }
7779
7780 // If no async content to wait for, hide loading immediately
7781 if (promises.length === 0) {
7782 console.log('No async content to wait for');
7783 hideLoading();
7784 return;
7785 }
7786
7787 // Wait for all content to load or timeout
7788 Promise.all(promises)
7789 .then(() => {
7790 console.log('All content loaded successfully');
7791 hideLoading();
7792 showNotification('Report loaded successfully', 'info');
7793 })
7794 .catch(error => {
7795 console.error('Error loading report:', error);
7796 hideLoading();
7797 showNotification('Error loading some components', 'error');
7798 });
7799 });
7800 "#);
7801
7802 if has_plots {
7804 js.push_str(&format!(r#"
7805 const plots = [];
7806 let currentGridColumns = {};
7807
7808 document.querySelectorAll('.plot-container').forEach((container, index) => {{
7809 const plotDiv = container.querySelector(`#plot_${{index}}`);
7810 const data = JSON.parse(container.dataset.plotData);
7811 const layout = JSON.parse(container.dataset.plotLayout);
7812
7813 layout.autosize = true;
7814
7815 Plotly.newPlot(plotDiv, data, layout, {{
7816 responsive: true,
7817 scrollZoom: true,
7818 modeBarButtonsToAdd: [
7819 'hoverClosestCartesian',
7820 'hoverCompareCartesian'
7821 ],
7822 displaylogo: false
7823 }}).then(gd => {{
7824 plots.push(gd);
7825 gd.on('plotly_click', function(data) {{
7826 highlightPoint(data, index);
7827 }});
7828 }}).catch(error => {{
7829 console.error('Error creating plot:', error);
7830 showNotification('Error creating plot', 'error');
7831 }});
7832 }});
7833
7834
7835 function toggleGrid() {{
7836 const grid = document.querySelector('.grid');
7837 currentGridColumns = currentGridColumns === {0} ? 1 : {0};
7838 grid.style.gridTemplateColumns = `repeat(${{currentGridColumns}}, 1fr)`;
7839 showNotification(`Layout changed to ${{currentGridColumns}} column(s)`);
7840 }}
7841
7842 function highlightPoint(data, plotIndex) {{
7843 if (!data.points || !data.points[0]) return;
7844
7845 const point = data.points[0];
7846 const pointColor = 'red';
7847
7848 plots.forEach((plot, idx) => {{
7849 if (idx !== plotIndex) {{
7850 const trace = plot.data[0];
7851 if (trace.x && trace.y) {{
7852 const matchingPoints = trace.x.map((x, i) => {{
7853 return {{x, y: trace.y[i]}};
7854 }}).filter(p => p.x === point.x && p.y === point.y);
7855
7856 if (matchingPoints.length > 0) {{
7857 Plotly.restyle(plot, {{'marker.color': pointColor}}, [0]);
7858 }}
7859 }}
7860 }}
7861 }});
7862 }}
7863 "#, grid_columns));
7864 }
7865
7866 if has_tables {
7868 js.push_str(r#"
7869 // Table utility functions
7870 function exportAllTables() {
7871 try {
7872 document.querySelectorAll('.ag-theme-alpine').forEach((container, index) => {
7873 const gridApi = container.gridOptions.api;
7874 const csvContent = gridApi.getDataAsCsv({
7875 skipHeader: false,
7876 skipFooters: true,
7877 skipGroups: true,
7878 fileName: `table_${index}.csv`
7879 });
7880
7881 const blob = new Blob([csvContent], { type: 'text/csv;charset=utf-8;' });
7882 const link = document.createElement('a');
7883 link.href = URL.createObjectURL(blob);
7884 link.download = `table_${index}.csv`;
7885 link.click();
7886 });
7887 showNotification('Exported all tables');
7888 } catch (error) {
7889 console.error('Error exporting tables:', error);
7890 showNotification('Error exporting tables', 'error');
7891 }
7892 }
7893
7894 function exportToExcel() {
7895 try {
7896 document.querySelectorAll('.ag-theme-alpine').forEach((container, index) => {
7897 const gridApi = container.gridOptions.api;
7898 const columnApi = container.gridOptions.columnApi;
7899
7900 // Get displayed columns
7901 const columns = columnApi.getAllDisplayedColumns();
7902 const columnDefs = columns.map(col => ({
7903 header: col.colDef.headerName || col.colDef.field,
7904 field: col.colDef.field
7905 }));
7906
7907 // Get all rows data
7908 const rowData = [];
7909 gridApi.forEachNode(node => {
7910 const row = {};
7911 columnDefs.forEach(col => {
7912 row[col.header] = node.data[col.field];
7913 });
7914 rowData.push(row);
7915 });
7916
7917 // Create workbook and worksheet
7918 const worksheet = XLSX.utils.json_to_sheet(rowData);
7919 const workbook = XLSX.utils.book_new();
7920 XLSX.utils.book_append_sheet(workbook, worksheet, `Table_${index}`);
7921
7922 // Generate Excel file
7923 const excelBuffer = XLSX.write(workbook, { bookType: 'xlsx', type: 'array' });
7924 const blob = new Blob([excelBuffer], { type: 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' });
7925 const url = window.URL.createObjectURL(blob);
7926 const link = document.createElement('a');
7927 link.href = url;
7928 link.download = `table_${index}.xlsx`;
7929 document.body.appendChild(link);
7930 link.click();
7931
7932 // Cleanup
7933 setTimeout(() => {
7934 document.body.removeChild(link);
7935 window.URL.revokeObjectURL(url);
7936 }, 0);
7937 });
7938 showNotification('Exported tables to Excel');
7939 } catch (error) {
7940 console.error('Error exporting to Excel:', error);
7941 showNotification('Error exporting to Excel', 'error');
7942 }
7943 }
7944
7945 // Initialize AG Grid Quick Filter after DOM is loaded
7946 document.addEventListener('DOMContentLoaded', function() {
7947 document.querySelectorAll('.ag-theme-alpine').forEach(container => {
7948 const gridOptions = container.gridOptions;
7949 if (gridOptions) {
7950 console.log('Initializing quick filter for container:', container.id);
7951 const quickFilterInput = document.createElement('input');
7952 quickFilterInput.type = 'text';
7953 quickFilterInput.placeholder = 'Quick Filter...';
7954 quickFilterInput.className = 'quick-filter';
7955 quickFilterInput.style.cssText = 'margin: 10px 0; padding: 5px; width: 200px;';
7956
7957 quickFilterInput.addEventListener('input', function(e) {
7958 gridOptions.api.setQuickFilter(e.target.value);
7959 });
7960
7961 container.parentNode.insertBefore(quickFilterInput, container);
7962 }
7963 });
7964 });
7965 "#);
7966 }
7967
7968 js.push_str(r#"
7970 const style = document.createElement('style');
7971 style.textContent = `
7972 .notification {
7973 position: fixed;
7974 bottom: 20px;
7975 right: 20px;
7976 padding: 10px 20px;
7977 border-radius: 4px;
7978 color: white;
7979 font-weight: bold;
7980 z-index: 1000;
7981 animation: slideIn 0.5s ease-out;
7982 }
7983 .notification.info {
7984 background-color: #007bff;
7985 }
7986 .notification.error {
7987 background-color: #dc3545;
7988 }
7989 @keyframes slideIn {
7990 from { transform: translateX(100%); }
7991 to { transform: translateX(0); }
7992 }
7993 `;
7994 document.head.appendChild(style);
7995 "#);
7996
7997 js
7998}