#![allow(clippy::uninlined_format_args)]
pub mod progress;
use crate::config::{CollectionCliArgs, CollectionConfigFile};
use crate::error::{CityJsonStacError, Result};
use crate::memory::{log_memory, memory_log_interval, memory_logging_enabled};
use crate::metadata::CRS;
use crate::reader::{get_reader_from_source, InputSource};
use crate::stac::{StacCollectionBuilder, StacItemBuilder};
use crate::traversal;
use clap::{Parser, Subcommand};
use progress::{
create_progress_bar, create_spinner, finish_spinner_err, finish_spinner_ok, print_banner,
print_error, print_info, print_success, print_warning, Summary,
};
use std::path::{Path, PathBuf};
// Top-level command line of the `citystac` binary, parsed with clap's derive API.
// Plain `//` comments are used on purpose: clap turns `///` doc comments into help text.
#[derive(Parser)]
#[command(name = "citystac")]
#[command(author, version, about = "Generate STAC metadata for CityJSON datasets", long_about = None)]
struct Cli {
// The selected subcommand; `run` dispatches on it.
#[command(subcommand)]
command: Commands,
// Global flag: `run` sets the log filter to Debug when present and to Warn otherwise.
#[arg(short, long, global = true)]
verbose: bool,
// Global flag: forwarded to every handler as `dry_run`. The item, collection and catalog
// handlers validate their inputs and exit the process instead of writing output.
#[arg(long, global = true)]
dry_run: bool,
}
// Subcommands of the CLI. Each variant is destructured in `run` and handed to its handler.
// Plain `//` comments are used on purpose: clap turns `///` doc comments into help text.
// NOTE(review): several `bool` flags below use `default_value_t = true`. With clap's default
// flag action for `bool` this looks like the value can never become false — confirm.
#[derive(Subcommand)]
enum Commands {
// Build a single STAC Item from one input (handled by `handle_item_command`).
Item {
// Input source string; parsed with `InputSource::from_str_input`.
input: String,
// Destination file. When omitted, a path is derived from the input.
#[arg(short, long)]
output: Option<PathBuf>,
// Custom item id; when given, the item builder is re-created from this id.
#[arg(long)]
id: Option<String>,
// Optional title applied to the item.
#[arg(long)]
title: Option<String>,
// Optional description applied to the item.
#[arg(short, long)]
description: Option<String>,
// Collection id; also adds a collection link `./<id>.json`.
#[arg(short, long)]
collection: Option<String>,
// Base URL handed to the item builder.
#[arg(long)]
base_url: Option<String>,
// Pretty-print the JSON output (otherwise compact).
#[arg(long, default_value_t = true)]
pretty: bool,
},
// Build a collection plus its items (handled by `handle_collection_command`).
Collection {
// Zero or more inputs. `run` exits with an error when this is empty and no `--config` is given.
#[arg(num_args = 0..)]
inputs: Vec<PathBuf>,
// Output directory; `collection.json` and the `items/` folder are written below it.
#[arg(short, long, default_value = "./stac_output")]
output: PathBuf,
// Optional collection config file; CLI values are merged over it.
#[arg(short = 'C', long)]
config: Option<PathBuf>,
// Collection id override.
#[arg(long)]
id: Option<String>,
// Collection title override.
#[arg(long)]
title: Option<String>,
// Collection description override.
#[arg(short, long)]
description: Option<String>,
// License string. The default value "proprietary" is treated as "not set on the CLI"
// when merging with the config file.
#[arg(short, long, default_value = "proprietary")]
license: String,
// Include patterns forwarded to `traversal::find_files_with_patterns`.
#[arg(long)]
include: Vec<String>,
// Exclude patterns forwarded to `traversal::find_files_with_patterns`.
#[arg(long)]
exclude: Vec<String>,
// Forwarded to `traversal::find_files_with_patterns`.
#[arg(short, long, default_value_t = true)]
recursive: bool,
// Forwarded to `traversal::find_files_with_patterns`.
#[arg(long)]
max_depth: Option<usize>,
// When true, a failing input is recorded and processing continues; otherwise the first failure aborts.
#[arg(long, default_value_t = true)]
skip_errors: bool,
// Base URL handed to the item builders.
#[arg(long)]
base_url: Option<String>,
// Pretty-print the JSON output (otherwise compact).
#[arg(long, default_value_t = true)]
pretty: bool,
// Regenerate item files that already exist (existing ones are reused otherwise).
#[arg(long)]
overwrite_items: bool,
// Regenerate `collection.json` even if it already exists.
#[arg(long)]
overwrite_collection: bool,
// Shorthand: `run` ORs this into both `overwrite_items` and `overwrite_collection`.
#[arg(long)]
overwrite: bool,
// Request a GeoParquet export (`items.parquet`) next to the collection.
#[arg(long)]
geoparquet: bool,
// Maximum number of inputs processed concurrently; unset or 0 falls back to the
// available parallelism (4 if that cannot be determined).
#[arg(long)]
concurrency: Option<usize>,
// Passed to `CollectionAccumulator::new`; presumably caps the item links stored in the collection — confirm.
#[arg(long)]
max_item_links: Option<usize>,
},
// Build or refresh a collection from existing item files. All fields are forwarded
// unchanged into `UpdateCollectionConfig` by `run`.
#[command(visible_alias = "aggregate")]
UpdateCollection {
// One or more item paths (required).
#[arg(required = true)]
items: Vec<PathBuf>,
// Destination of the collection file.
#[arg(short, long, default_value = "./collection.json")]
output: PathBuf,
// Optional config file.
#[arg(short = 'C', long)]
config: Option<PathBuf>,
// Collection id override.
#[arg(long)]
id: Option<String>,
// Collection title override.
#[arg(long)]
title: Option<String>,
// Collection description override.
#[arg(short, long)]
description: Option<String>,
// License string.
#[arg(short, long, default_value = "proprietary")]
license: String,
// Base URL for item links — presumably used when building item hrefs; confirm in the handler.
#[arg(long)]
items_base_url: Option<String>,
// Error-tolerance switch forwarded to the handler.
#[arg(long, default_value_t = true)]
skip_errors: bool,
// Pretty-print switch forwarded to the handler.
#[arg(long, default_value_t = true)]
pretty: bool,
// GeoParquet switch forwarded to the handler.
#[arg(long)]
geoparquet: bool,
// Item-link limit forwarded to the handler.
#[arg(long)]
max_item_links: Option<usize>,
},
// Build a root catalog with one collection per input (handled by `handle_catalog_command`).
Catalog {
// Zero or more inputs; each becomes one collection in a sub-folder of `output`.
#[arg(num_args = 0..)]
inputs: Vec<PathBuf>,
// Output directory; `catalog.json` is written directly inside it.
#[arg(short, long, default_value = "./catalog")]
output: PathBuf,
// Optional catalog config file; may list additional collections.
#[arg(short = 'C', long)]
config: Option<PathBuf>,
// Catalog id override.
#[arg(long)]
id: Option<String>,
// Catalog title override.
#[arg(long)]
title: Option<String>,
// Catalog description override.
#[arg(short, long)]
description: Option<String>,
// License string passed on to every generated collection.
#[arg(short, long, default_value = "proprietary")]
license: String,
// Base URL used for the catalog's child links and the per-collection base URLs.
#[arg(long)]
base_url: Option<String>,
// Pretty-print the JSON output (otherwise compact).
#[arg(long, default_value_t = true)]
pretty: bool,
// Forwarded to each collection as `overwrite_items`.
#[arg(long)]
overwrite_items: bool,
// Forwarded to each collection as `overwrite_collection`.
#[arg(long)]
overwrite_collections: bool,
// Shorthand: `run` ORs this into both `overwrite_items` and `overwrite_collections`.
#[arg(long)]
overwrite: bool,
// Forwarded to each collection as `geoparquet`.
#[arg(long)]
geoparquet: bool,
// Concurrency for collection generation; also forwarded to each collection.
#[arg(long)]
concurrency: Option<usize>,
// Forwarded to each collection as `max_item_links`.
#[arg(long)]
max_item_links: Option<usize>,
},
}
/// Builds the collection asset entry that points at the `items.parquet` file
/// written next to `collection.json`.
fn make_geoparquet_asset() -> crate::stac::Asset {
    let mut parquet_asset = crate::stac::Asset::new("./items.parquet");
    parquet_asset.roles = vec![String::from("collection-mirror")];
    parquet_asset.r#type = Some(String::from("application/vnd.apache.parquet"));
    parquet_asset
}
pub async fn run() -> Result<()> {
let cli = Cli::parse();
if cli.verbose {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Debug)
.init();
} else {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Warn)
.init();
}
print_banner();
match cli.command {
Commands::Item {
input,
output,
id,
title,
description,
collection,
base_url,
pretty,
} => {
handle_item_command(
input,
output,
id,
title,
description,
collection,
base_url,
pretty,
cli.dry_run,
)
.await
}
Commands::Collection {
inputs,
output,
config,
id,
title,
description,
license,
include,
exclude,
recursive,
max_depth,
skip_errors,
base_url,
pretty,
overwrite_items,
overwrite_collection,
overwrite,
geoparquet,
concurrency,
max_item_links,
} => {
if inputs.is_empty() && config.is_none() {
eprintln!("Error: No inputs provided. Specify inputs via CLI arguments or in a config file.");
eprintln!("Usage: citystac collection [OPTIONS] <INPUTS>...");
eprintln!(" citystac collection --config <CONFIG_FILE>");
std::process::exit(1);
}
handle_collection_command(CollectionConfig {
inputs,
output,
config,
id,
title,
description,
license,
include,
exclude,
recursive,
max_depth,
skip_errors,
base_url,
pretty,
dry_run: cli.dry_run,
overwrite_items: overwrite_items || overwrite,
overwrite_collection: overwrite_collection || overwrite,
geoparquet,
concurrency,
max_item_links,
parent_href: None,
root_href: None,
})
.await
}
Commands::UpdateCollection {
items,
output,
config,
id,
title,
description,
license,
items_base_url,
skip_errors,
pretty,
geoparquet,
max_item_links,
} => handle_update_collection_command(UpdateCollectionConfig {
items,
output,
config,
id,
title,
description,
license,
items_base_url,
skip_errors,
pretty,
dry_run: cli.dry_run,
geoparquet,
max_item_links,
}),
Commands::Catalog {
inputs,
output,
config,
id,
title,
description,
license,
base_url,
pretty,
overwrite_items,
overwrite_collections,
overwrite,
geoparquet,
concurrency,
max_item_links,
} => {
handle_catalog_command(CatalogConfig {
inputs,
output,
config,
id,
title,
description,
license,
base_url,
pretty,
dry_run: cli.dry_run,
overwrite_items: overwrite_items || overwrite,
overwrite_collections: overwrite_collections || overwrite,
geoparquet,
concurrency,
max_item_links,
})
.await
}
}
}
#[allow(clippy::too_many_arguments)]
async fn handle_item_command(
input: String,
output: Option<PathBuf>,
id: Option<String>,
title: Option<String>,
description: Option<String>,
collection: Option<String>,
base_url: Option<String>,
pretty: bool,
dry_run: bool,
) -> Result<()> {
if dry_run {
use crate::validation;
use progress::{print_banner, print_error, print_success};
print_banner();
println!("\nRunning in dry-run mode...\n");
let result = validation::validate_item_input(&input).await?;
println!();
if result.is_valid() {
print_success("Dry run complete: All validations passed");
std::process::exit(0);
} else {
print_error("Dry run failed: Errors found");
std::process::exit(result.exit_code());
}
}
let spinner = create_spinner(format!("Reading {input}…"));
let source = InputSource::from_str_input(&input)?;
let reader = match get_reader_from_source(&source).await {
Ok(r) => r,
Err(e) => {
finish_spinner_err(spinner, format!("Failed to read input: {e}"));
return Err(e);
}
};
finish_spinner_ok(
spinner,
format!("Loaded {} ({} format)", input, reader.encoding()),
);
let spinner = create_spinner("Building STAC Item…");
let original_url = match &source {
InputSource::Remote(url) => Some(url.as_str()),
InputSource::Local(_) => None,
};
let mut builder = StacItemBuilder::from_file(
reader.file_path(),
reader.as_ref(),
base_url.as_deref(),
original_url,
)?;
if let Some(custom_id) = id {
builder = StacItemBuilder::new(custom_id).cityjson_metadata(reader.as_ref())?;
if let Ok(bbox) = reader.bbox() {
let crs = reader.crs().unwrap_or_default();
let wgs84_bbox = bbox.to_wgs84(&crs)?;
builder = builder.bbox(wgs84_bbox).geometry_from_bbox();
}
}
if let Some(t) = title {
builder = builder.title(t);
}
if let Some(d) = description {
builder = builder.description(d);
}
if let Some(coll_id) = collection {
builder = builder
.collection_id(&coll_id)
.collection_link(format!("./{coll_id}.json"));
}
let output_path = output.unwrap_or_else(|| {
match source {
InputSource::Local(path) => {
let mut p = path.clone();
p.set_extension("item.json");
p
}
InputSource::Remote(url) => {
let filename = url
.split('/')
.next_back()
.and_then(|s| s.split('?').next())
.unwrap_or("remote.item.json");
PathBuf::from(format!("{}.json", filename.trim_end_matches(".json")))
}
}
});
let item = builder.build()?;
let json = if pretty {
serde_json::to_string_pretty(&item)?
} else {
serde_json::to_string(&item)?
};
std::fs::write(&output_path, json)?;
finish_spinner_ok(
spinner,
format!("Item written to {}", output_path.display()),
);
Ok(())
}
/// Resolved settings for the `catalog` subcommand, filled by `run` from
/// `Commands::Catalog` and consumed by `handle_catalog_command`.
struct CatalogConfig {
/// Inputs given on the command line; each becomes one collection.
inputs: Vec<PathBuf>,
/// Output directory; `catalog.json` is written directly inside it.
output: PathBuf,
/// Optional catalog config file.
config: Option<PathBuf>,
/// Catalog id override from the CLI.
id: Option<String>,
/// Catalog title override from the CLI.
title: Option<String>,
/// Catalog description override from the CLI.
description: Option<String>,
/// License string handed to every generated collection.
license: String,
/// Base URL used for child links and per-collection base URLs.
base_url: Option<String>,
/// Pretty-print JSON output when true.
pretty: bool,
/// Validate only and exit the process; nothing is written.
dry_run: bool,
/// Already combined with `--overwrite` by `run`.
overwrite_items: bool,
/// Already combined with `--overwrite` by `run`.
overwrite_collections: bool,
/// Forwarded to every generated collection.
geoparquet: bool,
/// Concurrency limit; unset or 0 falls back to the available parallelism.
concurrency: Option<usize>,
/// Forwarded to every generated collection.
max_item_links: Option<usize>,
}
/// Turns an arbitrary id into a folder-name-friendly string: alphanumeric
/// characters (Unicode-aware) and `-`, `_`, `.` are kept, every other
/// character is replaced by `_`.
fn sanitize_folder_name(name: &str) -> String {
    let mut sanitized = String::with_capacity(name.len());
    for ch in name.chars() {
        let keep = ch.is_alphanumeric() || matches!(ch, '-' | '_' | '.');
        sanitized.push(if keep { ch } else { '_' });
    }
    sanitized
}
/// Derives a folder name from the last component of `path_str`; falls back to
/// `"collection"` when the path has no usable final component (for example an
/// empty string or `..`).
fn fallback_folder_name(path_str: &str) -> String {
    let last_component = std::path::Path::new(path_str).file_name();
    match last_component.and_then(|component| component.to_str()) {
        Some(folder) => folder.to_owned(),
        None => "collection".to_owned(),
    }
}
/// Generates a root STAC catalog with one collection per input.
///
/// Collection targets come from two places: the paths given on the command
/// line and the `collections` list of the optional catalog config file
/// (resolved relative to that file's directory). A target that is a
/// `.toml`/`.yaml`/`.yml` file is treated as a collection config file, anything
/// else as an input path. Every target is generated into
/// `<output>/<id_hint>/` through `process_collection_logic`, up to
/// `concurrency` targets at a time. Targets that do not exist or fail are
/// counted as errors and skipped. Finally `<output>/catalog.json` is written
/// with one child link per generated collection.
///
/// In dry-run mode only the config file and the CLI input paths are checked,
/// and the process exits (0 = ok, 1 = config problem, 2 = missing paths).
///
/// # Errors
/// Propagates config loading, directory creation, serialization and file
/// writing errors.
async fn handle_catalog_command(config: CatalogConfig) -> Result<()> {
    use crate::config::{CatalogCliArgs, CatalogConfigFile};
    use crate::stac::StacCatalogBuilder;

    if config.dry_run {
        use progress::{print_banner, print_error, print_success};
        print_banner();
        println!("\nRunning in dry-run mode...\n");

        if let Some(config_path) = &config.config {
            println!(" → Checking config file: {}", config_path.display());
            match CatalogConfigFile::from_file(config_path) {
                Ok(catalog_config) => {
                    println!(" ✓ Config file syntax: valid");
                    // A field counts as missing when it is absent or blank.
                    let id_missing = catalog_config
                        .id
                        .as_ref()
                        .map(|s| s.trim())
                        .unwrap_or_default()
                        .is_empty();
                    let title_missing = catalog_config
                        .title
                        .as_ref()
                        .map(|s| s.trim())
                        .unwrap_or_default()
                        .is_empty();
                    let description_missing = catalog_config
                        .description
                        .as_ref()
                        .map(|s| s.trim())
                        .unwrap_or_default()
                        .is_empty();

                    let mut semantic_errors = Vec::new();
                    if id_missing {
                        semantic_errors.push("Missing required field: 'id'".to_string());
                    }
                    if title_missing {
                        semantic_errors.push("Missing recommended field: 'title'".to_string());
                    }
                    if description_missing {
                        semantic_errors
                            .push("Missing recommended field: 'description'".to_string());
                    }
                    if !semantic_errors.is_empty() {
                        for error in &semantic_errors {
                            println!(" ✗ {}", error);
                        }
                        println!();
                        print_error("Dry run failed: Config semantic errors");
                        std::process::exit(1);
                    }
                    println!(" ✓ Config file content: valid");
                }
                Err(e) => {
                    println!(" ✗ Config file syntax: {}", e);
                    println!();
                    print_error("Dry run failed: Config error");
                    std::process::exit(1);
                }
            }
        }

        let missing: Vec<&PathBuf> = config
            .inputs
            .iter()
            .filter(|input| !input.exists())
            .collect();
        let found = config.inputs.len() - missing.len();
        if missing.is_empty() {
            println!(" ✓ Input paths: {}/{} found", found, config.inputs.len());
        } else {
            println!(" ⚠ Input paths: {}/{} found", found, config.inputs.len());
            for path in &missing {
                println!(" ✗ {}", path.display());
            }
        }
        println!();
        if missing.is_empty() {
            print_success("Dry run complete: All validations passed");
            std::process::exit(0);
        } else {
            print_error("Dry run failed: Missing paths");
            std::process::exit(2);
        }
    }

    let base_config = if let Some(config_path) = &config.config {
        CatalogConfigFile::from_file(config_path)?
    } else {
        CatalogConfigFile::default()
    };
    let merged_config = base_config.merge_with_cli(&CatalogCliArgs {
        id: config.id.clone(),
        title: config.title.clone(),
        description: config.description.clone(),
        base_url: config.base_url.clone(),
    });
    std::fs::create_dir_all(&config.output)?;

    // (target path, folder name / id hint) — CLI inputs first, then config entries.
    let mut collection_targets: Vec<(PathBuf, String)> = config
        .inputs
        .iter()
        .map(|input| {
            let id_hint = input
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("collection")
                .to_string();
            (input.clone(), id_hint)
        })
        .collect();

    if let Some(config_collections) = merged_config.collections {
        // Entries in the config file are relative to the config file's directory.
        let base_dir = config
            .config
            .as_ref()
            .and_then(|p| p.parent())
            .unwrap_or_else(|| std::path::Path::new("."));
        for coll_path_str in config_collections {
            let path = base_dir.join(&coll_path_str);
            // Prefer the id declared inside a collection config file; an unreadable
            // file or a missing id falls back to the path's last component.
            let configured_id = if is_collection_config_path(&path) {
                CollectionConfigFile::from_file(&path)
                    .ok()
                    .and_then(|cfg| cfg.id)
            } else {
                None
            };
            let id_hint = match configured_id {
                Some(id) => sanitize_folder_name(&id),
                None => fallback_folder_name(&coll_path_str),
            };
            collection_targets.push((path, id_hint));
        }
    }

    if collection_targets.is_empty() {
        print_error("No collections provided. Specify input directories via CLI or 'collections' in config file.");
        std::process::exit(1);
    }
    print_info(format!(
        "Processing {} collection(s) for catalog",
        collection_targets.len()
    ));

    let total_collections = collection_targets.len() as u64;
    let catalog_pb = create_progress_bar(total_collections, "Generating collections…");
    let catalog_pb_arc = std::sync::Arc::new(catalog_pb);
    let mut generated_collections: Vec<(String, String)> = Vec::new();
    let mut catalog_errors: u64 = 0;

    let config_output = config.output.clone();
    // Normalise the base URL once so that both the per-collection base URL and the
    // catalog's child links are built from a base that ends in `/`.
    let config_base_url = config.base_url.as_ref().map(|base| {
        if base.ends_with('/') {
            base.clone()
        } else {
            format!("{base}/")
        }
    });
    let config_license = config.license.clone();
    let config_pretty = config.pretty;
    let config_dry_run = config.dry_run;
    let config_overwrite_items = config.overwrite_items;
    let config_overwrite_collections = config.overwrite_collections;
    let config_geoparquet = config.geoparquet;
    let config_concurrency = config.concurrency;
    let config_max_item_links = config.max_item_links;
    let catalog_concurrency = config.concurrency.filter(|&n| n > 0).unwrap_or_else(|| {
        std::thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(4)
    });

    use futures::stream::{self, StreamExt};
    let mut result_stream = stream::iter(collection_targets)
        .map(|(input_dir, id_hint)| {
            let pb = catalog_pb_arc.clone();
            let output = config_output.clone();
            let base_url = config_base_url.clone();
            let license = config_license.clone();
            async move {
                if !input_dir.exists() {
                    pb.println(format!(
                        " {} Directory not found, skipping: {}",
                        console::style("⚠").yellow(),
                        input_dir.display()
                    ));
                    pb.inc(1);
                    return Err((
                        input_dir.display().to_string(),
                        "Directory not found".to_string(),
                    ));
                }

                // A config file drives the collection itself; any other path is an input
                // and receives a base URL below the catalog's base URL.
                let is_config_file = is_collection_config_path(&input_dir);
                if is_config_file {
                    pb.println(format!(
                        " {} Loading config: {}",
                        console::style("›").blue(),
                        input_dir.display()
                    ));
                }
                let collection_config = CollectionConfig {
                    inputs: if is_config_file {
                        Vec::new()
                    } else {
                        vec![input_dir.clone()]
                    },
                    output: output.join(&id_hint),
                    config: is_config_file.then(|| input_dir.clone()),
                    id: Some(id_hint.clone()),
                    title: Some(format!("Collection from {}", id_hint)),
                    description: None,
                    license,
                    include: vec![],
                    exclude: vec![],
                    recursive: true,
                    max_depth: None,
                    skip_errors: true,
                    base_url: if is_config_file {
                        None
                    } else {
                        base_url.as_ref().map(|base| format!("{base}{id_hint}/"))
                    },
                    pretty: config_pretty,
                    dry_run: config_dry_run,
                    overwrite_items: config_overwrite_items,
                    overwrite_collection: config_overwrite_collections,
                    geoparquet: config_geoparquet,
                    concurrency: config_concurrency,
                    max_item_links: config_max_item_links,
                    parent_href: Some("../catalog.json".to_string()),
                    root_href: Some("../catalog.json".to_string()),
                };

                pb.set_message(format!("Processing: {id_hint}"));
                match process_collection_logic(collection_config).await {
                    Ok((_col_path, col_id, col_title)) => {
                        let href = match &base_url {
                            Some(base) => format!("{base}{id_hint}/collection.json"),
                            None => format!("./{}/collection.json", id_hint),
                        };
                        let display_name = col_title.unwrap_or(col_id);
                        pb.println(format!(
                            " {} Collection ready: {}",
                            console::style("✓").green(),
                            display_name
                        ));
                        pb.inc(1);
                        Ok((href, display_name))
                    }
                    Err(e) => {
                        pb.println(format!(
                            " {} Failed ({}): {}",
                            console::style("✗").red(),
                            input_dir.display(),
                            e
                        ));
                        pb.inc(1);
                        Err((input_dir.display().to_string(), e.to_string()))
                    }
                }
            }
        })
        .buffer_unordered(catalog_concurrency);

    while let Some(result) = result_stream.next().await {
        match result {
            Ok((href, title)) => {
                generated_collections.push((href, title));
            }
            // Details were already printed by the task; only the count is kept.
            Err(_) => {
                catalog_errors += 1;
            }
        }
    }
    catalog_pb_arc.finish_and_clear();

    let catalog_id = merged_config.id.unwrap_or_else(|| {
        config
            .output
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("catalog")
            .to_string()
    });
    let description = merged_config
        .description
        .unwrap_or_else(|| "Root catalog".to_string());
    let mut catalog_builder = StacCatalogBuilder::new(catalog_id, description);
    if let Some(t) = merged_config.title {
        catalog_builder = catalog_builder.title(t);
    }
    let collection_count = generated_collections.len();
    for (href, title) in generated_collections {
        catalog_builder = catalog_builder.child_link(href, Some(title));
    }
    catalog_builder = catalog_builder
        .self_link("./catalog.json")
        .root_link("./catalog.json");
    let catalog = catalog_builder.build();
    let catalog_json = if config.pretty {
        serde_json::to_string_pretty(&catalog)?
    } else {
        serde_json::to_string(&catalog)?
    };
    let catalog_path = config.output.join("catalog.json");
    std::fs::write(&catalog_path, catalog_json)?;

    Summary::new()
        .add("Catalog", catalog_path.display().to_string())
        .add("Collections", format!("{collection_count}"))
        .add("Errors", format!("{catalog_errors}"))
        .print();
    print_success("Catalog generated successfully");
    Ok(())
}

/// Returns `true` when `path` is an existing regular file with a `.toml`,
/// `.yaml` or `.yml` extension, i.e. a collection config file rather than an
/// input path.
fn is_collection_config_path(path: &std::path::Path) -> bool {
    path.is_file()
        && path
            .extension()
            .and_then(|ext| ext.to_str())
            .is_some_and(|ext| matches!(ext, "toml" | "yaml" | "yml"))
}
/// Resolved settings for generating one collection. Built by `run` for the
/// `collection` subcommand and by `handle_catalog_command` for every catalog
/// member; consumed by `process_collection_logic`.
struct CollectionConfig {
/// Inputs from the CLI; when empty, inputs from the config file are used instead.
inputs: Vec<PathBuf>,
/// Output directory (`collection.json` and `items/` live below it).
output: PathBuf,
/// Optional collection config file.
config: Option<PathBuf>,
/// Collection id override.
id: Option<String>,
/// Collection title override.
title: Option<String>,
/// Collection description override.
description: Option<String>,
/// License string; the value "proprietary" is treated as "not set on the CLI" when merging.
license: String,
/// Include patterns for the file search.
include: Vec<String>,
/// Exclude patterns for the file search.
exclude: Vec<String>,
/// Forwarded to the file search.
recursive: bool,
/// Forwarded to the file search.
max_depth: Option<usize>,
/// Record per-item failures and continue instead of aborting on the first one.
skip_errors: bool,
/// Base URL handed to the item builders.
base_url: Option<String>,
/// Pretty-print JSON output when true.
pretty: bool,
/// Validate only (handled in `handle_collection_command`).
dry_run: bool,
/// Rewrite item files that already exist.
overwrite_items: bool,
/// Rewrite `collection.json` if it already exists.
overwrite_collection: bool,
/// Also produce `items.parquet`.
geoparquet: bool,
/// Concurrency limit; unset or 0 falls back to the available parallelism.
concurrency: Option<usize>,
/// Passed to `CollectionAccumulator::new`.
max_item_links: Option<usize>,
/// `None` for a standalone collection, `"../catalog.json"` for catalog members;
/// presumably emitted as the collection's parent link — confirm.
parent_href: Option<String>,
/// `None` for a standalone collection, `"../catalog.json"` for catalog members;
/// presumably emitted as the collection's root link — confirm.
root_href: Option<String>,
}
/// Entry point of the `collection` subcommand.
///
/// Outside dry-run mode this simply runs `process_collection_logic` and
/// discards its return value. In dry-run mode the optional config file is
/// parsed, the configuration is validated, and the process exits with the
/// validation's exit code (0 when everything is valid).
///
/// # Errors
/// Propagates config parsing, validation and collection generation errors.
async fn handle_collection_command(config: CollectionConfig) -> Result<()> {
    use crate::validation;
    use progress::{print_banner, print_error, print_success};

    if !config.dry_run {
        return process_collection_logic(config).await.map(|_| ());
    }

    print_banner();
    println!("\nRunning in dry-run mode...\n");

    // Parse the config file first so that a malformed file is reported as an error
    // before the validation runs; the parsed value itself is not needed here.
    if let Some(config_path) = &config.config {
        let _parsed = CollectionConfigFile::from_file(config_path)?;
    }
    let report =
        validation::validate_collection_config(&config.config, &config.inputs, &config.base_url)
            .await?;
    println!();

    let exit_code = if report.is_valid() {
        print_success("Dry run complete: All validations passed");
        0
    } else {
        print_error("Dry run failed: Errors found");
        report.exit_code()
    };
    std::process::exit(exit_code)
}
async fn process_collection_logic(
config: CollectionConfig,
) -> Result<(PathBuf, String, Option<String>)> {
use crate::stac::{CollectionAccumulator, ItemMetadata};
let base_config = if let Some(config_path) = &config.config {
CollectionConfigFile::from_file(config_path)?
} else {
CollectionConfigFile::default()
};
let merged_config = base_config.merge_with_cli(&CollectionCliArgs {
id: config.id.clone(),
title: config.title.clone(),
description: config.description.clone(),
license: if config.license != "proprietary" {
Some(config.license.clone())
} else {
None
},
base_url: config.base_url.clone(),
});
let final_inputs = if !config.inputs.is_empty() {
config.inputs.clone()
} else if let Some(config_inputs) = merged_config.inputs {
let config_dir = config
.config
.as_ref()
.and_then(|p| p.parent())
.unwrap_or(Path::new("."));
let resolved_inputs = config_inputs.resolve(config_dir)?;
resolved_inputs
.iter()
.map(|s| PathBuf::from(s.as_str()))
.collect()
} else {
Vec::new()
};
let crs_override: Option<CRS> = merged_config
.extent
.as_ref()
.and_then(|e| e.spatial.as_ref())
.and_then(|s| s.crs.as_ref())
.and_then(|crs_str| CRS::from_citygml_srs_name(crs_str));
let collection_id = merged_config.id.clone().unwrap_or_else(|| {
final_inputs
.first()
.and_then(|p| p.file_name().and_then(|n| n.to_str()))
.unwrap_or("collection")
.to_string()
});
let mut sources: Vec<InputSource> = Vec::new();
let mut local_search_paths: Vec<PathBuf> = Vec::new();
for input in &final_inputs {
let input_str = input.to_string_lossy();
if crate::remote::is_remote_url(&input_str) {
sources.push(InputSource::Remote(input_str.to_string()));
} else {
local_search_paths.push(input.clone());
}
}
log::info!(
"Scanning {} local path(s) and {} remote URL(s)",
local_search_paths.len(),
sources.len()
);
if !local_search_paths.is_empty() {
let files = traversal::find_files_with_patterns(
&local_search_paths,
&config.include,
&config.exclude,
config.recursive,
config.max_depth,
)?;
for file in files {
sources.push(InputSource::Local(file));
}
}
let config_only = sources.is_empty();
if config_only {
let has_config_bbox = merged_config
.extent
.as_ref()
.and_then(|e| e.spatial.as_ref())
.and_then(|s| s.bbox.as_ref())
.is_some_and(|bbox| !bbox.is_empty());
if !has_config_bbox {
return Err(crate::error::CityJsonStacError::StacError(
"No input files found and no spatial extent (bbox) in config. \
For collection-only mode, provide extent.spatial.bbox in the config file."
.to_string(),
));
}
print_info("Config-only mode: generating collection from config metadata (no items)");
} else {
print_info(format!("Found {} input source(s)", sources.len()));
}
log_memory(format!(
"collection-start id={} sources={}",
collection_id,
sources.len()
));
let mut stem_counts: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();
for source in &sources {
let filename = match source {
InputSource::Local(p) => p
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("unknown")
.to_string(),
InputSource::Remote(u) => crate::remote::url_filename(u),
};
let path = PathBuf::from(&filename);
let stem = path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown");
*stem_counts.entry(stem.to_string()).or_insert(0) += 1;
}
std::fs::create_dir_all(&config.output)?;
let items_dir = config.output.join("items");
if !config_only {
std::fs::create_dir_all(&items_dir)?;
}
let mut accumulator = CollectionAccumulator::new(config.max_item_links);
let memory_log_every = memory_log_interval(1000);
let pb = create_progress_bar(sources.len() as u64, "Processing files…");
let pb_arc = std::sync::Arc::new(pb);
let items_dir_arc = std::sync::Arc::new(items_dir.clone());
let base_url_arc = std::sync::Arc::new(config.base_url.clone());
let collection_id_arc = std::sync::Arc::new(collection_id.clone());
let crs_override_arc = std::sync::Arc::new(crs_override.clone());
let stem_counts_arc = std::sync::Arc::new(stem_counts);
enum ItemResult {
Success {
metadata: ItemMetadata,
item_href: String,
title: Option<String>,
},
Error { source: String, error: String },
Fatal(CityJsonStacError),
}
let concurrency_limit = config.concurrency.filter(|&n| n > 0).unwrap_or_else(|| {
std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4)
});
use futures::stream::{self, StreamExt};
let skip_errors = config.skip_errors;
let pretty = config.pretty;
let overwrite_items = config.overwrite_items;
let mut result_stream = stream::iter(sources)
.map(|source| {
let pb = pb_arc.clone();
let items_dir = items_dir_arc.clone();
let base_url = base_url_arc.clone();
let collection_id = collection_id_arc.clone();
let crs_override = crs_override_arc.clone();
let stem_counts = stem_counts_arc.clone();
async move {
let source_desc = match &source {
InputSource::Local(p) => p.display().to_string(),
InputSource::Remote(u) => u.clone(),
};
let short_desc = source_desc
.split(['/', '\\'])
.next_back()
.unwrap_or(&source_desc)
.to_string();
pb.set_message(format!("Processing: {short_desc}"));
let reader = match get_reader_from_source(&source).await {
Ok(r) => r,
Err(e) => {
if skip_errors {
pb.println(format!(
" {} Skipping {short_desc}: {e}",
console::style("⚠").yellow()
));
pb.inc(1);
return ItemResult::Error {
source: source_desc,
error: e.to_string(),
};
} else {
pb.inc(1);
return ItemResult::Fatal(e);
}
}
};
let file_path = reader.file_path();
let stem = file_path
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("unknown");
let has_collision = stem_counts.get(stem).is_some_and(|&count| count > 1);
let item_id = if has_collision {
let encoding = reader.encoding();
let suffix = match encoding {
"CityJSON" => "_cj",
"CityJSONSeq" => "_cjseq",
"FlatCityBuf" => "_fcb",
_ => "",
};
format!("{}{}", stem, suffix)
} else {
stem.to_string()
};
let item_filename = format!("{item_id}_item.json");
let item_path = items_dir.join(&item_filename);
if item_path.exists() && !overwrite_items {
pb.println(format!(
" {} Skipping existing: {}",
console::style("⚠").yellow(),
item_filename
));
match ItemMetadata::from_file(&item_path) {
Ok(metadata) => {
let item_href = format!("./items/{item_filename}");
let title = file_path
.file_name()
.and_then(|n| n.to_str())
.map(String::from);
pb.inc(1);
return ItemResult::Success {
metadata,
item_href,
title,
};
}
Err(e) => {
if skip_errors {
pb.println(format!(
" {} Failed to read existing item: {e}",
console::style("✗").red()
));
pb.inc(1);
return ItemResult::Error {
source: item_filename,
error: e,
};
} else {
pb.inc(1);
return ItemResult::Fatal(CityJsonStacError::StacError(format!(
"Failed to read existing item {}: {}",
item_path.display(),
e
)));
}
}
}
}
let original_url = match &source {
InputSource::Remote(url) => Some(url.clone()),
InputSource::Local(_) => None,
};
let builder_result = if has_collision {
StacItemBuilder::from_file_with_format_suffix_and_crs(
file_path,
reader.as_ref(),
base_url.as_deref(),
original_url.as_deref(),
(*crs_override).as_ref(),
)
} else {
StacItemBuilder::from_file_with_crs_override(
file_path,
reader.as_ref(),
base_url.as_deref(),
original_url.as_deref(),
(*crs_override).as_ref(),
)
};
match builder_result {
Ok(builder) => match builder
.collection_id(&*collection_id)
.collection_link("../collection.json")
.build()
{
Ok(item) => {
let metadata = ItemMetadata::from_item(&item);
let item_id = item.id.clone();
let json = if pretty {
serde_json::to_string_pretty(&item)
} else {
serde_json::to_string(&item)
};
match json {
Ok(json) => {
let item_filename = format!("{item_id}_item.json");
let item_path = items_dir.join(&item_filename);
if let Err(e) = tokio::fs::write(&item_path, &json).await {
if skip_errors {
pb.println(format!(
" {} Skipping {short_desc}: {e}",
console::style("⚠").yellow()
));
pb.inc(1);
return ItemResult::Error {
source: source_desc,
error: e.to_string(),
};
} else {
pb.inc(1);
return ItemResult::Fatal(CityJsonStacError::IoError(
e,
));
}
}
let item_href = format!("./items/{item_filename}");
let title = file_path
.file_name()
.and_then(|n| n.to_str())
.map(String::from);
pb.inc(1);
ItemResult::Success {
metadata,
item_href,
title,
}
}
Err(e) => {
if skip_errors {
pb.println(format!(
" {} Skipping {short_desc}: {e}",
console::style("⚠").yellow()
));
pb.inc(1);
ItemResult::Error {
source: source_desc,
error: e.to_string(),
}
} else {
pb.inc(1);
ItemResult::Fatal(CityJsonStacError::JsonError(e))
}
}
}
}
Err(e) => {
if skip_errors {
pb.println(format!(
" {} Skipping {short_desc}: {e}",
console::style("⚠").yellow()
));
pb.inc(1);
ItemResult::Error {
source: source_desc,
error: e.to_string(),
}
} else {
pb.inc(1);
ItemResult::Fatal(e)
}
}
},
Err(e) => {
if skip_errors {
pb.println(format!(
" {} Skipping {short_desc}: {e}",
console::style("⚠").yellow()
));
pb.inc(1);
ItemResult::Error {
source: source_desc,
error: e.to_string(),
}
} else {
pb.inc(1);
ItemResult::Fatal(e)
}
}
}
}
})
.buffer_unordered(concurrency_limit);
while let Some(result) = result_stream.next().await {
match result {
ItemResult::Success {
metadata,
item_href,
title,
} => {
accumulator.add_item(metadata, item_href, title);
if memory_logging_enabled()
&& accumulator.successful_count() % memory_log_every == 0
{
log_memory(format!(
"collection-progress processed={} errors={}",
accumulator.successful_count(),
accumulator.error_count()
));
}
}
ItemResult::Error { source, error } => {
accumulator.add_error(source, error);
if memory_logging_enabled() && accumulator.error_count() % memory_log_every == 0 {
log_memory(format!(
"collection-errors processed={} errors={}",
accumulator.successful_count(),
accumulator.error_count()
));
}
}
ItemResult::Fatal(e) => {
return Err(e);
}
}
}
pb_arc.finish_and_clear();
log_memory(format!(
"collection-items-finished processed={} errors={}",
accumulator.successful_count(),
accumulator.error_count()
));
let collection_path = config.output.join("collection.json");
if collection_path.exists() && !config.overwrite_collection {
print_warning(
"Collection file already exists, skipping (use --overwrite-collection to regenerate)",
);
if config.geoparquet {
let mut items_for_parquet: Vec<crate::stac::StacItem> = Vec::new();
let spinner = create_spinner("Reading existing items for GeoParquet…");
for entry in std::fs::read_dir(&items_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) == Some("json") {
if let Ok(content) = std::fs::read_to_string(&path) {
if let Ok(item) = serde_json::from_str::<crate::stac::StacItem>(&content) {
items_for_parquet.push(item);
}
}
}
}
finish_spinner_ok(
spinner,
format!("Read {} item(s) from disk", items_for_parquet.len()),
);
if !items_for_parquet.is_empty() {
let collection_content = std::fs::read_to_string(&collection_path)?;
let mut collection: crate::stac::StacCollection =
serde_json::from_str(&collection_content)?;
collection
.assets
.entry("items-geoparquet".to_string())
.or_insert_with(make_geoparquet_asset);
let updated_json = if config.pretty {
serde_json::to_string_pretty(&collection)?
} else {
serde_json::to_string(&collection)?
};
std::fs::write(&collection_path, &updated_json)?;
let parquet_path = config.output.join("items.parquet");
let spinner = create_spinner("Writing GeoParquet…");
crate::stac::geoparquet::write_geoparquet(
&items_for_parquet,
&collection,
&parquet_path,
)?;
finish_spinner_ok(
spinner,
format!(
"GeoParquet written: {} ({} items)",
parquet_path.display(),
items_for_parquet.len()
),
);
}
}
return Ok((collection_path, collection_id, merged_config.title));
}
if accumulator.has_errors() {
print_error(format!(
"Collection generation failed: {} item(s) had errors",
accumulator.error_count()
));
for (source, error) in &accumulator.errors {
eprintln!(" {} {}: {}", console::style("✗").red(), source, error);
}
return Err(CityJsonStacError::StacError(format!(
"{} item(s) failed to process",
accumulator.error_count()
)));
}
let license = merged_config
.license
.clone()
.unwrap_or_else(|| config.license.clone());
let mut collection_builder = StacCollectionBuilder::new(&collection_id).license(license);
if let Some(temporal) = merged_config
.extent
.as_ref()
.and_then(|e| e.temporal.as_ref())
{
let start = temporal
.start
.as_ref()
.and_then(|s| s.parse::<chrono::DateTime<chrono::Utc>>().ok());
let end = temporal
.end
.as_ref()
.and_then(|s| s.parse::<chrono::DateTime<chrono::Utc>>().ok());
collection_builder = collection_builder.temporal_extent(start, end);
} else {
collection_builder = collection_builder.temporal_extent(Some(chrono::Utc::now()), None);
}
if !config_only {
collection_builder = collection_builder.aggregate_from_summaries(&accumulator.summaries)?;
} else {
if let Some(bbox) = merged_config
.extent
.as_ref()
.and_then(|e| e.spatial.as_ref())
.and_then(|s| s.bbox.as_ref())
{
let bbox3d = if bbox.len() == 6 {
crate::metadata::BBox3D::new(bbox[0], bbox[1], bbox[2], bbox[3], bbox[4], bbox[5])
} else if bbox.len() >= 4 {
crate::metadata::BBox3D::new(bbox[0], bbox[1], 0.0, bbox[2], bbox[3], 0.0)
} else {
return Err(CityJsonStacError::StacError(
"Config bbox must have 4 or 6 elements".to_string(),
));
};
let crs = crs_override.clone().unwrap_or_default();
let wgs84_bbox = bbox3d.to_wgs84(&crs)?;
collection_builder = collection_builder.spatial_extent(wgs84_bbox);
}
}
if let Some(t) = &merged_config.title {
collection_builder = collection_builder.title(t.clone());
}
if let Some(d) = &merged_config.description {
collection_builder = collection_builder.description(d.clone());
}
if let Some(keywords) = &merged_config.keywords {
collection_builder = collection_builder.keywords(keywords.clone());
}
if let Some(providers) = &merged_config.providers {
for provider in providers {
collection_builder = collection_builder.provider(provider.clone().into());
}
}
if let Some(summaries) = &merged_config.summaries {
for (key, value) in summaries {
collection_builder = collection_builder.summary(key.clone(), value.clone());
}
}
if let Some(links) = &merged_config.links {
for link_cfg in links {
let mut link = stac::Link::new(&link_cfg.href, &link_cfg.rel);
link.r#type = link_cfg.link_type.clone();
link.title = link_cfg.title.clone();
collection_builder = collection_builder.link(link);
}
}
if let Some(assets) = &merged_config.assets {
for (key, asset_cfg) in assets {
let mut asset = stac::Asset::new(&asset_cfg.href);
asset.r#type = asset_cfg.media_type.clone();
asset.title = asset_cfg.title.clone();
asset.description = asset_cfg.description.clone();
if let Some(roles) = &asset_cfg.roles {
asset.roles = roles.clone();
}
collection_builder = collection_builder.asset(key.clone(), asset);
}
}
for (href, title) in &accumulator.item_links {
collection_builder = collection_builder.item_link(href.clone(), title.clone());
}
if accumulator.omitted_item_links() > 0 {
print_warning(format!(
"Omitted {} item link(s) from collection.json due to --max-item-links limit",
accumulator.omitted_item_links()
));
}
collection_builder = collection_builder.self_link("./collection.json");
if let Some(parent_href) = &config.parent_href {
collection_builder = collection_builder.parent_link(parent_href);
}
if let Some(root_href) = &config.root_href {
collection_builder = collection_builder.root_link(root_href);
}
if config.geoparquet {
collection_builder = collection_builder.asset("items-geoparquet", make_geoparquet_asset());
}
let collection = collection_builder.build()?;
let collection_json = if config.pretty {
serde_json::to_string_pretty(&collection)?
} else {
serde_json::to_string(&collection)?
};
log_memory("collection-before-write-json");
std::fs::write(&collection_path, &collection_json)?;
log_memory("collection-after-write-json");
let mut geoparquet_item_count = 0;
if config.geoparquet {
log_memory("geoparquet-read-start");
let spinner = create_spinner("Reading items from disk for GeoParquet…");
let mut geoparquet_items: Vec<crate::stac::StacItem> = Vec::new();
for entry in std::fs::read_dir(&items_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) == Some("json") {
if let Ok(content) = std::fs::read_to_string(&path) {
if let Ok(item) = serde_json::from_str::<crate::stac::StacItem>(&content) {
geoparquet_items.push(item);
}
}
}
}
finish_spinner_ok(
spinner,
format!("Read {} item(s) from disk", geoparquet_items.len()),
);
if !geoparquet_items.is_empty() {
geoparquet_item_count = geoparquet_items.len();
let parquet_path = config.output.join("items.parquet");
let spinner = create_spinner("Writing GeoParquet…");
log_memory(format!(
"geoparquet-write-start items={}",
geoparquet_items.len()
));
crate::stac::geoparquet::write_geoparquet(
&geoparquet_items,
&collection,
&parquet_path,
)?;
log_memory("geoparquet-write-finished");
finish_spinner_ok(
spinner,
format!(
"GeoParquet written: {} ({} items)",
parquet_path.display(),
geoparquet_items.len()
),
);
}
}
let mut summary = Summary::new()
.add("Collection", collection_path.display().to_string())
.add("Items dir", items_dir.display().to_string())
.add(
"Items generated",
format!("{}", accumulator.successful_count()),
);
if accumulator.omitted_item_links() > 0 {
summary = summary.add(
"Item links omitted",
format!("{}", accumulator.omitted_item_links()),
);
}
if config.geoparquet && geoparquet_item_count > 0 {
summary = summary.add(
"GeoParquet",
config.output.join("items.parquet").display().to_string(),
);
}
summary.print();
print_success("Collection generated successfully");
Ok((collection_path, collection_id, merged_config.title))
}
/// Options for `handle_update_collection_command`, which aggregates existing
/// STAC item files into a single collection document.
struct UpdateCollectionConfig {
/// Paths of the STAC item JSON files to read and aggregate.
items: Vec<PathBuf>,
/// Path the collection JSON is written to. Its file stem is the fallback
/// collection id, and `items.parquet` is written into its parent directory.
output: PathBuf,
/// Optional collection config file, loaded with `CollectionConfigFile::from_file`.
config: Option<PathBuf>,
/// Collection id passed to `merge_with_cli` together with the config file values.
id: Option<String>,
/// Collection title passed to `merge_with_cli`.
title: Option<String>,
/// Collection description passed to `merge_with_cli`.
description: Option<String>,
/// License identifier. The value "proprietary" is treated as "not given" when
/// merging with the config file, and this field is the fallback when the
/// merged config carries no license.
license: String,
/// Base URL prepended to item file names when building item links; without
/// it, links take the form `./<file name>`.
items_base_url: Option<String>,
/// When true, items that cannot be read or parsed are recorded and skipped
/// instead of aborting the command.
skip_errors: bool,
/// When true, the collection JSON is pretty-printed.
pretty: bool,
/// When true, the item files are only validated and the process then exits.
dry_run: bool,
/// When true, an `items.parquet` file is written and an `items-geoparquet`
/// asset is added to the collection.
geoparquet: bool,
/// Maximum number of item links added to the collection; `None` means no limit.
max_item_links: Option<usize>,
}
/// Validates the given STAC item files for a dry run and terminates the process.
///
/// Prints one line per path (valid, invalid JSON, unreadable, or missing),
/// followed by a `found/total` tally. Exits with status 0 when every file is a
/// readable, parsable STAC item and with status 1 otherwise; it never returns.
fn update_collection_dry_run(items: &[PathBuf]) -> ! {
    use progress::{print_banner, print_error, print_success};
    print_banner();
    println!("\nRunning in dry-run mode...\n");
    let mut all_valid = true;
    let mut found = 0;
    for item_path in items {
        let fname = item_path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("unknown");
        if item_path.exists() {
            match std::fs::read_to_string(item_path) {
                Ok(content) => match serde_json::from_str::<crate::stac::StacItem>(&content) {
                    Ok(_) => {
                        println!(" ✓ {}", fname);
                        found += 1;
                    }
                    Err(e) => {
                        println!(" ✗ {}: Invalid STAC item - {}", fname, e);
                        all_valid = false;
                    }
                },
                Err(e) => {
                    println!(" ✗ {}: Cannot read - {}", fname, e);
                    all_valid = false;
                }
            }
        } else {
            println!(" ✗ {}: File not found", fname);
            all_valid = false;
        }
    }
    println!("\n STAC items: {}/{} valid", found, items.len());
    println!();
    let exit_code = if all_valid {
        print_success("Dry run complete: All validations passed");
        0
    } else {
        print_error("Dry run failed: Errors found");
        1
    };
    std::process::exit(exit_code)
}

/// Builds the href of an item link from an optional base URL and a file name.
///
/// With a base URL the result is `<base>/<file name>` (exactly one `/` is
/// inserted when the base does not already end with one); without it the
/// result is the relative path `./<file name>`.
fn update_collection_item_href(items_base_url: Option<&str>, item_filename: &str) -> String {
    match items_base_url {
        Some(base) if base.ends_with('/') => format!("{base}{item_filename}"),
        Some(base) => format!("{base}/{item_filename}"),
        None => format!("./{item_filename}"),
    }
}

/// Aggregates existing STAC item files into a collection document.
///
/// Steps, in order:
/// 1. In dry-run mode, only validate the item files and exit the process.
/// 2. Load the optional config file and merge it with the CLI values.
/// 3. Read and parse every item file, skipping failures when `skip_errors` is set.
/// 4. Build the collection (metadata, item links, self link, optional
///    GeoParquet asset) and write it to `config.output`.
/// 5. Optionally write `items.parquet` next to the collection and print a summary.
///
/// # Errors
///
/// Returns an error when no item paths are given, when no item could be parsed,
/// when an item fails to read/parse and `skip_errors` is false, or when loading
/// the config, building, serializing, or writing the outputs fails.
fn handle_update_collection_command(config: UpdateCollectionConfig) -> Result<()> {
    if config.dry_run {
        update_collection_dry_run(&config.items);
    }

    let base_config = if let Some(config_path) = &config.config {
        CollectionConfigFile::from_file(config_path)?
    } else {
        CollectionConfigFile::default()
    };
    let merged_config = base_config.merge_with_cli(&CollectionCliArgs {
        id: config.id.clone(),
        title: config.title.clone(),
        description: config.description.clone(),
        // "proprietary" is the CLI default, so it is treated as "not given" and
        // must not override a license coming from the config file.
        license: if config.license != "proprietary" {
            Some(config.license.clone())
        } else {
            None
        },
        base_url: None,
    });

    log::info!(
        "Aggregating {} STAC items into collection",
        config.items.len()
    );
    if config.items.is_empty() {
        return Err(crate::error::CityJsonStacError::StacError(
            "No STAC item files provided".to_string(),
        ));
    }

    let mut parsed_items: Vec<crate::stac::StacItem> = Vec::with_capacity(config.items.len());
    // Source path of every entry in `parsed_items`, kept index-aligned with it.
    // Skipped files leave gaps relative to `config.items`, so zipping against
    // the raw input list would pair items with the wrong file names.
    let mut parsed_paths: Vec<&Path> = Vec::with_capacity(config.items.len());
    let mut errors: Vec<(PathBuf, String)> = Vec::new();
    let pb = create_progress_bar(config.items.len() as u64, "Parsing STAC items…");
    for item_path in &config.items {
        let fname = item_path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("unknown");
        pb.set_message(format!("Parsing: {fname}"));
        match std::fs::read_to_string(item_path) {
            Ok(content) => match serde_json::from_str::<crate::stac::StacItem>(&content) {
                Ok(item) => {
                    parsed_items.push(item);
                    parsed_paths.push(item_path.as_path());
                }
                Err(e) => {
                    if config.skip_errors {
                        errors.push((item_path.clone(), e.to_string()));
                        pb.println(format!(
                            " {} Skipping {fname}: {e}",
                            console::style("⚠").yellow()
                        ));
                    } else {
                        pb.finish_and_clear();
                        return Err(crate::error::CityJsonStacError::JsonError(e));
                    }
                }
            },
            Err(e) => {
                if config.skip_errors {
                    errors.push((item_path.clone(), e.to_string()));
                    pb.println(format!(
                        " {} Skipping {fname}: {e}",
                        console::style("⚠").yellow()
                    ));
                } else {
                    pb.finish_and_clear();
                    return Err(crate::error::CityJsonStacError::IoError(e));
                }
            }
        }
        pb.inc(1);
    }
    pb.finish_and_clear();
    if parsed_items.is_empty() {
        return Err(crate::error::CityJsonStacError::StacError(
            "No valid STAC items could be parsed".to_string(),
        ));
    }

    // Fall back to the output file stem when neither config nor CLI set an id.
    let collection_id = merged_config.id.unwrap_or_else(|| {
        config
            .output
            .file_stem()
            .and_then(|n| n.to_str())
            .unwrap_or("collection")
            .to_string()
    });
    let license = merged_config
        .license
        .unwrap_or_else(|| config.license.clone());
    let mut collection_builder = StacCollectionBuilder::new(&collection_id)
        .license(license)
        .temporal_extent(Some(chrono::Utc::now()), None)
        .aggregate_from_items(&parsed_items)?;
    if let Some(t) = merged_config.title {
        collection_builder = collection_builder.title(t);
    }
    if let Some(d) = merged_config.description {
        collection_builder = collection_builder.description(d);
    }
    if let Some(keywords) = merged_config.keywords {
        collection_builder = collection_builder.keywords(keywords);
    }
    if let Some(providers) = merged_config.providers {
        for provider in providers {
            collection_builder = collection_builder.provider(provider.into());
        }
    }

    // Only the first `max_item_links` parsed items get a link; the rest are
    // counted as omitted.
    let max_item_links = config.max_item_links.unwrap_or(usize::MAX);
    let omitted_item_links = parsed_items.len().saturating_sub(max_item_links);
    for (item_path, item) in parsed_paths
        .iter()
        .zip(&parsed_items)
        .take(max_item_links)
    {
        // Used only when the path has no UTF-8 file name component.
        let fallback_filename = format!("{}.json", item.id);
        let item_filename = item_path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or(&fallback_filename);
        let href = update_collection_item_href(config.items_base_url.as_deref(), item_filename);
        collection_builder = collection_builder.item_link(href, Some(item.id.clone()));
    }
    if omitted_item_links > 0 {
        print_warning(format!(
            "Omitted {} item link(s) from collection.json due to --max-item-links limit",
            omitted_item_links
        ));
    }
    collection_builder = collection_builder.self_link("./collection.json");

    // `parsed_items` is known to be non-empty here, so the flag alone decides
    // whether GeoParquet output is produced. The file lives next to the
    // collection document.
    let parquet_path: Option<PathBuf> = config.geoparquet.then(|| {
        config
            .output
            .parent()
            .unwrap_or_else(|| Path::new("."))
            .join("items.parquet")
    });
    if parquet_path.is_some() {
        collection_builder = collection_builder.asset("items-geoparquet", make_geoparquet_asset());
    }

    let collection = collection_builder.build()?;
    let collection_json = if config.pretty {
        serde_json::to_string_pretty(&collection)?
    } else {
        serde_json::to_string(&collection)?
    };
    if let Some(parent) = config.output.parent() {
        // A bare file name has an empty parent; there is nothing to create then.
        if !parent.as_os_str().is_empty() {
            std::fs::create_dir_all(parent)?;
        }
    }
    std::fs::write(&config.output, &collection_json)?;

    if let Some(parquet_path) = &parquet_path {
        let spinner = create_spinner("Writing GeoParquet…");
        crate::stac::geoparquet::write_geoparquet(&parsed_items, &collection, parquet_path)?;
        finish_spinner_ok(
            spinner,
            format!(
                "GeoParquet written: {} ({} items)",
                parquet_path.display(),
                parsed_items.len()
            ),
        );
    }

    let mut summary = Summary::new()
        .add("Collection", config.output.display().to_string())
        .add("Items aggregated", parsed_items.len().to_string());
    if omitted_item_links > 0 {
        summary = summary.add("Item links omitted", omitted_item_links.to_string());
    }
    if !errors.is_empty() {
        summary = summary.add("Skipped", format!("{} item(s)", errors.len()));
    }
    if let Some(parquet_path) = &parquet_path {
        summary = summary.add("GeoParquet", parquet_path.display().to_string());
    }
    summary.print();

    if errors.is_empty() {
        print_success("Collection updated successfully");
    } else {
        print_warning(format!(
            "Collection updated with {} skipped item(s)",
            errors.len()
        ));
    }
    Ok(())
}
/// Unit tests for the folder-name helpers used when deriving output directories.
#[cfg(test)]
mod tests {
    use super::*;

    /// Names made only of letters, digits, `-`, `_` and `.` pass through unchanged.
    #[test]
    fn test_sanitize_folder_name_basic() {
        assert_eq!(sanitize_folder_name("my-collection"), "my-collection");
        assert_eq!(sanitize_folder_name("my_collection"), "my_collection");
        assert_eq!(sanitize_folder_name("my.collection"), "my.collection");
        assert_eq!(sanitize_folder_name("collection123"), "collection123");
    }

    /// Every space is replaced individually; runs of spaces are not collapsed.
    #[test]
    fn test_sanitize_folder_name_spaces() {
        assert_eq!(sanitize_folder_name("my collection"), "my_collection");
        // Two consecutive spaces must yield two underscores.
        assert_eq!(sanitize_folder_name("my  collection"), "my__collection");
    }

    /// Characters that are unsafe in folder names become underscores.
    #[test]
    fn test_sanitize_folder_name_special_chars() {
        assert_eq!(sanitize_folder_name("my@collection"), "my_collection");
        assert_eq!(sanitize_folder_name("my/collection"), "my_collection");
        assert_eq!(sanitize_folder_name("my\\collection"), "my_collection");
        assert_eq!(sanitize_folder_name("my:collection"), "my_collection");
        assert_eq!(sanitize_folder_name("my*collection"), "my_collection");
        assert_eq!(sanitize_folder_name("my?collection"), "my_collection");
        assert_eq!(sanitize_folder_name("my<collection"), "my_collection");
        assert_eq!(sanitize_folder_name("my>collection"), "my_collection");
        assert_eq!(sanitize_folder_name("my|collection"), "my_collection");
    }

    /// Non-ASCII letters are kept, while non-alphanumeric symbols are replaced.
    #[test]
    fn test_sanitize_folder_name_unicode() {
        assert_eq!(sanitize_folder_name("münchen"), "münchen");
        assert_eq!(sanitize_folder_name("東京"), "東京");
        assert_eq!(sanitize_folder_name("hello★world"), "hello_world");
    }

    /// Mixed input: each offending character maps to exactly one underscore.
    #[test]
    fn test_sanitize_folder_name_mixed() {
        assert_eq!(
            sanitize_folder_name("my awesome collection!"),
            "my_awesome_collection_"
        );
        assert_eq!(
            sanitize_folder_name("collection (v1.0)"),
            "collection__v1.0_"
        );
    }

    /// The fallback name is the final path component, extension included.
    #[test]
    fn test_fallback_folder_name() {
        assert_eq!(fallback_folder_name("path/to/config.yaml"), "config.yaml");
        assert_eq!(
            fallback_folder_name("./opendata/vienna-config.yaml"),
            "vienna-config.yaml"
        );
        assert_eq!(fallback_folder_name("config"), "config");
    }
}