1use crate::csv_handler::StreamingCsvReader;
7use anyhow::{Context, Result};
8use std::collections::HashMap;
9
10#[derive(Debug, Clone)]
12pub struct Schema {
13 pub columns: Vec<String>,
15 pub types: Vec<ColumnType>,
17 pub row_count: Option<usize>,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize)]
23pub enum ColumnType {
24 String,
25 Integer,
26 Float,
27 Boolean,
28 Empty,
29 Unknown,
30}
31
32impl ColumnType {
33 pub fn infer_from_samples(samples: &[String]) -> Self {
35 if samples.is_empty() {
36 return ColumnType::Unknown;
37 }
38
39 let mut has_integers = false;
40 let mut has_floats = false;
41 let mut has_booleans = false;
42 let mut has_strings = false;
43 let mut has_empty = false;
44
45 for sample in samples {
46 if sample.is_empty() {
47 has_empty = true;
48 } else if sample.parse::<i64>().is_ok() {
49 has_integers = true;
50 } else if sample.parse::<f64>().is_ok() {
51 has_floats = true;
52 } else if matches!(sample.to_lowercase().as_str(), "true" | "false") {
53 has_booleans = true;
54 } else {
55 has_strings = true;
56 }
57 }
58
59 if has_strings {
61 ColumnType::String
62 } else if has_floats {
63 ColumnType::Float
64 } else if has_integers {
65 ColumnType::Integer
66 } else if has_booleans {
67 ColumnType::Boolean
68 } else if has_empty && samples.len() == 1 {
69 ColumnType::Empty
70 } else {
71 ColumnType::Unknown
72 }
73 }
74}
75
76pub fn head(path: &str, n: usize) -> Result<Vec<Vec<String>>> {
87 let mut reader = StreamingCsvReader::open(path)?;
88 let mut result = Vec::with_capacity(n);
89
90 for row_result in reader.by_ref().take(n) {
91 result.push(row_result?);
92 }
93
94 Ok(result)
95}
96
97pub fn tail(path: &str, n: usize) -> Result<Vec<Vec<String>>> {
109 let mut reader = StreamingCsvReader::open(path)?;
110 let mut buffer: Vec<Vec<String>> = Vec::with_capacity(n);
111
112 for row_result in reader {
113 let row = row_result?;
114 buffer.push(row);
115
116 if buffer.len() > n {
118 buffer.remove(0);
119 }
120 }
121
122 Ok(buffer)
123}
124
125pub fn infer_schema(path: &str, sample_size: usize, has_headers: bool) -> Result<Schema> {
137 let mut reader = StreamingCsvReader::open(path)?;
138
139 let headers = if has_headers {
141 match reader.next() {
142 Some(Ok(row)) => row,
143 _ => return Ok(Schema {
144 columns: Vec::new(),
145 types: Vec::new(),
146 row_count: Some(0),
147 }),
148 }
149 } else {
150 match reader.next() {
152 Some(Ok(row)) => {
153 (0..row.len())
154 .map(|i| format!("column_{}", i))
155 .collect()
156 }
157 _ => return Ok(Schema {
158 columns: Vec::new(),
159 types: Vec::new(),
160 row_count: Some(0),
161 }),
162 }
163 };
164
165 let mut sample_rows: Vec<Vec<String>> = Vec::with_capacity(sample_size);
167 for row_result in reader.by_ref().take(sample_size) {
168 if let Ok(row) = row_result {
169 sample_rows.push(row);
170 }
171 }
172
173 infer_types(&headers, &sample_rows)
174}
175
176fn infer_types(headers: &[String], sample_rows: &[Vec<String>]) -> Result<Schema> {
177 let num_cols = headers.len();
178 let mut column_samples: Vec<Vec<String>> = vec![Vec::new(); num_cols];
179
180 for row in sample_rows {
182 for (col_idx, value) in row.iter().enumerate().take(num_cols) {
183 column_samples[col_idx].push(value.clone());
184 }
185 }
186
187 let types: Vec<ColumnType> = column_samples
189 .iter()
190 .map(|samples| ColumnType::infer_from_samples(samples))
191 .collect();
192
193 Ok(Schema {
194 columns: headers.to_vec(),
195 types,
196 row_count: None, })
198}
199
200pub fn get_info(path: &str, max_sample_rows: usize) -> Result<HashMap<String, serde_json::Value>> {
211 let metadata = std::fs::metadata(path)?;
212 let file_size = metadata.len();
213
214 let mut reader = StreamingCsvReader::open(path)?;
215
216 let first_row = reader.next();
218 let (num_cols, has_headers) = match first_row {
219 Some(Ok(row)) => (row.len(), true),
220 _ => return Ok(HashMap::new()),
221 };
222
223 let schema = infer_schema(path, max_sample_rows.saturating_sub(1), true)?;
225
226 let row_count = count_rows(path)?;
228
229 let mut info = HashMap::new();
230 info.insert(
231 "file_size".to_string(),
232 serde_json::json!(file_size),
233 );
234 info.insert(
235 "row_count".to_string(),
236 serde_json::json!(row_count),
237 );
238 info.insert(
239 "column_count".to_string(),
240 serde_json::json!(num_cols),
241 );
242 info.insert(
243 "has_headers".to_string(),
244 serde_json::json!(has_headers),
245 );
246 info.insert(
247 "columns".to_string(),
248 serde_json::json!(schema.columns),
249 );
250 info.insert(
251 "column_types".to_string(),
252 serde_json::json!(schema.types),
253 );
254
255 Ok(info)
256}
257
258pub fn count_rows(path: &str) -> Result<usize> {
262 let reader = StreamingCsvReader::open(path)?;
263 Ok(reader.count())
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A header row followed by four numeric rows.
    const GRID: &[&[&str]] = &[
        &["A", "B"],
        &["1", "2"],
        &["3", "4"],
        &["5", "6"],
        &["7", "8"],
    ];

    /// Converts borrowed cells into the owned row format used by the CSV helpers.
    fn owned(table: &[&[&str]]) -> Vec<Vec<String>> {
        table
            .iter()
            .map(|row| row.iter().map(|cell| cell.to_string()).collect())
            .collect()
    }

    /// Writes `table` as `test.csv` in a fresh temporary directory.
    ///
    /// Returns the directory guard (keep it alive for the whole test) and the file path.
    fn fixture(table: &[&[&str]]) -> (TempDir, String) {
        let dir = TempDir::new().unwrap();
        let csv_path = dir.path().join("test.csv");
        write_csv(&csv_path, &owned(table)).unwrap();
        let path = csv_path.to_str().unwrap().to_string();
        (dir, path)
    }

    /// Runs column type inference on cells given as string slices.
    fn infer(cells: &[&str]) -> ColumnType {
        let samples: Vec<String> = cells.iter().map(|cell| cell.to_string()).collect();
        ColumnType::infer_from_samples(&samples)
    }

    #[test]
    fn test_head() {
        let (_dir, path) = fixture(GRID);
        let rows = head(&path, 2).unwrap();
        assert_eq!(rows, owned(&[&["A", "B"], &["1", "2"]]));
    }

    #[test]
    fn test_tail() {
        let (_dir, path) = fixture(GRID);
        let rows = tail(&path, 2).unwrap();
        assert_eq!(rows, owned(&[&["5", "6"], &["7", "8"]]));
    }

    #[test]
    fn test_infer_schema() {
        let (_dir, path) = fixture(&[
            &["Name", "Age", "Active"],
            &["Alice", "25", "true"],
            &["Bob", "30", "false"],
        ]);

        let schema = infer_schema(&path, 10, true).unwrap();
        assert_eq!(schema.columns, vec!["Name", "Age", "Active"]);
        assert_eq!(
            schema.types,
            vec![ColumnType::String, ColumnType::Integer, ColumnType::Boolean]
        );
    }

    #[test]
    fn test_column_type_inference() {
        assert_eq!(infer(&["1", "2", "3"]), ColumnType::Integer);
        assert_eq!(infer(&["1.5", "2.5"]), ColumnType::Float);
        assert_eq!(infer(&["hello", "world"]), ColumnType::String);
        assert_eq!(infer(&["true", "false"]), ColumnType::Boolean);
    }

    /// Writes `data` to `path` as CSV, treating every row (including the first) as a record.
    fn write_csv(path: &std::path::Path, data: &[Vec<String>]) -> Result<()> {
        let mut writer = csv::WriterBuilder::new()
            .has_headers(false)
            .from_path(path)?;
        data.iter().try_for_each(|row| writer.write_record(row))?;
        writer.flush()?;
        Ok(())
    }
}