rust_rule_miner/
data_loader.rs

//! Data loading utilities for Excel and CSV files using excelstream
//!
//! Provides high-performance streaming loading of transaction data from:
//! - Excel files (.xlsx) - ultra-low memory streaming
//! - CSV files (.csv) - memory-efficient streaming
//!
//! # Column Mapping (v0.2.0+)
//!
//! All data loading methods require `ColumnMapping` to specify which columns to mine.
//! This provides full flexibility for any data schema.
//!
//! ```no_run
//! use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
//!
//! // CSV: customer_id, product, category, price, location, timestamp
//! //      0            1        2         3      4         5
//!
//! // Mine a single field (product names from column 1)
//! let mapping = ColumnMapping::simple(0, 1, 5);
//! let transactions = DataLoader::from_csv("sales.csv", mapping)?;
//!
//! // Mine multiple fields combined (product + category)
//! let mapping = ColumnMapping::multi_field(0, vec![1, 2], 5, "::".to_string());
//! let transactions = DataLoader::from_csv("sales.csv", mapping)?;
//! // Items: "Laptop::Electronics", "Mouse::Accessories"
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! ```
//!
//! # Example
//!
//! ```no_run
//! use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
//!
//! // Standard 3-column format: transaction_id, items, timestamp
//! let mapping = ColumnMapping::simple(0, 1, 2);
//!
//! // Load from Excel
//! let transactions = DataLoader::from_excel("sales_data.xlsx", 0, mapping.clone())?;
//!
//! // Load from CSV
//! let transactions = DataLoader::from_csv("transactions.csv", mapping)?;
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! ```

use crate::errors::{MiningError, Result};
use crate::Transaction;
use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use excelstream::streaming_reader::StreamingReader;
use excelstream::CsvReader;
use std::path::Path;

/// Column mapping configuration for flexible data loading
///
/// Allows you to specify which columns to mine from your data,
/// supporting multiple fields combined into patterns.
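///
/// # Example
/// ```
/// use rust_rule_miner::data_loader::ColumnMapping;
///
/// // All fields are public, so a mapping can also be built directly;
/// // this is equivalent to ColumnMapping::simple(0, 1, 2).
/// let mapping = ColumnMapping {
///     transaction_id: 0,
///     item_columns: vec![1],
///     timestamp: 2,
///     field_separator: "::".to_string(),
/// };
/// ```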
#[derive(Debug, Clone)]
pub struct ColumnMapping {
    /// Column index for transaction/group ID (0-based)
    pub transaction_id: usize,
    /// Column indices for items to mine (supports multiple columns)
    pub item_columns: Vec<usize>,
    /// Column index for timestamp (0-based)
    pub timestamp: usize,
    /// Separator used to combine multiple item columns (default: "::")
    pub field_separator: String,
}

impl ColumnMapping {
    /// Create a mapping with a transaction ID, a single item column, and a timestamp
    ///
    /// # Example
    /// ```
    /// use rust_rule_miner::data_loader::ColumnMapping;
    ///
    /// // Mine product names from column 1
    /// let mapping = ColumnMapping::simple(0, 1, 5);
    /// // CSV: customer_id, product_name, category, price, location, timestamp
    /// //      0            1             2         3      4         5
    /// ```
    pub fn simple(transaction_id: usize, item_column: usize, timestamp: usize) -> Self {
        Self {
            transaction_id,
            item_columns: vec![item_column],
            timestamp,
            field_separator: "::".to_string(),
        }
    }

    /// Create a mapping to mine multiple fields combined
    ///
    /// # Example
    /// ```
    /// use rust_rule_miner::data_loader::ColumnMapping;
    ///
    /// // Mine product + category + location combined
    /// let mapping = ColumnMapping::multi_field(
    ///     0,                  // customer_id (column 0)
    ///     vec![1, 2, 4],      // product(1), category(2), location(4)
    ///     5,                  // timestamp (column 5)
    ///     "::".to_string()    // separator
    /// );
    /// // CSV: customer_id, product, category, price, location, timestamp
    /// // Results in items like: "Laptop::Electronics::US"
    /// ```
    pub fn multi_field(
        transaction_id: usize,
        item_columns: Vec<usize>,
        timestamp: usize,
        field_separator: String,
    ) -> Self {
        Self {
            transaction_id,
            item_columns,
            timestamp,
            field_separator,
        }
    }
}

/// Data loader for Excel and CSV files using excelstream
pub struct DataLoader;

impl DataLoader {
    /// Load transactions from an Excel file (.xlsx) with a custom column mapping
    ///
    /// Uses excelstream for high-performance streaming with ~3-35 MB memory usage
    /// regardless of file size.
    ///
    /// The first row is treated as a header and skipped.
    ///
    /// # Arguments
    /// * `path` - Path to Excel file
    /// * `sheet_index` - Sheet index (0-based) to read
    /// * `mapping` - Column mapping configuration
    ///
    /// # Returns
    /// Vector of transactions
    ///
    /// # Example
    /// ```no_run
    /// use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
    ///
    /// // Standard format: transaction_id(0), items(1), timestamp(2)
    /// let mapping = ColumnMapping::simple(0, 1, 2);
    /// let transactions = DataLoader::from_excel("sales.xlsx", 0, mapping)?;
    /// println!("Loaded {} transactions", transactions.len());
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn from_excel<P: AsRef<Path>>(
        path: P,
        sheet_index: usize,
        mapping: ColumnMapping,
    ) -> Result<Vec<Transaction>> {
        let mut reader = StreamingReader::open(path.as_ref())
            .map_err(|e| MiningError::DataLoadError(format!("Failed to open Excel file: {}", e)))?;

        let mut transactions = Vec::new();
        let mut row_idx = 0;

        for row_result in reader.rows_by_index(sheet_index).map_err(|e| {
            MiningError::DataLoadError(format!("Failed to read sheet {}: {}", sheet_index, e))
        })? {
            let row = row_result.map_err(|e| {
                MiningError::DataLoadError(format!("Failed to read row {}: {}", row_idx, e))
            })?;

            row_idx += 1;

            // Skip header row
            if row_idx == 1 {
                continue;
            }

            // Convert row to Vec<String>
            let row_values = row.to_strings();

            match Self::parse_transaction_with_mapping(&row_values, row_idx, &mapping) {
                Ok(Some(tx)) => transactions.push(tx),
                Ok(None) => continue, // Skip empty rows
                Err(e) => {
                    log::warn!("Skipping row {}: {}", row_idx, e);
                    continue;
                }
            }
        }

        if transactions.is_empty() {
            return Err(MiningError::InsufficientData(
                "No valid transactions found in Excel file".to_string(),
            ));
        }

        Ok(transactions)
    }

    /// Load transactions from a CSV file with a custom column mapping
    ///
    /// Uses excelstream for high-performance streaming with constant memory usage.
    ///
    /// The first row is treated as a header and skipped.
    ///
    /// # Arguments
    /// * `path` - Path to CSV file
    /// * `mapping` - Column mapping configuration
    ///
    /// # Returns
    /// Vector of transactions
    ///
    /// # Example
    /// ```no_run
    /// use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
    ///
    /// // Standard format: transaction_id(0), items(1), timestamp(2)
    /// let mapping = ColumnMapping::simple(0, 1, 2);
    /// let transactions = DataLoader::from_csv("transactions.csv", mapping)?;
    /// println!("Loaded {} transactions", transactions.len());
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```
    pub fn from_csv<P: AsRef<Path>>(path: P, mapping: ColumnMapping) -> Result<Vec<Transaction>> {
        let mut reader = CsvReader::open(path.as_ref())
            .map_err(|e| MiningError::DataLoadError(format!("Failed to open CSV file: {}", e)))?;

        let mut transactions = Vec::new();
        let mut row_idx = 0;

        for row_result in reader.rows() {
            let row = row_result.map_err(|e| {
                MiningError::DataLoadError(format!("Failed to read row {}: {}", row_idx, e))
            })?;

            row_idx += 1;

            // Skip header row
            if row_idx == 1 {
                continue;
            }

            // Convert row to Vec<String>
            let row_values: Vec<String> = row.into_iter().map(|v| v.to_string()).collect();

            match Self::parse_transaction_with_mapping(&row_values, row_idx, &mapping) {
                Ok(Some(tx)) => transactions.push(tx),
                Ok(None) => continue, // Skip empty rows
                Err(e) => {
                    log::warn!("Skipping row {}: {}", row_idx, e);
                    continue;
                }
            }
        }

        if transactions.is_empty() {
            return Err(MiningError::InsufficientData(
                "No valid transactions found in CSV file".to_string(),
            ));
        }

        Ok(transactions)
    }

    /// Parse a row of values into a Transaction using column mapping
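    ///
    /// With a single item column, the cell is split on commas
    /// ("Laptop,Mouse" becomes two items). With multiple item columns, each cell
    /// is split on commas and the pieces are zipped positionally, joined with
    /// `field_separator`. Returns `Ok(None)` for rows that should be skipped
    /// (empty transaction ID or no items).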
    pub(crate) fn parse_transaction_with_mapping(
        row_values: &[String],
        row_idx: usize,
        mapping: &ColumnMapping,
    ) -> Result<Option<Transaction>> {
        // Validate row has enough columns
        let max_col = *[
            mapping.transaction_id,
            *mapping.item_columns.iter().max().unwrap_or(&0),
            mapping.timestamp,
        ]
        .iter()
        .max()
        .unwrap_or(&0);

        if row_values.len() <= max_col {
            return Err(MiningError::DataLoadError(format!(
                "Row {} has insufficient columns (expected at least {}, got {})",
                row_idx,
                max_col + 1,
                row_values.len()
            )));
        }

        // Extract transaction ID
        let tx_id = row_values[mapping.transaction_id].trim();
        if tx_id.is_empty() {
            return Ok(None); // Skip empty transaction ID
        }

        // Extract and combine item columns
        let items: Vec<String> = if mapping.item_columns.len() == 1 {
            // Single column: split by comma (traditional format)
            // CSV: "Laptop,Mouse,Keyboard"
            row_values[mapping.item_columns[0]]
                .split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        } else {
            // Multiple columns: split each and zip them together
            // CSV columns:  "Laptop,Mouse"   "Electronics,Accessories"   "US,US"
            // Result:       ["Laptop::Electronics::US", "Mouse::Accessories::US"]

            let fields: Vec<Vec<String>> = mapping
                .item_columns
                .iter()
                .map(|&col_idx| {
                    row_values[col_idx]
                        .split(',')
                        .map(|s| s.trim().to_string())
                        .filter(|s| !s.is_empty())
                        .collect()
                })
                .collect();

            // Find the maximum length to handle mismatched field counts
            let max_len = fields.iter().map(|f| f.len()).max().unwrap_or(0);
            if max_len == 0 {
                return Ok(None); // Skip if no items in any field
            }

            // Zip fields together with separator
            (0..max_len)
                .map(|i| {
                    fields
                        .iter()
                        .filter_map(|field| field.get(i).cloned())
                        .collect::<Vec<String>>()
                        .join(&mapping.field_separator)
                })
                .filter(|s| !s.is_empty())
                .collect()
        };

        if items.is_empty() {
            return Ok(None);
        }

        // Extract timestamp
        let timestamp = Self::parse_timestamp(&row_values[mapping.timestamp], row_idx)?;

        Ok(Some(Transaction::new(tx_id.to_string(), items, timestamp)))
    }

    /// Parse timestamp from string (supports ISO 8601, Unix timestamps, and common datetime/date formats)
    fn parse_timestamp(timestamp_str: &str, row_idx: usize) -> Result<DateTime<Utc>> {
        let trimmed = timestamp_str.trim();

        // Try parsing as ISO 8601 / RFC 3339 first (most common format)
        if let Ok(dt) = DateTime::parse_from_rfc3339(trimmed) {
            return Ok(dt.with_timezone(&Utc));
        }

        // Try parsing as Unix timestamp (seconds)
        if let Ok(unix_ts) = trimmed.parse::<i64>() {
            if let Some(dt) = DateTime::from_timestamp(unix_ts, 0) {
                return Ok(dt);
            }
        }

        // Try parsing as naive datetime formats
        let datetime_formats = [
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%d %H:%M:%S%.f",
            "%Y/%m/%d %H:%M:%S",
            "%d-%m-%Y %H:%M:%S",
            "%d/%m/%Y %H:%M:%S",
        ];

        for format in &datetime_formats {
            if let Ok(naive_dt) = NaiveDateTime::parse_from_str(trimmed, format) {
                return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
            }
        }

        // Date-only formats must go through NaiveDate: NaiveDateTime::parse_from_str
        // fails when the input has no time component. Midnight UTC is assumed.
        let date_formats = ["%Y-%m-%d", "%Y/%m/%d", "%d-%m-%Y", "%d/%m/%Y"];

        for format in &date_formats {
            if let Ok(naive_date) = NaiveDate::parse_from_str(trimmed, format) {
                let naive_dt = naive_date
                    .and_hms_opt(0, 0, 0)
                    .expect("midnight is always a valid time");
                return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
            }
        }

        // Default to current time if parsing fails
        log::warn!(
            "Failed to parse timestamp '{}' at row {}, using current time",
            trimmed,
            row_idx
        );
        Ok(Utc::now())
    }

    /// List all sheet names from an Excel file
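    ///
    /// # Example
    /// ```no_run
    /// use rust_rule_miner::data_loader::DataLoader;
    ///
    /// let sheets = DataLoader::list_sheets("sales_data.xlsx")?;
    /// println!("Available sheets: {:?}", sheets);
    /// # Ok::<(), Box<dyn std::error::Error>>(())
    /// ```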
    pub fn list_sheets<P: AsRef<Path>>(path: P) -> Result<Vec<String>> {
        let reader = StreamingReader::open(path.as_ref())
            .map_err(|e| MiningError::DataLoadError(format!("Failed to open Excel file: {}", e)))?;

        Ok(reader.sheet_names().to_vec())
    }

    /// Load transactions from an AWS S3 bucket (requires `cloud` feature)
    ///
    /// Streams directly from S3 with constant memory usage (~3-35 MB).
    ///
    /// # Arguments
    /// * `bucket` - S3 bucket name
    /// * `key` - S3 object key (file path in bucket)
    /// * `region` - AWS region (e.g., "us-east-1")
    /// * `sheet_index` - Sheet index (0-based) for Excel files
    /// * `mapping` - Column mapping configuration
    ///
    /// # Example
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
    ///
    /// // Standard format: transaction_id(0), items(1), timestamp(2)
    /// let mapping = ColumnMapping::simple(0, 1, 2);
    ///
    /// // Load from S3
    /// let transactions = DataLoader::from_s3(
    ///     "my-data-bucket",
    ///     "sales/2024/transactions.xlsx",
    ///     "us-east-1",
    ///     0,
    ///     mapping
    /// ).await?;
    ///
    /// println!("Loaded {} transactions from S3", transactions.len());
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "cloud")]
    pub async fn from_s3(
        bucket: &str,
        key: &str,
        region: &str,
        sheet_index: usize,
        mapping: ColumnMapping,
    ) -> Result<Vec<Transaction>> {
        use excelstream::cloud::S3ExcelReader;

        let mut reader = S3ExcelReader::builder()
            .bucket(bucket)
            .key(key)
            .region(region)
            .build()
            .await
            .map_err(|e| MiningError::DataLoadError(format!("Failed to open S3 file: {}", e)))?;

        let mut transactions = Vec::new();
        let mut row_idx = 0;

        for row_result in reader.rows_by_index(sheet_index).map_err(|e| {
            MiningError::DataLoadError(format!("Failed to read sheet {}: {}", sheet_index, e))
        })? {
            let row = row_result.map_err(|e| {
                MiningError::DataLoadError(format!("Failed to read row {}: {}", row_idx, e))
            })?;

            row_idx += 1;

            // Skip header row
            if row_idx == 1 {
                continue;
            }

            // Convert row to Vec<String>
            let row_values = row.to_strings();

            match Self::parse_transaction_with_mapping(&row_values, row_idx, &mapping) {
                Ok(Some(tx)) => transactions.push(tx),
                Ok(None) => continue,
                Err(e) => {
                    log::warn!("Skipping row {}: {}", row_idx, e);
                    continue;
                }
            }
        }

        if transactions.is_empty() {
            return Err(MiningError::InsufficientData(
                "No valid transactions found in S3 file".to_string(),
            ));
        }

        Ok(transactions)
    }

    /// Load transactions from an HTTP URL (requires `cloud` feature)
    ///
    /// Downloads the CSV body into memory and parses it line by line. Note that
    /// rows are split on bare commas, so quoted fields containing commas are not
    /// handled here.
    ///
    /// # Arguments
    /// * `url` - HTTP URL to CSV file
    /// * `mapping` - Column mapping configuration
    ///
    /// # Example
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
    ///
    /// // Standard format: transaction_id(0), items(1), timestamp(2)
    /// let mapping = ColumnMapping::simple(0, 1, 2);
    ///
    /// // Load from HTTP endpoint
    /// let transactions = DataLoader::from_http(
    ///     "https://example.com/data/transactions.csv",
    ///     mapping
    /// ).await?;
    ///
    /// println!("Loaded {} transactions from HTTP", transactions.len());
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "cloud")]
    pub async fn from_http(url: &str, mapping: ColumnMapping) -> Result<Vec<Transaction>> {
        // Download the full response body, then parse it in memory
        // (excelstream doesn't have a direct HTTP CSV reader yet)
        let response = reqwest::get(url)
            .await
            .map_err(|e| MiningError::DataLoadError(format!("HTTP request failed: {}", e)))?;

        let content = response
            .text()
            .await
            .map_err(|e| MiningError::DataLoadError(format!("Failed to read response: {}", e)))?;

        // Parse CSV from the downloaded string
        let mut transactions = Vec::new();
        let mut row_idx = 0;

        for line in content.lines() {
            row_idx += 1;

            // Skip header
            if row_idx == 1 {
                continue;
            }

            // Naive CSV split: does not handle quoted fields containing commas
            let row_values: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();

            match Self::parse_transaction_with_mapping(&row_values, row_idx, &mapping) {
                Ok(Some(tx)) => transactions.push(tx),
                Ok(None) => continue,
                Err(e) => {
                    log::warn!("Skipping row {}: {}", row_idx, e);
                    continue;
                }
            }
        }

        if transactions.is_empty() {
            return Err(MiningError::InsufficientData(
                "No valid transactions found in HTTP response".to_string(),
            ));
        }

        Ok(transactions)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Write;

    #[test]
    fn test_csv_loading() {
        // Create temporary CSV file
        let csv_content = r#"transaction_id,items,timestamp
tx1,"Laptop,Mouse",2024-01-15T10:30:00Z
tx2,"Phone,Phone Case",2024-01-15T11:00:00Z
tx3,"Tablet",2024-01-15T12:00:00Z
"#;

        let temp_file = "/tmp/test_transactions_excelstream.csv";
        let mut file = fs::File::create(temp_file).unwrap();
        file.write_all(csv_content.as_bytes()).unwrap();

        // Load transactions with column mapping
        let mapping = ColumnMapping::simple(0, 1, 2);
        let transactions = DataLoader::from_csv(temp_file, mapping).unwrap();

        assert_eq!(transactions.len(), 3);
        assert_eq!(transactions[0].id, "tx1");
        assert_eq!(transactions[0].items, vec!["Laptop", "Mouse"]);
        assert_eq!(transactions[1].items, vec!["Phone", "Phone Case"]);
        assert_eq!(transactions[2].items, vec!["Tablet"]);

        // Cleanup
        fs::remove_file(temp_file).ok();
    }
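
    // Exercises the multi-column zip path of parse_transaction_with_mapping
    // directly, without touching the filesystem. Expected items follow the
    // "product::category" combination documented on ColumnMapping::multi_field.
    #[test]
    fn test_multi_field_parsing() {
        let mapping = ColumnMapping::multi_field(0, vec![1, 2], 3, "::".to_string());
        let row = vec![
            "cust1".to_string(),
            "Laptop,Mouse".to_string(),
            "Electronics,Accessories".to_string(),
            "2024-01-15T10:30:00Z".to_string(),
        ];

        let tx = DataLoader::parse_transaction_with_mapping(&row, 2, &mapping)
            .unwrap()
            .unwrap();
        assert_eq!(tx.id, "cust1");
        assert_eq!(tx.items, vec!["Laptop::Electronics", "Mouse::Accessories"]);
    }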

    #[test]
    fn test_timestamp_parsing() {
        // ISO 8601
        let ts1 = DataLoader::parse_timestamp("2024-01-15T10:30:00Z", 1).unwrap();
        assert_eq!(ts1.to_rfc3339(), "2024-01-15T10:30:00+00:00");

        // Unix timestamp (seconds)
        let ts2 = DataLoader::parse_timestamp("1705316400", 1).unwrap();
        assert_eq!(ts2.timestamp(), 1705316400);

        // Naive datetime
        let ts3 = DataLoader::parse_timestamp("2024-01-15 10:30:00", 1).unwrap();
        assert_eq!(ts3.format("%Y-%m-%d").to_string(), "2024-01-15");

        // Alternative separator
        let ts4 = DataLoader::parse_timestamp("2024/01/15 10:30:00", 1).unwrap();
        assert_eq!(ts4.format("%Y-%m-%d").to_string(), "2024-01-15");

        // Date-only format (parsed via NaiveDate; midnight UTC is assumed)
        let ts5 = DataLoader::parse_timestamp("15-01-2024", 1).unwrap();
        assert_eq!(ts5.format("%Y-%m-%d").to_string(), "2024-01-15");
    }
}
617}