use crate::qvf::{
self, Collection, Instrument, Product, QvfDate, QvfFilename, QvfFilenames, Satellite,
};
use crate::stac::{filter_assets_by_key, filter_items, filter_tile, filter_tile_pc};
use crate::io::item_with_assets;
use crate::utils::{bbox_from_polygon, Bbox, Cmp, ImagerySource, Intersects, run_gdal_command};
use crate::PRODUCT_REGISTRY;
use anyhow::{bail, Result};
use chrono::{DateTime, Duration, FixedOffset, NaiveDate, NaiveDateTime, NaiveTime, TimeZone};
use gdal::Dataset;
use glob::glob;
use log::{debug, warn};
use postgres::{Client, NoTls};
use reqwest;
use serde_json::json;
use serde_json::Map;
use serde_json::Value;
pub use stac::ItemCollection;
use stac::{Asset, Item, Properties};
use std::collections::HashMap;
use std::fmt::Debug;
use std::iter::FromIterator;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time;
/// How the caller selected the bands (assets) a query should return.
#[derive(Debug, Clone)]
enum BandSpec {
/// Provider-specific band names; `resolve_bands` returns them unchanged.
Explicit(Vec<String>),
/// Canonical band names; `resolve_bands` translates each one through the
/// product looked up in `PRODUCT_REGISTRY`.
Canonical(Vec<String>),
/// No explicit selection; `resolve_bands` returns every measurement name of
/// the registered product.
All,
}
/// Builder that collects the parameters of an imagery search and then runs it
/// against either a STAC endpoint or the Apollo database (see `build`).
pub struct ImageQueryBuilder<'a> {
// Where to search; also selects the STAC or Apollo code path in `build`.
source: ImagerySource,
// Satellite collection to search.
collection: Collection,
// Which bands/assets to keep in the result.
bands: BandSpec,
// Start of the search window.
start_date: NaiveDate,
// End of the search window.
end_date: NaiveDate,
// Stored by the `landcover` setter. NOTE(review): not read by any query
// method visible in this file.
landcover: (Cmp, usize),
// Comparison operator and threshold applied to the cloud-cover value.
cloudcover: (Cmp, usize),
// Spatial filter: scene names, a bounding box or a polygon.
intersects: Intersects<'a>,
}
/// A raw SQL query string, executed as-is by `DbConnection::query_imagery`.
pub struct ImageQuery {
/// SQL text passed to the database client without parameters.
pub query: String,
}
/// Wrapper around a `postgres::Client` connected to the imagery metadata database.
pub struct DbConnection {
// Client created in `DbConnection::new`.
client: Client,
}
impl DbConnection {
/// Connects to the metadata database without TLS.
///
/// Connection settings come from `RSS_DB_USER`, `RSS_DB_NAME` and
/// `RSS_DB_HOST`, falling back to `slats`, `slatsmeta` and `sipgdb`.
///
/// # Panics
/// Panics if the connection cannot be established.
pub(crate) fn new() -> Self {
    // Read an environment variable, using `fallback` when it is unset or invalid.
    let env_or = |key: &str, fallback: &str| -> String {
        std::env::var(key).unwrap_or_else(|_| fallback.to_string())
    };
    let db_user = env_or("RSS_DB_USER", "slats");
    let db_name = env_or("RSS_DB_NAME", "slatsmeta");
    let db_host = env_or("RSS_DB_HOST", "sipgdb");
    let db_url = format!("postgresql://{db_user}@{db_host}/{db_name}");
    match Client::connect(&db_url, NoTls) {
        Ok(client) => DbConnection { client },
        Err(err) => panic!("Unable to connect to the database: {:?}", err),
    }
}
pub fn query_imagery(mut self, query: ImageQuery, stage_codes: &[&str]) -> QvfFilenames {
let results = self.client.query(&query.query, &[]).unwrap();
let mut qvf_filenames = Vec::new();
for row in results {
let row_date: &str = row.get(2);
let row_date =
NaiveDate::parse_from_str(row_date, "%Y%m%d").expect("Could not parse date");
for stage_code in stage_codes {
let scene: &str = row.get(0);
let r = QvfFilename {
scene: scene.to_string(),
date: qvf::QvfDate::Date(row_date),
satellite: Satellite::from_str(row.get(1)).expect("Invalid Satellite"),
instrument: qvf::Instrument::ms,
product: qvf::Product::re,
stage_code: stage_code.to_string(),
zone: format!("m{}", &scene[2..3]),
extension: qvf::Extension::img,
collection: qvf::Collection::Sentinel2,
image_type: qvf::ImageType::Scene,
location: None,
extra_fields: None,
};
qvf_filenames.push(r);
}
}
QvfFilenames { qvf_filenames }
}
}
impl<'a> ImageQueryBuilder<'a> {
/// Creates a builder that searches the last 14 days (UTC) for all bands.
///
/// Defaults: `bands` = all measurements, `landcover` = `(Cmp::Greater, 0)`,
/// `cloudcover` = `(Cmp::Less, 0)`.
///
/// NOTE(review): the default cloud-cover filter is "less than 0"; callers
/// presumably override it with `cloudcover()` — confirm this is intended.
pub fn new(
    source: impl Into<ImagerySource>,
    collection: Collection,
    intersects: Intersects<'a>,
) -> Self {
    // Read the clock once so both ends of the window derive from the same day.
    let today = chrono::Utc::now().date_naive();
    ImageQueryBuilder {
        source: source.into(),
        collection,
        bands: BandSpec::All,
        start_date: today - Duration::days(14),
        end_date: today,
        intersects,
        landcover: (Cmp::Greater, 0),
        cloudcover: (Cmp::Less, 0),
    }
}
/// Selects bands by their provider-specific names, replacing any earlier
/// band selection.
pub fn bands(mut self, bands: impl IntoIterator<Item = impl Into<String>>) -> Self {
    let explicit_names: Vec<String> = bands.into_iter().map(Into::into).collect();
    self.bands = BandSpec::Explicit(explicit_names);
    self
}
/// Selects bands by canonical name, replacing any earlier band selection.
/// The names are translated to provider names when the query is run.
pub fn canonical_bands(mut self, canonical: impl IntoIterator<Item = impl Into<String>>) -> Self {
    let canonical_names: Vec<String> = canonical.into_iter().map(Into::into).collect();
    self.bands = BandSpec::Canonical(canonical_names);
    self
}
pub fn start_date(mut self, date: NaiveDate) -> ImageQueryBuilder<'a> {
self.start_date = date;
self
}
pub fn end_date(mut self, date: NaiveDate) -> ImageQueryBuilder<'a> {
self.end_date = date;
self
}
pub fn landcover(mut self, cmp: (Cmp, usize)) -> ImageQueryBuilder<'a> {
self.landcover = cmp;
self
}
pub fn cloudcover(mut self, cmp: (Cmp, usize)) -> ImageQueryBuilder<'a> {
self.cloudcover = cmp;
self
}
/// Turns the builder's band selection into provider-specific band names.
///
/// * Explicit names are returned unchanged, for every source.
/// * Canonical names are translated through the product registered for this
///   source and collection; all unknown names are reported together.
/// * With no selection, every measurement of the registered product is returned.
///
/// # Errors
/// Fails when the source is Apollo and the selection is not explicit, when
/// the product lookup fails, or when a canonical name cannot be resolved.
fn resolve_bands(&self) -> Result<Vec<String>> {
    // Explicit names need no registry, so they are handled before the Apollo check.
    let canonical_names = match &self.bands {
        BandSpec::Explicit(names) => return Ok(names.clone()),
        BandSpec::Canonical(names) => Some(names),
        BandSpec::All => None,
    };
    if matches!(self.source, ImagerySource::Apollo(_)) {
        bail!(
            "canonical_bands() and default (all bands) are not supported for Apollo source; \
             use .bands() with explicit stage codes (e.g., \"aba\", \"dbg\")"
        );
    }
    let source_name = self.source.name();
    let product = PRODUCT_REGISTRY.by_source_and_collection(&source_name, self.collection)?;
    let canonical_names = match canonical_names {
        Some(names) => names,
        None => {
            return Ok(product
                .measurement_names()
                .into_iter()
                .map(String::from)
                .collect())
        }
    };
    let mut resolved = Vec::with_capacity(canonical_names.len());
    let mut errors = Vec::new();
    for name in canonical_names {
        match product.resolve_canonical(name) {
            Some(provider_name) => resolved.push(provider_name.to_string()),
            None => errors.push(format!(
                "canonical band '{}' not found in product '{}'",
                name, product.name
            )),
        }
    }
    if !errors.is_empty() {
        // Every failure gets its own " - " bullet, not just the first one.
        bail!(
            "Failed to resolve {} band(s):\n - {}",
            errors.len(),
            errors.join("\n - ")
        );
    }
    Ok(resolved)
}
fn bbox_from_scenes(scenes: &[&str], collection: Collection) -> HashMap<String, Bbox> {
let mut filtered = HashMap::new();
let tile2bbox = match collection {
Collection::Sentinel2 => {
include_str!("../data/sentinel2_bbox.json")
}
Collection::Landsat9
| Collection::Landsat8
| Collection::Landsat7
| Collection::Landsat5
| Collection::LandsatAll => {
include_str!("../data/landsat_bbox.json")
}
};
let full: HashMap<String, Bbox> =
serde_json::from_str(tile2bbox).expect("error deserializing tiles info");
for s in scenes {
let b = full
.get(*s)
.unwrap_or_else(|| panic!("{}", format!("Scene {s:?} not found")));
filtered.insert(s.to_string(), b.to_owned());
}
filtered
}
pub fn from_qvf(
qvf_filename: QvfFilename,
source: impl Into<ImagerySource>,
bands: impl IntoIterator<Item = impl Into<String>>,
) -> Result<QueryResult> {
let collection = match qvf_filename.satellite {
Satellite::l5 => Collection::Landsat5,
Satellite::l7 => Collection::Landsat7,
Satellite::l8 => Collection::Landsat8,
Satellite::ce => Collection::Sentinel2,
Satellite::cf => Collection::Sentinel2,
Satellite::cv => Collection::Sentinel2,
_ => panic!("Collection not supported"),
};
let start_date = qvf_filename.date.into();
let scene = match collection {
Collection::Sentinel2 => qvf_filename.scene[1..].to_string(),
_ => qvf_filename.scene,
};
let intersects = Intersects::Scene(vec![&scene]);
let query = ImageQueryBuilder::new(source, collection, intersects)
.bands(bands)
.start_date(start_date)
.end_date(start_date + Duration::days(1))
.cloudcover((Cmp::Less, 100))
.build();
Ok(query)
}
pub fn query_stac(self) -> Result<QueryResult> {
let resolved_bands = self.resolve_bands()?;
let layers: Vec<&str> = resolved_bands.iter().map(|s| s.as_str()).collect();
let timeout = std::env::var("REQWEST_TIMEOUT").unwrap_or(360.to_string());
let limit = std::env::var("REQWEST_LIMIT").unwrap_or(50.to_string());
let client = reqwest::blocking::Client::builder()
.timeout(time::Duration::from_secs(timeout.parse::<u64>().unwrap()))
.gzip(true)
.brotli(true)
.deflate(true)
.build()?;
let bboxs = match self.intersects {
Intersects::Scene(ref scenes) => Self::bbox_from_scenes(scenes, self.collection),
Intersects::Bbox(bbox) => {
let mut hm = HashMap::new();
hm.insert("bbox".to_string(), bbox);
hm
}
Intersects::Polygon(ref polygon) => {
let bbox = bbox_from_polygon(polygon);
let mut hm = HashMap::new();
hm.insert("bbox".to_string(), bbox);
hm
}
};
debug!("Bbox is: {bboxs:?}");
let start_date = self.start_date.format("%Y-%m-%dT00:00:00Z").to_string();
let end_date = self.end_date.format("%Y-%m-%dT00:00:00Z").to_string();
let datetime = format!("{}/{}", start_date, end_date);
let mut items = Vec::new();
let source_name = self.source.name();
let collection_name = PRODUCT_REGISTRY
.get_stac_collection(&source_name, self.collection)
.expect("Failed to lookup STAC collection")
.expect("STAC collection not registered for this source/collection");
let client_url = self.source.get_url();
debug!("client_url: {:?}", client_url);
for (_tile, bbox) in bboxs {
let initial_req = client
.get(&client_url)
.query(&[("limit", limit.clone())])
.query(&[("collections", collection_name)])
.query(&[("bbox", bbox.to_string())])
.query(&[("datetime", datetime.clone())])
.query(&[("stac_version", 1)])
.build()
.unwrap();
let mut next_link = Some(initial_req.url().as_str().to_string());
let mut feature_collections: Vec<ItemCollection> = Vec::new();
while let Some(link) = next_link {
debug!("Following some link: {link}");
let data_req = client.get(&link).build()?;
let data_resp = client.execute(data_req)?;
let data = data_resp.text()?;
let page_collection: ItemCollection = serde_json::from_str(&data)?;
debug!("page_collection len: {}", page_collection.items.len());
feature_collections.push(page_collection);
let page_collection_v: Value = serde_json::from_str(&data)?;
next_link = page_collection_v["links"]
.as_array()
.unwrap()
.iter()
.find_map(|link| {
if link["rel"].as_str().unwrap() == "next" {
Some(link["href"].as_str().unwrap().to_string())
} else {
None
}
})
}
debug!("Found {} chunks of items", feature_collections.len());
let mut feature_collection = feature_collections[0].clone();
feature_collection.items = vec![];
for item_collection in feature_collections.iter() {
feature_collection
.items
.extend(item_collection.items.clone());
}
let feature_collection_filtered = filter_items::<f64>(
feature_collection,
"eo:cloud_cover",
self.cloudcover.0,
self.cloudcover.1 as f64,
)
.expect("Oh, no!");
let feature_collection_filtered = match self.source.to_owned() {
ImagerySource::Apollo(_) => feature_collection_filtered,
ImagerySource::Element(_) => match self.intersects {
Intersects::Scene(ref scenes) => {
let scenes: Vec<String> = if scenes[0].starts_with('p') {
scenes.iter().map(|s| s.to_string()).collect()
} else {
scenes
.iter()
.map(|s| format!("MGRS-{s}").to_uppercase())
.collect()
};
if scenes[0].starts_with('M') {
filter_tile(feature_collection_filtered.clone(), "grid:code", &scenes)
.expect("Error filtering tiles.")
} else {
feature_collection_filtered.clone()
}
}
Intersects::Bbox(_) => feature_collection_filtered,
Intersects::Polygon(_) => feature_collection_filtered,
},
ImagerySource::Dea(_) => match self.intersects {
Intersects::Scene(ref scenes) => {
let scenes: Vec<String> = if scenes[0].starts_with('p') {
scenes.iter().map(|s| remove_p_and_r(s)).collect()
} else {
scenes.iter().map(|s| s.to_string()).collect()
};
filter_tile(feature_collection_filtered, "odc:region_code", &scenes)
.expect("Error filtering tiles.")
}
Intersects::Bbox(_) => feature_collection_filtered,
Intersects::Polygon(_) => feature_collection_filtered,
},
ImagerySource::PlanetaryComputer(_) => match self.intersects {
Intersects::Scene(ref scenes) => {
filter_tile_pc(feature_collection_filtered, scenes)
.expect("Error filtering tiles.")
}
Intersects::Bbox(_) => feature_collection_filtered,
Intersects::Polygon(_) => feature_collection_filtered,
},
};
for mut item in feature_collection_filtered.items {
let assets = std::mem::take(&mut item.assets);
if let Some(filtered_asset) = filter_assets_by_key(assets, &layers) {
items.push(item_with_assets(item, filtered_asset));
}
}
}
let query_result = ItemCollection::from_iter(items);
let result = QueryResult {
result: query_result,
source: self.source.clone(),
};
Ok(result)
}
/// Builds the SQL for an Apollo (database) search and runs it through
/// `ItemCollection::from_db_query`.
///
/// The statement selects scenes acquired between the start and end dates
/// (inclusive), restricted to the requested scenes or bounding box, and
/// filtered on cloud cover. `pcntcloud_land` is compared when it is not
/// null, otherwise `pcntcloud` is.
///
/// # Errors
/// Returns an error for polygon filters (unsupported) and when the band
/// selection is not explicit.
pub fn query_apollo(&self) -> Result<QueryResult> {
    let start = self.start_date.format("%Y%m%d").to_string();
    let end = self.end_date.format("%Y%m%d").to_string();
    // Base statement plus the table alias used to qualify the scene column.
    let (mut q, sat_list) = match self.collection {
        Collection::Sentinel2 => (
            format!(
                "SELECT sentinel2_list.scene, sentinel2_list.qvf_sat, sentinel2_list.date, pcntcloud_land,pcntcloud FROM sentinel2_list\n\
                 JOIN cloudamount using (scene, date)\n\
                 WHERE sentinel2_list.acqdate >= '{}'::date\n\
                 AND sentinel2_list.acqdate <= '{}'::date\n",
                start, end
            ),
            "sentinel2_list",
        ),
        Collection::Landsat5
        | Collection::Landsat7
        | Collection::Landsat8
        | Collection::Landsat9
        | Collection::LandsatAll => (build_landsat_sql(&start, &end, self.collection), "l"),
    };
    let intersects = match &self.intersects {
        Intersects::Scene(scenes) => {
            let escaped_scenes: Vec<String> =
                scenes.iter().map(|s| escape_sql_literal(s)).collect();
            format!(
                " AND {}.scene IN ('{}')",
                sat_list,
                escaped_scenes.join("', '")
            )
        }
        // Leading space: the Landsat statement does not end in whitespace.
        Intersects::Bbox(bbox) => format!(
            " and st_intersects (st_MakeEnvelope({},{},{},{},4326),geom)",
            bbox.xmin, bbox.ymin, bbox.xmax, bbox.ymax
        ),
        Intersects::Polygon(_) => bail!("Polygon intersects not supported for Apollo queries"),
    };
    q.push_str(&intersects);
    let cloud_op = match self.cloudcover.0 {
        Cmp::Less => "<",
        Cmp::Greater => ">",
        Cmp::LessEqual => "<=",
        Cmp::GreaterEqual => ">=",
        Cmp::Equal => "=",
    };
    let cloud_cover = format!(
        " AND ((pcntcloud_land IS NOT null and pcntcloud_land {op} {limit}) OR (pcntcloud_land IS null and pcntcloud {op} {limit}))",
        op = cloud_op,
        limit = self.cloudcover.1
    );
    q.push_str(&cloud_cover);
    let resolved_bands = self.resolve_bands()?;
    let stage_codes: Vec<&str> = resolved_bands.iter().map(|s| s.as_str()).collect();
    let query_result = ItemCollection::from_db_query(q, &stage_codes);
    let result = QueryResult {
        result: query_result,
        source: self.source.clone(),
    };
    Ok(result)
}
/// Runs the query against the configured source and returns its result.
///
/// Apollo sources go through `query_apollo`; every other source goes
/// through `query_stac`.
///
/// # Panics
/// Panics if the underlying query returns an error.
#[must_use]
pub fn build(self) -> QueryResult {
    let (outcome, failure_msg) = match self.source {
        ImagerySource::Apollo(_) => (self.query_apollo(), "Invalid apollo query"),
        ImagerySource::Dea(_) | ImagerySource::Element(_) => {
            (self.query_stac(), "Invalid stac query")
        }
        ImagerySource::PlanetaryComputer(_) => (self.query_stac(), "Invalid query"),
    };
    outcome.expect(failure_msg)
}
}
/// Two ways a set of image files can be described.
#[derive(Debug)]
pub enum FilesLocation {
/// A list of QVF filenames.
QvfFilenames(QvfFilenames),
/// A STAC item collection (boxed).
FeatureCollection(Box<ItemCollection>),
}
/// Outcome of an imagery query: the matching items plus the source they came from.
#[derive(Debug, Clone)]
pub struct QueryResult {
/// Items returned by the query.
pub result: ItemCollection,
/// Source the query was run against.
pub source: ImagerySource,
}
/// Constructors and accessors for item collections built from local data
/// (database rows or folders of QVF files).
///
/// NOTE(review): this trait is named `From`, so inside this module it
/// shadows the prelude's `std::convert::From`.
pub trait From {
/// Runs `db_query` and builds one item per returned row, with one asset
/// per stage code.
fn from_db_query(db_query: String, stage_codes: &[&str]) -> ItemCollection;
/// Dates covered by the collection.
fn get_times(feature_collection: &Self) -> Vec<NaiveDate>;
/// Layer (asset) names present in the collection.
fn get_layers(feature_collection: &Self) -> Vec<String>;
/// Builds a collection from the files with extension `ext` found in `folder`.
fn from_qvf_folder(folder: &Path, ext: &str) -> Result<ItemCollection>;
}
impl From for ItemCollection {
/// Not implemented yet: calling this panics through `todo!()`.
fn get_times(_feature_collection: &Self) -> Vec<NaiveDate> {
todo!();
}
/// Not implemented yet: calling this panics through `todo!()`.
fn get_layers(_feature_collection: &Self) -> Vec<String> {
todo!();
}
fn from_qvf_folder(folder: &Path, ext: &str) -> Result<ItemCollection> {
let mut files = Vec::new();
let file_glob = glob(&format!("{}/*.{}", folder.to_str().unwrap(), ext)).unwrap();
for entry in file_glob {
files.push(entry);
}
let qvf_files: Vec<QvfFilename> = files
.iter()
.map(|f| QvfFilename::from_str(f.as_ref().unwrap().to_str().unwrap()).unwrap())
.collect();
let mut stac_assets = HashMap::new();
let mut stac_items = Vec::new();
let mut single_band_vrts: Vec<QvfFilename> = Vec::new();
for source in qvf_files.iter() {
let path = PathBuf::from(source.name());
let stem = path.file_stem().unwrap().to_str().unwrap();
let source_fn = folder.join(source.name());
let source_fn_str = source_fn.to_owned();
let source_fn_str = source_fn_str.to_str().unwrap();
let ds = Dataset::open(source_fn)?;
let bands = ds.raster_count();
for band_id in 0..bands {
let new_stem = format!("{}_b{}", stem, band_id + 1);
let new_vrt = format!("{}.{}", new_stem, "vrt");
let new_vrt = folder.join(new_vrt);
let new_vrt = new_vrt.to_str().unwrap();
let argv = &[
"gdal_translate",
"-b",
&format!("{}", band_id + 1),
"-q",
source_fn_str,
new_vrt,
];
run_gdal_command(argv);
single_band_vrts.push(QvfFilename::from_str(new_vrt).unwrap());
}
}
for q in single_band_vrts {
let band_name = if q.extra_fields.is_some() {
let extras = q.to_owned().extra_fields.unwrap().join("_");
let stage = q.stage_code.to_owned();
format!("{}_{}", stage, extras)
} else {
q.stage_code.to_string()
};
let href = format!("{}", folder.join(q.name()).to_string_lossy());
let title = band_name.to_string();
let description = Some("See wiki :) ".to_string());
let _asset_type = Some("image/img; application=HFA".to_string());
let roles = vec!["data".to_string()];
let created = Some("created".to_owned());
let updated = Some("updated".to_owned());
let mut additional_fields = Map::new();
additional_fields.insert("eo:cloud_cover".to_string(), json!("-999"));
additional_fields.insert("eo:land_cover".to_string(), json!("-999"));
debug!("## == -- Adding eo:bands -- == ##");
additional_fields.insert(
"eo:bands".to_string(),
json!([{"name": band_name}]), );
let mut asset = Asset::new(href);
asset.title = Some(title.clone());
asset.description = description;
asset.roles = roles;
asset.additional_fields = additional_fields.to_owned();
asset.created = created;
asset.updated = updated;
stac_assets.insert(title.clone(), asset);
let time = NaiveTime::from_hms_opt(0, 0, 0).unwrap(); let tz_offset = FixedOffset::east_opt(3600).unwrap(); let d = match q.date {
QvfDate::Date(d) => d,
QvfDate::DateRange(d) => d.start,
};
let datetime = NaiveDateTime::new(d, time);
let dt_with_tz: DateTime<FixedOffset> =
tz_offset.from_local_datetime(&datetime).unwrap();
let properties = Properties {
start_datetime: Some(dt_with_tz.into()),
title: Some("Title".to_owned()),
updated: Some(dt_with_tz.to_rfc3339()),
datetime: Some(dt_with_tz.into()),
additional_fields,
created: Some(dt_with_tz.to_rfc3339()),
description: Some("Some description".to_owned()),
end_datetime: Some(dt_with_tz.into()),
};
let geometry = None;
let bbox = None;
let mut f_item = Item::new("todo".to_string());
f_item.geometry = geometry;
f_item.bbox = bbox;
f_item.properties = properties;
f_item.links = Vec::new();
f_item.assets = stac_assets.clone();
stac_items.push(f_item);
}
let item_collection = ItemCollection::from_iter(stac_items);
Ok(item_collection)
}
/// Runs `db_query` against the metadata database and turns each row into a
/// STAC item with one asset per requested stage code.
///
/// Row columns are read by position: scene (0), satellite (1), `%Y%m%d`
/// date (2), land cloud percentage (3, nullable), cloud percentage (4) and,
/// for Landsat rows only, instrument (5) and product (6).
///
/// A row is only kept when every requested stage file exists on the
/// filestore; otherwise a warning is logged and the row is skipped.
///
/// # Panics
/// Panics if the database is unreachable, a query fails, a column cannot be
/// parsed, a Landsat scene has no entry in `landsat_zone`, or the satellite
/// is `lz` (not implemented).
fn from_db_query(db_query: String, stage_codes: &[&str]) -> ItemCollection {
    let mut connection = DbConnection::new();
    let results = connection.client.query(&db_query, &[]).unwrap();
    let mut stac_items = Vec::new();
    for item in results {
        let item_scene: &str = item.get(0);
        let item_satellite: &str = item.get(1);
        let item_date: &str = item.get(2);
        let item_land: Option<i32> = item.get(3);
        let item_cloud: i32 = item.get(4);
        let item_satellite = Satellite::from_str(item_satellite).expect("Invalid Satellite");
        let item_collection: Collection = match item_satellite {
            Satellite::l5 => Collection::Landsat5,
            Satellite::l7 => Collection::Landsat7,
            Satellite::l8 => Collection::Landsat8,
            Satellite::l9 => Collection::Landsat9,
            Satellite::lz => todo!(),
            Satellite::ce => Collection::Sentinel2,
            Satellite::cf => Collection::Sentinel2,
            Satellite::cv => Collection::Sentinel2,
        };
        let instrument = match &item_satellite {
            Satellite::l5 | Satellite::l7 | Satellite::l8 | Satellite::lz | Satellite::l9 => {
                let item_instrument: &str = item.get(5);
                Instrument::from_str(item_instrument).expect("invalid Instrument")
            }
            Satellite::ce | Satellite::cf | Satellite::cv => Instrument::ms,
        };
        let product = match &item_satellite {
            Satellite::l5 | Satellite::l7 | Satellite::l8 | Satellite::lz | Satellite::l9 => {
                let item_product: &str = item.get(6);
                Product::from_str(item_product).expect("invalid product")
            }
            Satellite::ce | Satellite::cf | Satellite::cv => Product::re,
        };
        let item_date =
            NaiveDate::parse_from_str(item_date, "%Y%m%d").expect("Could not parse date");
        let utm_zone = match &item_satellite {
            Satellite::l5 | Satellite::l7 | Satellite::l8 | Satellite::lz | Satellite::l9 => {
                // Reuse the open connection rather than reconnecting per row.
                let q = format!(
                    "SELECT zone FROM landsat_zone WHERE scene='{}'",
                    escape_sql_literal(item_scene)
                );
                let zone_rows = connection.client.query(&q, &[]).expect("Invalid query");
                let zone_row = zone_rows
                    .first()
                    .unwrap_or_else(|| panic!("No UTM zone found for scene {}", item_scene));
                let utm_zone: i32 = zone_row.get(0);
                // Second character of the zone number is used as the zone digit.
                // NOTE(review): assumes a two-digit zone — confirm.
                let utm_zone = &utm_zone.to_string()[1..2];
                format!("m{}", &utm_zone)
            }
            Satellite::ce | Satellite::cf | Satellite::cv => {
                format!("m{}", &item_scene[2..3])
            }
        };
        let mut stac_assets = HashMap::new();
        let mut qvf_filenames = Vec::new();
        for stage_code in stage_codes {
            let q = QvfFilename {
                scene: item_scene.to_string(),
                date: qvf::QvfDate::Date(item_date),
                satellite: item_satellite.clone(),
                instrument: instrument.clone(),
                product: product.clone(),
                stage_code: stage_code.to_string(),
                zone: utm_zone.clone(),
                extension: qvf::Extension::img,
                collection: item_collection,
                image_type: qvf::ImageType::Scene,
                location: None,
                extra_fields: None,
            };
            qvf_filenames.push(q);
        }
        let all_stages_on_filestore = qvf_filenames.iter().all(|q| q.exist_on_filestore());
        if all_stages_on_filestore {
            for q in qvf_filenames {
                let href = q
                    .qv_dir()
                    .unwrap()
                    .join(q.name())
                    .to_string_lossy()
                    .into_owned();
                let title = q.stage_code.to_string();
                let mut asset = Asset::new(href);
                asset.title = Some(title.clone());
                asset.description = Some("See wiki :) ".to_string());
                asset.roles = vec!["data".to_string()];
                asset.additional_fields = Map::new();
                asset.created = Some("created".to_owned());
                asset.updated = Some("updated".to_owned());
                stac_assets.insert(title, asset);
            }
            let mut additional_fields = Map::new();
            additional_fields.insert("eo:cloud_cover".to_string(), json!(item_cloud));
            additional_fields.insert("eo:land_cover".to_string(), json!(item_land));
            let time = NaiveTime::from_hms_opt(0, 0, 0).unwrap();
            // 3600 seconds east of UTC, i.e. +01:00.
            // NOTE(review): confirm this is the intended offset.
            let tz_offset = FixedOffset::east_opt(3600).unwrap();
            let datetime = NaiveDateTime::new(item_date, time);
            let dt_with_tz: DateTime<FixedOffset> =
                tz_offset.from_local_datetime(&datetime).unwrap();
            let properties = Properties {
                start_datetime: Some(dt_with_tz.into()),
                title: Some("Title".to_owned()),
                updated: Some(dt_with_tz.to_rfc3339()),
                datetime: Some(dt_with_tz.into()),
                additional_fields,
                created: Some(dt_with_tz.to_rfc3339()),
                description: Some("Some description".to_owned()),
                end_datetime: Some(dt_with_tz.into()),
            };
            let mut f_item = Item::new("todo".to_string());
            f_item.geometry = None;
            f_item.bbox = None;
            f_item.properties = properties;
            f_item.links = Vec::new();
            f_item.assets = stac_assets;
            stac_items.push(f_item);
        } else {
            warn!("Some of the requested stages in scene {item_scene} and date {item_date:?} were missing");
        }
    }
    ItemCollection::from_iter(stac_items)
}
}
/// Strips the `p` and `r` markers from a path/row scene name, e.g.
/// `"p090r079"` becomes `"090079"`.
///
/// Bytes 1..4 are kept as the path and everything from byte 5 as the row.
///
/// # Panics
/// Panics if `scene` is shorter than five bytes or the slice positions do
/// not fall on character boundaries.
fn remove_p_and_r(scene: &str) -> String {
    let (path_digits, row_digits) = (&scene[1..4], &scene[5..]);
    [path_digits, row_digits].concat()
}
/// Builds the base SQL statement that lists Landsat scenes acquired between
/// `start` and `end` (inclusive) for the given Landsat collection.
///
/// The statement has no trailing whitespace; callers append further
/// ` AND …` clauses to it.
///
/// # Panics
/// Panics if `collection` is not a Landsat collection.
fn build_landsat_sql(start: &str, end: &str, collection: Collection) -> String {
    let satellite_filter = match collection {
        Collection::Landsat5 => "l.satellite = 'l5'",
        Collection::Landsat7 => "l.satellite = 'l7'",
        Collection::Landsat8 => "l.satellite = 'l8'",
        Collection::Landsat9 => "l.satellite = 'l9'",
        Collection::LandsatAll => {
            "(l.satellite = 'l5') or (l.satellite = 'l7') or (l.satellite = 'l8') or (l.satellite = 'l9')"
        }
        _ => unreachable!("build_landsat_sql called with non-Landsat collection"),
    };
    let date_clause = format!(
        "WHERE l.acqdate >= '{}'::date AND l.acqdate <= '{}'::date",
        start, end
    );
    // The filter is parenthesised so its `or`s cannot escape the date range.
    let satellite_clause = format!("AND ({})", satellite_filter);
    [
        "SELECT distinct scene, satellite, date, pcntcloud_land, pcntcloud, l.instrument,l.product",
        "FROM landsat_list as l",
        "JOIN cloudamount as c",
        "USING (satellite, scene,date)",
        date_clause.as_str(),
        satellite_clause.as_str(),
    ]
    .join("\n")
}
/// Escapes a string for use inside a single-quoted SQL literal by doubling
/// every single quote.
#[must_use]
fn escape_sql_literal(s: &str) -> String {
    // Re-joining the quote-separated pieces with "''" doubles each quote.
    s.split('\'').collect::<Vec<&str>>().join("''")
}
// Unit tests for `escape_sql_literal`: every single quote must be doubled
// and all other characters left untouched.
#[cfg(test)]
mod escape_tests {
use super::escape_sql_literal;
// Input without quotes is returned unchanged.
#[test]
fn test_escape_sql_literal_no_quotes() {
assert_eq!(escape_sql_literal("hello"), "hello");
}
// A single embedded quote is doubled.
#[test]
fn test_escape_sql_literal_single_quote() {
assert_eq!(escape_sql_literal("it's"), "it''s");
}
// Every quote is doubled, not just the first.
#[test]
fn test_escape_sql_literal_multiple_quotes() {
assert_eq!(escape_sql_literal("o'o'o"), "o''o''o");
}
// A quote at the start of the input is doubled.
#[test]
fn test_escape_sql_literal_leading_quote() {
assert_eq!(escape_sql_literal("'hello"), "''hello");
}
// A quote at the end of the input is doubled.
#[test]
fn test_escape_sql_literal_trailing_quote() {
assert_eq!(escape_sql_literal("hello'"), "hello''");
}
// Empty input yields empty output.
#[test]
fn test_escape_sql_literal_empty_string() {
assert_eq!(escape_sql_literal(""), "");
}
// Three quotes become six.
#[test]
fn test_escape_sql_literal_only_quotes() {
assert_eq!(escape_sql_literal("'''"), "''''''");
}
// The quote that would terminate the literal is doubled.
#[test]
fn test_escape_sql_literal_sql_injection_attempt() {
let input = "'; DROP TABLE scenes; --";
let escaped = escape_sql_literal(input);
assert_eq!(escaped, "''; DROP TABLE scenes; --");
}
}
// Unit tests for `ImageQueryBuilder::resolve_bands`, covering explicit,
// canonical and default band selection for DEA and Apollo sources.
#[cfg(test)]
mod band_spec_tests {
    use super::*;
    use crate::DEA;

    // Builder over a small fixed bounding box; the spatial filter is
    // irrelevant to band resolution.
    fn make_builder(source: ImagerySource, collection: Collection) -> ImageQueryBuilder<'static> {
        let bbox = Bbox {
            xmin: -122.435,
            xmax: -122.425,
            ymin: 37.775,
            ymax: 37.785,
        };
        ImageQueryBuilder::new(source, collection, Intersects::Bbox(bbox))
    }

    // Explicit names are returned unchanged.
    #[test]
    fn test_explicit_bands_pass_through() {
        let builder = make_builder(DEA.clone(), Collection::Sentinel2)
            .bands(["nbart_red", "nbart_nir"]);
        let resolved = builder.resolve_bands().unwrap();
        assert_eq!(resolved, vec!["nbart_red", "nbart_nir"]);
    }

    // Canonical names are translated to the provider's names.
    #[test]
    fn test_canonical_bands_resolves() {
        let builder = make_builder(DEA.clone(), Collection::Sentinel2)
            .canonical_bands(["red", "nir"]);
        let resolved = builder.resolve_bands().unwrap();
        assert_eq!(resolved, vec!["nbart_red", "nbart_nir"]);
    }

    // All unknown canonical names are reported together; known ones are not.
    #[test]
    fn test_canonical_bands_collects_all_errors() {
        let builder = make_builder(DEA.clone(), Collection::Sentinel2)
            .canonical_bands(["red", "nonexistent_band", "also_missing"]);
        let err = builder.resolve_bands().unwrap_err();
        let err_msg = err.to_string();
        assert!(err_msg.contains("Failed to resolve 2 band(s)"));
        assert!(err_msg.contains("nonexistent_band"));
        assert!(err_msg.contains("also_missing"));
        // Match the real message format ("canonical band '<name>' not found …")
        // so this fails if the resolvable band is ever reported as missing.
        assert!(!err_msg.contains("canonical band 'red'"));
    }

    // With no selection, every measurement of the product is returned.
    #[test]
    fn test_default_all_bands() {
        let builder = make_builder(DEA.clone(), Collection::Sentinel2);
        let resolved = builder.resolve_bands().unwrap();
        assert!(!resolved.is_empty());
        assert!(resolved.contains(&"nbart_red".to_string()));
        assert!(resolved.contains(&"nbart_nir".to_string()));
    }

    // Apollo rejects anything but an explicit selection.
    #[test]
    fn test_apollo_requires_explicit_bands() {
        let apollo = crate::APOLLO.clone();
        let builder = make_builder(apollo, Collection::Sentinel2);
        let err = builder.resolve_bands().unwrap_err();
        let err_msg = err.to_string();
        assert!(err_msg.contains("not supported for Apollo source"));
        assert!(err_msg.contains(".bands()"));
    }

    // Apollo accepts explicit stage codes unchanged.
    #[test]
    fn test_apollo_explicit_bands_works() {
        let apollo = crate::APOLLO.clone();
        let builder = make_builder(apollo, Collection::Sentinel2)
            .bands(["aba", "dbg"]);
        let resolved = builder.resolve_bands().unwrap();
        assert_eq!(resolved, vec!["aba", "dbg"]);
    }
}
// Unit tests for `build_landsat_sql`: the generated statement must contain
// the date bounds and the satellite filter matching the collection.
#[cfg(test)]
mod landsat_sql_tests {
use super::*;
// A single-satellite collection yields that satellite's filter plus both dates.
#[test]
fn test_build_landsat_sql_single_satellite() {
let sql = build_landsat_sql("20220101", "20220115", Collection::Landsat8);
assert!(sql.contains("l.satellite = 'l8'"));
assert!(sql.contains("20220101"));
assert!(sql.contains("20220115"));
assert!(sql.contains("landsat_list"));
}
// `LandsatAll` includes a filter for every Landsat satellite.
#[test]
fn test_build_landsat_sql_all_satellites() {
let sql = build_landsat_sql("20220101", "20220115", Collection::LandsatAll);
assert!(sql.contains("l.satellite = 'l5'"));
assert!(sql.contains("l.satellite = 'l7'"));
assert!(sql.contains("l.satellite = 'l8'"));
assert!(sql.contains("l.satellite = 'l9'"));
}
// Each single-satellite collection contains its own filter and not another's.
#[test]
fn test_build_landsat_sql_each_satellite() {
let sql5 = build_landsat_sql("20220101", "20220115", Collection::Landsat5);
assert!(sql5.contains("l.satellite = 'l5'"));
assert!(!sql5.contains("l.satellite = 'l7'"));
let sql7 = build_landsat_sql("20220101", "20220115", Collection::Landsat7);
assert!(sql7.contains("l.satellite = 'l7'"));
assert!(!sql7.contains("l.satellite = 'l8'"));
let sql9 = build_landsat_sql("20220101", "20220115", Collection::Landsat9);
assert!(sql9.contains("l.satellite = 'l9'"));
assert!(!sql9.contains("l.satellite = 'l5'"));
}
}