1use crate::{
4 config::Config,
5 data_request_spec::RegionSpec,
6 geo::get_geometries,
7 metadata::ExpandedMetadata,
8 parquet::{get_metrics, MetricRequest},
9 COL,
10};
11use anyhow::bail;
12use chrono::NaiveDate;
13use log::{debug, error, warn};
14use nonempty::{nonempty, NonEmpty};
15use polars::lazy::dsl::{col, lit, Expr};
16use polars::prelude::{DataFrame, DataFrameJoinOps, IntoLazy, LazyFrame};
17use serde::{Deserialize, Serialize};
18use std::{collections::HashSet, str::FromStr};
19use tokio::try_join;
20
21fn combine_exprs_with_or(exprs: Vec<Expr>) -> Option<Expr> {
25 let mut query: Option<Expr> = None;
26 for expr in exprs {
27 query = if let Some(partial_query) = query {
28 Some(partial_query.or(expr))
29 } else {
30 Some(expr)
31 };
32 }
33 query
34}
35
36fn combine_exprs_with_or1(exprs: NonEmpty<Expr>) -> Expr {
39 let mut query: Expr = exprs.head;
40 for expr in exprs.tail.into_iter() {
41 query = query.or(expr);
42 }
43 query
44}
45
46fn combine_exprs_with_and(exprs: Vec<Expr>) -> Option<Expr> {
48 let mut query: Option<Expr> = None;
49 for expr in exprs {
50 query = if let Some(partial_query) = query {
51 Some(partial_query.and(expr))
52 } else {
53 Some(expr)
54 };
55 }
56 query
57}
58
59fn _combine_exprs_with_and1(exprs: NonEmpty<Expr>) -> Expr {
62 let mut query: Expr = exprs.head;
63 for expr in exprs.tail.into_iter() {
64 query = query.and(expr);
65 }
66 query
67}
68
69fn filter_contains(column: &str, value: &str, case_sensitivity: &CaseSensitivity) -> Expr {
72 let regex = match case_sensitivity {
73 CaseSensitivity::Insensitive => format!("(?i){}", regex::escape(value)),
74 CaseSensitivity::Sensitive => regex::escape(value).to_string(),
75 };
76 col(column).str().contains(lit(regex), false)
77}
78
79fn filter_startswith(column: &str, value: &str, case_sensitivity: &CaseSensitivity) -> Expr {
82 let regex = match case_sensitivity {
83 CaseSensitivity::Insensitive => format!("(?i)^{}", regex::escape(value)),
84 CaseSensitivity::Sensitive => format!("^{}", regex::escape(value)),
85 };
86 col(column).str().contains(lit(regex), false)
87}
88
89fn filter_exact(column: &str, value: &str, case_sensitivity: &CaseSensitivity) -> Expr {
92 let regex = match case_sensitivity {
93 CaseSensitivity::Insensitive => format!("(?i)^{}$", regex::escape(value)),
94 CaseSensitivity::Sensitive => format!("^{}$", regex::escape(value)),
95 };
96 col(column).str().contains(lit(regex), false)
97}
98
99fn filter_regex(column: &str, value: &str, case_sensitivity: &CaseSensitivity) -> Expr {
101 let regex = match case_sensitivity {
102 CaseSensitivity::Insensitive => format!("(?i){}", value),
103 CaseSensitivity::Sensitive => value.to_string(),
104 };
105 col(column).str().contains(lit(regex), false)
106}
107
/// A metadata field that a free-text search (`SearchText`) is matched against.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum SearchContext {
    /// Matches against the `COL::METRIC_HXL_TAG` column.
    Hxl,
    /// Matches against the `COL::METRIC_HUMAN_READABLE_NAME` column.
    HumanReadableName,
    /// Matches against the `COL::METRIC_DESCRIPTION` column.
    Description,
}
116
117impl SearchContext {
118 pub fn all() -> NonEmpty<Self> {
119 nonempty![Self::Hxl, Self::HumanReadableName, Self::Description]
120 }
121}
122
123fn get_filter_fn(match_type: &MatchType) -> impl Fn(&str, &str, &CaseSensitivity) -> Expr {
125 match match_type {
126 MatchType::Regex => filter_regex,
127 MatchType::Exact => filter_exact,
128 MatchType::Contains => filter_contains,
129 MatchType::Startswith => filter_startswith,
130 }
131}
132
133fn get_queries_for_search_text<F: Fn(&str, &str, &CaseSensitivity) -> Expr>(
134 filter_fn: F,
135 val: SearchText,
136) -> Expr {
137 let queries: NonEmpty<Expr> = val.context.map(|field| match field {
138 SearchContext::Hxl => {
139 filter_fn(COL::METRIC_HXL_TAG, &val.text, &val.config.case_sensitivity)
140 }
141 SearchContext::HumanReadableName => filter_fn(
142 COL::METRIC_HUMAN_READABLE_NAME,
143 &val.text,
144 &val.config.case_sensitivity,
145 ),
146 SearchContext::Description => filter_fn(
147 COL::METRIC_DESCRIPTION,
148 &val.text,
149 &val.config.case_sensitivity,
150 ),
151 });
152 combine_exprs_with_or1(queries)
153}
154
155impl From<SearchText> for Expr {
158 fn from(val: SearchText) -> Self {
159 get_queries_for_search_text(get_filter_fn(&val.config.match_type), val)
160 }
161}
162
163impl From<YearRange> for Expr {
164 fn from(value: YearRange) -> Self {
165 match value {
166 YearRange::Before(year) => col(COL::SOURCE_DATA_RELEASE_REFERENCE_PERIOD_START)
167 .lt_eq(lit(NaiveDate::from_ymd_opt(year.into(), 12, 31).unwrap())),
168 YearRange::After(year) => col(COL::SOURCE_DATA_RELEASE_REFERENCE_PERIOD_END)
169 .gt_eq(lit(NaiveDate::from_ymd_opt(year.into(), 1, 1).unwrap())),
170 YearRange::Between(start, end) => {
171 let start_col = col(COL::SOURCE_DATA_RELEASE_REFERENCE_PERIOD_START);
172 let end_col = col(COL::SOURCE_DATA_RELEASE_REFERENCE_PERIOD_END);
173 let start_date = lit(NaiveDate::from_ymd_opt(start.into(), 1, 1).unwrap());
174 let end_date = lit(NaiveDate::from_ymd_opt(end.into(), 12, 31).unwrap());
175 let case1 = start_col
179 .clone()
180 .lt_eq(start_date.clone())
181 .and(end_col.clone().gt_eq(start_date.clone()));
182 let case2 = start_col
183 .clone()
184 .lt_eq(end_date.clone())
185 .and(end_col.clone().gt_eq(end_date.clone()));
186 let case3 = start_col.gt_eq(start_date).and(end_col.lt_eq(end_date));
187 case1.or(case2).or(case3)
188 }
189 }
190 }
191}
192
193impl From<DataPublisher> for Expr {
194 fn from(value: DataPublisher) -> Self {
195 get_filter_fn(&value.config.match_type)(
196 COL::DATA_PUBLISHER_NAME,
197 &value.value,
198 &value.config.case_sensitivity,
199 )
200 }
201}
202
203impl From<SourceDownloadUrl> for Expr {
204 fn from(value: SourceDownloadUrl) -> Self {
205 get_filter_fn(&value.config.match_type)(
206 COL::METRIC_SOURCE_DOWNLOAD_URL,
207 &value.value,
208 &value.config.case_sensitivity,
209 )
210 }
211}
212
213impl From<SourceDataRelease> for Expr {
214 fn from(value: SourceDataRelease) -> Self {
215 get_filter_fn(&value.config.match_type)(
216 COL::SOURCE_DATA_RELEASE_NAME,
217 &value.value,
218 &value.config.case_sensitivity,
219 )
220 }
221}
222
223impl From<GeometryLevel> for Expr {
224 fn from(value: GeometryLevel) -> Self {
225 get_filter_fn(&value.config.match_type)(
226 COL::GEOMETRY_LEVEL,
227 &value.value,
228 &value.config.case_sensitivity,
229 )
230 }
231}
232
233fn combine_country_fn<F: Fn(&str, &str, &CaseSensitivity) -> Expr>(func: F, value: &str) -> Expr {
234 combine_exprs_with_or(vec![
236 func(
237 COL::COUNTRY_NAME_SHORT_EN,
238 value,
239 &CaseSensitivity::Insensitive,
240 ),
241 func(
242 COL::COUNTRY_NAME_OFFICIAL,
243 value,
244 &CaseSensitivity::Insensitive,
245 ),
246 func(COL::COUNTRY_ISO2, value, &CaseSensitivity::Insensitive),
247 func(COL::COUNTRY_ISO3, value, &CaseSensitivity::Insensitive),
248 func(COL::COUNTRY_ISO3166_2, value, &CaseSensitivity::Insensitive),
249 func(
252 COL::DATA_PUBLISHER_COUNTRIES_OF_INTEREST,
253 value,
254 &CaseSensitivity::Insensitive,
255 ),
256 ])
257 .unwrap()
259}
260
261impl From<Country> for Expr {
262 fn from(value: Country) -> Self {
263 combine_country_fn(get_filter_fn(&value.config.match_type), &value.value)
264 }
265}
266
267impl From<SourceMetricId> for Expr {
268 fn from(value: SourceMetricId) -> Self {
269 get_filter_fn(&value.config.match_type)(
270 COL::METRIC_SOURCE_METRIC_ID,
271 &value.value,
272 &value.config.case_sensitivity,
273 )
274 }
275}
276
277impl From<MetricId> for Expr {
278 fn from(value: MetricId) -> Self {
279 get_filter_fn(&value.config.match_type)(
280 COL::METRIC_ID,
281 &value.id,
282 &value.config.case_sensitivity,
283 )
284 }
285}
286
/// A free-text search over one or more metadata fields.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SearchText {
    /// Text to look for; how it is interpreted depends on `config.match_type`.
    pub text: String,
    /// Fields to search; a row matches if `text` matches in any of them.
    pub context: NonEmpty<SearchContext>,
    /// Match type and case sensitivity applied to `text`.
    pub config: SearchConfig,
}
293
294impl Default for SearchText {
295 fn default() -> Self {
296 Self {
298 text: "".to_string(),
299 context: SearchContext::all(),
300 config: SearchConfig {
301 match_type: MatchType::Exact,
302 case_sensitivity: CaseSensitivity::Insensitive,
303 },
304 }
305 }
306}
307
/// A range of years used to filter on a source data release's reference period.
///
/// Parsed from `"YYYY"`, `"...YYYY"`, `"YYYY..."` or `"YYYY...YYYY"` by the `FromStr` impl.
#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
pub enum YearRange {
    /// The reference period starts on or before 31 December of the given year.
    Before(u16),
    /// The reference period ends on or after 1 January of the given year.
    After(u16),
    /// The reference period overlaps 1 January of the first year through 31 December of the
    /// second (for periods whose start is not after their end).
    Between(u16, u16),
}
315
316impl FromStr for YearRange {
317 type Err = anyhow::Error;
318
319 fn from_str(s: &str) -> Result<Self, Self::Err> {
320 fn str_to_option_u16(value: &str) -> Result<Option<u16>, anyhow::Error> {
321 if value.is_empty() {
322 return Ok(None);
323 }
324 match value.parse::<u16>() {
325 Ok(value) => Ok(Some(value)),
326 Err(_) => bail!("Invalid year range"),
327 }
328 }
329 let parts: Vec<Option<u16>> = s
330 .split("...")
331 .map(str_to_option_u16)
332 .collect::<Result<Vec<Option<u16>>, _>>()?;
333 match parts.as_slice() {
334 [Some(a)] => Ok(YearRange::Between(*a, *a)),
335 [None, Some(a)] => Ok(YearRange::Before(*a)),
336 [Some(a), None] => Ok(YearRange::After(*a)),
337 [Some(a), Some(b)] => {
338 if a > b {
339 bail!("Invalid year range")
340 } else {
341 Ok(YearRange::Between(*a, *b))
342 }
343 }
344 _ => bail!("Invalid year range"),
345 }
346 }
347}
348
/// A filter on the metric ID column (`COL::METRIC_ID`).
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct MetricId {
    /// ID (or ID fragment, depending on `config.match_type`) to match.
    pub id: String,
    /// Matching options. When absent during deserialization, this defaults to a
    /// case-insensitive prefix match (see `default_metric_id_search_config`).
    #[serde(default = "default_metric_id_search_config")]
    pub config: SearchConfig,
}
356
357fn default_metric_id_search_config() -> SearchConfig {
358 SearchConfig {
359 match_type: MatchType::Startswith,
360 case_sensitivity: CaseSensitivity::Insensitive,
361 }
362}
363
/// How a search value is compared against a column.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
pub enum MatchType {
    /// The value is used verbatim (unescaped) as a regular expression and may match anywhere.
    Regex,
    /// The whole column value must equal the value (escaped and anchored with `^...$`).
    #[default]
    Exact,
    /// The value (escaped) must appear somewhere in the column value.
    Contains,
    /// The column value must start with the value (escaped and anchored with `^`).
    Startswith,
}
372
/// Whether matching distinguishes upper and lower case.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
pub enum CaseSensitivity {
    /// Case is ignored; the pattern is prefixed with the regex `(?i)` flag.
    #[default]
    Insensitive,
    /// Case must match exactly.
    Sensitive,
}
379
/// Matching options shared by the search filters.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SearchConfig {
    /// How the value is compared against the column.
    pub match_type: MatchType,
    /// Whether the comparison ignores case.
    pub case_sensitivity: CaseSensitivity,
}
388
/// A filter on the geometry level column (`COL::GEOMETRY_LEVEL`).
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct GeometryLevel {
    /// Value matched against the column according to `config`.
    pub value: String,
    /// Match type and case sensitivity for `value`.
    pub config: SearchConfig,
}
395
/// A filter on the source data release name column (`COL::SOURCE_DATA_RELEASE_NAME`).
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SourceDataRelease {
    /// Value matched against the column according to `config`.
    pub value: String,
    /// Match type and case sensitivity for `value`.
    pub config: SearchConfig,
}
402
/// A filter on the metric source download URL column (`COL::METRIC_SOURCE_DOWNLOAD_URL`).
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SourceDownloadUrl {
    /// Value matched against the column according to `config`.
    pub value: String,
    /// Match type and case sensitivity for `value`.
    pub config: SearchConfig,
}
408
/// A filter on the data publisher name column (`COL::DATA_PUBLISHER_NAME`).
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct DataPublisher {
    /// Value matched against the column according to `config`.
    pub value: String,
    /// Match type and case sensitivity for `value`.
    pub config: SearchConfig,
}
415
/// A filter on country, matched against several country columns.
///
/// The columns are the English and official names, the ISO codes, and the data publisher's
/// countries of interest; a match in any of them selects the row.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Country {
    /// Country name or code to match.
    pub value: String,
    /// Matching options. Only `match_type` is used for this filter; matching is always
    /// case-insensitive.
    pub config: SearchConfig,
}
422
/// A filter on the source metric ID column (`COL::METRIC_SOURCE_METRIC_ID`).
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SourceMetricId {
    /// Value matched against the column according to `config`.
    pub value: String,
    /// Match type and case sensitivity for `value`.
    pub config: SearchConfig,
}
429
/// Parameters of a metadata search.
///
/// When converted into a filter expression (`Option<Expr>`), the following criteria are AND-ed:
/// every `text` entry, the OR of all `year_range` entries, and each single-value filter that is
/// set. The `metric_id` entries are OR-ed with each other and then OR-ed with that conjunction.
#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub struct SearchParams {
    /// Free-text searches; every entry must match.
    pub text: Vec<SearchText>,
    /// Reference-period year ranges; a row satisfying any one of them passes this criterion.
    pub year_range: Option<Vec<YearRange>>,
    /// Metric IDs; rows matching any of them are selected regardless of the other criteria.
    pub metric_id: Vec<MetricId>,
    /// Optional filter on the geometry level.
    pub geometry_level: Option<GeometryLevel>,
    /// Optional filter on the source data release name.
    pub source_data_release: Option<SourceDataRelease>,
    /// Optional filter on the data publisher name.
    pub data_publisher: Option<DataPublisher>,
    /// Optional filter on the metric source download URL.
    pub source_download_url: Option<SourceDownloadUrl>,
    /// Optional filter on country.
    pub country: Option<Country>,
    /// Optional filter on the source metric ID.
    pub source_metric_id: Option<SourceMetricId>,
    /// Region specifications; not used when building the search filter expression.
    pub region_spec: Vec<RegionSpec>,
}
457
458impl SearchParams {
459 pub fn search(self, expanded_metadata: &ExpandedMetadata) -> SearchResults {
460 debug!("Searching with request: {:?}", self);
461 let expr: Option<Expr> = self.into();
462 let full_results: LazyFrame = expanded_metadata.as_df();
463 let result: LazyFrame = match expr {
464 Some(expr) => full_results.filter(expr),
465 None => full_results,
466 };
467 SearchResults(result.collect().unwrap())
468 }
469}
470
471fn to_queries_then_or<T: Into<Expr>>(queries: Vec<T>) -> Option<Expr> {
472 let queries: Vec<Expr> = queries.into_iter().map(|q| q.into()).collect();
473 combine_exprs_with_or(queries)
474}
475
476fn _to_optqueries_then_or<T: Into<Option<Expr>>>(queries: Vec<T>) -> Option<Expr> {
477 let query_options: Vec<Option<Expr>> = queries.into_iter().map(|q| q.into()).collect();
478 let queries: Vec<Expr> = query_options.into_iter().flatten().collect();
479 combine_exprs_with_or(queries)
480}
481
482impl From<SearchParams> for Option<Expr> {
483 fn from(value: SearchParams) -> Self {
484 let mut subexprs: Vec<Option<Expr>> = value
486 .text
487 .into_iter()
488 .map(|text| Some(text.into()))
489 .collect();
490
491 if let Some(year_range) = value.year_range {
492 subexprs.extend([to_queries_then_or(year_range)]);
493 }
494 let other_subexprs: Vec<Option<Expr>> = vec![
495 value.geometry_level.map(|v| v.into()),
496 value.source_data_release.map(|v| v.into()),
497 value.data_publisher.map(|v| v.into()),
498 value.source_download_url.map(|v| v.into()),
499 value.country.map(|v| v.into()),
500 value.source_metric_id.map(|v| v.into()),
501 ];
502 subexprs.extend(other_subexprs);
503 let valid_subexprs: Vec<Expr> = subexprs.into_iter().flatten().collect();
505
506 let combined_non_id_expr = combine_exprs_with_and(valid_subexprs);
508
509 let combined_id_expr = to_queries_then_or(value.metric_id);
511
512 debug!("{:#?}", combined_non_id_expr);
513 debug!("{:#?}", combined_id_expr);
514
515 combine_exprs_with_or(
517 vec![combined_non_id_expr, combined_id_expr]
518 .into_iter()
519 .flatten()
520 .collect::<Vec<_>>(),
521 )
522 }
523}
524
/// Options controlling how search results are downloaded.
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadParams {
    /// Whether to load the geometries and inner-join them with the metrics on `COL::GEO_ID`.
    pub include_geoms: bool,
    /// Region specifications. When geometries are included, the bounding box of the first
    /// entry (if any) is passed to the geometry loader. More than one entry is not supported in
    /// that case.
    pub region_spec: Vec<RegionSpec>,
}
532
/// A combined request: search parameters plus download options.
#[derive(Debug, Serialize, Deserialize)]
pub struct Params {
    /// Which metadata rows to select.
    pub search: SearchParams,
    /// How to download the selected metrics.
    pub download: DownloadParams,
}
540
/// Metadata rows selected by a search (as returned by `SearchParams::search`).
#[derive(Clone, Debug)]
pub struct SearchResults(pub DataFrame);
543
544impl SearchResults {
545 pub fn to_metric_requests(&self, config: &Config) -> Vec<MetricRequest> {
547 let df = self
551 .0
552 .clone()
553 .lazy()
554 .select([
555 col(COL::METRIC_PARQUET_PATH),
556 col(COL::METRIC_PARQUET_COLUMN_NAME),
557 col(COL::GEOMETRY_FILEPATH_STEM),
558 ])
559 .collect()
560 .unwrap();
561 df.column(COL::METRIC_PARQUET_COLUMN_NAME)
562 .unwrap()
563 .str()
564 .unwrap()
565 .into_no_null_iter()
566 .zip(
567 df.column(COL::METRIC_PARQUET_PATH)
568 .unwrap()
569 .str()
570 .unwrap()
571 .into_no_null_iter(),
572 )
573 .zip(
574 df.column(COL::GEOMETRY_FILEPATH_STEM)
575 .unwrap()
576 .str()
577 .unwrap()
578 .into_no_null_iter(),
579 )
580 .map(|((column, metric_file), geom_file)| MetricRequest {
581 column: column.to_owned(),
582 metric_file: format!("{}/{metric_file}", config.base_path),
583 geom_file: format!("{}/{geom_file}.fgb", config.base_path),
584 })
585 .collect()
586 }
587
588 pub async fn download(
591 self,
592 config: &Config,
593 download_params: &DownloadParams,
594 ) -> anyhow::Result<DataFrame> {
595 let metric_requests = self.to_metric_requests(config);
596 debug!("metric_requests = {:#?}", metric_requests);
597
598 if metric_requests.is_empty() {
599 bail!(
600 "No metric requests were derived from `SearchResults`: {}\ngiven `DownloadParams`: {:#?}",
601 self.0,
602 download_params
603 )
604 }
605
606 let all_geom_files: HashSet<String> = metric_requests
607 .iter()
608 .map(|m| m.geom_file.clone())
609 .collect();
610
611 if all_geom_files.len() > 1 {
613 let err_info = "Multiple geometries not supported in current release";
614 error!("{err_info}: {all_geom_files:?}");
615 unimplemented!("{err_info}");
616 } else if all_geom_files.is_empty() {
617 bail!(
618 "No geometry files for the following `metric_requests`: {:#?}",
619 metric_requests
620 )
621 }
622
623 let metrics = tokio::task::spawn_blocking(move || get_metrics(&metric_requests, None));
625
626 let result = if download_params.include_geoms {
627 if download_params.region_spec.len() > 1 {
629 todo!(
630 "Multiple region specifications are not yet supported: {:#?}",
631 download_params.region_spec
632 );
633 }
634 let bbox = download_params
635 .region_spec
636 .first()
637 .and_then(|region_spec| region_spec.bbox().clone());
638
639 if bbox.is_some() {
640 warn!(
641 "The bounding box should be specified in the same coordinate reference system \
642 as the requested geometry."
643 )
644 }
645 let geoms = get_geometries(all_geom_files.iter().next().unwrap(), bbox);
646
647 let (metrics, geoms) = try_join!(
650 async move { metrics.await.map_err(anyhow::Error::from) },
651 geoms
652 )?;
653 debug!("geoms: {geoms:#?}");
654 debug!("metrics: {metrics:#?}");
655 geoms.inner_join(&metrics?, [COL::GEO_ID], [COL::GEO_ID])?
656 } else {
657 let metrics = metrics.await.map_err(anyhow::Error::from)??;
658 debug!("metrics: {metrics:#?}");
659 metrics
660 };
661
662 Ok(result)
663 }
664}
665
#[cfg(test)]
mod tests {

    use polars::df;

    use super::*;

    /// Small metadata frame; the `index` column identifies which rows a filter kept.
    fn test_df() -> DataFrame {
        df!(
            COL::METRIC_HUMAN_READABLE_NAME => &["Apple", "Apple", "Pear", "apple", ".apple", "lemon"],
            COL::METRIC_HXL_TAG => &["Red", "Yellow", "Green", "red", "Green", "yellow"],
            COL::METRIC_DESCRIPTION => &["Red", "Yellow", "Green", "red", "Green", "yellow"],
            "index" => &[0u32, 1, 2, 3, 4, 5]
        )
        .unwrap()
    }

    /// Search parameters with a single text search over the human-readable name.
    fn test_search_params(
        value: &str,
        match_type: MatchType,
        case_sensitivity: CaseSensitivity,
    ) -> SearchParams {
        SearchParams {
            text: vec![SearchText {
                text: value.to_string(),
                context: nonempty![SearchContext::HumanReadableName],
                config: SearchConfig {
                    match_type,
                    case_sensitivity,
                },
            }],
            ..Default::default()
        }
    }

    /// Filters `test_df` with a single name search and checks the surviving `index` values.
    fn test_from_args(
        value: &str,
        match_type: MatchType,
        case_sensitivity: CaseSensitivity,
        expected_ids: &[u32],
    ) -> anyhow::Result<()> {
        let df = test_df();
        let search_params = test_search_params(value, match_type, case_sensitivity);
        let expr = Option::<Expr>::from(search_params).unwrap();
        let filtered = df.clone().lazy().filter(expr).collect()?;
        assert_eq!(filtered.select(["index"])?, df!("index" => expected_ids)?);
        Ok(())
    }

    #[test]
    #[rustfmt::skip]
    fn test_search_request() -> anyhow::Result<()> {
        test_from_args("^A", MatchType::Regex, CaseSensitivity::Sensitive, &[0, 1])?;
        test_from_args("^A", MatchType::Regex, CaseSensitivity::Insensitive, &[0, 1, 3])?;
        test_from_args("Apple", MatchType::Exact, CaseSensitivity::Sensitive, &[0, 1])?;
        test_from_args("Apple", MatchType::Exact, CaseSensitivity::Insensitive, &[0, 1, 3])?;
        test_from_args("Apple", MatchType::Regex, CaseSensitivity::Sensitive, &[0, 1])?;
        test_from_args("Apple", MatchType::Regex, CaseSensitivity::Insensitive, &[0, 1, 3, 4])?;
        Ok(())
    }

    #[test]
    #[rustfmt::skip]
    fn test_search_request_contains_and_startswith() -> anyhow::Result<()> {
        test_from_args("pple", MatchType::Contains, CaseSensitivity::Sensitive, &[0, 1, 3, 4])?;
        // The value is regex-escaped, so "." only matches a literal dot.
        test_from_args(".", MatchType::Contains, CaseSensitivity::Insensitive, &[4])?;
        test_from_args("apple", MatchType::Startswith, CaseSensitivity::Sensitive, &[3])?;
        test_from_args("apple", MatchType::Startswith, CaseSensitivity::Insensitive, &[0, 1, 3])?;
        Ok(())
    }

    #[test]
    fn test_year_range_from_str() {
        assert_eq!("2020".parse::<YearRange>().unwrap(), YearRange::Between(2020, 2020));
        assert_eq!("...2020".parse::<YearRange>().unwrap(), YearRange::Before(2020));
        assert_eq!("2020...".parse::<YearRange>().unwrap(), YearRange::After(2020));
        assert_eq!(
            "2018...2020".parse::<YearRange>().unwrap(),
            YearRange::Between(2018, 2020)
        );
        for invalid in ["", "...", "2020...2018", "20x0", "1...2...3"] {
            assert!(
                invalid.parse::<YearRange>().is_err(),
                "{:?} should not parse",
                invalid
            );
        }
    }
}