// superstac_search/executor.rs

1use futures::{StreamExt, TryStreamExt};
2use stac::Item;
3use superstac_core::{errors::SuperSTACError, models::catalog::Catalog};
4use tokio::time::{sleep, timeout};
5
6use crate::{
7    aggregator::SearchAggregator,
8    options::FederationOptions,
9    query::SearchQuery,
10    response::{CatalogFailure, SearchItem, SearchResponse},
11    translator::to_stac_search,
12    unifier,
13};
14
15/// Fans out a search across catalogs with retry, capped concurrency, and
16/// per-catalog timeouts. Owns the `reqwest::Client` it borrows from the
17/// engine so connection pools and default headers are shared.
18pub struct SearchExecutor {
19    client: reqwest::Client,
20}
21
22impl SearchExecutor {
23    pub fn new(client: reqwest::Client) -> Self {
24        Self { client }
25    }
26
27    /// Query all `catalogs` concurrently and aggregate the results.
28    /// Per-catalog failures are recorded on the response metadata's
29    /// `failures` field rather than failing the whole call.
30    pub async fn federated_search(
31        &self,
32        catalogs: Vec<Catalog>,
33        query: SearchQuery,
34        options: FederationOptions,
35    ) -> Result<SearchResponse, SuperSTACError> {
36        let catalogs_queried = catalogs.len();
37        let concurrency = options.max_concurrent.max(1);
38
39        tracing::debug!(
40            catalogs = catalogs_queried,
41            concurrency,
42            collections = ?query.collections,
43            "federated search"
44        );
45
46        let attempts = catalogs.into_iter().map(|catalog| {
47            let query = query.clone();
48            async move {
49                let catalog_id = catalog.id.clone();
50                let result = self
51                    .search_catalog_with_retry(catalog, query, options)
52                    .await;
53                (catalog_id, result)
54            }
55        });
56
57        let results: Vec<(String, Result<Vec<SearchItem>, SuperSTACError>)> =
58            futures::stream::iter(attempts)
59                .buffer_unordered(concurrency)
60                .collect()
61                .await;
62
63        let mut successful = Vec::new();
64        let mut failures: Vec<CatalogFailure> = Vec::new();
65
66        for (catalog_id, outcome) in results {
67            match outcome {
68                Ok(items) => successful.push(items),
69                Err(e) => failures.push(CatalogFailure {
70                    catalog_id,
71                    reason: e.to_string(),
72                }),
73            }
74        }
75
76        Ok(SearchAggregator::aggregate(
77            successful,
78            catalogs_queried,
79            failures,
80            options.deduplicate,
81        ))
82    }
83
84    async fn search_catalog_with_retry(
85        &self,
86        catalog: Catalog,
87        query: SearchQuery,
88        options: FederationOptions,
89    ) -> Result<Vec<SearchItem>, SuperSTACError> {
90        let mut backoff = options.retry.initial_backoff;
91        let mut last_error: SuperSTACError =
92            SuperSTACError::SearchFailed("no attempts made".to_string());
93
94        for attempt in 1..=options.retry.max_attempts {
95            let attempt_result = timeout(
96                options.per_catalog_timeout,
97                self.search_catalog(catalog.clone(), query.clone(), options),
98            )
99            .await;
100
101            match attempt_result {
102                Ok(Ok(items)) => return Ok(items),
103                Ok(Err(e)) => {
104                    if !is_retryable(&e) || attempt == options.retry.max_attempts {
105                        return Err(e);
106                    }
107                    last_error = e;
108                }
109                Err(_) => {
110                    last_error = SuperSTACError::SearchFailed(format!(
111                        "timeout after {:?}",
112                        options.per_catalog_timeout
113                    ));
114                    if attempt == options.retry.max_attempts {
115                        return Err(last_error);
116                    }
117                }
118            }
119
120            tracing::warn!(
121                catalog = %catalog.id,
122                attempt,
123                error = %last_error,
124                backoff_ms = backoff.as_millis() as u64,
125                "search attempt failed, retrying"
126            );
127
128            sleep(backoff).await;
129            backoff = (backoff * 2).min(options.retry.max_backoff);
130        }
131
132        Err(last_error)
133    }
134
135    async fn search_catalog(
136        &self,
137        catalog: Catalog,
138        mut query: SearchQuery,
139        options: FederationOptions,
140    ) -> Result<Vec<SearchItem>, SuperSTACError> {
141        // Translate canonical collection names to this catalog's local names.
142        // Falls back to the canonical name when no alias is declared.
143        query.collections = query
144            .collections
145            .iter()
146            .map(|c| catalog.resolve_collection(c).to_string())
147            .collect();
148
149        // Cap items at min(user_limit, per-catalog system cap). Set on the
150        // STAC search so the server doesn't waste a round-trip filling more
151        // than we'd keep.
152        let user_limit = query.limit.unwrap_or(10);
153        let cap = user_limit.min(options.max_items_per_catalog);
154        query.limit = Some(cap);
155
156        let search = to_stac_search(query);
157
158        let stac_client = stac_io::api::Client::with_client(self.client.clone(), &catalog.url)
159            .map_err(|e| SuperSTACError::SearchFailed(format!("stac client init: {}", e)))?;
160
161        let stream = stac_client
162            .search(search)
163            .await
164            .map_err(|e| SuperSTACError::SearchFailed(format!("search request: {}", e)))?;
165
166        // Stream of `stac::api::Item` (= `serde_json::Map<String, Value>`),
167        // paginated internally by stac-io. `take(cap)` bounds the total;
168        // `try_collect` short-circuits on first per-item stream error.
169        let raw_items: Vec<stac::api::Item> = stream
170            .take(cap)
171            .try_collect()
172            .await
173            .map_err(|e| SuperSTACError::SearchFailed(format!("stream item: {}", e)))?;
174
175        let items: Vec<SearchItem> = raw_items
176            .into_iter()
177            .map(|map_item| {
178                let mut item: Item =
179                    serde_json::from_value(serde_json::Value::Object(map_item))
180                        .map_err(|err| SuperSTACError::SearchFailed(err.to_string()))?;
181
182                if options.unify_response {
183                    unifier::unify_item(&mut item, &catalog);
184                }
185
186                Ok(SearchItem {
187                    catalog_id: catalog.id.clone(),
188                    seen_in: vec![catalog.id.clone()],
189                    item,
190                })
191            })
192            .collect::<Result<Vec<_>, SuperSTACError>>()?;
193
194        Ok(items)
195    }
196}
197
198/// For now: treat all `SearchFailed` errors as retryable. The proper fix is a
199/// richer error taxonomy (Network / Timeout / Server5xx / Client4xx) which is
200/// out of scope here.
201fn is_retryable(error: &SuperSTACError) -> bool {
202    matches!(error, SuperSTACError::SearchFailed(_))
203}