use futures::{Stream, StreamExt};
use crate::types::Annotated;
/// A value that can be built up piece by piece from other values of the same type.
///
/// `aggregate_stream` uses this as the bound on its output type: it calls
/// [`merge`](StreamAggregable::merge) to fold each later payload into the
/// first one, and [`empty`](StreamAggregable::empty) when the stream carried
/// no payload at all.
pub trait StreamAggregable: Sized {
/// Returns the value that represents "nothing was received".
///
/// What that value contains is up to the implementor.
fn empty() -> Self;
/// Folds `next` into `self`, consuming `next`.
///
/// How the two values are combined is up to the implementor.
fn merge(&mut self, next: Self);
}
/// Drains `stream` and collapses every payload it carries into a single `T`.
///
/// Each item is first passed through `Annotated::ok`; if that yields an
/// error, aggregation stops immediately and the error is returned (the rest
/// of the stream is not polled). Items whose `data` field is `None` are
/// skipped. The first payload seen becomes the accumulator as-is, and every
/// later payload is folded into it with [`StreamAggregable::merge`].
///
/// # Errors
///
/// Returns the `String` error produced by the first item for which
/// `Annotated::ok` fails.
// NOTE(review): `Annotated::ok` is defined elsewhere; this assumes it returns
// `Result<Annotated<T>, String>` (or an error convertible to `String`) — confirm.
///
/// # Returns
///
/// The merged value, or `T::empty()` when no item carried a payload.
pub async fn aggregate_stream<T, S>(stream: S) -> Result<T, String>
where
    T: StreamAggregable,
    S: Stream<Item = Annotated<T>>,
{
    // Pin on the stack so `next()` can be called without requiring `S: Unpin`.
    let mut events = std::pin::pin!(stream);
    let mut merged: Option<T> = None;

    loop {
        let Some(event) = events.next().await else {
            break;
        };

        // Bail out on the first failed event; otherwise skip events with no payload.
        let Some(chunk) = event.ok()?.data else {
            continue;
        };

        merged = Some(match merged.take() {
            Some(mut acc) => {
                acc.merge(chunk);
                acc
            }
            // The first payload seeds the accumulator directly.
            None => chunk,
        });
    }

    Ok(match merged {
        Some(value) => value,
        None => T::empty(),
    })
}