1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
use super::errors::Result;
use super::raw_parser::LineParser;
use super::typer::{TypedValue, Typer};
use crate::errors::MuleError;
use itertools::Itertools;
use maplit::hashmap;
use std::collections::HashMap;
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};

/// Candidate separators whose occurrences `infer_separator` counts: comma, tab and pipe.
static COMMON_SEPARATORS: [&str; 3] = [",", "\t", "|"];

/// Picks the most frequent of the `COMMON_SEPARATORS` in the text supplied by `reader`.
///
/// The reader is consumed line by line until it is exhausted, and the occurrences of
/// every candidate separator are tallied across all lines.
///
/// Ties are resolved in favour of the candidate listed first in `COMMON_SEPARATORS`, and
/// `","` is returned when no candidate occurs at all (which includes empty input), so the
/// same input always produces the same answer.
///
/// # Errors
///
/// Propagates any error produced while reading a line from `reader`.
pub async fn infer_separator(reader: impl AsyncRead + Unpin) -> Result<String> {
    // One tally per candidate, kept in the same order as `COMMON_SEPARATORS`.
    let mut counts = vec![0usize; COMMON_SEPARATORS.len()];

    let mut lines = BufReader::new(reader).lines();
    while let Some(line) = lines.next_line().await? {
        for (count, sep) in counts.iter_mut().zip(COMMON_SEPARATORS.iter()) {
            *count += line.matches(*sep).count();
        }
    }

    // Replace the current best only on a strictly greater count: this keeps the
    // earliest candidate on ties and leaves the "," default when nothing matched.
    let mut best_sep = ",";
    let mut best_count = 0;
    for (sep, count) in COMMON_SEPARATORS.iter().zip(counts) {
        if count > best_count {
            best_sep = *sep;
            best_count = count;
        }
    }
    Ok(best_sep.to_string())
}

/// Reads the first line from `reader` and splits it into column names.
///
/// The line is handed to `LineParser` together with `separator`, `text_quote` and
/// `text_quote_escape`; every value the parser yields is converted into a `String`.
///
/// Returns `Ok(None)` when the reader yields no line at all.
///
/// # Errors
///
/// Propagates any error produced while reading the first line from `reader`.
pub async fn read_column_names(
    reader: impl AsyncRead + Unpin,
    separator: &str,
    text_quote: &str,
    text_quote_escape: &str,
) -> Result<Option<Vec<String>>> {
    let mut lines = BufReader::new(reader).lines();

    let header = match lines.next_line().await? {
        Some(first_line) => first_line,
        None => return Ok(None),
    };

    let fields = LineParser::new(header, separator, text_quote, text_quote_escape);
    let column_names: Vec<String> = fields.map_into().collect();
    Ok(Some(column_names))
}

/// Counts the lines that `reader` yields, consuming it until it is exhausted.
///
/// # Errors
///
/// Propagates any error produced while reading a line from `reader`.
pub async fn count_lines(reader: impl AsyncRead + Unpin) -> Result<usize> {
    let mut lines = BufReader::new(reader).lines();
    let mut total = 0;
    loop {
        match lines.next_line().await? {
            Some(_) => total += 1,
            None => return Ok(total),
        }
    }
}

/// Infers one type tag per column by sampling rows from `reader`.
///
/// When `skip_first_row` is set, the first line is read and discarded before sampling.
/// At most `inference_depth` lines are then read; each is split with `LineParser` using
/// `separator`, `text_quote` and `text_quote_escape`, every value is typed with `typer`,
/// and the tag of the typed value is tallied for the column at that position.
///
/// The result holds, for each column position seen in the sample, the tag that occurred
/// most often. When several tags are equally frequent in a column, the one that was
/// encountered first in that column wins, so the outcome does not depend on hash-map
/// iteration order. If no line is sampled (empty input or `inference_depth == 0`), the
/// result is an empty vector.
///
/// # Errors
///
/// Propagates any error produced while reading a line from `reader`. A
/// `MuleError::ColumnTyping` is produced if a column ends up without any tag.
pub async fn infer_column_types<T: Typer>(
    reader: impl AsyncRead + Unpin,
    skip_first_row: bool,
    inference_depth: usize,
    separator: &str,
    text_quote: &str,
    text_quote_escape: &str,
    typer: T,
) -> Result<Vec<T::TypeTag>> {
    let mut lines = BufReader::new(reader).lines();

    if skip_first_row {
        let _ = lines.next_line().await?;
    }

    // Per column: tag -> (occurrences, rank at which the tag was first seen in the column).
    let mut column_freqs: Vec<HashMap<T::TypeTag, (usize, usize)>> = Vec::new();

    // The bound is checked before reading, so exactly `inference_depth` lines are sampled
    // at most and no extra line is pulled from the reader.
    let mut sampled = 0;
    while sampled < inference_depth {
        let line = match lines.next_line().await? {
            Some(line) => line,
            None => break,
        };
        sampled += 1;

        let line_values = LineParser::new(line, separator, text_quote, text_quote_escape);
        for (ix, val) in line_values.enumerate() {
            let typed_value = typer.type_value(&val);
            let type_tag = typed_value.tag();
            match column_freqs.get_mut(ix) {
                Some(freqs) => {
                    // The map only grows, so its current size is a unique first-seen rank.
                    let first_seen = freqs.len();
                    freqs.entry(type_tag).or_insert((0, first_seen)).0 += 1;
                }
                // Indices arrive in order, so a missing entry is always the next column.
                None => column_freqs.push(hashmap! {type_tag => (1, 0)}),
            }
        }
    }

    let mut output = Vec::with_capacity(column_freqs.len());

    for (col_ix, col_freq) in column_freqs.into_iter().enumerate() {
        let type_tag = col_freq
            .into_iter()
            // Highest count wins; among equal counts the lowest first-seen rank wins.
            .max_by(|(_, (count_a, seen_a)), (_, (count_b, seen_b))| {
                count_a.cmp(count_b).then_with(|| seen_b.cmp(seen_a))
            })
            .map(|(tag, _)| tag)
            .ok_or_else(|| {
                MuleError::ColumnTyping(format!(
                    "Failed to find at least a single matching type for column {}",
                    col_ix
                ))
            })?;
        output.push(type_tag);
    }

    Ok(output)
}