use crate::configs::Configs;
use crate::engine::{Engine, EngineContainer};
use crate::shared::CsvManipulator;
use crate::Profile;
use csv;
use indexmap::IndexMap;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::io::Write;
use std::result::Result;
use csv::WriterBuilder;
use serde_json;
use serde_json::Value;
use std::error::Error;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;
const DELIMITER: u8 = b',';
// Ordered map of field name (or profile id) -> Profile. IndexMap preserves
// insertion order, which keeps generated CSV columns in the same order as the
// analyzed source headers (see test_read_headers_order).
type ProfilesMap = IndexMap<String, Profile>;
/// Parses sample data (CSV text or files), builds one `Profile` per column,
/// and generates realistic test data from those profiles. Serializable so a
/// trained parser can be saved to / restored from a JSON file.
#[derive(Serialize, Deserialize, Debug)]
pub struct DataSampleParser {
// Set when analysis encountered (non-fatal) issues; exposed via running_with_issues().
pub issues: bool,
// Optional configuration loaded from a file path; None when built via new().
cfg: Option<Configs>,
// Serialized as a sequence of pairs so the map's insertion order survives a
// save/load round trip.
#[serde(with = "indexmap::serde_seq")]
profiles: ProfilesMap,
}
// Adopt the default trait implementations: CsvManipulator supplies the
// `read_as_columns` helper used in analyze_csv_data (defined in crate::shared).
impl CsvManipulator for DataSampleParser {}
// Engine supplies `profile_entities_with_container` used by analyze_columns.
impl Engine for DataSampleParser {}
impl DataSampleParser {
/// Creates an empty parser: no configuration, no profiles, no issues flagged.
pub fn new() -> DataSampleParser {
    let profiles = ProfilesMap::new();
    DataSampleParser {
        issues: false,
        cfg: None,
        profiles,
    }
}
/// Creates a parser whose configuration is loaded from the file at `path`.
pub fn new_with(path: &String) -> DataSampleParser {
    let cfg = Configs::new(path);
    DataSampleParser {
        issues: false,
        cfg: Some(cfg),
        profiles: ProfilesMap::new(),
    }
}
/// Restores a previously saved parser from `<path>.json`.
///
/// Detects the archive version: a 0.3.0+ file stores `profiles` as an array
/// and deserializes directly; older 0.2.1 files store it as an object and are
/// migrated via `upgrade_to_latest_version`.
///
/// # Panics
/// Panics if the file cannot be opened, read, or parsed as valid JSON
/// containing a `profiles` key.
pub fn from_file(path: &String) -> DataSampleParser {
    let mut file = match File::open(format!("{}.json", &path)) {
        Err(_e) => {
            error!("Could not open file {:?}", &path.to_string());
            panic!("Could not open file {:?}", &path.to_string());
        }
        Ok(f) => {
            info!("Successfully opened file {:?}", &path.to_string());
            f
        }
    };
    let mut serialized = String::new();
    // Fix: the previous match discarded its Ok value; a simple error check is
    // clearer and behaviorally identical.
    if let Err(e) = file.read_to_string(&mut serialized) {
        error!(
            "Could not read file {:?} because of {:?}",
            &path.to_string(),
            e.to_string()
        );
        panic!(
            "Could not read file {:?} because of {:?}",
            &path.to_string(),
            e.to_string()
        );
    }
    info!("Successfully read file {:?}", &path.to_string());
    // Peek at the JSON shape to decide which deserialization path to take.
    let dsp: Value = serde_json::from_str(&serialized).unwrap();
    let prfils = dsp.get("profiles").unwrap();
    // Fix: idiomatic if/else instead of `match bool { true => .., false => .. }`.
    if prfils.is_array() {
        debug!("Version 0.3.0 detected. Using latest version");
        serde_json::from_str(&serialized).unwrap()
    } else {
        info!("Prior version 0.2.1 detected. Trying to upgrade to latest version");
        Self::upgrade_to_latest_version(serialized)
    }
}
fn upgrade_to_latest_version(serialized: String) -> DataSampleParser {
let dsp: Value = serde_json::from_str(&serialized).unwrap();
let prfils = dsp.get("profiles").unwrap();
let mut pm: ProfilesMap = ProfilesMap::new();
let issues = dsp.get("issues").unwrap().as_bool().unwrap();
for prf in prfils.as_object().iter() {
for attr in prf.keys() {
let id = prf
.get(attr)
.unwrap()
.as_object()
.unwrap()
.get("id")
.unwrap()
.as_str()
.unwrap()
.to_string();
let serl = &serde_json::to_string(prf.get(attr).unwrap()).unwrap();
println!("{:?} : {:?}", id, serl);
pm.insert(id, Profile::from_serialized(serl));
}
}
let mut rtn = match dsp.get("cfg").unwrap() {
serde_json::Value::Null => DataSampleParser::new(),
_ => DataSampleParser::new_with(
&dsp.get("cfg")
.unwrap()
.as_object()
.unwrap()
.get("file")
.unwrap()
.as_str()
.unwrap()
.to_string(),
),
};
rtn.issues = issues;
rtn.profiles = pm;
return rtn;
}
/// Analyzes each column of entity values on its own worker thread, then folds
/// the resulting profiles back into `self.profiles` keyed by profile id.
///
/// `profile_keys[idx]` must name the profile for `columns[idx]` — both come
/// from `analyze_csv_data`, which builds them in header order.
///
/// # Panics
/// Panics if a profile key is missing, a worker thread fails to send/join, or
/// the channel receive fails.
#[inline]
fn analyze_columns(&mut self, profile_keys: Vec<String>, columns: Vec<Vec<String>>) {
    let col_cnt = columns.len();
    // One shared channel: each worker sends back either the analyzed Profile
    // or an error message.
    let (tx, rx): (
        Sender<Result<Profile, String>>,
        Receiver<Result<Profile, String>>,
    ) = mpsc::channel();
    let mut jobs = Vec::new();
    for (idx, column) in columns.iter().enumerate() {
        let thread_tx = tx.clone();
        // Each worker owns a copy of its profile and column data, so no
        // borrows cross the thread boundary.
        let container = EngineContainer {
            profile: self.profiles.get(&profile_keys[idx]).unwrap().clone(),
            entities: column.to_vec(),
        };
        let job = thread::spawn(move || {
            let result = Self::profile_entities_with_container(container);
            thread_tx.send(result).unwrap();
        });
        jobs.push(job);
    }
    // Receive exactly one message per spawned worker before joining; recv
    // blocks, so this also acts as the synchronization point.
    let mut results = Vec::with_capacity(col_cnt);
    for _ in 0..col_cnt {
        results.push(rx.recv());
    }
    for job in jobs {
        job.join().expect("Error: Could not run the job");
    }
    for result in results {
        match result {
            Ok(msg) => {
                match msg {
                    Ok(p) => {
                        // Fix: clone only the id (Option<String>) instead of
                        // cloning the entire Profile just to read one field.
                        let id = p.id.clone().unwrap();
                        debug!("Profile {} has finished analyzing the entities.", id);
                        self.profiles.insert(id, p);
                    }
                    Err(e) => {
                        error!(
                            "Profile wasn't able to analyzing the entities. Error: {}",
                            e
                        );
                    }
                }
            }
            Err(e) => {
                error!("Receiver wasn't able to receive message from sender which was analyzing entities for the profile. Error: {}", e);
                panic!("Receiver wasn't able to receive message from sender which was analyzing entities for the profile. Error: {}", e);
            }
        }
    }
}
pub fn analyze_csv_data(
&mut self,
data: &String,
delimiter: Option<u8>,
) -> Result<i32, String> {
debug!("Starting to analyzed the csv data {}", data);
let mut rdr = csv::ReaderBuilder::new()
.has_headers(true)
.quote(b'"')
.double_quote(true)
.delimiter(Self::else_default_delimiter(delimiter))
.from_reader(data.as_bytes());
for headers in rdr.headers() {
for header in headers.iter() {
let p = Profile::new_with_id(header.to_string());
self.profiles.insert(header.to_string(), p);
}
}
let profile_keys: Vec<_> = self.profiles.keys().cloned().collect();
debug!("CSV headers: {:?}", profile_keys);
let columns = Self::read_as_columns(rdr);
let rec_cnt = columns[0].len();
self.analyze_columns(profile_keys, columns);
debug!("Successfully analyzed the csv data");
debug!(
"Analyzed {} records, {} fields",
rec_cnt,
self.profiles.len()
);
self.profiles.iter_mut().for_each(|p| p.1.pre_generate());
Ok(1)
}
/// Reads the CSV file at `path` into memory and analyzes it via
/// `analyze_csv_data`. `delimiter` defaults to ',' when `None`.
///
/// Returns an error string if the file cannot be opened or read.
pub fn analyze_csv_file(
    &mut self,
    path: &String,
    delimiter: Option<u8>,
) -> Result<i32, String> {
    info!("Starting to analyzed the csv file {}", path);
    let mut file = File::open(path).map_err(|e| {
        error!("csv file {} couldn't be opened!", path);
        e.to_string()
    })?;
    let mut data = String::new();
    // Fix: the old code called .map_err(..).unwrap(), which mapped the error
    // to a String and then panicked anyway — defeating the Result return
    // type. Propagate with `?` so read failures become Err, not panics.
    file.read_to_string(&mut data).map_err(|e| {
        error!("csv file {} couldn't be read!", path);
        e.to_string()
    })?;
    self.analyze_csv_data(&data, delimiter)
}
/// Demonstrates data generation by training a throwaway profile on a fixed
/// set of MM/DD/YYYY date samples and generating one value from it.
pub fn demo_date(&self) -> String {
    // Same samples as before, analyzed in the same order.
    let samples = [
        "01/04/2017", "02/09/2017", "03/13/2017", "04/17/2017",
        "05/22/2017", "07/26/2017", "08/30/2017", "09/07/2017",
        "10/11/2017", "11/15/2017", "12/21/2017", "01/14/2016",
        "02/19/2016", "03/23/2016", "04/27/2016", "05/02/2016",
        "07/16/2015", "08/20/2015", "09/17/2015", "10/01/2014",
        "11/25/2014", "12/31/2018",
    ];
    let mut profil = Profile::new();
    for sample in samples.iter() {
        profil.analyze(sample);
    }
    profil.pre_generate();
    profil.generate()
}
/// Demonstrates data generation by training a throwaway profile on a fixed
/// set of "Last, First" name samples and generating one value from it.
pub fn demo_person_name(&self) -> String {
    // Same samples as before, analyzed in the same order.
    let samples = [
        "Smith, John",
        "O'Brien, Henny",
        "Dale, Danny",
        "Rickets, Ronnae",
        "Richard, Richie",
        "Roberts, Blake",
        "Conways, Sephen",
    ];
    let mut profil = Profile::new();
    for sample in samples.iter() {
        profil.analyze(sample);
    }
    profil.pre_generate();
    profil.generate()
}
/// Returns the supplied delimiter byte, falling back to the module default
/// (`DELIMITER`, a comma) when none was provided.
fn else_default_delimiter(delimiter: Option<u8>) -> u8 {
    // Fix: idiomatic unwrap_or replaces the explicit Some/None match with
    // early returns.
    delimiter.unwrap_or(DELIMITER)
}
/// Returns the profile field names (CSV headers) in their insertion order.
/// (`&mut self` is kept for interface compatibility; nothing is mutated.)
pub fn extract_headers(&mut self) -> Vec<String> {
    // Fix: the manual iter_mut + push loop needed no mutability at all;
    // cloning the keys directly is simpler and equivalent.
    self.profiles.keys().cloned().collect()
}
/// Generates one value from the profile registered under `field`.
///
/// # Panics
/// Panics if no profile exists for that field name.
pub fn generate_by_field_name(&mut self, field: String) -> String {
    // Fix: generate() already yields a String (see generate_csv, which pushes
    // its result straight into the record), so the trailing .to_string() was
    // a redundant allocation.
    self.profiles.get_mut(&field).unwrap().generate()
}
/// Generates one full record: one value per profile, in header order.
pub fn generate_record(&mut self) -> Vec<String> {
    // Fix: generate() already yields a String, so the old per-element
    // .to_string() was a redundant allocation; map/collect replaces the
    // manual push loop.
    self.profiles
        .values_mut()
        .map(|profile| profile.generate())
        .collect()
}
/// Writes `row_count` generated records (plus a header row) as a CSV file at
/// `path`. `delimiter` defaults to ',' when `None`.
///
/// Returns an error if the file cannot be created or a row cannot be written.
pub fn generate_csv(
    &mut self,
    row_count: u32,
    path: &String,
    delimiter: Option<u8>,
) -> Result<(), Box<dyn Error>> {
    info!("generating csv file {}", path);
    let delim = Self::else_default_delimiter(delimiter);
    let mut writer = WriterBuilder::new()
        .has_headers(true)
        .quote(b'"')
        .double_quote(true)
        .delimiter(delim)
        .from_path(path)
        .map_err(|e| {
            error!("csv file {} couldn't be created!", path);
            e.to_string()
        })?;
    // Header row first, then one generated value per profile per data row.
    let headers = self.extract_headers();
    writer.write_record(&headers)?;
    for _row in 0..row_count {
        let record: Vec<_> = self
            .profiles
            .iter_mut()
            .map(|profile| profile.1.generate())
            .collect();
        writer.write_record(&record)?;
    }
    writer.flush()?;
    Ok(())
}
/// Returns the Levenshtein edit distance between `control` and `experiment`,
/// delegating to the crate's `levenshtein_distance!` macro.
pub fn levenshtein_distance(&mut self, control: &String, experiment: &String) -> usize {
    levenshtein_distance!(control, experiment)
}
/// Returns a similarity score between `control` and `experiment`, delegating
/// to the crate's `realistic_test!` macro. Per the test suite, the score is a
/// percentage-like f64 (e.g. "kitten" vs "sitting" -> ~76.92).
pub fn realistic_test(&mut self, control: &String, experiment: &String) -> f64 {
    realistic_test!(control, experiment)
}
/// Returns a reference to the `issues` flag, which records whether analysis
/// ran with (non-fatal) issues. (Returns `&bool` for interface stability.)
pub fn running_with_issues(&self) -> &bool {
    &self.issues
}
/// Serializes this parser to JSON and writes it to `<path>.json`, so it can
/// later be restored with `from_file`.
///
/// Returns `Ok(true)` on success or the underlying I/O error otherwise.
pub fn save(&mut self, path: &String) -> Result<bool, io::Error> {
    let dsp_json = serde_json::to_string(&self).unwrap();
    let mut file = match File::create(format!("{}.json", &path)) {
        Ok(f) => {
            info!("Successfully exported to {:?}", &path.to_string());
            f
        }
        Err(e) => {
            error!("Could not create file {:?}", &path.to_string());
            return Err(e);
        }
    };
    if let Err(e) = file.write_all(dsp_json.as_bytes()) {
        error!("Could not write to file {}", &path.to_string());
        return Err(e);
    }
    info!("Successfully exported to {}", &path.to_string());
    Ok(true)
}
}
// Integration-style tests. Most rely on fixture files under ./tests/samples
// and ./config existing on disk, and several (test_save /
// test_generate_csv_test_data_from_sample) write files there too — so they
// assume a writable working directory at the crate root.
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
use std::io::BufReader;
// Constructors should not panic.
#[test]
fn test_new() {
let _dsp = DataSampleParser::new();
assert!(true);
}
#[test]
fn test_new_with() {
let _dsp = DataSampleParser::new_with(&String::from("./config/tdg.yaml"));
assert!(true);
}
// Restores a current-format archive (written by test_save) and generates from it.
#[test]
fn test_from_file() {
let mut dsp = DataSampleParser::from_file(&String::from("./tests/samples/sample-00-dsp"));
println!("Sample data is [{:?}]", dsp.generate_record()[0]);
assert_eq!(dsp.generate_record()[0], "OK".to_string());
}
// Legacy 0.2.1 archives must be upgraded transparently (with and without cfg).
#[test]
fn test_from_file_v021_with_cfg() {
let mut dsp =
DataSampleParser::from_file(&String::from("./tests/samples/sample-0.2.1-dsp"));
println!("Sample data is [{:?}]", dsp.generate_record()[0]);
assert_eq!(dsp.generate_record()[0], "OK".to_string());
}
#[test]
fn test_from_file_v021_no_cfg() {
let mut dsp =
DataSampleParser::from_file(&String::from("./tests/samples/sample-0.2.1-nocfg-dsp"));
println!("Sample data is [{:?}]", dsp.generate_record()[0]);
assert_eq!(dsp.generate_record()[0], "OK".to_string());
}
#[test]
fn test_read_headers() {
let mut dsp = DataSampleParser::new();
dsp.analyze_csv_file(&String::from("./tests/samples/sample-01.csv"), None)
.unwrap();
let headers = dsp.extract_headers();
assert_eq!(headers.len(), 2);
}
// Header order must match the source CSV (IndexMap preserves insertion order).
#[test]
fn test_read_headers_order() {
let mut expected = Vec::new();
expected.push("column-Z");
expected.push("column-D");
expected.push("column-A");
expected.push("column-G");
let mut dsp = DataSampleParser::new();
dsp.analyze_csv_file(&String::from("./tests/samples/sample-02.csv"), None)
.unwrap();
let headers = dsp.extract_headers();
assert_eq!(headers, expected);
}
#[test]
fn test_parse_csv_file() {
let mut dsp = DataSampleParser::new();
assert_eq!(
dsp.analyze_csv_file(&String::from("./tests/samples/sample-01.csv"), None)
.unwrap(),
1
);
}
// In-memory CSV with the default comma delimiter.
#[test]
fn test_parse_csv_data_using_defaults() {
let mut dsp = DataSampleParser::new();
let mut data = String::from("");
data.push_str("\"firstname\",\"lastname\"\n");
data.push_str("\"Aaron\",\"Aaberg\"\n");
data.push_str("\"Aaron\",\"Aaby\"\n");
data.push_str("\"Abbey\",\"Aadland\"\n");
data.push_str("\"Abbie\",\"Aagaard\"\n");
data.push_str("\"Abby\",\"Aakre\"");
assert_eq!(dsp.analyze_csv_data(&data, None).unwrap(), 1);
}
// In-memory CSV with an explicit '|' delimiter.
#[test]
fn test_parse_csv_data() {
let mut dsp = DataSampleParser::new();
let mut data = String::from("");
data.push_str("\"firstname\"|\"lastname\"\n");
data.push_str("\"Aaron\"|\"Aaberg\"\n");
data.push_str("\"Aaron\"|\"Aaby\"\n");
data.push_str("\"Abbey\"|\"Aadland\"\n");
data.push_str("\"Abbie\"|\"Aagaard\"\n");
data.push_str("\"Abby\"|\"Aakre\"");
assert_eq!(dsp.analyze_csv_data(&data, Some(b'|')).unwrap(), 1);
}
#[test]
fn test_generate_field_from_csv_file() {
let mut dsp = DataSampleParser::new();
dsp.analyze_csv_file(&String::from("./tests/samples/sample-01.csv"), None)
.unwrap();
println!(
"Generated data for first name {}",
dsp.generate_by_field_name("firstname".to_string())
);
}
#[test]
fn test_generate_record_from_csv_file() {
let mut dsp = DataSampleParser::new();
dsp.analyze_csv_file(&String::from("./tests/samples/sample-01.csv"), None)
.unwrap();
assert_eq!(dsp.generate_record().len(), 2);
}
// A missing file must surface as Err, not a panic.
#[test]
fn test_parse_csv_file_bad() {
let mut dsp = DataSampleParser::new();
assert_eq!(
dsp.analyze_csv_file(&String::from("./badpath/sample-01.csv"), None)
.is_err(),
true
);
}
// Writes ./tests/samples/sample-00-dsp.json, which test_from_file reads back.
#[test]
fn test_save() {
let mut dsp = DataSampleParser::new();
dsp.analyze_csv_file(&String::from("./tests/samples/sample-00.csv"), None)
.unwrap();
assert_eq!(
dsp.save(&String::from("./tests/samples/sample-00-dsp"))
.unwrap(),
true
);
}
#[test]
fn test_levenshtein_test() {
let mut dsp = DataSampleParser::new();
assert_eq!(
dsp.levenshtein_distance(&"kitten".to_string(), &"sitting".to_string()),
3 as usize
);
}
#[test]
fn test_realistic_data_test() {
let mut dsp = DataSampleParser::new();
assert_eq!(
dsp.realistic_test(&"kitten".to_string(), &"sitting".to_string()),
76.92307692307692 as f64
);
}
#[test]
fn test_demo() {
let mut dsp = DataSampleParser::new();
dsp.analyze_csv_file(&String::from("./tests/samples/sample-01.csv"), None)
.unwrap();
println!(
"My new name is {} {}",
dsp.generate_record()[0],
dsp.generate_record()[1]
);
assert!(true);
}
#[test]
fn test_extract_headers_from_sample() {
let mut dsp = DataSampleParser::new();
dsp.analyze_csv_file(&String::from("./tests/samples/sample-01.csv"), None)
.unwrap();
let headers = dsp.extract_headers();
assert_eq!(headers.len(), 2);
}
// End-to-end: analyze, generate 100 rows to disk, then count lines
// (100 data rows + 1 header = 101).
#[test]
fn test_generate_csv_test_data_from_sample() {
let mut dsp = DataSampleParser::new();
dsp.analyze_csv_file(&String::from("./tests/samples/sample-01.csv"), None)
.unwrap();
dsp.generate_csv(
100,
&String::from("./tests/samples/generated-01b.csv"),
Some(b'|'),
)
.unwrap();
let generated_row_count =
match File::open(format!("{}", "./tests/samples/generated-01b.csv")) {
Err(_e) => 0,
Ok(f) => {
let mut count = 0;
let bf = BufReader::new(f);
for _line in bf.lines() {
count += 1;
}
count
}
};
assert_eq!(generated_row_count, 101);
}
}