use crate::dateutil::*;
use crate::dbschema::*;
use crate::loader::combinedlocloader::LocRec;
pub use crate::loader::parseutil::*;
use chrono::NaiveDate;
use sqlx::prelude::*;
use sqlx::Transaction;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::io;
use std::io::Write;
use std::mem::drop;
fn set_per_pop(
row: &sqlx::sqlite::SqliteRow,
colprefix: &str,
colname: &str,
population: Option<i64>,
) -> Option<f64> {
match row.get::<Option<f64>, &str>(format!("{}_pop100k_{}", colprefix, colname).as_str()) {
Some(x) => Some(x),
None => match population {
None => None,
Some(pop) => Some(
(row.get::<i64, &str>(format!("{}_{}", colprefix, colname).as_str()) as f64)
* 100000.0
/ (pop as f64),
),
},
}
}
async fn fillup(
mut transaction: &mut Transaction<sqlx::pool::PoolConnection<sqlx::SqliteConnection>>,
lastrow: &Option<CDataSet>,
nextrow: Option<&CDataSet>,
prevdate: &NaiveDate,
maxdate: &NaiveDate,
) {
if let Some(lastrow) = lastrow {
let targetmaxdate = if let Some(nextrow) = nextrow {
if lastrow.dataset == nextrow.dataset && lastrow.locid == nextrow.locid {
prevdate
} else {
maxdate
}
} else {
maxdate
};
let mut thisjulian = lastrow.date_julian + 1;
let maxjulian = nd_to_day(targetmaxdate);
let mut add_days: i32 = 1;
while thisjulian <= maxjulian {
let query = sqlx::query(CDataSet::insert_str());
let mut cds = lastrow.clone().dup_day();
cds.set_date(thisjulian);
cds.day_index_0 += add_days;
cds.day_index_1 += add_days;
cds.day_index_10 = cds.day_index_10.map(|x| x + add_days);
cds.day_index_100 = cds.day_index_100.map(|x| x + add_days);
cds.day_index_1k = cds.day_index_1k.map(|x| x + add_days);
cds.day_index_10k = cds.day_index_10k.map(|x| x + add_days);
cds.day_index_peak = cds.day_index_peak.map(|x| x + add_days);
cds.day_index_peak_confirmed = cds.day_index_peak_confirmed.map(|x| x + add_days);
cds.day_index_peak_deaths = cds.day_index_peak_deaths.map(|x| x + add_days);
cds.bind_query(query)
.execute(&mut transaction)
.await
.unwrap();
thisjulian += 1;
add_days += 1;
}
}
}
pub async fn load(
inputpool: &mut sqlx::SqlitePool,
outputpool: &mut sqlx::SqlitePool,
lochm: &mut HashMap<String, LocRec>,
fipshm: &HashMap<u32, u64>,
) {
let mut conn = outputpool.acquire().await.unwrap();
conn.execute("PRAGMA auto_vacuum = 0").await.unwrap();
conn.execute("PRAGMA synchronous = 0").await.unwrap();
drop(conn);
let mut transaction = outputpool.begin().await.unwrap();
let mut iconn = inputpool.acquire().await.unwrap();
let totalrecs: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM dataset")
.fetch_one(&mut iconn)
.await
.unwrap();
let mut processedrecs: i64 = 0;
let maxdate_str: (String,) = sqlx::query_as("SELECT MAX(date) FROM dataset")
.fetch_one(&mut iconn)
.await
.unwrap();
let maxdate = NaiveDate::parse_from_str(maxdate_str.0.as_str(), "%Y-%m-%d").unwrap();
let mut cursor =
sqlx::query("SELECT * from dataset ORDER BY dataset, location_key, date").fetch(&mut iconn);
let mut lastrow = None;
let mut locrecsadded: u64 = 0;
while let Some(row) = cursor.next().await.unwrap() {
let locrec = match lochm.get(&row.get::<String, &str>("location_key")) {
Some(x) => x.clone(),
None => {
locrecsadded += 1;
let maxid: (i64,) = sqlx::query_as("SELECT MAX(locid) FROM cdataset_loc")
.fetch_one(&mut transaction)
.await
.unwrap();
let query = sqlx::query(
"INSERT INTO cdataset_loc VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
);
let locid = maxid.0 + 1;
let emptystr = String::from("");
query
.bind(locid)
.bind(
row.get::<Option<String>, &str>("location_type")
.unwrap_or(emptystr.clone()),
)
.bind(
row.get::<Option<String>, &str>("location_label")
.unwrap_or(emptystr.clone()),
)
.bind(
row.get::<Option<String>, &str>("country_code")
.unwrap_or(emptystr.clone()),
)
.bind(
row.get::<Option<String>, &str>("country")
.unwrap_or(emptystr.clone()),
)
.bind(
row.get::<Option<String>, &str>("province")
.unwrap_or(emptystr.clone()),
)
.bind(
row.get::<Option<String>, &str>("administrative")
.unwrap_or(emptystr.clone()),
)
.bind(
row.get::<Option<String>, &str>("region")
.unwrap_or(emptystr.clone()),
)
.bind("")
.bind("")
.bind(None::<Option<i64>>)
.execute(&mut transaction)
.await
.unwrap();
let locrec = LocRec {
fips: None,
population: None,
locid: u32::try_from(locid).unwrap(),
};
lochm.insert(row.get::<String, &str>("location_key"), locrec.clone());
locrec
}
};
let nd = NaiveDate::from_ymd(
row.get("date_year"),
u32::try_from(row.get::<i32, &str>("date_month")).unwrap(),
u32::try_from(row.get::<i32, &str>("date_day")).unwrap(),
);
let julian = nd_to_day(&nd);
let population: Option<i64> = match row.get("factbook_population") {
Some(pop) => Some(pop),
None => locrec.fips.and_then(|x| {
fipshm
.get(&x)
.and_then(|y| Some(i64::try_from(*y).unwrap()))
}),
};
let query = sqlx::query(CDataSet::insert_str());
let cds = CDataSet {
dataset: row.get("dataset"),
locid: i64::from(locrec.locid),
location_lat: row.get("location_lat"),
location_long: row.get("location_long"),
date_julian: julian,
day_index_0: row.get("day_index_0"),
day_index_1: row.get("day_index_1"),
day_index_10: row.get("day_index_10"),
day_index_100: row.get("day_index_100"),
day_index_1k: row.get("day_index_1k"),
day_index_10k: row.get("day_index_10k"),
day_index_peak: row.get("day_index_peak"),
day_index_peak_confirmed: row.get("day_index_peak_confirmed"),
day_index_peak_deaths: row.get("day_index_peak_deaths"),
absolute_confirmed: row
.get::<Option<i64>, &str>("absolute_confirmed")
.unwrap_or(0),
absolute_deaths: row.get::<Option<i64>, &str>("absolute_deaths").unwrap_or(0),
absolute_recovered: row
.get::<Option<i64>, &str>("absolute_recovered")
.unwrap_or(0),
absolute_infected: row
.get::<Option<i64>, &str>("absolute_infected")
.unwrap_or(0),
absolute_pop100k_confirmed: set_per_pop(&row, "absolute", "confirmed", population),
absolute_pop100k_deaths: set_per_pop(&row, "absolute", "deaths", population),
absolute_pop100k_recovered: set_per_pop(&row, "absolute", "recovered", population),
absolute_pop100k_infected: set_per_pop(&row, "absolute", "infected", population),
relative_deaths: row.get("relative_deaths"),
relative_recovered: row.get("relative_recovered"),
relative_infected: row.get("relative_infected"),
delta_confirmed: row.get::<Option<i64>, &str>("delta_confirmed").unwrap_or(0),
delta_deaths: row.get::<Option<i64>, &str>("delta_deaths").unwrap_or(0),
delta_recovered: row.get::<Option<i64>, &str>("delta_recovered").unwrap_or(0),
delta_infected: row.get::<Option<i64>, &str>("delta_infected").unwrap_or(0),
delta_pct_confirmed: row.get("delta_pct_confirmed"),
delta_pct_deaths: row.get("delta_pct_deaths"),
delta_pct_recovered: row.get("delta_pct_recovered"),
delta_pct_infected: row.get("delta_pct_infected"),
delta_pop100k_confirmed: set_per_pop(&row, "delta", "confirmed", population),
delta_pop100k_deaths: set_per_pop(&row, "delta", "deaths", population),
delta_pop100k_recovered: set_per_pop(&row, "delta", "recovered", population),
delta_pop100k_infected: set_per_pop(&row, "delta", "infected", population),
peak_pct_confirmed: row.get("peak_pct_confirmed"),
peak_pct_deaths: row.get("peak_pct_deaths"),
peak_pct_recovered: row.get("peak_pct_recovered"),
peak_pct_infected: row.get("peak_pct_infected"),
factbook_area: row.get("factbook_area"),
factbook_population: population,
factbook_death_rate: row.get("factbook_death_rate"),
factbook_median_age: row.get("factbook_median_age"),
};
fillup(&mut transaction, &lastrow, Some(&cds), &nd.pred(), &maxdate).await;
cds.clone()
.bind_query(query)
.execute(&mut transaction)
.await
.unwrap();
lastrow = Some(cds);
processedrecs += 1;
if processedrecs % 10000 == 0 {
print!(
"Processed {} of {} input records\r",
processedrecs, totalrecs.0
);
io::stdout().flush().unwrap();
}
}
fillup(&mut transaction, &lastrow, None, &maxdate, &maxdate).await;
println!(
"Processed {} of {} input records ({} location records also added)",
processedrecs, totalrecs.0, locrecsadded
);
io::stdout().flush().unwrap();
println!("Committing...");
transaction.commit().await.unwrap();
}