// xls_rs/streaming_ops.rs

//! Streaming operations for large files
//!
//! This module provides memory-efficient operations that work on streams
//! rather than loading entire datasets into memory.

use std::collections::HashMap;

use anyhow::{Context, Result};

use crate::csv_handler::StreamingCsvReader;

/// Schema information for a dataset: column names plus a type inferred
/// for each column from a sample of rows.
#[derive(Debug, Clone)]
pub struct Schema {
    /// Column names
    pub columns: Vec<String>,
    /// Column types inferred from sample data
    pub types: Vec<ColumnType>,
    /// Total row count (if available)
    // `None` when the producer only sampled the file instead of scanning it.
    pub row_count: Option<usize>,
}

/// Inferred column type
///
/// Produced by [`ColumnType::infer_from_samples`]; the variants describe
/// what the sampled string values looked like.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
pub enum ColumnType {
    /// At least one sample was non-empty text that is neither numeric nor boolean.
    String,
    /// Non-empty samples parse as `i64` (and none required `f64` or text).
    Integer,
    /// Non-empty samples parse as `f64`, at least one of which is not an `i64`.
    Float,
    /// All non-empty samples are "true"/"false" (case-insensitive).
    Boolean,
    /// The sampled values appear to contain only empty strings.
    Empty,
    /// Type could not be determined (e.g. no samples at all).
    Unknown,
}

32impl ColumnType {
33    /// Infer column type from a sample of values
34    pub fn infer_from_samples(samples: &[String]) -> Self {
35        if samples.is_empty() {
36            return ColumnType::Unknown;
37        }
38
39        let mut has_integers = false;
40        let mut has_floats = false;
41        let mut has_booleans = false;
42        let mut has_strings = false;
43        let mut has_empty = false;
44
45        for sample in samples {
46            if sample.is_empty() {
47                has_empty = true;
48            } else if sample.parse::<i64>().is_ok() {
49                has_integers = true;
50            } else if sample.parse::<f64>().is_ok() {
51                has_floats = true;
52            } else if matches!(sample.to_lowercase().as_str(), "true" | "false") {
53                has_booleans = true;
54            } else {
55                has_strings = true;
56            }
57        }
58
59        // Determine the most specific type that fits all samples
60        if has_strings {
61            ColumnType::String
62        } else if has_floats {
63            ColumnType::Float
64        } else if has_integers {
65            ColumnType::Integer
66        } else if has_booleans {
67            ColumnType::Boolean
68        } else if has_empty && samples.len() == 1 {
69            ColumnType::Empty
70        } else {
71            ColumnType::Unknown
72        }
73    }
74}
75
76/// Read first N rows from a CSV file without loading the entire file
77///
78/// This is memory-efficient for large files where you only need the first few rows.
79///
80/// # Arguments
81/// * `path` - Path to the CSV file
82/// * `n` - Number of rows to read (excluding headers if present)
83///
84/// # Returns
85/// Vector of rows (as Vec<String>)
86pub fn head(path: &str, n: usize) -> Result<Vec<Vec<String>>> {
87    let mut reader = StreamingCsvReader::open(path)?;
88    let mut result = Vec::with_capacity(n);
89
90    for row_result in reader.by_ref().take(n) {
91        result.push(row_result?);
92    }
93
94    Ok(result)
95}
96
97/// Read last N rows from a CSV file
98///
99/// Note: This currently requires reading the entire file to find the end,
100/// but only keeps the last N rows in memory.
101///
102/// # Arguments
103/// * `path` - Path to the CSV file
104/// * `n` - Number of rows to read
105///
106/// # Returns
107/// Vector of rows (as Vec<String>)
108pub fn tail(path: &str, n: usize) -> Result<Vec<Vec<String>>> {
109    let mut reader = StreamingCsvReader::open(path)?;
110    let mut buffer: Vec<Vec<String>> = Vec::with_capacity(n);
111
112    for row_result in reader {
113        let row = row_result?;
114        buffer.push(row);
115
116        // Keep only last N rows
117        if buffer.len() > n {
118            buffer.remove(0);
119        }
120    }
121
122    Ok(buffer)
123}
124
125/// Infer schema from a CSV file by sampling the first N rows
126///
127/// This is memory-efficient for large files as it only reads a sample.
128///
129/// # Arguments
130/// * `path` - Path to the CSV file
131/// * `sample_size` - Number of rows to sample (default: 1000)
132/// * `has_headers` - Whether the first row contains headers (default: true)
133///
134/// # Returns
135/// Schema information including column names and inferred types
136pub fn infer_schema(path: &str, sample_size: usize, has_headers: bool) -> Result<Schema> {
137    let mut reader = StreamingCsvReader::open(path)?;
138
139    // Read headers if present
140    let headers = if has_headers {
141        match reader.next() {
142            Some(Ok(row)) => row,
143            _ => return Ok(Schema {
144                columns: Vec::new(),
145                types: Vec::new(),
146                row_count: Some(0),
147            }),
148        }
149    } else {
150        // No headers, need to infer column count from first data row
151        match reader.next() {
152            Some(Ok(row)) => {
153                (0..row.len())
154                    .map(|i| format!("column_{}", i))
155                    .collect()
156            }
157            _ => return Ok(Schema {
158                columns: Vec::new(),
159                types: Vec::new(),
160                row_count: Some(0),
161            }),
162        }
163    };
164
165    // Sample rows for type inference
166    let mut sample_rows: Vec<Vec<String>> = Vec::with_capacity(sample_size);
167    for row_result in reader.by_ref().take(sample_size) {
168        if let Ok(row) = row_result {
169            sample_rows.push(row);
170        }
171    }
172
173    infer_types(&headers, &sample_rows)
174}
175
176fn infer_types(headers: &[String], sample_rows: &[Vec<String>]) -> Result<Schema> {
177    let num_cols = headers.len();
178    let mut column_samples: Vec<Vec<String>> = vec![Vec::new(); num_cols];
179
180    // Collect samples for each column
181    for row in sample_rows {
182        for (col_idx, value) in row.iter().enumerate().take(num_cols) {
183            column_samples[col_idx].push(value.clone());
184        }
185    }
186
187    // Infer types for each column
188    let types: Vec<ColumnType> = column_samples
189        .iter()
190        .map(|samples| ColumnType::infer_from_samples(samples))
191        .collect();
192
193    Ok(Schema {
194        columns: headers.to_vec(),
195        types,
196        row_count: None, // Row count not available without reading entire file
197    })
198}
199
200/// Get basic info about a CSV file without loading all data
201///
202/// Returns file size, row count (estimated), column count, and schema.
203///
204/// # Arguments
205/// * `path` - Path to the CSV file
206/// * `max_sample_rows` - Maximum rows to sample for schema inference
207///
208/// # Returns
209/// Map containing file information
210pub fn get_info(path: &str, max_sample_rows: usize) -> Result<HashMap<String, serde_json::Value>> {
211    let metadata = std::fs::metadata(path)?;
212    let file_size = metadata.len();
213
214    let mut reader = StreamingCsvReader::open(path)?;
215
216    // Read first row to determine column count
217    let first_row = reader.next();
218    let (num_cols, has_headers) = match first_row {
219        Some(Ok(row)) => (row.len(), true),
220        _ => return Ok(HashMap::new()),
221    };
222
223    // Sample rows for schema
224    let schema = infer_schema(path, max_sample_rows.saturating_sub(1), true)?;
225
226    // Count total rows (optional - can be expensive for large files)
227    let row_count = count_rows(path)?;
228
229    let mut info = HashMap::new();
230    info.insert(
231        "file_size".to_string(),
232        serde_json::json!(file_size),
233    );
234    info.insert(
235        "row_count".to_string(),
236        serde_json::json!(row_count),
237    );
238    info.insert(
239        "column_count".to_string(),
240        serde_json::json!(num_cols),
241    );
242    info.insert(
243        "has_headers".to_string(),
244        serde_json::json!(has_headers),
245    );
246    info.insert(
247        "columns".to_string(),
248        serde_json::json!(schema.columns),
249    );
250    info.insert(
251        "column_types".to_string(),
252        serde_json::json!(schema.types),
253    );
254
255    Ok(info)
256}
257
258/// Count total rows in a CSV file (requires reading the entire file)
259///
260/// This operation is O(n) but uses streaming to minimize memory usage.
261pub fn count_rows(path: &str) -> Result<usize> {
262    let reader = StreamingCsvReader::open(path)?;
263    Ok(reader.count())
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Turn a slice of string literals into one owned CSV row.
    fn cells(vals: &[&str]) -> Vec<String> {
        vals.iter().map(|v| v.to_string()).collect()
    }

    /// Turn a literal table into owned rows.
    fn rows(table: &[&[&str]]) -> Vec<Vec<String>> {
        table.iter().map(|r| cells(r)).collect()
    }

    /// Write `data` verbatim (no implicit header handling) as CSV.
    fn write_csv(path: &std::path::Path, data: &[Vec<String>]) -> Result<()> {
        let mut writer = csv::WriterBuilder::new()
            .has_headers(false)
            .from_path(path)?;
        for row in data {
            writer.write_record(row)?;
        }
        writer.flush()?;
        Ok(())
    }

    #[test]
    fn test_head() {
        let dir = TempDir::new().unwrap();
        let csv_path = dir.path().join("test.csv");
        let data = rows(&[
            &["A", "B"],
            &["1", "2"],
            &["3", "4"],
            &["5", "6"],
            &["7", "8"],
        ]);
        write_csv(&csv_path, &data).unwrap();

        let first_two = head(csv_path.to_str().unwrap(), 2).unwrap();
        assert_eq!(first_two.len(), 2);
        assert_eq!(first_two[0], cells(&["A", "B"]));
        assert_eq!(first_two[1], cells(&["1", "2"]));
    }

    #[test]
    fn test_tail() {
        let dir = TempDir::new().unwrap();
        let csv_path = dir.path().join("test.csv");
        let data = rows(&[
            &["A", "B"],
            &["1", "2"],
            &["3", "4"],
            &["5", "6"],
            &["7", "8"],
        ]);
        write_csv(&csv_path, &data).unwrap();

        let last_two = tail(csv_path.to_str().unwrap(), 2).unwrap();
        assert_eq!(last_two.len(), 2);
        assert_eq!(last_two[0], cells(&["5", "6"]));
        assert_eq!(last_two[1], cells(&["7", "8"]));
    }

    #[test]
    fn test_infer_schema() {
        let dir = TempDir::new().unwrap();
        let csv_path = dir.path().join("test.csv");
        let data = rows(&[
            &["Name", "Age", "Active"],
            &["Alice", "25", "true"],
            &["Bob", "30", "false"],
        ]);
        write_csv(&csv_path, &data).unwrap();

        let schema = infer_schema(csv_path.to_str().unwrap(), 10, true).unwrap();
        assert_eq!(schema.columns, vec!["Name", "Age", "Active"]);
        assert_eq!(schema.types.len(), 3);
        assert_eq!(schema.types[0], ColumnType::String);
        assert_eq!(schema.types[1], ColumnType::Integer);
        assert_eq!(schema.types[2], ColumnType::Boolean);
    }

    #[test]
    fn test_column_type_inference() {
        assert_eq!(
            ColumnType::infer_from_samples(&cells(&["1", "2", "3"])),
            ColumnType::Integer
        );
        assert_eq!(
            ColumnType::infer_from_samples(&cells(&["1.5", "2.5"])),
            ColumnType::Float
        );
        assert_eq!(
            ColumnType::infer_from_samples(&cells(&["hello", "world"])),
            ColumnType::String
        );
        assert_eq!(
            ColumnType::infer_from_samples(&cells(&["true", "false"])),
            ColumnType::Boolean
        );
    }
}