sipper/lib.rs
1//! A sipper is a type-safe [`Future`] that can [`Stream`] progress.
2//!
3//! Effectively, a [`Sipper`] combines a [`Future`] and a [`Stream`]
//! together to represent an asynchronous task that produces some `Output`
//! and notifies of some `Progress`, where the two types are not necessarily
//! the same.
7//!
8//! In fact, a [`Sipper`] implements both the [`Future`] and the [`Stream`] traits—which
9//! gives you all the great combinators from [`FutureExt`] and [`StreamExt`] for free.
10//!
11//! Generally, [`Sipper`] should be chosen over [`Stream`] when the final value produced—the
12//! end of the task—is important and inherently different from the other values.
13//!
14//! # An example
15//! An example of this could be a file download. When downloading a file, the progress
16//! that must be notified is normally a bunch of statistics related to the download; but
17//! when the download finishes, the contents of the file need to also be provided.
18//!
19//! ## The Uncomfy Stream
20//! With a [`Stream`], you must create some kind of type that unifies both states of the
21//! download:
22//!
23//! ```rust
24//! use futures::Stream;
25//!
26//! struct File(Vec<u8>);
27//!
28//! type Progress = u32;
29//!
30//! enum Download {
31//! Running(Progress),
32//! Done(File)
33//! }
34//!
35//! fn download(url: &str) -> impl Stream<Item = Download> {
36//! // ...
37//! # futures::stream::once(async { Download::Done(File(Vec::new())) })
38//! }
39//! ```
40//!
41//! If we now wanted to notify progress and—at the same time—do something with
42//! the final `File`, we'd need to juggle with the [`Stream`]:
43//!
44//! ```rust
45//! # use futures::Stream;
46//! #
47//! # struct File(Vec<u8>);
48//! #
49//! # type Progress = u32;
50//! #
51//! # enum Download {
52//! # Running(Progress),
53//! # Done(File)
54//! # }
55//! #
56//! # fn download(url: &str) -> impl Stream<Item = Download> {
57//! # // ...
58//! # futures::stream::once(async { Download::Done(File(Vec::new())) })
59//! # }
60//! use futures::{SinkExt, StreamExt};
61//!
62//! async fn example() {
63//! let mut file_download = download("https://iced.rs/logo.svg").boxed();
64//!
65//! while let Some(download) = file_download.next().await {
66//! match download {
67//! Download::Running(progress) => {
68//! println!("{progress}%");
69//! }
70//! Download::Done(file) => {
71//! // Do something with file...
72//! // We are nested, and there are no compiler guarantees
73//! // this will ever be reached.
74//! }
75//! }
76//! }
77//! }
78//! ```
79//!
80//! While we could rewrite the previous snippet using `loop`, `expect`, and `break` to get the
81//! final file out of the [`Stream`], we would still be introducing runtime errors and, simply put,
82//! working around the fact that a [`Stream`] does not encode the idea of a final value.
83//!
84//! ## The Chad Sipper
85//! A [`Sipper`] can precisely describe this dichotomy in a type-safe way:
86//!
87//! ```rust
88//! use sipper::Sipper;
89//!
90//! struct File(Vec<u8>);
91//!
92//! type Progress = u32;
93//!
94//! fn download(url: &str) -> impl Sipper<File, Progress> {
95//! // ...
96//! # sipper::sipper(|_| futures::future::ready(File(Vec::new())))
97//! }
98//! ```
99//!
100//! Which can then be easily ~~used~~ sipped:
101//!
102//! ```rust
103//! # use sipper::{sipper, Sipper};
104//! #
105//! # struct File(Vec<u8>);
106//! #
107//! # type Progress = u32;
108//! #
109//! # fn download(url: &str) -> impl Sipper<File, Progress> {
110//! # sipper(|_| futures::future::ready(File(Vec::new())))
111//! # }
112//! #
113//! async fn example() -> File {
114//! let mut download = download("https://iced.rs/logo.svg").pin();
115//!
116//! // A sipper is a stream!
//!     // `Sipper::sip` is actually just an alias of `StreamExt::next`
118//! while let Some(progress) = download.sip().await {
119//! println!("{progress}%");
120//! }
121//!
122//! // A sipper is also a future!
123//! let logo = download.await;
124//!
125//! // We are guaranteed to have a File here!
126//! logo
127//! }
128//! ```
129//!
130//! ## The Delicate Straw
131//! How about error handling? Fear not! A [`Straw`] is a [`Sipper`] that can fail. What would
132//! our download example look like with an error sprinkled in?
133//!
134//! ```rust
135//! # use sipper::{sipper, Sipper};
136//! #
137//! # struct File(Vec<u8>);
138//! #
139//! # type Progress = u32;
140//! #
141//! use sipper::Straw;
142//!
143//! enum Error {
144//! Failed,
145//! }
146//!
147//! fn try_download(url: &str) -> impl Straw<File, Progress, Error> {
148//! // ...
149//! # sipper(|_| futures::future::ready(Ok(File(Vec::new()))))
150//! }
151//!
152//! async fn example() -> Result<File, Error> {
153//! let mut download = try_download("https://iced.rs/logo.svg").pin();
154//!
155//! while let Some(progress) = download.sip().await {
156//! println!("{progress}%");
157//! }
158//!
159//! let logo = download.await?;
160//!
161//! // We are guaranteed to have a File here!
162//! Ok(logo)
163//! }
164//! ```
165//!
166//! Pretty much the same! It's quite easy to add error handling to an existing [`Sipper`].
167//! In fact, [`Straw`] is actually just an extension trait of a [`Sipper`] with a `Result` as output.
168//! Therefore, all the [`Sipper`] methods are available for [`Straw`] as well. It's just nicer to write!
169//!
170//! ## The Great Builder
171//! You can build a [`Sipper`] with the [`sipper`] function. It takes a closure that receives
172//! a [`Sender`]—for sending progress updates—and must return a [`Future`] producing the output.
173//!
174//! ```rust,ignore
175//! # use sipper::{sipper, Sipper};
176//! #
177//! # struct File(Vec<u8>);
178//! #
179//! # type Progress = u32;
180//! #
181//! fn download(url: &str) -> impl Sipper<File, Progress> + '_ {
182//! sipper(|mut sender| async move {
183//! // Perform async request here...
184//! let download = /* ... */;
185//!
186//! while let Some(chunk) = download.chunk().await {
187//! // ...
188//! // Send updates when needed
189//! sender.send(/* ... */).await;
190//!
191//! }
192//!
193//! File(/* ... */)
194//! })
195//! }
196//! ```
197//!
198//! Furthermore, [`Sipper`] has no required methods and is just an extension trait of a
199//! [`Future`] and [`Stream`] combo. This means you can come up with new ways to build a
200//! [`Sipper`] by implementing the async traits on any of your types. Additionally,
201//! any foreign type that implements both is already one.
202//!
203//! ## The Fancy Composition
204//! A [`Sipper`] supports a bunch of methods for easy composition; like [`with`], [`filter_with`],
205//! and [`run`].
206//!
207//! For instance, let's say we wanted to build a new function that downloads a bunch of files
208//! instead of just one:
209//!
210//! ```rust
211//! # use sipper::{sipper, Sipper};
212//! #
213//! # struct File(Vec<u8>);
214//! #
215//! # type Progress = u32;
216//! #
217//! # fn download(url: &str) -> impl Sipper<File, Progress> {
218//! # sipper(|_| futures::future::ready(File(Vec::new())))
219//! # }
220//! #
221//! fn download_all<'a>(urls: &'a [&str]) -> impl Sipper<Vec<File>, (usize, Progress)> + 'a {
222//! sipper(move |sender| async move {
223//! let mut files = Vec::new();
224//!
225//! for (id, url) in urls.iter().enumerate() {
226//! let file = download(url)
227//! .with(move |progress| (id, progress))
228//! .run(&sender)
229//! .await;
230//!
231//! files.push(file);
232//! }
233//!
234//! files
235//! })
236//! }
237//! ```
238//!
239//! As you can see, we just leverage [`with`] to combine the download index with the progress
240//! and call [`run`] to drive the [`Sipper`] to completion—notifying properly through the [`Sender`].
241//!
242//! Of course, this example will download files sequentially; but, since [`run`] returns a simple
243//! [`Future`], a proper collection like [`FuturesOrdered`] could be used just as easily—if not
244//! more! Take a look:
245//!
246//! ```rust
247//! # use sipper::{sipper, Sipper};
248//! #
249//! # struct File(Vec<u8>);
250//! #
251//! # type Progress = u32;
252//! #
253//! # fn download(url: &str) -> impl Sipper<File, Progress> {
254//! # sipper(|_| futures::future::ready(File(Vec::new())))
255//! # }
256//! #
257//! use futures::stream::{FuturesOrdered, StreamExt};
258//!
259//! fn download_all<'a>(urls: &'a [&str]) -> impl Sipper<Vec<File>, (usize, Progress)> + 'a {
260//! sipper(|sender| {
261//! urls.iter()
262//! .enumerate()
263//! .map(|(id, url)| {
264//! download(url)
265//! .with(move |progress| (id, progress))
266//! .run(&sender)
267//! })
268//! .collect::<FuturesOrdered<_>>()
269//! .collect()
270//! })
271//! }
272//! ```
273//!
274//! [`Stream`]: futures::Stream
275//! [`Sink`]: futures::Sink
276//! [`FutureExt`]: futures::FutureExt
277//! [`StreamExt`]: futures::StreamExt
278//! [`FuturesOrdered`]: futures::stream::FuturesOrdered
279//! [`with`]: Sipper::with
280//! [`filter_with`]: Sipper::filter_with
281//! [`run`]: Sipper::run
282mod core;
283mod filter_with;
284mod run;
285mod sender;
286mod straw;
287mod with;
288
289pub use core::Core;
290pub use filter_with::FilterWith;
291pub use run::Run;
292pub use sender::Sender;
293pub use straw::Straw;
294pub use with::With;
295
296use futures::channel::mpsc;
297use futures::stream;
298use pin_project_lite::pin_project;
299
300use std::pin::Pin;
301use std::task;
302
303#[doc(no_inline)]
304pub use futures::never::Never;
305#[doc(no_inline)]
306pub use futures::{Future, FutureExt, Sink, Stream, StreamExt};
307
308/// A sipper is both a [`Stream`] that produces a bunch of progress
309/// and a [`Future`] that produces some final output.
310pub trait Sipper<Output, Progress = Output>:
311 core::Core<Output = Output, Progress = Progress>
312{
313 /// Maps the progress of the [`Sipper`] with the given closure.
314 ///
315 /// This is analogous to `map` in many other types; but we use `with`
316 /// to avoid naming collisions with [`Future`] and [`Stream`].
317 fn with<F, A>(self, f: F) -> With<Self, F, A>
318 where
319 Self: Sized,
320 F: FnMut(Progress) -> A,
321 {
322 With::new(self, f)
323 }
324
325 /// Maps and filters the progress of the [`Sipper`] with the given closure.
326 ///
327 /// This is analogous to `filter_map` in many other types; but we use `filter_with`
328 /// to avoid naming collisions with [`Future`] and [`Stream`].
329 fn filter_with<F, A>(self, f: F) -> FilterWith<Self, F, A>
330 where
331 Self: Sized,
332 F: FnMut(Progress) -> Option<A>,
333 {
334 FilterWith::new(self, f)
335 }
336
337 /// Returns the next progress, if any.
338 ///
339 /// When this method returns `None`, it means there is no more progress to be made;
340 /// and the output is ready.
341 fn sip(&mut self) -> stream::Next<'_, Self>
342 where
343 Self: Unpin,
344 {
345 StreamExt::next(self)
346 }
347
348 /// Runs the [`Sipper`], sending any progress through the given [`Sender`] and returning
349 /// its output at the end.
350 fn run<S>(self, on_progress: impl Into<Sender<Progress, S>>) -> Run<Self, S>
351 where
352 Self: Sized,
353 S: Sink<Progress>,
354 {
355 Run::new(self, on_progress.into().sink)
356 }
357
358 /// Pins the [`Sipper`] in a [`Box`].
359 ///
360 /// You may need to call this method before being able to [`sip`](Self::sip).
361 fn pin(self) -> Pin<Box<Self>>
362 where
363 Self: Sized,
364 {
365 Box::pin(self)
366 }
367}
368
369impl<T, Output, Progress> Sipper<Output, Progress> for T where
370 T: core::Core<Output = Output, Progress = Progress>
371{
372}
373
374/// Creates a new [`Sipper`] from the given async closure, which receives
375/// a [`Sender`] that can be used to notify progress asynchronously.
376pub fn sipper<Progress, F>(
377 builder: impl FnOnce(Sender<Progress>) -> F,
378) -> impl Sipper<F::Output, Progress>
379where
380 F: Future,
381{
382 pin_project! {
383 struct Internal<F, Progress>
384 where
385 F: Future,
386 {
387 #[pin]
388 future: F,
389 #[pin]
390 receiver: mpsc::Receiver<Progress>,
391 output: Option<F::Output>,
392 is_progress_finished: bool,
393 }
394 }
395
396 impl<F, Progress> Future for Internal<F, Progress>
397 where
398 F: Future,
399 {
400 type Output = F::Output;
401
402 fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
403 let mut this = self.project();
404
405 if !*this.is_progress_finished {
406 loop {
407 match this.receiver.as_mut().poll_next(cx) {
408 task::Poll::Ready(Some(_)) => {} // Discard
409 task::Poll::Ready(None) => {
410 *this.is_progress_finished = true;
411 break;
412 }
413 task::Poll::Pending => {
414 break;
415 }
416 }
417 }
418 }
419
420 if let Some(output) = this.output.take() {
421 task::Poll::Ready(output)
422 } else {
423 this.future.poll(cx)
424 }
425 }
426 }
427
428 impl<F, Progress> Stream for Internal<F, Progress>
429 where
430 F: Future,
431 {
432 type Item = Progress;
433
434 fn poll_next(
435 self: Pin<&mut Self>,
436 cx: &mut task::Context<'_>,
437 ) -> task::Poll<Option<Self::Item>> {
438 use futures::ready;
439
440 let mut this = self.project();
441
442 if !*this.is_progress_finished {
443 match this.receiver.as_mut().poll_next(cx) {
444 task::Poll::Ready(None) => {
445 *this.is_progress_finished = true;
446 }
447 task::Poll::Ready(progress) => return task::Poll::Ready(progress),
448 task::Poll::Pending => {}
449 }
450 }
451
452 if this.output.is_some() {
453 return task::Poll::Ready(None);
454 }
455
456 *this.output = Some(ready!(this.future.poll(cx)));
457
458 if *this.is_progress_finished {
459 task::Poll::Ready(None)
460 } else {
461 task::Poll::Pending
462 }
463 }
464 }
465
466 let (sender, receiver) = Sender::channel(1);
467
468 Internal {
469 future: builder(sender),
470 receiver,
471 is_progress_finished: false,
472 output: None,
473 }
474}
475
476/// Turns a [`Sipper`] into a [`Stream`].
477///
478/// This is only possible if the `Output` and `Progress` types of the [`Sipper`] match!
479pub fn stream<Output>(sipper: impl Sipper<Output>) -> impl Stream<Item = Output> {
480 let sip = sipper.pin();
481
482 stream::unfold(Some(sip), |mut sip| async move {
483 if let Some(progress) = sip.as_mut()?.next().await {
484 Some((progress, sip))
485 } else {
486 Some((sip.take()?.await, sip))
487 }
488 })
489}
490
#[cfg(test)]
mod tests {
    use super::*;

    use tokio::task;
    use tokio::test;

    /// The URL used by most of the tests; it is never actually requested.
    const LOGO_URL: &str = "https://iced.rs/logo.svg";

    /// A fake file, used as the output of the test sippers.
    #[derive(Debug, PartialEq, Eq)]
    struct File(Vec<u8>);

    /// The progress notified by the test sippers.
    type Progress = u32;

    /// The error produced by [`try_download`].
    #[derive(Debug, PartialEq, Eq)]
    enum Error {
        Failed,
    }

    /// Fakes a download: notifies every value in `0..=100` and then
    /// produces a fixed [`File`].
    fn download(url: &str) -> impl Sipper<File, Progress> + '_ {
        sipper(move |mut sender| async move {
            let _url = url;

            for percent in 0..=100 {
                sender.send(percent).await;
            }

            File(vec![1, 2, 3, 4])
        })
    }

    /// Fakes a failing download: notifies every value in `0..=42` and then
    /// fails with [`Error::Failed`].
    fn try_download(url: &str) -> impl Straw<File, Progress, Error> + '_ {
        sipper(move |mut sender| async move {
            let _url = url;

            for percent in 0..=42 {
                sender.send(percent).await;
            }

            Err(Error::Failed)
        })
    }

    #[test]
    async fn it_is_a_future() {
        let file = download(LOGO_URL).await;

        assert_eq!(file, File(vec![1, 2, 3, 4]));
    }

    #[test]
    async fn it_is_a_stream() {
        let progress = download(LOGO_URL).collect::<Vec<_>>().await;

        assert!(progress.into_iter().eq(0..=100));
    }

    #[test]
    async fn it_works() {
        use futures::StreamExt;

        let (sender, receiver) = mpsc::channel(1);

        // Collect the progress concurrently while the download runs.
        let progress = task::spawn(receiver.collect::<Vec<_>>());
        let file = download(LOGO_URL).run(sender).await;

        let progress = progress.await.expect("Collect progress");

        assert!(progress.into_iter().eq(0..=100));
        assert_eq!(file, File(vec![1, 2, 3, 4]));
    }

    #[test]
    async fn it_sips() {
        let mut logo_download = download(LOGO_URL).pin();

        let mut sips = 0;
        let mut last_progress = None;

        while let Some(progress) = logo_download.sip().await {
            sips += 1;
            last_progress = Some(progress);
        }

        let file = logo_download.await;

        assert_eq!(sips, 101);
        assert_eq!(last_progress, Some(100));
        assert_eq!(file, File(vec![1, 2, 3, 4]));
    }

    #[test]
    async fn it_sips_partially() {
        let mut logo_download = download(LOGO_URL).pin();

        // Only take the first four sips...
        for expected in 0..4 {
            assert_eq!(logo_download.next().await, Some(expected));
        }

        // ... and then wait for the output directly.
        assert_eq!(logo_download.await, File(vec![1, 2, 3, 4]));
    }

    #[test]
    async fn it_sips_fully_and_completes() {
        let mut finished = false;

        {
            let mut wrapper = sipper(|sender| async {
                let _ = download(LOGO_URL).run(sender).await;

                // Keep the future pending for a while after the progress is over.
                for _ in 0..3 {
                    tokio::task::yield_now().await;
                }

                finished = true;
            })
            .pin();

            while wrapper.next().await.is_some() {}
        }

        assert!(finished);
    }

    #[test]
    async fn it_can_be_streamed() {
        async fn uses_stream(stream: impl Stream<Item = File> + Send) {
            use futures::StreamExt;
            let files: Vec<_> = stream.collect().await;

            // 101 progress items plus the final output.
            assert_eq!(files.len(), 102);
            assert_eq!(files.last(), Some(&File(vec![1, 2, 3, 4])));
        }

        let files = download(LOGO_URL).with(|_| File(Vec::new()));

        uses_stream(stream(files)).await;
    }

    #[test]
    async fn it_can_fail() {
        let mut logo_download = try_download(LOGO_URL).pin();

        let mut sips = 0;
        let mut last_progress = None;

        while let Some(progress) = logo_download.next().await {
            sips += 1;
            last_progress = Some(progress);
        }

        let result = logo_download.await;

        assert_eq!(sips, 43);
        assert_eq!(last_progress, Some(42));
        assert_eq!(result, Err(Error::Failed));
    }

    #[test]
    async fn it_can_be_mapped() {
        let mapper = |progress| progress * 2;

        let progress = download(LOGO_URL)
            .with(mapper)
            .collect::<Vec<_>>()
            .await;

        let actual: u32 = progress.into_iter().sum();
        let expected: u32 = (0..=100).map(mapper).sum();

        assert_eq!(actual, expected);
    }

    #[test]
    async fn it_can_be_filtered() {
        let filter = |progress| (progress % 2 == 0).then_some(progress);

        let progress = download(LOGO_URL)
            .filter_with(filter)
            .collect::<Vec<_>>()
            .await;

        let actual: u32 = progress.into_iter().sum();
        let expected: u32 = (0..=100).filter_map(filter).sum();

        assert_eq!(actual, expected);
    }

    #[test]
    async fn it_composes_nicely() {
        use futures::stream::{FuturesOrdered, StreamExt};

        fn download_all<'a>(urls: &'a [&str]) -> impl Sipper<Vec<File>, (usize, Progress)> + 'a {
            sipper(|sender| {
                urls.iter()
                    .enumerate()
                    .map(|(id, url)| {
                        download(url)
                            .with(move |progress| (id, progress))
                            .run(&sender)
                    })
                    .collect::<FuturesOrdered<_>>()
                    .collect()
            })
        }

        let urls = ["https://iced.rs/logo.svg", "https://iced.rs/logo.white.svg"];
        let mut downloads = download_all(&urls).pin();

        let mut sips = 0;

        while let Some(_progress) = downloads.next().await {
            sips += 1;
        }

        let files = downloads.await;

        // Two downloads, 101 progress notifications each.
        assert_eq!(sips, 202);
        assert_eq!(files, vec![File(vec![1, 2, 3, 4]), File(vec![1, 2, 3, 4])]);
    }
}