use std::fs::File;
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
mpsc::{Receiver, Sender},
};
use bywind::{
TimedWindMap,
fetch::{
FetchProgress, FetchSpec, fetch_to_grib2, parse_yyyymmddhh as parse_yyyymmddhh_lib,
transcode_grib2_to_wcav,
},
io::Format,
};
use chrono::{DateTime, Timelike as _, Utc};
/// Message sent over the channel from the fetch worker thread to the
/// [`FetchJob`] that polls it.
pub(crate) enum FetchEvent {
/// Progress report forwarded from the `fetch_to_grib2` callback.
Progress(FetchProgress),
/// Sent by the worker right before the fetched GRIB2 data is transcoded
/// to the `.wcav` output (only happens for non-GRIB2 output paths).
EncodingStarted,
/// Final message of a worker: either the resulting wind map or a
/// human-readable error description (cancellation is reported as an error).
Done(Result<TimedWindMap, String>),
}
/// Receiving-side state of a background fetch: the event channel, the
/// cancellation flag, the current phase and a bounded log.
///
/// The default value is an idle job with an empty log and no worker attached.
#[derive(Default)]
pub(crate) struct FetchJob {
/// Log lines produced while draining worker events; capped at
/// `MAX_LOG_LINES` entries (oldest lines are dropped first).
pub(crate) log: Vec<String>,
/// Current lifecycle phase; anything but `FetchPhase::Idle` counts as running.
pub(crate) phase: FetchPhase,
/// Receiving end of the worker channel; `Some` from `attach` until a
/// `FetchEvent::Done` has been processed by `poll`.
rx: Option<Receiver<FetchEvent>>,
/// Flag shared with the worker; set to `true` by `request_cancel`.
cancel: Option<Arc<AtomicBool>>,
}
/// Maximum number of lines kept in `FetchJob::log`; once exceeded, the
/// oldest lines are discarded.
const MAX_LOG_LINES: usize = 500;
/// Lifecycle phase of a [`FetchJob`].
#[derive(Default, PartialEq, Eq, Clone, Copy)]
pub(crate) enum FetchPhase {
/// No worker attached (initial state, and the state after a `Done` event).
#[default]
Idle,
/// A worker has been attached and no later phase has been reached yet.
Fetching,
/// The worker announced `FetchEvent::EncodingStarted`.
Encoding,
/// Cancellation was requested while fetching; the job stays in this
/// phase until another event changes it (normally the final `Done`).
Cancelling,
}
impl FetchJob {
    /// Returns `true` while the job is in any phase other than
    /// [`FetchPhase::Idle`].
    pub(crate) fn is_running(&self) -> bool {
        self.phase != FetchPhase::Idle
    }

    /// Binds this job to a freshly spawned worker: stores the event receiver
    /// and the shared cancellation flag, and enters [`FetchPhase::Fetching`].
    pub(crate) fn attach(&mut self, rx: Receiver<FetchEvent>, cancel: Arc<AtomicBool>) {
        self.phase = FetchPhase::Fetching;
        self.rx = Some(rx);
        self.cancel = Some(cancel);
    }

    /// Raises the cancellation flag (if a worker is attached) and, when the
    /// job is currently fetching, switches to [`FetchPhase::Cancelling`].
    ///
    /// Other phases are left untouched.
    pub(crate) fn request_cancel(&mut self) {
        if let Some(flag) = self.cancel.as_deref() {
            flag.store(true, Ordering::Release);
        }
        if matches!(self.phase, FetchPhase::Fetching) {
            self.phase = FetchPhase::Cancelling;
        }
    }

    /// Drains every event currently queued by the worker without blocking,
    /// updating the log and phase along the way.
    ///
    /// Returns the wind map when a successful `Done` event was processed in
    /// this call, otherwise `None` (including when no worker is attached).
    /// If the channel is found disconnected and no `Done` event was drained
    /// in this call, an error `Done` is synthesised so the job still ends up
    /// idle.
    pub(crate) fn poll(&mut self) -> Option<TimedWindMap> {
        use std::sync::mpsc::TryRecvError;

        let rx = self.rx.as_ref()?;

        // Collect first, apply afterwards: applying needs `&mut self`, which
        // cannot coexist with the borrow of the receiver.
        let mut pending = Vec::new();
        let disconnected = loop {
            match rx.try_recv() {
                Ok(ev) => pending.push(ev),
                Err(TryRecvError::Empty) => break false,
                Err(TryRecvError::Disconnected) => break true,
            }
        };
        let has_done = pending
            .iter()
            .any(|ev| matches!(ev, FetchEvent::Done(_)));
        if disconnected && !has_done {
            pending.push(FetchEvent::Done(Err(
                "worker disconnected without a Done event".to_owned(),
            )));
        }

        let mut result = None;
        for ev in pending {
            match ev {
                FetchEvent::Progress(progress) => {
                    self.append_log(format_progress(&progress));
                }
                FetchEvent::EncodingStarted => {
                    self.append_log("encoding to .wcav…".to_owned());
                    self.phase = FetchPhase::Encoding;
                }
                FetchEvent::Done(outcome) => {
                    match outcome {
                        Ok(map) => {
                            self.append_log("done".to_owned());
                            result = Some(map);
                        }
                        Err(msg) => self.append_log(format!("error: {msg}")),
                    }
                    self.detach();
                }
            }
        }
        result
    }

    /// Empties the log.
    pub(crate) fn reset_log(&mut self) {
        self.log.truncate(0);
    }

    /// Returns the job to the idle state and forgets the worker's channel
    /// and cancellation flag.
    fn detach(&mut self) {
        self.phase = FetchPhase::Idle;
        self.rx = None;
        self.cancel = None;
    }

    /// Appends `line` to the log, then trims the oldest entries so that at
    /// most `MAX_LOG_LINES` remain.
    fn append_log(&mut self, line: String) {
        self.log.push(line);
        let overflow = self.log.len().saturating_sub(MAX_LOG_LINES);
        if overflow > 0 {
            self.log.drain(..overflow);
        }
    }
}
/// Starts a detached background thread that runs the fetch described by
/// `spec`, writing its output to `out_path`.
///
/// Returns the receiving end of the worker's event channel together with the
/// shared cancellation flag (initially `false`). The thread always finishes by
/// sending a [`FetchEvent::Done`] carrying the outcome of `run_worker` and
/// requesting a repaint on `ctx`; a failed send (receiver already dropped) is
/// ignored.
pub(crate) fn spawn_worker(
    spec: FetchSpec,
    out_path: PathBuf,
    ctx: egui::Context,
) -> (Receiver<FetchEvent>, Arc<AtomicBool>) {
    let cancel = Arc::new(AtomicBool::new(false));
    let worker_cancel = Arc::clone(&cancel);
    let (tx, rx) = std::sync::mpsc::channel::<FetchEvent>();

    let worker = move || {
        let outcome = run_worker(&spec, &out_path, &tx, &worker_cancel, &ctx);
        drop(tx.send(FetchEvent::Done(outcome)));
        ctx.request_repaint();
    };
    std::thread::spawn(worker);

    (rx, cancel)
}
/// Runs the complete fetch pipeline on the worker thread and returns the
/// resulting wind map.
///
/// The output format is derived from `out_path`:
/// * GRIB2 output: data is fetched straight into `out_path`, then decoded.
/// * WindAv1 output: data is fetched into a sibling `*.grib2.tmp` staging
///   file, [`FetchEvent::EncodingStarted`] is sent, the staging file is
///   transcoded into `out_path`, and the staging file is removed.
///
/// The temporary staging file is removed (best effort) on every exit path —
/// success, cancellation, fetch failure and transcode failure. A GRIB2
/// `out_path` is never removed, even if it is only partially written.
///
/// # Errors
///
/// Returns a human-readable message when the format cannot be determined,
/// the staging file cannot be created or flushed, fetching, decoding or
/// transcoding fails, or the `cancel` flag is set once fetching has returned
/// (reported as `"cancelled"`).
fn run_worker(
    spec: &FetchSpec,
    out_path: &Path,
    tx: &Sender<FetchEvent>,
    cancel: &Arc<AtomicBool>,
    ctx: &egui::Context,
) -> Result<TimedWindMap, String> {
    use std::io::BufReader;

    let out_fmt = Format::from_path(out_path).map_err(|e| e.to_string())?;
    let staging = match out_fmt {
        Format::Grib2 => out_path.to_path_buf(),
        Format::WindAv1 => out_path.with_extension("grib2.tmp"),
    };
    // Only the WindAv1 path uses a throw-away file; for GRIB2 the staging
    // file *is* the requested output and must be left alone.
    let staging_is_temp = out_fmt == Format::WindAv1;
    let discard_staging = || {
        if staging_is_temp {
            drop(std::fs::remove_file(&staging));
        }
    };

    if let Err(msg) = fetch_into_staging(spec, &staging, tx, cancel, ctx) {
        discard_staging();
        return Err(msg);
    }
    if cancel.load(Ordering::Acquire) {
        discard_staging();
        return Err("cancelled".to_owned());
    }

    if out_fmt == Format::Grib2 {
        let file =
            File::open(&staging).map_err(|e| format!("opening {}: {e}", staging.display()))?;
        return TimedWindMap::from_grib2_reader(BufReader::new(file), 1, None)
            .map_err(|e| format!("decoding fetched GRIB2: {e}"));
    }

    drop(tx.send(FetchEvent::EncodingStarted));
    ctx.request_repaint();
    let transcoded = transcode_grib2_to_wcav(&staging, out_path).map_err(|e| e.to_string());
    // The staging file has served its purpose whether or not transcoding
    // succeeded; nothing ever reads it again.
    drop(std::fs::remove_file(&staging));
    transcoded
}

/// Creates `staging` and streams the fetched GRIB2 data into it, forwarding
/// each progress event to `tx` and asking `ctx` to repaint.
///
/// After every progress event the `cancel` flag is checked and the fetch is
/// asked to stop (`ControlFlow::Break`) once it is set. The buffered writer
/// is flushed explicitly so that a failing final write surfaces as an error
/// instead of being swallowed when the writer is dropped.
fn fetch_into_staging(
    spec: &FetchSpec,
    staging: &Path,
    tx: &Sender<FetchEvent>,
    cancel: &AtomicBool,
    ctx: &egui::Context,
) -> Result<(), String> {
    use std::io::Write as _;
    use std::ops::ControlFlow;

    let file =
        File::create(staging).map_err(|e| format!("creating {}: {e}", staging.display()))?;
    let mut writer = BufWriter::new(file);
    fetch_to_grib2(spec, &mut writer, |event| {
        // A failed send only means the receiver is gone; keep going and let
        // the cancel flag decide when to stop.
        drop(tx.send(FetchEvent::Progress(event)));
        ctx.request_repaint();
        if cancel.load(Ordering::Acquire) {
            ControlFlow::Break(())
        } else {
            ControlFlow::Continue(())
        }
    })
    .map_err(|e| e.to_string())?;
    writer
        .flush()
        .map_err(|e| format!("writing {}: {e}", staging.display()))
}
/// Renders a progress event as a single log line of the form
/// `[idx/total] <timestamp> ok (<n> KB)` or
/// `[idx/total] <timestamp> skipped: <reason>`.
///
/// The size is `bytes / 1024` using integer division.
fn format_progress(p: &FetchProgress) -> String {
    const STAMP_FORMAT: &str = "%Y-%m-%d %H:%M UTC";

    match p {
        FetchProgress::Fetched {
            idx,
            total,
            timestamp,
            bytes,
        } => {
            let stamp = timestamp.format(STAMP_FORMAT);
            let kib = bytes / 1024;
            format!("[{idx:3}/{total:3}] {stamp} ok ({kib} KB)")
        }
        FetchProgress::Skipped {
            idx,
            total,
            timestamp,
            reason,
        } => {
            let stamp = timestamp.format(STAMP_FORMAT);
            format!("[{idx:3}/{total:3}] {stamp} skipped: {reason}")
        }
    }
}
/// Formats `t` as a ten-digit `YYYYMMDDHH` string (minutes and smaller units
/// are not included).
pub(crate) fn format_yyyymmddhh(t: DateTime<Utc>) -> String {
    const PATTERN: &str = "%Y%m%d%H";
    let rendered = t.format(PATTERN);
    rendered.to_string()
}
/// Parses a `YYYYMMDDHH` string by delegating to the library parser,
/// converting its error into a plain `String` message.
///
/// # Errors
///
/// Returns the library error rendered via `to_string()` when `s` is rejected.
pub(crate) fn parse_yyyymmddhh(s: &str) -> Result<DateTime<Utc>, String> {
    let parsed = parse_yyyymmddhh_lib(s).map_err(|err| err.to_string())?;
    Ok(parsed)
}
/// Rounds `t` down to the most recent 6-hour boundary (hour 0, 6, 12 or 18)
/// on the same day, with minutes, seconds and nanoseconds zeroed.
///
/// # Panics
///
/// Panics if any of the field replacements is rejected, which is not
/// expected for an `(h, 0, 0, 0)` time of day.
pub(crate) fn snap_to_cycle(t: DateTime<Utc>) -> DateTime<Utc> {
    let current = t.hour();
    let cycle_hour = current - current % 6;
    let truncate = |moment: DateTime<Utc>| -> Option<DateTime<Utc>> {
        moment
            .with_hour(cycle_hour)?
            .with_minute(0)?
            .with_second(0)?
            .with_nanosecond(0)
    };
    truncate(t).expect("the resulting (h, 0, 0, 0) is always a valid time")
}