1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
use crate::types::frequency::Frequency;
use crate::weather_data::error::WeatherDataError;
use async_compression::tokio::bufread::GzipDecoder;
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use polars::frame::DataFrame;
use polars::prelude::*;
use reqwest::Client;
use std::io::Write;
use std::path::{Path, PathBuf};
use tempfile::NamedTempFile;
use tokio::io::AsyncReadExt;
use tokio::{fs, task};
use tokio_util::io::StreamReader;
/// Downloads Meteostat bulk CSV data and caches it locally as Parquet files.
///
/// One Parquet file is kept per (frequency, station) pair inside `cache_dir`.
#[derive(Debug, Clone)]
pub struct WeatherDataLoader {
    /// Directory holding the cached Parquet files.
    cache_dir: PathBuf,
    /// HTTP client reused for all downloads.
    download_client: Client,
}

impl WeatherDataLoader {
    /// Creates a loader that caches files under `cache_dir`.
    ///
    /// The directory does not need to exist yet; `get_frame` creates it before the
    /// first cache write.
    pub fn new(cache_dir: &Path) -> Self {
        Self {
            cache_dir: cache_dir.to_path_buf(),
            download_client: Client::new(),
        }
    }

    /// Path of the cached Parquet file for `station` at `frequency`.
    ///
    /// Every method that touches the cache goes through this helper so the file naming
    /// scheme is defined in exactly one place.
    fn cache_path(&self, station: &str, frequency: Frequency) -> PathBuf {
        self.cache_dir
            .join(format!("{}{}.parquet", frequency.cache_file_prefix(), station))
    }

    /// Gets the last modification time of the cached Parquet file for a given
    /// station and frequency, converted to UTC.
    ///
    /// This function is cross-platform.
    ///
    /// # Arguments
    ///
    /// * `station` - The ID of the station.
    /// * `frequency` - The data frequency.
    ///
    /// # Returns
    ///
    /// * `Ok(Some(DateTime<Utc>))` - The cache file exists and its modification time
    ///   could be determined.
    /// * `Ok(None)` - The cache file does not exist (a normal cache miss).
    ///
    /// # Errors
    ///
    /// Returns [`WeatherDataError::CacheMetadataRead`] if the file's metadata cannot be
    /// read for a reason other than the file not existing, or if the platform cannot
    /// report a modification time.
    pub async fn get_cache_modification_time(
        &self,
        station: &str,
        frequency: Frequency,
    ) -> Result<Option<DateTime<Utc>>, WeatherDataError> {
        let parquet_path = self.cache_path(station, frequency);
        match fs::metadata(&parquet_path).await {
            Ok(metadata) => {
                let modified = metadata
                    .modified()
                    .map_err(|e| WeatherDataError::CacheMetadataRead(parquet_path.clone(), e))?;
                Ok(Some(DateTime::<Utc>::from(modified)))
            }
            // A missing file is an ordinary cache miss, not an error.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            // Anything else (permissions, etc.) is a real problem worth surfacing.
            Err(e) => Err(WeatherDataError::CacheMetadataRead(parquet_path, e)),
        }
    }

    /// Loads the data for `station` at `data_type` frequency as a [`LazyFrame`].
    ///
    /// On a cache miss the gzip CSV is downloaded, parsed into typed columns and written
    /// to the Parquet cache first. The returned frame always scans the cached Parquet file,
    /// so its column names and types follow the per-frequency schema applied in
    /// `csv_to_dataframe`.
    ///
    /// # Errors
    ///
    /// Propagates download, parsing, cache-write and Parquet-scan errors. Failing to stat
    /// the cache file for a reason other than "not found" is reported as
    /// [`WeatherDataError::CacheMetadataRead`] rather than silently triggering a re-download.
    pub async fn get_frame(
        &self,
        data_type: Frequency,
        station: &str,
    ) -> Result<LazyFrame, WeatherDataError> {
        let parquet_path = self.cache_path(station, data_type);
        let is_cached = match fs::metadata(&parquet_path).await {
            Ok(_) => true,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => false,
            Err(e) => return Err(WeatherDataError::CacheMetadataRead(parquet_path, e)),
        };
        if !is_cached {
            let raw_bytes = self.download(data_type, station).await?;
            let df = Self::csv_to_dataframe(raw_bytes, station, data_type).await?;
            fs::create_dir_all(&self.cache_dir)
                .await
                .map_err(|e| WeatherDataError::CacheDirCreation(self.cache_dir.clone(), e))?;
            Self::cache_dataframe(df, &parquet_path).await?;
        }
        LazyFrame::scan_parquet(parquet_path.clone(), ScanArgsParquet::default())
            .map_err(|e| WeatherDataError::ParquetScan(parquet_path, e))
    }

    /// Downloads the gzip-compressed CSV for `station` at `data_type` frequency from the
    /// Meteostat bulk endpoint and returns the decompressed bytes.
    ///
    /// # Errors
    ///
    /// * [`WeatherDataError::NetworkRequest`] if the request cannot be sent, or fails
    ///   without an HTTP status.
    /// * [`WeatherDataError::HttpStatus`] if the server answers with an error status.
    /// * [`WeatherDataError::DownloadIo`] if streaming or decompressing the body fails.
    async fn download(
        &self,
        data_type: Frequency,
        station: &str,
    ) -> Result<Vec<u8>, WeatherDataError> {
        let url = format!(
            "https://bulk.meteostat.net/v2/{}/{}.csv.gz",
            data_type.path_segment(),
            station
        );
        let response = self
            .download_client
            .get(&url)
            .send()
            .await
            .map_err(|e| WeatherDataError::NetworkRequest(url.clone(), e))?;
        let response = response.error_for_status().map_err(|e| match e.status() {
            Some(status) => WeatherDataError::HttpStatus {
                url,
                status,
                source: e,
            },
            None => WeatherDataError::NetworkRequest(url, e),
        })?;
        // Feed the body stream through the gzip decoder instead of buffering the
        // compressed bytes first; stream errors surface as io::Error for the decoder.
        let body = StreamReader::new(response.bytes_stream().map_err(std::io::Error::other));
        let mut decoder = GzipDecoder::new(body);
        let mut decompressed = Vec::new();
        decoder
            .read_to_end(&mut decompressed)
            .await
            .map_err(WeatherDataError::DownloadIo)?;
        Ok(decompressed)
    }

    /// Parses headerless CSV `bytes` into a typed [`DataFrame`] on a blocking thread.
    ///
    /// Every column is first read as a string, renamed to the names returned by
    /// `data_type.get_schema_column_names()`, then cast to its target type. Hourly data
    /// additionally gets a `datetime` column built from `date` + `hour`.
    ///
    /// # Errors
    ///
    /// * [`WeatherDataError::CsvReadIo`] / [`WeatherDataError::CsvReadPolars`] if the CSV
    ///   cannot be staged to disk or parsed.
    /// * [`WeatherDataError::SchemaMismatch`] if the column count differs from the schema.
    /// * [`WeatherDataError::ColumnRenameError`] / [`WeatherDataError::ColumnOperationError`]
    ///   if renaming or casting the columns fails.
    async fn csv_to_dataframe(
        bytes: Vec<u8>,
        station: &str,
        data_type: Frequency,
    ) -> Result<DataFrame, WeatherDataError> {
        let station_owned = station.to_string();
        let schema_names = data_type.get_schema_column_names();
        task::spawn_blocking(move || {
            // The polars CSV reader is driven from a file path here, so stage the bytes
            // in a temporary file that is removed when `temp_file` is dropped.
            let mut temp_file = NamedTempFile::new().map_err(|e| WeatherDataError::CsvReadIo {
                station: station_owned.clone(),
                source: e,
            })?;
            temp_file
                .write_all(&bytes)
                .map_err(|e| WeatherDataError::CsvReadIo {
                    station: station_owned.clone(),
                    source: e,
                })?;
            temp_file.flush().map_err(|e| WeatherDataError::CsvReadIo {
                station: station_owned.clone(),
                source: e,
            })?;

            // infer_schema_length(0) reads every column as a string; the explicit casts
            // below then decide the final types instead of polars' inference.
            let mut df = CsvReadOptions::default()
                .with_has_header(false)
                .with_infer_schema_length(Some(0))
                .try_into_reader_with_file_path(Some(temp_file.path().to_path_buf()))
                .map_err(|e| WeatherDataError::CsvReadPolars {
                    station: station_owned.clone(),
                    source: e,
                })?
                .finish()
                .map_err(|e| WeatherDataError::CsvReadPolars {
                    station: station_owned.clone(),
                    source: e,
                })?;

            if df.width() != schema_names.len() {
                return Err(WeatherDataError::SchemaMismatch {
                    station: station_owned,
                    data_type,
                    expected: schema_names.len(),
                    found: df.width(),
                });
            }
            df.set_column_names(schema_names.iter().copied())
                .map_err(|e| WeatherDataError::ColumnRenameError {
                    station: station_owned.clone(),
                    source: e,
                })?;

            let lazy_df = df.lazy();
            // Shared date parsing options for the `date` column (YYYY-MM-DD).
            let date_options = StrptimeOptions {
                format: Some("%Y-%m-%d".into()),
                // Non-strict: values that fail to parse become null instead of erroring.
                strict: false,
                exact: true,
                cache: true,
            };
            // All expressions inside one `with_columns` see the original (string) columns,
            // which is why `hour` is cast explicitly inside the `datetime` expression.
            let lazy_df = match data_type {
                Frequency::Hourly => lazy_df.with_columns([
                    // datetime = parsed date at midnight + `hour` hours.
                    (col("date")
                        .str()
                        .strptime(DataType::Date, date_options, lit("raise"))
                        .cast(DataType::Datetime(TimeUnit::Milliseconds, None))
                        + duration(
                            DurationArgs::new().with_hours(col("hour").cast(DataType::Int64)),
                        ))
                    .alias("datetime"),
                    // `date` is kept as a string for hourly data.
                    col("date").cast(DataType::String),
                    col("hour").cast(DataType::Int64),
                    col("temp").cast(DataType::Float64),
                    col("dwpt").cast(DataType::Float64),
                    col("rhum").cast(DataType::Int64), // integer percentage
                    col("prcp").cast(DataType::Float64),
                    col("snow").cast(DataType::Int64),
                    col("wdir").cast(DataType::Int64), // degrees
                    col("wspd").cast(DataType::Float64),
                    col("wpgt").cast(DataType::Float64),
                    col("pres").cast(DataType::Float64),
                    col("tsun").cast(DataType::Int64), // minutes
                    col("coco").cast(DataType::Int64), // weather condition code
                ]),
                Frequency::Daily => lazy_df.with_columns([
                    // Replace the string `date` column with a parsed Date column.
                    col("date")
                        .str()
                        .strptime(DataType::Date, date_options, lit("raise"))
                        .alias("date"),
                    col("tavg").cast(DataType::Float64),
                    col("tmin").cast(DataType::Float64),
                    col("tmax").cast(DataType::Float64),
                    col("prcp").cast(DataType::Float64),
                    col("snow").cast(DataType::Int64),
                    col("wdir").cast(DataType::Int64),
                    col("wspd").cast(DataType::Float64),
                    col("wpgt").cast(DataType::Float64),
                    col("pres").cast(DataType::Float64),
                    col("tsun").cast(DataType::Int64),
                ]),
                Frequency::Monthly => lazy_df.with_columns([
                    col("year").cast(DataType::Int64),
                    col("month").cast(DataType::Int64),
                    col("tavg").cast(DataType::Float64),
                    col("tmin").cast(DataType::Float64),
                    col("tmax").cast(DataType::Float64),
                    col("prcp").cast(DataType::Float64),
                    col("wspd").cast(DataType::Float64),
                    col("pres").cast(DataType::Float64),
                    col("tsun").cast(DataType::Int64),
                ]),
                Frequency::Climate => lazy_df.with_columns([
                    col("start_year").cast(DataType::Int64),
                    col("end_year").cast(DataType::Int64),
                    col("month").cast(DataType::Int64),
                    col("tmin").cast(DataType::Float64),
                    col("tmax").cast(DataType::Float64),
                    col("prcp").cast(DataType::Float64),
                    col("wspd").cast(DataType::Float64),
                    col("pres").cast(DataType::Float64),
                    col("tsun").cast(DataType::Int64),
                ]),
            };

            // Collecting executes the casts, so cast failures surface here.
            lazy_df
                .collect()
                .map_err(|e| WeatherDataError::ColumnOperationError {
                    station: station_owned.clone(),
                    source: e,
                })
        })
        .await? // `?` converts a JoinError; the inner Result is returned as-is.
    }

    /// Writes `df` to `path` as a Snappy-compressed Parquet file on a blocking thread.
    ///
    /// The data is written to a temporary file in the same directory and then renamed
    /// onto `path`. A failed or interrupted write, or two loaders racing on the same
    /// station, therefore never leaves a truncated file that `get_frame` would later
    /// mistake for a valid cache entry. The temporary file is deleted on any error.
    ///
    /// # Errors
    ///
    /// * [`WeatherDataError::ParquetWriteIo`] if the temporary file cannot be created,
    ///   flushed or moved into place.
    /// * [`WeatherDataError::ParquetWritePolars`] if polars fails to encode the frame.
    async fn cache_dataframe(mut df: DataFrame, path: &Path) -> Result<(), WeatherDataError> {
        let path_buf = path.to_path_buf();
        task::spawn_blocking(move || {
            // Same directory as the target so the final rename stays on one filesystem.
            let dir = path_buf
                .parent()
                .filter(|p| !p.as_os_str().is_empty())
                .unwrap_or_else(|| Path::new("."));
            let mut temp_file = NamedTempFile::new_in(dir)
                .map_err(|e| WeatherDataError::ParquetWriteIo(path_buf.clone(), e))?;
            // `finish` needs `&mut DataFrame` (it may rechunk the frame before writing).
            ParquetWriter::new(&mut temp_file)
                .with_compression(ParquetCompression::Snappy)
                .finish(&mut df)
                .map_err(|e| WeatherDataError::ParquetWritePolars(path_buf.clone(), e))?;
            temp_file
                .flush()
                .map_err(|e| WeatherDataError::ParquetWriteIo(path_buf.clone(), e))?;
            temp_file
                .persist(&path_buf)
                .map_err(|e| WeatherDataError::ParquetWriteIo(path_buf.clone(), e.error))?;
            Ok::<(), WeatherDataError>(())
        })
        .await? // `?` converts a JoinError; the inner Result is returned as-is.
    }
}