stac/api/client.rs
1use super::{ItemCollection, Items, Search};
2use crate::{Collection, Error, Item};
3#[cfg(feature = "async")]
4use futures_core::Stream;
5use std::future::Future;
6
7/// A client that can fetch STAC items.
8///
9/// [`ItemsClient::search`] is the only required method. This trait covers the
10/// `/search`, `/collections/{id}/items`, and
11/// `/collections/{id}/items/{item_id}` endpoints — all expressed as
12/// constrained [`Search`] queries. [`ItemsClient::item`] and
13/// [`ItemsClient::items`] have default implementations that delegate to
14/// [`ItemsClient::search`].
15pub trait ItemsClient: Send + Sync {
16 /// The error type for this client.
17 type Error: Send;
18
19 /// Searches for STAC items matching the given parameters.
20 fn search(
21 &self,
22 search: Search,
23 ) -> impl Future<Output = Result<ItemCollection, Self::Error>> + Send;
24
25 /// Returns a single item from a collection.
26 ///
27 /// The default implementation uses [`ItemsClient::search`] with `ids` and
28 /// `collections` filters, then deserializes the result.
29 ///
30 /// Override this method if your backend has a native O(1) point-lookup for
31 /// `GET /collections/{id}/items/{item_id}`. Both the `pgstac` and `memory`
32 /// backends override this.
33 fn item(
34 &self,
35 collection_id: &str,
36 item_id: &str,
37 ) -> impl Future<Output = Result<Option<Item>, Self::Error>> + Send
38 where
39 Self::Error: From<Error>,
40 {
41 async move {
42 let search = Search::default()
43 .ids(vec![item_id.to_string()])
44 .collections(vec![collection_id.to_string()]);
45 let mut item_collection = self.search(search).await?;
46 if item_collection.items.len() == 1 {
47 let api_item = item_collection.items.pop().expect("just checked length");
48 let item: Item = serde_json::from_value(serde_json::Value::Object(api_item))
49 .map_err(Error::from)?;
50 Ok(Some(item))
51 } else {
52 Ok(None)
53 }
54 }
55 }
56
57 /// Returns items from a collection.
58 ///
59 /// The default implementation converts the request to a [`Search`] scoped
60 /// to the given collection and delegates to [`ItemsClient::search`].
61 fn items(
62 &self,
63 collection_id: &str,
64 items: Items,
65 ) -> impl Future<Output = Result<ItemCollection, Self::Error>> + Send {
66 async move {
67 let search = items.search_collection(collection_id);
68 self.search(search).await
69 }
70 }
71}
72
73/// A client that can retrieve STAC collections.
74///
75/// [`CollectionsClient::collections`] is the only required method.
76/// [`CollectionsClient::collection`] has a default implementation that
77/// scans all collections. Override it for O(1) lookups.
78pub trait CollectionsClient: Send + Sync {
79 /// The error type for this client.
80 type Error: Send;
81
82 /// Returns all collections.
83 fn collections(&self) -> impl Future<Output = Result<Vec<Collection>, Self::Error>> + Send;
84
85 /// Returns a single collection by ID.
86 ///
87 /// The default implementation scans all collections. Override this method
88 /// if your backend has an O(1) indexed lookup (e.g. a hash map or database
89 /// index). Both the `pgstac` and `memory` backends override this.
90 fn collection(
91 &self,
92 id: &str,
93 ) -> impl Future<Output = Result<Option<Collection>, Self::Error>> + Send {
94 async move {
95 let collections = self.collections().await?;
96 Ok(collections.into_iter().find(|c| c.id == id))
97 }
98 }
99}
100
101/// A client that can create or add STAC items and collections.
102///
103/// [`TransactionClient::add_collection`] and
104/// [`TransactionClient::add_item`] are required methods.
105/// [`TransactionClient::add_items`] has a default implementation that calls
106/// `add_item` in a loop.
107pub trait TransactionClient: Send {
108 /// The error type for this client.
109 type Error: Send;
110
111 /// Adds a collection.
112 fn add_collection(
113 &mut self,
114 collection: Collection,
115 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
116
117 /// Adds a single item.
118 fn add_item(&mut self, item: Item) -> impl Future<Output = Result<(), Self::Error>> + Send;
119
120 /// Adds multiple items.
121 ///
122 /// The default implementation calls [`TransactionClient::add_item`] for
123 /// each item sequentially.
124 fn add_items(
125 &mut self,
126 items: Vec<Item>,
127 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
128 async move {
129 for item in items {
130 self.add_item(item).await?;
131 }
132 Ok(())
133 }
134 }
135}
136
/// Read access to STAC items in the form of Arrow record batches.
///
/// Only [`ArrowItemsClient::search_to_arrow`] must be implemented;
/// [`ArrowItemsClient::items_to_arrow`] is provided on top of it.
///
/// This trait differs from the other client traits in two ways: it carries no
/// `Send + Sync` requirement, and `search_to_arrow` is a synchronous call.
/// Together these let an implementation hand back a reader that borrows from
/// the client (e.g. an iterator that borrows from a database connection).
#[cfg(feature = "geoarrow")]
pub trait ArrowItemsClient {
    /// The error produced by this client's operations.
    type Error;

    /// The reader type produced by [`ArrowItemsClient::search_to_arrow`].
    ///
    /// It may borrow from the client for `'a`.
    type RecordBatchStream<'a>: arrow_array::RecordBatchReader
    where
        Self: 'a;

    /// Runs a search and returns the matching items as Arrow record batches.
    fn search_to_arrow(&self, search: Search) -> Result<Self::RecordBatchStream<'_>, Self::Error>;

    /// Returns the items of one collection as Arrow record batches.
    ///
    /// The default implementation turns `items` into a [`Search`] restricted
    /// to `collection_id` and passes it to
    /// [`ArrowItemsClient::search_to_arrow`]. Override this method if your
    /// backend can serve collection items in Arrow more efficiently than by
    /// routing through search.
    fn items_to_arrow(
        &self,
        collection_id: &str,
        items: Items,
    ) -> Result<Self::RecordBatchStream<'_>, Self::Error> {
        let scoped = items.search_collection(collection_id);
        self.search_to_arrow(scoped)
    }
}
173
/// Streaming access to STAC items across every page of results.
///
/// Only [`StreamItemsClient::search_stream`] must be implemented. The provided
/// methods [`StreamItemsClient::collect_items`],
/// [`StreamItemsClient::item_count`], and [`StreamItemsClient::items_stream`]
/// are all expressed in terms of it.
///
/// A `Stream` plays the role of `Iterator` in async code:
/// `StreamExt::next().await` corresponds to `Iterator::next()`. The trait
/// cannot hand out a `std::iter::Iterator` instead, because `Iterator::next`
/// is synchronous and would have to block the async runtime for every item.
/// For blocking / sync contexts, drive the stream through a
/// `tokio::runtime::Handle` or use the `BlockingClient` in `stac-io`.
///
/// # Examples
///
/// Stream items lazily (low memory):
///
/// ```no_run
/// use futures::StreamExt;
/// use stac::api::{Search, StreamItemsClient};
///
/// async fn example<C>(client: C)
/// where
///     C: StreamItemsClient,
///     C::Error: std::fmt::Debug,
/// {
///     let search = Search::default();
///     let stream = client.search_stream(search).await.unwrap();
///     futures::pin_mut!(stream);
///     while let Some(item) = stream.next().await {
///         println!("Got item: {:?}", item.unwrap());
///     }
/// }
/// ```
///
/// Or collect all into a `Vec` using the default method:
///
/// ```no_run
/// use stac::api::{Search, StreamItemsClient};
///
/// async fn example<C>(client: C)
/// where
///     C: StreamItemsClient,
///     C::Error: std::fmt::Debug,
/// {
///     let items = client.collect_items(Search::default()).await.unwrap();
///     println!("Total: {}", items.len());
/// }
/// ```
#[cfg(feature = "async")]
pub trait StreamItemsClient: Send + Sync {
    /// The error produced by this client's operations.
    type Error: Send;

    /// Runs a search and returns the matching items as a stream.
    ///
    /// The stream is expected to walk through all pages of results. To fetch
    /// a single page only, use [`ItemsClient::search`].
    fn search_stream(
        &self,
        search: Search,
    ) -> impl Future<
        Output = Result<impl Stream<Item = Result<super::Item, Self::Error>> + Send, Self::Error>,
    > + Send;

    /// Gathers every item from every page into a `Vec`.
    ///
    /// Runs [`search_stream`](StreamItemsClient::search_stream) until it is
    /// exhausted, stopping at the first error. For large result sets prefer
    /// consuming [`search_stream`](StreamItemsClient::search_stream) directly,
    /// so that not everything has to be held in memory at once.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use stac::api::{Search, StreamItemsClient};
    ///
    /// async fn example<C>(client: C)
    /// where
    ///     C: StreamItemsClient,
    ///     C::Error: std::fmt::Debug,
    /// {
    ///     let items = client.collect_items(Search::default()).await.unwrap();
    ///     println!("Got {} items", items.len());
    /// }
    /// ```
    fn collect_items(
        &self,
        search: Search,
    ) -> impl Future<Output = Result<Vec<super::Item>, Self::Error>> + Send {
        async move {
            use futures::TryStreamExt as _;
            // `try_collect` consumes the stream by value, so no explicit
            // pinning is needed here.
            self.search_stream(search).await?.try_collect().await
        }
    }

    /// Counts the items across all pages without keeping them.
    ///
    /// Uses less memory than
    /// [`collect_items`](StreamItemsClient::collect_items) when only the
    /// total is of interest: each item yielded by the stream is dropped as
    /// soon as it has been counted. Stops at the first error.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use stac::api::{Search, StreamItemsClient};
    ///
    /// async fn example<C>(client: C)
    /// where
    ///     C: StreamItemsClient,
    ///     C::Error: std::fmt::Debug,
    /// {
    ///     let count = client.item_count(Search::default()).await.unwrap();
    ///     println!("Total items: {count}");
    /// }
    /// ```
    fn item_count(
        &self,
        search: Search,
    ) -> impl Future<Output = Result<usize, Self::Error>> + Send {
        async move {
            use futures::TryStreamExt as _;
            self.search_stream(search)
                .await?
                .try_fold(0usize, |seen, _item| async move { Ok(seen + 1) })
                .await
        }
    }

    /// Streams every item of one collection, across all pages.
    ///
    /// The default implementation turns `items` into a [`Search`] restricted
    /// to `collection_id` and passes it to
    /// [`StreamItemsClient::search_stream`]. Override this method if your
    /// backend has a dedicated link-following items endpoint (e.g.
    /// `stac-io`'s HTTP client).
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use futures::StreamExt;
    /// use stac::api::{Items, StreamItemsClient};
    ///
    /// async fn example<C>(client: C)
    /// where
    ///     C: StreamItemsClient,
    ///     C::Error: std::fmt::Debug,
    /// {
    ///     let stream = client.items_stream("my-collection", Items::default()).await.unwrap();
    ///     futures::pin_mut!(stream);
    ///     while let Some(item) = stream.next().await {
    ///         println!("Got item: {:?}", item.unwrap());
    ///     }
    /// }
    /// ```
    fn items_stream(
        &self,
        collection_id: &str,
        items: Items,
    ) -> impl Future<
        Output = Result<impl Stream<Item = Result<super::Item, Self::Error>> + Send, Self::Error>,
    > + Send {
        async move {
            let scoped = items.search_collection(collection_id);
            self.search_stream(scoped).await
        }
    }
}
350
351/// A client that can fetch a single page of STAC collections with cursor
352/// pagination.
353///
354/// This is the paginated counterpart to [`CollectionsClient`]. Implement this
355/// trait when your backend supports cursor-based `/collections` pagination
356/// (e.g. a future pgstac version, or any backend that returns a `next_token`
357/// alongside the collection list).
358///
359/// # Blanket impl
360///
361/// Any `T: PagedCollectionsClient + Clone + Send + Sync` automatically
362/// implements [`StreamCollectionsClient`] by following the cursor chain.
363/// This blanket takes priority over the simpler
364/// `CollectionsClient → StreamCollectionsClient` blanket for types that
365/// implement `PagedCollectionsClient`.
366///
367/// # Examples
368///
369/// ```no_run
370/// use stac::Collection;
371/// use stac::api::PagedCollectionsClient;
372///
373/// struct MyBackend;
374///
375/// impl PagedCollectionsClient for MyBackend {
376/// type Error = std::convert::Infallible;
377///
378/// async fn collections_page(
379/// &self,
380/// token: Option<String>,
381/// ) -> Result<(Vec<Collection>, Option<String>), Self::Error> {
382/// // fetch one page; return (collections, next_token)
383/// Ok((vec![], None))
384/// }
385/// }
386/// ```
387pub trait PagedCollectionsClient: Send + Sync {
388 /// The error type for this client.
389 type Error: Send;
390
391 /// Fetches one page of collections.
392 ///
393 /// `token` is the cursor returned by the previous call, or `None` for the
394 /// first page. Returns the collections on this page and an optional cursor
395 /// for the next page (`None` means no more pages).
396 fn collections_page(
397 &self,
398 token: Option<String>,
399 ) -> impl Future<Output = Result<(Vec<Collection>, Option<String>), Self::Error>> + Send;
400}
401
/// Streaming access to STAC collections.
///
/// Only [`StreamCollectionsClient::collections_stream`] must be implemented.
/// The name follows the same convention as [`StreamItemsClient`]: the
/// `Stream` prefix marks the streaming variant of a non-streaming trait, here
/// [`CollectionsClient`].
///
/// # Blanket impl
///
/// Any `T: CollectionsClient + Clone + Send + Sync` automatically implements
/// this trait by eagerly fetching all collections in one call and yielding
/// them as a stream.
///
/// For cursor-paginated backends, implement [`PagedCollectionsClient`] and
/// call [`stream_pages_collections`](crate::api::stream_pages_collections)
/// inside your own `StreamCollectionsClient` impl — the same pattern as
/// [`stream_pages`](crate::api::stream_pages) for items.
#[cfg(feature = "async")]
pub trait StreamCollectionsClient: Send + Sync {
    /// The error produced by this client's operations.
    type Error: Send;

    /// Returns all collections as a stream.
    fn collections_stream(
        &self,
    ) -> impl Future<
        Output = Result<impl Stream<Item = Result<Collection, Self::Error>> + Send, Self::Error>,
    > + Send;

    /// Gathers every collection into a `Vec`.
    ///
    /// Convenience wrapper that runs
    /// [`collections_stream`](StreamCollectionsClient::collections_stream)
    /// until it is exhausted, stopping at the first error.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use stac::api::StreamCollectionsClient;
    ///
    /// async fn example<C>(client: C)
    /// where
    ///     C: StreamCollectionsClient,
    ///     C::Error: std::fmt::Debug,
    /// {
    ///     let collections = client.collect_collections().await.unwrap();
    ///     println!("Got {} collections", collections.len());
    /// }
    /// ```
    fn collect_collections(
        &self,
    ) -> impl Future<Output = Result<Vec<Collection>, Self::Error>> + Send {
        async move {
            use futures::TryStreamExt as _;
            // `try_collect` consumes the stream by value, so no explicit
            // pinning is needed here.
            self.collections_stream().await?.try_collect().await
        }
    }
}