1use crate::{Client, Error, Result};
2use async_stream::try_stream;
3use futures_core::stream::Stream;
4use futures_util::{pin_mut, StreamExt};
5use reqwest::Method;
6use stac::{Collection, Links};
7use stac_api::{GetItems, Item, ItemCollection, Items, Search, UrlBuilder};
8use tokio::{
9 sync::mpsc::{self, error::SendError},
10 task::JoinHandle,
11};
12
/// Default capacity of the channel used to hand pages from the background
/// fetch task to the item stream (see `stream_items`).
///
/// Every `ApiClient` built by `ApiClient::with_client` starts with this value.
const DEFAULT_CHANNEL_BUFFER: usize = 4;
14
15#[derive(Debug)]
17pub struct ApiClient {
18 client: Client,
19 channel_buffer: usize,
20 url_builder: UrlBuilder,
21}
22
23impl ApiClient {
24 pub fn new(url: &str) -> Result<ApiClient> {
33 ApiClient::with_client(Client::new(), url)
35 }
36
37 pub fn with_client(client: Client, url: &str) -> Result<ApiClient> {
50 Ok(ApiClient {
51 client,
52 channel_buffer: DEFAULT_CHANNEL_BUFFER,
53 url_builder: UrlBuilder::new(url)?,
54 })
55 }
56
57 pub async fn collection(&self, id: &str) -> Result<Option<Collection>> {
69 let url = self.url_builder.collection(id)?;
70 self.client.get(url).await
71 }
72
73 pub async fn items(
103 &self,
104 id: &str,
105 items: impl Into<Option<Items>>,
106 ) -> Result<impl Stream<Item = Result<Item>>> {
107 let url = self.url_builder.items(id)?; let items = if let Some(items) = items.into() {
109 Some(GetItems::try_from(items)?)
110 } else {
111 None
112 };
113 let page: Option<ItemCollection> = self
114 .client
115 .request(Method::GET, url.clone(), items.as_ref(), None)
116 .await?;
117 if let Some(page) = page {
118 Ok(stream_items(self.client.clone(), page, self.channel_buffer))
119 } else {
120 Err(Error::NotFound(url))
121 }
122 }
123
124 pub async fn search(&self, search: Search) -> Result<impl Stream<Item = Result<Item>>> {
148 let url = self.url_builder.search().clone();
149 let page: Option<ItemCollection> = self.client.post(url.clone(), &search).await?;
151 if let Some(page) = page {
152 Ok(stream_items(self.client.clone(), page, self.channel_buffer))
153 } else {
154 Err(Error::NotFound(url))
155 }
156 }
157}
158
159fn stream_items(
160 client: Client,
161 page: ItemCollection,
162 channel_buffer: usize,
163) -> impl Stream<Item = Result<Item>> {
164 let (tx, mut rx) = mpsc::channel(channel_buffer);
165 let handle: JoinHandle<std::result::Result<(), SendError<_>>> = tokio::spawn(async move {
166 let pages = stream_pages(client, page);
167 pin_mut!(pages);
168 while let Some(result) = pages.next().await {
169 match result {
170 Ok(page) => tx.send(Ok(page)).await?,
171 Err(err) => {
172 tx.send(Err(err)).await?;
173 return Ok(());
174 }
175 }
176 }
177 Ok(())
178 });
179 try_stream! {
180 while let Some(result) = rx.recv().await {
181 let page = result?;
182 for item in page.items {
183 yield item;
184 }
185 }
186 let _ = handle.await?;
187 }
188}
189
190fn stream_pages(
191 client: Client,
192 mut page: ItemCollection,
193) -> impl Stream<Item = Result<ItemCollection>> {
194 try_stream! {
195 loop {
196 if page.items.is_empty() {
197 break;
198 }
199 let next_link = page.link("next").cloned();
200 yield page;
201 if let Some(next_link) = next_link {
202 if let Some(next_page) = client.request_from_link(next_link).await? {
203 page = next_page;
204 } else {
205 break;
206 }
207 } else {
208 break;
209 }
210 }
211 }
212}
213
#[cfg(test)]
mod tests {
    use super::ApiClient;
    use futures_util::stream::StreamExt;
    use mockito::{Matcher, Server};
    use serde_json::json;
    use stac::Links;
    use stac_api::{ItemCollection, Items, Search};
    use url::Url;

    /// Loads the first items page fixture and rewrites its `next` link so it
    /// points at the mock server at `base_url`, keeping the original query.
    fn first_items_page(base_url: &str) -> ItemCollection {
        let mut body: ItemCollection =
            serde_json::from_str(include_str!("../mocks/items-page-1.json")).unwrap();
        let mut next = body.link("next").unwrap().clone();
        let original: Url = next.href.parse().unwrap();
        next.href = format!(
            "{}/collections/sentinel-2-l2a/items?{}",
            base_url,
            original.query().unwrap()
        );
        body.set_link(next);
        body
    }

    /// A 404 response for a collection is reported as `None`, not an error.
    #[tokio::test]
    async fn collection_not_found() {
        let mut server = Server::new_async().await;
        let mock = server
            .mock("GET", "/collections/not-a-collection")
            .with_body(include_str!("../mocks/not-a-collection.json"))
            .with_header("content-type", "application/json")
            .with_status(404)
            .create_async()
            .await;

        let client = ApiClient::new(&server.url()).unwrap();
        let collection = client.collection("not-a-collection").await.unwrap();
        assert!(collection.is_none());
        mock.assert_async().await;
    }

    /// Searching follows the `next` link of the first page to a second page.
    #[tokio::test]
    async fn search_with_paging() {
        let mut server = Server::new_async().await;
        let base_url = server.url();
        let mut first_body: ItemCollection =
            serde_json::from_str(include_str!("../mocks/search-page-1.json")).unwrap();
        let mut next = first_body.link("next").unwrap().clone();
        next.href = format!("{}/search", base_url);
        first_body.set_link(next);
        let first_mock = server
            .mock("POST", "/search")
            .match_body(Matcher::Json(json!({
                "collections": ["sentinel-2-l2a"],
                "limit": 1
            })))
            .with_body(serde_json::to_string(&first_body).unwrap())
            .with_header("content-type", "application/geo+json")
            .create_async()
            .await;
        let second_mock = server
            .mock("POST", "/search")
            .match_body(Matcher::Json(json!({
                "collections": ["sentinel-2-l2a"],
                "limit": 1,
                "token": "next:S2A_MSIL2A_20230216T150721_R082_T19PHS_20230217T082924"
            })))
            .with_body(include_str!("../mocks/search-page-2.json"))
            .with_header("content-type", "application/geo+json")
            .create_async()
            .await;

        let client = ApiClient::new(&base_url).unwrap();
        let mut search = Search {
            collections: Some(vec!["sentinel-2-l2a".to_string()]),
            ..Default::default()
        };
        search.items.limit = Some(1);
        let stream = client.search(search).await.unwrap();
        let found: Vec<_> = stream
            .map(|result| result.unwrap())
            .take(2)
            .collect()
            .await;
        first_mock.assert_async().await;
        second_mock.assert_async().await;
        assert_eq!(found.len(), 2);
        assert!(found[0]["id"] != found[1]["id"]);
    }

    /// Listing a collection's items follows the `next` link to a second page.
    #[tokio::test]
    async fn items_with_paging() {
        let mut server = Server::new_async().await;
        let base_url = server.url();
        let first_body = first_items_page(&base_url);
        let first_mock = server
            .mock("GET", "/collections/sentinel-2-l2a/items?limit=1")
            .with_body(serde_json::to_string(&first_body).unwrap())
            .with_header("content-type", "application/geo+json")
            .create_async()
            .await;
        let second_mock = server
            .mock("GET", "/collections/sentinel-2-l2a/items?limit=1&token=next:S2A_MSIL2A_20230216T235751_R087_T52CEB_20230217T134604")
            .with_body(include_str!("../mocks/items-page-2.json"))
            .with_header("content-type", "application/geo+json")
            .create_async()
            .await;

        let client = ApiClient::new(&base_url).unwrap();
        let query = Items {
            limit: Some(1),
            ..Default::default()
        };
        let stream = client.items("sentinel-2-l2a", Some(query)).await.unwrap();
        let found: Vec<_> = stream
            .map(|result| result.unwrap())
            .take(2)
            .collect()
            .await;
        first_mock.assert_async().await;
        second_mock.assert_async().await;
        assert_eq!(found.len(), 2);
        assert!(found[0]["id"] != found[1]["id"]);
    }

    /// A page with no items ends the stream even if it carries a `next` link.
    #[tokio::test]
    async fn stop_on_empty_page() {
        let mut server = Server::new_async().await;
        let base_url = server.url();
        let mut empty_body = first_items_page(&base_url);
        empty_body.items = Vec::new();
        let mock = server
            .mock("GET", "/collections/sentinel-2-l2a/items?limit=1")
            .with_body(serde_json::to_string(&empty_body).unwrap())
            .with_header("content-type", "application/geo+json")
            .create_async()
            .await;

        let client = ApiClient::new(&base_url).unwrap();
        let query = Items {
            limit: Some(1),
            ..Default::default()
        };
        let stream = client.items("sentinel-2-l2a", Some(query)).await.unwrap();
        let found: Vec<_> = stream.map(|result| result.unwrap()).collect().await;
        mock.assert_async().await;
        assert!(found.is_empty());
    }
}