mod core;
mod filter_with;
mod run;
mod sender;
mod straw;
mod with;
pub use core::Core;
pub use filter_with::FilterWith;
pub use run::Run;
pub use sender::Sender;
pub use straw::Straw;
pub use with::With;
use futures::channel::mpsc;
use futures::stream;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task;
#[doc(no_inline)]
pub use futures::never::Never;
#[doc(no_inline)]
pub use futures::{Future, FutureExt, Sink, Stream, StreamExt};
pub trait Sipper<Output, Progress = Output>:
core::Core<Output = Output, Progress = Progress>
{
fn with<F, A>(self, f: F) -> With<Self, F, A>
where
Self: Sized,
F: FnMut(Progress) -> A,
{
With::new(self, f)
}
fn filter_with<F, A>(self, f: F) -> FilterWith<Self, F, A>
where
Self: Sized,
F: FnMut(Progress) -> Option<A>,
{
FilterWith::new(self, f)
}
fn sip(&mut self) -> stream::Next<'_, Self>
where
Self: Unpin,
{
StreamExt::next(self)
}
fn run<S>(self, on_progress: impl Into<Sender<Progress, S>>) -> Run<Self, S>
where
Self: Sized,
S: Sink<Progress>,
{
Run::new(self, on_progress.into().sink)
}
fn pin(self) -> Pin<Box<Self>>
where
Self: Sized,
{
Box::pin(self)
}
}
/// Blanket implementation: every type implementing `core::Core` with matching
/// `Output` and `Progress` associated types is a `Sipper`.
impl<Output, Progress, T: core::Core<Output = Output, Progress = Progress>>
    Sipper<Output, Progress> for T
{
}
/// Creates a new [`Sipper`] from the future returned by `builder`.
///
/// `builder` is called once, immediately, with a [`Sender`] connected to an
/// internal channel. The returned value can be consumed in two ways:
///
/// - As a [`Stream`]: yields every `Progress` value received on the channel
///   while driving the future. The stream ends once the future has completed
///   and no more progress is available. The output is stored so that a later
///   `.await` can retrieve it.
/// - As a [`Future`]: drives the future to completion and resolves to its
///   output. Any progress received while polling this way is discarded.
pub fn sipper<Progress, F>(
    builder: impl FnOnce(Sender<Progress>) -> F,
) -> impl Sipper<F::Output, Progress>
where
    F: Future,
{
    pin_project! {
        struct Internal<F, Progress>
        where
            F: Future,
        {
            // The user-provided future doing the actual work.
            #[pin]
            future: F,
            // Receiving end of the progress channel handed to `builder`.
            #[pin]
            receiver: mpsc::Receiver<Progress>,
            // Output of `future` when it completed while being polled as a stream.
            output: Option<F::Output>,
            // Set once `receiver` has yielded `None`; it is never polled afterwards.
            is_progress_finished: bool,
        }
    }

    impl<F, Progress> Future for Internal<F, Progress>
    where
        F: Future,
    {
        type Output = F::Output;

        fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
            let mut this = self.project();

            if !*this.is_progress_finished {
                // Nobody is listening to progress when awaiting directly, so
                // drain the channel to keep the senders from stalling.
                loop {
                    match this.receiver.as_mut().poll_next(cx) {
                        task::Poll::Ready(Some(_)) => {} // Discard progress
                        task::Poll::Ready(None) => {
                            *this.is_progress_finished = true;
                            break;
                        }
                        task::Poll::Pending => {
                            break;
                        }
                    }
                }
            }

            // The stream side may already have driven the future to completion.
            if let Some(output) = this.output.take() {
                task::Poll::Ready(output)
            } else {
                this.future.poll(cx)
            }
        }
    }

    impl<F, Progress> Stream for Internal<F, Progress>
    where
        F: Future,
    {
        type Item = Progress;

        fn poll_next(
            self: Pin<&mut Self>,
            cx: &mut task::Context<'_>,
        ) -> task::Poll<Option<Self::Item>> {
            use futures::ready;

            let mut this = self.project();

            // Runs at most twice: the second pass happens only right after the
            // future completes, and always returns because `output` is set.
            loop {
                if !*this.is_progress_finished {
                    match this.receiver.as_mut().poll_next(cx) {
                        task::Poll::Ready(None) => {
                            *this.is_progress_finished = true;
                        }
                        task::Poll::Ready(progress) => return task::Poll::Ready(progress),
                        task::Poll::Pending => {}
                    }
                }

                // The future is done and there is no progress available right
                // now: the stream is over.
                if this.output.is_some() {
                    return task::Poll::Ready(None);
                }

                *this.output = Some(ready!(this.future.as_mut().poll(cx)));

                // The future just completed. Check the channel again instead of
                // returning `Pending`: the final poll may have queued progress
                // or closed the channel, and if a sender clone is still alive
                // elsewhere no wake-up is guaranteed to ever arrive.
            }
        }
    }

    // NOTE(review): `1` is presumably the channel buffer size — confirm in `Sender::channel`.
    let (sender, receiver) = Sender::channel(1);

    Internal {
        future: builder(sender),
        receiver,
        is_progress_finished: false,
        output: None,
    }
}
/// Flattens a [`Sipper`] whose progress and output share one type into a
/// plain [`Stream`].
///
/// The resulting stream yields every progress value and then the final
/// output as its last item. After that it ends.
pub fn stream<Output>(sipper: impl Sipper<Output>) -> impl Stream<Item = Output> {
    // The state is `Some(pinned sipper)` while running and `None` once the
    // output has been emitted.
    stream::unfold(Some(sipper.pin()), |mut state| async move {
        let next = state.as_mut()?.next().await;

        match next {
            Some(progress) => Some((progress, state)),
            None => {
                // Progress is exhausted: take the sipper out of the state and
                // await it to emit the output as the last item.
                let finished = state.take()?;

                Some((finished.await, state))
            }
        }
    })
}
// Tests exercising sippers both as futures and as streams, plus the
// `with`, `filter_with`, `run` and `stream` helpers.
#[cfg(test)]
mod tests {
use super::*;
use tokio::task;
use tokio::test;
// Stand-in for a downloaded file; only its bytes are compared.
#[derive(Debug, PartialEq, Eq)]
struct File(Vec<u8>);
type Progress = u32;
#[derive(Debug, PartialEq, Eq)]
enum Error {
Failed,
}
// Fake download: sends progress 0..=100 (101 values) and then returns a
// fixed `File`. The URL is only bound, never used.
fn download(url: &str) -> impl Sipper<File, Progress> + '_ {
sipper(move |mut sender| async move {
let _url = url;
for i in 0..=100 {
sender.send(i).await;
}
File(vec![1, 2, 3, 4])
})
}
// Fake failing download: sends progress 0..=42 (43 values) and then returns
// `Err(Error::Failed)`.
// NOTE(review): `Straw` is defined in another module; presumably a sipper with
// a `Result` output — confirm there.
fn try_download(url: &str) -> impl Straw<File, Progress, Error> + '_ {
sipper(move |mut sender| async move {
let _url = url;
for i in 0..=42 {
sender.send(i).await;
}
Err(Error::Failed)
})
}
// Awaiting a sipper directly yields its output.
#[test]
async fn it_is_a_future() {
assert_eq!(
download("https://iced.rs/logo.svg").await,
File(vec![1, 2, 3, 4])
);
}
// Collecting a sipper as a stream yields exactly the progress values.
#[test]
async fn it_is_a_stream() {
assert!(download("https://iced.rs/logo.svg")
.collect::<Vec<_>>()
.await
.into_iter()
.eq(0..=100));
}
// `run` forwards progress into an external channel while resolving to the output.
#[test]
async fn it_works() {
use futures::StreamExt;
let (sender, receiver) = mpsc::channel(1);
let progress = task::spawn(receiver.collect::<Vec<_>>());
let file = download("https://iced.rs/logo.svg").run(sender).await;
assert!(progress
.await
.expect("Collect progress")
.into_iter()
.eq(0..=100));
assert_eq!(file, File(vec![1, 2, 3, 4]));
}
// Sipping all progress and then awaiting still produces the output.
#[test]
async fn it_sips() {
let mut i = 0;
let mut last_progress = None;
let mut download = download("https://iced.rs/logo.svg").pin();
while let Some(progress) = download.sip().await {
i += 1;
last_progress = Some(progress);
}
let file = download.await;
assert_eq!(i, 101);
assert_eq!(last_progress, Some(100));
assert_eq!(file, File(vec![1, 2, 3, 4]));
}
// Reading only some progress and then awaiting still produces the output.
#[test]
async fn it_sips_partially() {
let mut download = download("https://iced.rs/logo.svg").pin();
assert_eq!(download.next().await, Some(0));
assert_eq!(download.next().await, Some(1));
assert_eq!(download.next().await, Some(2));
assert_eq!(download.next().await, Some(3));
assert_eq!(download.await, File(vec![1, 2, 3, 4]));
}
// Draining the stream must drive the future all the way to completion, even
// when it keeps working (three yields here) after its last progress was sent.
#[test]
async fn it_sips_fully_and_completes() {
let mut finished = false;
{
let mut download = sipper(|sender| async {
let _ = download("https://iced.rs/logo.svg").run(sender).await;
tokio::task::yield_now().await;
tokio::task::yield_now().await;
tokio::task::yield_now().await;
finished = true;
})
.pin();
while download.next().await.is_some() {}
}
assert!(finished);
}
// `stream` yields the 101 mapped progress values followed by the output: 102 items.
#[test]
async fn it_can_be_streamed() {
async fn uses_stream(stream: impl Stream<Item = File> + Send) {
use futures::StreamExt;
let files: Vec<_> = stream.collect().await;
assert_eq!(files.len(), 102);
assert_eq!(files.last(), Some(&File(vec![1, 2, 3, 4])));
}
uses_stream(stream(
download("https://iced.rs/logo.svg").with(|_| File(vec![])),
))
.await;
}
// A failing sipper still reports all its progress before resolving to the error.
#[test]
async fn it_can_fail() {
let mut i = 0;
let mut last_progress = None;
let mut download = try_download("https://iced.rs/logo.svg").pin();
while let Some(progress) = download.next().await {
i += 1;
last_progress = Some(progress);
}
let file = download.await;
assert_eq!(i, 43);
assert_eq!(last_progress, Some(42));
assert_eq!(file, Err(Error::Failed));
}
// `with` maps every progress value.
#[test]
async fn it_can_be_mapped() {
let mapper = |progress| progress * 2;
let download = download("https://iced.rs/logo.svg")
.with(mapper)
.collect::<Vec<_>>()
.await;
assert_eq!(
download.into_iter().sum::<u32>(),
(0..=100).map(mapper).sum()
);
}
// `filter_with` drops the progress values for which the closure returns `None`.
#[test]
async fn it_can_be_filtered() {
let filter = |progress| (progress % 2 == 0).then_some(progress);
let download = download("https://iced.rs/logo.svg")
.filter_with(filter)
.collect::<Vec<_>>()
.await;
assert_eq!(
download.into_iter().sum::<u32>(),
(0..=100).filter_map(filter).sum()
);
}
// Several sippers can report into one parent sipper: two downloads of 101
// progress values each give 202 tagged progress values and two files.
#[test]
async fn it_composes_nicely() {
use futures::stream::{FuturesOrdered, StreamExt};
fn download_all<'a>(urls: &'a [&str]) -> impl Sipper<Vec<File>, (usize, Progress)> + 'a {
sipper(|sender| {
urls.iter()
.enumerate()
.map(|(id, url)| {
download(url)
.with(move |progress| (id, progress))
.run(&sender)
})
.collect::<FuturesOrdered<_>>()
.collect()
})
}
let mut download =
download_all(&["https://iced.rs/logo.svg", "https://iced.rs/logo.white.svg"]).pin();
let mut i = 0;
while let Some(_progress) = download.next().await {
i += 1;
}
let files = download.await;
assert_eq!(i, 202);
assert_eq!(files, vec![File(vec![1, 2, 3, 4]), File(vec![1, 2, 3, 4])]);
}
}