stac_async/
api_client.rs

1use crate::{Client, Error, Result};
2use async_stream::try_stream;
3use futures_core::stream::Stream;
4use futures_util::{pin_mut, StreamExt};
5use reqwest::Method;
6use stac::{Collection, Links};
7use stac_api::{GetItems, Item, ItemCollection, Items, Search, UrlBuilder};
8use tokio::{
9    sync::mpsc::{self, error::SendError},
10    task::JoinHandle,
11};
12
13const DEFAULT_CHANNEL_BUFFER: usize = 4;
14
15/// A client for interacting with STAC APIs.
16#[derive(Debug)]
17pub struct ApiClient {
18    client: Client,
19    channel_buffer: usize,
20    url_builder: UrlBuilder,
21}
22
23impl ApiClient {
24    /// Creates a new API client.
25    ///
26    /// # Examples
27    ///
28    /// ```
29    /// # use stac_async::ApiClient;
30    /// let client = ApiClient::new("https://planetarycomputer.microsoft.com/api/stac/v1").unwrap();
31    /// ```
32    pub fn new(url: &str) -> Result<ApiClient> {
33        // TODO support HATEOS (aka look up the urls from the root catalog)
34        ApiClient::with_client(Client::new(), url)
35    }
36
37    /// Creates a new API client with the given [Client].
38    ///
39    /// Useful if you want to customize the behavior of the underlying `Client`,
40    /// as documented in [Client::new].
41    ///
42    /// # Examples
43    ///
44    /// ```
45    /// use stac_async::{Client, ApiClient};
46    /// let client = Client::new();
47    /// let api_client = ApiClient::with_client(client, "https://earth-search.aws.element84.com/v1/").unwrap();
48    /// ```
49    pub fn with_client(client: Client, url: &str) -> Result<ApiClient> {
50        Ok(ApiClient {
51            client,
52            channel_buffer: DEFAULT_CHANNEL_BUFFER,
53            url_builder: UrlBuilder::new(url)?,
54        })
55    }
56
57    /// Returns a single collection.
58    ///
59    /// # Examples
60    ///
61    /// ```no_run
62    /// # use stac_async::ApiClient;
63    /// let client = ApiClient::new("https://planetarycomputer.microsoft.com/api/stac/v1").unwrap();
64    /// # tokio_test::block_on(async {
65    /// let collection = client.collection("sentinel-2-l2a").await.unwrap().unwrap();
66    /// # })
67    /// ```
68    pub async fn collection(&self, id: &str) -> Result<Option<Collection>> {
69        let url = self.url_builder.collection(id)?;
70        self.client.get(url).await
71    }
72
73    /// Returns a stream of items belonging to a collection, using the [items
74    /// endpoint](https://github.com/radiantearth/stac-api-spec/tree/main/ogcapi-features#collection-items-collectionscollectioniditems).
75    ///
76    /// The `items` argument can be used to filter, sort, and otherwise
77    /// configure the request.
78    ///
79    /// # Examples
80    ///
81    /// ```no_run
82    /// use stac_api::Items;
83    /// use stac_async::ApiClient;
84    /// use futures_util::stream::StreamExt;
85    ///
86    /// let client = ApiClient::new("https://planetarycomputer.microsoft.com/api/stac/v1").unwrap();
87    /// let items = Items {
88    ///     limit: Some(1),
89    ///     ..Default::default()
90    /// };
91    /// # tokio_test::block_on(async {
92    /// let items: Vec<_> = client
93    ///     .items("sentinel-2-l2a", items)
94    ///     .await
95    ///     .unwrap()
96    ///     .map(|result| result.unwrap())
97    ///     .collect()
98    ///     .await;
99    /// assert_eq!(items.len(), 1);
100    /// # })
101    /// ```
102    pub async fn items(
103        &self,
104        id: &str,
105        items: impl Into<Option<Items>>,
106    ) -> Result<impl Stream<Item = Result<Item>>> {
107        let url = self.url_builder.items(id)?; // TODO HATEOS
108        let items = if let Some(items) = items.into() {
109            Some(GetItems::try_from(items)?)
110        } else {
111            None
112        };
113        let page: Option<ItemCollection> = self
114            .client
115            .request(Method::GET, url.clone(), items.as_ref(), None)
116            .await?;
117        if let Some(page) = page {
118            Ok(stream_items(self.client.clone(), page, self.channel_buffer))
119        } else {
120            Err(Error::NotFound(url))
121        }
122    }
123
124    /// Searches an API, returning a stream of items.
125    ///
126    /// # Examples
127    ///
128    /// ```no_run
129    /// use stac_api::Search;
130    /// use stac_async::ApiClient;
131    /// use futures_util::stream::StreamExt;
132    ///
133    /// let client = ApiClient::new("https://planetarycomputer.microsoft.com/api/stac/v1").unwrap();
134    /// let mut search = Search { collections: Some(vec!["sentinel-2-l2a".to_string()]), ..Default::default() };
135    /// search.items.limit = Some(1);
136    /// # tokio_test::block_on(async {
137    /// let items: Vec<_> = client
138    ///     .search(search)
139    ///     .await
140    ///     .unwrap()
141    ///     .map(|result| result.unwrap())
142    ///     .collect()
143    ///     .await;
144    /// assert_eq!(items.len(), 1);
145    /// # })
146    /// ```
147    pub async fn search(&self, search: Search) -> Result<impl Stream<Item = Result<Item>>> {
148        let url = self.url_builder.search().clone();
149        // TODO support GET
150        let page: Option<ItemCollection> = self.client.post(url.clone(), &search).await?;
151        if let Some(page) = page {
152            Ok(stream_items(self.client.clone(), page, self.channel_buffer))
153        } else {
154            Err(Error::NotFound(url))
155        }
156    }
157}
158
159fn stream_items(
160    client: Client,
161    page: ItemCollection,
162    channel_buffer: usize,
163) -> impl Stream<Item = Result<Item>> {
164    let (tx, mut rx) = mpsc::channel(channel_buffer);
165    let handle: JoinHandle<std::result::Result<(), SendError<_>>> = tokio::spawn(async move {
166        let pages = stream_pages(client, page);
167        pin_mut!(pages);
168        while let Some(result) = pages.next().await {
169            match result {
170                Ok(page) => tx.send(Ok(page)).await?,
171                Err(err) => {
172                    tx.send(Err(err)).await?;
173                    return Ok(());
174                }
175            }
176        }
177        Ok(())
178    });
179    try_stream! {
180        while let Some(result) = rx.recv().await {
181            let page = result?;
182            for item in page.items {
183                yield item;
184            }
185        }
186        let _ = handle.await?;
187    }
188}
189
190fn stream_pages(
191    client: Client,
192    mut page: ItemCollection,
193) -> impl Stream<Item = Result<ItemCollection>> {
194    try_stream! {
195        loop {
196            if page.items.is_empty() {
197                break;
198            }
199            let next_link = page.link("next").cloned();
200            yield page;
201            if let Some(next_link) = next_link {
202                if let Some(next_page) = client.request_from_link(next_link).await? {
203                    page = next_page;
204                } else {
205                    break;
206                }
207            } else {
208                break;
209            }
210        }
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use super::ApiClient;
217    use futures_util::stream::StreamExt;
218    use mockito::{Matcher, Server};
219    use serde_json::json;
220    use stac::Links;
221    use stac_api::{ItemCollection, Items, Search};
222    use url::Url;
223
224    #[tokio::test]
225    async fn collection_not_found() {
226        let mut server = Server::new_async().await;
227        let collection = server
228            .mock("GET", "/collections/not-a-collection")
229            .with_body(include_str!("../mocks/not-a-collection.json"))
230            .with_header("content-type", "application/json")
231            .with_status(404)
232            .create_async()
233            .await;
234
235        let client = ApiClient::new(&server.url()).unwrap();
236        assert!(client
237            .collection("not-a-collection")
238            .await
239            .unwrap()
240            .is_none());
241        collection.assert_async().await;
242    }
243
244    #[tokio::test]
245    async fn search_with_paging() {
246        let mut server = Server::new_async().await;
247        let mut page_1_body: ItemCollection =
248            serde_json::from_str(include_str!("../mocks/search-page-1.json")).unwrap();
249        let mut next_link = page_1_body.link("next").unwrap().clone();
250        next_link.href = format!("{}/search", server.url());
251        page_1_body.set_link(next_link);
252        let page_1 = server
253            .mock("POST", "/search")
254            .match_body(Matcher::Json(json!({
255                "collections": ["sentinel-2-l2a"],
256                "limit": 1
257            })))
258            .with_body(serde_json::to_string(&page_1_body).unwrap())
259            .with_header("content-type", "application/geo+json")
260            .create_async()
261            .await;
262        let page_2 = server
263            .mock("POST", "/search")
264            .match_body(Matcher::Json(json!({
265                "collections": ["sentinel-2-l2a"],
266                "limit": 1,
267                "token": "next:S2A_MSIL2A_20230216T150721_R082_T19PHS_20230217T082924"
268            })))
269            .with_body(include_str!("../mocks/search-page-2.json"))
270            .with_header("content-type", "application/geo+json")
271            .create_async()
272            .await;
273
274        let client = ApiClient::new(&server.url()).unwrap();
275        let mut search = Search {
276            collections: Some(vec!["sentinel-2-l2a".to_string()]),
277            ..Default::default()
278        };
279        search.items.limit = Some(1);
280        let items: Vec<_> = client
281            .search(search)
282            .await
283            .unwrap()
284            .map(|result| result.unwrap())
285            .take(2)
286            .collect()
287            .await;
288        page_1.assert_async().await;
289        page_2.assert_async().await;
290        assert_eq!(items.len(), 2);
291        assert!(items[0]["id"] != items[1]["id"]);
292    }
293
294    #[tokio::test]
295    async fn items_with_paging() {
296        let mut server = Server::new_async().await;
297        let mut page_1_body: ItemCollection =
298            serde_json::from_str(include_str!("../mocks/items-page-1.json")).unwrap();
299        let mut next_link = page_1_body.link("next").unwrap().clone();
300        let url: Url = next_link.href.parse().unwrap();
301        let query = url.query().unwrap();
302        next_link.href = format!(
303            "{}/collections/sentinel-2-l2a/items?{}",
304            server.url(),
305            query
306        );
307        page_1_body.set_link(next_link);
308        let page_1 = server
309            .mock("GET", "/collections/sentinel-2-l2a/items?limit=1")
310            .with_body(serde_json::to_string(&page_1_body).unwrap())
311            .with_header("content-type", "application/geo+json")
312            .create_async()
313            .await;
314        let page_2 = server
315            .mock("GET", "/collections/sentinel-2-l2a/items?limit=1&token=next:S2A_MSIL2A_20230216T235751_R087_T52CEB_20230217T134604")
316            .with_body(include_str!("../mocks/items-page-2.json"))
317            .with_header("content-type", "application/geo+json")
318            .create_async()
319            .await;
320
321        let client = ApiClient::new(&server.url()).unwrap();
322        let items = Items {
323            limit: Some(1),
324            ..Default::default()
325        };
326        let items: Vec<_> = client
327            .items("sentinel-2-l2a", Some(items))
328            .await
329            .unwrap()
330            .map(|result| result.unwrap())
331            .take(2)
332            .collect()
333            .await;
334        page_1.assert_async().await;
335        page_2.assert_async().await;
336        assert_eq!(items.len(), 2);
337        assert!(items[0]["id"] != items[1]["id"]);
338    }
339
340    #[tokio::test]
341    async fn stop_on_empty_page() {
342        let mut server = Server::new_async().await;
343        let mut page_body: ItemCollection =
344            serde_json::from_str(include_str!("../mocks/items-page-1.json")).unwrap();
345        let mut next_link = page_body.link("next").unwrap().clone();
346        let url: Url = next_link.href.parse().unwrap();
347        let query = url.query().unwrap();
348        next_link.href = format!(
349            "{}/collections/sentinel-2-l2a/items?{}",
350            server.url(),
351            query
352        );
353        page_body.set_link(next_link);
354        page_body.items = vec![];
355        let page = server
356            .mock("GET", "/collections/sentinel-2-l2a/items?limit=1")
357            .with_body(serde_json::to_string(&page_body).unwrap())
358            .with_header("content-type", "application/geo+json")
359            .create_async()
360            .await;
361
362        let client = ApiClient::new(&server.url()).unwrap();
363        let items = Items {
364            limit: Some(1),
365            ..Default::default()
366        };
367        let items: Vec<_> = client
368            .items("sentinel-2-l2a", Some(items))
369            .await
370            .unwrap()
371            .map(|result| result.unwrap())
372            .collect()
373            .await;
374        page.assert_async().await;
375        assert!(items.is_empty());
376    }
377}