superstac_search/
executor.rs1use futures::{StreamExt, TryStreamExt};
2use stac::Item;
3use superstac_core::{errors::SuperSTACError, models::catalog::Catalog};
4use tokio::time::{sleep, timeout};
5
6use crate::{
7 aggregator::SearchAggregator,
8 options::FederationOptions,
9 query::SearchQuery,
10 response::{CatalogFailure, SearchItem, SearchResponse},
11 translator::to_stac_search,
12 unifier,
13};
14
15pub struct SearchExecutor {
19 client: reqwest::Client,
20}
21
22impl SearchExecutor {
23 pub fn new(client: reqwest::Client) -> Self {
24 Self { client }
25 }
26
27 pub async fn federated_search(
31 &self,
32 catalogs: Vec<Catalog>,
33 query: SearchQuery,
34 options: FederationOptions,
35 ) -> Result<SearchResponse, SuperSTACError> {
36 let catalogs_queried = catalogs.len();
37 let concurrency = options.max_concurrent.max(1);
38
39 tracing::debug!(
40 catalogs = catalogs_queried,
41 concurrency,
42 collections = ?query.collections,
43 "federated search"
44 );
45
46 let attempts = catalogs.into_iter().map(|catalog| {
47 let query = query.clone();
48 async move {
49 let catalog_id = catalog.id.clone();
50 let result = self
51 .search_catalog_with_retry(catalog, query, options)
52 .await;
53 (catalog_id, result)
54 }
55 });
56
57 let results: Vec<(String, Result<Vec<SearchItem>, SuperSTACError>)> =
58 futures::stream::iter(attempts)
59 .buffer_unordered(concurrency)
60 .collect()
61 .await;
62
63 let mut successful = Vec::new();
64 let mut failures: Vec<CatalogFailure> = Vec::new();
65
66 for (catalog_id, outcome) in results {
67 match outcome {
68 Ok(items) => successful.push(items),
69 Err(e) => failures.push(CatalogFailure {
70 catalog_id,
71 reason: e.to_string(),
72 }),
73 }
74 }
75
76 Ok(SearchAggregator::aggregate(
77 successful,
78 catalogs_queried,
79 failures,
80 options.deduplicate,
81 ))
82 }
83
84 async fn search_catalog_with_retry(
85 &self,
86 catalog: Catalog,
87 query: SearchQuery,
88 options: FederationOptions,
89 ) -> Result<Vec<SearchItem>, SuperSTACError> {
90 let mut backoff = options.retry.initial_backoff;
91 let mut last_error: SuperSTACError =
92 SuperSTACError::SearchFailed("no attempts made".to_string());
93
94 for attempt in 1..=options.retry.max_attempts {
95 let attempt_result = timeout(
96 options.per_catalog_timeout,
97 self.search_catalog(catalog.clone(), query.clone(), options),
98 )
99 .await;
100
101 match attempt_result {
102 Ok(Ok(items)) => return Ok(items),
103 Ok(Err(e)) => {
104 if !is_retryable(&e) || attempt == options.retry.max_attempts {
105 return Err(e);
106 }
107 last_error = e;
108 }
109 Err(_) => {
110 last_error = SuperSTACError::SearchFailed(format!(
111 "timeout after {:?}",
112 options.per_catalog_timeout
113 ));
114 if attempt == options.retry.max_attempts {
115 return Err(last_error);
116 }
117 }
118 }
119
120 tracing::warn!(
121 catalog = %catalog.id,
122 attempt,
123 error = %last_error,
124 backoff_ms = backoff.as_millis() as u64,
125 "search attempt failed, retrying"
126 );
127
128 sleep(backoff).await;
129 backoff = (backoff * 2).min(options.retry.max_backoff);
130 }
131
132 Err(last_error)
133 }
134
135 async fn search_catalog(
136 &self,
137 catalog: Catalog,
138 mut query: SearchQuery,
139 options: FederationOptions,
140 ) -> Result<Vec<SearchItem>, SuperSTACError> {
141 query.collections = query
144 .collections
145 .iter()
146 .map(|c| catalog.resolve_collection(c).to_string())
147 .collect();
148
149 let user_limit = query.limit.unwrap_or(10);
153 let cap = user_limit.min(options.max_items_per_catalog);
154 query.limit = Some(cap);
155
156 let search = to_stac_search(query);
157
158 let stac_client = stac_io::api::Client::with_client(self.client.clone(), &catalog.url)
159 .map_err(|e| SuperSTACError::SearchFailed(format!("stac client init: {}", e)))?;
160
161 let stream = stac_client
162 .search(search)
163 .await
164 .map_err(|e| SuperSTACError::SearchFailed(format!("search request: {}", e)))?;
165
166 let raw_items: Vec<stac::api::Item> = stream
170 .take(cap)
171 .try_collect()
172 .await
173 .map_err(|e| SuperSTACError::SearchFailed(format!("stream item: {}", e)))?;
174
175 let items: Vec<SearchItem> = raw_items
176 .into_iter()
177 .map(|map_item| {
178 let mut item: Item =
179 serde_json::from_value(serde_json::Value::Object(map_item))
180 .map_err(|err| SuperSTACError::SearchFailed(err.to_string()))?;
181
182 if options.unify_response {
183 unifier::unify_item(&mut item, &catalog);
184 }
185
186 Ok(SearchItem {
187 catalog_id: catalog.id.clone(),
188 seen_in: vec![catalog.id.clone()],
189 item,
190 })
191 })
192 .collect::<Result<Vec<_>, SuperSTACError>>()?;
193
194 Ok(items)
195 }
196}
197
198fn is_retryable(error: &SuperSTACError) -> bool {
202 matches!(error, SuperSTACError::SearchFailed(_))
203}