use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use surtgis_algorithms::imagery::{CloudMaskStrategy, S2SclMask, LandsatQaMask, NoCloudMask};
use surtgis_cloud::blocking::{CogReaderBlocking, StacClientBlocking};
use surtgis_cloud::{BBox, CogReaderOptions, StacCatalog, StacClientOptions, StacItem, StacSearchParams};
use tracing::debug;
use crate::commands::StacCommands;
use crate::helpers::{done, parse_bbox, spinner, write_result};
use crate::stac_introspect::{CloudMaskType, StacCollectionSchema};
use crate::streaming::resolve_asset_key;
/// Per-collection processing profile that selects the cloud-mask strategy
/// appropriate for a STAC collection.
///
/// `Sentinel1RTC` carries no mask strategy and doubles as the "no cloud
/// masking" fallback for unrecognized collections (see `from_collection_name`).
#[derive(Clone)]
pub enum CollectionProfile {
/// Sentinel-2 Level 2A scenes; cloud masking via the SCL classification asset.
Sentinel2L2A {
cloud_mask_strategy: Arc<dyn CloudMaskStrategy>,
},
/// Landsat Collection 2 Level 2 scenes; cloud masking via the QA_PIXEL bitmask asset.
LandsatC2L2 {
cloud_mask_strategy: Arc<dyn CloudMaskStrategy>,
},
/// Sentinel-1 RTC (SAR) scenes; no cloud masking applies.
Sentinel1RTC,
}
/// Manual `Debug` impl: the `Arc<dyn CloudMaskStrategy>` fields are not
/// `Debug`, so only the variant name is printed.
impl std::fmt::Debug for CollectionProfile {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant = match self {
            Self::Sentinel2L2A { .. } => "Sentinel2L2A",
            Self::LandsatC2L2 { .. } => "LandsatC2L2",
            Self::Sentinel1RTC => "Sentinel1RTC",
        };
        f.debug_struct(variant).finish()
    }
}
impl CollectionProfile {
    /// Map a STAC collection id to its processing profile.
    ///
    /// Unrecognized collection names print a notice to stderr and fall back
    /// to the `Sentinel1RTC` profile, i.e. no cloud masking is applied.
    /// (The `Result` return is kept for interface stability; this currently
    /// never fails.)
    pub fn from_collection_name(name: &str) -> Result<Self> {
        let profile = match name {
            "sentinel-2-l2a" => Self::Sentinel2L2A {
                cloud_mask_strategy: Arc::new(S2SclMask::new()),
            },
            "landsat-c2-l2" => Self::LandsatC2L2 {
                cloud_mask_strategy: Arc::new(LandsatQaMask::new()),
            },
            "sentinel-1-rtc" => Self::Sentinel1RTC,
            other => {
                eprintln!(" ℹ Collection '{}' not recognized — proceeding without cloud masking", other);
                Self::Sentinel1RTC
            }
        };
        Ok(profile)
    }

    /// STAC asset key of this collection's cloud/quality mask, if it has one.
    pub fn mask_asset_name(&self) -> Option<&str> {
        match self {
            Self::Sentinel2L2A { .. } => Some("scl"),
            Self::LandsatC2L2 { .. } => Some("QA_PIXEL"),
            Self::Sentinel1RTC => None,
        }
    }

    /// Short human-readable label for log/progress output.
    pub fn description(&self) -> &str {
        match self {
            Self::Sentinel2L2A { .. } => "Sentinel-2 L2A",
            Self::LandsatC2L2 { .. } => "Landsat C2 L2",
            Self::Sentinel1RTC => "Sentinel-1 RTC",
        }
    }
}
/// Build a cloud-mask strategy matching an introspected mask type:
/// categorical masks (e.g. Sentinel-2 SCL) use `S2SclMask`, bitmask masks
/// (e.g. Landsat QA_PIXEL) use `LandsatQaMask`, and `None` disables masking.
pub fn create_cloud_mask_strategy(mask_type: &CloudMaskType) -> Arc<dyn CloudMaskStrategy> {
    match mask_type {
        CloudMaskType::Categorical { .. } => Arc::new(S2SclMask::new()),
        CloudMaskType::Bitmask { .. } => Arc::new(LandsatQaMask::new()),
        CloudMaskType::None => Arc::new(NoCloudMask),
    }
}
/// Static metadata describing a well-known public STAC catalog that can be
/// referred to by a shorthand alias on the command line.
#[derive(Clone, Debug)]
pub struct StacCatalogInfo {
// Short CLI alias, e.g. "pc" for the Planetary Computer.
pub shorthand: &'static str,
// Human-readable catalog name.
pub name: &'static str,
// Base URL of the catalog endpoint.
pub url: &'static str,
// One-line summary of what the catalog offers.
pub description: &'static str,
}
/// Curated list of well-known public STAC catalogs addressable by shorthand
/// (see `resolve_catalog_url` for shorthand → URL resolution).
pub fn get_known_catalogs() -> Vec<StacCatalogInfo> {
    Vec::from([
        StacCatalogInfo {
            shorthand: "pc",
            name: "Planetary Computer",
            url: "https://planetarycomputer.microsoft.com/api/stac/v1",
            description: "Microsoft's STAC: S2, Landsat, Sentinel-1, Copernicus DEM, GEBCO, etc.",
        },
        StacCatalogInfo {
            shorthand: "es",
            name: "Earth Search (AWS)",
            url: "https://earth-search.aws.element84.com/v1",
            description: "Element 84 on AWS: S2, Landsat, Sentinel-1, DEM collections",
        },
        StacCatalogInfo {
            shorthand: "cdse",
            name: "Copernicus Data Space (CDSE)",
            url: "https://catalogue.dataspace.copernicus.eu/odata/v1",
            description: "EU's Copernicus: S1, S2, S3, S5P, DEM (free, Europe-focused)",
        },
        StacCatalogInfo {
            shorthand: "usgs",
            name: "USGS 3DEP / OpenTopography",
            url: "https://cloud.sdsc.edu/v1/AUTH_opentopography/Raster/SRTM_GL30/SRTM_GL30_srtm",
            description: "USGS elevation data: SRTM, ASTER GDEM, HydroSHEDS",
        },
        StacCatalogInfo {
            shorthand: "osgeo",
            name: "OGC STAC Index API",
            url: "https://stacindex.org/api/v1",
            description: "STAC Index: Registry of all public STAC catalogs worldwide",
        },
    ])
}
/// Resolve a catalog argument to a URL: known shorthands ("pc", "es", …)
/// expand to their catalog URL; anything else is passed through verbatim
/// on the assumption that it already is a URL.
pub fn resolve_catalog_url(catalog: &str) -> String {
    get_known_catalogs()
        .into_iter()
        .find(|known| known.shorthand == catalog)
        .map(|known| known.url.to_string())
        .unwrap_or_else(|| catalog.to_string())
}
/// Curated `(collection-id, description)` pairs for a known catalog shorthand.
///
/// Unknown shorthands yield a single `("(unknown)", …)` hint entry pointing
/// the user at `surtgis stac search` for discovery.
pub fn get_catalog_collections(catalog: &str) -> Vec<(&'static str, &'static str)> {
    match catalog {
        "pc" => [
            ("sentinel-2-l2a", "Sentinel-2 Level 2A (optical, 10-60m, 2016-present)"),
            ("landsat-c2-l2", "Landsat Collection 2 Level 2 (optical, 30m, 1980-present)"),
            ("sentinel-1-rtc", "Sentinel-1 RTC (SAR, 10m, 2015-present)"),
            ("cop-dem-glo-30", "Copernicus DEM 30m (elevation, global)"),
            ("nasadem", "NASADEM (elevation, 30m, global)"),
            ("gebco", "GEBCO bathymetry (ocean, 15 arc-seconds)"),
        ]
        .to_vec(),
        "es" => [
            ("sentinel-2-l2a", "Sentinel-2 Level 2A (optical, 10-60m)"),
            ("landsat-c2-l2", "Landsat Collection 2 Level 2 (optical, 30m)"),
            ("sentinel-1-rtc", "Sentinel-1 RTC (SAR, 10m)"),
        ]
        .to_vec(),
        "cdse" => [
            ("sentinel-1-grd", "Sentinel-1 GRD (SAR, 10m, ground range detected)"),
            ("sentinel-1-slc", "Sentinel-1 SLC (SAR, 10m, single look complex)"),
            ("sentinel-2-l1c", "Sentinel-2 L1C (optical, 10-60m, L1 processing)"),
            ("sentinel-2-l2a", "Sentinel-2 L2A (optical, 10-60m, atmospherically corrected)"),
            ("sentinel-3-olci", "Sentinel-3 OLCI (optical, 300-1000m, ocean/land)"),
            ("sentinel-5p", "Sentinel-5P (atmospheric, daily global coverage)"),
        ]
        .to_vec(),
        "usgs" => [
            ("srtm-30m", "SRTM 30m DEM (90m for polar regions, 2000 data)"),
            ("aster-gdem", "ASTER GDEM (30m elevation, global, 2011 release)"),
            ("nasadem", "NASADEM (30m, merged SRTM+ASTER, improved voids)"),
            ("hydrosheds", "HydroSHEDS (hydrological datasets, 15 arc-seconds)"),
        ]
        .to_vec(),
        "osgeo" => [
            ("(registry)", "STAC Index: Search all public STAC catalogs worldwide"),
            ("(api)", "Use API to discover 1000+ STAC catalogs globally"),
        ]
        .to_vec(),
        _ => [
            ("(unknown)", "Use 'surtgis stac search --catalog <url> ...' to discover collections"),
        ]
        .to_vec(),
    }
}
/// One catalog entry parsed from the STAC Index API response; also the
/// record format of the on-disk cache (see `load_from_cache`/`save_to_cache`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StacIndexCatalog {
// Catalog slug from the API, used as the stable identifier.
pub id: String,
// Display title; falls back to the slug when absent upstream.
pub title: String,
// First line of the API "summary" field, when present.
pub description: Option<String>,
// Catalog root URL, when provided by the API.
pub url: Option<String>,
}
/// Return the STAC Index catalog list, preferring an on-disk cache no older
/// than one day.
///
/// On a cache miss the list is fetched from the STAC Index API and re-cached.
/// Network failure is non-fatal: an empty list is returned so callers can
/// fall back to the curated catalogs.
fn fetch_stac_index_catalogs() -> Result<Vec<StacIndexCatalog>> {
    const CACHE_TTL_SECS: u64 = 60 * 60 * 24; // one day
    let cache_path = get_stac_index_cache_path();
    if let Ok(cached_catalogs) = load_from_cache(&cache_path, CACHE_TTL_SECS) {
        eprintln!(" 📦 Using cached catalog list ({} catalogs)", cached_catalogs.len());
        return Ok(cached_catalogs);
    }
    eprintln!(" 🌐 Discovering catalogs from STAC Index API...");
    Ok(match fetch_stac_index_http() {
        Ok(catalogs) => {
            // Best-effort cache write; failing to cache must not fail discovery.
            let _ = save_to_cache(&cache_path, &catalogs);
            eprintln!(" ✅ Found {} catalogs (cached)", catalogs.len());
            catalogs
        }
        Err(e) => {
            eprintln!(" ℹ️ STAC Index API unavailable: {}", e);
            eprintln!(" Using curated catalogs instead");
            Vec::new()
        }
    })
}
/// Truncate `s` to at most `max_len` bytes without splitting a UTF-8
/// character, appending "..." when anything was cut.
///
/// Note: a truncated result is up to `max_len + 3` bytes long because the
/// ellipsis is appended on top of the byte budget.
fn truncate_utf8(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }
    // Find the largest char boundary that keeps the prefix within max_len bytes.
    let mut cut = 0;
    for (idx, c) in s.char_indices() {
        let end = idx + c.len_utf8();
        if end > max_len {
            break;
        }
        cut = end;
    }
    let mut out = String::with_capacity(cut + 3);
    out.push_str(&s[..cut]);
    out.push_str("...");
    out
}
/// Compute the platform-appropriate path of the STAC Index catalog cache.
///
/// Unix: `$XDG_CACHE_HOME/surtgis/stac_index_catalogs.json`, falling back to
/// `$HOME/.cache` (or `.` when `HOME` is unset). Windows: under
/// `%LOCALAPPDATA%\surtgis-cache` (or `.`). Other targets: a relative
/// `.cache` path.
fn get_stac_index_cache_path() -> std::path::PathBuf {
    use std::path::PathBuf;
    #[cfg(unix)]
    {
        let base = std::env::var("XDG_CACHE_HOME").unwrap_or_else(|_| {
            let home = std::env::var("HOME").unwrap_or_else(|_| ".".to_string());
            format!("{}/.cache", home)
        });
        PathBuf::from(base).join("surtgis").join("stac_index_catalogs.json")
    }
    #[cfg(windows)]
    {
        let base = std::env::var("LOCALAPPDATA").unwrap_or_else(|_| ".".to_string());
        PathBuf::from(base).join("surtgis-cache").join("stac_index_catalogs.json")
    }
    #[cfg(not(any(unix, windows)))]
    {
        PathBuf::from(".cache/stac_index_catalogs.json")
    }
}
/// Load the cached STAC Index catalog list from `path`, rejecting caches
/// whose file-modification time is older than `ttl_secs` seconds.
///
/// Any failure (missing file, unreadable metadata, expired TTL, bad JSON)
/// is returned as an error; callers treat every error as a cache miss.
///
/// Takes `&Path` rather than `&PathBuf` (clippy `ptr_arg`): existing callers
/// passing `&PathBuf` still work via deref coercion.
fn load_from_cache(path: &std::path::Path, ttl_secs: u64) -> Result<Vec<StacIndexCatalog>> {
    use std::fs;
    use std::time::SystemTime;
    let metadata = fs::metadata(path).context("Cache file not found")?;
    let modified = metadata.modified().context("Could not get modification time")?;
    // A system-clock move backwards makes duration_since fail; surface it
    // as a cache miss rather than trusting a bogus age.
    let elapsed = SystemTime::now()
        .duration_since(modified)
        .context("Could not calculate cache age")?;
    if elapsed.as_secs() > ttl_secs {
        anyhow::bail!("Cache expired");
    }
    let content = fs::read_to_string(path).context("Could not read cache file")?;
    serde_json::from_str(&content).context("Could not parse cache file")
}
/// Serialize `catalogs` as pretty-printed JSON and write them to `path`,
/// creating parent directories as needed.
///
/// Takes `&Path` rather than `&PathBuf` (clippy `ptr_arg`): existing callers
/// passing `&PathBuf` still work via deref coercion. Directory-creation
/// failures are now propagated with context instead of being silently
/// swallowed (previously `.ok()`), which only deferred the failure to
/// `fs::write` with a less specific message; the sole caller ignores the
/// Result, so overall behavior stays best-effort.
fn save_to_cache(path: &std::path::Path, catalogs: &[StacIndexCatalog]) -> Result<()> {
    use std::fs;
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent).context("Could not create cache directory")?;
    }
    let json = serde_json::to_string_pretty(catalogs)
        .context("Could not serialize catalogs")?;
    fs::write(path, json).context("Could not write cache file")?;
    Ok(())
}
/// Synchronous wrapper around `fetch_stac_index_async`: spins up a
/// single-threaded tokio runtime just long enough to run the request.
fn fetch_stac_index_http() -> Result<Vec<StacIndexCatalog>> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .context("Failed to create async runtime")?;
    runtime.block_on(fetch_stac_index_async())
}
/// Fetch and parse the public catalog list from the STAC Index API.
///
/// Uses a 10-second request timeout and returns a contextual error on HTTP
/// failure, non-success status, malformed JSON, or unparseable catalog data.
async fn fetch_stac_index_async() -> Result<Vec<StacIndexCatalog>> {
    use std::time::Duration;
    let url = "https://stacindex.org/api/catalogs";
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(10))
        .build()
        .context("Failed to create HTTP client")?;
    let response = client
        .get(url)
        .send()
        .await
        .context("Failed to fetch from STAC Index API")?;
    anyhow::ensure!(
        response.status().is_success(),
        "HTTP {}: {}",
        response.status(),
        url
    );
    let body: serde_json::Value = response
        .json()
        .await
        .context("Failed to parse STAC Index response")?;
    parse_stac_index_response(&body).context("Failed to parse catalog data")
}
/// Convert the raw JSON array returned by the STAC Index API into
/// `StacIndexCatalog` entries, sorted by title.
///
/// Entries without a non-empty `slug` are skipped; `title` falls back to the
/// slug and `description` keeps only the first line of `summary`.
///
/// Returns an error when the top-level JSON value is not an array.
fn parse_stac_index_response(json: &serde_json::Value) -> Result<Vec<StacIndexCatalog>> {
    // Borrow the array instead of cloning it: the response can contain
    // 1000+ catalog objects, so a deep clone of the whole Value is wasted work.
    let catalog_list = json
        .as_array()
        .ok_or_else(|| anyhow::anyhow!("Expected array response from STAC Index API"))?;
    let mut catalogs = Vec::with_capacity(catalog_list.len());
    for item in catalog_list {
        let slug = item.get("slug").and_then(|v| v.as_str()).unwrap_or("");
        if slug.is_empty() {
            continue;
        }
        let title = item
            .get("title")
            .and_then(|v| v.as_str())
            .unwrap_or(slug)
            .to_string();
        // Keep only the first summary line for compact listings.
        let description = item
            .get("summary")
            .and_then(|v| v.as_str())
            .map(|s| s.lines().next().unwrap_or(s).to_string());
        let url = item
            .get("url")
            .and_then(|v| v.as_str())
            .map(str::to_string);
        catalogs.push(StacIndexCatalog {
            id: slug.to_string(),
            title,
            description,
            url,
        });
    }
    catalogs.sort_by(|a, b| a.title.cmp(&b.title));
    Ok(catalogs)
}
pub fn handle(action: StacCommands, compress: bool) -> Result<()> {
match action {
StacCommands::Search {
catalog,
bbox,
datetime,
collections,
limit,
} => {
let cat = StacCatalog::from_str_or_url(&catalog);
let pb = spinner("Searching STAC catalog...");
let client =
StacClientBlocking::new(cat, StacClientOptions::default())
.context("Failed to create STAC client")?;
let mut params = StacSearchParams::new().limit(limit);
if let Some(ref b) = bbox {
let bb = parse_bbox(b)?;
params = params.bbox(bb.min_x, bb.min_y, bb.max_x, bb.max_y);
}
if let Some(ref dt) = datetime {
params = params.datetime(dt);
}
if let Some(ref cols) = collections {
let c: Vec<&str> = cols.split(',').map(|s| s.trim()).collect();
params = params.collections(&c);
}
let results =
client.search(¶ms).context("STAC search failed")?;
pb.finish_and_clear();
println!(
"Found {} items (matched: {})",
results.len(),
results
.number_matched
.map_or("?".to_string(), |n| n.to_string())
);
println!();
for item in &results.features {
let dt = item
.properties
.datetime
.as_deref()
.unwrap_or("-");
let cc = item
.properties
.eo_cloud_cover
.map(|c| format!("{:.1}%", c))
.unwrap_or_else(|| "-".to_string());
let col = item.collection.as_deref().unwrap_or("-");
let asset_keys: Vec<&str> =
item.assets.keys().map(|k| k.as_str()).collect();
println!(" {} [{}]", item.id, col);
println!(" datetime: {} cloud: {}", dt, cc);
println!(" assets: {}", asset_keys.join(", "));
}
if results.has_next() {
println!(
"\n (more results available -- increase --limit to fetch more)"
);
}
}
StacCommands::Fetch {
catalog,
bbox,
collection,
asset,
datetime,
variable,
time_step,
output,
} => {
let cat = StacCatalog::from_str_or_url(&catalog);
let bb = parse_bbox(&bbox)?;
let mut params = StacSearchParams::new()
.bbox(bb.min_x, bb.min_y, bb.max_x, bb.max_y)
.collections(&[collection.as_str()])
.limit(1);
if let Some(ref dt) = datetime {
params = params.datetime(dt);
}
let pb = spinner("Searching STAC catalog...");
let client =
StacClientBlocking::new(cat, StacClientOptions::default())
.context("Failed to create STAC client")?;
let results =
client.search(¶ms).context("STAC search failed")?;
let item = results.features.first().ok_or_else(|| {
anyhow::anyhow!("No items found matching the search criteria")
})?;
pb.finish_and_clear();
println!(
"Item: {} [{}]",
item.id,
item.collection.as_deref().unwrap_or("-")
);
let asset_key = if let Some(ref k) = asset {
k.clone()
} else {
if let Some((k, _)) = item.first_cog_asset() {
println!("Auto-detected COG asset: {}", k);
k.clone()
} else if let Some((k, _)) = item.first_zarr_asset() {
println!("Auto-detected Zarr asset: {}", k);
k.clone()
} else {
anyhow::bail!(
"No COG or Zarr asset found. Specify --asset explicitly. Available: {}",
item.assets
.keys()
.cloned()
.collect::<Vec<_>>()
.join(", ")
);
}
};
let stac_asset = item.asset(&asset_key).ok_or_else(|| {
anyhow::anyhow!(
"Asset '{}' not found. Available: {}",
asset_key,
item.assets
.keys()
.cloned()
.collect::<Vec<_>>()
.join(", ")
)
})?;
let format = item.asset_format(&asset_key);
let href = client
.sign_asset_href(
&stac_asset.href,
item.collection.as_deref().unwrap_or(""),
)
.context("Failed to sign asset URL")?;
match format {
#[cfg(feature = "zarr")]
surtgis_cloud::AssetFormat::Zarr => {
use surtgis_cloud::blocking::ZarrReaderBlocking;
use surtgis_cloud::{ZarrReaderOptions, TimeReduction, TimeSelector};
let var = variable.as_deref().ok_or_else(|| {
anyhow::anyhow!(
"--variable is required for Zarr assets. \
Hint: the store may contain variables like 'precipitation_amount_1hour_Accumulation', 'ppt', etc."
)
})?;
let sas_token = href.split_once('?').map(|(_, q)| q.to_string());
let base_url = href.split('?').next().unwrap_or(&href);
let opts = ZarrReaderOptions {
sas_token,
};
let time = parse_time_step(&time_step)?;
let pb = spinner("Opening Zarr store...");
let start = Instant::now();
let reader = ZarrReaderBlocking::open(base_url, var, opts)
.context("Failed to open Zarr store")?;
let meta = reader.metadata();
println!(
"Zarr: {} — shape {:?}, dims {:?}",
meta.variable, meta.shape, meta.dimension_names
);
if let Some((t0, t1)) = &meta.time_range {
println!("Time range: {} to {}", t0.format("%Y-%m-%d"), t1.format("%Y-%m-%d"));
}
pb.finish_and_clear();
let pb = spinner("Reading Zarr subset...");
let raster = reader
.read_bbox(&bb, &time)
.context("Failed to read Zarr subset")?;
pb.finish_and_clear();
let elapsed = start.elapsed();
let (rows, cols) = raster.shape();
println!(
"Fetched: {} x {} ({} cells)",
cols, rows, raster.len()
);
write_result(&raster, &output, compress)?;
done("STAC Zarr fetch", &output, elapsed);
}
_ => {
let pb = spinner("Fetching COG tiles...");
let start = Instant::now();
let opts = CogReaderOptions::default();
let mut reader = CogReaderBlocking::open(&href, opts)
.context("Failed to open remote COG")?;
let read_bb = {
use surtgis_cloud::reproject;
let epsg = item.epsg().or_else(|| {
reader
.metadata()
.crs
.as_ref()
.and_then(|c| c.epsg())
});
if let Some(epsg) = epsg {
if !reproject::is_wgs84(epsg) {
let reprojected =
reproject::reproject_bbox_to_cog(&bb, epsg);
println!("Reprojected bbox to EPSG:{}", epsg);
reprojected
} else {
bb
}
} else {
bb
}
};
let raster: surtgis_core::Raster<f64> = reader
.read_bbox(&read_bb, None)
.context("Failed to read bounding box from COG")?;
pb.finish_and_clear();
let elapsed = start.elapsed();
let (rows, cols) = raster.shape();
println!(
"Fetched: {} x {} ({} cells)",
cols,
rows,
raster.len()
);
write_result(&raster, &output, compress)?;
done("STAC fetch", &output, elapsed);
}
}
}
StacCommands::FetchMosaic {
catalog,
bbox,
collection,
asset,
datetime,
max_items,
output,
} => {
let cat = StacCatalog::from_str_or_url(&catalog);
let bb = parse_bbox(&bbox)?;
let mut params = StacSearchParams::new()
.bbox(bb.min_x, bb.min_y, bb.max_x, bb.max_y)
.collections(&[collection.as_str()])
.limit(max_items);
if let Some(ref dt) = datetime {
params = params.datetime(dt);
}
let pb = spinner("Searching STAC catalog...");
let client_opts = StacClientOptions {
max_items: max_items as usize,
..StacClientOptions::default()
};
let client = StacClientBlocking::new(cat, client_opts)
.context("Failed to create STAC client")?;
let items = client.search_all(¶ms).context("STAC search failed")?;
pb.finish_and_clear();
if items.is_empty() {
anyhow::bail!("No items found matching the search criteria");
}
println!("Found {} items, fetching and mosaicking...", items.len());
let asset_key = if let Some(ref k) = asset {
k.clone()
} else {
let (k, _) = items[0].first_cog_asset().ok_or_else(|| {
anyhow::anyhow!(
"No COG asset found. Specify --asset explicitly. Available: {}",
items[0]
.assets
.keys()
.cloned()
.collect::<Vec<_>>()
.join(", ")
)
})?;
println!("Auto-detected asset: {}", k);
k.clone()
};
let start = Instant::now();
let mut rasters: Vec<surtgis_core::Raster<f64>> = Vec::new();
for (i, item) in items.iter().enumerate() {
let pb = spinner(&format!(
"Fetching tile {} of {} [{}]...",
i + 1,
items.len(),
item.id
));
let stac_asset = match item.asset(&asset_key) {
Some(a) => a,
None => {
pb.finish_and_clear();
eprintln!(
" Warning: item {} missing asset '{}', skipping",
item.id, asset_key
);
continue;
}
};
let href = client
.sign_asset_href(
&stac_asset.href,
item.collection.as_deref().unwrap_or(""),
)
.context("Failed to sign asset URL")?;
let opts = CogReaderOptions::default();
let mut reader = match CogReaderBlocking::open(&href, opts) {
Ok(r) => r,
Err(e) => {
pb.finish_and_clear();
eprintln!(
" Warning: failed to open COG for {}: {}, skipping",
item.id, e
);
continue;
}
};
let read_bb = {
use surtgis_cloud::reproject;
let epsg = item.epsg().or_else(|| {
reader
.metadata()
.crs
.as_ref()
.and_then(|c| c.epsg())
});
if let Some(epsg) = epsg {
if !reproject::is_wgs84(epsg) {
reproject::reproject_bbox_to_cog(&bb, epsg)
} else {
bb
}
} else {
bb
}
};
match reader.read_bbox::<f64>(&read_bb, None) {
Ok(raster) => {
pb.finish_and_clear();
let (rows, cols) = raster.shape();
println!(
" [{}/{}] {} -- {} x {}",
i + 1,
items.len(),
item.id,
cols,
rows
);
rasters.push(raster);
}
Err(e) => {
pb.finish_and_clear();
eprintln!(
" Warning: failed to read tile {}: {}, skipping",
item.id, e
);
}
}
}
if rasters.is_empty() {
anyhow::bail!("No tiles were successfully fetched");
}
let pb = spinner("Mosaicking tiles...");
let refs: Vec<&surtgis_core::Raster<f64>> = rasters.iter().collect();
let result = surtgis_core::mosaic(&refs, None)
.context("Failed to mosaic tiles")?;
pb.finish_and_clear();
let elapsed = start.elapsed();
let (rows, cols) = result.shape();
println!(
"Mosaic: {} tiles -> {} x {} ({} cells)",
rasters.len(),
cols,
rows,
result.len()
);
write_result(&result, &output, compress)?;
done("STAC fetch-mosaic", &output, elapsed);
}
StacCommands::Composite {
catalog,
bbox,
collection,
asset,
datetime,
max_scenes,
scl_asset: _scl_asset, scl_keep: _scl_keep, align_to,
naming,
cache,
strip_rows: cli_strip_rows,
output,
} => {
let assets: Vec<&str> = asset.split(',').map(|s| s.trim()).collect();
if assets.len() > 1 {
return handle_multiband_composite(
&catalog, &bbox, &collection, &assets,
&datetime, max_scenes,
align_to.as_ref(), &output, &naming, cache, cli_strip_rows, compress,
);
}
let profile = CollectionProfile::from_collection_name(&collection)?;
let mask_asset_name = profile.mask_asset_name();
eprintln!("📷 Collection: {}", profile.description());
let cat = StacCatalog::from_str_or_url(&catalog);
let bb = parse_bbox(&bbox)?;
let bbox_width_deg = (bb.max_x - bb.min_x).abs();
let bbox_height_deg = (bb.max_y - bb.min_y).abs();
let bbox_width_km = bbox_width_deg * 111.0;
let bbox_height_km = bbox_height_deg * 111.0;
let tiles_x = ((bbox_width_km / 60.0).ceil() as usize).max(1);
let tiles_y = ((bbox_height_km / 60.0).ceil() as usize).max(1);
let estimated_spatial_tiles = tiles_x * tiles_y;
let search_limit = ((estimated_spatial_tiles * max_scenes * 5).max(1000)) as u32;
let search_limit = search_limit.min(10000);
eprintln!(" bbox: {:.1}×{:.1} km, ~{} spatial tiles, search_limit={}",
bbox_width_km, bbox_height_km, estimated_spatial_tiles, search_limit);
let params = StacSearchParams::new()
.bbox(bb.min_x, bb.min_y, bb.max_x, bb.max_y)
.collections(&[collection.as_str()])
.datetime(&datetime)
.limit(search_limit.min(cat.max_page_size()));
let pb = spinner("Searching STAC catalog...");
let client_opts = StacClientOptions {
max_items: search_limit as usize,
..StacClientOptions::default()
};
let client = StacClientBlocking::new(cat, client_opts)
.context("Failed to create STAC client")?;
let items = client.search_all(¶ms).context("STAC search failed")?;
pb.finish_and_clear();
if items.is_empty() {
anyhow::bail!("No items found matching the search criteria");
}
let mut by_date: BTreeMap<String, Vec<&StacItem>> = BTreeMap::new();
for item in &items {
let date = item
.properties
.datetime
.as_deref()
.unwrap_or("")
.get(..10)
.unwrap_or("unknown")
.to_string();
by_date.entry(date).or_default().push(item);
}
let mut date_coverage: Vec<(String, usize)> = by_date.iter()
.map(|(d, items)| (d.clone(), items.len()))
.collect();
date_coverage.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
let dates: Vec<String> = date_coverage.into_iter()
.take(max_scenes)
.map(|(d, _)| d)
.collect();
let tiles_per_date: Vec<usize> = dates.iter()
.map(|d| by_date.get(d).map(|g| g.len()).unwrap_or(0))
.collect();
let min_tiles = tiles_per_date.iter().min().copied().unwrap_or(0);
let max_tiles = tiles_per_date.iter().max().copied().unwrap_or(0);
let mut item_south = f64::INFINITY;
let mut item_north = f64::NEG_INFINITY;
for item in &items {
if let Some(bbox_arr) = &item.bbox {
if bbox_arr.len() >= 4 {
item_south = item_south.min(bbox_arr[1]);
item_north = item_north.max(bbox_arr[3]);
}
}
}
let coverage_south = item_south <= bb.min_y + 0.1; let coverage_north = item_north >= bb.max_y - 0.1;
println!(
"Found {} items across {} dates (using {} dates, {}-{} tiles/date)",
items.len(),
by_date.len(),
dates.len(),
min_tiles, max_tiles,
);
if !coverage_south || !coverage_north {
eprintln!(" ⚠ SPATIAL COVERAGE WARNING: items cover lat [{:.2}, {:.2}] but bbox needs [{:.2}, {:.2}]",
item_south, item_north, bb.min_y, bb.max_y);
if items.len() >= search_limit as usize {
eprintln!(" ⚠ Search hit limit ({}) — may need more items for full coverage. Consider reducing date range.",
search_limit);
}
}
let start = Instant::now();
#[allow(dead_code)]
struct TileRef {
data_href: String,
scl_href: String,
original_data_href: String,
original_scl_href: String,
epsg: Option<u32>,
signed_at: Instant,
}
struct SceneInfo {
date: String,
tiles: Vec<TileRef>,
epsg: Option<u32>, }
let mut scenes: Vec<SceneInfo> = Vec::new();
for date in &dates {
let group = &by_date[date];
let mut tiles = Vec::new();
let mut scene_epsg = None;
let mut sign_failures = 0usize;
let mut asset_missing = 0usize;
let mut mask_missing = 0usize;
let is_verbose = tracing::enabled!(tracing::Level::DEBUG);
for item in group {
let tile_epsg = item.epsg();
let item_id = item.id.as_str();
let data_result = match resolve_asset_key(item, &asset) {
Some((resolved_key, a)) => {
debug!("item={} asset='{}' → '{}' ✓", item_id, asset, resolved_key);
let original = a.href.clone();
match client.sign_asset_href(&a.href, item.collection.as_deref().unwrap_or("")) {
Ok(h) => Some((h, original)),
Err(e) => {
sign_failures += 1;
debug!("item={} asset='{}' sign FAILED: {}", item_id, asset, e);
None
}
}
}
None => {
asset_missing += 1;
if is_verbose || (scenes.is_empty() && tiles.is_empty()) {
let available: Vec<&str> = item.assets.keys().map(|k| k.as_str()).collect();
eprintln!(" [resolve] item={} data asset '{}' NOT FOUND. Available: {}",
item_id, asset, available.join(", "));
}
None
}
};
let scl_result = mask_asset_name.and_then(|mask_name| {
match resolve_asset_key(item, mask_name) {
Some((resolved_key, a)) => {
debug!("item={} mask='{}' → '{}' ✓", item_id, mask_name, resolved_key);
let original = a.href.clone();
match client.sign_asset_href(&a.href, item.collection.as_deref().unwrap_or("")) {
Ok(h) => Some((h, original)),
Err(_) => { sign_failures += 1; None }
}
}
None => {
mask_missing += 1;
if is_verbose && mask_missing <= 2 {
eprintln!(" [resolve] item={} mask '{}' not found → no cloud masking",
item_id, mask_name);
}
None
}
}
});
if let Some((dh, orig_dh)) = data_result {
if scene_epsg.is_none() {
scene_epsg = tile_epsg;
}
let (sh, orig_sh) = scl_result.unwrap_or_default();
tiles.push(TileRef {
data_href: dh,
scl_href: sh,
original_data_href: orig_dh,
original_scl_href: orig_sh,
epsg: tile_epsg,
signed_at: Instant::now(),
});
}
}
if tiles.is_empty() {
eprintln!(" ⚠ {}: 0 tiles resolved (items={}, sign_fail={}, data_missing={}, mask_missing={})",
date, group.len(), sign_failures, asset_missing, mask_missing);
} else if mask_missing > 0 && scenes.is_empty() {
eprintln!(" ℹ {}: {} tiles (mask unavailable for {} items → no cloud masking)",
date, tiles.len(), mask_missing);
}
if !tiles.is_empty() {
scenes.push(SceneInfo {
date: date.clone(),
tiles,
epsg: scene_epsg,
});
}
}
if scenes.is_empty() {
anyhow::bail!("No valid scenes found");
}
let first_href = &scenes[0].tiles[0].data_href;
let opts = CogReaderOptions::default();
let probe_reader = CogReaderBlocking::open(first_href, opts)
.context("Failed to probe first COG")?;
let probe_meta = probe_reader.metadata();
let (out_cols, out_rows, out_transform, out_crs, cog_bb);
eprintln!(" align_to = {:?}", align_to.as_ref().map(|p| p.display().to_string()));
if let Some(ref align_path) = align_to {
let reference: surtgis_core::Raster<f64> =
surtgis_core::io::read_geotiff(align_path, None)
.context("Failed to read alignment reference raster")?;
let rgt = reference.transform();
let (rr, rc) = reference.shape();
out_rows = rr;
out_cols = rc;
out_transform = *rgt;
out_crs = reference.crs().cloned();
let min_x = rgt.origin_x;
let max_y = rgt.origin_y;
let max_x = min_x + rc as f64 * rgt.pixel_width;
let min_y = max_y + rr as f64 * rgt.pixel_height; cog_bb = BBox::new(min_x, min_y, max_x, max_y);
eprintln!(" Using reference grid from --align-to: {}x{} px=({:.1},{:.1})",
rc, rr, rgt.pixel_width, rgt.pixel_height);
} else {
let cog_bb_computed = {
use surtgis_cloud::reproject;
if let Some(epsg) = scenes[0].epsg {
if !reproject::is_wgs84(epsg) {
reproject::reproject_bbox_to_cog(&bb, epsg)
} else { bb }
} else { bb }
};
cog_bb = cog_bb_computed;
let pixel_width = probe_meta.geo_transform.pixel_width.abs();
let pixel_height = probe_meta.geo_transform.pixel_height.abs();
out_cols = ((cog_bb.max_x - cog_bb.min_x) / pixel_width).round() as usize;
out_rows = ((cog_bb.max_y - cog_bb.min_y) / pixel_height).round() as usize;
out_transform = surtgis_core::GeoTransform::new(
cog_bb.min_x, cog_bb.max_y, pixel_width, -pixel_height,
);
out_crs = scenes[0].epsg.map(surtgis_core::CRS::from_epsg);
}
println!(
"Output grid: {} x {} ({:.1}M cells), {} dates",
out_cols, out_rows,
(out_cols * out_rows) as f64 / 1e6,
scenes.len()
);
eprintln!(" Grid config: cols={} rows={} px=({:.1},{:.1})",
out_cols, out_rows, out_transform.pixel_width, out_transform.pixel_height);
eprintln!(" Grid bbox: x=[{:.1}, {:.1}] y=[{:.1}, {:.1}]",
cog_bb.min_x, cog_bb.max_x, cog_bb.min_y, cog_bb.max_y);
let strip_rows = 512usize; let num_strips = (out_rows + strip_rows - 1) / strip_rows;
let n_scenes = scenes.len();
let out_epsg_for_tiles = out_crs.as_ref().and_then(|c| c.epsg());
let config = surtgis_core::io::StripWriterConfig {
rows: out_rows,
cols: out_cols,
transform: out_transform,
crs: out_crs,
nodata: Some(f64::NAN),
compress,
rows_per_strip: strip_rows as u32,
};
const TOKEN_REFRESH_THRESHOLD: Duration = Duration::from_secs(30 * 60);
let scenes_ref = &mut scenes;
let client_ref = &client;
let cloud_mask_strategy = match &profile {
CollectionProfile::Sentinel2L2A {
cloud_mask_strategy,
} => cloud_mask_strategy.clone(),
CollectionProfile::LandsatC2L2 {
cloud_mask_strategy,
} => cloud_mask_strategy.clone(),
CollectionProfile::Sentinel1RTC => Arc::new(NoCloudMask),
};
let mask_ref = &cloud_mask_strategy;
let mut strip_idx_counter = 0usize;
surtgis_core::io::write_geotiff_streaming(
&output,
&config,
|_strip_idx, strip_out_rows| {
let current_strip = strip_idx_counter;
strip_idx_counter += 1;
let row_start = current_strip * strip_rows;
let row_end = (row_start + strip_out_rows).min(out_rows);
let actual_rows = row_end - row_start;
let ph = out_transform.pixel_height.abs();
let strip_min_y = cog_bb.max_y - (row_end as f64 * ph);
let strip_max_y = cog_bb.max_y - (row_start as f64 * ph);
let strip_bb = BBox::new(
cog_bb.min_x, strip_min_y, cog_bb.max_x, strip_max_y,
);
eprintln!(" Strip {}/{}: bbox y=[{:.0}, {:.0}]",
current_strip + 1, num_strips, strip_min_y, strip_max_y);
print!(
"\r Strip {}/{} (rows {}-{})...",
current_strip + 1, num_strips, row_start, row_end
);
let mut scene_strips: Vec<ndarray::Array2<f64>> = Vec::with_capacity(n_scenes);
let strip_ref = {
let mut r = surtgis_core::Raster::<f64>::new(actual_rows, out_cols);
r.set_transform(surtgis_core::GeoTransform::new(
strip_bb.min_x, strip_bb.max_y,
out_transform.pixel_width, out_transform.pixel_height,
));
r
};
for scene in scenes_ref.iter_mut() {
let is_first_strip = current_strip == 0;
let token_age = scene.tiles.first()
.map(|t| t.signed_at.elapsed());
if token_age.map(|age| age > TOKEN_REFRESH_THRESHOLD).unwrap_or(false) {
let age_secs = token_age.unwrap().as_secs();
let mut refreshed = 0usize;
for tile in &mut scene.tiles {
if let Ok(h) = client_ref.sign_asset_href(&tile.original_data_href, "") {
tile.data_href = h;
}
if !tile.original_scl_href.is_empty() {
if let Ok(h) = client_ref.sign_asset_href(&tile.original_scl_href, "") {
tile.scl_href = h;
}
}
tile.signed_at = Instant::now();
refreshed += 1;
}
eprintln!(" [token] Re-signed {} tiles for {} (tokens were {}s old)",
refreshed, scene.date, age_secs);
}
let pad = 100.0; let tile_bb = BBox::new(
strip_bb.min_x - pad,
strip_bb.min_y - pad,
strip_bb.max_x + pad,
strip_bb.max_y + pad,
);
let tile_results: Vec<(
Option<surtgis_core::Raster<f64>>,
Option<surtgis_core::Raster<f64>>,
)> = {
let mut handles = Vec::with_capacity(scene.tiles.len());
let out_epsg = out_epsg_for_tiles;
for (tile_idx, tile) in scene.tiles.iter().enumerate() {
let data_href = tile.data_href.clone();
let scl_href = tile.scl_href.clone();
let bb = if let (Some(tile_epsg), Some(out_e)) = (tile.epsg, out_epsg) {
if tile_epsg != out_e {
reproject_bbox_between_crs(&tile_bb, out_e, tile_epsg)
} else {
tile_bb
}
} else {
tile_bb
};
let first = is_first_strip && tile_idx == 0;
handles.push(std::thread::spawn(move || {
let data = read_cog_tile(&data_href, &bb, first);
let mask = if scl_href.is_empty() {
None
} else {
read_cog_tile_raw(&scl_href, &bb)
};
(data, mask)
}));
}
handles.into_iter().map(|h| h.join().unwrap_or((None, None))).collect()
};
let mut data_tiles: Vec<surtgis_core::Raster<f64>> = Vec::new();
let mut scl_tiles: Vec<surtgis_core::Raster<f64>> = Vec::new();
let mut data_ok = 0usize;
let mut data_fail = 0usize;
let mut scl_ok = 0usize;
for (data_opt, mask_opt) in tile_results {
if let Some(r) = data_opt {
data_tiles.push(r);
data_ok += 1;
} else {
data_fail += 1;
}
if let Some(r) = mask_opt {
scl_tiles.push(r);
scl_ok += 1;
}
}
if is_first_strip {
eprintln!(" ℹ {}: {} tiles (parallel), data={}/{} mask={}",
scene.date, scene.tiles.len(),
data_ok, data_ok + data_fail, scl_ok);
}
if data_tiles.is_empty() {
if is_first_strip {
eprintln!(" ⚠ {}: SKIPPED (no data tiles)", scene.date);
}
continue;
}
fn unify_crs(tiles: &mut Vec<surtgis_core::Raster<f64>>) {
if tiles.len() <= 1 { return; }
let target_epsg = tiles[0].crs().and_then(|c| c.epsg());
if let Some(target) = target_epsg {
for i in 1..tiles.len() {
if let Some(src_epsg) = tiles[i].crs().and_then(|c| c.epsg()) {
if src_epsg != target {
if let Some(reprojected) = surtgis_cloud::reproject::reproject_raster_utm(&tiles[i], src_epsg, target) {
tiles[i] = reprojected;
}
}
}
}
}
}
unify_crs(&mut data_tiles);
if !scl_tiles.is_empty() {
unify_crs(&mut scl_tiles);
}
if is_first_strip {
eprintln!(" ℹ {}: mosaic input: data={} mask={} tiles",
scene.date, data_tiles.len(), scl_tiles.len());
}
let data_m = if data_tiles.len() == 1 {
data_tiles.into_iter().next().unwrap()
} else {
let refs: Vec<&surtgis_core::Raster<f64>> = data_tiles.iter().collect();
match surtgis_core::mosaic(&refs, None) {
Ok(m) => m,
Err(e) => {
eprintln!(" ⚠ Mosaic failed for date (data): {}", e);
continue;
}
}
};
let scl_m = if scl_tiles.is_empty() {
None
} else if scl_tiles.len() == 1 {
Some(scl_tiles.into_iter().next().unwrap())
} else {
let refs: Vec<&surtgis_core::Raster<f64>> = scl_tiles.iter().collect();
match surtgis_core::mosaic(&refs, None) {
Ok(m) => Some(m),
Err(e) => {
eprintln!(" ⚠ Mosaic failed for date (mask): {}", e);
None
}
}
};
if is_first_strip {
let dgt = data_m.transform();
let data_valid = data_m.data().iter().filter(|v| v.is_finite() && **v > 0.0).count();
let data_total = data_m.data().len();
eprintln!(" [mosaic] data_m: {}x{} px=({:.1},{:.1}) valid={}/{} ({:.0}%)",
data_m.shape().1, data_m.shape().0,
dgt.pixel_width, dgt.pixel_height,
data_valid, data_total,
if data_total > 0 { data_valid as f64 / data_total as f64 * 100.0 } else { 0.0 });
if let Some(ref scl) = scl_m {
let scl_valid = scl.data().iter().filter(|v| v.is_finite()).count();
let scl_total = scl.data().len();
eprintln!(" [mosaic] mask: {}x{} px=({:.1},{:.1}) valid={}/{} ({:.0}%)",
scl.shape().1, scl.shape().0,
scl.transform().pixel_width, scl.transform().pixel_height,
scl_valid, scl_total,
if scl_total > 0 { scl_valid as f64 / scl_total as f64 * 100.0 } else { 0.0 });
} else {
eprintln!(" [mosaic] no mask — proceeding without cloud masking");
}
}
if is_first_strip {
if let Some(ref scl) = scl_m {
let mut scl_hist = [0usize; 16];
for v in scl.data().iter() {
let vi = *v as usize;
if vi < 16 { scl_hist[vi] += 1; }
}
let scl_nonzero: Vec<String> = scl_hist.iter().enumerate()
.filter(|(_, c)| **c > 0)
.map(|(i, c)| format!("{}:{}", i, c))
.collect();
eprintln!(" [mask] histogram: {}", scl_nonzero.join(" "));
}
}
let clean_result = if let Some(ref scl) = scl_m {
mask_ref.mask(&data_m, scl).ok()
} else {
Some(data_m) };
match clean_result {
Some(clean) => {
let (clean_r, clean_c) = clean.shape();
if current_strip == 0 {
let clean_valid = clean.data().iter().filter(|v| v.is_finite() && **v > 0.0).count();
let clean_total = clean.data().len();
let cgt = clean.transform();
let rgt = strip_ref.transform();
eprintln!(" [cloud] clean: {}x{} px=({:.1},{:.1}) valid={}/{} ({:.0}%)",
clean_c, clean_r, cgt.pixel_width, cgt.pixel_height,
clean_valid, clean_total,
if clean_total > 0 { clean_valid as f64 / clean_total as f64 * 100.0 } else { 0.0 });
eprintln!(" Resample: src origin=({:.1},{:.1}) px=({:.1},{:.1}) {}x{}",
cgt.origin_x, cgt.origin_y, cgt.pixel_width, cgt.pixel_height, clean_c, clean_r);
eprintln!(" Resample: dst origin=({:.1},{:.1}) px=({:.1},{:.1}) {}x{}",
rgt.origin_x, rgt.origin_y, rgt.pixel_width, rgt.pixel_height,
strip_ref.shape().1, strip_ref.shape().0);
}
let resampled = surtgis_core::resample_to_grid(
&clean, &strip_ref,
surtgis_core::ResampleMethod::Bilinear,
).unwrap_or(clean);
let valid = resampled.data().iter().filter(|v| v.is_finite()).count();
let total = resampled.data().len();
if current_strip == 0 {
let pct = if total > 0 { valid as f64 / total as f64 * 100.0 } else { 0.0 };
eprintln!(", mosaic OK, resample {}x{}→{}x{}, {:.0}% clear ✓",
clean_c, clean_r,
resampled.shape().1, resampled.shape().0, pct);
}
if valid > 0 {
scene_strips.push(resampled.data().to_owned());
} else if current_strip == 0 {
eprintln!(" ⚠ {}: 100% cloudy, skipped", scene.date);
}
}
None => {
if current_strip == 0 {
eprintln!(" ⚠ {}: cloud mask failed, skipping", scene.date);
}
}
}
}
let mut output = ndarray::Array2::<f64>::from_elem(
(actual_rows, out_cols), f64::NAN,
);
if !scene_strips.is_empty() {
let n = scene_strips.len();
if current_strip == 0 {
for (si, strip) in scene_strips.iter().enumerate() {
let vals: Vec<f64> = strip.iter().filter(|v| v.is_finite()).copied().collect();
if !vals.is_empty() {
let min = vals.iter().cloned().fold(f64::INFINITY, f64::min);
let max = vals.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
let over = vals.iter().filter(|&&v| v > 10000.0).count();
if si < 3 || over > 0 {
eprintln!(" scene_strip[{}]: n={} range=[{:.0},{:.0}] >10k={}",
si, vals.len(), min, max, over);
}
}
}
}
let mut coverage: Vec<(usize, usize)> = scene_strips.iter().enumerate()
.map(|(i, s)| {
let valid = s.iter().filter(|v| v.is_finite()).count();
(i, valid)
})
.collect();
coverage.sort_by(|a, b| b.1.cmp(&a.1));
for r in 0..actual_rows {
for c in 0..out_cols {
let mut values: Vec<f64> = Vec::with_capacity(n);
for strip in &scene_strips {
if r < strip.nrows() && c < strip.ncols() {
let v = strip[[r, c]];
if v.is_finite() {
values.push(v);
}
}
}
if !values.is_empty() {
values.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap());
let mid = values.len() / 2;
output[[r, c]] = if values.len() % 2 == 0 {
(values[mid - 1] + values[mid]) / 2.0
} else {
values[mid]
};
}
}
}
let nan_before = output.iter().filter(|v| !v.is_finite()).count();
if nan_before > 0 {
for &(scene_idx, _) in &coverage {
let strip = &scene_strips[scene_idx];
let mut filled = 0usize;
for r in 0..actual_rows {
for c in 0..out_cols {
if !output[[r, c]].is_finite()
&& r < strip.nrows() && c < strip.ncols()
{
let v = strip[[r, c]];
if v.is_finite() {
output[[r, c]] = v;
filled += 1;
}
}
}
}
if filled == 0 { continue; }
let nan_remaining = output.iter().filter(|v| !v.is_finite()).count();
if nan_remaining == 0 { break; }
}
}
let mut nan_remaining = output.iter().filter(|v| !v.is_finite()).count();
if nan_remaining > 0 && nan_remaining < output.len() {
let max_passes = 20;
for _pass in 0..max_passes {
let prev = output.clone();
let mut filled_this_pass = 0usize;
for r in 0..actual_rows {
for c in 0..out_cols {
if prev[[r, c]].is_finite() { continue; }
let mut sum = 0.0;
let mut cnt = 0u32;
for dr in -1i32..=1 {
for dc in -1i32..=1 {
let nr = r as i32 + dr;
let nc = c as i32 + dc;
if nr >= 0 && nr < actual_rows as i32
&& nc >= 0 && nc < out_cols as i32
{
let v = prev[[nr as usize, nc as usize]];
if v.is_finite() {
sum += v;
cnt += 1;
}
}
}
}
if cnt >= 2 {
output[[r, c]] = sum / cnt as f64;
filled_this_pass += 1;
}
}
}
if filled_this_pass == 0 { break; }
nan_remaining = output.iter().filter(|v| !v.is_finite()).count();
if nan_remaining == 0 { break; }
}
}
}
let valid_count = output.iter().filter(|v| v.is_finite()).count();
let total = output.len();
let pct = if total > 0 { valid_count as f64 / total as f64 * 100.0 } else { 0.0 };
if current_strip == 0 || current_strip == num_strips - 1 || scene_strips.is_empty() {
let valid_vals: Vec<f64> = output.iter().filter(|v| v.is_finite()).copied().collect();
let (vmin, vmax, vmean) = if !valid_vals.is_empty() {
let min = valid_vals.iter().cloned().fold(f64::INFINITY, f64::min);
let max = valid_vals.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
let mean = valid_vals.iter().sum::<f64>() / valid_vals.len() as f64;
(min, max, mean)
} else { (0.0, 0.0, 0.0) };
let over_10k = valid_vals.iter().filter(|&&v| v > 10000.0).count();
eprintln!(" Strip {}/{}: scenes={}, {:.0}% valid, range=[{:.0},{:.0}] mean={:.0} >10k={}",
current_strip + 1, num_strips,
scene_strips.len(), pct, vmin, vmax, vmean, over_10k);
}
Ok(output)
},
).context("Failed to write streaming composite")?;
let file_size = std::fs::metadata(&output).map(|m| m.len()).unwrap_or(0);
let expected_size = (out_rows * out_cols * 4) as u64; eprintln!(" File written: {} bytes ({:.1} MB), expected ~{:.1} MB",
file_size, file_size as f64 / 1e6, expected_size as f64 / 1e6);
println!();
if false && align_to.is_some() {
let align_path = align_to.as_ref().unwrap();
let composite: surtgis_core::Raster<f64> =
surtgis_core::io::read_geotiff(&output, None)
.context("Failed to re-read composite for alignment")?;
let reference: surtgis_core::Raster<f64> =
surtgis_core::io::read_geotiff(align_path, None)
.context("Failed to read alignment reference raster")?;
let cgt = composite.transform();
let (cr, cc) = composite.shape();
let rgt = reference.transform();
let (rr, rc) = reference.shape();
eprintln!(" Composite transform: origin=({:.1},{:.1}) px=({:.1},{:.1}) shape={}x{}",
cgt.origin_x, cgt.origin_y, cgt.pixel_width, cgt.pixel_height, cr, cc);
eprintln!(" Reference transform: origin=({:.1},{:.1}) px=({:.1},{:.1}) shape={}x{}",
rgt.origin_x, rgt.origin_y, rgt.pixel_width, rgt.pixel_height, rr, rc);
let sample_x = rgt.origin_x + 0.5 * rgt.pixel_width;
let sample_y = rgt.origin_y + 0.5 * rgt.pixel_height;
let src_col_f = (sample_x - cgt.origin_x) / cgt.pixel_width - 0.5;
let src_row_f = (sample_y - cgt.origin_y) / cgt.pixel_height - 0.5;
eprintln!(" Pixel(0,0) ref→comp: geo=({:.1},{:.1}) → src_pix=({:.1},{:.1}) bounds=(0..{},0..{})",
sample_x, sample_y, src_col_f, src_row_f, cc, cr);
let pb = spinner("Aligning to reference grid...");
let aligned = surtgis_core::resample_to_grid(
&composite,
&reference,
surtgis_core::ResampleMethod::Bilinear,
)
.context("Failed to resample to reference grid")?;
pb.finish_and_clear();
let (ar, ac) = aligned.shape();
let valid_aligned = aligned.data().iter().filter(|v| v.is_finite()).count();
let total_aligned = ar * ac;
let pct_aligned = if total_aligned > 0 { valid_aligned as f64 / total_aligned as f64 * 100.0 } else { 0.0 };
eprintln!(" Aligned result: {}x{}, {}/{} valid ({:.1}%)",
ac, ar, valid_aligned, total_aligned, pct_aligned);
if valid_aligned == 0 {
eprintln!(" ⚠ WARNING: Aligned raster has 0% valid pixels!");
eprintln!(" Composite CRS may differ from reference DEM CRS.");
eprintln!(" Composite origin: ({:.1}, {:.1}), Reference origin: ({:.1}, {:.1})",
cgt.origin_x, cgt.origin_y, rgt.origin_x, rgt.origin_y);
}
let nan_post = aligned.data().iter().filter(|v| !v.is_finite()).count();
let mut aligned = aligned;
if nan_post > 0 && nan_post < total_aligned {
let (ar, ac) = aligned.shape();
let max_passes = 30;
for _pass in 0..max_passes {
let prev = aligned.data().clone();
let mut filled = 0usize;
for r in 0..ar {
for c in 0..ac {
if prev[[r, c]].is_finite() { continue; }
let mut sum = 0.0;
let mut cnt = 0u32;
for dr in -1i32..=1 {
for dc in -1i32..=1 {
let nr = r as i32 + dr;
let nc = c as i32 + dc;
if nr >= 0 && nr < ar as i32 && nc >= 0 && nc < ac as i32 {
let v = prev[[nr as usize, nc as usize]];
if v.is_finite() { sum += v; cnt += 1; }
}
}
}
if cnt >= 2 {
aligned.set(r, c, sum / cnt as f64);
filled += 1;
}
}
}
if filled == 0 { break; }
let remaining = aligned.data().iter().filter(|v| !v.is_finite()).count();
if remaining == 0 { break; }
}
let final_valid = aligned.data().iter().filter(|v| v.is_finite()).count();
eprintln!(" Post-align gap-fill: {:.1}% → {:.1}%",
pct_aligned, final_valid as f64 / total_aligned as f64 * 100.0);
}
write_result(&aligned, &output, compress)?;
println!(
"Aligned to reference: {} x {} -> {} x {}",
out_cols, out_rows, ac, ar
);
}
let elapsed = start.elapsed();
let total_items: usize = scenes.iter().map(|s| s.tiles.len()).sum();
println!(
"Composite: {} dates ({} tiles) -> {} x {} ({:.1}M cells)",
scenes.len(), total_items, out_cols, out_rows,
(out_cols * out_rows) as f64 / 1e6,
);
eprintln!(
" ℹ If fewer dates than expected were used, check ⚠ warnings above.\n Common causes: CRS mismatch across UTM zones, cloud mask failures."
);
done("STAC composite", &output, elapsed);
}
StacCommands::ListCatalogs { search } => {
// Two-tier listing: a hand-curated catalog set first, then a best-effort
// network fetch of the community STAC Index. `search` filters both tiers
// case-insensitively.
println!("📚 Available STAC Catalogs\n");
if let Some(ref query) = search {
println!("🔍 Searching for: '{}'\n", query);
}
println!("═══════════════════════════════════════════════════════════════");
println!("CURATED CATALOGS (reliable, actively maintained):\n");
// NOTE(review): curated_matches is counted but never reported — confirm intended.
let mut curated_matches = 0;
for catalog in get_known_catalogs() {
// "osgeo" is deliberately hidden from this listing.
if catalog.shorthand == "osgeo" {
continue;
}
// Substring match over name, description, and shorthand.
if let Some(ref q) = search {
let query_lower = q.to_lowercase();
let matches = catalog.name.to_lowercase().contains(&query_lower)
|| catalog.description.to_lowercase().contains(&query_lower)
|| catalog.shorthand.to_lowercase().contains(&query_lower);
if !matches {
continue;
}
}
curated_matches += 1;
println!("{:<6} {}", catalog.shorthand, catalog.name);
println!(" {}", catalog.description);
println!(" URL: {}\n", catalog.url);
}
println!("═══════════════════════════════════════════════════════════════");
println!("STAC INDEX (dynamically discovered, 1000+ catalogs):\n");
// Network fetch; failure is non-fatal since the curated list already printed.
match fetch_stac_index_catalogs() {
Ok(mut all_catalogs) => {
if let Some(ref q) = search {
let query_lower = q.to_lowercase();
all_catalogs.retain(|cat| {
cat.title.to_lowercase().contains(&query_lower)
|| cat.id.to_lowercase().contains(&query_lower)
|| cat.description
.as_ref()
.map(|d| d.to_lowercase().contains(&query_lower))
.unwrap_or(false)
});
}
if all_catalogs.is_empty() {
if search.is_some() {
println!(" ⚠️ No STAC Index catalogs match your search");
} else {
println!(" ⚠️ No catalogs found in STAC Index");
}
} else {
// Cap terminal output at 15 entries; report the remainder as a count.
let total = all_catalogs.len();
let display_count = all_catalogs.len().min(15);
println!(" Showing {} of {} available catalogs:\n", display_count, total);
for (i, catalog_info) in all_catalogs.iter().take(display_count).enumerate() {
let idx = i + 1;
println!(" [{}] {} ({})", idx, catalog_info.title, catalog_info.id);
if let Some(desc) = &catalog_info.description {
// Presumably truncates on a char boundary (see truncate_utf8) — confirm.
let preview = truncate_utf8(desc, 70);
println!(" {}", preview);
}
if let Some(url) = &catalog_info.url {
println!(" 🔗 {}", url);
}
println!();
}
if total > 15 {
println!(" ... and {} more catalogs in STAC Index", total - 15);
}
}
}
Err(e) => {
if search.is_some() {
eprintln!(" ⚠️ Could not search STAC Index: {}", e);
} else {
eprintln!(" ⚠️ Could not fetch STAC Index: {}", e);
}
eprintln!(" (This is optional - you can still use curated catalogs above)");
}
}
println!("\n═══════════════════════════════════════════════════════════════");
println!("💡 You can also use any custom STAC API URL:");
println!(" surtgis stac search --catalog https://your-stac-api.com/v1 ...");
println!("\n💡 Search for specific data types:");
println!(" surtgis stac list-catalogs --search sentinel-2");
println!(" surtgis stac list-catalogs --search dem");
println!(" surtgis stac list-catalogs --search thermal\n");
}
StacCommands::ListCollections { catalog } => {
// Print the known collections for one catalog, as (id, description) pairs.
println!("📍 Catalog: {}\n", catalog);
let collections = get_catalog_collections(&catalog);
println!("📊 Available Collections:\n");
// A single "(unknown)" entry is the helper's sentinel for catalogs it has
// no metadata for; in that case the second field is a free-form message.
let is_unknown = collections.len() == 1 && collections[0].0 == "(unknown)";
if is_unknown {
println!(" {}", collections[0].1);
} else {
for (id, desc) in &collections {
println!(" • {} - {}", id, desc);
}
println!("\n✨ Total: {} collections", collections.len());
}
println!("\n💡 Usage: surtgis stac composite --catalog {} --collection <id> --asset <band> ...", catalog);
}
StacCommands::TimeSeries {
catalog, bbox, collection, asset, datetime, interval,
scl_asset, max_scenes, align_to, output,
} => {
// `asset` may be a comma-separated band list. Multiple bands fan out into
// one handle_time_series run per band, each writing into its own
// sub-directory `output/<band>`; a single band writes into `output` directly.
let assets: Vec<&str> = asset.split(',').map(|s| s.trim()).collect();
if assets.len() > 1 {
println!("Multi-band time series: {} bands", assets.len());
let total_start = Instant::now();
for (band_idx, band) in assets.iter().enumerate() {
let band_outdir = output.join(band);
println!("\n[{}/{}] Band: {} → {}", band_idx + 1, assets.len(), band, band_outdir.display());
// Bands run sequentially; an error in any band aborts the whole command.
handle_time_series(
&catalog, &bbox, &collection, band, &datetime,
&interval, &scl_asset, max_scenes, align_to.as_ref(),
&band_outdir, compress,
)?;
}
println!("\nAll {} bands complete in {:.1?}", assets.len(), total_start.elapsed());
} else {
handle_time_series(
&catalog, &bbox, &collection, &asset, &datetime,
&interval, &scl_asset, max_scenes, align_to.as_ref(),
&output, compress,
)?;
}
}
#[cfg(feature = "zarr")]
StacCommands::DownloadClimate {
catalog,
bbox,
collection,
variable,
datetime,
aggregate,
output,
} => {
use surtgis_cloud::blocking::ZarrReaderBlocking;
use surtgis_cloud::{AggMethod, TimeReduction, ZarrReaderOptions};
use surtgis_cloud::zarr_auth::abfs_to_https_with_account;
let cat = StacCatalog::from_str_or_url(&catalog);
let bb = parse_bbox(&bbox)?;
let (interval_type, agg_method) = parse_aggregate(&aggregate)?;
let (dt_start, dt_end) = parse_datetime_range(&datetime)?;
let pb = spinner("Searching STAC catalog...");
let client = StacClientBlocking::new(cat, StacClientOptions::default())
.context("Failed to create STAC client")?;
let params = StacSearchParams::new()
.bbox(bb.min_x, bb.min_y, bb.max_x, bb.max_y)
.collections(&[collection.as_str()])
.datetime(&datetime)
.limit(1);
let results = client.search(¶ms).context("STAC search failed")?;
let item = results.features.first().ok_or_else(|| {
anyhow::anyhow!("No items found for collection '{}' in range '{}'", collection, datetime)
})?;
pb.finish_and_clear();
println!("Item: {} [{}]", item.id, item.collection.as_deref().unwrap_or("-"));
let stac_asset = item.asset(&variable).ok_or_else(|| {
anyhow::anyhow!(
"Asset '{}' not found. Available: {}",
variable,
item.assets.keys().cloned().collect::<Vec<_>>().join(", ")
)
})?;
let auth = client.get_collection_zarr_auth(&collection)
.context("Failed to get collection auth")?;
let (sas_token, store_url) = if let Some((token, account, _container)) = auth {
let url = abfs_to_https_with_account(&stac_asset.href, Some(&account));
(Some(token), url)
} else {
(None, stac_asset.href.clone())
};
let opts = ZarrReaderOptions { sas_token };
let pb = spinner("Opening Zarr store...");
let reader = ZarrReaderBlocking::open(&store_url, &variable, opts)
.context("Failed to open Zarr store")?;
let meta = reader.metadata();
println!(
"Variable: {} — shape {:?}, dims {:?}",
meta.variable, meta.shape, meta.dimension_names
);
if let Some((t0, t1)) = &meta.time_range {
println!("Time range: {} to {}", t0.format("%Y-%m-%d"), t1.format("%Y-%m-%d"));
}
pb.finish_and_clear();
let intervals = generate_intervals(dt_start, dt_end, interval_type);
println!(
"Downloading {} intervals ({}) for {}...",
intervals.len(),
aggregate,
variable
);
std::fs::create_dir_all(&output)
.context("Failed to create output directory")?;
let total_start = Instant::now();
for (i, (int_start, int_end, label)) in intervals.iter().enumerate() {
let time = if agg_method.is_some() {
TimeReduction::Aggregate {
start: *int_start,
end: *int_end,
method: agg_method.unwrap(),
}
} else {
TimeReduction::Single(surtgis_cloud::TimeSelector::Nearest(*int_start))
};
let pb = spinner(&format!("[{}/{}] {}", i + 1, intervals.len(), label));
match reader.read_bbox(&bb, &time) {
Ok(raster) => {
let suffix = agg_method.map(|m| match m {
AggMethod::Mean => "mean",
AggMethod::Sum => "sum",
AggMethod::Min => "min",
AggMethod::Max => "max",
}).unwrap_or("value");
let filename = format!("{}_{}.tif", label, suffix);
let out_path = output.join(&filename);
write_result(&raster, &out_path, compress)?;
let (rows, cols) = raster.shape();
pb.finish_and_clear();
println!(" {} — {}x{}", filename, cols, rows);
}
Err(e) => {
pb.finish_and_clear();
eprintln!(" Warning: {} — {}", label, e);
}
}
}
let elapsed = total_start.elapsed();
println!(
"\nDone: {} intervals written to {} in {:.1?}",
intervals.len(),
output.display(),
elapsed
);
}
#[cfg(not(feature = "zarr"))]
StacCommands::DownloadClimate { .. } => {
// Compile-time fallback: the Zarr dependency tree is optional; without the
// feature this subcommand fails fast with a rebuild hint.
anyhow::bail!("Zarr support not enabled. Recompile with --features zarr");
}
}
Ok(())
}
/// Search `catalog` for `collection` scenes intersecting `bbox` and `datetime`,
/// download up to `max_scenes` of band `asset`, apply the collection's cloud
/// mask per scene (when the schema declares one), mosaic the survivors, and
/// optionally resample the composite onto the grid of `align_to`.
///
/// Individual scene failures (missing asset, COG open/read errors) are logged
/// and skipped; errors are returned only when the search yields nothing or no
/// scene downloads successfully.
pub fn fetch_stac_band(
catalog: &str,
bbox: &str,
collection: &str,
asset: &str,
datetime: &str,
max_scenes: usize,
align_to: Option<&surtgis_core::Raster<f64>>,
) -> Result<surtgis_core::Raster<f64>> {
use surtgis_core::mosaic;
let cat = StacCatalog::from_str_or_url(catalog);
let bb = parse_bbox(bbox)?;
// Estimate spatial tile count (~111 km/deg, ~60 km tile footprint) and
// over-fetch ×5 so cloudy/failed scenes still leave enough candidates;
// clamp to [1000, 10000].
let bbox_w_km = (bb.max_x - bb.min_x).abs() * 111.0;
let bbox_h_km = (bb.max_y - bb.min_y).abs() * 111.0;
let tiles_est = (((bbox_w_km / 60.0).ceil() as usize).max(1))
* (((bbox_h_km / 60.0).ceil() as usize).max(1));
let search_limit = ((tiles_est * max_scenes * 5).max(1000)).min(10000) as u32;
let params = StacSearchParams::new()
.bbox(bb.min_x, bb.min_y, bb.max_x, bb.max_y)
.collections(&[collection])
.datetime(datetime)
.limit(search_limit.min(cat.max_page_size()));
let pb = spinner(&format!(
"Searching for {} scenes (limit={})...",
collection, search_limit
));
let client_opts = StacClientOptions {
max_items: search_limit as usize,
..StacClientOptions::default()
};
let client = StacClientBlocking::new(cat, client_opts)
.context("Failed to create STAC client")?;
// FIX: "&params" had been mojibake-corrupted to "¶ms" (HTML-entity decode
// of "&para"), which does not compile.
let items = client.search_all(&params).context("STAC search failed")?;
pb.finish_and_clear();
if items.is_empty() {
anyhow::bail!(
"No items found for {} in bbox {}",
collection,
bbox
);
}
// Introspect band layout and cloud-mask scheme from the first item's assets.
let pb = spinner("Introspecting collection schema...");
let schema = StacCollectionSchema::from_stac_item(collection, &items[0])
.context("Failed to introspect STAC collection")?;
pb.finish_and_clear();
eprintln!("📊 Collection: {}", schema.collection_name);
eprintln!(" Available bands: {}", schema.format_bands());
eprintln!(" Cloud masking: {}", match &schema.cloud_mask_type {
CloudMaskType::Categorical { asset, num_classes } => format!("Categorical {} ({} classes)", asset, num_classes),
CloudMaskType::Bitmask { asset, bits } => format!("Bitmask {} ({} bits)", asset, bits.len()),
CloudMaskType::None => "None (SAR)".to_string(),
});
let band_info = schema.find_band_by_name(asset)
.context(format!(
"Band '{}' not found. Available: {}",
asset, schema.format_bands()
))?;
eprintln!(" Band matched: {} → {}", asset, band_info.asset_key);
let cloud_mask_strategy = create_cloud_mask_strategy(&schema.cloud_mask_type);
// Preview at most 3 of the candidate scenes.
for (idx, item) in items.iter().take(max_scenes).enumerate() {
let date = item.properties.datetime.as_deref().unwrap_or("-");
let cloud_cover = item.properties.eo_cloud_cover.unwrap_or(0.0);
if idx < 3 {
eprintln!(" Scene {}: {} [{}% cloud]", idx + 1, date, cloud_cover as u32);
}
}
if items.len() > 3 {
eprintln!(" ... and {} more scenes", items.len() - 3);
}
let mut rasters: Vec<surtgis_core::Raster<f64>> = Vec::new();
let mut successful = 0;
for (i, item) in items.iter().take(max_scenes).enumerate() {
let date = item.properties.datetime.as_deref().unwrap_or("-");
let cloud = item.properties.eo_cloud_cover.unwrap_or(0.0);
let pb = spinner(&format!(
"Downloading {}/{}: {} [{}% cloud, {}]...",
i + 1, max_scenes, item.id, cloud as u32, date
));
// Resolve + sign the band asset href, carrying the item's EPSG along.
let data_asset = resolve_asset_key(item, &band_info.asset_key)
.and_then(|(_, a)| {
client.sign_asset_href(&a.href, item.collection.as_deref().unwrap_or(""))
.ok()
.map(|h| (h, item.epsg()))
});
// The cloud-mask asset is optional; any resolve/sign failure means "no mask".
let mask_href = schema.cloud_mask_asset.as_ref().and_then(|mask_name| {
resolve_asset_key(item, mask_name)
.and_then(|(_, a)| {
client.sign_asset_href(&a.href, item.collection.as_deref().unwrap_or(""))
.ok()
})
});
let Some((data_href, epsg)) = data_asset else {
pb.finish_and_clear();
eprintln!(
" ⚠️ Skipping {}: asset {} not found",
item.id, band_info.asset_key
);
continue;
};
let opts = CogReaderOptions::default();
let mut reader = match CogReaderBlocking::open(&data_href, opts) {
Ok(r) => r,
Err(e) => {
pb.finish_and_clear();
eprintln!(" ⚠️ Skipping {}: failed to open COG: {}", item.id, e);
continue;
}
};
// The query bbox is WGS84; reproject it into the COG's native CRS first.
let read_bb = {
use surtgis_cloud::reproject;
if let Some(epsg) = epsg {
if !reproject::is_wgs84(epsg) {
reproject::reproject_bbox_to_cog(&bb, epsg)
} else {
bb
}
} else {
bb
}
};
let mut raster = match reader.read_bbox::<f64>(&read_bb, None) {
Ok(r) => r,
Err(e) => {
pb.finish_and_clear();
eprintln!(" ⚠️ Skipping {}: failed to read bbox: {}", item.id, e);
continue;
}
};
let (rrows, rcols) = raster.shape();
pb.println(format!(" → Read {} × {} pixels", rcols, rrows));
if let Some(mask_href) = &mask_href {
let mask_desc = match &schema.cloud_mask_type {
CloudMaskType::Categorical { asset, .. } => asset.clone(),
CloudMaskType::Bitmask { asset, .. } => asset.clone(),
CloudMaskType::None => "None".to_string(),
};
pb.println(format!(" → Applying {} cloud mask...", mask_desc));
let mask_opts = CogReaderOptions::default();
let mut mask_reader = match CogReaderBlocking::open(mask_href, mask_opts) {
Ok(r) => r,
Err(_) => {
// Mask unavailable: keep the unmasked scene rather than drop it.
pb.finish_and_clear();
rasters.push(raster);
successful += 1;
continue;
}
};
// Mask-read or strategy failures also fall back to the unmasked raster.
if let Ok(mask_raster) = mask_reader.read_bbox::<f64>(&read_bb, None) {
raster = cloud_mask_strategy
.mask(&raster, &mask_raster)
.unwrap_or(raster);
}
}
pb.finish_and_clear();
successful += 1;
rasters.push(raster);
// Defensive: take(max_scenes) already bounds the loop.
if rasters.len() >= max_scenes {
break;
}
}
if rasters.is_empty() {
anyhow::bail!(
"Failed to download any {} {} scenes (0/{} successful)",
collection, asset, items.len()
);
}
eprintln!(" ✅ Successfully loaded {} scenes", successful);
let pb = spinner(&format!(
"Compositing {} scenes (median stack)...",
rasters.len()
));
let refs: Vec<&surtgis_core::Raster<f64>> = rasters.iter().collect();
let result = mosaic(&refs, None)
.context("Failed to mosaic scenes")?;
let (comp_rows, comp_cols) = result.shape();
pb.finish_and_clear();
eprintln!(" ✓ Composite: {} × {} pixels", comp_cols, comp_rows);
// Optionally snap the composite onto the caller-supplied reference grid.
let final_result = if let Some(reference) = align_to {
let pb = spinner("Aligning to DEM grid (bilinear resample)...");
let aligned = surtgis_core::resample_to_grid(
&result,
reference,
surtgis_core::ResampleMethod::Bilinear,
)
.context("Failed to resample to DEM grid")?;
let (out_rows, out_cols) = aligned.shape();
pb.finish_and_clear();
eprintln!(" ✓ Aligned: {} × {} pixels", out_cols, out_rows);
aligned
} else {
result
};
eprintln!(" ✓ {} band ready for indices", asset);
Ok(final_result)
}
/// Map a COG href + read bbox to a deterministic on-disk cache location.
///
/// The URL's query string (e.g. a SAS token) is stripped before hashing so a
/// re-signed URL hits the same cache entry. The final layout is
/// `<cache_root>/surtgis/cog/aa/bb/<rest>.tif`, sharded on the first two
/// bytes of the 64-bit hash.
fn cog_cache_path(href: &str, bb: &BBox) -> std::path::PathBuf {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    // Drop query parameters so signed and unsigned URLs share one cache key.
    let base_url = match href.split_once('?') {
        Some((base, _query)) => base,
        None => href,
    };
    // Bbox coordinates rounded to 2 decimals participate in the key.
    let key = format!(
        "{}__{:.2}_{:.2}_{:.2}_{:.2}",
        base_url, bb.min_x, bb.min_y, bb.max_x, bb.max_y
    );
    let hex = {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        format!("{:016x}", hasher.finish())
    };
    // Cache root: $XDG_CACHE_HOME, else $HOME/.cache, else /tmp.
    let root = std::env::var("XDG_CACHE_HOME")
        .map(std::path::PathBuf::from)
        .or_else(|_| std::env::var("HOME").map(|h| std::path::PathBuf::from(h).join(".cache")))
        .unwrap_or_else(|_| std::path::PathBuf::from("/tmp"));
    root.join("surtgis")
        .join("cog")
        .join(&hex[..2])
        .join(&hex[2..4])
        .join(format!("{}.tif", &hex[4..]))
}
/// Load a previously cached raster from `path`.
///
/// Returns `None` when the file is absent or fails to parse as a GeoTIFF —
/// callers treat either case as a cache miss.
fn cache_read(path: &std::path::Path) -> Option<surtgis_core::Raster<f64>> {
    match path.exists() {
        false => None,
        true => surtgis_core::io::read_geotiff::<f64, _>(path, None).ok(),
    }
}
/// Best-effort write of `raster` into the cache at `path`; every I/O error is
/// deliberately swallowed (a failed cache write only costs a future re-download).
fn cache_write(path: &std::path::Path, raster: &surtgis_core::Raster<f64>) {
    // Create the shard directories first; if this fails, the write below
    // fails too, and both failures are intentionally ignored.
    if let Some(dir) = path.parent() {
        std::fs::create_dir_all(dir).ok();
    }
    let opts = surtgis_core::io::GeoTiffOptions { compression: "DEFLATE".into() };
    let _ = surtgis_core::io::write_geotiff(raster, path, Some(opts));
}
fn handle_multiband_composite(
catalog: &str,
bbox_str: &str,
collection: &str,
band_names: &[&str],
datetime: &str,
max_scenes: usize,
align_to: Option<&std::path::PathBuf>,
output: &std::path::Path,
naming: &str,
use_cache: bool,
strip_rows_cfg: usize,
compress: bool,
) -> Result<()> {
use surtgis_cloud::reproject;
let n_bands = band_names.len();
println!("Multi-band composite: {} bands [{}]", n_bands, band_names.join(", "));
let profile = CollectionProfile::from_collection_name(collection)?;
let mask_asset_name = profile.mask_asset_name();
eprintln!("📷 Collection: {} (mask: {:?})", profile.description(), mask_asset_name);
if use_cache {
let sample_dir = cog_cache_path("sample", &BBox::new(0.0, 0.0, 1.0, 1.0));
eprintln!("📦 COG cache enabled: {}", sample_dir.parent().unwrap().parent().unwrap().parent().unwrap().display());
}
let cat = StacCatalog::from_str_or_url(catalog);
let bb = parse_bbox(bbox_str)?;
let bbox_width_deg = (bb.max_x - bb.min_x).abs();
let bbox_height_deg = (bb.max_y - bb.min_y).abs();
let tiles_x = ((bbox_width_deg * 111.0) / 60.0).ceil().max(1.0) as usize;
let tiles_y = ((bbox_height_deg * 111.0) / 60.0).ceil().max(1.0) as usize;
let est_tiles = tiles_x * tiles_y;
let search_limit = ((est_tiles * max_scenes * 5).max(1000) as u32).min(10000);
eprintln!(" bbox: {:.1}°×{:.1}° (~{} spatial tiles), search_limit={}",
bbox_width_deg, bbox_height_deg, est_tiles, search_limit);
let params = StacSearchParams::new()
.bbox(bb.min_x, bb.min_y, bb.max_x, bb.max_y)
.datetime(datetime)
.collections(&[collection])
.limit(search_limit.min(cat.max_page_size()));
let pb = spinner("Searching STAC catalog...");
let client_opts = StacClientOptions {
max_items: search_limit as usize, ..StacClientOptions::default()
};
let client = StacClientBlocking::new(cat, client_opts)
.context("Failed to create STAC client")?;
let items = client.search_all(¶ms).context("STAC search failed")?;
pb.finish_and_clear();
if items.is_empty() {
anyhow::bail!("No items found matching the search criteria");
}
let mut by_date: BTreeMap<String, Vec<&StacItem>> = BTreeMap::new();
for item in &items {
let date = item.properties.datetime.as_deref().unwrap_or("")
.get(..10).unwrap_or("unknown").to_string();
by_date.entry(date).or_default().push(item);
}
let mut date_coverage: Vec<(String, usize)> = by_date.iter()
.map(|(d, items)| (d.clone(), items.len()))
.collect();
date_coverage.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
let dates: Vec<String> = date_coverage.into_iter()
.take(max_scenes)
.map(|(d, _)| d)
.collect();
println!("Found {} items across {} dates (using {} dates)",
items.len(), by_date.len(), dates.len());
let start = Instant::now();
// One STAC item's resolved asset URLs for a single acquisition tile.
struct MbTileRef {
// (signed_href, original_href) per requested band, in band_names order.
band_hrefs: Vec<(String, String)>,
// Signed cloud-mask asset URL; empty string when the collection has no mask
// (via unwrap_or_default at construction).
scl_href: String,
// Unsigned mask URL — presumably kept for re-signing once the token ages
// past TOKEN_REFRESH_THRESHOLD; confirm against the strip loop.
original_scl_href: String,
// The item's native EPSG, when declared.
epsg: Option<u32>,
// Timestamp of when the hrefs were signed.
signed_at: Instant,
}
// All tiles acquired on one calendar date, grouped for per-date mosaicking.
struct MbScene {
date: String,
tiles: Vec<MbTileRef>,
// EPSG of the first tile in the group, used as the scene's nominal CRS.
epsg: Option<u32>,
}
let mut scenes: Vec<MbScene> = Vec::new();
for date in &dates {
let group = &by_date[date];
let mut tiles = Vec::new();
let mut scene_epsg = None;
for item in group {
let tile_epsg = item.epsg();
let mut resolved_bands: Vec<(String, String)> = Vec::with_capacity(n_bands);
let mut all_bands_ok = true;
for &band_name in band_names {
match crate::streaming::resolve_asset_key(item, band_name) {
Some((_resolved_key, a)) => {
let original = a.href.clone();
match client.sign_asset_href(&a.href, item.collection.as_deref().unwrap_or("")) {
Ok(signed) => resolved_bands.push((signed, original)),
Err(_) => { all_bands_ok = false; break; }
}
}
None => { all_bands_ok = false; break; }
}
}
if !all_bands_ok {
continue; }
let (scl_signed, scl_original) = mask_asset_name
.and_then(|mask_name| {
crate::streaming::resolve_asset_key(item, mask_name)
.and_then(|(_, a)| {
let orig = a.href.clone();
client.sign_asset_href(&a.href, item.collection.as_deref().unwrap_or(""))
.ok()
.map(|signed| (signed, orig))
})
})
.unwrap_or_default();
if scene_epsg.is_none() {
scene_epsg = tile_epsg;
}
tiles.push(MbTileRef {
band_hrefs: resolved_bands,
scl_href: scl_signed,
original_scl_href: scl_original,
epsg: tile_epsg,
signed_at: Instant::now(),
});
}
if !tiles.is_empty() {
scenes.push(MbScene { date: date.clone(), tiles, epsg: scene_epsg });
}
}
if scenes.is_empty() {
anyhow::bail!("No valid scenes found (all bands must be present in each item)");
}
eprintln!(" Resolved {} scenes with {} bands each", scenes.len(), n_bands);
let first_href = &scenes[0].tiles[0].band_hrefs[0].0;
let opts = CogReaderOptions::default();
let probe_reader = CogReaderBlocking::open(first_href, opts)
.context("Failed to probe first COG")?;
let probe_meta = probe_reader.metadata();
let (out_cols, out_rows, out_transform, out_crs, cog_bb);
if let Some(ref align_path) = align_to {
let reference: surtgis_core::Raster<f64> =
surtgis_core::io::read_geotiff(align_path, None)
.context("Failed to read alignment reference raster")?;
let rgt = reference.transform();
let (rr, rc) = reference.shape();
out_rows = rr;
out_cols = rc;
out_transform = *rgt;
out_crs = reference.crs().cloned();
let min_x = rgt.origin_x;
let max_y = rgt.origin_y;
let max_x = min_x + rc as f64 * rgt.pixel_width;
let min_y = max_y + rr as f64 * rgt.pixel_height;
cog_bb = BBox::new(min_x, min_y, max_x, max_y);
eprintln!(" Using reference grid from --align-to: {}x{} px=({:.1},{:.1})",
rc, rr, rgt.pixel_width, rgt.pixel_height);
} else {
let cog_bb_computed = if let Some(epsg) = scenes[0].epsg {
if !reproject::is_wgs84(epsg) {
reproject::reproject_bbox_to_cog(&bb, epsg)
} else { bb }
} else { bb };
cog_bb = cog_bb_computed;
let pixel_width = probe_meta.geo_transform.pixel_width.abs();
let pixel_height = probe_meta.geo_transform.pixel_height.abs();
out_cols = ((cog_bb.max_x - cog_bb.min_x) / pixel_width).round() as usize;
out_rows = ((cog_bb.max_y - cog_bb.min_y) / pixel_height).round() as usize;
out_transform = surtgis_core::GeoTransform::new(
cog_bb.min_x, cog_bb.max_y, pixel_width, -pixel_height,
);
out_crs = scenes[0].epsg.map(surtgis_core::CRS::from_epsg);
}
println!("Output grid: {} x {} ({:.1}M cells), {} dates, {} bands",
out_cols, out_rows, (out_cols * out_rows) as f64 / 1e6,
scenes.len(), n_bands);
let n_scenes = scenes.len();
let out_epsg_for_tiles = out_crs.as_ref().and_then(|c| c.epsg());
let total_cells = out_rows * out_cols;
const TARGET_SCENE_ACCUMULATOR_BYTES: usize = 4 * 1024 * 1024 * 1024;
let bytes_per_row = out_cols.max(1) * n_bands.max(1) * n_scenes.max(1) * 8;
let auto_strip_rows = (TARGET_SCENE_ACCUMULATOR_BYTES / bytes_per_row.max(1))
.clamp(16, 512);
let strip_rows = strip_rows_cfg.min(auto_strip_rows);
if strip_rows < strip_rows_cfg {
let est_peak_gb = (strip_rows * bytes_per_row) as f64 / 1e9;
let requested_peak_gb = (strip_rows_cfg * bytes_per_row) as f64 / 1e9;
eprintln!(
"⚠ strip_rows capped: {} → {} (requested peak ~{:.1} GB, using ~{:.1} GB for {} bands × {} scenes × {} cols)",
strip_rows_cfg, strip_rows, requested_peak_gb, est_peak_gb,
n_bands, n_scenes, out_cols,
);
} else {
let est_peak_gb = (strip_rows * bytes_per_row) as f64 / 1e9;
eprintln!(
" Estimated peak RAM for scene accumulator: ~{:.1} GB (strip_rows={}, {} bands × {} scenes × {} cols)",
est_peak_gb, strip_rows, n_bands, n_scenes, out_cols,
);
}
let num_strips = (out_rows + strip_rows - 1) / strip_rows;
const TOKEN_REFRESH_THRESHOLD: Duration = Duration::from_secs(30 * 60);
let cloud_mask_strategy: Arc<dyn CloudMaskStrategy> = match &profile {
CollectionProfile::Sentinel2L2A { cloud_mask_strategy } => cloud_mask_strategy.clone(),
CollectionProfile::LandsatC2L2 { cloud_mask_strategy } => cloud_mask_strategy.clone(),
CollectionProfile::Sentinel1RTC => Arc::new(NoCloudMask),
};
let download_rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(8)
.enable_all()
.build()
.context("Failed to create download runtime")?;
let mut band_buffers: Vec<Vec<f64>> = (0..n_bands)
.map(|_| vec![f64::NAN; total_cells])
.collect();
for strip_idx in 0..num_strips {
let row_start = strip_idx * strip_rows;
let row_end = (row_start + strip_rows).min(out_rows);
let actual_rows = row_end - row_start;
let ph = out_transform.pixel_height.abs();
let strip_min_y = cog_bb.max_y - (row_end as f64 * ph);
let strip_max_y = cog_bb.max_y - (row_start as f64 * ph);
let strip_bb = BBox::new(cog_bb.min_x, strip_min_y, cog_bb.max_x, strip_max_y);
print!("\r Strip {}/{} (rows {}-{}, {} bands)...",
strip_idx + 1, num_strips, row_start, row_end, n_bands);
let strip_ref = {
let mut r = surtgis_core::Raster::<f64>::new(actual_rows, out_cols);
r.set_transform(surtgis_core::GeoTransform::new(
strip_bb.min_x, strip_bb.max_y,
out_transform.pixel_width, out_transform.pixel_height,
));
r
};
let mut band_scene_strips: Vec<Vec<ndarray::Array2<f64>>> = (0..n_bands)
.map(|_| Vec::with_capacity(n_scenes))
.collect();
for scene in &mut scenes {
let is_first_strip = strip_idx == 0;
let token_age = scene.tiles.first().map(|t| t.signed_at.elapsed());
if token_age.map(|age| age > TOKEN_REFRESH_THRESHOLD).unwrap_or(false) {
let age_secs = token_age.unwrap().as_secs();
for tile in &mut scene.tiles {
for (signed, original) in &mut tile.band_hrefs {
if let Ok(h) = client.sign_asset_href(original, "") {
*signed = h;
}
}
if !tile.original_scl_href.is_empty() {
if let Ok(h) = client.sign_asset_href(&tile.original_scl_href, "") {
tile.scl_href = h;
}
}
tile.signed_at = Instant::now();
}
eprintln!(" [token] Re-signed {} tiles for {} ({}s old)",
scene.tiles.len(), scene.date, age_secs);
}
let pad = 100.0;
let tile_bb = BBox::new(
strip_bb.min_x - pad, strip_bb.min_y - pad,
strip_bb.max_x + pad, strip_bb.max_y + pad,
);
let tile_results: Vec<(Vec<Option<surtgis_core::Raster<f64>>>, Option<surtgis_core::Raster<f64>>)> = {
let out_epsg = out_epsg_for_tiles;
let tile_cache = use_cache;
let mut cached_results: Vec<Option<(Vec<Option<surtgis_core::Raster<f64>>>, Option<surtgis_core::Raster<f64>>)>> =
Vec::with_capacity(scene.tiles.len());
let mut tiles_to_download: Vec<(usize, Vec<String>, String, BBox)> = Vec::new();
for (ti, tile) in scene.tiles.iter().enumerate() {
let band_hrefs: Vec<String> = tile.band_hrefs.iter()
.map(|(signed, _)| signed.clone())
.collect();
let scl_href = tile.scl_href.clone();
let tile_epsg = tile.epsg;
let bb = if let (Some(tepsg), Some(out_e)) = (tile_epsg, out_epsg) {
if tepsg != out_e {
reproject_bbox_between_crs(&tile_bb, out_e, tepsg)
} else { tile_bb }
} else { tile_bb };
if tile_cache {
let mut all_cached = true;
let mut cached_bands: Vec<Option<surtgis_core::Raster<f64>>> = Vec::with_capacity(n_bands);
for href in &band_hrefs {
match cache_read(&cog_cache_path(href, &bb)) {
Some(r) => cached_bands.push(Some(r)),
None => { all_cached = false; break; }
}
}
if all_cached && cached_bands.len() == n_bands {
let cached_mask = if !scl_href.is_empty() {
cache_read(&cog_cache_path(&scl_href, &bb))
} else { None };
cached_results.push(Some((cached_bands, cached_mask)));
continue;
}
}
cached_results.push(None); tiles_to_download.push((ti, band_hrefs, scl_href, bb));
}
if !tiles_to_download.is_empty() {
let downloaded = download_rt.block_on(async {
use surtgis_cloud::{CogReader, CogReaderOptions};
let mut tile_handles: Vec<(usize, Vec<tokio::task::JoinHandle<Option<surtgis_core::Raster<f64>>>>, Option<tokio::task::JoinHandle<Option<surtgis_core::Raster<f64>>>>)> = Vec::new();
for (ti, band_hrefs, scl_href, bb) in tiles_to_download {
let mut band_handles = Vec::with_capacity(n_bands);
for href in band_hrefs {
let bb = bb;
let do_cache = tile_cache;
band_handles.push(tokio::spawn(async move {
for attempt in 0..2u8 {
if attempt > 0 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
let mut reader = match CogReader::open(&href, CogReaderOptions::default()).await {
Ok(r) => r,
Err(e) => {
let msg = e.to_string();
if msg.contains("bbox does not intersect") { return None; }
if attempt == 1 { eprintln!(" [cog] open FAILED: {}", msg); return None; }
continue;
}
};
let nodata_val = reader.metadata().nodata.unwrap_or(0.0);
match reader.read_bbox::<f64>(&bb, None).await {
Ok(mut r) => {
for val in r.data_mut().iter_mut() {
if *val == nodata_val || *val == 0.0 { *val = f64::NAN; }
}
if do_cache { cache_write(&cog_cache_path(&href, &bb), &r); }
return Some(r);
}
Err(e) => {
let msg = e.to_string();
if msg.contains("bbox does not intersect") { return None; }
if attempt == 1 { return None; }
}
}
}
None
}));
}
let mask_handle = if !scl_href.is_empty() {
let bb = bb;
let do_cache = tile_cache;
Some(tokio::spawn(async move {
for attempt in 0..2u8 {
if attempt > 0 { tokio::time::sleep(std::time::Duration::from_secs(1)).await; }
let mut reader = match CogReader::open(&scl_href, CogReaderOptions::default()).await {
Ok(r) => r,
Err(_) if attempt == 0 => continue,
Err(_) => return None,
};
match reader.read_bbox::<f64>(&bb, None).await {
Ok(r) => {
if do_cache { cache_write(&cog_cache_path(&scl_href, &bb), &r); }
return Some(r);
}
Err(e) => {
if !e.to_string().contains("bbox does not intersect") && attempt == 1 { return None; }
if e.to_string().contains("bbox does not intersect") { return None; }
}
}
}
None
}))
} else { None };
tile_handles.push((ti, band_handles, mask_handle));
}
let mut results: Vec<(usize, Vec<Option<surtgis_core::Raster<f64>>>, Option<surtgis_core::Raster<f64>>)> = Vec::new();
for (ti, band_handles, mask_handle) in tile_handles {
let mut bands = Vec::with_capacity(n_bands);
for h in band_handles { bands.push(h.await.ok().flatten()); }
let mask = if let Some(h) = mask_handle { h.await.ok().flatten() } else { None };
results.push((ti, bands, mask));
}
results
});
for (ti, bands, mask) in downloaded {
cached_results[ti] = Some((bands, mask));
}
}
cached_results.into_iter()
.map(|r| r.unwrap_or_else(|| (vec![None; n_bands], None)))
.collect()
};
let mut per_band_tiles: Vec<Vec<surtgis_core::Raster<f64>>> = (0..n_bands)
.map(|_| Vec::new())
.collect();
let mut scl_tiles: Vec<surtgis_core::Raster<f64>> = Vec::new();
let mut tiles_ok = 0usize;
let mut tiles_outside = 0usize;
let mut tiles_partial = 0usize;
for (band_data, mask_opt) in tile_results {
let n_some = band_data.iter().filter(|r| r.is_some()).count();
if n_some == 0 {
tiles_outside += 1;
continue; }
if n_some < n_bands {
tiles_partial += 1;
}
for (bi, raster_opt) in band_data.into_iter().enumerate() {
if let Some(r) = raster_opt {
per_band_tiles[bi].push(r);
}
}
if let Some(m) = mask_opt {
scl_tiles.push(m);
}
tiles_ok += 1;
}
if is_first_strip || tiles_ok == 0 || tiles_partial > 0 {
eprintln!(" ℹ {}: tiles OK={} outside={} partial={} mask={}",
scene.date, tiles_ok, tiles_outside, tiles_partial, scl_tiles.len());
}
if tiles_ok == 0 {
continue;
}
// Reproject every tile after the first into the CRS of tile 0 so they can be
// mosaicked on one grid. Tiles without a resolvable EPSG code, or whose
// reprojection fails, are left untouched (best-effort, same as before).
fn unify_crs(tiles: &mut Vec<surtgis_core::Raster<f64>>) {
    // Nothing to unify with zero or one tile, or when tile 0 has no EPSG.
    let target = match tiles.first().and_then(|t| t.crs().and_then(|c| c.epsg())) {
        Some(code) => code,
        None => return,
    };
    for idx in 1..tiles.len() {
        match tiles[idx].crs().and_then(|c| c.epsg()) {
            Some(src) if src != target => {
                if let Some(fixed) = surtgis_cloud::reproject::reproject_raster_utm(
                    &tiles[idx], src, target,
                ) {
                    tiles[idx] = fixed;
                }
            }
            _ => {}
        }
    }
}
for band_tiles in &mut per_band_tiles {
unify_crs(band_tiles);
}
if !scl_tiles.is_empty() {
unify_crs(&mut scl_tiles);
}
let scl_m = if scl_tiles.is_empty() {
None
} else if scl_tiles.len() == 1 {
Some(scl_tiles.into_iter().next().unwrap())
} else {
let refs: Vec<&surtgis_core::Raster<f64>> = scl_tiles.iter().collect();
surtgis_core::mosaic(&refs, None).ok()
};
for bi in 0..n_bands {
let data_tiles = std::mem::take(&mut per_band_tiles[bi]);
if data_tiles.is_empty() { continue; }
let data_m = if data_tiles.len() == 1 {
data_tiles.into_iter().next().unwrap()
} else {
let refs: Vec<&surtgis_core::Raster<f64>> = data_tiles.iter().collect();
match surtgis_core::mosaic(&refs, None) {
Ok(m) => m,
Err(_) => continue,
}
};
let clean = if let Some(ref scl) = scl_m {
cloud_mask_strategy.mask(&data_m, scl).unwrap_or(data_m)
} else {
data_m
};
let resampled = surtgis_core::resample_to_grid(
&clean, &strip_ref,
surtgis_core::ResampleMethod::Bilinear,
).unwrap_or(clean);
let valid = resampled.data().iter().filter(|v| v.is_finite()).count();
if valid > 0 {
band_scene_strips[bi].push(resampled.data().to_owned());
}
}
}
for bi in 0..n_bands {
let scene_strips = &band_scene_strips[bi];
let mut strip_out = ndarray::Array2::<f64>::from_elem(
(actual_rows, out_cols), f64::NAN,
);
if !scene_strips.is_empty() {
let n = scene_strips.len();
let mut coverage: Vec<(usize, usize)> = scene_strips.iter().enumerate()
.map(|(i, s)| (i, s.iter().filter(|v| v.is_finite()).count()))
.collect();
coverage.sort_by(|a, b| b.1.cmp(&a.1));
for r in 0..actual_rows {
for c in 0..out_cols {
let mut values: Vec<f64> = Vec::with_capacity(n);
for strip in scene_strips {
if r < strip.nrows() && c < strip.ncols() {
let v = strip[[r, c]];
if v.is_finite() { values.push(v); }
}
}
if !values.is_empty() {
values.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap());
let mid = values.len() / 2;
strip_out[[r, c]] = if values.len() % 2 == 0 {
(values[mid - 1] + values[mid]) / 2.0
} else {
values[mid]
};
}
}
}
let nan_before = strip_out.iter().filter(|v| !v.is_finite()).count();
if nan_before > 0 {
for &(scene_idx, _) in &coverage {
let strip = &scene_strips[scene_idx];
for r in 0..actual_rows {
for c in 0..out_cols {
if !strip_out[[r, c]].is_finite()
&& r < strip.nrows() && c < strip.ncols()
{
let v = strip[[r, c]];
if v.is_finite() { strip_out[[r, c]] = v; }
}
}
}
let remaining = strip_out.iter().filter(|v| !v.is_finite()).count();
if remaining == 0 { break; }
}
}
let mut nan_remaining = strip_out.iter().filter(|v| !v.is_finite()).count();
if nan_remaining > 0 && nan_remaining < strip_out.len() {
let mut prev_buf = strip_out.clone(); for _pass in 0..20 {
prev_buf.assign(&strip_out);
let mut filled = 0usize;
for r in 0..actual_rows {
for c in 0..out_cols {
if prev_buf[[r, c]].is_finite() { continue; }
let mut sum = 0.0;
let mut cnt = 0u32;
for dr in -1i32..=1 {
for dc in -1i32..=1 {
let nr = r as i32 + dr;
let nc = c as i32 + dc;
if nr >= 0 && nr < actual_rows as i32
&& nc >= 0 && nc < out_cols as i32
{
let v = prev_buf[[nr as usize, nc as usize]];
if v.is_finite() { sum += v; cnt += 1; }
}
}
}
if cnt >= 2 {
strip_out[[r, c]] = sum / cnt as f64;
filled += 1;
}
}
}
if filled == 0 { break; }
nan_remaining = strip_out.iter().filter(|v| !v.is_finite()).count();
if nan_remaining == 0 { break; }
}
}
}
let buf = &mut band_buffers[bi];
let offset = row_start * out_cols;
if let Some(slice) = strip_out.as_slice() {
buf[offset..offset + actual_rows * out_cols].copy_from_slice(slice);
}
}
let valid_b0 = band_scene_strips[0].len();
if strip_idx == 0 || strip_idx == num_strips - 1 {
eprintln!(" Strip {}/{}: {} scenes contributed to {} bands",
strip_idx + 1, num_strips, valid_b0, n_bands);
}
}
println!();
let stem = output.file_stem().unwrap_or_default().to_string_lossy();
let use_asset_naming = naming.eq_ignore_ascii_case("asset");
let opts = if compress {
Some(surtgis_core::io::GeoTiffOptions { compression: "DEFLATE".into() })
} else {
None
};
for (bi, band_name) in band_names.iter().enumerate() {
let band_path = if use_asset_naming {
output.with_file_name(format!("{}.tif", band_name))
} else {
output.with_file_name(format!("{}_{}.tif", stem, band_name))
};
let buffer = std::mem::take(&mut band_buffers[bi]);
let mut raster = surtgis_core::Raster::from_vec(buffer, out_rows, out_cols)
.context("Failed to create raster from buffer")?;
raster.set_transform(out_transform);
raster.set_crs(out_crs.clone());
surtgis_core::io::write_geotiff(&raster, &band_path, opts.clone())
.with_context(|| format!("Failed to write {}", band_path.display()))?;
let file_size = std::fs::metadata(&band_path).map(|m| m.len()).unwrap_or(0);
println!(" ✓ {} → {} ({:.1} MB)",
band_name, band_path.display(), file_size as f64 / 1e6);
}
let elapsed = start.elapsed();
let total_tiles: usize = scenes.iter().map(|s| s.tiles.len()).sum();
println!("Multi-band composite: {} dates × {} bands ({} tiles) → {} × {} ({:.1}M cells) in {:.1?}",
scenes.len(), n_bands, total_tiles,
out_cols, out_rows, (out_cols * out_rows) as f64 / 1e6, elapsed);
Ok(())
}
/// Open a COG and read `bb` as an f64 raster, mapping nodata to NaN.
///
/// Returns `None` instead of propagating errors so callers can treat a failed
/// tile as simply absent; failures are logged except for the routine
/// "bbox does not intersect" case. When `log_meta` is set, the COG's header
/// metadata is printed once for diagnostics.
fn read_cog_tile(href: &str, bb: &BBox, log_meta: bool) -> Option<surtgis_core::Raster<f64>> {
    let mut dr = match CogReaderBlocking::open(href, CogReaderOptions::default()) {
        Ok(r) => r,
        Err(e) => {
            // Truncate by characters, not bytes: `&href[..80]` would panic if
            // byte 80 fell inside a multi-byte UTF-8 character (signed URLs
            // can contain non-ASCII).
            let short_href: String = href.chars().take(80).collect();
            eprintln!(" [cog] open FAILED: {} (href={}...)", e, short_href);
            return None;
        }
    };
    let tile_meta = dr.metadata();
    if log_meta {
        eprintln!(" COG meta: {}x{} bps={} sf={} compression={} px={:.0}m",
            tile_meta.width, tile_meta.height,
            tile_meta.bits_per_sample, tile_meta.sample_format,
            tile_meta.compression,
            tile_meta.geo_transform.pixel_width.abs());
    }
    let mut r: surtgis_core::Raster<f64> = match dr.read_bbox(bb, None) {
        Ok(r) => r,
        Err(e) => {
            let msg = e.to_string();
            // A bbox that misses the tile entirely is expected, not an error.
            if !msg.contains("bbox does not intersect") {
                eprintln!(" [cog] read_bbox FAILED: {}", msg);
            }
            return None;
        }
    };
    // Both the declared nodata value and raw zeros are treated as missing —
    // presumably zero is used as fill in these collections (this matches the
    // async download path's behavior); TODO confirm for non-optical assets.
    let nodata_val = tile_meta.nodata.unwrap_or(0.0);
    for val in r.data_mut().iter_mut() {
        if *val == nodata_val || *val == 0.0 {
            *val = f64::NAN;
        }
    }
    Some(r)
}
/// Read `bb` from a COG without any nodata substitution — used for mask
/// rasters (SCL / QA_PIXEL) whose raw class values must be preserved.
///
/// Returns `None` on failure; only unexpected errors are logged
/// ("bbox does not intersect" is routine).
fn read_cog_tile_raw(href: &str, bb: &BBox) -> Option<surtgis_core::Raster<f64>> {
    let mut sr = match CogReaderBlocking::open(href, CogReaderOptions::default()) {
        Ok(r) => r,
        Err(e) => {
            // Truncate by characters, not bytes: `&href[..80]` would panic if
            // byte 80 fell inside a multi-byte UTF-8 character.
            let short_href: String = href.chars().take(80).collect();
            eprintln!(" [cog] mask open FAILED: {} (href={}...)", e, short_href);
            return None;
        }
    };
    match sr.read_bbox(bb, None) {
        Ok(r) => Some(r),
        Err(e) => {
            let msg = e.to_string();
            if !msg.contains("bbox does not intersect") {
                eprintln!(" [cog] mask read_bbox FAILED: {}", msg);
            }
            None
        }
    }
}
/// Fetch a time series of composites — one raster per interval window —
/// writing individual GeoTIFFs under `outdir` plus a `time_series.json`
/// manifest describing the successful windows.
///
/// `datetime` must be an inclusive range "YYYY-MM-DD/YYYY-MM-DD"; it is split
/// into windows of `interval` via `split_date_range`. Each window is fetched
/// by `fetch_stac_band` on its own OS thread, up to 3 windows concurrently.
/// Windows that yield no data are logged and skipped (they count toward
/// `total_intervals` in the manifest but not `successful`).
fn handle_time_series(
catalog: &str,
bbox: &str,
collection: &str,
asset: &str,
datetime: &str,
interval: &str,
_scl_asset: &str,
max_scenes: usize,
align_to: Option<&std::path::PathBuf>,
outdir: &std::path::PathBuf,
compress: bool,
) -> Result<()> {
// The datetime argument must be exactly "start/end".
let parts: Vec<&str> = datetime.split('/').collect();
if parts.len() != 2 {
anyhow::bail!("datetime must be a range: YYYY-MM-DD/YYYY-MM-DD");
}
let start_date = parse_date(parts[0])?;
let end_date = parse_date(parts[1])?;
// Split the range into windows (monthly/weekly/N-days — see split_date_range).
let intervals = split_date_range(&start_date, &end_date, interval)?;
println!("Time series: {} intervals ({}) from {} to {}",
intervals.len(), interval, parts[0], parts[1]);
// Optional reference grid: each window's raster is resampled onto it so the
// whole series shares one shape/transform.
let reference = match align_to {
Some(path) => {
let r: surtgis_core::Raster<f64> = surtgis_core::io::read_geotiff(path, None)
.context("Failed to read align-to reference")?;
Some(r)
}
None => None,
};
std::fs::create_dir_all(outdir)?;
let start = Instant::now();
// One task per window: (index, window start, window end, "start/end"
// datetime string for the STAC query, filename label).
let tasks: Vec<(usize, SimpleDate, SimpleDate, String, String)> = intervals.iter().enumerate()
.map(|(i, (win_start, win_end))| {
let win_dt = format!("{}/{}", format_date(win_start), format_date(win_end));
let label = format_date(win_start);
(i, *win_start, *win_end, win_dt, label)
})
.collect();
// Fetch up to 3 windows at a time, each on a dedicated OS thread
// (fetch_stac_band is a blocking call).
let max_concurrent = 3usize.min(tasks.len());
let mut success = 0usize;
// Pre-sized with Null placeholders so each window writes its manifest entry
// at its own index regardless of completion order.
let mut metadata: Vec<serde_json::Value> = vec![serde_json::Value::Null; tasks.len()];
for chunk_start in (0..tasks.len()).step_by(max_concurrent) {
let chunk_end = (chunk_start + max_concurrent).min(tasks.len());
let chunk = &tasks[chunk_start..chunk_end];
let results: Vec<(usize, std::result::Result<surtgis_core::Raster<f64>, String>)> = {
let mut handles = Vec::with_capacity(chunk.len());
for &(i, win_start, win_end, ref win_dt, ref _label) in chunk {
// Owned copies for the worker thread ('static bound on spawn).
let cat = catalog.to_string();
let bb = bbox.to_string();
let col = collection.to_string();
let ast = asset.to_string();
let dt = win_dt.clone();
let ms = max_scenes;
println!("[{}/{}] {} → {}", i + 1, tasks.len(), format_date(&win_start), format_date(&win_end));
handles.push(std::thread::spawn(move || {
// Errors come back as strings; a failed window is logged and
// skipped rather than aborting the whole series.
let r = fetch_stac_band(&cat, &bb, &col, &ast, &dt, ms, None);
(i, r.map_err(|e| e.to_string()))
}));
}
// NOTE(review): a panicked worker is silently dropped here
// (join().ok()) — its window simply produces no output.
handles.into_iter().filter_map(|h| h.join().ok()).collect()
};
for (i, result) in results {
let (_, win_start, win_end, _, ref label) = tasks[i];
match result {
Ok(raster) => {
// Snap onto the reference grid when --align-to was given; fall
// back to the raster as-is if resampling fails.
let final_raster = if let Some(ref refr) = reference {
surtgis_core::resample_to_grid(&raster, refr, surtgis_core::ResampleMethod::Bilinear)
.unwrap_or(raster)
} else {
raster
};
let (rows, cols) = final_raster.shape();
// Valid-pixel percentage (finite cells) for the manifest.
let valid = final_raster.data().iter().filter(|v| v.is_finite()).count();
let total = rows * cols;
let pct = if total > 0 { valid as f64 / total as f64 * 100.0 } else { 0.0 };
let filename = format!("{}_{}.tif", asset, label);
let path = outdir.join(&filename);
write_result(&final_raster, &path, compress)?;
println!(" [{}/{}] → {} ({}x{}, {:.1}% valid)", i + 1, tasks.len(), filename, cols, rows, pct);
metadata[i] = serde_json::json!({
"index": i,
"date_start": format_date(&win_start),
"date_end": format_date(&win_end),
"file": filename,
"rows": rows,
"cols": cols,
// One decimal place.
"valid_pct": (pct * 10.0).round() / 10.0,
});
success += 1;
}
Err(e) => {
eprintln!(" ⚠️ [{}/{}] No data for {}: {}", i + 1, tasks.len(), label, e);
}
}
}
}
// Manifest: Null placeholders (failed windows) are filtered out, so
// "rasters" lists only successful windows, in window order.
let meta_path = outdir.join("time_series.json");
let meta_json = serde_json::json!({
"catalog": catalog,
"collection": collection,
"asset": asset,
"bbox": bbox,
"datetime": datetime,
"interval": interval,
"total_intervals": intervals.len(),
"successful": success,
"rasters": metadata.into_iter().filter(|v| !v.is_null()).collect::<Vec<_>>(),
});
std::fs::write(&meta_path, serde_json::to_string_pretty(&meta_json)?)?;
println!("\nMetadata → {}", meta_path.display());
done(&format!("Time series ({}/{})", success, intervals.len()), outdir, start.elapsed());
Ok(())
}
/// Minimal calendar date (no time-of-day, no timezone) used for STAC
/// datetime windowing.
///
/// Field order (year, month, day) makes the derived `Ord`/`PartialOrd`
/// agree with chronological order, and the extra derives let dates be
/// compared, printed, and used as map keys directly.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub(crate) struct SimpleDate {
    pub(crate) year: i32,
    pub(crate) month: u32,
    pub(crate) day: u32,
}
/// Parse "YYYY-MM-DD" (or an RFC 3339 timestamp — anything after 'T' is
/// ignored) into a `SimpleDate`.
///
/// # Errors
/// Fails when the string is not three '-'-separated numeric fields, or when
/// the month/day components are out of calendar range — previously
/// "2020-13-40" parsed successfully and propagated into the interval
/// arithmetic.
pub(crate) fn parse_date(s: &str) -> Result<SimpleDate> {
    let date_str = s.trim().split('T').next().unwrap_or(s.trim());
    let parts: Vec<&str> = date_str.split('-').collect();
    if parts.len() != 3 {
        anyhow::bail!("invalid date format: '{}' (expected YYYY-MM-DD)", s);
    }
    let year: i32 = parts[0].parse().context("invalid year")?;
    let month: u32 = parts[1].parse().context("invalid month")?;
    let day: u32 = parts[2].parse().context("invalid day")?;
    // Reject out-of-range components instead of letting them flow into
    // advance_days/advance_months, which assume a valid date.
    if !(1..=12).contains(&month) {
        anyhow::bail!("invalid month {} in '{}' (expected 1-12)", month, s);
    }
    if !(1..=31).contains(&day) {
        anyhow::bail!("invalid day {} in '{}' (expected 1-31)", day, s);
    }
    Ok(SimpleDate { year, month, day })
}
/// Render a `SimpleDate` as a zero-padded ISO-8601 calendar date (YYYY-MM-DD).
pub(crate) fn format_date(d: &SimpleDate) -> String {
    format!("{y:04}-{m:02}-{day:02}", y = d.year, m = d.month, day = d.day)
}
/// Number of days in the given month, with Gregorian leap-year handling for
/// February. Out-of-range month values fall back to 30 (defensive default).
pub(crate) fn days_in_month(year: i32, month: u32) -> u32 {
    // Month lengths for a non-leap year, indexed by month - 1.
    const LENGTHS: [u32; 12] = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31];
    match month {
        2 => {
            let leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
            if leap { 29 } else { 28 }
        }
        1..=12 => LENGTHS[(month - 1) as usize],
        _ => 30,
    }
}
/// Return the date `n` days after `d` (n = 0 normalizes an overflowing day
/// into the following month; for valid dates it is the identity).
pub(crate) fn advance_days(d: &SimpleDate, n: u32) -> SimpleDate {
    // Add all the days up front, then carry the overflow forward one month
    // at a time until the day index fits inside the current month.
    let (mut year, mut month, mut day) = (d.year, d.month, d.day + n);
    while day > days_in_month(year, month) {
        day -= days_in_month(year, month);
        month += 1;
        if month == 13 {
            month = 1;
            year += 1;
        }
    }
    SimpleDate { year, month, day }
}
/// Return the date `n` months after `d`, clamping the day-of-month so that
/// e.g. Jan 31 + 1 month lands on Feb 28/29 rather than an invalid date.
pub(crate) fn advance_months(d: &SimpleDate, n: u32) -> SimpleDate {
    let mut month = d.month + n;
    let mut year = d.year;
    // Carry whole years out of the month counter.
    while month > 12 {
        month -= 12;
        year += 1;
    }
    SimpleDate {
        year,
        month,
        day: d.day.min(days_in_month(year, month)),
    }
}
/// Chronological "a <= b", via lexicographic (year, month, day) comparison.
pub(crate) fn date_le(a: &SimpleDate, b: &SimpleDate) -> bool {
    let lhs = (a.year, a.month, a.day);
    let rhs = (b.year, b.month, b.day);
    lhs.cmp(&rhs) != std::cmp::Ordering::Greater
}
/// Split the inclusive range [start, end] into consecutive windows of the
/// given interval ("monthly", "biweekly", "weekly", "quarterly", "yearly",
/// or a plain number of days).
///
/// Each window's end is clamped to `end`. Note that a window's end date
/// deliberately equals the next window's start date (same as the original
/// behavior, which `handle_time_series` relies on for its STAC queries).
///
/// # Errors
/// Fails on an unrecognized interval string, or when the range produces no
/// windows at all.
pub(crate) fn split_date_range(start: &SimpleDate, end: &SimpleDate, interval: &str) -> Result<Vec<(SimpleDate, SimpleDate)>> {
    // Normalize once — the original lowercased the interval on every loop
    // iteration. (The previous code also clamped the window end twice and
    // called advance_days(&next, 0), a no-op for valid dates; both removed.)
    let interval_lc = interval.to_lowercase();
    let mut windows = Vec::new();
    let mut cursor = *start;
    while date_le(&cursor, end) {
        let next = match interval_lc.as_str() {
            "monthly" | "month" => advance_months(&cursor, 1),
            "biweekly" | "2weeks" => advance_days(&cursor, 14),
            "weekly" | "week" => advance_days(&cursor, 7),
            "quarterly" | "quarter" => advance_months(&cursor, 3),
            "yearly" | "year" | "annual" => advance_months(&cursor, 12),
            custom => {
                let days: u32 = custom.parse()
                    .with_context(|| format!("invalid interval: '{}'. Use monthly, biweekly, weekly, quarterly, yearly, or a number of days", custom))?;
                advance_days(&cursor, days)
            }
        };
        // Clamp the final window to the requested end date.
        let win_end = if date_le(&next, end) { next } else { *end };
        windows.push((cursor, win_end));
        cursor = next;
    }
    if windows.is_empty() {
        anyhow::bail!("date range too short for interval '{}'", interval);
    }
    Ok(windows)
}
/// Backward-compatibility shim for older callers; delegates to
/// `fetch_stac_band`.
///
/// The `_scl_asset` and `_scl_keep` parameters are accepted but unused —
/// the delegate takes no SCL-related arguments.
#[deprecated(since = "0.3.0", note = "use fetch_stac_band() instead")]
pub fn fetch_s2_band_from_stac(
catalog: &str,
bbox: &str,
collection: &str,
asset: &str,
datetime: &str,
max_scenes: usize,
_scl_asset: &str,
_scl_keep: &str,
align_to: Option<&surtgis_core::Raster<f64>>,
) -> Result<surtgis_core::Raster<f64>> {
fetch_stac_band(catalog, bbox, collection, asset, datetime, max_scenes, align_to)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Resolve a profile by collection name, panicking on failure — keeps
    /// each test to a single line of setup.
    fn profile_for(name: &str) -> CollectionProfile {
        CollectionProfile::from_collection_name(name).unwrap()
    }

    #[test]
    fn test_collection_profile_sentinel2() {
        let p = profile_for("sentinel-2-l2a");
        assert_eq!(p.mask_asset_name(), Some("scl"));
        assert_eq!(p.description(), "Sentinel-2 L2A");
    }

    #[test]
    fn test_collection_profile_landsat() {
        let p = profile_for("landsat-c2-l2");
        assert_eq!(p.mask_asset_name(), Some("QA_PIXEL"));
        assert_eq!(p.description(), "Landsat C2 L2");
    }

    #[test]
    fn test_collection_profile_sentinel1() {
        let p = profile_for("sentinel-1-rtc");
        assert_eq!(p.mask_asset_name(), None);
        assert_eq!(p.description(), "Sentinel-1 RTC");
    }

    #[test]
    fn test_collection_profile_unknown_falls_back() {
        // Unrecognized collections degrade to the no-mask profile.
        assert_eq!(profile_for("esa-worldcover").mask_asset_name(), None);
    }

    #[test]
    fn test_collection_profile_debug() {
        let repr = format!("{:?}", profile_for("sentinel-2-l2a"));
        assert!(repr.contains("Sentinel2L2A"));
    }
}
/// Reproject a bounding box between two projected CRSs identified by EPSG
/// code. Prefers a direct proj4rs corner transform (when the "projections"
/// feature is enabled); otherwise — or when that path fails — hops through
/// WGS84 using the UTM helpers.
fn reproject_bbox_between_crs(bbox: &surtgis_cloud::BBox, from_epsg: u32, to_epsg: u32) -> surtgis_cloud::BBox {
    #[cfg(feature = "projections")]
    {
        use proj4rs::Proj;
        // proj4rs takes u16 codes. The previous `as u16` cast silently
        // truncated codes > 65535 (e.g. ESRI 102100) into a bogus EPSG code;
        // try_from fails cleanly and lets the WGS84 fallback handle it.
        if let (Ok(from_code), Ok(to_code)) = (u16::try_from(from_epsg), u16::try_from(to_epsg)) {
            if let (Ok(src), Ok(dst)) = (
                Proj::from_epsg_code(from_code),
                Proj::from_epsg_code(to_code),
            ) {
                // Transform the four corners and take the axis-aligned hull.
                let corners = [
                    (bbox.min_x, bbox.min_y),
                    (bbox.min_x, bbox.max_y),
                    (bbox.max_x, bbox.min_y),
                    (bbox.max_x, bbox.max_y),
                ];
                let mut min_x = f64::MAX;
                let mut min_y = f64::MAX;
                let mut max_x = f64::MIN;
                let mut max_y = f64::MIN;
                for &(x, y) in &corners {
                    if let Ok((rx, ry)) = proj4rs::adaptors::transform_xy(&src, &dst, x, y) {
                        min_x = min_x.min(rx);
                        min_y = min_y.min(ry);
                        max_x = max_x.max(rx);
                        max_y = max_y.max(ry);
                    }
                }
                // Only trust the result if at least one corner transformed
                // and produced a non-degenerate box.
                if min_x < max_x && min_y < max_y {
                    return surtgis_cloud::BBox::new(min_x, min_y, max_x, max_y);
                }
            }
        }
    }
    // Fallback: source CRS -> WGS84 -> target CRS.
    let wgs84 = surtgis_cloud::reproject::reproject_bbox_from_utm(bbox, from_epsg);
    surtgis_cloud::reproject::reproject_bbox_to_cog(&wgs84, to_epsg)
}
/// Parse the --time-step CLI value into a `TimeReduction`.
///
/// Accepted forms: "first", "last", a bare ISO date ("2020-06-15", taken as
/// midnight UTC), an ISO datetime ("2020-06-15T12:00:00"), or a plain index.
///
/// # Errors
/// Fails when the value matches none of those forms.
#[cfg(feature = "zarr")]
fn parse_time_step(s: &str) -> Result<surtgis_cloud::TimeReduction> {
    use surtgis_cloud::{TimeReduction, TimeSelector};
    match s.to_lowercase().as_str() {
        "first" => Ok(TimeReduction::Single(TimeSelector::First)),
        "last" => Ok(TimeReduction::Single(TimeSelector::Last)),
        other => {
            // Flattened from a nested `else { if … }` into one else-if chain
            // (clippy: collapsible_else_if); behavior is unchanged.
            if let Ok(dt) = chrono::NaiveDate::parse_from_str(other, "%Y-%m-%d") {
                // Bare date: midnight UTC on that day.
                let dt_utc = dt.and_hms_opt(0, 0, 0).unwrap().and_utc();
                Ok(TimeReduction::Single(TimeSelector::Nearest(dt_utc)))
            } else if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(other, "%Y-%m-%dT%H:%M:%S") {
                Ok(TimeReduction::Single(TimeSelector::Nearest(dt.and_utc())))
            } else if let Ok(idx) = other.parse::<usize>() {
                Ok(TimeReduction::Single(TimeSelector::Index(idx)))
            } else {
                anyhow::bail!(
                    "Invalid --time-step: '{}'. Use 'first', 'last', an ISO date (2020-06-15), or an index (0)",
                    other
                )
            }
        }
    }
}
/// Temporal bucketing granularity for --aggregate (see `parse_aggregate`).
#[cfg(feature = "zarr")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum IntervalType {
    // One bucket per calendar day.
    Daily,
    // One bucket per calendar month.
    Monthly,
    // One bucket per calendar year.
    Yearly,
    // A single bucket spanning the whole requested range.
    None,
}
/// Map the --aggregate CLI value onto a (bucketing interval, reduction
/// method) pair; "none" yields no reduction.
#[cfg(feature = "zarr")]
fn parse_aggregate(s: &str) -> Result<(IntervalType, Option<surtgis_cloud::AggMethod>)> {
    use surtgis_cloud::AggMethod;
    // Case-insensitive; '_' is accepted as an alias for '-'.
    let normalized = s.to_lowercase().replace('_', "-");
    let parsed = match normalized.as_str() {
        "none" => (IntervalType::None, None),
        "daily-sum" => (IntervalType::Daily, Some(AggMethod::Sum)),
        "daily-mean" => (IntervalType::Daily, Some(AggMethod::Mean)),
        "monthly-mean" => (IntervalType::Monthly, Some(AggMethod::Mean)),
        "monthly-sum" => (IntervalType::Monthly, Some(AggMethod::Sum)),
        "yearly-mean" => (IntervalType::Yearly, Some(AggMethod::Mean)),
        "yearly-sum" => (IntervalType::Yearly, Some(AggMethod::Sum)),
        other => anyhow::bail!(
            "Invalid --aggregate: '{}'. Options: none, daily-sum, daily-mean, monthly-mean, monthly-sum, yearly-mean, yearly-sum",
            other
        ),
    };
    Ok(parsed)
}
/// Parse "YYYY-MM-DD/YYYY-MM-DD" into an inclusive UTC range: 00:00:00 on
/// the first day through 23:59:59 on the last.
#[cfg(feature = "zarr")]
fn parse_datetime_range(s: &str) -> Result<(chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>)> {
    let parts: Vec<&str> = s.split('/').collect();
    let (start_s, end_s) = match parts.as_slice() {
        [a, b] => (*a, *b),
        _ => anyhow::bail!("--datetime must be a range: 'YYYY-MM-DD/YYYY-MM-DD'"),
    };
    let start_day = chrono::NaiveDate::parse_from_str(start_s.trim(), "%Y-%m-%d")
        .context("Invalid start date")?;
    let end_day = chrono::NaiveDate::parse_from_str(end_s.trim(), "%Y-%m-%d")
        .context("Invalid end date")?;
    let start = start_day.and_hms_opt(0, 0, 0).unwrap().and_utc();
    let end = end_day.and_hms_opt(23, 59, 59).unwrap().and_utc();
    Ok((start, end))
}
/// Expand [start, end] into labelled aggregation windows.
///
/// `None` yields a single window for the whole range; `Daily`/`Monthly`/
/// `Yearly` yield one window per calendar unit overlapping the range.
/// Labels are "YYYY-MM-DD", "YYYY-MM", "YYYY", or "start_to_end" for `None`.
///
/// Note: calendar windows are not clamped to `start`/`end` — the first and
/// last month/year window may extend beyond the requested range (matches
/// prior behavior).
#[cfg(feature = "zarr")]
fn generate_intervals(
    start: chrono::DateTime<chrono::Utc>,
    end: chrono::DateTime<chrono::Utc>,
    interval: IntervalType,
) -> Vec<(chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>, String)> {
    use chrono::{Datelike, NaiveDate, TimeZone, Utc};
    let mut intervals = Vec::new();
    match interval {
        IntervalType::None => {
            let label = format!(
                "{}_to_{}",
                start.format("%Y-%m-%d"),
                end.format("%Y-%m-%d")
            );
            intervals.push((start, end, label));
        }
        IntervalType::Daily => {
            let mut day = start.date_naive();
            let end_day = end.date_naive();
            while day <= end_day {
                let day_start = Utc.from_utc_datetime(&day.and_hms_opt(0, 0, 0).unwrap());
                let day_end = Utc.from_utc_datetime(&day.and_hms_opt(23, 59, 59).unwrap());
                let label = day.format("%Y-%m-%d").to_string();
                intervals.push((day_start, day_end, label));
                // succ_opt() is None only at NaiveDate::MAX; the previous
                // `unwrap_or(day)` fallback left `day` unchanged there,
                // looping forever. Break instead.
                match day.succ_opt() {
                    Some(next) => day = next,
                    None => break,
                }
            }
        }
        IntervalType::Monthly => {
            let mut year = start.year();
            let mut month = start.month();
            loop {
                // Compute the month start once (the old code built it twice:
                // once for the loop condition, once for the window).
                let month_start = Utc.from_utc_datetime(
                    &NaiveDate::from_ymd_opt(year, month, 1).unwrap()
                        .and_hms_opt(0, 0, 0).unwrap(),
                );
                if month_start > end {
                    break;
                }
                let (next_year, next_month) =
                    if month == 12 { (year + 1, 1) } else { (year, month + 1) };
                // Last second of the month = first second of next month - 1s.
                let month_end = Utc.from_utc_datetime(
                    &NaiveDate::from_ymd_opt(next_year, next_month, 1).unwrap()
                        .and_hms_opt(0, 0, 0).unwrap(),
                ) - chrono::TimeDelta::seconds(1);
                let label = format!("{:04}-{:02}", year, month);
                intervals.push((month_start, month_end, label));
                year = next_year;
                month = next_month;
            }
        }
        IntervalType::Yearly => {
            for year in start.year()..=end.year() {
                let year_start = Utc.from_utc_datetime(
                    &NaiveDate::from_ymd_opt(year, 1, 1).unwrap()
                        .and_hms_opt(0, 0, 0).unwrap(),
                );
                let year_end = Utc.from_utc_datetime(
                    &NaiveDate::from_ymd_opt(year, 12, 31).unwrap()
                        .and_hms_opt(23, 59, 59).unwrap(),
                );
                let label = format!("{:04}", year);
                intervals.push((year_start, year_end, label));
            }
        }
    }
    intervals
}