rust_rule_miner/data_loader.rs
1//! Data loading utilities for Excel and CSV files using excelstream
2//!
3//! Provides high-performance streaming loading of transaction data from:
4//! - Excel files (.xlsx) - ultra-low memory streaming
5//! - CSV files (.csv) - memory-efficient streaming
6//!
7//! # Column Mapping (v0.2.0+)
8//!
9//! All data loading methods require `ColumnMapping` to specify which columns to mine.
10//! This provides full flexibility for any data schema.
11//!
12//! ```no_run
13//! use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
14//!
15//! // CSV: customer_id, product, category, price, location, timestamp
16//! // 0 1 2 3 4 5
17//!
18//! // Mine single field (product names from column 1)
19//! let mapping = ColumnMapping::simple(0, 1, 5);
20//! let transactions = DataLoader::from_csv("sales.csv", mapping)?;
21//!
22//! // Mine multiple fields combined (product + category)
23//! let mapping = ColumnMapping::multi_field(0, vec![1, 2], 5, "::".to_string());
24//! let transactions = DataLoader::from_csv("sales.csv", mapping)?;
25//! // Items: "Laptop::Electronics", "Mouse::Accessories"
26//! # Ok::<(), Box<dyn std::error::Error>>(())
27//! ```
28//!
29//! # Example
30//!
31//! ```no_run
32//! use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
33//!
34//! // Standard 3-column format: transaction_id, items, timestamp
35//! let mapping = ColumnMapping::simple(0, 1, 2);
36//!
37//! // Load from Excel
38//! let transactions = DataLoader::from_excel("sales_data.xlsx", 0, mapping.clone())?;
39//!
40//! // Load from CSV
41//! let transactions = DataLoader::from_csv("transactions.csv", mapping)?;
42//! # Ok::<(), Box<dyn std::error::Error>>(())
43//! ```
44
use std::path::Path;

use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
use excelstream::streaming_reader::StreamingReader;
use excelstream::CsvReader;

use crate::errors::{MiningError, Result};
use crate::Transaction;
51
/// Column mapping configuration for flexible data loading
///
/// Allows you to specify which columns to mine from your data,
/// supporting multiple fields combined into patterns.
///
/// Build instances via [`ColumnMapping::simple`] (one item column) or
/// [`ColumnMapping::multi_field`] (several item columns joined per position).
#[derive(Debug, Clone)]
pub struct ColumnMapping {
    /// Column index for transaction/group ID (0-based)
    pub transaction_id: usize,
    /// Column indices for items to mine (supports multiple columns).
    /// With a single column, the cell value is split on commas into items;
    /// with several columns, the split values are joined position-by-position.
    pub item_columns: Vec<usize>,
    /// Column index for timestamp (0-based)
    pub timestamp: usize,
    /// Separator used to join values from multiple item columns (default: "::")
    pub field_separator: String,
}
67
68impl ColumnMapping {
69 /// Create mapping with transaction_id, single item column, and timestamp
70 ///
71 /// # Example
72 /// ```
73 /// use rust_rule_miner::data_loader::ColumnMapping;
74 ///
75 /// // Mine product names from column 1
76 /// let mapping = ColumnMapping::simple(0, 1, 5);
77 /// // CSV: customer_id, product_name, category, price, location, timestamp
78 /// // 0 1 2 3 4 5
79 /// ```
80 pub fn simple(transaction_id: usize, item_column: usize, timestamp: usize) -> Self {
81 Self {
82 transaction_id,
83 item_columns: vec![item_column],
84 timestamp,
85 field_separator: "::".to_string(),
86 }
87 }
88
89 /// Create mapping to mine multiple fields combined
90 ///
91 /// # Example
92 /// ```
93 /// use rust_rule_miner::data_loader::ColumnMapping;
94 ///
95 /// // Mine product + category + location combined
96 /// let mapping = ColumnMapping::multi_field(
97 /// 0, // customer_id (column 0)
98 /// vec![1, 2, 4], // product(1), category(2), location(4)
99 /// 5, // timestamp (column 5)
100 /// "::".to_string() // separator
101 /// );
102 /// // CSV: customer_id, product, category, price, location, timestamp
103 /// // Results in items like: "Laptop::Electronics::US"
104 /// ```
105 pub fn multi_field(
106 transaction_id: usize,
107 item_columns: Vec<usize>,
108 timestamp: usize,
109 field_separator: String,
110 ) -> Self {
111 Self {
112 transaction_id,
113 item_columns,
114 timestamp,
115 field_separator,
116 }
117 }
118}
119
/// Data loader for Excel and CSV files using excelstream
///
/// Stateless namespace type: every loader is an associated function,
/// so no instance is ever constructed.
pub struct DataLoader;
122
123impl DataLoader {
124 /// Load transactions from Excel file (.xlsx) with custom column mapping
125 ///
126 /// Uses excelstream for high-performance streaming with ~3-35 MB memory usage
127 /// regardless of file size.
128 ///
129 /// First row is treated as header and skipped.
130 ///
131 /// # Arguments
132 /// * `path` - Path to Excel file
133 /// * `sheet_index` - Sheet index (0-based) to read
134 /// * `mapping` - Column mapping configuration
135 ///
136 /// # Returns
137 /// Vector of transactions
138 ///
139 /// # Example
140 /// ```no_run
141 /// use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
142 ///
143 /// // Standard format: transaction_id(0), items(1), timestamp(2)
144 /// let mapping = ColumnMapping::simple(0, 1, 2);
145 /// let transactions = DataLoader::from_excel("sales.xlsx", 0, mapping)?;
146 /// println!("Loaded {} transactions", transactions.len());
147 /// # Ok::<(), Box<dyn std::error::Error>>(())
148 /// ```
149 pub fn from_excel<P: AsRef<Path>>(
150 path: P,
151 sheet_index: usize,
152 mapping: ColumnMapping,
153 ) -> Result<Vec<Transaction>> {
154 let mut reader = StreamingReader::open(path.as_ref())
155 .map_err(|e| MiningError::DataLoadError(format!("Failed to open Excel file: {}", e)))?;
156
157 let mut transactions = Vec::new();
158 let mut row_idx = 0;
159
160 for row_result in reader.rows_by_index(sheet_index).map_err(|e| {
161 MiningError::DataLoadError(format!("Failed to read sheet {}: {}", sheet_index, e))
162 })? {
163 let row = row_result.map_err(|e| {
164 MiningError::DataLoadError(format!("Failed to read row {}: {}", row_idx, e))
165 })?;
166
167 row_idx += 1;
168
169 // Skip header row
170 if row_idx == 1 {
171 continue;
172 }
173
174 // Convert row to Vec<String>
175 let row_values = row.to_strings();
176
177 match Self::parse_transaction_with_mapping(&row_values, row_idx, &mapping) {
178 Ok(Some(tx)) => transactions.push(tx),
179 Ok(None) => continue, // Skip empty rows
180 Err(e) => {
181 log::warn!("Skipping row {}: {}", row_idx, e);
182 continue;
183 }
184 }
185 }
186
187 if transactions.is_empty() {
188 return Err(MiningError::InsufficientData(
189 "No valid transactions found in Excel file".to_string(),
190 ));
191 }
192
193 Ok(transactions)
194 }
195
196 /// Load transactions from CSV file with custom column mapping
197 ///
198 /// Uses excelstream for high-performance streaming with constant memory usage.
199 ///
200 /// First row is treated as header and skipped.
201 ///
202 /// # Arguments
203 /// * `path` - Path to CSV file
204 /// * `mapping` - Column mapping configuration
205 ///
206 /// # Returns
207 /// Vector of transactions
208 ///
209 /// # Example
210 /// ```no_run
211 /// use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
212 ///
213 /// // Standard format: transaction_id(0), items(1), timestamp(2)
214 /// let mapping = ColumnMapping::simple(0, 1, 2);
215 /// let transactions = DataLoader::from_csv("transactions.csv", mapping)?;
216 /// println!("Loaded {} transactions", transactions.len());
217 /// # Ok::<(), Box<dyn std::error::Error>>(())
218 /// ```
219 pub fn from_csv<P: AsRef<Path>>(path: P, mapping: ColumnMapping) -> Result<Vec<Transaction>> {
220 let mut reader = CsvReader::open(path.as_ref())
221 .map_err(|e| MiningError::DataLoadError(format!("Failed to open CSV file: {}", e)))?;
222
223 let mut transactions = Vec::new();
224 let mut row_idx = 0;
225
226 for row_result in reader.rows() {
227 let row = row_result.map_err(|e| {
228 MiningError::DataLoadError(format!("Failed to read row {}: {}", row_idx, e))
229 })?;
230
231 row_idx += 1;
232
233 // Skip header row
234 if row_idx == 1 {
235 continue;
236 }
237
238 // Convert row to Vec<String>
239 let row_values: Vec<String> = row.into_iter().map(|v| v.to_string()).collect();
240
241 match Self::parse_transaction_with_mapping(&row_values, row_idx, &mapping) {
242 Ok(Some(tx)) => transactions.push(tx),
243 Ok(None) => continue, // Skip empty rows
244 Err(e) => {
245 log::warn!("Skipping row {}: {}", row_idx, e);
246 continue;
247 }
248 }
249 }
250
251 if transactions.is_empty() {
252 return Err(MiningError::InsufficientData(
253 "No valid transactions found in CSV file".to_string(),
254 ));
255 }
256
257 Ok(transactions)
258 }
259
    /// Parse a row of values into a Transaction using column mapping
    ///
    /// Returns `Ok(None)` (row skipped, not an error) when the transaction
    /// ID cell is empty or no items can be extracted. Returns `Err` when
    /// the row has fewer columns than the mapping requires.
    ///
    /// Item extraction:
    /// * single item column — the cell is split on commas into items;
    /// * multiple item columns — each cell is split on commas, then values
    ///   are joined position-by-position with `field_separator`
    ///   (e.g. "Laptop::Electronics::US").
    ///
    /// NOTE(review): in multi-column mode, empty entries are filtered out
    /// of each column *before* the positional join, so a blank value in one
    /// column shifts its later values up and can pair them with the wrong
    /// position — confirm this is acceptable for the expected data.
    pub(crate) fn parse_transaction_with_mapping(
        row_values: &[String],
        row_idx: usize,
        mapping: &ColumnMapping,
    ) -> Result<Option<Transaction>> {
        // Validate row has enough columns: compute the highest column index
        // the mapping can touch (empty item_columns contributes 0).
        let max_col = *[
            mapping.transaction_id,
            *mapping.item_columns.iter().max().unwrap_or(&0),
            mapping.timestamp,
        ]
        .iter()
        .max()
        .unwrap_or(&0);

        if row_values.len() <= max_col {
            return Err(MiningError::DataLoadError(format!(
                "Row {} has insufficient columns (expected at least {}, got {})",
                row_idx,
                max_col + 1,
                row_values.len()
            )));
        }

        // Extract transaction ID
        let tx_id = row_values[mapping.transaction_id].trim();
        if tx_id.is_empty() {
            return Ok(None); // Skip empty transaction ID
        }

        // Extract and combine item columns
        let items: Vec<String> = if mapping.item_columns.len() == 1 {
            // Single column: split by comma (traditional format)
            // CSV: "Laptop,Mouse,Keyboard"
            row_values[mapping.item_columns[0]]
                .split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        } else {
            // Multiple columns: split each and zip them together
            // CSV columns: "Laptop,Mouse" "Electronics,Accessories" "US,US"
            // Result: ["Laptop::Electronics::US", "Mouse::Accessories::US"]

            let fields: Vec<Vec<String>> = mapping
                .item_columns
                .iter()
                .map(|&col_idx| {
                    row_values[col_idx]
                        .split(',')
                        .map(|s| s.trim().to_string())
                        .filter(|s| !s.is_empty())
                        .collect()
                })
                .collect();

            // Find the maximum length to handle mismatched field counts
            let max_len = fields.iter().map(|f| f.len()).max().unwrap_or(0);
            if max_len == 0 {
                return Ok(None); // Skip if no items in any field
            }

            // Zip fields together with separator; columns shorter than
            // max_len simply contribute nothing at the missing positions.
            (0..max_len)
                .map(|i| {
                    fields
                        .iter()
                        .filter_map(|field| field.get(i).cloned())
                        .collect::<Vec<String>>()
                        .join(&mapping.field_separator)
                })
                .filter(|s| !s.is_empty())
                .collect()
        };

        if items.is_empty() {
            return Ok(None);
        }

        // Extract timestamp
        let timestamp = Self::parse_timestamp(&row_values[mapping.timestamp], row_idx)?;

        Ok(Some(Transaction::new(tx_id.to_string(), items, timestamp)))
    }
345
346 /// Parse timestamp from string (supports ISO 8601, Unix timestamp, and common datetime formats)
347 fn parse_timestamp(timestamp_str: &str, row_idx: usize) -> Result<DateTime<Utc>> {
348 let trimmed = timestamp_str.trim();
349
350 // Try parsing as ISO 8601 first (most common format)
351 if let Ok(dt) = DateTime::parse_from_rfc3339(trimmed) {
352 return Ok(dt.with_timezone(&Utc));
353 }
354
355 // Try parsing as Unix timestamp (seconds)
356 if let Ok(unix_ts) = trimmed.parse::<i64>() {
357 if let Some(dt) = DateTime::from_timestamp(unix_ts, 0) {
358 return Ok(dt);
359 }
360 }
361
362 // Try parsing as naive datetime formats
363 let formats = [
364 "%Y-%m-%d %H:%M:%S",
365 "%Y-%m-%d %H:%M:%S%.f",
366 "%Y/%m/%d %H:%M:%S",
367 "%d-%m-%Y %H:%M:%S",
368 "%d/%m/%Y %H:%M:%S",
369 "%Y-%m-%d",
370 "%Y/%m/%d",
371 "%d-%m-%Y",
372 "%d/%m/%Y",
373 ];
374
375 for format in &formats {
376 if let Ok(naive_dt) = NaiveDateTime::parse_from_str(trimmed, format) {
377 return Ok(DateTime::from_naive_utc_and_offset(naive_dt, Utc));
378 }
379 }
380
381 // Default to current time if parsing fails
382 log::warn!(
383 "Failed to parse timestamp '{}' at row {}, using current time",
384 trimmed,
385 row_idx
386 );
387 Ok(Utc::now())
388 }
389
390 /// List all sheet names from an Excel file
391 pub fn list_sheets<P: AsRef<Path>>(path: P) -> Result<Vec<String>> {
392 let reader = StreamingReader::open(path.as_ref())
393 .map_err(|e| MiningError::DataLoadError(format!("Failed to open Excel file: {}", e)))?;
394
395 Ok(reader.sheet_names().to_vec())
396 }
397
398 /// Load transactions from AWS S3 bucket (requires `cloud` feature)
399 ///
400 /// Streams directly from S3 with constant memory usage (~3-35 MB).
401 ///
402 /// # Arguments
403 /// * `bucket` - S3 bucket name
404 /// * `key` - S3 object key (file path in bucket)
405 /// * `region` - AWS region (e.g., "us-east-1")
406 /// * `sheet_index` - Sheet index (0-based) for Excel files
407 /// * `mapping` - Column mapping configuration
408 ///
409 /// # Example
410 /// ```no_run
411 /// # #[tokio::main]
412 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
413 /// use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
414 ///
415 /// // Standard format: transaction_id(0), items(1), timestamp(2)
416 /// let mapping = ColumnMapping::simple(0, 1, 2);
417 ///
418 /// // Load from S3
419 /// let transactions = DataLoader::from_s3(
420 /// "my-data-bucket",
421 /// "sales/2024/transactions.xlsx",
422 /// "us-east-1",
423 /// 0,
424 /// mapping
425 /// ).await?;
426 ///
427 /// println!("Loaded {} transactions from S3", transactions.len());
428 /// # Ok(())
429 /// # }
430 /// ```
431 #[cfg(feature = "cloud")]
432 pub async fn from_s3(
433 bucket: &str,
434 key: &str,
435 region: &str,
436 sheet_index: usize,
437 mapping: ColumnMapping,
438 ) -> Result<Vec<Transaction>> {
439 use excelstream::cloud::S3ExcelReader;
440
441 let mut reader = S3ExcelReader::builder()
442 .bucket(bucket)
443 .key(key)
444 .region(region)
445 .build()
446 .await
447 .map_err(|e| MiningError::DataLoadError(format!("Failed to open S3 file: {}", e)))?;
448
449 let mut transactions = Vec::new();
450 let mut row_idx = 0;
451
452 for row_result in reader.rows_by_index(sheet_index).map_err(|e| {
453 MiningError::DataLoadError(format!("Failed to read sheet {}: {}", sheet_index, e))
454 })? {
455 let row = row_result.map_err(|e| {
456 MiningError::DataLoadError(format!("Failed to read row {}: {}", row_idx, e))
457 })?;
458
459 row_idx += 1;
460
461 // Skip header row
462 if row_idx == 1 {
463 continue;
464 }
465
466 // Convert row to Vec<String>
467 let row_values = row.to_strings();
468
469 match Self::parse_transaction_with_mapping(&row_values, row_idx, &mapping) {
470 Ok(Some(tx)) => transactions.push(tx),
471 Ok(None) => continue,
472 Err(e) => {
473 log::warn!("Skipping row {}: {}", row_idx, e);
474 continue;
475 }
476 }
477 }
478
479 if transactions.is_empty() {
480 return Err(MiningError::InsufficientData(
481 "No valid transactions found in S3 file".to_string(),
482 ));
483 }
484
485 Ok(transactions)
486 }
487
488 /// Load transactions from HTTP URL (requires `cloud` feature)
489 ///
490 /// Streams CSV data from HTTP endpoint with constant memory usage.
491 ///
492 /// # Arguments
493 /// * `url` - HTTP URL to CSV file
494 /// * `mapping` - Column mapping configuration
495 ///
496 /// # Example
497 /// ```no_run
498 /// # #[tokio::main]
499 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
500 /// use rust_rule_miner::data_loader::{DataLoader, ColumnMapping};
501 ///
502 /// // Standard format: transaction_id(0), items(1), timestamp(2)
503 /// let mapping = ColumnMapping::simple(0, 1, 2);
504 ///
505 /// // Load from HTTP endpoint
506 /// let transactions = DataLoader::from_http(
507 /// "https://example.com/data/transactions.csv",
508 /// mapping
509 /// ).await?;
510 ///
511 /// println!("Loaded {} transactions from HTTP", transactions.len());
512 /// # Ok(())
513 /// # }
514 /// ```
515 #[cfg(feature = "cloud")]
516 pub async fn from_http(url: &str, mapping: ColumnMapping) -> Result<Vec<Transaction>> {
517 // Download to temp file first, then use CsvReader
518 // (excelstream doesn't have direct HTTP CSV reader yet)
519 let response = reqwest::get(url)
520 .await
521 .map_err(|e| MiningError::DataLoadError(format!("HTTP request failed: {}", e)))?;
522
523 let content = response
524 .text()
525 .await
526 .map_err(|e| MiningError::DataLoadError(format!("Failed to read response: {}", e)))?;
527
528 // Parse CSV from string
529 let mut transactions = Vec::new();
530 let mut row_idx = 0;
531
532 for line in content.lines() {
533 row_idx += 1;
534
535 // Skip header
536 if row_idx == 1 {
537 continue;
538 }
539
540 // Parse CSV row
541 let row_values: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
542
543 match Self::parse_transaction_with_mapping(&row_values, row_idx, &mapping) {
544 Ok(Some(tx)) => transactions.push(tx),
545 Ok(None) => continue,
546 Err(e) => {
547 log::warn!("Skipping row {}: {}", row_idx, e);
548 continue;
549 }
550 }
551 }
552
553 if transactions.is_empty() {
554 return Err(MiningError::InsufficientData(
555 "No valid transactions found in HTTP response".to_string(),
556 ));
557 }
558
559 Ok(transactions)
560 }
561}
562
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Write;

    #[test]
    fn test_csv_loading() {
        // Create temporary CSV file
        let csv_content = r#"transaction_id,items,timestamp
tx1,"Laptop,Mouse",2024-01-15T10:30:00Z
tx2,"Phone,Phone Case",2024-01-15T11:00:00Z
tx3,"Tablet",2024-01-15T12:00:00Z
"#;

        // Use the platform temp dir (portable, unlike a hard-coded /tmp)
        // and a process-unique filename so parallel runs don't collide.
        let temp_file = std::env::temp_dir().join(format!(
            "test_transactions_excelstream_{}.csv",
            std::process::id()
        ));
        let mut file = fs::File::create(&temp_file).unwrap();
        file.write_all(csv_content.as_bytes()).unwrap();

        // Load transactions with column mapping
        let mapping = ColumnMapping::simple(0, 1, 2);
        let result = DataLoader::from_csv(&temp_file, mapping);

        // Cleanup before asserting so a failed assertion doesn't leak the file
        fs::remove_file(&temp_file).ok();

        let transactions = result.unwrap();
        assert_eq!(transactions.len(), 3);
        assert_eq!(transactions[0].id, "tx1");
        assert_eq!(transactions[0].items, vec!["Laptop", "Mouse"]);
        assert_eq!(transactions[1].items, vec!["Phone", "Phone Case"]);
        assert_eq!(transactions[2].items, vec!["Tablet"]);
    }

    #[test]
    fn test_timestamp_parsing() {
        // ISO 8601
        let ts1 = DataLoader::parse_timestamp("2024-01-15T10:30:00Z", 1).unwrap();
        assert_eq!(ts1.to_rfc3339(), "2024-01-15T10:30:00+00:00");

        // Unix timestamp
        let ts2 = DataLoader::parse_timestamp("1705316400", 1).unwrap();
        assert!(ts2.timestamp() > 0);

        // Naive datetime
        let ts3 = DataLoader::parse_timestamp("2024-01-15 10:30:00", 1).unwrap();
        assert_eq!(ts3.format("%Y-%m-%d").to_string(), "2024-01-15");

        // Alternative formats
        let ts4 = DataLoader::parse_timestamp("2024/01/15 10:30:00", 1).unwrap();
        assert_eq!(ts4.format("%Y-%m-%d").to_string(), "2024-01-15");

        let _ts5 = DataLoader::parse_timestamp("15-01-2024", 1).unwrap();
        // Date-only parsing may fall back to the current time, so only
        // successful parsing (no panic) is asserted here.
    }
}
617}