use super::{ItemCollection, Items, Search};
use crate::{Collection, Error, Item};
#[cfg(feature = "async")]
use futures_core::Stream;
use std::future::Future;
pub trait ItemsClient: Send + Sync {
type Error: Send;
fn search(
&self,
search: Search,
) -> impl Future<Output = Result<ItemCollection, Self::Error>> + Send;
fn item(
&self,
collection_id: &str,
item_id: &str,
) -> impl Future<Output = Result<Option<Item>, Self::Error>> + Send
where
Self::Error: From<Error>,
{
async move {
let search = Search::default()
.ids(vec![item_id.to_string()])
.collections(vec![collection_id.to_string()]);
let mut item_collection = self.search(search).await?;
if item_collection.items.len() == 1 {
let api_item = item_collection.items.pop().expect("just checked length");
let item: Item = serde_json::from_value(serde_json::Value::Object(api_item))
.map_err(Error::from)?;
Ok(Some(item))
} else {
Ok(None)
}
}
}
fn items(
&self,
collection_id: &str,
items: Items,
) -> impl Future<Output = Result<ItemCollection, Self::Error>> + Send {
async move {
let search = items.search_collection(collection_id);
self.search(search).await
}
}
}
pub trait CollectionsClient: Send + Sync {
type Error: Send;
fn collections(&self) -> impl Future<Output = Result<Vec<Collection>, Self::Error>> + Send;
fn collection(
&self,
id: &str,
) -> impl Future<Output = Result<Option<Collection>, Self::Error>> + Send {
async move {
let collections = self.collections().await?;
Ok(collections.into_iter().find(|c| c.id == id))
}
}
}
pub trait TransactionClient: Send {
type Error: Send;
fn add_collection(
&mut self,
collection: Collection,
) -> impl Future<Output = Result<(), Self::Error>> + Send;
fn add_item(&mut self, item: Item) -> impl Future<Output = Result<(), Self::Error>> + Send;
fn add_items(
&mut self,
items: Vec<Item>,
) -> impl Future<Output = Result<(), Self::Error>> + Send {
async move {
for item in items {
self.add_item(item).await?;
}
Ok(())
}
}
}
#[cfg(feature = "geoarrow")]
pub trait ArrowItemsClient {
type Error;
type RecordBatchStream<'a>: arrow_array::RecordBatchReader
where
Self: 'a;
fn search_to_arrow(&self, search: Search) -> Result<Self::RecordBatchStream<'_>, Self::Error>;
fn items_to_arrow(
&self,
collection_id: &str,
items: Items,
) -> Result<Self::RecordBatchStream<'_>, Self::Error> {
self.search_to_arrow(items.search_collection(collection_id))
}
}
#[cfg(feature = "async")]
pub trait StreamItemsClient: Send + Sync {
type Error: Send;
fn search_stream(
&self,
search: Search,
) -> impl Future<
Output = Result<impl Stream<Item = Result<super::Item, Self::Error>> + Send, Self::Error>,
> + Send;
fn collect_items(
&self,
search: Search,
) -> impl Future<Output = Result<Vec<super::Item>, Self::Error>> + Send {
async move {
use futures::TryStreamExt as _;
let stream = self.search_stream(search).await?;
futures::pin_mut!(stream);
stream.try_collect().await
}
}
fn item_count(
&self,
search: Search,
) -> impl Future<Output = Result<usize, Self::Error>> + Send {
async move {
use futures::TryStreamExt as _;
let stream = self.search_stream(search).await?;
futures::pin_mut!(stream);
let mut count = 0usize;
stream
.try_for_each(|_| {
count += 1;
async { Ok(()) }
})
.await?;
Ok(count)
}
}
fn items_stream(
&self,
collection_id: &str,
items: Items,
) -> impl Future<
Output = Result<impl Stream<Item = Result<super::Item, Self::Error>> + Send, Self::Error>,
> + Send {
async move {
let search = items.search_collection(collection_id);
self.search_stream(search).await
}
}
}
pub trait PagedCollectionsClient: Send + Sync {
type Error: Send;
fn collections_page(
&self,
token: Option<String>,
) -> impl Future<Output = Result<(Vec<Collection>, Option<String>), Self::Error>> + Send;
}
#[cfg(feature = "async")]
pub trait StreamCollectionsClient: Send + Sync {
type Error: Send;
fn collections_stream(
&self,
) -> impl Future<
Output = Result<impl Stream<Item = Result<Collection, Self::Error>> + Send, Self::Error>,
> + Send;
fn collect_collections(
&self,
) -> impl Future<Output = Result<Vec<Collection>, Self::Error>> + Send {
async move {
use futures::TryStreamExt as _;
let stream = self.collections_stream().await?;
futures::pin_mut!(stream);
stream.try_collect().await
}
}
}