sipper/
lib.rs

1//! A sipper is a type-safe [`Future`] that can [`Stream`] progress.
2//!
//! Effectively, a [`Sipper`] combines a [`Future`] and a [`Stream`]
//! together to represent an asynchronous task that produces some `Output`
//! and notifies of some `Progress`, where the two types do not necessarily
//! have to be the same.
7//!
8//! In fact, a [`Sipper`] implements both the [`Future`] and the [`Stream`] traits—which
9//! gives you all the great combinators from [`FutureExt`] and [`StreamExt`] for free.
10//!
11//! Generally, [`Sipper`] should be chosen over [`Stream`] when the final value produced—the
12//! end of the task—is important and inherently different from the other values.
13//!
14//! # An example
15//! An example of this could be a file download. When downloading a file, the progress
16//! that must be notified is normally a bunch of statistics related to the download; but
17//! when the download finishes, the contents of the file need to also be provided.
18//!
19//! ## The Uncomfy Stream
20//! With a [`Stream`], you must create some kind of type that unifies both states of the
21//! download:
22//!
23//! ```rust
24//! use futures::Stream;
25//!
26//! struct File(Vec<u8>);
27//!
28//! type Progress = u32;
29//!
30//! enum Download {
31//!     Running(Progress),
32//!     Done(File)
33//! }
34//!
35//! fn download(url: &str) -> impl Stream<Item = Download> {
36//!     // ...
37//! #     futures::stream::once(async { Download::Done(File(Vec::new())) })
38//! }
39//! ```
40//!
41//! If we now wanted to notify progress and—at the same time—do something with
42//! the final `File`, we'd need to juggle with the [`Stream`]:
43//!
44//! ```rust
45//! # use futures::Stream;
46//! #
47//! # struct File(Vec<u8>);
48//! #
49//! # type Progress = u32;
50//! #
51//! # enum Download {
52//! #    Running(Progress),
53//! #    Done(File)
54//! # }
55//! #
56//! # fn download(url: &str) -> impl Stream<Item = Download> {
57//! #     // ...
58//! #     futures::stream::once(async { Download::Done(File(Vec::new())) })
59//! # }
//! use futures::StreamExt;
61//!
62//! async fn example() {
63//!     let mut file_download = download("https://iced.rs/logo.svg").boxed();
64//!
65//!     while let Some(download) = file_download.next().await {
66//!         match download {
67//!             Download::Running(progress) => {
68//!                 println!("{progress}%");
69//!             }
70//!             Download::Done(file) => {
71//!                 // Do something with file...
72//!                 // We are nested, and there are no compiler guarantees
73//!                 // this will ever be reached.
74//!             }
75//!         }
76//!     }
77//! }
78//! ```
79//!
80//! While we could rewrite the previous snippet using `loop`, `expect`, and `break` to get the
81//! final file out of the [`Stream`], we would still be introducing runtime errors and, simply put,
82//! working around the fact that a [`Stream`] does not encode the idea of a final value.
83//!
84//! ## The Chad Sipper
85//! A [`Sipper`] can precisely describe this dichotomy in a type-safe way:
86//!
87//! ```rust
88//! use sipper::Sipper;
89//!
90//! struct File(Vec<u8>);
91//!
92//! type Progress = u32;
93//!
94//! fn download(url: &str) -> impl Sipper<File, Progress> {
95//!     // ...
96//! #     sipper::sipper(|_| futures::future::ready(File(Vec::new())))
97//! }
98//! ```
99//!
100//! Which can then be easily ~~used~~ sipped:
101//!
102//! ```rust
103//! # use sipper::{sipper, Sipper};
104//! #
105//! # struct File(Vec<u8>);
106//! #
107//! # type Progress = u32;
108//! #
109//! # fn download(url: &str) -> impl Sipper<File, Progress> {
110//! #     sipper(|_| futures::future::ready(File(Vec::new())))
111//! # }
112//! #
113//! async fn example() -> File {
114//!     let mut download = download("https://iced.rs/logo.svg").pin();
115//!
116//!     // A sipper is a stream!
//!     // `Sipper::sip` is actually just an alias of `StreamExt::next`
118//!     while let Some(progress) = download.sip().await {
119//!         println!("{progress}%");
120//!     }
121//!
122//!     // A sipper is also a future!
123//!     let logo = download.await;
124//!
125//!     // We are guaranteed to have a File here!
126//!     logo
127//! }
128//! ```
129//!
130//! ## The Delicate Straw
131//! How about error handling? Fear not! A [`Straw`] is a [`Sipper`] that can fail. What would
132//! our download example look like with an error sprinkled in?
133//!
134//! ```rust
135//! # use sipper::{sipper, Sipper};
136//! #
137//! # struct File(Vec<u8>);
138//! #
139//! # type Progress = u32;
140//! #
141//! use sipper::Straw;
142//!
143//! enum Error {
144//!     Failed,
145//! }
146//!
147//! fn try_download(url: &str) -> impl Straw<File, Progress, Error> {
148//!     // ...
149//! #     sipper(|_| futures::future::ready(Ok(File(Vec::new()))))
150//! }
151//!
152//! async fn example() -> Result<File, Error> {
153//!     let mut download = try_download("https://iced.rs/logo.svg").pin();
154//!
155//!     while let Some(progress) = download.sip().await {
156//!         println!("{progress}%");
157//!     }
158//!
159//!     let logo = download.await?;
160//!
161//!     // We are guaranteed to have a File here!
162//!     Ok(logo)
163//! }
164//! ```
165//!
166//! Pretty much the same! It's quite easy to add error handling to an existing [`Sipper`].
167//! In fact, [`Straw`] is actually just an extension trait of a [`Sipper`] with a `Result` as output.
168//! Therefore, all the [`Sipper`] methods are available for [`Straw`] as well. It's just nicer to write!
169//!
170//! ## The Great Builder
171//! You can build a [`Sipper`] with the [`sipper`] function. It takes a closure that receives
172//! a [`Sender`]—for sending progress updates—and must return a [`Future`] producing the output.
173//!
174//! ```rust,ignore
175//! # use sipper::{sipper, Sipper};
176//! #
177//! # struct File(Vec<u8>);
178//! #
179//! # type Progress = u32;
180//! #
181//! fn download(url: &str) -> impl Sipper<File, Progress> + '_ {
182//!     sipper(|mut sender| async move {
183//!         // Perform async request here...
184//!         let download = /* ... */;
185//!
186//!         while let Some(chunk) = download.chunk().await {
187//!             // ...
188//!             // Send updates when needed
189//!             sender.send(/* ... */).await;
190//!
191//!         }
192//!
193//!         File(/* ... */)
194//!     })
195//! }
196//! ```
197//!
198//! Furthermore, [`Sipper`] has no required methods and is just an extension trait of a
199//! [`Future`] and [`Stream`] combo. This means you can come up with new ways to build a
200//! [`Sipper`] by implementing the async traits on any of your types. Additionally,
201//! any foreign type that implements both is already one.
202//!
203//! ## The Fancy Composition
204//! A [`Sipper`] supports a bunch of methods for easy composition; like [`with`], [`filter_with`],
205//! and [`run`].
206//!
207//! For instance, let's say we wanted to build a new function that downloads a bunch of files
208//! instead of just one:
209//!
210//! ```rust
211//! # use sipper::{sipper, Sipper};
212//! #
213//! # struct File(Vec<u8>);
214//! #
215//! # type Progress = u32;
216//! #
217//! # fn download(url: &str) -> impl Sipper<File, Progress> {
218//! #     sipper(|_| futures::future::ready(File(Vec::new())))
219//! # }
220//! #
221//! fn download_all<'a>(urls: &'a [&str]) -> impl Sipper<Vec<File>, (usize, Progress)> + 'a {
222//!     sipper(move |sender| async move {
223//!         let mut files = Vec::new();
224//!
225//!         for (id, url) in urls.iter().enumerate() {
226//!             let file = download(url)
227//!                 .with(move |progress| (id, progress))
228//!                 .run(&sender)
229//!                 .await;
230//!
231//!             files.push(file);
232//!         }
233//!
234//!         files
235//!     })
236//! }
237//! ```
238//!
239//! As you can see, we just leverage [`with`] to combine the download index with the progress
240//! and call [`run`] to drive the [`Sipper`] to completion—notifying properly through the [`Sender`].
241//!
242//! Of course, this example will download files sequentially; but, since [`run`] returns a simple
243//! [`Future`], a proper collection like [`FuturesOrdered`] could be used just as easily—if not
244//! more! Take a look:
245//!
246//! ```rust
247//! # use sipper::{sipper, Sipper};
248//! #
249//! # struct File(Vec<u8>);
250//! #
251//! # type Progress = u32;
252//! #
253//! # fn download(url: &str) -> impl Sipper<File, Progress> {
254//! #     sipper(|_| futures::future::ready(File(Vec::new())))
255//! # }
256//! #
257//! use futures::stream::{FuturesOrdered, StreamExt};
258//!
259//! fn download_all<'a>(urls: &'a [&str]) -> impl Sipper<Vec<File>, (usize, Progress)> + 'a {
260//!     sipper(|sender| {
261//!         urls.iter()
262//!             .enumerate()
263//!             .map(|(id, url)| {
264//!                 download(url)
265//!                     .with(move |progress| (id, progress))
266//!                     .run(&sender)
267//!             })
268//!             .collect::<FuturesOrdered<_>>()
269//!             .collect()
270//!     })
271//! }
272//! ```
273//!
274//! [`Stream`]: futures::Stream
275//! [`Sink`]: futures::Sink
276//! [`FutureExt`]: futures::FutureExt
277//! [`StreamExt`]: futures::StreamExt
278//! [`FuturesOrdered`]: futures::stream::FuturesOrdered
279//! [`with`]: Sipper::with
280//! [`filter_with`]: Sipper::filter_with
281//! [`run`]: Sipper::run
282mod core;
283mod filter_with;
284mod run;
285mod sender;
286mod straw;
287mod with;
288
289pub use core::Core;
290pub use filter_with::FilterWith;
291pub use run::Run;
292pub use sender::Sender;
293pub use straw::Straw;
294pub use with::With;
295
296use futures::channel::mpsc;
297use futures::stream;
298use pin_project_lite::pin_project;
299
300use std::pin::Pin;
301use std::task;
302
303#[doc(no_inline)]
304pub use futures::never::Never;
305#[doc(no_inline)]
306pub use futures::{Future, FutureExt, Sink, Stream, StreamExt};
307
308/// A sipper is both a [`Stream`] that produces a bunch of progress
309/// and a [`Future`] that produces some final output.
310pub trait Sipper<Output, Progress = Output>:
311    core::Core<Output = Output, Progress = Progress>
312{
313    /// Maps the progress of the [`Sipper`] with the given closure.
314    ///
315    /// This is analogous to `map` in many other types; but we use `with`
316    /// to avoid naming collisions with [`Future`] and [`Stream`].
317    fn with<F, A>(self, f: F) -> With<Self, F, A>
318    where
319        Self: Sized,
320        F: FnMut(Progress) -> A,
321    {
322        With::new(self, f)
323    }
324
325    /// Maps and filters the progress of the [`Sipper`] with the given closure.
326    ///
327    /// This is analogous to `filter_map` in many other types; but we use `filter_with`
328    /// to avoid naming collisions with [`Future`] and [`Stream`].
329    fn filter_with<F, A>(self, f: F) -> FilterWith<Self, F, A>
330    where
331        Self: Sized,
332        F: FnMut(Progress) -> Option<A>,
333    {
334        FilterWith::new(self, f)
335    }
336
337    /// Returns the next progress, if any.
338    ///
339    /// When this method returns `None`, it means there is no more progress to be made;
340    /// and the output is ready.
341    fn sip(&mut self) -> stream::Next<'_, Self>
342    where
343        Self: Unpin,
344    {
345        StreamExt::next(self)
346    }
347
348    /// Runs the [`Sipper`], sending any progress through the given [`Sender`] and returning
349    /// its output at the end.
350    fn run<S>(self, on_progress: impl Into<Sender<Progress, S>>) -> Run<Self, S>
351    where
352        Self: Sized,
353        S: Sink<Progress>,
354    {
355        Run::new(self, on_progress.into().sink)
356    }
357
358    /// Pins the [`Sipper`] in a [`Box`].
359    ///
360    /// You may need to call this method before being able to [`sip`](Self::sip).
361    fn pin(self) -> Pin<Box<Self>>
362    where
363        Self: Sized,
364    {
365        Box::pin(self)
366    }
367}
368
369impl<T, Output, Progress> Sipper<Output, Progress> for T where
370    T: core::Core<Output = Output, Progress = Progress>
371{
372}
373
374/// Creates a new [`Sipper`] from the given async closure, which receives
375/// a [`Sender`] that can be used to notify progress asynchronously.
376pub fn sipper<Progress, F>(
377    builder: impl FnOnce(Sender<Progress>) -> F,
378) -> impl Sipper<F::Output, Progress>
379where
380    F: Future,
381{
382    pin_project! {
383        struct Internal<F, Progress>
384        where
385            F: Future,
386        {
387            #[pin]
388            future: F,
389            #[pin]
390            receiver: mpsc::Receiver<Progress>,
391            output: Option<F::Output>,
392            is_progress_finished: bool,
393        }
394    }
395
396    impl<F, Progress> Future for Internal<F, Progress>
397    where
398        F: Future,
399    {
400        type Output = F::Output;
401
402        fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
403            let mut this = self.project();
404
405            if !*this.is_progress_finished {
406                loop {
407                    match this.receiver.as_mut().poll_next(cx) {
408                        task::Poll::Ready(Some(_)) => {} // Discard
409                        task::Poll::Ready(None) => {
410                            *this.is_progress_finished = true;
411                            break;
412                        }
413                        task::Poll::Pending => {
414                            break;
415                        }
416                    }
417                }
418            }
419
420            if let Some(output) = this.output.take() {
421                task::Poll::Ready(output)
422            } else {
423                this.future.poll(cx)
424            }
425        }
426    }
427
428    impl<F, Progress> Stream for Internal<F, Progress>
429    where
430        F: Future,
431    {
432        type Item = Progress;
433
434        fn poll_next(
435            self: Pin<&mut Self>,
436            cx: &mut task::Context<'_>,
437        ) -> task::Poll<Option<Self::Item>> {
438            use futures::ready;
439
440            let mut this = self.project();
441
442            if !*this.is_progress_finished {
443                match this.receiver.as_mut().poll_next(cx) {
444                    task::Poll::Ready(None) => {
445                        *this.is_progress_finished = true;
446                    }
447                    task::Poll::Ready(progress) => return task::Poll::Ready(progress),
448                    task::Poll::Pending => {}
449                }
450            }
451
452            if this.output.is_some() {
453                return task::Poll::Ready(None);
454            }
455
456            *this.output = Some(ready!(this.future.poll(cx)));
457
458            if *this.is_progress_finished {
459                task::Poll::Ready(None)
460            } else {
461                task::Poll::Pending
462            }
463        }
464    }
465
466    let (sender, receiver) = Sender::channel(1);
467
468    Internal {
469        future: builder(sender),
470        receiver,
471        is_progress_finished: false,
472        output: None,
473    }
474}
475
476/// Turns a [`Sipper`] into a [`Stream`].
477///
478/// This is only possible if the `Output` and `Progress` types of the [`Sipper`] match!
479pub fn stream<Output>(sipper: impl Sipper<Output>) -> impl Stream<Item = Output> {
480    let sip = sipper.pin();
481
482    stream::unfold(Some(sip), |mut sip| async move {
483        if let Some(progress) = sip.as_mut()?.next().await {
484            Some((progress, sip))
485        } else {
486            Some((sip.take()?.await, sip))
487        }
488    })
489}
490
#[cfg(test)]
mod tests {
    use super::*;

    use tokio::task;
    use tokio::test;

    /// The URL used by most of the tests.
    const LOGO: &str = "https://iced.rs/logo.svg";

    #[derive(Debug, PartialEq, Eq)]
    struct File(Vec<u8>);

    type Progress = u32;

    #[derive(Debug, PartialEq, Eq)]
    enum Error {
        Failed,
    }

    /// The file every successful [`download`] produces.
    fn logo() -> File {
        File(vec![1, 2, 3, 4])
    }

    /// Every progress value a [`download`] notifies, in order.
    fn full_progress() -> Vec<Progress> {
        (0..=100).collect()
    }

    /// A fake download that notifies progress from 0 to 100 and then
    /// produces a fixed file.
    fn download(url: &str) -> impl Sipper<File, Progress> + '_ {
        sipper(move |mut sender| async move {
            let _url = url;

            for percent in 0..=100 {
                sender.send(percent).await;
            }

            logo()
        })
    }

    /// A fake download that notifies progress from 0 to 42 and then fails.
    fn try_download(url: &str) -> impl Straw<File, Progress, Error> + '_ {
        sipper(move |mut sender| async move {
            let _url = url;

            for percent in 0..=42 {
                sender.send(percent).await;
            }

            Err(Error::Failed)
        })
    }

    #[test]
    async fn it_is_a_future() {
        let file = download(LOGO).await;

        assert_eq!(file, logo());
    }

    #[test]
    async fn it_is_a_stream() {
        let progress = download(LOGO).collect::<Vec<_>>().await;

        assert_eq!(progress, full_progress());
    }

    #[test]
    async fn it_works() {
        let (sender, receiver) = mpsc::channel(1);

        let collector = task::spawn(receiver.collect::<Vec<_>>());
        let file = download(LOGO).run(sender).await;

        let progress = collector.await.expect("Collect progress");

        assert_eq!(progress, full_progress());
        assert_eq!(file, logo());
    }

    #[test]
    async fn it_sips() {
        let mut download = download(LOGO).pin();

        let mut sips = 0;
        let mut latest = None;

        while let Some(progress) = download.sip().await {
            sips += 1;
            latest = Some(progress);
        }

        let file = download.await;

        assert_eq!(sips, 101);
        assert_eq!(latest, Some(100));
        assert_eq!(file, logo());
    }

    #[test]
    async fn it_sips_partially() {
        let mut download = download(LOGO).pin();

        for expected in 0..4 {
            assert_eq!(download.next().await, Some(expected));
        }

        // The rest of the progress is discarded when awaiting the output.
        assert_eq!(download.await, logo());
    }

    #[test]
    async fn it_sips_fully_and_completes() {
        let mut finished = false;

        {
            let mut wrapper = sipper(|sender| async {
                let _ = download(LOGO).run(sender).await;

                // Keep the task pending for a while after the last progress.
                for _ in 0..3 {
                    task::yield_now().await;
                }

                finished = true;
            })
            .pin();

            while wrapper.next().await.is_some() {}
        }

        assert!(finished);
    }

    #[test]
    async fn it_can_be_streamed() {
        async fn uses_stream(stream: impl Stream<Item = File> + Send) {
            let files = stream.collect::<Vec<_>>().await;

            // 101 progress notifications plus the final output.
            assert_eq!(files.len(), 102);
            assert_eq!(files.last(), Some(&logo()));
        }

        let files = stream(download(LOGO).with(|_| File(vec![])));

        uses_stream(files).await;
    }

    #[test]
    async fn it_can_fail() {
        let mut download = try_download(LOGO).pin();

        let mut sips = 0;
        let mut latest = None;

        while let Some(progress) = download.next().await {
            sips += 1;
            latest = Some(progress);
        }

        let result = download.await;

        assert_eq!(sips, 43);
        assert_eq!(latest, Some(42));
        assert_eq!(result, Err(Error::Failed));
    }

    #[test]
    async fn it_can_be_mapped() {
        fn double(progress: Progress) -> Progress {
            progress * 2
        }

        let mapped = download(LOGO).with(double).collect::<Vec<_>>().await;

        let total: Progress = mapped.into_iter().sum();
        let expected: Progress = (0..=100).map(double).sum();

        assert_eq!(total, expected);
    }

    #[test]
    async fn it_can_be_filtered() {
        fn only_even(progress: Progress) -> Option<Progress> {
            (progress % 2 == 0).then_some(progress)
        }

        let filtered = download(LOGO)
            .filter_with(only_even)
            .collect::<Vec<_>>()
            .await;

        let total: Progress = filtered.into_iter().sum();
        let expected: Progress = (0..=100).filter_map(only_even).sum();

        assert_eq!(total, expected);
    }

    #[test]
    async fn it_composes_nicely() {
        use futures::stream::FuturesOrdered;

        fn download_all<'a>(urls: &'a [&str]) -> impl Sipper<Vec<File>, (usize, Progress)> + 'a {
            sipper(|sender| {
                urls.iter()
                    .enumerate()
                    .map(|(id, url)| {
                        download(url)
                            .with(move |progress| (id, progress))
                            .run(&sender)
                    })
                    .collect::<FuturesOrdered<_>>()
                    .collect()
            })
        }

        let urls = [LOGO, "https://iced.rs/logo.white.svg"];
        let mut downloads = download_all(&urls).pin();

        let mut notifications = 0;

        while downloads.next().await.is_some() {
            notifications += 1;
        }

        let files = downloads.await;

        assert_eq!(notifications, 202);
        assert_eq!(files, vec![logo(), logo()]);
    }
}