#![allow(non_snake_case)]
pub use crate::dateutil::*;
pub use crate::dbschema::*;
pub use crate::loader::parseutil::*;
use chrono::NaiveDate;
use csv;
use serde::Deserialize;
use sqlx::Transaction;
#[derive(Debug, Deserialize, PartialEq, Clone)]
pub struct CsvRec {
pub date: String,
pub state: String,
pub positive: Option<i64>,
pub negative: Option<i64>,
pub pending: Option<i64>,
pub hospitalizedCurrently: Option<i64>,
pub hospitalizedCumulative: Option<i64>,
pub incluCurrently: Option<i64>,
pub incluCumulative: Option<i64>,
pub onVentilatorCurrently: Option<i64>,
pub onVentilatorCumulative: Option<i64>,
pub recovered: Option<i64>,
pub dataQualityGrade: Option<String>,
pub lastUpdateEt: Option<String>,
pub dateModified: Option<String>,
pub checkTimeEt: Option<String>,
pub death: Option<i64>,
pub hospitalized: Option<i64>,
pub dateChecked: Option<String>,
pub totalTestsViral: Option<i64>,
pub positiveTestsViral: Option<i64>,
pub negativeTestsViral: Option<i64>,
pub positiveCasesViral: Option<i64>,
pub deathConfirmed: Option<i64>,
pub deathProbable: Option<i64>,
pub totalTestEncountersViral: Option<i64>,
pub totalTestsPeopleViral: Option<i64>,
pub totalTestsAntibody: Option<i64>,
pub positiveTestsAntibody: Option<i64>,
pub negativeTestsAntibody: Option<i64>,
pub totalTestsPeopleAntibody: Option<i64>,
pub positiveTestsPeopleAntibody: Option<i64>,
pub negativeTestsPeopleAntibody: Option<i64>,
pub totalTestsPeopleAntigen: Option<i64>,
pub positiveTestsPeopleAntigen: Option<i64>,
pub totalTestsAntigen: Option<i64>,
pub positiveTestsAntigen: Option<i64>,
pub fips: i64,
pub positiveIncrease: Option<i64>,
pub negativeIncrease: Option<i64>,
pub total: Option<i64>,
pub totalTestResultsSource: String,
pub totalTestResults: Option<i64>,
pub totalTestResultsIncrease: Option<i64>,
pub posNeg: Option<i64>,
pub deathIncrease: Option<i64>,
pub hospitalizedIncrease: Option<i64>,
pub hash: Option<String>,
pub commercialScore: Option<i64>,
pub negativeRegularScore: Option<i64>,
pub negativeScore: Option<i64>,
pub positiveScore: Option<i64>,
pub score: Option<i64>,
pub grade: Option<String>,
}
pub fn parse_to_final<A: Iterator<Item = csv::StringRecord>>(
striter: A,
) -> impl Iterator<Item = CsvRec> {
striter.filter_map(|x| rec_to_struct(&x).expect("rec_to_struct"))
}
pub async fn load<'a, A: std::io::Read>(
rdr: &'a mut csv::Reader<A>,
mut transaction: Transaction<sqlx::pool::PoolConnection<sqlx::SqliteConnection>>,
) {
assert_eq!(
vec![
"date",
"state",
"positive",
"negative",
"pending",
"hospitalizedCurrently",
"hospitalizedCumulative",
"inIcuCurrently",
"inIcuCumulative",
"onVentilatorCurrently",
"onVentilatorCumulative",
"recovered",
"dataQualityGrade",
"lastUpdateEt",
"dateModified",
"checkTimeEt",
"death",
"hospitalized",
"dateChecked",
"totalTestsViral",
"positiveTestsViral",
"negativeTestsViral",
"positiveCasesViral",
"deathConfirmed",
"deathProbable",
"totalTestEncountersViral",
"totalTestsPeopleViral",
"totalTestsAntibody",
"positiveTestsAntibody",
"negativeTestsAntibody",
"totalTestsPeopleAntibody",
"positiveTestsPeopleAntibody",
"negativeTestsPeopleAntibody",
"totalTestsPeopleAntigen",
"positiveTestsPeopleAntigen",
"totalTestsAntigen",
"positiveTestsAntigen",
"fips",
"positiveIncrease",
"negativeIncrease",
"total",
"totalTestResultsSource",
"totalTestResults",
"totalTestResultsIncrease",
"posNeg",
"deathIncrease",
"hospitalizedIncrease",
"hash",
"commercialScore",
"negativeRegularScore",
"negativeScore",
"positiveScore",
"score",
"grade"
],
rdr.headers().unwrap().iter().collect::<Vec<&str>>()
);
let recs = parse_records(rdr.byte_records());
let finaliter = parse_to_final(recs);
for rec in finaliter {
let nd = NaiveDate::parse_from_str(rec.date.as_str(), "%Y%m%d").unwrap();
let dbrec = CovidTracking {
date_julian: nd_to_day(&nd),
state: rec.state,
positive: rec.positive,
negative: rec.negative,
pending: rec.pending,
hospitalizedCurrently: rec.hospitalizedCurrently,
hospitalizedCumulative: rec.hospitalizedCumulative,
incluCurrently: rec.incluCurrently,
incluCumulative: rec.incluCumulative,
onVentilatorCurrently: rec.onVentilatorCurrently,
onVentilatorCumulative: rec.onVentilatorCumulative,
recovered: rec.recovered,
dataQualityGrade: rec.dataQualityGrade,
lastUpdateEt: rec.lastUpdateEt,
dateModified: rec.dateModified,
checkTimeEt: rec.checkTimeEt,
death: rec.death,
hospitalized: rec.hospitalized,
dateChecked: rec.dateChecked,
totalTestsViral: rec.totalTestsViral,
positiveTestsViral: rec.positiveTestsViral,
negativeTestsViral: rec.negativeTestsViral,
positiveCasesViral: rec.positiveCasesViral,
deathConfirmed: rec.deathConfirmed,
deathProbable: rec.deathProbable,
totalTestEncountersViral: rec.totalTestEncountersViral,
totalTestsPeopleViral: rec.totalTestsPeopleViral,
totalTestsAntibody: rec.totalTestsAntibody,
positiveTestsAntibody: rec.positiveTestsAntibody,
negativeTestsAntibody: rec.negativeTestsAntibody,
totalTestsPeopleAntibody: rec.totalTestsPeopleAntibody,
positiveTestsPeopleAntibody: rec.positiveTestsPeopleAntibody,
negativeTestsPeopleAntibody: rec.negativeTestsPeopleAntibody,
totalTestsPeopleAntigen: rec.totalTestsPeopleAntigen,
positiveTestsPeopleAntigen: rec.positiveTestsPeopleAntigen,
totalTestsAntigen: rec.totalTestsAntigen,
positiveTestsAntigen: rec.positiveTestsAntigen,
fips: rec.fips,
positiveIncrease: rec.positiveIncrease,
negativeIncrease: rec.negativeIncrease,
total: rec.total,
totalTestResultsSource: rec.totalTestResultsSource,
totalTestResults: rec.totalTestResults,
totalTestResultsIncrease: rec.totalTestResultsIncrease,
posNeg: rec.posNeg,
deathIncrease: rec.deathIncrease,
hospitalizedIncrease: rec.hospitalizedIncrease,
commercialScore: rec.commercialScore,
negativeRegularScore: rec.negativeRegularScore,
negativeScore: rec.negativeScore,
positiveScore: rec.positiveScore,
score: rec.score,
grade: rec.grade,
};
let query = sqlx::query(CovidTracking::insert_str());
dbrec
.bind_query(query)
.execute(&mut transaction)
.await
.unwrap();
}
transaction.commit().await.unwrap();
}