rustac/
lib.rs

1// The verbosity stuff is cribbed from https://github.com/clap-rs/clap-verbosity-flag/blob/c621a6a8a7c0b6df8f1464a985a5d076b4915693/src/lib.rs and updated for tracing
2
3#![deny(unused_crate_dependencies)]
4
5use anyhow::{Error, Result, anyhow};
6use async_stream::try_stream;
7use clap::{CommandFactory, Parser, Subcommand};
8use futures_core::TryStream;
9use futures_util::{TryStreamExt, pin_mut};
10use stac::api::{GetItems, GetSearch, Search};
11use stac::{
12    Assets, Collection, Item, Links, Migrate, SelfHref,
13    geoparquet::{Compression, default_compression},
14};
15use stac_io::{Format, StacStore};
16use stac_server::Backend;
17use stac_validate::Validate;
18use std::{
19    collections::{HashMap, VecDeque},
20    io::Write,
21    str::FromStr,
22};
23use tokio::{io::AsyncReadExt, net::TcpListener, task::JoinSet};
24use tracing::metadata::Level;
25use tracing_indicatif::IndicatifLayer;
26use tracing_subscriber::{
27    fmt::writer::MakeWriterExt, layer::SubscriberExt, util::SubscriberInitExt,
28};
29use url::Url;
30
/// Collection id used as a stand-in for items that do not declare a collection
/// of their own (see the crawl output file naming and `load_and_serve`).
const DEFAULT_COLLECTION_ID: &str = "default-collection-id";
32
33/// rustac: A command-line interface for the SpatioTemporal Asset Catalog (STAC)
34#[derive(Debug, Parser)]
35pub struct Rustac {
36    #[command(subcommand)]
37    command: Command,
38
39    /// The input format.
40    ///
41    /// If not provided, the format will be inferred from the file extension.
42    /// Possible values (default: json):
43    ///
44    /// - json
45    /// - ndjson (newline-delimited json)
46    /// - parquet (stac-geoparquet)
47    #[arg(
48        short = 'i',
49        long = "input-format",
50        global = true,
51        verbatim_doc_comment
52    )]
53    input_format: Option<Format>,
54
55    /// Options for getting and putting files from object storage.
56    ///
57    /// Options should be provided in `key=value` pairs, e.g.: `rustac --opt aws_access_key_id=redacted --opt other_value=very_important`
58    #[arg(long = "opt", global = true, verbatim_doc_comment)]
59    options: Vec<KeyValue>,
60
61    /// The output format.
62    ///
63    /// If not provided, the format will be inferred from the file extension.
64    /// Possible values (default: json):
65    ///
66    /// - json
67    /// - ndjson (newline-delimited json)
68    /// - parquet (stac-geoparquet)
69    #[arg(
70        short = 'o',
71        long = "output-format",
72        global = true,
73        verbatim_doc_comment
74    )]
75    output_format: Option<Format>,
76
77    /// Whether to print compact JSON output.
78    ///
79    /// By default, JSON output will printed "compact" if it is being output to a file, and printed "pretty" if it is being output to standard output.
80    /// Use this argument to force one or the other.
81    #[arg(short = 'c', long = "compact-json", global = true)]
82    compact_json: Option<bool>,
83
84    /// The parquet compression to use when writing stac-geoparquet.
85    ///
86    /// Possible values (default: snappy):
87    ///
88    /// - uncompressed: No compression
89    /// - snappy:       Snappy compression (<https://en.wikipedia.org/wiki/Snappy_(compression)>)
90    /// - gzip(n):      Gzip compression (<https://www.ietf.org/rfc/rfc1952.txt>)
91    /// - lzo:          LZO compression (<https://en.wikipedia.org/wiki/Lempel%E2%80%93Ziv%E2%80%93Oberhumer>)
92    /// - brotli(n):    Brotli compression (<https://datatracker.ietf.org/doc/html/rfc7932>)
93    /// - lz4:          LZ4 compression (<https://lz4.org/>), [(deprecated)](https://issues.apache.org/jira/browse/PARQUET-2032)
94    /// - zstd(n):      ZSTD compression (<https://datatracker.ietf.org/doc/html/rfc8878>)
95    /// - lz4-raw:      LZ4 compression (<https://lz4.org/>)
96    ///
97    /// Some of the compression values have a level, specified as `(n)`. This level should be an integer.
98    #[arg(long = "parquet-compression", global = true, verbatim_doc_comment)]
99    parquet_compression: Option<Compression>,
100
101    /// Maximum number of rows per row group in parquet files.
102    ///
103    /// The default is 150,000 rows per group, which is the maximum recommended value for Geoparquet files.
104    /// When records are ordered spatially or temporally, lower values result in smaller row groups (better for selective queries),
105    /// while higher values result in larger row groups (better compression).
106    #[arg(
107        long = "parquet-max-row-group-size",
108        global = true,
109        verbatim_doc_comment
110    )]
111    parquet_max_row_group_size: Option<usize>,
112
113    #[arg(
114        long,
115        short = 'v',
116        action = clap::ArgAction::Count,
117        global = true,
118        help = ErrorLevel::verbose_help(),
119        long_help = ErrorLevel::verbose_long_help(),
120    )]
121    verbose: u8,
122
123    #[arg(
124        long,
125        short = 'q',
126        action = clap::ArgAction::Count,
127        global = true,
128        help = ErrorLevel::quiet_help(),
129        long_help = ErrorLevel::quiet_long_help(),
130        conflicts_with = "verbose",
131    )]
132    quiet: u8,
133}
134
135/// A rustac subcommand.
136#[derive(Debug, Subcommand)]
137#[allow(clippy::large_enum_variant)]
138pub enum Command {
139    /// Translates STAC from one format to another.
140    Translate {
141        /// The input file.
142        ///
143        /// To read from standard input, pass `-` or don't provide an argument at all.
144        infile: Option<String>,
145
146        /// The output file.
147        ///
148        /// To write to standard output, pass `-` or don't provide an argument at all.
149        outfile: Option<String>,
150
151        /// Migrate this STAC value to another version.
152        ///
153        /// By default, will migrate to the latest supported version. Use `--to`
154        /// to specify a different STAC version.
155        #[arg(long = "migrate", default_value_t = false)]
156        migrate: bool,
157
158        /// Migrate to this STAC version.
159        ///
160        /// If not provided, will migrate to the latest supported version. Will
161        /// only be used if `--migrate` is passed.
162        #[arg(long = "to")]
163        to: Option<String>,
164    },
165
166    /// Searches a STAC API or stac-geoparquet file.
167    Search {
168        /// The href of the STAC API or stac-geoparquet file to search.
169        href: String,
170
171        /// The output file.
172        ///
173        /// To write to standard output, pass `-` or don't provide an argument at all.
174        outfile: Option<String>,
175
176        /// Use DuckDB to query the href.
177        ///
178        /// By default, DuckDB will be used if the href ends in `parquet` or
179        /// `geoparquet`. Set this value to `true` to force DuckDB to be used,
180        /// or to `false` to disable this behavior.
181        #[arg(long = "use-duckdb")]
182        use_duckdb: Option<bool>,
183
184        /// The maximum number of items to return from the search.
185        #[arg(short = 'n', long = "max-items")]
186        max_items: Option<usize>,
187
188        /// Searches items by performing intersection between their geometry and provided GeoJSON geometry.
189        ///
190        /// All GeoJSON geometry types must be supported.
191        #[arg(long = "intersects")]
192        intersects: Option<String>,
193
194        /// Comma-delimited list of Item ids to return.
195        #[arg(long = "ids")]
196        ids: Option<String>,
197
198        /// Comma-delimited list of one or more Collection IDs that each matching Item must be in.
199        #[arg(long = "collections")]
200        collections: Option<String>,
201
202        /// Requested bounding box, as a comma-delimited string.
203        #[arg(long = "bbox")]
204        bbox: Option<String>,
205
206        /// Single date+time, or a range ('/' separator), formatted to [RFC 3339,
207        /// section 5.6](https://tools.ietf.org/html/rfc3339#section-5.6).
208        ///
209        /// Use double dots `..` for open date ranges.
210        #[arg(long = "datetime")]
211        datetime: Option<String>,
212
213        /// Include/exclude fields from item collections, as a comma-delimited string.
214        #[arg(long = "fields")]
215        fields: Option<String>,
216
217        /// Fields by which to sort results, as a comma-delimited string.
218        #[arg(long = "sortby")]
219        sortby: Option<String>,
220
221        /// CQL2 filter expression.
222        #[arg(long = "filter")]
223        filter: Option<String>,
224
225        /// The page size to be returned from the server.
226        #[arg(long = "limit")]
227        limit: Option<String>,
228    },
229
230    /// Serves a STAC API.
231    Serve {
232        /// The hrefs of collections, items, and item collections to load into the API on startup.
233        hrefs: Vec<String>,
234
235        /// The address of the server. Defaults to `127.0.0.1:7822`.
236        /// Either a URL `https://some-host.io/stac` or a local address like `127.0.0.1:7822`.
237        #[arg(short = 'a', long = "addr", default_value = "127.0.0.1:7822")]
238        addr: String,
239
240        /// The address to bind the server to, if different from `--addr`.
241        #[arg(short = 'b', long = "bind")]
242        bind: Option<String>,
243
244        /// The pgstac connection string, e.g. `postgresql://username:password@localhost:5432/postgis`
245        ///
246        /// If not provided an in-process memory backend will be used.
247        #[arg(long = "pgstac")]
248        pgstac: Option<String>,
249
250        /// Use DuckDB to serve items from a stac-geoparquet file.
251        ///
252        /// The server will automatically use DuckDB if the feature is enabled,
253        /// `use_duckdb` is `None`, and there is only one `href` that ends in
254        /// `parquet`.
255        #[arg(long = "use-duckdb")]
256        use_duckdb: Option<bool>,
257
258        /// After loading a collection, load all of its item links.
259        #[arg(long = "load-collection-items", default_value_t = true)]
260        load_collection_items: bool,
261
262        /// Create collections for any items that don't have one.
263        #[arg(long, default_value_t = true)]
264        create_collections: bool,
265    },
266
267    /// Crawls a STAC Catalog or Collection by following its links.
268    ///
269    /// Items are saved as item collections (in the output format) in the output directory.
270    Crawl {
271        /// The href of a STAC Catalog or Collection
272        href: String,
273
274        /// The output directory
275        ///
276        /// This doesn't have to be local, by the way.
277        directory: String,
278    },
279
280    /// Validates a STAC value.
281    ///
282    /// The default output format is plain text — use `--output-format=json` to
283    /// get structured output.
284    Validate {
285        /// The input file.
286        ///
287        /// To read from standard input, pass `-` or don't provide an argument at all.
288        infile: Option<String>,
289    },
290
291    /// Generate completion scripts for a given shell.
292    GenerateCompletions {
293        /// The shell to generate completion scripts for.
294        shell: clap_complete::Shell,
295    },
296}
297
/// A value that can be written by `Rustac::put`: either a typed STAC value or
/// arbitrary JSON (e.g. a serialized search result).
#[derive(Debug)]
#[allow(dead_code, clippy::large_enum_variant)]
enum Value {
    /// A typed STAC value.
    Stac(stac::Value),
    /// An untyped JSON value.
    Json(serde_json::Value),
}
304
/// A `key=value` pair parsed from an `--opt` argument (key first, value second).
#[derive(Debug, Clone)]
struct KeyValue(String, String);
307
/// Marker type that supplies the default log level and the help strings for
/// the `--verbose` / `--quiet` flags.
#[derive(Copy, Clone, Debug, Default)]
struct ErrorLevel;
310
311impl Rustac {
312    /// Runs this command.
313    ///
314    /// If `init_tracing_subscriber` is `false`, it is expected that the caller
315    /// is setting up the appropriate logging (e.g. Python).
316    pub async fn run(self, init_tracing_subscriber: bool) -> Result<()> {
317        if init_tracing_subscriber {
318            let indicatif_layer = IndicatifLayer::new();
319            tracing_subscriber::registry()
320                .with(
321                    tracing_subscriber::fmt::layer().with_writer(
322                        indicatif_layer
323                            .get_stderr_writer()
324                            .with_max_level(self.log_level().unwrap_or(Level::WARN)),
325                    ),
326                )
327                .with(indicatif_layer)
328                .init();
329        }
330        match self.command {
331            Command::Translate {
332                ref infile,
333                ref outfile,
334                migrate,
335                ref to,
336            } => {
337                let mut value = self.get(infile.as_deref()).await?;
338                if migrate {
339                    value = value.migrate(
340                        &to.as_deref()
341                            .map(|s| s.parse().unwrap())
342                            .unwrap_or_default(),
343                    )?;
344                } else if let Some(to) = to {
345                    eprintln!(
346                        "WARNING: --to was passed ({to}) without --migrate, value will not be migrated"
347                    );
348                }
349                self.put(outfile.as_deref(), value.into()).await
350            }
351            Command::Search {
352                ref href,
353                ref outfile,
354                ref use_duckdb,
355                ref max_items,
356                ref intersects,
357                ref ids,
358                ref collections,
359                ref bbox,
360                ref datetime,
361                ref fields,
362                ref sortby,
363                ref filter,
364                ref limit,
365            } => {
366                let use_duckdb = use_duckdb.unwrap_or_else(|| {
367                    matches!(Format::infer_from_href(href), Some(Format::Geoparquet(_)))
368                });
369                let get_items = GetItems {
370                    bbox: bbox.clone(),
371                    datetime: datetime.clone(),
372                    fields: fields.clone(),
373                    sortby: sortby.clone(),
374                    filter: filter.clone(),
375                    limit: limit.clone(),
376                    ..Default::default()
377                };
378                let get_search = GetSearch {
379                    intersects: intersects.clone(),
380                    ids: ids.clone(),
381                    collections: collections.clone(),
382                    items: get_items,
383                };
384                let search: Search = get_search.try_into()?;
385                let item_collection = if use_duckdb {
386                    stac_duckdb::search(href, search, *max_items)?
387                } else {
388                    stac_io::api::search(href, search, *max_items).await?
389                };
390                self.put(
391                    outfile.as_deref(),
392                    serde_json::to_value(item_collection)?.into(),
393                )
394                .await
395            }
396            Command::Serve {
397                ref hrefs,
398                ref addr,
399                ref bind,
400                ref pgstac,
401                use_duckdb,
402                load_collection_items,
403                create_collections,
404            } => {
405                let bind = bind.as_deref().unwrap_or(&addr);
406                if matches!(use_duckdb, Some(true))
407                    || (use_duckdb.is_none() && hrefs.len() == 1 && hrefs[0].ends_with("parquet"))
408                {
409                    let backend = stac_server::DuckdbBackend::new(&hrefs[0]).await?;
410                    eprintln!("Backend: duckdb");
411                    return load_and_serve(
412                        bind,
413                        addr,
414                        backend,
415                        Vec::new(),
416                        HashMap::new(),
417                        create_collections,
418                    )
419                    .await;
420                }
421                let mut collections = Vec::new();
422                let mut items: HashMap<String, Vec<stac::Item>> = HashMap::new();
423                for href in hrefs {
424                    let value = self.get(Some(href.as_str())).await?;
425                    match value {
426                        stac::Value::Collection(collection) => {
427                            if load_collection_items {
428                                for link in collection.iter_item_links() {
429                                    let value = self.get(Some(link.href.as_str())).await?;
430                                    if let stac::Value::Item(item) = value {
431                                        items.entry(collection.id.clone()).or_default().push(item);
432                                    } else {
433                                        return Err(anyhow!(
434                                            "item link was not an item: {value:?}"
435                                        ));
436                                    }
437                                }
438                            }
439                            collections.push(collection);
440                        }
441                        stac::Value::ItemCollection(item_collection) => {
442                            for item in item_collection.items {
443                                if let Some(collection) = item.collection.clone() {
444                                    items.entry(collection).or_default().push(item);
445                                } else {
446                                    items.entry(String::new()).or_default().push(item);
447                                }
448                            }
449                        }
450                        stac::Value::Item(item) => {
451                            if let Some(collection) = item.collection.clone() {
452                                items.entry(collection).or_default().push(item);
453                            } else {
454                                return Err(anyhow!("item without a collection: {item:?}"));
455                            }
456                        }
457                        _ => return Err(anyhow!("don't know how to load value: {value:?}")),
458                    }
459                }
460
461                #[allow(unused_variables)]
462                if let Some(pgstac) = pgstac {
463                    #[cfg(feature = "pgstac")]
464                    {
465                        let backend =
466                            stac_server::PgstacBackend::new_from_stringlike(pgstac).await?;
467                        eprintln!("Backend: pgstac");
468                        load_and_serve(bind, addr, backend, collections, items, create_collections)
469                            .await
470                    }
471                    #[cfg(not(feature = "pgstac"))]
472                    {
473                        Err(anyhow!("rustac is not compiled with pgstac support"))
474                    }
475                } else {
476                    let backend = stac_server::MemoryBackend::new();
477                    eprintln!("Backend: memory");
478                    load_and_serve(bind, addr, backend, collections, items, create_collections)
479                        .await
480                }
481            }
482            Command::Crawl {
483                ref href,
484                ref directory,
485            } => {
486                let opts = self.opts();
487                let (store, path) = stac_io::parse_href_opts(href.clone(), opts.clone())?;
488                let value: stac::Value = store.get(path).await.unwrap();
489                let mut items: HashMap<Option<String>, Vec<Item>> = HashMap::new();
490                let crawl = crawl(value, store).await;
491                pin_mut!(crawl);
492                let mut warned = false;
493                while let Some(item) = crawl.try_next().await? {
494                    let collection = item.collection.clone();
495                    if collection.as_deref() == Some(DEFAULT_COLLECTION_ID) && !warned {
496                        warned = true;
497                        tracing::warn!(
498                            "collection id matches the default collection id, so any collection-less items will be grouped into this collection: {DEFAULT_COLLECTION_ID}"
499                        )
500                    }
501                    items.entry(collection).or_default().push(item);
502                }
503                let (store, path) = stac_io::parse_href_opts(directory.clone(), opts)?;
504                let format = self.output_format(None);
505                for (collection, items) in items {
506                    let file_name = format!(
507                        "{}.{}",
508                        collection.as_deref().unwrap_or(DEFAULT_COLLECTION_ID),
509                        format.extension()
510                    );
511                    store
512                        .put_format(
513                            path.child(file_name),
514                            stac::ItemCollection::from(items),
515                            format,
516                        )
517                        .await?;
518                }
519                Ok(())
520            }
521            Command::Validate { ref infile } => {
522                let value = self.get(infile.as_deref()).await?;
523                let result = value.validate().await;
524                if let Err(error) = result {
525                    if let stac_validate::Error::Validation(errors) = error {
526                        if let Some(format) = self.output_format {
527                            if let Format::Json(_) = format {
528                                let value = errors
529                                    .into_iter()
530                                    .map(|error| error.into_json())
531                                    .collect::<Vec<_>>();
532                                if self.compact_json.unwrap_or_default() {
533                                    serde_json::to_writer(std::io::stdout(), &value)?;
534                                } else {
535                                    serde_json::to_writer_pretty(std::io::stdout(), &value)?;
536                                }
537                                println!();
538                            } else {
539                                return Err(anyhow!("invalid output format: {}", format));
540                            }
541                        } else {
542                            for error in errors {
543                                println!("{error}");
544                            }
545                        }
546                    }
547                    std::io::stdout().flush()?;
548                    Err(anyhow!("one or more validation errors"))
549                } else {
550                    Ok(())
551                }
552            }
553            Command::GenerateCompletions { shell } => {
554                let mut command = Rustac::command();
555                clap_complete::generate(shell, &mut command, "rustac", &mut std::io::stdout());
556                Ok(())
557            }
558        }
559    }
560
561    async fn get(&self, href: Option<&str>) -> Result<stac::Value> {
562        let href = href.and_then(|s| if s == "-" { None } else { Some(s) });
563        let format = self.input_format(href);
564        if let Some(href) = href {
565            let (store, path) = stac_io::parse_href_opts(href, self.opts())?;
566            let value: stac::Value = store.get_format(path, format).await?;
567            Ok(value)
568        } else {
569            let mut buf = Vec::new();
570            let _ = tokio::io::stdin().read_to_end(&mut buf).await?;
571            let value: stac::Value = format.from_bytes(buf)?;
572            Ok(value)
573        }
574    }
575
576    async fn put(&self, href: Option<&str>, value: Value) -> Result<()> {
577        let href = href.and_then(|s| if s == "-" { None } else { Some(s) });
578        let format = self.output_format(href);
579        if let Some(href) = href {
580            let (store, path) = stac_io::parse_href_opts(href, self.opts())?;
581            let _ = match value {
582                Value::Json(json) => store.put_format(path, json, format).await?,
583                Value::Stac(stac) => store.put_format(path, stac, format).await?,
584            };
585            Ok(())
586        } else {
587            let mut bytes = match value {
588                Value::Json(json) => format.into_vec(json)?,
589                Value::Stac(stac) => format.into_vec(stac)?,
590            };
591            // TODO allow disabling trailing newline
592            if !matches!(format, Format::NdJson) {
593                bytes.push(b'\n');
594            }
595            std::io::stdout().write_all(&bytes)?;
596            Ok(())
597        }
598    }
599
600    pub fn log_level(&self) -> Option<Level> {
601        level_enum(self.verbosity())
602    }
603
604    fn verbosity(&self) -> i8 {
605        level_value(ErrorLevel::default()) - (self.quiet as i8) + (self.verbose as i8)
606    }
607
608    /// Returns the set or inferred input format.
609    pub fn input_format(&self, href: Option<&str>) -> Format {
610        if let Some(input_format) = self.input_format {
611            input_format
612        } else if let Some(href) = href {
613            Format::infer_from_href(href).unwrap_or_default()
614        } else {
615            Format::json()
616        }
617    }
618
619    /// Returns the set or inferred input format.
620    pub fn output_format(&self, href: Option<&str>) -> Format {
621        let format = if let Some(format) = self.output_format {
622            format
623        } else if let Some(href) = href {
624            Format::infer_from_href(href).unwrap_or_default()
625        } else {
626            Format::Json(true)
627        };
628        if matches!(format, Format::Geoparquet(_)) {
629            use stac::geoparquet::WriterOptions;
630
631            let mut writer_options = WriterOptions::new()
632                .with_compression(self.parquet_compression.or(Some(default_compression())));
633
634            if let Some(max_row_group_size) = self.parquet_max_row_group_size {
635                writer_options = writer_options.with_max_row_group_size(max_row_group_size);
636            }
637
638            Format::Geoparquet(writer_options)
639        } else if let Format::Json(pretty) = format {
640            Format::Json(self.compact_json.map(|c| !c).unwrap_or(pretty))
641        } else {
642            format
643        }
644    }
645
646    fn opts(&self) -> Vec<(String, String)> {
647        self.options
648            .iter()
649            .cloned()
650            .map(|kv| (kv.0, kv.1))
651            .collect()
652    }
653}
654
655impl ErrorLevel {
656    fn default() -> Option<Level> {
657        Some(Level::ERROR)
658    }
659
660    fn verbose_help() -> Option<&'static str> {
661        Some("Increase verbosity")
662    }
663
664    fn verbose_long_help() -> Option<&'static str> {
665        None
666    }
667
668    fn quiet_help() -> Option<&'static str> {
669        Some("Decrease verbosity")
670    }
671
672    fn quiet_long_help() -> Option<&'static str> {
673        None
674    }
675}
676
677impl From<stac::Value> for Value {
678    fn from(value: stac::Value) -> Self {
679        Value::Stac(value)
680    }
681}
682
683impl From<serde_json::Value> for Value {
684    fn from(value: serde_json::Value) -> Self {
685        Value::Json(value)
686    }
687}
688
689impl FromStr for KeyValue {
690    type Err = Error;
691
692    fn from_str(s: &str) -> Result<Self> {
693        if let Some((key, value)) = s.split_once('=') {
694            Ok(KeyValue(key.to_string(), value.to_string()))
695        } else {
696            Err(anyhow!("invalid key=value: {s}"))
697        }
698    }
699}
700
701async fn load_and_serve(
702    bind: &str,
703    addr: &str,
704    mut backend: impl Backend,
705    collections: Vec<Collection>,
706    mut items: HashMap<String, Vec<Item>>,
707    create_collections: bool,
708) -> Result<()> {
709    for collection in collections {
710        let items = items.remove(&collection.id);
711        backend.add_collection(collection).await?;
712        if let Some(items) = items {
713            backend.add_items(items).await?;
714        }
715    }
716    if create_collections {
717        for (mut collection_id, mut items) in items {
718            if collection_id.is_empty() {
719                if backend.collection(DEFAULT_COLLECTION_ID).await?.is_some() {
720                    return Err(anyhow!(
721                        "cannot auto-create collections, a collection already exists with id={DEFAULT_COLLECTION_ID}"
722                    ));
723                } else {
724                    collection_id = DEFAULT_COLLECTION_ID.to_string();
725                }
726            }
727            for item in &mut items {
728                item.collection = Some(collection_id.to_string());
729            }
730            let collection = Collection::from_id_and_items(collection_id, &items);
731            backend.add_collection(collection).await?;
732            backend.add_items(items).await?;
733        }
734    } else if !items.is_empty() {
735        return Err(anyhow!(
736            "items don't have a collection and `create_collections` is false"
737        ));
738    }
739
740    let root = Url::parse(addr)
741        .map(|url| url.to_string())
742        .unwrap_or(format!("http://{addr}"));
743    let api = stac_server::Api::new(backend, &root)?;
744    let router = stac_server::routes::from_api(api);
745    let listener = TcpListener::bind(&bind).await?;
746    eprintln!("Serving a STAC API at {root}");
747    axum::serve(listener, router).await.map_err(Error::from)
748}
749
750fn level_enum(verbosity: i8) -> Option<Level> {
751    match verbosity {
752        i8::MIN..=-1 => None,
753        0 => Some(Level::ERROR),
754        1 => Some(Level::WARN),
755        2 => Some(Level::INFO),
756        3 => Some(Level::DEBUG),
757        4..=i8::MAX => Some(Level::TRACE),
758    }
759}
760
761fn level_value(level: Option<Level>) -> i8 {
762    match level {
763        None => -1,
764        Some(Level::ERROR) => 0,
765        Some(Level::WARN) => 1,
766        Some(Level::INFO) => 2,
767        Some(Level::DEBUG) => 3,
768        Some(Level::TRACE) => 4,
769    }
770}
771
772async fn crawl(value: stac::Value, store: StacStore) -> impl TryStream<Item = Result<Item>> {
773    use stac::Value::*;
774
775    try_stream! {
776        let mut values = VecDeque::from([value]);
777        while let Some(mut value) = values.pop_front() {
778            value.make_links_absolute()?;
779            match value {
780                Catalog(_) | Collection(_) => {
781                    if let Catalog(ref catalog) = value {
782                        tracing::info!("got catalog={}", catalog.id);
783                    }
784                    if let Collection(ref collection) = value {
785                        tracing::info!("got collection={}", collection.id);
786                    }
787                    let mut join_set: JoinSet<Result<stac::Value>> = JoinSet::new();
788                    for link in value
789                        .links()
790                        .iter()
791                        .filter(|link| link.is_child() || link.is_item())
792                        .cloned()
793                    {
794                        let store = store.clone();
795                        let url = Url::parse(&link.href)?;
796                        join_set.spawn(async move {
797                            let value: stac::Value = store.get(url.path()).await?;
798                            Ok(value)
799                        });
800                    }
801                    while let Some(result) = join_set.join_next().await {
802                        let value = result??;
803                        values.push_back(value);
804                    }
805                }
806                Item(mut item) => {
807                    if let Some(self_href) = item.self_href() {
808                        let self_href=  self_href.to_string();
809                        item.make_assets_absolute(&self_href)?;
810                    }
811                    yield item;
812                }
813                ItemCollection(item_collection) => {
814                    for mut item in item_collection.items {
815                        if let Some(self_href) = item.self_href() {
816                            let self_href = self_href.to_string();
817                            item.make_assets_absolute(&self_href)?;
818                        }
819                        yield item;
820                    }
821                }
822            }
823        }
824    }
825}
826
827#[cfg(test)]
828use {assert_cmd as _, rstest as _, tempfile as _};