popgetter_core/
metadata.rs

1use std::default::Default;
2use std::fmt::Display;
3use std::path::Path;
4
5use anyhow::{anyhow, Result};
6use futures::future::join_all;
7use log::debug;
8use log::info;
9use polars::{
10    lazy::{
11        dsl::col,
12        frame::{IntoLazy, LazyFrame, ScanArgsParquet},
13    },
14    prelude::{DataFrame, JoinArgs, JoinType, ParquetCompression, ParquetWriter, UnionArgs},
15};
16use tokio::try_join;
17
18use crate::{config::Config, search::MetricId, COL};
19
/// File names of the parquet tables that together make up the metadata.
pub mod paths {
    /// Parquet file holding the geometry metadata table.
    pub const GEOMETRY_METADATA: &str = "geometry_metadata.parquet";
    /// Parquet file holding the metric metadata table.
    pub const METRIC_METADATA: &str = "metric_metadata.parquet";
    /// Parquet file holding the country metadata table.
    pub const COUNTRY: &str = "country_metadata.parquet";
    /// Parquet file holding the source data releases table.
    pub const SOURCE: &str = "source_data_releases.parquet";
    /// Parquet file holding the data publishers table.
    pub const PUBLISHER: &str = "data_publishers.parquet";
}
28use paths as PATHS;
29
/// `CountryMetadataLoader` holds the identifier of a single country
/// (used as a path segment when locating that country's metadata
/// files) and provides methods for fetching and constructing a
/// `Metadata` catalogue for it.
pub struct CountryMetadataLoader {
    /// Country identifier, e.g. `"bel"`.
    country: String,
}
36
37/// A structure that represents a full joined lazy data frame containing all of the metadata
38pub struct ExpandedMetadata(pub LazyFrame);
39
40impl ExpandedMetadata {
41    /// Get access to the lazy data frame
42    pub fn as_df(&self) -> LazyFrame {
43        self.0.clone()
44    }
45}
46
47/// The metadata struct contains the polars `DataFrames` for
48/// the various different metadata tables. Can be constructed
49/// from a single `CountryMetadataLoader` or for all countries.
50/// It also provides the various functions for searching and
51/// getting `MetricRequests` from the catalogue.
52#[derive(Debug, PartialEq)]
53pub struct Metadata {
54    pub metrics: DataFrame,
55    pub geometries: DataFrame,
56    pub source_data_releases: DataFrame,
57    pub data_publishers: DataFrame,
58    pub countries: DataFrame,
59}
60
61#[cfg(feature = "cache")]
62fn path_to_df<P: AsRef<Path>>(path: P) -> anyhow::Result<DataFrame> {
63    Ok(LazyFrame::scan_parquet(path, ScanArgsParquet::default())?.collect()?)
64}
65
66#[cfg(feature = "cache")]
67fn df_to_file<P: AsRef<Path>>(path: P, df: &DataFrame) -> anyhow::Result<()> {
68    let file = std::fs::File::create(path)?;
69    ParquetWriter::new(file)
70        .with_compression(ParquetCompression::Zstd(None))
71        .finish(&mut df.clone())?;
72    Ok(())
73}
74
75#[cfg(feature = "cache")]
/// Build the path of `file_name` inside the `cache_path` directory.
fn prepend<P: AsRef<Path>>(cache_path: P, file_name: &str) -> std::path::PathBuf {
    let mut full_path = cache_path.as_ref().to_path_buf();
    full_path.push(file_name);
    full_path
}
79
80// Only include methods with "cache" feature since it requires a filesystem
81#[cfg(feature = "cache")]
82impl Metadata {
83    pub fn from_cache<P: AsRef<Path>>(cache_dir: P) -> anyhow::Result<Self> {
84        let metrics = path_to_df(prepend(&cache_dir, PATHS::METRIC_METADATA))?;
85        let geometries = path_to_df(prepend(&cache_dir, PATHS::GEOMETRY_METADATA))?;
86        let source_data_releases = path_to_df(prepend(&cache_dir, PATHS::SOURCE))?;
87        let data_publishers = path_to_df(prepend(&cache_dir, PATHS::PUBLISHER))?;
88        let countries = path_to_df(prepend(&cache_dir, PATHS::COUNTRY))?;
89        Ok(Self {
90            metrics,
91            geometries,
92            source_data_releases,
93            data_publishers,
94            countries,
95        })
96    }
97
98    pub fn write_cache<P: AsRef<Path>>(&self, cache_dir: P) -> anyhow::Result<()> {
99        df_to_file(prepend(&cache_dir, PATHS::METRIC_METADATA), &self.metrics)?;
100        df_to_file(
101            prepend(&cache_dir, PATHS::GEOMETRY_METADATA),
102            &self.geometries,
103        )?;
104        df_to_file(
105            prepend(&cache_dir, PATHS::SOURCE),
106            &self.source_data_releases,
107        )?;
108        df_to_file(prepend(&cache_dir, PATHS::PUBLISHER), &self.data_publishers)?;
109        df_to_file(prepend(&cache_dir, PATHS::COUNTRY), &self.countries)?;
110        Ok(())
111    }
112}
113
114/// Describes a fully specified selection plan. The MetricIds should all
115/// be the ID variant. Geometry and years are backed in now.
116/// Advice specifies and alternative options that the user should
117/// be aware of.
118pub struct FullSelectionPlan {
119    pub explicit_metric_ids: Vec<MetricId>,
120    pub geometry: String,
121    pub year: Vec<String>,
122    pub advice: String,
123}
124
125impl Display for FullSelectionPlan {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        write!(
128            f,
129            "Getting {} metrics \n, on {} geometries \n , for the years {}",
130            self.explicit_metric_ids.len(),
131            self.geometry,
132            self.year.join(",")
133        )
134    }
135}
136
137impl Metadata {
138    /// Generate a Lazy DataFrame which joins the metrics, source and geometry metadata
139    pub fn combined_metric_source_geometry(&self) -> ExpandedMetadata {
140        let mut df: LazyFrame = self
141            .metrics
142            .clone()
143            .lazy()
144            // Join source data releases
145            .join(
146                self.source_data_releases.clone().lazy(),
147                [col(COL::METRIC_SOURCE_DATA_RELEASE_ID)],
148                [col(COL::SOURCE_DATA_RELEASE_ID)],
149                JoinArgs::new(JoinType::Inner),
150            )
151            // Join geometry metadata
152            .join(
153                self.geometries.clone().lazy(),
154                [col(COL::SOURCE_DATA_RELEASE_GEOMETRY_METADATA_ID)],
155                [col(COL::GEOMETRY_ID)],
156                JoinArgs::new(JoinType::Inner),
157            )
158            // Join data publishers
159            .join(
160                self.data_publishers.clone().lazy(),
161                [col(COL::SOURCE_DATA_RELEASE_DATA_PUBLISHER_ID)],
162                [col(COL::DATA_PUBLISHER_ID)],
163                JoinArgs::new(JoinType::Inner),
164            )
165            // TODO: consider case when many countries
166            .explode([col(COL::DATA_PUBLISHER_COUNTRIES_OF_INTEREST)])
167            .join(
168                self.countries.clone().lazy(),
169                [col(COL::DATA_PUBLISHER_COUNTRIES_OF_INTEREST)],
170                [col(COL::COUNTRY_ID)],
171                JoinArgs::new(JoinType::Inner),
172            );
173
174        // Debug print the column names so that we know what we can access
175        let schema = df.schema().unwrap();
176        let column_names = schema
177            .iter_names()
178            .map(|s| s.as_str())
179            .collect::<Vec<&str>>();
180        debug!("Column names in merged metadata: {:?}", column_names);
181
182        ExpandedMetadata(df)
183    }
184}
185
186impl CountryMetadataLoader {
187    /// Create a metadata loader for a specific Country
188    pub fn new(country: &str) -> Self {
189        Self {
190            country: country.into(),
191        }
192    }
193
194    /// Load the Metadata catalouge for this country with
195    /// the specified metadata paths
196    pub async fn load(self, config: &Config) -> Result<Metadata> {
197        let t = try_join!(
198            self.load_metadata(PATHS::METRIC_METADATA, config),
199            self.load_metadata(PATHS::GEOMETRY_METADATA, config),
200            self.load_metadata(PATHS::SOURCE, config),
201            self.load_metadata(PATHS::PUBLISHER, config),
202            self.load_metadata(PATHS::COUNTRY, config),
203        )?;
204        Ok(Metadata {
205            metrics: t.0,
206            geometries: t.1,
207            source_data_releases: t.2,
208            data_publishers: t.3,
209            countries: t.4,
210        })
211    }
212
213    /// Performs a load of a given metadata parquet file
214    async fn load_metadata(&self, path: &str, config: &Config) -> Result<DataFrame> {
215        let full_path = format!("{}/{}/{path}", config.base_path, self.country);
216        let args = ScanArgsParquet::default();
217        info!("Attempting to load dataframe from {full_path}");
218        tokio::task::spawn_blocking(move || {
219            LazyFrame::scan_parquet(&full_path, args)?
220                .collect()
221                .map_err(|e| anyhow!("Failed to load '{full_path}': {e}"))
222        })
223        .await?
224    }
225}
226
227async fn get_country_names(config: &Config) -> anyhow::Result<Vec<String>> {
228    Ok(reqwest::Client::new()
229        .get(format!("{}/countries.txt", config.base_path))
230        .send()
231        .await?
232        .text()
233        .await?
234        .lines()
235        .map(|s| s.to_string())
236        .collect())
237}
238
239/// Load the metadata for a list of countries and merge them into
240/// a single `Metadata` catalogue.
241pub async fn load_all(config: &Config) -> Result<Metadata> {
242    let country_names = get_country_names(config).await?;
243
244    info!("Detected country names: {:?}", country_names);
245    let metadata: Result<Vec<Metadata>> = join_all(
246        country_names
247            .iter()
248            .map(|c| CountryMetadataLoader::new(c).load(config)),
249    )
250    .await
251    .into_iter()
252    .collect();
253    let metadata = metadata?;
254
255    // Merge metrics
256    let metric_dfs: Vec<LazyFrame> = metadata.iter().map(|m| m.metrics.clone().lazy()).collect();
257    let metrics = polars::prelude::concat(metric_dfs, UnionArgs::default())?.collect()?;
258    info!("Merged metrics with shape: {:?}", metrics.shape());
259
260    // Merge geometries
261    let geometries_dfs: Vec<LazyFrame> = metadata
262        .iter()
263        .map(|m| m.geometries.clone().lazy())
264        .collect();
265    let geometries = polars::prelude::concat(geometries_dfs, UnionArgs::default())?.collect()?;
266    info!("Merged geometries with shape: {:?}", geometries.shape());
267
268    // Merge source data relaeses
269    let source_data_dfs: Vec<LazyFrame> = metadata
270        .iter()
271        .map(|m| m.source_data_releases.clone().lazy())
272        .collect();
273
274    let source_data_releases =
275        polars::prelude::concat(source_data_dfs, UnionArgs::default())?.collect()?;
276    info!(
277        "Merged source data releases with shape: {:?}",
278        source_data_releases.shape()
279    );
280
281    // Merge source data publishers
282    let data_publisher_dfs: Vec<LazyFrame> = metadata
283        .iter()
284        .map(|m| m.data_publishers.clone().lazy())
285        .collect();
286
287    let data_publishers =
288        polars::prelude::concat(data_publisher_dfs, UnionArgs::default())?.collect()?;
289    info!(
290        "Merged data publishers with shape: {:?}",
291        data_publishers.shape()
292    );
293
294    // Merge countries
295    let countries_dfs: Vec<LazyFrame> = metadata
296        .iter()
297        .map(|m| m.countries.clone().lazy())
298        .collect();
299    let countries = polars::prelude::concat(countries_dfs, UnionArgs::default())?.collect()?;
300    info!("Merged countries with shape: {:?}", countries.shape());
301
302    Ok(Metadata {
303        metrics,
304        geometries,
305        source_data_releases,
306        data_publishers,
307        countries,
308    })
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    // TODO: stub out a mock here that we can use to test with.

    /// Loading the catalogue for a single country ("bel") with the
    /// default config should succeed.
    #[tokio::test]
    async fn country_metadata_should_load() {
        let cfg = Config::default();
        let loader = CountryMetadataLoader::new("bel");
        let loaded = loader.load(&cfg).await;
        println!("{loaded:#?}");
        assert!(loaded.is_ok(), "Data should have loaded ok");
    }

    /// Loading and merging the catalogues of all listed countries with
    /// the default config should succeed.
    #[tokio::test]
    async fn all_metadata_should_load() {
        let cfg = Config::default();
        let loaded = load_all(&cfg).await;
        println!("{loaded:#?}");
        assert!(loaded.is_ok(), "Data should have loaded ok");
    }
}