1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use super::errors::Result;
use super::raw_parser::LineParser;
use super::typer::{TypedValue, Typer};
use crate::errors::MuleError;
use itertools::Itertools;
use maplit::hashmap;
use std::collections::HashMap;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
/// Candidate field separators probed by [`infer_separator`], in order of
/// preference: when two candidates occur equally often, the earlier one wins.
static COMMON_SEPARATORS: [&str; 3] = [",", "\t", "|"];

/// Guesses the field separator of a delimited text stream.
///
/// Reads every line of `reader` and counts the occurrences of each entry of
/// [`COMMON_SEPARATORS`]. The most frequent candidate is returned. Ties are
/// resolved in favour of the candidate listed first. This includes the case
/// where no candidate occurs at all (e.g. an empty or single-column input),
/// which yields `","`.
///
/// # Errors
/// Propagates any error returned while reading lines from `reader`.
pub async fn infer_separator(reader: impl AsyncRead + Unpin) -> Result<String> {
    // counts[i] is the total number of occurrences of COMMON_SEPARATORS[i].
    let mut counts = vec![0usize; COMMON_SEPARATORS.len()];
    let mut lines = BufReader::new(reader).lines();
    while let Some(line) = lines.next_line().await? {
        for (count, sep) in counts.iter_mut().zip(COMMON_SEPARATORS.iter()) {
            *count += line.matches(sep).count();
        }
    }
    // Selection walks the fixed candidate order rather than HashMap order,
    // so the result is deterministic across runs. `max_by_key` returns the
    // *last* maximum, so iterate in reverse to make ties resolve to the
    // earliest candidate.
    let sep = COMMON_SEPARATORS
        .iter()
        .zip(counts)
        .rev()
        .max_by_key(|&(_, count)| count)
        .map_or(",", |(sep, _)| *sep);
    Ok(sep.to_string())
}
/// Reads the first line of `reader` and splits it into column names.
///
/// The line is tokenized with `LineParser` using the given `separator`,
/// `text_quote` and `text_quote_escape`, and each token is converted into a
/// `String`. Returns `Ok(None)` when the stream has no lines at all. The
/// reader is consumed.
///
/// # Errors
/// Propagates any error returned while reading the first line.
pub async fn read_column_names(
    reader: impl AsyncRead + Unpin,
    separator: &str,
    text_quote: &str,
    text_quote_escape: &str,
) -> Result<Option<Vec<String>>> {
    let mut lines = BufReader::new(reader).lines();
    let header = match lines.next_line().await? {
        Some(line) => line,
        None => return Ok(None),
    };
    let tokens = LineParser::new(header, separator, text_quote, text_quote_escape);
    Ok(Some(tokens.map_into().collect_vec()))
}
/// Counts the lines yielded by reading `reader` to the end.
///
/// # Errors
/// Propagates any error returned while reading lines from `reader`.
pub async fn count_lines(reader: impl AsyncRead + Unpin) -> Result<usize> {
    let mut lines = BufReader::new(reader).lines();
    let mut total: usize = 0;
    while lines.next_line().await?.is_some() {
        total += 1;
    }
    Ok(total)
}
/// Infers one type tag per column by sampling rows of a delimited stream.
///
/// If `skip_first_row` is set, the first line (typically a header) is read
/// and discarded. Then at most `inference_depth` lines are parsed with
/// `LineParser`, and every value is classified by `typer`. For each column
/// the tag that occurred most often is chosen. When several tags are equally
/// frequent, the one first encountered in that column wins, so the result
/// does not depend on hash-map iteration order.
///
/// Columns are indexed by position. A row with more values than seen so far
/// adds new columns.
///
/// # Errors
/// Propagates read errors from `reader`. Returns
/// `MuleError::ColumnTyping` if a column ends up with no type tag (not
/// expected, since a column is only created once a value is seen for it).
pub async fn infer_column_types<T: Typer>(
    reader: impl AsyncRead + Unpin,
    skip_first_row: bool,
    inference_depth: usize,
    separator: &str,
    text_quote: &str,
    text_quote_escape: &str,
    typer: T,
) -> Result<Vec<T::TypeTag>> {
    let mut lines = BufReader::new(reader).lines();
    if skip_first_row {
        let _ = lines.next_line().await?;
    }

    // Per column: type tag -> (occurrences, order in which the tag was first
    // seen in that column). The order is used only for tie-breaking.
    let mut column_freqs: Vec<HashMap<T::TypeTag, (usize, usize)>> = Vec::new();
    let mut rows_seen = 0;
    // Check the budget before reading so exactly `inference_depth` rows are
    // inspected and no extra line is pulled from the reader.
    while rows_seen < inference_depth {
        let line = match lines.next_line().await? {
            Some(line) => line,
            None => break,
        };
        rows_seen += 1;
        let line_values = LineParser::new(line, separator, text_quote, text_quote_escape);
        for (ix, val) in line_values.enumerate() {
            let typed_value = typer.type_value(&val);
            let type_tag = typed_value.tag();
            match column_freqs.get_mut(ix) {
                Some(freqs) => {
                    let first_seen = freqs.len();
                    freqs.entry(type_tag).or_insert((0, first_seen)).0 += 1;
                }
                // Values are enumerated in order, so `ix == column_freqs.len()`
                // here and pushing creates exactly column `ix`.
                None => column_freqs.push(hashmap! {type_tag => (1, 0)}),
            }
        }
    }

    let mut output = Vec::with_capacity(column_freqs.len());
    for (col_ix, col_freq) in column_freqs.into_iter().enumerate() {
        let type_tag = col_freq
            .into_iter()
            // Highest count wins; on equal counts the earlier-seen tag
            // compares greater.
            .max_by(|(_, (count_a, seen_a)), (_, (count_b, seen_b))| {
                count_a.cmp(count_b).then(seen_b.cmp(seen_a))
            })
            .map(|(tag, _)| tag)
            .ok_or_else(|| {
                MuleError::ColumnTyping(format!(
                    "Failed to find at least a single matching type for column {}",
                    col_ix
                ))
            })?;
        output.push(type_tag);
    }
    Ok(output)
}