1#![deny(unused_crate_dependencies)]
4
5use anyhow::{Error, Result, anyhow};
6use async_stream::try_stream;
7use clap::{CommandFactory, Parser, Subcommand};
8use futures_core::TryStream;
9use futures_util::{TryStreamExt, pin_mut};
10use stac::api::{GetItems, GetSearch, Search};
11use stac::{
12 Assets, Collection, Item, Links, Migrate, SelfHref,
13 geoparquet::{Compression, default_compression},
14};
15use stac_io::{Format, StacStore};
16use stac_server::Backend;
17use stac_validate::Validate;
18use std::{
19 collections::{HashMap, VecDeque},
20 io::Write,
21 str::FromStr,
22};
23use tokio::{io::AsyncReadExt, net::TcpListener, task::JoinSet};
24use tracing::metadata::Level;
25use tracing_indicatif::IndicatifLayer;
26use tracing_subscriber::{
27 fmt::writer::MakeWriterExt, layer::SubscriberExt, util::SubscriberInitExt,
28};
29use url::Url;
30
/// Collection id used for items that do not name a collection.
///
/// `crawl` uses it as the output file stem for collection-less items, and
/// `serve` (with `create_collections`) auto-creates a collection with this id
/// to hold them.
const DEFAULT_COLLECTION_ID: &str = "default-collection-id";
32
// Top-level command line arguments for `rustac`.
//
// Plain `//` comments are used in this struct on purpose: clap's derive turns
// `///` doc comments into `--help` text, so adding them here would change the
// CLI's output.
#[derive(Debug, Parser)]
pub struct Rustac {
    // The subcommand to run (see `Command`).
    #[command(subcommand)]
    command: Command,

    // Explicit input format. When unset, `Rustac::input_format` infers it from
    // the href, and reading from stdin defaults to JSON.
    #[arg(
        short = 'i',
        long = "input-format",
        global = true,
        verbatim_doc_comment
    )]
    input_format: Option<Format>,

    // Repeatable `--opt key=value` pairs, handed to
    // `stac_io::parse_href_opts` when opening a store for an href.
    #[arg(long = "opt", global = true, verbatim_doc_comment)]
    options: Vec<KeyValue>,

    // Explicit output format. When unset, `Rustac::output_format` infers it
    // from the href, and writing to stdout defaults to pretty JSON.
    #[arg(
        short = 'o',
        long = "output-format",
        global = true,
        verbatim_doc_comment
    )]
    output_format: Option<Format>,

    // When set, overrides whether JSON output is pretty-printed
    // (`true` means compact).
    #[arg(short = 'c', long = "compact-json", global = true)]
    compact_json: Option<bool>,

    // Compression for stac-geoparquet output; `default_compression()` is used
    // when unset.
    #[arg(long = "parquet-compression", global = true, verbatim_doc_comment)]
    parquet_compression: Option<Compression>,

    // Optional maximum row group size for stac-geoparquet output.
    #[arg(
        long = "parquet-max-row-group-size",
        global = true,
        verbatim_doc_comment
    )]
    parquet_max_row_group_size: Option<usize>,

    // Number of `-v` flags; each one raises the log level by one step.
    #[arg(
        long,
        short = 'v',
        action = clap::ArgAction::Count,
        global = true,
        help = ErrorLevel::verbose_help(),
        long_help = ErrorLevel::verbose_long_help(),
    )]
    verbose: u8,

    // Number of `-q` flags; each one lowers the log level by one step.
    // Cannot be combined with `-v`.
    #[arg(
        long,
        short = 'q',
        action = clap::ArgAction::Count,
        global = true,
        help = ErrorLevel::quiet_help(),
        long_help = ErrorLevel::quiet_long_help(),
        conflicts_with = "verbose",
    )]
    quiet: u8,
}
134
// The `rustac` subcommands.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into `--help` text, so adding them here would change the CLI's
// output.
#[derive(Debug, Subcommand)]
#[allow(clippy::large_enum_variant)]
pub enum Command {
    // Reads a value, optionally migrates it, and writes it back out,
    // converting between formats along the way.
    Translate {
        // Input href; stdin when omitted or `-`.
        infile: Option<String>,

        // Output href; stdout when omitted or `-`.
        outfile: Option<String>,

        // Migrate the value to the `--to` version (or the version type's
        // default when `--to` is omitted).
        #[arg(long = "migrate", default_value_t = false)]
        migrate: bool,

        // Target version for `--migrate`. Without `--migrate`, passing this
        // only prints a warning.
        #[arg(long = "to")]
        to: Option<String>,
    },

    // Searches `href` and writes the resulting item collection.
    Search {
        // An API endpoint, or a file searched via duckdb.
        href: String,

        // Output href; stdout when omitted or `-`.
        outfile: Option<String>,

        // Force (or forbid) duckdb. When unset, duckdb is used if the href's
        // inferred format is geoparquet.
        #[arg(long = "use-duckdb")]
        use_duckdb: Option<bool>,

        // Upper bound on the number of items returned.
        #[arg(short = 'n', long = "max-items")]
        max_items: Option<usize>,

        // The query parameters below are passed as strings into a `GetSearch`
        // and converted to a `Search`; conversion errors are reported then.
        #[arg(long = "intersects")]
        intersects: Option<String>,

        #[arg(long = "ids")]
        ids: Option<String>,

        #[arg(long = "collections")]
        collections: Option<String>,

        #[arg(long = "bbox")]
        bbox: Option<String>,

        #[arg(long = "datetime")]
        datetime: Option<String>,

        #[arg(long = "fields")]
        fields: Option<String>,

        #[arg(long = "sortby")]
        sortby: Option<String>,

        #[arg(long = "filter")]
        filter: Option<String>,

        #[arg(long = "limit")]
        limit: Option<String>,
    },

    // Loads the given hrefs into a backend (memory, pgstac, or duckdb) and
    // serves them as a STAC API.
    Serve {
        // Collections, items, or item collections to load.
        hrefs: Vec<String>,

        // Address used to build the API root URL. Also the listen address
        // unless `--bind` is given.
        #[arg(short = 'a', long = "addr", default_value = "127.0.0.1:7822")]
        addr: String,

        // Listen address; defaults to `--addr`.
        #[arg(short = 'b', long = "bind")]
        bind: Option<String>,

        // pgstac connection string; requires the `pgstac` feature.
        #[arg(long = "pgstac")]
        pgstac: Option<String>,

        // Force (or forbid) the duckdb backend. When unset, duckdb is used
        // when exactly one href is given and it ends with "parquet".
        #[arg(long = "use-duckdb")]
        use_duckdb: Option<bool>,

        // Follow the item links of loaded collections and load those items too.
        // NOTE(review): with clap's default `SetTrue` action for `bool`, a
        // default of `true` may make this flag impossible to turn off — confirm.
        #[arg(long = "load-collection-items", default_value_t = true)]
        load_collection_items: bool,

        // Create collections for items whose collection was not loaded.
        // NOTE(review): same `default_value_t = true` concern as above.
        #[arg(long, default_value_t = true)]
        create_collections: bool,
    },

    // Crawls `href` and writes one item file per collection id into
    // `directory`.
    Crawl {
        // Root catalog, collection, item, or item collection.
        href: String,

        // Output directory (any href `stac_io` can write to).
        directory: String,
    },

    // Validates a value; exits with an error if it is invalid.
    Validate {
        // Input href; stdin when omitted or `-`.
        infile: Option<String>,
    },

    // Writes a shell completion script to stdout.
    GenerateCompletions {
        shell: clap_complete::Shell,
    },
}
297
/// A value handed to `Rustac::put` for writing: either a typed STAC value or
/// arbitrary JSON (e.g. a search result serialized with `serde_json`).
#[derive(Debug)]
// NOTE(review): the reason for `dead_code` here isn't visible in this file —
// confirm it is still needed.
#[allow(dead_code, clippy::large_enum_variant)]
enum Value {
    /// A STAC catalog, collection, item, or item collection.
    Stac(stac::Value),
    /// Any other JSON value.
    Json(serde_json::Value),
}
304
/// A `key=value` pair from a `--opt` argument, stored as (key, value).
///
/// Parsed by its `FromStr` impl, which splits on the first `=`.
#[derive(Debug, Clone)]
struct KeyValue(String, String);
307
/// Marker type providing the default log level and the help strings for the
/// `-v`/`-q` flags.
///
/// Note: its inherent `ErrorLevel::default()` (returning `Option<Level>`)
/// takes precedence over the derived `Default::default` when called by path.
#[derive(Copy, Clone, Debug, Default)]
struct ErrorLevel;
310
311impl Rustac {
312 pub async fn run(self, init_tracing_subscriber: bool) -> Result<()> {
317 if init_tracing_subscriber {
318 let indicatif_layer = IndicatifLayer::new();
319 tracing_subscriber::registry()
320 .with(
321 tracing_subscriber::fmt::layer().with_writer(
322 indicatif_layer
323 .get_stderr_writer()
324 .with_max_level(self.log_level().unwrap_or(Level::WARN)),
325 ),
326 )
327 .with(indicatif_layer)
328 .init();
329 }
330 match self.command {
331 Command::Translate {
332 ref infile,
333 ref outfile,
334 migrate,
335 ref to,
336 } => {
337 let mut value = self.get(infile.as_deref()).await?;
338 if migrate {
339 value = value.migrate(
340 &to.as_deref()
341 .map(|s| s.parse().unwrap())
342 .unwrap_or_default(),
343 )?;
344 } else if let Some(to) = to {
345 eprintln!(
346 "WARNING: --to was passed ({to}) without --migrate, value will not be migrated"
347 );
348 }
349 self.put(outfile.as_deref(), value.into()).await
350 }
351 Command::Search {
352 ref href,
353 ref outfile,
354 ref use_duckdb,
355 ref max_items,
356 ref intersects,
357 ref ids,
358 ref collections,
359 ref bbox,
360 ref datetime,
361 ref fields,
362 ref sortby,
363 ref filter,
364 ref limit,
365 } => {
366 let use_duckdb = use_duckdb.unwrap_or_else(|| {
367 matches!(Format::infer_from_href(href), Some(Format::Geoparquet(_)))
368 });
369 let get_items = GetItems {
370 bbox: bbox.clone(),
371 datetime: datetime.clone(),
372 fields: fields.clone(),
373 sortby: sortby.clone(),
374 filter: filter.clone(),
375 limit: limit.clone(),
376 ..Default::default()
377 };
378 let get_search = GetSearch {
379 intersects: intersects.clone(),
380 ids: ids.clone(),
381 collections: collections.clone(),
382 items: get_items,
383 };
384 let search: Search = get_search.try_into()?;
385 let item_collection = if use_duckdb {
386 stac_duckdb::search(href, search, *max_items)?
387 } else {
388 stac_io::api::search(href, search, *max_items).await?
389 };
390 self.put(
391 outfile.as_deref(),
392 serde_json::to_value(item_collection)?.into(),
393 )
394 .await
395 }
396 Command::Serve {
397 ref hrefs,
398 ref addr,
399 ref bind,
400 ref pgstac,
401 use_duckdb,
402 load_collection_items,
403 create_collections,
404 } => {
405 let bind = bind.as_deref().unwrap_or(&addr);
406 if matches!(use_duckdb, Some(true))
407 || (use_duckdb.is_none() && hrefs.len() == 1 && hrefs[0].ends_with("parquet"))
408 {
409 let backend = stac_server::DuckdbBackend::new(&hrefs[0]).await?;
410 eprintln!("Backend: duckdb");
411 return load_and_serve(
412 bind,
413 addr,
414 backend,
415 Vec::new(),
416 HashMap::new(),
417 create_collections,
418 )
419 .await;
420 }
421 let mut collections = Vec::new();
422 let mut items: HashMap<String, Vec<stac::Item>> = HashMap::new();
423 for href in hrefs {
424 let value = self.get(Some(href.as_str())).await?;
425 match value {
426 stac::Value::Collection(collection) => {
427 if load_collection_items {
428 for link in collection.iter_item_links() {
429 let value = self.get(Some(link.href.as_str())).await?;
430 if let stac::Value::Item(item) = value {
431 items.entry(collection.id.clone()).or_default().push(item);
432 } else {
433 return Err(anyhow!(
434 "item link was not an item: {value:?}"
435 ));
436 }
437 }
438 }
439 collections.push(collection);
440 }
441 stac::Value::ItemCollection(item_collection) => {
442 for item in item_collection.items {
443 if let Some(collection) = item.collection.clone() {
444 items.entry(collection).or_default().push(item);
445 } else {
446 items.entry(String::new()).or_default().push(item);
447 }
448 }
449 }
450 stac::Value::Item(item) => {
451 if let Some(collection) = item.collection.clone() {
452 items.entry(collection).or_default().push(item);
453 } else {
454 return Err(anyhow!("item without a collection: {item:?}"));
455 }
456 }
457 _ => return Err(anyhow!("don't know how to load value: {value:?}")),
458 }
459 }
460
461 #[allow(unused_variables)]
462 if let Some(pgstac) = pgstac {
463 #[cfg(feature = "pgstac")]
464 {
465 let backend =
466 stac_server::PgstacBackend::new_from_stringlike(pgstac).await?;
467 eprintln!("Backend: pgstac");
468 load_and_serve(bind, addr, backend, collections, items, create_collections)
469 .await
470 }
471 #[cfg(not(feature = "pgstac"))]
472 {
473 Err(anyhow!("rustac is not compiled with pgstac support"))
474 }
475 } else {
476 let backend = stac_server::MemoryBackend::new();
477 eprintln!("Backend: memory");
478 load_and_serve(bind, addr, backend, collections, items, create_collections)
479 .await
480 }
481 }
482 Command::Crawl {
483 ref href,
484 ref directory,
485 } => {
486 let opts = self.opts();
487 let (store, path) = stac_io::parse_href_opts(href.clone(), opts.clone())?;
488 let value: stac::Value = store.get(path).await.unwrap();
489 let mut items: HashMap<Option<String>, Vec<Item>> = HashMap::new();
490 let crawl = crawl(value, store).await;
491 pin_mut!(crawl);
492 let mut warned = false;
493 while let Some(item) = crawl.try_next().await? {
494 let collection = item.collection.clone();
495 if collection.as_deref() == Some(DEFAULT_COLLECTION_ID) && !warned {
496 warned = true;
497 tracing::warn!(
498 "collection id matches the default collection id, so any collection-less items will be grouped into this collection: {DEFAULT_COLLECTION_ID}"
499 )
500 }
501 items.entry(collection).or_default().push(item);
502 }
503 let (store, path) = stac_io::parse_href_opts(directory.clone(), opts)?;
504 let format = self.output_format(None);
505 for (collection, items) in items {
506 let file_name = format!(
507 "{}.{}",
508 collection.as_deref().unwrap_or(DEFAULT_COLLECTION_ID),
509 format.extension()
510 );
511 store
512 .put_format(
513 path.child(file_name),
514 stac::ItemCollection::from(items),
515 format,
516 )
517 .await?;
518 }
519 Ok(())
520 }
521 Command::Validate { ref infile } => {
522 let value = self.get(infile.as_deref()).await?;
523 let result = value.validate().await;
524 if let Err(error) = result {
525 if let stac_validate::Error::Validation(errors) = error {
526 if let Some(format) = self.output_format {
527 if let Format::Json(_) = format {
528 let value = errors
529 .into_iter()
530 .map(|error| error.into_json())
531 .collect::<Vec<_>>();
532 if self.compact_json.unwrap_or_default() {
533 serde_json::to_writer(std::io::stdout(), &value)?;
534 } else {
535 serde_json::to_writer_pretty(std::io::stdout(), &value)?;
536 }
537 println!();
538 } else {
539 return Err(anyhow!("invalid output format: {}", format));
540 }
541 } else {
542 for error in errors {
543 println!("{error}");
544 }
545 }
546 }
547 std::io::stdout().flush()?;
548 Err(anyhow!("one or more validation errors"))
549 } else {
550 Ok(())
551 }
552 }
553 Command::GenerateCompletions { shell } => {
554 let mut command = Rustac::command();
555 clap_complete::generate(shell, &mut command, "rustac", &mut std::io::stdout());
556 Ok(())
557 }
558 }
559 }
560
561 async fn get(&self, href: Option<&str>) -> Result<stac::Value> {
562 let href = href.and_then(|s| if s == "-" { None } else { Some(s) });
563 let format = self.input_format(href);
564 if let Some(href) = href {
565 let (store, path) = stac_io::parse_href_opts(href, self.opts())?;
566 let value: stac::Value = store.get_format(path, format).await?;
567 Ok(value)
568 } else {
569 let mut buf = Vec::new();
570 let _ = tokio::io::stdin().read_to_end(&mut buf).await?;
571 let value: stac::Value = format.from_bytes(buf)?;
572 Ok(value)
573 }
574 }
575
576 async fn put(&self, href: Option<&str>, value: Value) -> Result<()> {
577 let href = href.and_then(|s| if s == "-" { None } else { Some(s) });
578 let format = self.output_format(href);
579 if let Some(href) = href {
580 let (store, path) = stac_io::parse_href_opts(href, self.opts())?;
581 let _ = match value {
582 Value::Json(json) => store.put_format(path, json, format).await?,
583 Value::Stac(stac) => store.put_format(path, stac, format).await?,
584 };
585 Ok(())
586 } else {
587 let mut bytes = match value {
588 Value::Json(json) => format.into_vec(json)?,
589 Value::Stac(stac) => format.into_vec(stac)?,
590 };
591 if !matches!(format, Format::NdJson) {
593 bytes.push(b'\n');
594 }
595 std::io::stdout().write_all(&bytes)?;
596 Ok(())
597 }
598 }
599
600 pub fn log_level(&self) -> Option<Level> {
601 level_enum(self.verbosity())
602 }
603
604 fn verbosity(&self) -> i8 {
605 level_value(ErrorLevel::default()) - (self.quiet as i8) + (self.verbose as i8)
606 }
607
608 pub fn input_format(&self, href: Option<&str>) -> Format {
610 if let Some(input_format) = self.input_format {
611 input_format
612 } else if let Some(href) = href {
613 Format::infer_from_href(href).unwrap_or_default()
614 } else {
615 Format::json()
616 }
617 }
618
619 pub fn output_format(&self, href: Option<&str>) -> Format {
621 let format = if let Some(format) = self.output_format {
622 format
623 } else if let Some(href) = href {
624 Format::infer_from_href(href).unwrap_or_default()
625 } else {
626 Format::Json(true)
627 };
628 if matches!(format, Format::Geoparquet(_)) {
629 use stac::geoparquet::WriterOptions;
630
631 let mut writer_options = WriterOptions::new()
632 .with_compression(self.parquet_compression.or(Some(default_compression())));
633
634 if let Some(max_row_group_size) = self.parquet_max_row_group_size {
635 writer_options = writer_options.with_max_row_group_size(max_row_group_size);
636 }
637
638 Format::Geoparquet(writer_options)
639 } else if let Format::Json(pretty) = format {
640 Format::Json(self.compact_json.map(|c| !c).unwrap_or(pretty))
641 } else {
642 format
643 }
644 }
645
646 fn opts(&self) -> Vec<(String, String)> {
647 self.options
648 .iter()
649 .cloned()
650 .map(|kv| (kv.0, kv.1))
651 .collect()
652 }
653}
654
655impl ErrorLevel {
656 fn default() -> Option<Level> {
657 Some(Level::ERROR)
658 }
659
660 fn verbose_help() -> Option<&'static str> {
661 Some("Increase verbosity")
662 }
663
664 fn verbose_long_help() -> Option<&'static str> {
665 None
666 }
667
668 fn quiet_help() -> Option<&'static str> {
669 Some("Decrease verbosity")
670 }
671
672 fn quiet_long_help() -> Option<&'static str> {
673 None
674 }
675}
676
677impl From<stac::Value> for Value {
678 fn from(value: stac::Value) -> Self {
679 Value::Stac(value)
680 }
681}
682
683impl From<serde_json::Value> for Value {
684 fn from(value: serde_json::Value) -> Self {
685 Value::Json(value)
686 }
687}
688
689impl FromStr for KeyValue {
690 type Err = Error;
691
692 fn from_str(s: &str) -> Result<Self> {
693 if let Some((key, value)) = s.split_once('=') {
694 Ok(KeyValue(key.to_string(), value.to_string()))
695 } else {
696 Err(anyhow!("invalid key=value: {s}"))
697 }
698 }
699}
700
701async fn load_and_serve(
702 bind: &str,
703 addr: &str,
704 mut backend: impl Backend,
705 collections: Vec<Collection>,
706 mut items: HashMap<String, Vec<Item>>,
707 create_collections: bool,
708) -> Result<()> {
709 for collection in collections {
710 let items = items.remove(&collection.id);
711 backend.add_collection(collection).await?;
712 if let Some(items) = items {
713 backend.add_items(items).await?;
714 }
715 }
716 if create_collections {
717 for (mut collection_id, mut items) in items {
718 if collection_id.is_empty() {
719 if backend.collection(DEFAULT_COLLECTION_ID).await?.is_some() {
720 return Err(anyhow!(
721 "cannot auto-create collections, a collection already exists with id={DEFAULT_COLLECTION_ID}"
722 ));
723 } else {
724 collection_id = DEFAULT_COLLECTION_ID.to_string();
725 }
726 }
727 for item in &mut items {
728 item.collection = Some(collection_id.to_string());
729 }
730 let collection = Collection::from_id_and_items(collection_id, &items);
731 backend.add_collection(collection).await?;
732 backend.add_items(items).await?;
733 }
734 } else if !items.is_empty() {
735 return Err(anyhow!(
736 "items don't have a collection and `create_collections` is false"
737 ));
738 }
739
740 let root = Url::parse(addr)
741 .map(|url| url.to_string())
742 .unwrap_or(format!("http://{addr}"));
743 let api = stac_server::Api::new(backend, &root)?;
744 let router = stac_server::routes::from_api(api);
745 let listener = TcpListener::bind(&bind).await?;
746 eprintln!("Serving a STAC API at {root}");
747 axum::serve(listener, router).await.map_err(Error::from)
748}
749
750fn level_enum(verbosity: i8) -> Option<Level> {
751 match verbosity {
752 i8::MIN..=-1 => None,
753 0 => Some(Level::ERROR),
754 1 => Some(Level::WARN),
755 2 => Some(Level::INFO),
756 3 => Some(Level::DEBUG),
757 4..=i8::MAX => Some(Level::TRACE),
758 }
759}
760
761fn level_value(level: Option<Level>) -> i8 {
762 match level {
763 None => -1,
764 Some(Level::ERROR) => 0,
765 Some(Level::WARN) => 1,
766 Some(Level::INFO) => 2,
767 Some(Level::DEBUG) => 3,
768 Some(Level::TRACE) => 4,
769 }
770}
771
772async fn crawl(value: stac::Value, store: StacStore) -> impl TryStream<Item = Result<Item>> {
773 use stac::Value::*;
774
775 try_stream! {
776 let mut values = VecDeque::from([value]);
777 while let Some(mut value) = values.pop_front() {
778 value.make_links_absolute()?;
779 match value {
780 Catalog(_) | Collection(_) => {
781 if let Catalog(ref catalog) = value {
782 tracing::info!("got catalog={}", catalog.id);
783 }
784 if let Collection(ref collection) = value {
785 tracing::info!("got collection={}", collection.id);
786 }
787 let mut join_set: JoinSet<Result<stac::Value>> = JoinSet::new();
788 for link in value
789 .links()
790 .iter()
791 .filter(|link| link.is_child() || link.is_item())
792 .cloned()
793 {
794 let store = store.clone();
795 let url = Url::parse(&link.href)?;
796 join_set.spawn(async move {
797 let value: stac::Value = store.get(url.path()).await?;
798 Ok(value)
799 });
800 }
801 while let Some(result) = join_set.join_next().await {
802 let value = result??;
803 values.push_back(value);
804 }
805 }
806 Item(mut item) => {
807 if let Some(self_href) = item.self_href() {
808 let self_href= self_href.to_string();
809 item.make_assets_absolute(&self_href)?;
810 }
811 yield item;
812 }
813 ItemCollection(item_collection) => {
814 for mut item in item_collection.items {
815 if let Some(self_href) = item.self_href() {
816 let self_href = self_href.to_string();
817 item.make_assets_absolute(&self_href)?;
818 }
819 yield item;
820 }
821 }
822 }
823 }
824 }
825}
826
827#[cfg(test)]
828use {assert_cmd as _, rstest as _, tempfile as _};