use crate::types::frequency::Frequency;
use crate::weather_data::data_loader::WeatherDataLoader;
use crate::weather_data::error::WeatherDataError;
use crate::RequiredData;
use chrono::Utc;
use polars::prelude::LazyFrame;
use std::collections::{hash_map::Entry, HashMap};
use std::ffi::OsStr;
use std::io;
use std::path::{Path, PathBuf};
use tokio::sync::Mutex;
/// Key of one memoised frame: the station id paired with its sampling frequency.
type FrameCacheKey = (String, Frequency);

/// Hands out weather-data `LazyFrame`s per station and frequency.
///
/// Frames obtained from the [`WeatherDataLoader`] are memoised in memory; the loader's
/// `*.parquet` cache files live in `cache_folder`.
pub struct FrameFetcher {
    /// Source of frames (`get_frame`) and of cache-file modification times.
    loader: WeatherDataLoader,
    /// Frames already handed out, behind an async mutex so it can be locked across tasks.
    lazyframe_cache: Mutex<HashMap<FrameCacheKey, LazyFrame>>,
    /// Directory containing the cached `*.parquet` files.
    cache_folder: PathBuf,
}
impl FrameFetcher {
pub fn new(cache_dir: &Path) -> Self {
Self {
loader: WeatherDataLoader::new(cache_dir),
lazyframe_cache: Mutex::new(HashMap::new()),
cache_folder: cache_dir.to_path_buf(),
}
}
pub async fn clear_cache_all(&self) -> Result<(), WeatherDataError> {
let mut entries = tokio::fs::read_dir(&self.cache_folder)
.await
.map_err(|e| WeatherDataError::CacheDeletionError(self.cache_folder.clone(), e))?;
while let Some(entry) = entries
.next_entry()
.await
.map_err(|e| WeatherDataError::CacheDeletionError(self.cache_folder.clone(), e))?
{
let file_path = entry.path();
if file_path.is_file() {
if let Some(extension) = file_path.extension() {
if extension == OsStr::new("parquet") {
match tokio::fs::remove_file(&file_path).await {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::NotFound => {} Err(e) => {
return Err(WeatherDataError::CacheDeletionError(
file_path.clone(),
e,
))
}
}
}
}
}
}
self.lazyframe_cache.lock().await.clear();
Ok(())
}
pub async fn clear_cache(
&self,
station: &str,
frequency: Frequency,
) -> Result<(), WeatherDataError> {
let file = self.cache_folder.join(format!(
"{}{}.parquet",
frequency.cache_file_prefix(),
station
));
match tokio::fs::remove_file(&file).await {
Ok(()) => {}
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
Err(e) => return Err(WeatherDataError::CacheDeletionError(file.clone(), e)),
}
self.lazyframe_cache
.lock()
.await
.remove(&(station.to_string(), frequency));
Ok(())
}
async fn is_cache_stale(
&self,
station: &str,
frequency: Frequency,
required_data: RequiredData,
) -> Result<bool, WeatherDataError> {
let Some(date_required) = required_data.get_end_date() else {
return Ok(false);
};
if date_required > Utc::now().date_naive() {
return Ok(false);
}
match self
.loader
.get_cache_modification_time(station, frequency)
.await
{
Ok(Some(modified)) => {
let cache_date = modified.date_naive();
Ok(date_required > cache_date)
}
Ok(None) => Ok(true),
Err(e) => Err(e),
}
}
pub async fn get_cache_lazyframe(
&self,
station: &str,
frequency: Frequency,
required_data: RequiredData,
) -> Result<LazyFrame, WeatherDataError> {
if self
.is_cache_stale(station, frequency, required_data)
.await
.unwrap_or(false)
{
self.clear_cache(station, frequency).await?;
}
let key = (station.to_string(), frequency);
{
let cache = self.lazyframe_cache.lock().await;
if let Some(cached_frame) = cache.get(&key) {
return Ok(cached_frame.clone()); }
}
let loaded_frame = self.loader.get_frame(frequency, station).await?;
{
let mut cache = self.lazyframe_cache.lock().await;
match cache.entry(key) {
Entry::Occupied(entry) => {
Ok(entry.get().clone())
}
Entry::Vacant(entry) => {
entry.insert(loaded_frame.clone()); Ok(loaded_frame) }
}
} }
}
#[cfg(test)]
mod tests {
    // End-to-end checks of when the on-disk cache gets refreshed. They are driven through
    // the public `Meteostat` client against a real station near Berlin, and presumably
    // require network access to look up the station and fetch its data.
    use super::*;
    use crate::{LatLon, Meteostat, MeteostatError, RequiredData};
    use chrono::Datelike;
    use std::time::{Duration, SystemTime};
    use tempfile::tempdir;

    /// Builds the cache file path used for `station` at `frequency` inside `cache_dir`.
    fn get_parquet_path(cache_dir: &Path, station: &str, frequency: Frequency) -> PathBuf {
        cache_dir.join(format!(
            "{}{}.parquet",
            frequency.cache_file_prefix(),
            station
        ))
    }

    /// Reads the modification time of `path`, or `None` if it cannot be determined.
    async fn get_mtime(path: &Path) -> Option<SystemTime> {
        tokio::fs::metadata(path).await.ok()?.modified().ok()
    }

    /// Overwrites the modification time of the existing file at `path` with `time`.
    fn set_mtime(path: &Path, time: SystemTime) -> std::io::Result<()> {
        std::fs::OpenOptions::new()
            .write(true)
            .open(path)?
            .set_modified(time)
    }

    /// Looks up one station near Berlin (lat 52.52, lon 13.4) and returns its id.
    async fn get_nearby_station_id(client: &Meteostat) -> Result<String, MeteostatError> {
        let berlin = LatLon(52.52, 13.4);
        client
            .find_stations()
            .location(berlin)
            .station_limit(1)
            .call()
            .first()
            .map(|s| s.station.id.clone())
            .ok_or_else(|| MeteostatError::NoStationWithinRadius {
                radius: 50.0,
                lat: berlin.0,
                lon: berlin.1,
            })
    }

    #[tokio::test]
    async fn test_cache_refresh_not_triggered_when_recent() -> Result<(), Box<dyn std::error::Error>>
    {
        let temp_dir = tempdir()?;
        let cache_path = temp_dir.path().to_path_buf();
        let client = Meteostat::with_cache_folder(cache_path.clone()).await?;
        let station_id = get_nearby_station_id(&client).await?;
        let frequency = Frequency::Daily;
        let parquet_path = get_parquet_path(&cache_path, &station_id, frequency);

        println!("Initial fetch for {}...", station_id);
        let _ = client.daily().station(&station_id).call().await?;
        assert!(
            parquet_path.exists(),
            "Cache file should exist after first fetch"
        );
        let mtime1 = get_mtime(&parquet_path)
            .await
            .expect("Failed to get mtime 1");
        println!("Initial mtime: {:?}", mtime1);

        // Give the clock time to move so an unwanted rewrite would change the mtime.
        tokio::time::sleep(Duration::from_secs(1)).await;

        println!("Fetching again with RequiredData::Any...");
        let _ = client
            .daily()
            .station(&station_id)
            .required_data(RequiredData::Any)
            .call()
            .await?;
        let mtime2 = get_mtime(&parquet_path)
            .await
            .expect("Failed to get mtime 2");
        println!("Mtime after RequiredData::Any: {:?}", mtime2);
        assert_eq!(
            mtime1, mtime2,
            "Cache should NOT have been refreshed with RequiredData::Any"
        );

        let current_year = Utc::now().year();
        println!(
            "Fetching again with RequiredData::FullYear({}) (current year)...",
            current_year
        );
        let _ = client
            .daily()
            .station(&station_id)
            .required_data(RequiredData::FullYear(current_year))
            .call()
            .await?;
        let mtime3 = get_mtime(&parquet_path)
            .await
            .expect("Failed to get mtime 3");
        println!(
            "Mtime after RequiredData::FullYear({}): {:?}",
            current_year, mtime3
        );
        assert_eq!(
            mtime1, mtime3,
            "Cache should NOT have been refreshed with current year requirement"
        );

        temp_dir.close()?;
        Ok(())
    }

    #[tokio::test]
    async fn test_cache_refresh_not_triggered_for_future_date(
    ) -> Result<(), Box<dyn std::error::Error>> {
        let temp_dir = tempdir()?;
        let cache_path = temp_dir.path().to_path_buf();
        let client = Meteostat::with_cache_folder(cache_path.clone()).await?;
        let station_id = get_nearby_station_id(&client).await?;
        let frequency = Frequency::Daily;
        let parquet_path = get_parquet_path(&cache_path, &station_id, frequency);

        println!("Initial fetch for {}...", station_id);
        let _ = client.daily().station(&station_id).call().await?;
        assert!(parquet_path.exists());
        let mtime1 = get_mtime(&parquet_path)
            .await
            .expect("Failed to get mtime 1");
        println!("Initial mtime: {:?}", mtime1);

        // Give the clock time to move so an unwanted rewrite would change the mtime.
        tokio::time::sleep(Duration::from_secs(1)).await;

        let future_year = Utc::now().year() + 5;
        println!(
            "Fetching again with RequiredData::FullYear({}) (future)...",
            future_year
        );
        let _ = client
            .daily()
            .station(&station_id)
            .required_data(RequiredData::FullYear(future_year))
            .call()
            .await?;
        let mtime2 = get_mtime(&parquet_path)
            .await
            .expect("Failed to get mtime 2");
        println!("Mtime after future year requirement: {:?}", mtime2);
        assert_eq!(
            mtime1, mtime2,
            "Cache should NOT have been refreshed for a future date requirement"
        );

        temp_dir.close()?;
        Ok(())
    }

    #[tokio::test]
    async fn test_cache_refresh_triggered_when_required_date_is_newer(
    ) -> Result<(), Box<dyn std::error::Error>> {
        let temp_dir = tempdir()?;
        let cache_path = temp_dir.path().to_path_buf();
        let client = Meteostat::with_cache_folder(cache_path.clone()).await?;
        let station_id = get_nearby_station_id(&client).await?;
        let frequency = Frequency::Daily;
        let parquet_path = get_parquet_path(&cache_path, &station_id, frequency);

        println!("Initial fetch for {} to create cache...", station_id);
        let _ = client.daily().station(&station_id).call().await?;
        assert!(
            parquet_path.exists(),
            "Cache file missing after initial fetch"
        );

        // Make the existing cache genuinely old instead of deleting it, so the
        // "cache older than required date" comparison is what triggers the refresh.
        // Roughly two years back is strictly before the end of last year, which is
        // assumed to be what FullYear(last year) requires.
        // NOTE(review): assumes the loader derives the cache timestamp from the file
        // mtime — confirm against `get_cache_modification_time`.
        let backdated = SystemTime::now() - Duration::from_secs(2 * 366 * 24 * 60 * 60);
        set_mtime(&parquet_path, backdated)?;
        let mtime_backdated = get_mtime(&parquet_path)
            .await
            .expect("Failed to get backdated mtime");
        println!("Backdated cache mtime: {:?}", mtime_backdated);

        let recent_past_year = Utc::now().year() - 1;
        println!(
            "Fetching again with RequiredData::FullYear({}) (should trigger re-download)...",
            recent_past_year
        );
        let _ = client
            .daily()
            .station(&station_id)
            .required_data(RequiredData::FullYear(recent_past_year))
            .call()
            .await?;

        assert!(
            parquet_path.exists(),
            "Cache file should exist again after required fetch"
        );
        let mtime_after_refresh = get_mtime(&parquet_path)
            .await
            .expect("Failed to get mtime after refresh");
        println!("Mtime after required fetch: {:?}", mtime_after_refresh);
        // Comparing against the backdated time (years apart) rather than the initial
        // mtime keeps this robust to coarse filesystem timestamp granularity.
        assert!(
            mtime_after_refresh > mtime_backdated,
            "Stale cache file should have been re-downloaded"
        );

        temp_dir.close()?;
        Ok(())
    }
}