ufotofu/
lib.rs

1#![no_std]
2#![allow(clippy::needless_range_loop)]
3#![allow(clippy::type_complexity)]
4
5//! # UFOTOFU
6//!
7//! UFOTOFU provides APIs for lazily producing or consuming sequences of arbitrary length, serving as async redesigns of traits such as [`Iterator`], [`io::Read`](std::io::Read), or [`io::Write`](std::io::Write). Highlights include
8//!
9//! - bulk data transfer without temporary buffers,
10//! - freely choosable error and item types, even for readers and writers,
11//! - meaningful subtyping relations between streams and readers, and between sinks and writers,
12//! - the ability to represent finite and infinite sequences on the type level, and
13//! - `nostd` support.
14//!
15//! You can read an in-depth discussion of the API designs [here](https://github.com/AljoschaMeyer/lazy_on_principle/blob/main/main.pdf).
16//!
17//! ## Core Abstractions
18//!
19//! UFOTOFU is built around a small hierarchy of traits that describe how to produce or consume a sequence item by item.
20//!
21//! A [`Producer`] provides the items of a sequence to some client code, similar to the [`futures::Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html) and [`core::iter::Iterator`] traits. Client code can repeatedly request the next item, and receives either another item, an error, or a dedicated *final* item which may be of a different type than the repeated items. An *iterator* of `T`s corresponds to a *producer* of `T`s with final item type `()` and error type [`Infallible`](core::convert::Infallible).
22//!
23//! A [`Consumer`] accepts the items of a sequence from some client code, similar to the [`futures::Sink`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html) trait. Client code can repeatedly add new items to the sequence, until it adds a single *final* item which may be of a different type than the repeated items. A final item type of `()` makes adding the final item equivalent to calling a conventional [`close`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html#tymethod.poll_close) method.
24//!
25//! Producers and consumers are fully dual; the [`pipe`] function writes as much data as possible from a producer into a consumer.
26//!
27//! Consumers often buffer items in an internal queue before performing side-effects on data in larger chunks, such as writing data to the network only once a full packet can be filled. The [`BufferedConsumer`] trait extends the [`Consumer`] trait to allow client code to trigger effectful flushing of internal buffers. Dually, the [`BufferedProducer`] trait extends the [`Producer`] trait to allow client code to trigger effectful prefetching of data into internal buffers.
28//!
29//! Finally, the [`BulkProducer`] and [`BulkConsumer`] traits extend [`BufferedProducer`] and [`BufferedConsumer`] respectively with the ability to operate on whole slices of items at a time, similar to [`std::io::Read`] and [`std::io::Write`]. The [`bulk_pipe`] function leverages this ability to efficiently pipe data — unlike when using the standard library's [Read](std::io::Read) and [Write](std::io::Write) traits, this is possible without allocating an auxiliary buffer.
30//!
31//! ## Async Conventions
32//!
33//! UFOTOFU provides async APIs, and follows some consistent conventions. Most importantly, the futures returned by any UFOTOFU method are not expected to be cancellation safe: while it is allowed to drop a Future returned by an UFOTOFU trait method without polling it to completion, it is forbidden to call any further UFOTOFU trait methods on the same value afterwards. In other words: you must poll the future returned by any trait method to completion before calling the next trait method. This mirrors — as closely as possible — the design of sync APIs, where there is no way to partially execute a method either.
34//!
35//! Further, UFOTOFU never requires a `Send` bound on any futures. In other words, it can only be used with single-threaded async runtimes (or, more precisely, runtimes that confine the execution of any one future to a single thread).
36//!
37//! ## Module Organisation
38//!
39//! This top-level module provides the core UFOTOFU traits, and basic piping functionality:
40//!
41//! - Traits for producing sequences: [`Producer`], [`BufferedProducer`], and [`BulkProducer`].
42//! - Traits for consuming sequences: [`Consumer`], [`BufferedConsumer`], and [`BulkConsumer`].
43//! - Piping data: [`pipe`] and [`bulk_pipe`].
44//!
45//! Further functionality, specific to producers and consumers respectively, is exposed in the [`producer`] and [`consumer`] modules.
46//!
47//! ## Feature Flags
48//!
49//! UFOTOFU gates several features that are only interesting under certain circumstances behind feature flags. These API docs document *all* functionality, though, as if all feature flags were activated.
50//!
51//! All functionality which relies on the Rust standard library is gated behind the `std` feature flag (enabled by default).
52//!
53//! All functionality which performs dynamic memory allocations is gated behind the `alloc` feature flag (disabled by default, implied by the `std` feature).
54//!
55//! All functionality which provides interoperability with other async sequence manipulation crates is gated behind the `compat` feature flag (disabled by default).
56//!
57//! All functionality specifically designed to aid in testing and development is gated behind the `dev` feature flag (disabled by default).
58
59#[cfg(feature = "std")]
60extern crate std;
61
62#[cfg(feature = "alloc")]
63extern crate alloc;
64
65use core::cmp::min;
66use core::future::Future;
67
68use either::Either::{self, *};
69
70// This allows macros to use `ufotofu` instead of `crate`, which might become
71// convenient some day.
72extern crate self as ufotofu;
73
74#[macro_use]
75mod common_macros;
76
77mod errors;
78pub use errors::*;
79
80pub mod consumer;
81pub mod producer;
82
83#[cfg(all(feature = "dev", feature = "alloc"))]
84mod test_yielder;
85
86/// A [`Consumer`] consumes a potentially infinite sequence, one item at a time.
87///
88/// The sequence consists of an arbitrary number of values of type [`Self::Item`], followed by
89/// up to one value of type [`Self::Final`]. If you intend for the sequence to be infinite, use
90/// [`Infallible`](core::convert::Infallible) for [`Self::Final`].
91///
92/// A consumer may signal an error of type [`Self::Error`] instead of consuming any item (whether repeated or final).
93pub trait Consumer {
94    /// The sequence consumed by this consumer starts with *arbitrarily many* values of this type.
95    type Item;
96    /// The sequence consumed by this consumer ends with *up to one* value of this type.
97    type Final;
98    /// The type of errors the consumer can emit instead of doing its job.
99    type Error;
100
101    /// Attempts to consume the next item.
102    ///
103    /// After this function returns an error, no further functions of this trait may be invoked.
104    ///
105    /// #### Invariants
106    ///
107    /// Must not be called after any function of this trait returned an error,
108    /// nor after [`close`](Consumer::close) was called.
109    fn consume(&mut self, item: Self::Item) -> impl Future<Output = Result<(), Self::Error>>;
110
111    /// Attempts to consume the final item.
112    ///
113    /// After this function is called, no further functions of this trait may be invoked.
114    ///
115    /// #### Invariants
116    ///
117    /// Must not be called after any function of this trait has returned an error,
118    /// nor after [`close`](Consumer::close) was called.
119    fn close(&mut self, fin: Self::Final) -> impl Future<Output = Result<(), Self::Error>>;
120
121    /// Tries to consume (clones of) *all* items in the given slice.
122    /// Reports an error if the slice could not be consumed completely.
123    ///
124    /// This is a trait method for convenience, you should never need to
125    /// replace the default implementation.
126    ///
127    /// #### Invariants
128    ///
129    /// Must not be called after any function of this trait has returned an error,
130    /// nor after [`close`](Consumer::close) was called.
131    ///
132    /// #### Implementation Notes
133    ///
134    /// This is a trait method for convenience, you should never need to
135    /// replace the default implementation.
136    fn consume_full_slice(
137        &mut self,
138        buf: &[Self::Item],
139    ) -> impl Future<Output = Result<(), ConsumeAtLeastError<Self::Error>>>
140    where
141        Self::Item: Clone,
142    {
143        async {
144            for i in 0..buf.len() {
145                let item = buf[i].clone();
146
147                if let Err(err) = self.consume(item).await {
148                    return Err(ConsumeAtLeastError {
149                        count: i,
150                        reason: err,
151                    });
152                }
153            }
154
155            Ok(())
156        }
157    }
158}
159
160/// A [`Consumer`] that can delay performing side-effects when consuming items.
161///
162/// It must not delay performing side-effects when being closed. In other words,
163/// calling [`close`](Consumer::close) should internally trigger flushing.
164pub trait BufferedConsumer: Consumer {
165    /// Forces the consumer to perform any side-effects that were delayed for previously consumed items.
166    ///
167    /// This function allows client code to force execution of the (potentially expensive)
168    /// side-effects. In exchange, the consumer gains the freedom to delay the side-effects of
169    /// [`consume`](Consumer::consume) to improve efficiency.
170    ///
171    /// After this function returns an error, no further functions of this trait may be invoked.
172    ///
173    /// #### Invariants
174    ///
175    /// Must not be called after any function of this trait has returned an error,
176    /// nor after [`close`](Consumer::close) was called.
177    fn flush(&mut self) -> impl Future<Output = Result<(), Self::Error>>;
178}
179
180/// A [`Consumer`] that is able to consume several items with a single function call, in order to
181/// improve on the efficiency of the [`Consumer`] trait. Semantically, there must be no
182/// difference between consuming items in bulk or one item at a time.
183pub trait BulkConsumer: BufferedConsumer {
184    /// A low-level method for consuming multiple items at a time. If you are only *working* with consumers (rather than *implementing* them), you will probably want to ignore this method and use [BulkConsumer::bulk_consume] instead.
185    ///
186    /// Exposes a non-empty slice of memory for client code to fill with items that should
187    /// be consumed.
188    ///
189    /// The consumer should expose the largest contiguous slice it can expose efficiently.
190    /// It should not perform copies to increase the size of the slice. Client code must be
191    /// able to make progress even if this function always returns slices of size one.
192    ///
193    /// After this function returns an error, no further functions of this trait may be invoked.
194    ///
195    /// #### Invariants
196    ///
197    /// Must not be called after any function of this trait has returned an error,
198    /// nor after [`close`](Consumer::close) was called.
199    fn expose_slots<'a>(
200        &'a mut self,
201    ) -> impl Future<Output = Result<&'a mut [Self::Item], Self::Error>>
202    where
203        Self::Item: 'a;
204
205    /// A low-level method for consuming multiple items at a time. If you are only *working* with consumers (rather than *implementing* them), you will probably want to ignore this method and use [BulkConsumer::bulk_consume] instead.
206    ///
207    /// Instructs the consumer to consume the first `amount` many items of the slots
208    /// it has most recently exposed. The semantics must be equivalent to those of [`consume`](Consumer::consume)
209    /// being called `amount` many times with exactly those items.
210    ///
211    /// After this function returns an error, no further functions of this trait may be invoked.
212    ///
213    /// #### Invariants
214    ///
215    /// Callers must have written into (at least) the `amount` many first slots that
216    /// were most recently exposed.
217    ///
218    /// Must not be called after any function of this trait has returned an error, nor after
219    /// [`close`](Consumer::close) was called.
220    fn consume_slots(&mut self, amount: usize) -> impl Future<Output = Result<(), Self::Error>>;
221
222    /// Consumes a non-zero number of items by reading them from a given buffer and returning how
223    /// many items were consumed.
224    ///
225    /// After this function returns an error, no further functions of this trait may be invoked.
226    ///
227    /// #### Invariants
228    ///
229    /// Must not be called after any function of this trait has returned an error, nor after
230    /// [`close`](Consumer::close) was called.
231    ///
232    /// #### Implementation Notes
233    ///
234    /// The default implementation orchestrates [`expose_slots`](BulkConsumer::expose_slots) and [`consume_slots`](BulkConsumer::consume_slots) in a
235    /// straightforward manner. Only provide your own implementation if you can do better
236    /// than that.
237    fn bulk_consume(
238        &mut self,
239        buf: &[Self::Item],
240    ) -> impl Future<Output = Result<usize, Self::Error>>
241    where
242        Self::Item: Clone,
243    {
244        async {
245            let slots = self.expose_slots().await?;
246            let amount = min(slots.len(), buf.len());
247            slots[0..amount].clone_from_slice(&buf[0..amount]);
248            self.consume_slots(amount).await?;
249
250            Ok(amount)
251        }
252    }
253
254    /// Tries to bulk-consume (copies of) *all* items in the given slice.
255    /// Reports an error if the slice could not be consumed completely.
256    ///
257    /// This is a trait method for convenience, you should never need to
258    /// replace the default implementation.
259    ///
260    /// #### Invariants
261    ///
262    /// Must not be called after any function of this trait has returned an error,
263    /// nor after [`close`](Consumer::close) was called.
264    ///
265    /// #### Implementation Notes
266    ///
267    /// This is a trait method for convenience, you should never need to
268    /// replace the default implementation.
269    fn bulk_consume_full_slice(
270        &mut self,
271        buf: &[Self::Item],
272    ) -> impl Future<Output = Result<(), ConsumeAtLeastError<Self::Error>>>
273    where
274        Self::Item: Clone,
275    {
276        async {
277            let mut consumed_so_far = 0;
278
279            while consumed_so_far < buf.len() {
280                match self.bulk_consume(&buf[consumed_so_far..]).await {
281                    Ok(consumed_count) => consumed_so_far += consumed_count,
282                    Err(err) => {
283                        return Err(ConsumeAtLeastError {
284                            count: consumed_so_far,
285                            reason: err,
286                        });
287                    }
288                }
289            }
290
291            Ok(())
292        }
293    }
294}
295
296/// A [`Producer`] produces a potentially infinite sequence, one item at a time.
297///
298/// The sequence consists of an arbitrary number of values of type [`Self::Item`], followed by
299/// up to one value of type [`Self::Final`]. If you intend for the sequence to be infinite, use
300/// [`Infallible`](core::convert::Infallible) for [`Self::Final`].
301///
302/// A producer may signal an error of type [`Self::Error`] instead of producing an item (whether repeated or final).
303pub trait Producer {
304    /// The sequence produced by this producer starts with *arbitrarily many* values of this type.
305    type Item;
306    /// The sequence produced by this producer ends with *up to one* value of this type.
307    type Final;
308    /// The type of errors the producer can emit instead of doing its job.
309    type Error;
310
311    /// Attempts to produce the next item, which is either a regular repeated item or the final item.
312    ///
313    /// After this function returns the final item, or after it returns an error, no further
314    /// functions of this trait may be invoked.
315    ///
316    /// #### Invariants
317    ///
318    /// Must not be called after any function of this trait has returned a final item or an error.
319    fn produce(
320        &mut self,
321    ) -> impl Future<Output = Result<Either<Self::Item, Self::Final>, Self::Error>>;
322
323    /// Tries to produce a regular item, and reports an error if the final item was produced instead.
324    ///
325    /// This is a trait method for convenience, you should never need to
326    /// replace the default implementation.
327    ///
328    /// #### Invariants
329    ///
330    /// Must not be called after any function of this trait has returned an error,
331    /// nor after [`close`](Consumer::close) was called.
332    ///
333    /// #### Implementation Notes
334    ///
335    /// This is a trait method for convenience, you should never need to
336    /// replace the default implementation.
337    fn produce_item(
338        &mut self,
339    ) -> impl Future<Output = Result<Self::Item, ProduceAtLeastError<Self::Final, Self::Error>>>
340    {
341        async {
342            match self.produce().await {
343                Ok(Left(item)) => Ok(item),
344                Ok(Right(fin)) => Err(ProduceAtLeastError {
345                    count: 0,
346                    reason: Left(fin),
347                }),
348                Err(err) => Err(ProduceAtLeastError {
349                    count: 0,
350                    reason: Right(err),
351                }),
352            }
353        }
354    }
355
356    /// Tries to completely overwrite a slice with items from a producer.
357    /// Reports an error if the slice could not be overwritten completely.
358    ///
359    /// This is a trait method for convenience, you should never need to
360    /// replace the default implementation.
361    ///
362    /// #### Invariants
363    ///
364    /// Must not be called after any function of this trait has returned an error,
365    /// nor after [`close`](Consumer::close) was called.
366    ///
367    /// #### Implementation Notes
368    ///
369    /// This is a trait method for convenience, you should never need to
370    /// replace the default implementation.
371    fn overwrite_full_slice(
372        &mut self,
373        buf: &mut [Self::Item],
374    ) -> impl Future<Output = Result<(), ProduceAtLeastError<Self::Final, Self::Error>>> {
375        async {
376            for i in 0..buf.len() {
377                match self.produce().await {
378                    Ok(Left(item)) => buf[i] = item,
379                    Ok(Right(fin)) => {
380                        return Err(ProduceAtLeastError {
381                            count: i,
382                            reason: Left(fin),
383                        })
384                    }
385                    Err(err) => {
386                        return Err(ProduceAtLeastError {
387                            count: i,
388                            reason: Right(err),
389                        })
390                    }
391                }
392            }
393
394            Ok(())
395        }
396    }
397}
398
399/// A [`Producer`] that can eagerly perform side-effects to prepare values for later yielding.
400pub trait BufferedProducer: Producer {
401    /// Asks the producer to prepare some values for yielding.
402    ///
403    /// This function allows the [`Producer`] to perform side effects that it would otherwise
404    /// have to do just-in-time when [`produce`](Producer::produce) gets called.
405    ///
406    /// After this function returns an error, no further functions of this trait may be invoked.
407    ///
408    /// #### Invariants
409    ///
410    /// Must not be called after any function of this trait has returned a final item or an error.
411    fn slurp(&mut self) -> impl Future<Output = Result<(), Self::Error>>;
412}
413
414/// A [`Producer`] that is able to produce several items with a single function call, in order to
415/// improve on the efficiency of the [`Producer`] trait. Semantically, there must be no difference
416/// between producing items in bulk or one item at a time.
417pub trait BulkProducer: BufferedProducer {
418    /// A low-level method for producing multiple items at a time. If you are only *working* with producers (rather than *implementing* them), you will probably want to ignore this method and use [BulkProducer::bulk_produce] instead.
419    ///
420    /// Exposes a non-empty slice of items to be produced (or the final value, or an error).
421    /// The items in the slice must not have been emitted by [`produce`](Producer::produce) before.
422    ///
423    /// The producer should expose the largest contiguous slice it can expose efficiently.
424    /// It should not perform copies to increase the size of the slice. Client code must be
425    /// able to make progress even if this function always returns slices of size one.
426    ///
427    /// After this function returns the final item, or after it returns an error, no further
428    /// functions of this trait may be invoked.
429    ///
430    /// #### Invariants
431    ///
432    /// Must not be called after any function of this trait has returned a final item or an error.
433    fn expose_items<'a>(
434        &'a mut self,
435    ) -> impl Future<Output = Result<Either<&'a [Self::Item], Self::Final>, Self::Error>>
436    where
437        Self::Item: 'a;
438
439    /// A low-level method for producing multiple items at a time. If you are only *working* with producers (rather than *implementing* them), you will probably want to ignore this method and use [BulkProducer::bulk_produce] instead.
440    ///
441    /// Marks `amount` many items as having been produced. Future calls to [`produce`](Producer::produce) and to
442    /// [`expose_items`](BulkProducer::expose_items) must act as if [`produce`](Producer::produce) had been called `amount` many times.
443    ///
444    /// After this function returns an error, no further functions of this trait may be invoked.
445    ///
446    /// #### Invariants
447    ///
448    /// Callers must not mark items as produced that had not previously been exposed by [`expose_items`](BulkProducer::expose_items).
449    ///
450    /// Must not be called after any function of this trait returned a final item or an error.
451    fn consider_produced(&mut self, amount: usize)
452        -> impl Future<Output = Result<(), Self::Error>>;
453
454    /// Produces a non-zero number of items by writing them into a given buffer and returning how
455    /// many items were produced.
456    ///
457    /// After this function returns the final item, or after it returns an error, no further
458    /// functions of this trait may be invoked.
459    ///
460    /// #### Invariants
461    ///
462    /// Must not be called after any function of this trait has returned a final item or an error.
463    ///
464    /// #### Implementation Notes
465    ///
466    /// The default implementation orchestrates [`expose_items`](BulkProducer::expose_items) and [`consider_produced`](BulkProducer::consider_produced) in a
467    /// straightforward manner. Only provide your own implementation if you can do better
468    /// than that.
469    fn bulk_produce(
470        &mut self,
471        buf: &mut [Self::Item],
472    ) -> impl Future<Output = Result<Either<usize, Self::Final>, Self::Error>>
473    where
474        Self::Item: Clone,
475    {
476        async {
477            match self.expose_items().await? {
478                Either::Left(slots) => {
479                    let amount = min(slots.len(), buf.len());
480                    buf[0..amount].clone_from_slice(&slots[0..amount]);
481
482                    self.consider_produced(amount).await?;
483
484                    Ok(Either::Left(amount))
485                }
486                Either::Right(final_value) => Ok(Either::Right(final_value)),
487            }
488        }
489    }
490
491    /// Tries to completely overwrite a slice with items from a bulk producer.
492    /// Reports an error if the slice could not be overwritten completely.
493    ///
494    /// This is a trait method for convenience, you should never need to
495    /// replace the default implementation.
496    ///
497    /// #### Invariants
498    ///
499    /// Must not be called after any function of this trait has returned an error,
500    /// nor after [`close`](Consumer::close) was called.
501    ///
502    /// #### Implementation Notes
503    ///
504    /// This is a trait method for convenience, you should never need to
505    /// replace the default implementation.
506    fn bulk_overwrite_full_slice(
507        &mut self,
508        buf: &mut [Self::Item],
509    ) -> impl Future<Output = Result<(), ProduceAtLeastError<Self::Final, Self::Error>>>
510    where
511        Self::Item: Clone,
512    {
513        async {
514            let mut produced_so_far = 0;
515
516            while produced_so_far < buf.len() {
517                match self.bulk_produce(&mut buf[produced_so_far..]).await {
518                    Ok(Left(count)) => produced_so_far += count,
519                    Ok(Right(fin)) => {
520                        return Err(ProduceAtLeastError {
521                            count: produced_so_far,
522                            reason: Left(fin),
523                        });
524                    }
525                    Err(err) => {
526                        return Err(ProduceAtLeastError {
527                            count: produced_so_far,
528                            reason: Right(err),
529                        });
530                    }
531                }
532            }
533
534            Ok(())
535        }
536    }
537}
538
539/// Pipes as many items as possible from a [`Producer`] into a [`Consumer`]. Then calls [`close`](Consumer::close)
540/// on the consumer with the final value emitted by the producer.
541pub async fn pipe<P, C>(
542    producer: &mut P,
543    consumer: &mut C,
544) -> Result<(), PipeError<P::Error, C::Error>>
545where
546    P: Producer,
547    C: Consumer<Item = P::Item, Final = P::Final>,
548{
549    loop {
550        match producer.produce().await {
551            Ok(Either::Left(item)) => {
552                match consumer.consume(item).await {
553                    Ok(()) => {
554                        // No-op, continues with next loop iteration.
555                    }
556                    Err(consumer_error) => {
557                        return Err(PipeError::Consumer(consumer_error));
558                    }
559                }
560            }
561            Ok(Either::Right(final_value)) => match consumer.close(final_value).await {
562                Ok(()) => {
563                    return Ok(());
564                }
565                Err(consumer_error) => {
566                    return Err(PipeError::Consumer(consumer_error));
567                }
568            },
569            Err(producer_error) => {
570                return Err(PipeError::Producer(producer_error));
571            }
572        }
573    }
574}
575
576/// Efficiently pipes as many items as possible from a [`BulkProducer`] into a [`BulkConsumer`]
577/// using [`consumer.bulk_consume`](BulkConsumer::bulk_consume). Then calls [`close`](Consumer::close) on the consumer with the final value
578/// emitted by the producer.
579pub async fn bulk_pipe<P, C>(
580    producer: &mut P,
581    consumer: &mut C,
582) -> Result<(), PipeError<P::Error, C::Error>>
583where
584    P: BulkProducer,
585    P::Item: Clone,
586    C: BulkConsumer<Item = P::Item, Final = P::Final>,
587{
588    loop {
589        match producer.expose_items().await {
590            Ok(Either::Left(slots)) => {
591                let amount = match consumer.bulk_consume(slots).await {
592                    Ok(amount) => amount,
593                    Err(consumer_error) => return Err(PipeError::Consumer(consumer_error)),
594                };
595                match producer.consider_produced(amount).await {
596                    Ok(()) => {
597                        // No-op, continues with next loop iteration.
598                    }
599                    Err(producer_error) => return Err(PipeError::Producer(producer_error)),
600                };
601            }
602            Ok(Either::Right(final_value)) => {
603                match consumer.close(final_value).await {
604                    Ok(()) => return Ok(()),
605                    Err(consumer_error) => return Err(PipeError::Consumer(consumer_error)),
606                };
607            }
608            Err(producer_error) => {
609                return Err(PipeError::Producer(producer_error));
610            }
611        }
612    }
613}
614
615/// Pipes at most `count` many items from a [`Producer`] into a [`Consumer`],
616/// and reports how many items were piped.
617/// The producer has emitted its final item (which was then used to close the
618/// consumer) if and only if the number of returned items is strictly less
619/// than `count`.
620pub async fn pipe_at_most<P, C>(
621    producer: &mut P,
622    consumer: &mut C,
623    count: usize,
624) -> Result<usize, PipeError<P::Error, C::Error>>
625where
626    P: Producer,
627    C: Consumer<Item = P::Item, Final = P::Final>,
628{
629    let mut piped = 0;
630    while piped < count {
631        match producer.produce().await {
632            Ok(Either::Left(item)) => {
633                match consumer.consume(item).await {
634                    Ok(()) => {
635                        piped += 1;
636                        // Then continues with next loop iteration.
637                    }
638                    Err(consumer_error) => {
639                        return Err(PipeError::Consumer(consumer_error));
640                    }
641                }
642            }
643            Ok(Either::Right(final_value)) => match consumer.close(final_value).await {
644                Ok(()) => {
645                    return Ok(piped);
646                }
647                Err(consumer_error) => {
648                    return Err(PipeError::Consumer(consumer_error));
649                }
650            },
651            Err(producer_error) => {
652                return Err(PipeError::Producer(producer_error));
653            }
654        }
655    }
656
657    Ok(piped)
658}
659
660/// Efficiently pipes at most `count` many items from a [`BulkProducer`] into a [`BulkConsumer`] [`consumer.bulk_consume`](BulkConsumer::bulk_consume),
661/// and reports how many items were piped.
662/// The producer has emitted its final item (which was then used to close the
663/// consumer) if and only if the number of returned items is strictly less
664/// than `count`.
665pub async fn bulk_pipe_at_most<P, C>(
666    producer: &mut P,
667    consumer: &mut C,
668    count: usize,
669) -> Result<usize, PipeError<P::Error, C::Error>>
670where
671    P: BulkProducer,
672    P::Item: Clone,
673    C: BulkConsumer<Item = P::Item, Final = P::Final>,
674{
675    let mut piped = 0;
676    while piped < count {
677        match producer.expose_items().await {
678            Ok(Either::Left(slots)) => {
679                let max_slots = min(count - piped, slots.len());
680                let amount = match consumer.bulk_consume(&slots[..max_slots]).await {
681                    Ok(amount) => amount,
682                    Err(consumer_error) => return Err(PipeError::Consumer(consumer_error)),
683                };
684                match producer.consider_produced(amount).await {
685                    Ok(()) => {
686                        piped += amount;
687                        // Then continues with next loop iteration.
688                    }
689                    Err(producer_error) => return Err(PipeError::Producer(producer_error)),
690                };
691            }
692            Ok(Either::Right(final_value)) => {
693                match consumer.close(final_value).await {
694                    Ok(()) => return Ok(piped),
695                    Err(consumer_error) => return Err(PipeError::Consumer(consumer_error)),
696                };
697            }
698            Err(producer_error) => {
699                return Err(PipeError::Producer(producer_error));
700            }
701        }
702    }
703
704    Ok(piped)
705}