#[cfg(feature = "async")]
mod async_adapters {
use crate::Collection;
use crate::api::client::{
CollectionsClient, ItemsClient, PagedCollectionsClient, StreamCollectionsClient,
StreamItemsClient,
};
use crate::api::{Item, ItemCollection, Search};
use async_stream::try_stream;
use futures_core::Stream;
use std::future::Future;
#[derive(Debug)]
pub struct PagedItemsStream<T> {
inner: T,
}
impl<T> PagedItemsStream<T> {
pub fn new(inner: T) -> Self {
Self { inner }
}
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T> StreamItemsClient for PagedItemsStream<T>
where
T: ItemsClient + Clone + Send + Sync,
T::Error: Send,
{
type Error = T::Error;
fn search_stream(
&self,
search: Search,
) -> impl Future<
Output = Result<impl Stream<Item = Result<Item, T::Error>> + Send, T::Error>,
> + Send {
let client = self.inner.clone();
async move {
let page = client.search(search.clone()).await?;
Ok(stream_pages(client, search, page))
}
}
}
pub fn stream_pages<T>(
client: T,
initial_search: Search,
initial_page: ItemCollection,
) -> impl Stream<Item = Result<Item, T::Error>> + Send
where
T: ItemsClient + Clone + Send + Sync,
T::Error: Send,
{
try_stream! {
let mut page = initial_page;
let mut current_search = initial_search;
loop {
if page.items.is_empty() {
break;
}
let next = page.next.clone();
for item in page.items {
yield item;
}
match next {
Some(next_fields) => {
current_search.additional_fields.extend(next_fields);
page = client.search(current_search.clone()).await?;
}
None => break,
}
}
}
}
impl<T> StreamCollectionsClient for T
where
T: CollectionsClient + Clone + Send + Sync,
T::Error: Send,
{
type Error = T::Error;
fn collections_stream(
&self,
) -> impl Future<
Output = Result<impl Stream<Item = Result<Collection, T::Error>> + Send, T::Error>,
> + Send {
let client = self.clone();
async move {
let collections = client.collections().await?;
Ok(futures::stream::iter(collections.into_iter().map(Ok)))
}
}
}
pub fn stream_pages_collections<T>(
client: T,
initial_page: Vec<Collection>,
initial_token: Option<String>,
) -> impl Stream<Item = Result<Collection, T::Error>> + Send
where
T: PagedCollectionsClient + Clone + Send + Sync,
T::Error: Send,
{
try_stream! {
let mut page = initial_page;
let mut cursor = initial_token;
loop {
if page.is_empty() {
break;
}
let next_cursor = cursor;
for collection in page {
yield collection;
}
match next_cursor {
Some(t) => {
let (next_page, next_t) = client.collections_page(Some(t)).await?;
page = next_page;
cursor = next_t;
}
None => break,
}
}
}
}
}
#[cfg(feature = "async")]
pub use async_adapters::{PagedItemsStream, stream_pages, stream_pages_collections};
#[cfg(feature = "geoarrow")]
mod geoarrow_adapters {
use super::super::client::{ArrowItemsClient, ItemsClient};
use super::super::{Item, ItemCollection, Search};
use std::future::Future;
#[allow(missing_debug_implementations)]
pub struct RecordBatchReaderAdapter<I> {
inner: I,
schema: arrow_schema::SchemaRef,
}
impl<I> RecordBatchReaderAdapter<I> {
pub fn new(inner: I, schema: arrow_schema::SchemaRef) -> Self {
Self { inner, schema }
}
}
impl<I, E> Iterator for RecordBatchReaderAdapter<I>
where
I: Iterator<Item = Result<arrow_array::RecordBatch, E>>,
E: std::error::Error + Send + Sync + 'static,
{
type Item = Result<arrow_array::RecordBatch, arrow_schema::ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
self.inner
.next()
.map(|r| r.map_err(|e| arrow_schema::ArrowError::ExternalError(Box::new(e))))
}
}
impl<I, E> arrow_array::RecordBatchReader for RecordBatchReaderAdapter<I>
where
I: Iterator<Item = Result<arrow_array::RecordBatch, E>>,
E: std::error::Error + Send + Sync + 'static,
{
fn schema(&self) -> arrow_schema::SchemaRef {
self.schema.clone()
}
}
impl<T> ItemsClient for T
where
T: ArrowItemsClient + Send + Sync,
T::Error: std::error::Error + Send + Sync + 'static,
{
type Error = crate::Error;
fn search(
&self,
search: Search,
) -> impl Future<Output = Result<ItemCollection, Self::Error>> + Send {
let result: Result<Vec<Item>, crate::Error> = self
.search_to_arrow(search)
.map_err(|err| crate::Error::ArrowAdapterClient(Box::new(err)))
.and_then(crate::geoarrow::json::from_record_batch_reader);
async move { Ok(ItemCollection::from(result?)) }
}
}
#[cfg(feature = "async")]
mod async_geoarrow_adapters {
use super::ArrowItemsClient;
use crate::api::client::StreamItemsClient;
use crate::api::{Item, Search};
use futures_core::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
struct ArrowItemStream<R> {
reader: R,
current_items: std::vec::IntoIter<Item>,
}
impl<R> ArrowItemStream<R> {
fn new(reader: R) -> Self {
Self {
reader,
current_items: Vec::new().into_iter(),
}
}
}
impl<R> Unpin for ArrowItemStream<R> {}
impl<R> Stream for ArrowItemStream<R>
where
R: arrow_array::RecordBatchReader,
{
type Item = Result<Item, crate::Error>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
if let Some(item) = this.current_items.next() {
return Poll::Ready(Some(Ok(item)));
}
match this.reader.next() {
Some(Ok(batch)) => {
match crate::geoarrow::json::record_batch_to_json_rows(batch) {
Ok(items) => {
this.current_items = items.into_iter();
}
Err(err) => return Poll::Ready(Some(Err(err))),
}
}
Some(Err(err)) => return Poll::Ready(Some(Err(err.into()))),
None => return Poll::Ready(None),
}
}
}
}
impl<T> StreamItemsClient for T
where
T: ArrowItemsClient + Send + Sync,
T::Error: std::error::Error + Send + Sync + 'static,
for<'a> T::RecordBatchStream<'a>: Send,
{
type Error = crate::Error;
fn search_stream(
&self,
search: Search,
) -> impl Future<
Output = Result<impl Stream<Item = Result<Item, Self::Error>> + Send, Self::Error>,
> + Send {
let reader = self
.search_to_arrow(search)
.map_err(|err| crate::Error::ArrowAdapterClient(Box::new(err)));
async move { Ok(ArrowItemStream::new(reader?)) }
}
}
}
}
#[cfg(feature = "geoarrow")]
pub use geoarrow_adapters::RecordBatchReaderAdapter;