use std::sync::Mutex;
use fidius_core::Value;
use fidius_host::{CallError, ChunkStream};
use futures::StreamExt;
#[async_trait::async_trait]
pub trait StreamSink {
async fn accept(&self, item: Value) -> Result<(), CallError>;
}
pub fn stream_of(items: Vec<Value>) -> ChunkStream {
ChunkStream::new(futures::stream::iter(
items.into_iter().map(Ok::<Value, CallError>),
))
}
pub async fn collect(mut s: ChunkStream) -> Result<Vec<Value>, CallError> {
let mut out = Vec::new();
while let Some(item) = s.next().await {
out.push(item?);
}
Ok(out)
}
pub async fn pump<S>(mut out: ChunkStream, into: &S) -> Result<(), CallError>
where
S: StreamSink + ?Sized,
{
while let Some(item) = out.next().await {
into.accept(item?).await?;
}
Ok(())
}
#[derive(Default)]
pub struct CollectSink {
items: Mutex<Vec<Value>>,
}
impl CollectSink {
pub fn new() -> Self {
Self::default()
}
pub fn take(&self) -> Vec<Value> {
self.items.lock().unwrap().clone()
}
}
#[async_trait::async_trait]
impl StreamSink for CollectSink {
async fn accept(&self, item: Value) -> Result<(), CallError> {
self.items.lock().unwrap().push(item);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use fidius_core::{from_value, to_value};
fn vals(xs: &[i64]) -> Vec<Value> {
xs.iter().map(|n| to_value(n).unwrap()).collect()
}
fn ints(vs: Vec<Value>) -> Vec<i64> {
vs.into_iter().map(|v| from_value(v).unwrap()).collect()
}
#[tokio::test]
async fn stream_of_then_collect_round_trips() {
let got = collect(stream_of(vals(&[1, 2, 3]))).await.unwrap();
assert_eq!(ints(got), vec![1, 2, 3]);
}
#[tokio::test]
async fn collect_surfaces_first_error() {
let s = ChunkStream::new(futures::stream::iter(vec![
Ok(to_value(&1i64).unwrap()),
Err(CallError::StreamAborted),
]));
let err = collect(s).await.unwrap_err();
assert!(matches!(err, CallError::StreamAborted));
}
#[tokio::test]
async fn pump_delivers_all_items_to_sink() {
let sink = CollectSink::new();
pump(stream_of(vals(&[10, 20, 30])), &sink).await.unwrap();
assert_eq!(ints(sink.take()), vec![10, 20, 30]);
}
#[tokio::test]
async fn pump_stops_on_producer_error() {
let sink = CollectSink::new();
let s = ChunkStream::new(futures::stream::iter(vec![
Ok(to_value(&7i64).unwrap()),
Err(CallError::StreamAborted),
Ok(to_value(&8i64).unwrap()), ]));
let err = pump(s, &sink).await.unwrap_err();
assert!(matches!(err, CallError::StreamAborted));
assert_eq!(ints(sink.take()), vec![7]);
}
#[tokio::test]
async fn compose_single_plugin_idiom() {
let doubled = stream_of(vals(&[1, 2, 3])).map(|r| {
r.map(|v| {
let n: i64 = from_value(v).unwrap();
to_value(&(n * 2)).unwrap()
})
});
let got = collect(ChunkStream::new(doubled)).await.unwrap();
assert_eq!(ints(got), vec![2, 4, 6]);
}
}