Skip to main content

stac/api/
client.rs

1use super::{ItemCollection, Items, Search};
2use crate::{Collection, Error, Item};
3#[cfg(feature = "async")]
4use futures_core::Stream;
5use std::future::Future;
6
7/// A client that can fetch STAC items.
8///
9/// [`ItemsClient::search`] is the only required method. This trait covers the
10/// `/search`, `/collections/{id}/items`, and
11/// `/collections/{id}/items/{item_id}` endpoints — all expressed as
12/// constrained [`Search`] queries. [`ItemsClient::item`] and
13/// [`ItemsClient::items`] have default implementations that delegate to
14/// [`ItemsClient::search`].
15pub trait ItemsClient: Send + Sync {
16    /// The error type for this client.
17    type Error: Send;
18
19    /// Searches for STAC items matching the given parameters.
20    fn search(
21        &self,
22        search: Search,
23    ) -> impl Future<Output = Result<ItemCollection, Self::Error>> + Send;
24
25    /// Returns a single item from a collection.
26    ///
27    /// The default implementation uses [`ItemsClient::search`] with `ids` and
28    /// `collections` filters, then deserializes the result.
29    ///
30    /// Override this method if your backend has a native O(1) point-lookup for
31    /// `GET /collections/{id}/items/{item_id}`. Both the `pgstac` and `memory`
32    /// backends override this.
33    fn item(
34        &self,
35        collection_id: &str,
36        item_id: &str,
37    ) -> impl Future<Output = Result<Option<Item>, Self::Error>> + Send
38    where
39        Self::Error: From<Error>,
40    {
41        async move {
42            let search = Search::default()
43                .ids(vec![item_id.to_string()])
44                .collections(vec![collection_id.to_string()]);
45            let mut item_collection = self.search(search).await?;
46            if item_collection.items.len() == 1 {
47                let api_item = item_collection.items.pop().expect("just checked length");
48                let item: Item = serde_json::from_value(serde_json::Value::Object(api_item))
49                    .map_err(Error::from)?;
50                Ok(Some(item))
51            } else {
52                Ok(None)
53            }
54        }
55    }
56
57    /// Returns items from a collection.
58    ///
59    /// The default implementation converts the request to a [`Search`] scoped
60    /// to the given collection and delegates to [`ItemsClient::search`].
61    fn items(
62        &self,
63        collection_id: &str,
64        items: Items,
65    ) -> impl Future<Output = Result<ItemCollection, Self::Error>> + Send {
66        async move {
67            let search = items.search_collection(collection_id);
68            self.search(search).await
69        }
70    }
71}
72
73/// A client that can retrieve STAC collections.
74///
75/// [`CollectionsClient::collections`] is the only required method.
76/// [`CollectionsClient::collection`] has a default implementation that
77/// scans all collections. Override it for O(1) lookups.
78pub trait CollectionsClient: Send + Sync {
79    /// The error type for this client.
80    type Error: Send;
81
82    /// Returns all collections.
83    fn collections(&self) -> impl Future<Output = Result<Vec<Collection>, Self::Error>> + Send;
84
85    /// Returns a single collection by ID.
86    ///
87    /// The default implementation scans all collections. Override this method
88    /// if your backend has an O(1) indexed lookup (e.g. a hash map or database
89    /// index). Both the `pgstac` and `memory` backends override this.
90    fn collection(
91        &self,
92        id: &str,
93    ) -> impl Future<Output = Result<Option<Collection>, Self::Error>> + Send {
94        async move {
95            let collections = self.collections().await?;
96            Ok(collections.into_iter().find(|c| c.id == id))
97        }
98    }
99}
100
101/// A client that can create or add STAC items and collections.
102///
103/// [`TransactionClient::add_collection`] and
104/// [`TransactionClient::add_item`] are required methods.
105/// [`TransactionClient::add_items`] has a default implementation that calls
106/// `add_item` in a loop.
107pub trait TransactionClient: Send {
108    /// The error type for this client.
109    type Error: Send;
110
111    /// Adds a collection.
112    fn add_collection(
113        &mut self,
114        collection: Collection,
115    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
116
117    /// Adds a single item.
118    fn add_item(&mut self, item: Item) -> impl Future<Output = Result<(), Self::Error>> + Send;
119
120    /// Adds multiple items.
121    ///
122    /// The default implementation calls [`TransactionClient::add_item`] for
123    /// each item sequentially.
124    fn add_items(
125        &mut self,
126        items: Vec<Item>,
127    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
128        async move {
129            for item in items {
130                self.add_item(item).await?;
131            }
132            Ok(())
133        }
134    }
135}
136
137/// A client that can return STAC items as Arrow record batches.
138///
139/// [`ArrowItemsClient::search_to_arrow`] is the only required method.
140/// [`ArrowItemsClient::items_to_arrow`] has a default implementation that
141/// delegates to `search_to_arrow`.
142///
143/// Unlike the other client traits, this trait does not require `Send + Sync`
144/// and `search_to_arrow` is synchronous. This allows implementations to return
145/// borrowing iterators (e.g. iterators that borrow from a database connection).
146#[cfg(feature = "geoarrow")]
147pub trait ArrowItemsClient {
148    /// The error type for this client.
149    type Error;
150
151    /// The record batch reader type returned by [`ArrowItemsClient::search_to_arrow`].
152    type RecordBatchStream<'a>: arrow_array::RecordBatchReader
153    where
154        Self: 'a;
155
156    /// Searches for STAC items, returning results as Arrow record batches.
157    fn search_to_arrow(&self, search: Search) -> Result<Self::RecordBatchStream<'_>, Self::Error>;
158
159    /// Returns items from a collection as Arrow record batches.
160    ///
161    /// The default implementation calls
162    /// [`ArrowItemsClient::search_to_arrow`] with a [`Search`] scoped to the
163    /// given collection. Override this method if your backend can serve
164    /// collection items in Arrow more efficiently than routing through search.
165    fn items_to_arrow(
166        &self,
167        collection_id: &str,
168        items: Items,
169    ) -> Result<Self::RecordBatchStream<'_>, Self::Error> {
170        self.search_to_arrow(items.search_collection(collection_id))
171    }
172}
173
174#[cfg(feature = "async")]
175/// A client that can stream STAC items across all pages.
176///
177/// [`StreamItemsClient::search_stream`] is the only required method. The
178/// default methods [`StreamItemsClient::collect_items`],
179/// [`StreamItemsClient::item_count`], and [`StreamItemsClient::items_stream`]
180/// are built on top of it.
181///
182/// `Stream` is the async equivalent of `Iterator` — think of
183/// `StreamExt::next().await` as the async `Iterator::next()`. You cannot
184/// implement `std::iter::Iterator` here because `Iterator::next` is
185/// synchronous; blocking an async runtime on each item would defeat the
186/// purpose. For blocking / sync contexts, wrap your runtime in a
187/// `tokio::runtime::Handle` or use the `BlockingClient` in `stac-io`.
188///
189/// # Examples
190///
191/// Stream items lazily (low memory):
192///
193/// ```no_run
194/// use futures::StreamExt;
195/// use stac::api::{Search, StreamItemsClient};
196///
197/// async fn example<C>(client: C)
198/// where
199///     C: StreamItemsClient,
200///     C::Error: std::fmt::Debug,
201/// {
202///     let search = Search::default();
203///     let stream = client.search_stream(search).await.unwrap();
204///     futures::pin_mut!(stream);
205///     while let Some(item) = stream.next().await {
206///         println!("Got item: {:?}", item.unwrap());
207///     }
208/// }
209/// ```
210///
211/// Or collect all into a `Vec` using the default method:
212///
213/// ```no_run
214/// use stac::api::{Search, StreamItemsClient};
215///
216/// async fn example<C>(client: C)
217/// where
218///     C: StreamItemsClient,
219///     C::Error: std::fmt::Debug,
220/// {
221///     let items = client.collect_items(Search::default()).await.unwrap();
222///     println!("Total: {}", items.len());
223/// }
224/// ```
225pub trait StreamItemsClient: Send + Sync {
226    /// The error type for this client.
227    type Error: Send;
228
229    /// Searches for STAC items, returning a stream of items.
230    ///
231    /// This method paginates through all pages of results. For a single page,
232    /// use [`ItemsClient::search`].
233    fn search_stream(
234        &self,
235        search: Search,
236    ) -> impl Future<
237        Output = Result<impl Stream<Item = Result<super::Item, Self::Error>> + Send, Self::Error>,
238    > + Send;
239
240    /// Collects all items from all pages into a `Vec`.
241    ///
242    /// Drives [`search_stream`](StreamItemsClient::search_stream) to
243    /// completion. Prefer [`search_stream`](StreamItemsClient::search_stream)
244    /// when working with large result sets to avoid loading everything into
245    /// memory at once.
246    ///
247    /// # Examples
248    ///
249    /// ```no_run
250    /// use stac::api::{Search, StreamItemsClient};
251    ///
252    /// async fn example<C>(client: C)
253    /// where
254    ///     C: StreamItemsClient,
255    ///     C::Error: std::fmt::Debug,
256    /// {
257    ///     let items = client.collect_items(Search::default()).await.unwrap();
258    ///     println!("Got {} items", items.len());
259    /// }
260    /// ```
261    fn collect_items(
262        &self,
263        search: Search,
264    ) -> impl Future<Output = Result<Vec<super::Item>, Self::Error>> + Send {
265        async move {
266            use futures::TryStreamExt as _;
267            let stream = self.search_stream(search).await?;
268            futures::pin_mut!(stream);
269            stream.try_collect().await
270        }
271    }
272
273    /// Counts all items across all pages without collecting them.
274    ///
275    /// More memory-efficient than [`collect_items`](StreamItemsClient::collect_items)
276    /// when only the count is needed. Each item is deserialized and immediately
277    /// discarded.
278    ///
279    /// # Examples
280    ///
281    /// ```no_run
282    /// use stac::api::{Search, StreamItemsClient};
283    ///
284    /// async fn example<C>(client: C)
285    /// where
286    ///     C: StreamItemsClient,
287    ///     C::Error: std::fmt::Debug,
288    /// {
289    ///     let count = client.item_count(Search::default()).await.unwrap();
290    ///     println!("Total items: {count}");
291    /// }
292    /// ```
293    fn item_count(
294        &self,
295        search: Search,
296    ) -> impl Future<Output = Result<usize, Self::Error>> + Send {
297        async move {
298            use futures::TryStreamExt as _;
299            let stream = self.search_stream(search).await?;
300            futures::pin_mut!(stream);
301            let mut count = 0usize;
302            stream
303                .try_for_each(|_| {
304                    count += 1;
305                    async { Ok(()) }
306                })
307                .await?;
308            Ok(count)
309        }
310    }
311
312    /// Streams all items belonging to a collection, paginating through all pages.
313    ///
314    /// The default implementation calls
315    /// [`StreamItemsClient::search_stream`] with a [`Search`] scoped to the
316    /// given collection. Override this method if your backend has a dedicated
317    /// link-following items endpoint (e.g. `stac-io`'s HTTP client).
318    ///
319    /// # Examples
320    ///
321    /// ```no_run
322    /// use futures::StreamExt;
323    /// use stac::api::{Items, StreamItemsClient};
324    ///
325    /// async fn example<C>(client: C)
326    /// where
327    ///     C: StreamItemsClient,
328    ///     C::Error: std::fmt::Debug,
329    /// {
330    ///     let stream = client.items_stream("my-collection", Items::default()).await.unwrap();
331    ///     futures::pin_mut!(stream);
332    ///     while let Some(item) = stream.next().await {
333    ///         println!("Got item: {:?}", item.unwrap());
334    ///     }
335    /// }
336    /// ```
337    fn items_stream(
338        &self,
339        collection_id: &str,
340        items: Items,
341    ) -> impl Future<
342        Output = Result<impl Stream<Item = Result<super::Item, Self::Error>> + Send, Self::Error>,
343    > + Send {
344        async move {
345            let search = items.search_collection(collection_id);
346            self.search_stream(search).await
347        }
348    }
349}
350
351/// A client that can fetch a single page of STAC collections with cursor
352/// pagination.
353///
354/// This is the paginated counterpart to [`CollectionsClient`]. Implement this
355/// trait when your backend supports cursor-based `/collections` pagination
356/// (e.g. a future pgstac version, or any backend that returns a `next_token`
357/// alongside the collection list).
358///
359/// # Blanket impl
360///
361/// Any `T: PagedCollectionsClient + Clone + Send + Sync` automatically
362/// implements [`StreamCollectionsClient`] by following the cursor chain.
363/// This blanket takes priority over the simpler
364/// `CollectionsClient → StreamCollectionsClient` blanket for types that
365/// implement `PagedCollectionsClient`.
366///
367/// # Examples
368///
369/// ```no_run
370/// use stac::Collection;
371/// use stac::api::PagedCollectionsClient;
372///
373/// struct MyBackend;
374///
375/// impl PagedCollectionsClient for MyBackend {
376///     type Error = std::convert::Infallible;
377///
378///     async fn collections_page(
379///         &self,
380///         token: Option<String>,
381///     ) -> Result<(Vec<Collection>, Option<String>), Self::Error> {
382///         // fetch one page; return (collections, next_token)
383///         Ok((vec![], None))
384///     }
385/// }
386/// ```
387pub trait PagedCollectionsClient: Send + Sync {
388    /// The error type for this client.
389    type Error: Send;
390
391    /// Fetches one page of collections.
392    ///
393    /// `token` is the cursor returned by the previous call, or `None` for the
394    /// first page. Returns the collections on this page and an optional cursor
395    /// for the next page (`None` means no more pages).
396    fn collections_page(
397        &self,
398        token: Option<String>,
399    ) -> impl Future<Output = Result<(Vec<Collection>, Option<String>), Self::Error>> + Send;
400}
401
402#[cfg(feature = "async")]
403/// A client that can stream STAC collections.
404///
405/// [`StreamCollectionsClient::collections_stream`] is the only required
406/// method. This mirrors the naming convention of [`StreamItemsClient`]:
407/// the prefix `Stream` indicates a streaming variant of its non-streaming
408/// counterpart ([`CollectionsClient`]).
409///
410/// # Blanket impl
411///
412/// Any `T: CollectionsClient + Clone + Send + Sync` automatically implements
413/// this trait by eagerly fetching all collections in one call and yielding
414/// them as a stream.
415///
416/// For cursor-paginated backends, implement [`PagedCollectionsClient`] and
417/// call [`stream_pages_collections`](crate::api::stream_pages_collections)
418/// inside your own `StreamCollectionsClient` impl — the same pattern as
419/// [`stream_pages`](crate::api::stream_pages) for items.
420pub trait StreamCollectionsClient: Send + Sync {
421    /// The error type for this client.
422    type Error: Send;
423
424    /// Returns all collections as a stream.
425    fn collections_stream(
426        &self,
427    ) -> impl Future<
428        Output = Result<impl Stream<Item = Result<Collection, Self::Error>> + Send, Self::Error>,
429    > + Send;
430
431    /// Collects all collections into a `Vec`.
432    ///
433    /// Convenience wrapper around
434    /// [`collections_stream`](StreamCollectionsClient::collections_stream).
435    ///
436    /// # Examples
437    ///
438    /// ```no_run
439    /// use stac::api::StreamCollectionsClient;
440    ///
441    /// async fn example<C>(client: C)
442    /// where
443    ///     C: StreamCollectionsClient,
444    ///     C::Error: std::fmt::Debug,
445    /// {
446    ///     let collections = client.collect_collections().await.unwrap();
447    ///     println!("Got {} collections", collections.len());
448    /// }
449    /// ```
450    fn collect_collections(
451        &self,
452    ) -> impl Future<Output = Result<Vec<Collection>, Self::Error>> + Send {
453        async move {
454            use futures::TryStreamExt as _;
455            let stream = self.collections_stream().await?;
456            futures::pin_mut!(stream);
457            stream.try_collect().await
458        }
459    }
460}