1use std::default::Default;
2use std::fmt::Display;
3use std::path::Path;
4
5use anyhow::{anyhow, Result};
6use futures::future::join_all;
7use log::debug;
8use log::info;
9use polars::{
10 lazy::{
11 dsl::col,
12 frame::{IntoLazy, LazyFrame, ScanArgsParquet},
13 },
14 prelude::{DataFrame, JoinArgs, JoinType, ParquetCompression, ParquetWriter, UnionArgs},
15};
16use tokio::try_join;
17
18use crate::{config::Config, search::MetricId, COL};
19
/// File names of the metadata tables.
///
/// Each name is joined onto a per-country directory when loading remotely, and onto a
/// local cache directory by the `cache`-feature helpers.
pub mod paths {
    /// Data publisher table.
    pub const PUBLISHER: &str = "data_publishers.parquet";
    /// Source data release table.
    pub const SOURCE: &str = "source_data_releases.parquet";
    /// Country table.
    pub const COUNTRY: &str = "country_metadata.parquet";
    /// Metric table.
    pub const METRIC_METADATA: &str = "metric_metadata.parquet";
    /// Geometry table.
    pub const GEOMETRY_METADATA: &str = "geometry_metadata.parquet";
}
// Short upper-case alias used by the rest of this file.
use paths as PATHS;
29
/// Loads the metadata tables for a single country.
///
/// Holds only the country identifier; the tables themselves are read from
/// `{base_path}/{country}/<table file name>` when the loader is run.
#[derive(Debug, Clone)]
pub struct CountryMetadataLoader {
    /// Country identifier, used as the directory name under the base path (e.g. `"bel"`).
    country: String,
}
36
37pub struct ExpandedMetadata(pub LazyFrame);
39
40impl ExpandedMetadata {
41 pub fn as_df(&self) -> LazyFrame {
43 self.0.clone()
44 }
45}
46
47#[derive(Debug, PartialEq)]
53pub struct Metadata {
54 pub metrics: DataFrame,
55 pub geometries: DataFrame,
56 pub source_data_releases: DataFrame,
57 pub data_publishers: DataFrame,
58 pub countries: DataFrame,
59}
60
/// Reads the parquet file at `path` into an eager `DataFrame`.
///
/// # Errors
///
/// Fails if the file cannot be scanned or collected. The error message names the file,
/// matching the style of the remote loader.
#[cfg(feature = "cache")]
fn path_to_df<P: AsRef<Path>>(path: P) -> anyhow::Result<DataFrame> {
    let path = path.as_ref();
    // Wrap both the scan and the collect so that either failure identifies the file.
    LazyFrame::scan_parquet(path, ScanArgsParquet::default())
        .and_then(|lf| lf.collect())
        .map_err(|e| anyhow!("Failed to load '{}': {e}", path.display()))
}
65
/// Writes `df` to `path` as a Zstd-compressed parquet file, replacing any existing file.
///
/// # Errors
///
/// Fails if the file cannot be created or the parquet write fails.
#[cfg(feature = "cache")]
fn df_to_file<P: AsRef<Path>>(path: P, df: &DataFrame) -> anyhow::Result<()> {
    // `finish` takes `&mut DataFrame`, so write from a private copy and leave the
    // caller's frame untouched.
    let mut to_write = df.clone();
    let writer = ParquetWriter::new(std::fs::File::create(path)?)
        .with_compression(ParquetCompression::Zstd(None));
    writer.finish(&mut to_write)?;
    Ok(())
}
74
/// Returns `cache_path/file_name` as an owned path.
#[cfg(feature = "cache")]
fn prepend<P: AsRef<Path>>(cache_path: P, file_name: &str) -> std::path::PathBuf {
    let base: &Path = cache_path.as_ref();
    base.join(file_name)
}
79
#[cfg(feature = "cache")]
impl Metadata {
    /// Reads all five metadata tables from parquet files in `cache_dir`.
    ///
    /// The expected file names are those in `PATHS`, i.e. the layout produced by
    /// `write_cache`.
    ///
    /// # Errors
    ///
    /// Returns the error of the first table that fails to load. Tables are read in
    /// field order: metrics, geometries, source releases, publishers, countries.
    pub fn from_cache<P: AsRef<Path>>(cache_dir: P) -> anyhow::Result<Self> {
        let read = |file_name: &str| path_to_df(prepend(&cache_dir, file_name));
        Ok(Self {
            metrics: read(PATHS::METRIC_METADATA)?,
            geometries: read(PATHS::GEOMETRY_METADATA)?,
            source_data_releases: read(PATHS::SOURCE)?,
            data_publishers: read(PATHS::PUBLISHER)?,
            countries: read(PATHS::COUNTRY)?,
        })
    }

    /// Writes all five metadata tables as parquet files into `cache_dir`.
    ///
    /// Existing files with the same names are overwritten.
    ///
    /// # Errors
    ///
    /// Stops at, and returns the error from, the first table that fails to write.
    pub fn write_cache<P: AsRef<Path>>(&self, cache_dir: P) -> anyhow::Result<()> {
        let tables: [(&str, &DataFrame); 5] = [
            (PATHS::METRIC_METADATA, &self.metrics),
            (PATHS::GEOMETRY_METADATA, &self.geometries),
            (PATHS::SOURCE, &self.source_data_releases),
            (PATHS::PUBLISHER, &self.data_publishers),
            (PATHS::COUNTRY, &self.countries),
        ];
        for (file_name, table) in tables {
            df_to_file(prepend(&cache_dir, file_name), table)?;
        }
        Ok(())
    }
}
113
114pub struct FullSelectionPlan {
119 pub explicit_metric_ids: Vec<MetricId>,
120 pub geometry: String,
121 pub year: Vec<String>,
122 pub advice: String,
123}
124
125impl Display for FullSelectionPlan {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 write!(
128 f,
129 "Getting {} metrics \n, on {} geometries \n , for the years {}",
130 self.explicit_metric_ids.len(),
131 self.geometry,
132 self.year.join(",")
133 )
134 }
135}
136
137impl Metadata {
138 pub fn combined_metric_source_geometry(&self) -> ExpandedMetadata {
140 let mut df: LazyFrame = self
141 .metrics
142 .clone()
143 .lazy()
144 .join(
146 self.source_data_releases.clone().lazy(),
147 [col(COL::METRIC_SOURCE_DATA_RELEASE_ID)],
148 [col(COL::SOURCE_DATA_RELEASE_ID)],
149 JoinArgs::new(JoinType::Inner),
150 )
151 .join(
153 self.geometries.clone().lazy(),
154 [col(COL::SOURCE_DATA_RELEASE_GEOMETRY_METADATA_ID)],
155 [col(COL::GEOMETRY_ID)],
156 JoinArgs::new(JoinType::Inner),
157 )
158 .join(
160 self.data_publishers.clone().lazy(),
161 [col(COL::SOURCE_DATA_RELEASE_DATA_PUBLISHER_ID)],
162 [col(COL::DATA_PUBLISHER_ID)],
163 JoinArgs::new(JoinType::Inner),
164 )
165 .explode([col(COL::DATA_PUBLISHER_COUNTRIES_OF_INTEREST)])
167 .join(
168 self.countries.clone().lazy(),
169 [col(COL::DATA_PUBLISHER_COUNTRIES_OF_INTEREST)],
170 [col(COL::COUNTRY_ID)],
171 JoinArgs::new(JoinType::Inner),
172 );
173
174 let schema = df.schema().unwrap();
176 let column_names = schema
177 .iter_names()
178 .map(|s| s.as_str())
179 .collect::<Vec<&str>>();
180 debug!("Column names in merged metadata: {:?}", column_names);
181
182 ExpandedMetadata(df)
183 }
184}
185
186impl CountryMetadataLoader {
187 pub fn new(country: &str) -> Self {
189 Self {
190 country: country.into(),
191 }
192 }
193
194 pub async fn load(self, config: &Config) -> Result<Metadata> {
197 let t = try_join!(
198 self.load_metadata(PATHS::METRIC_METADATA, config),
199 self.load_metadata(PATHS::GEOMETRY_METADATA, config),
200 self.load_metadata(PATHS::SOURCE, config),
201 self.load_metadata(PATHS::PUBLISHER, config),
202 self.load_metadata(PATHS::COUNTRY, config),
203 )?;
204 Ok(Metadata {
205 metrics: t.0,
206 geometries: t.1,
207 source_data_releases: t.2,
208 data_publishers: t.3,
209 countries: t.4,
210 })
211 }
212
213 async fn load_metadata(&self, path: &str, config: &Config) -> Result<DataFrame> {
215 let full_path = format!("{}/{}/{path}", config.base_path, self.country);
216 let args = ScanArgsParquet::default();
217 info!("Attempting to load dataframe from {full_path}");
218 tokio::task::spawn_blocking(move || {
219 LazyFrame::scan_parquet(&full_path, args)?
220 .collect()
221 .map_err(|e| anyhow!("Failed to load '{full_path}': {e}"))
222 })
223 .await?
224 }
225}
226
227async fn get_country_names(config: &Config) -> anyhow::Result<Vec<String>> {
228 Ok(reqwest::Client::new()
229 .get(format!("{}/countries.txt", config.base_path))
230 .send()
231 .await?
232 .text()
233 .await?
234 .lines()
235 .map(|s| s.to_string())
236 .collect())
237}
238
239pub async fn load_all(config: &Config) -> Result<Metadata> {
242 let country_names = get_country_names(config).await?;
243
244 info!("Detected country names: {:?}", country_names);
245 let metadata: Result<Vec<Metadata>> = join_all(
246 country_names
247 .iter()
248 .map(|c| CountryMetadataLoader::new(c).load(config)),
249 )
250 .await
251 .into_iter()
252 .collect();
253 let metadata = metadata?;
254
255 let metric_dfs: Vec<LazyFrame> = metadata.iter().map(|m| m.metrics.clone().lazy()).collect();
257 let metrics = polars::prelude::concat(metric_dfs, UnionArgs::default())?.collect()?;
258 info!("Merged metrics with shape: {:?}", metrics.shape());
259
260 let geometries_dfs: Vec<LazyFrame> = metadata
262 .iter()
263 .map(|m| m.geometries.clone().lazy())
264 .collect();
265 let geometries = polars::prelude::concat(geometries_dfs, UnionArgs::default())?.collect()?;
266 info!("Merged geometries with shape: {:?}", geometries.shape());
267
268 let source_data_dfs: Vec<LazyFrame> = metadata
270 .iter()
271 .map(|m| m.source_data_releases.clone().lazy())
272 .collect();
273
274 let source_data_releases =
275 polars::prelude::concat(source_data_dfs, UnionArgs::default())?.collect()?;
276 info!(
277 "Merged source data releases with shape: {:?}",
278 source_data_releases.shape()
279 );
280
281 let data_publisher_dfs: Vec<LazyFrame> = metadata
283 .iter()
284 .map(|m| m.data_publishers.clone().lazy())
285 .collect();
286
287 let data_publishers =
288 polars::prelude::concat(data_publisher_dfs, UnionArgs::default())?.collect()?;
289 info!(
290 "Merged data publishers with shape: {:?}",
291 data_publishers.shape()
292 );
293
294 let countries_dfs: Vec<LazyFrame> = metadata
296 .iter()
297 .map(|m| m.countries.clone().lazy())
298 .collect();
299 let countries = polars::prelude::concat(countries_dfs, UnionArgs::default())?.collect()?;
300 info!("Merged countries with shape: {:?}", countries.shape());
301
302 Ok(Metadata {
303 metrics,
304 geometries,
305 source_data_releases,
306 data_publishers,
307 countries,
308 })
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    // Both tests read from `Config::default().base_path`. They presumably need access to
    // the published metadata (network) — TODO confirm before running offline.

    /// A single country (`bel`) should load all five of its metadata tables.
    #[tokio::test]
    async fn country_metadata_should_load() {
        let result = CountryMetadataLoader::new("bel")
            .load(&Config::default())
            .await;
        println!("{result:#?}");
        assert!(result.is_ok(), "Data should have loaded ok");
    }

    /// Every country listed in `countries.txt` should load and merge successfully.
    #[tokio::test]
    async fn all_metadata_should_load() {
        let result = load_all(&Config::default()).await;
        println!("{result:#?}");
        assert!(result.is_ok(), "Data should have loaded ok");
    }
}