use futures::{Stream, StreamExt};
use crate::types::Annotated;
use super::NvImagesResponse;
#[derive(Debug)]
pub struct DeltaAggregator {
response: Option<NvImagesResponse>,
error: Option<String>,
}
impl Default for DeltaAggregator {
fn default() -> Self {
Self::new()
}
}
impl DeltaAggregator {
    /// Creates an aggregator with no merged response and no recorded error.
    pub fn new() -> Self {
        Self {
            response: None,
            error: None,
        }
    }

    /// Consumes `stream` and folds every item into a single [`NvImagesResponse`].
    ///
    /// The first item that carries data becomes the base response; from each
    /// later item only `inner.data` is appended to that base. If the stream
    /// never yields data, `NvImagesResponse::empty()` is returned.
    ///
    /// # Errors
    ///
    /// Returns `Err` with the error message when any item fails its `ok()`
    /// check. Once an error has been recorded, data from subsequent items is
    /// ignored; if several items fail, the message of the latest one is
    /// returned.
    pub async fn apply(
        stream: impl Stream<Item = Annotated<NvImagesResponse>>,
    ) -> Result<NvImagesResponse, String> {
        let folded = stream
            .fold(Self::new(), |mut state, item| async move {
                match item.ok() {
                    // A failing item replaces whatever error was stored before.
                    Err(message) => state.error = Some(message),
                    Ok(item) => {
                        // After an error the payload is discarded rather than merged.
                        let chunk = if state.error.is_none() {
                            item.data
                        } else {
                            None
                        };
                        if let Some(chunk) = chunk {
                            match state.response.as_mut() {
                                Some(merged) => merged.inner.data.extend(chunk.inner.data),
                                None => state.response = Some(chunk),
                            }
                        }
                    }
                }
                state
            })
            .await;

        match folded.error {
            Some(message) => Err(message),
            None => Ok(folded.response.unwrap_or_else(NvImagesResponse::empty)),
        }
    }
}
impl NvImagesResponse {
    /// Collapses a stream of annotated responses into one response.
    ///
    /// This is a convenience entry point that delegates directly to
    /// [`DeltaAggregator::apply`].
    ///
    /// # Errors
    ///
    /// Propagates the `Err(String)` produced by [`DeltaAggregator::apply`].
    pub async fn from_annotated_stream(
        stream: impl Stream<Item = Annotated<Self>>,
    ) -> Result<Self, String> {
        DeltaAggregator::apply(stream).await
    }
}