use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use crate::datasets::sodir::catalog;
use crate::datasets::sodir::client::ArcGISClient;
use crate::datasets::sodir::error::{Result, SodirError};
use crate::datasets::sodir::fetch;
use crate::datasets::sodir::index::{self, Action, DatasetEntry, SodirIndex};
use crate::datasets::sodir::layout::Workdir;
use crate::datasets::sodir::preprocess::{self, PreprocessReport};
/// Per-stem outcome of a [`refresh_csvs`] run.
///
/// Each deduplicated stem from `needed` is pushed into one of these lists.
#[derive(Debug, Clone, Default)]
pub struct RefreshReport {
/// Stems whose CSV was downloaded in this run and whose index entry was rewritten.
pub fetched: Vec<String>,
/// Stems that were probed and whose remote count matched the indexed row count (no download).
pub unchanged: Vec<String>,
/// Stems unknown to the catalog for which a CSV already exists in the workdir.
pub user_supplied: Vec<String>,
/// Known stems for which `index::decide_action` returned `Action::Skip`.
pub cached: Vec<String>,
/// Stems unknown to the catalog with no CSV on disk, so nothing can be fetched.
pub unfetchable: Vec<String>,
/// `(stem, error message)` pairs for probe/fetch tasks that returned an error.
pub errors: Vec<(String, String)>,
}
/// Combined result of [`fetch_all`]: the CSV refresh followed by preprocessing.
#[derive(Debug, Clone, Default)]
pub struct FetchAllReport {
/// Outcome of [`refresh_csvs`] for the requested stems.
pub refresh: RefreshReport,
/// Report returned by `preprocess::apply` over the workdir's CSV directory.
pub preprocess: PreprocessReport,
}
/// Outcome of [`execute_one`] for a single dataset task.
enum ExecResult {
/// The CSV was (re)downloaded.
///
/// `rows` is the count returned by `fetch::fetch_to_csv`. `elapsed` is the wall-clock seconds
/// spent in that download, not counting any preceding probe.
Fetched { rows: u64, elapsed: f64 },
/// A count probe matched the prior row count, so no download was performed.
Unchanged,
}
pub async fn refresh_csvs(
workdir: &Workdir,
client: &ArcGISClient,
needed: &[String],
index: &mut SodirIndex,
index_cooldown_days: i64,
dataset_cooldown_days: i64,
concurrency: usize,
) -> Result<RefreshReport> {
let mut report = RefreshReport::default();
let sweep_due = index::sweep_due(index.last_full_check_iso.as_deref(), index_cooldown_days);
let mut stems: Vec<String> = needed.to_vec();
stems.sort();
stems.dedup();
let mut work: Vec<(String, Action)> = Vec::new();
for stem in &stems {
let csv_path = workdir.csv_path(stem);
if !catalog::is_known(stem) {
if csv_path.is_file() {
report.user_supplied.push(stem.clone());
index.datasets.entry(stem.clone()).or_insert_with(|| {
DatasetEntry::user_supplied(
stem,
index::quick_row_count(&csv_path),
&index::now_iso(),
)
});
} else {
report.unfetchable.push(stem.clone());
}
continue;
}
match index::decide_action(
index.datasets.get(stem),
&csv_path,
sweep_due,
dataset_cooldown_days,
) {
Action::Skip => report.cached.push(stem.clone()),
action => work.push((stem.clone(), action)),
}
}
work.sort_by_key(|item| std::cmp::Reverse(size_hint(index, &item.0)));
if !work.is_empty() {
let sem = Arc::new(Semaphore::new(concurrency.max(1)));
let mut set: JoinSet<(String, Result<ExecResult>)> = JoinSet::new();
for (stem, action) in work {
let client = client.clone();
let sem = sem.clone();
let csv_path = workdir.csv_path(&stem);
let prior_count = index.datasets.get(&stem).map(|e| e.row_count);
set.spawn(async move {
let _permit = sem.acquire().await;
let result = execute_one(&client, &stem, action, &csv_path, prior_count).await;
(stem, result)
});
}
while let Some(joined) = set.join_next().await {
let (stem, result) =
joined.map_err(|e| SodirError::Decode(format!("task join: {e}")))?;
match result {
Ok(ExecResult::Fetched { rows, elapsed }) => {
let (base, layer_id) = catalog::resolve(&stem)?;
let kind = catalog::kind_of(&stem)?;
index.datasets.insert(
stem.clone(),
DatasetEntry::fetched(
kind.as_str(),
layer_id,
base,
&stem,
rows,
elapsed,
&index::now_iso(),
),
);
report.fetched.push(stem);
}
Ok(ExecResult::Unchanged) => {
if let Some(entry) = index.datasets.get_mut(&stem) {
entry.count_checked_at_iso = index::now_iso();
}
report.unchanged.push(stem);
}
Err(e) => report.errors.push((stem, e.to_string())),
}
index::save(workdir, index)?;
}
}
if sweep_due {
index.last_full_check_iso = Some(index::now_iso());
}
Ok(report)
}
/// Performs the network work for one dataset and reports what happened.
///
/// For `Action::Probe`, the remote record count is fetched first. If it equals `prior_count`,
/// the download is skipped and [`ExecResult::Unchanged`] is returned.
///
/// Otherwise, and always for `Action::Fetch`, the dataset is downloaded to `csv_path`. The row
/// count and the download time in seconds are then returned.
///
/// Any other action is a caller bug, checked only in debug builds.
///
/// # Errors
///
/// Propagates failures from `fetch::count` and `fetch::fetch_to_csv`.
async fn execute_one(
    client: &ArcGISClient,
    stem: &str,
    action: Action,
    csv_path: &std::path::Path,
    prior_count: Option<u64>,
) -> Result<ExecResult> {
    if action == Action::Probe {
        let remote_count = fetch::count(client, stem).await?;
        if Some(remote_count) == prior_count {
            return Ok(ExecResult::Unchanged);
        }
        // The count changed (or was never recorded): fall through to a full download.
    } else {
        debug_assert_eq!(action, Action::Fetch);
    }

    // Only the download itself is timed, not the preceding probe.
    let started = std::time::Instant::now();
    let written = fetch::fetch_to_csv(client, stem, csv_path).await?;
    let elapsed = started.elapsed().as_secs_f64();
    Ok(ExecResult::Fetched {
        rows: written as u64,
        elapsed,
    })
}
/// Scheduling weight for `stem`.
///
/// Returns the row count recorded in `index`, or 0 when the stem has no entry yet.
fn size_hint(index: &SodirIndex, stem: &str) -> u64 {
    match index.datasets.get(stem) {
        Some(entry) => entry.row_count,
        None => 0,
    }
}
/// End-to-end entry point: refresh the CSVs for `needed`, persist the index, then preprocess.
///
/// Steps, in order:
/// 1. Creates the workdir directories.
/// 2. Builds a fresh [`ArcGISClient`].
/// 3. Loads the on-disk index and runs [`refresh_csvs`] with the given cooldowns and concurrency.
/// 4. Saves the index.
/// 5. Runs `preprocess::apply` over the workdir's CSV directory.
///
/// # Errors
///
/// Returns the first error from any of these steps. Later steps are not run.
pub async fn fetch_all(
    workdir: &Workdir,
    needed: &[String],
    index_cooldown_days: i64,
    dataset_cooldown_days: i64,
    concurrency: usize,
) -> Result<FetchAllReport> {
    workdir.ensure_dirs()?;
    let client = ArcGISClient::new()?;

    let mut sodir_index = index::load(workdir)?;
    let refresh = refresh_csvs(
        workdir,
        &client,
        needed,
        &mut sodir_index,
        index_cooldown_days,
        dataset_cooldown_days,
        concurrency,
    )
    .await?;
    index::save(workdir, &sodir_index)?;

    let csv_dir = workdir.csv_dir();
    let preprocess = preprocess::apply(&csv_dir)?;

    Ok(FetchAllReport {
        refresh,
        preprocess,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `size_hint` reports the indexed row count, falling back to 0 for unknown stems.
    #[test]
    fn size_hint_uses_prior_row_count() {
        let field_entry = DatasetEntry::fetched(
            "layer",
            7100,
            "http://x",
            "field",
            87,
            1.0,
            &index::now_iso(),
        );
        let mut idx = SodirIndex::default();
        idx.datasets.insert(String::from("field"), field_entry);

        for (stem, expected) in [("field", 87), ("missing", 0)] {
            assert_eq!(size_hint(&idx, stem), expected, "size_hint({stem:?})");
        }
    }
}