ufotofu/lib.rs
1#![no_std]
2#![allow(clippy::needless_range_loop)]
3#![allow(clippy::type_complexity)]
4
5//! # UFOTOFU
6//!
7//! UFOTOFU provides APIs for lazily producing or consuming sequences of arbitrary length, serving as async redesigns of traits such as [`Iterator`], [`io::Read`](std::io::Read), or [`io::Write`](std::io::Write). Highlights include
8//!
9//! - bulk data transfer without temporary buffers,
10//! - freely choosable error and item types, even for readers and writers,
11//! - meaningful subtyping relations between streams and readers, and between sinks and writers,
12//! - the ability to represent finite and infinite sequences on the type level, and
13//! - `nostd` support.
14//!
15//! You can read an in-depth discussion of the API designs [here](https://github.com/AljoschaMeyer/lazy_on_principle/blob/main/main.pdf).
16//!
17//! ## Core Abstractions
18//!
19//! UFOTOFU is built around a small hierarchy of traits that describe how to produce or consume a sequence item by item.
20//!
21//! A [`Producer`] provides the items of a sequence to some client code, similar to the [`futures::Stream`](https://docs.rs/futures/latest/futures/stream/trait.Stream.html) and [`core::iter::Iterator`] traits. Client code can repeatedly request the next item, and receives either another item, an error, or a dedicated *final* item which may be of a different type than the repeated items. An *iterator* of `T`s corresponds to a *producer* of `T`s with final item type `()` and error type [`Infallible`](core::convert::Infallible).
22//!
23//! A [`Consumer`] accepts the items of a sequence from some client code, similar to the [`futures::Sink`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html) trait. Client code can repeatedly add new items to the sequence, until it adds a single *final* item which may be of a different type than the repeated items. A final item type of `()` makes adding the final item equivalent to calling a conventional [`close`](https://docs.rs/futures/latest/futures/sink/trait.Sink.html#tymethod.poll_close) method.
24//!
25//! Producers and consumers are fully dual; the [`pipe`] function writes as much data as possible from a producer into a consumer.
26//!
27//! Consumers often buffer items in an internal queue before performing side-effects on data in larger chunks, such as writing data to the network only once a full packet can be filled. The [`BufferedConsumer`] trait extends the [`Consumer`] trait to allow client code to trigger effectful flushing of internal buffers. Dually, the [`BufferedProducer`] trait extends the [`Producer`] trait to allow client code to trigger effectful prefetching of data into internal buffers.
28//!
29//! Finally, the [`BulkProducer`] and [`BulkConsumer`] traits extend [`BufferedProducer`] and [`BufferedConsumer`] respectively with the ability to operate on whole slices of items at a time, similar to [`std::io::Read`] and [`std::io::Write`]. The [`bulk_pipe`] function leverages this ability to efficiently pipe data — unlike when using the standard library's [Read](std::io::Read) and [Write](std::io::Write) traits, this is possible without allocating an auxiliary buffer.
30//!
31//! ## Async Conventions
32//!
33//! UFOTOFU provides async APIs, and follows some consistent conventions. Most importantly, the futures returned by any UFOTOFU method are not expected to be cancellation safe: while it is allowed to drop a Future returned by an UFOTOFU trait method without polling it to completion, it is forbidden to call any further UFOTOFU trait methods on the same value afterwards. In other words: you must poll the future returned by any trait method to completion before calling the next trait method. This mirrors — as closely as possible — the design of sync APIs, where there is no way to partially execute a method either.
34//!
35//! Further, UFOTOFU never requires a `Send` bound on any futures. In other words, it can only be used with single-threaded async runtimes (or, more precisely, runtimes that confine the execution of any one future to a single thread).
36//!
37//! ## Module Organisation
38//!
39//! This top-level module provides the core UFOTOFU traits, and basic piping functionality:
40//!
41//! - Traits for producing sequences: [`Producer`], [`BufferedProducer`], and [`BulkProducer`].
42//! - Traits for consuming sequences: [`Consumer`], [`BufferedConsumer`], and [`BulkConsumer`].
43//! - Piping data: [`pipe`] and [`bulk_pipe`].
44//!
45//! Further functionality, specific to producers and consumers respectively, is exposed in the [`producer`] and [`consumer`] modules.
46//!
47//! ## Feature Flags
48//!
49//! UFOTOFU gates several features that are only interesting under certain circumstances behind feature flags. These API docs document *all* functionality, though, as if all feature flags were activated.
50//!
51//! All functionality which relies on the Rust standard library is gated behind the `std` feature flag (enabled by default).
52//!
53//! All functionality which performs dynamic memory allocations is gated behind the `alloc` feature flag (disabled by default, implied by the `std` feature).
54//!
55//! All functionality which provides interoperability with other async sequence manipulation crates is gated behind the `compat` feature flag (disabled by default).
56//!
57//! All functionality specifically designed to aid in testing and development is gated behind the `dev` feature flag (disabled by default).
58
59#[cfg(feature = "std")]
60extern crate std;
61
62#[cfg(feature = "alloc")]
63extern crate alloc;
64
65use core::cmp::min;
66use core::future::Future;
67
68use either::Either::{self, *};
69
70// This allows macros to use `ufotofu` instead of `crate`, which might become
71// convenient some day.
72extern crate self as ufotofu;
73
74#[macro_use]
75mod common_macros;
76
77mod errors;
78pub use errors::*;
79
80pub mod consumer;
81pub mod producer;
82
83#[cfg(all(feature = "dev", feature = "alloc"))]
84mod test_yielder;
85
86/// A [`Consumer`] consumes a potentially infinite sequence, one item at a time.
87///
88/// The sequence consists of an arbitrary number of values of type [`Self::Item`], followed by
89/// up to one value of type [`Self::Final`]. If you intend for the sequence to be infinite, use
90/// [`Infallible`](core::convert::Infallible) for [`Self::Final`].
91///
92/// A consumer may signal an error of type [`Self::Error`] instead of consuming any item (whether repeated or final).
93pub trait Consumer {
94 /// The sequence consumed by this consumer starts with *arbitrarily many* values of this type.
95 type Item;
96 /// The sequence consumed by this consumer ends with *up to one* value of this type.
97 type Final;
98 /// The type of errors the consumer can emit instead of doing its job.
99 type Error;
100
101 /// Attempts to consume the next item.
102 ///
103 /// After this function returns an error, no further functions of this trait may be invoked.
104 ///
105 /// #### Invariants
106 ///
107 /// Must not be called after any function of this trait returned an error,
108 /// nor after [`close`](Consumer::close) was called.
109 fn consume(&mut self, item: Self::Item) -> impl Future<Output = Result<(), Self::Error>>;
110
111 /// Attempts to consume the final item.
112 ///
113 /// After this function is called, no further functions of this trait may be invoked.
114 ///
115 /// #### Invariants
116 ///
117 /// Must not be called after any function of this trait has returned an error,
118 /// nor after [`close`](Consumer::close) was called.
119 fn close(&mut self, fin: Self::Final) -> impl Future<Output = Result<(), Self::Error>>;
120
121 /// Tries to consume (clones of) *all* items in the given slice.
122 /// Reports an error if the slice could not be consumed completely.
123 ///
124 /// This is a trait method for convenience, you should never need to
125 /// replace the default implementation.
126 ///
127 /// #### Invariants
128 ///
129 /// Must not be called after any function of this trait has returned an error,
130 /// nor after [`close`](Consumer::close) was called.
131 ///
132 /// #### Implementation Notes
133 ///
134 /// This is a trait method for convenience, you should never need to
135 /// replace the default implementation.
136 fn consume_full_slice(
137 &mut self,
138 buf: &[Self::Item],
139 ) -> impl Future<Output = Result<(), ConsumeAtLeastError<Self::Error>>>
140 where
141 Self::Item: Clone,
142 {
143 async {
144 for i in 0..buf.len() {
145 let item = buf[i].clone();
146
147 if let Err(err) = self.consume(item).await {
148 return Err(ConsumeAtLeastError {
149 count: i,
150 reason: err,
151 });
152 }
153 }
154
155 Ok(())
156 }
157 }
158}
159
160/// A [`Consumer`] that can delay performing side-effects when consuming items.
161///
162/// It must not delay performing side-effects when being closed. In other words,
163/// calling [`close`](Consumer::close) should internally trigger flushing.
164pub trait BufferedConsumer: Consumer {
165 /// Forces the consumer to perform any side-effects that were delayed for previously consumed items.
166 ///
167 /// This function allows client code to force execution of the (potentially expensive)
168 /// side-effects. In exchange, the consumer gains the freedom to delay the side-effects of
169 /// [`consume`](Consumer::consume) to improve efficiency.
170 ///
171 /// After this function returns an error, no further functions of this trait may be invoked.
172 ///
173 /// #### Invariants
174 ///
175 /// Must not be called after any function of this trait has returned an error,
176 /// nor after [`close`](Consumer::close) was called.
177 fn flush(&mut self) -> impl Future<Output = Result<(), Self::Error>>;
178}
179
180/// A [`Consumer`] that is able to consume several items with a single function call, in order to
181/// improve on the efficiency of the [`Consumer`] trait. Semantically, there must be no
182/// difference between consuming items in bulk or one item at a time.
183pub trait BulkConsumer: BufferedConsumer {
184 /// A low-level method for consuming multiple items at a time. If you are only *working* with consumers (rather than *implementing* them), you will probably want to ignore this method and use [BulkConsumer::bulk_consume] instead.
185 ///
186 /// Exposes a non-empty slice of memory for client code to fill with items that should
187 /// be consumed.
188 ///
189 /// The consumer should expose the largest contiguous slice it can expose efficiently.
190 /// It should not perform copies to increase the size of the slice. Client code must be
191 /// able to make progress even if this function always returns slices of size one.
192 ///
193 /// After this function returns an error, no further functions of this trait may be invoked.
194 ///
195 /// #### Invariants
196 ///
197 /// Must not be called after any function of this trait has returned an error,
198 /// nor after [`close`](Consumer::close) was called.
199 fn expose_slots<'a>(
200 &'a mut self,
201 ) -> impl Future<Output = Result<&'a mut [Self::Item], Self::Error>>
202 where
203 Self::Item: 'a;
204
205 /// A low-level method for consuming multiple items at a time. If you are only *working* with consumers (rather than *implementing* them), you will probably want to ignore this method and use [BulkConsumer::bulk_consume] instead.
206 ///
207 /// Instructs the consumer to consume the first `amount` many items of the slots
208 /// it has most recently exposed. The semantics must be equivalent to those of [`consume`](Consumer::consume)
209 /// being called `amount` many times with exactly those items.
210 ///
211 /// After this function returns an error, no further functions of this trait may be invoked.
212 ///
213 /// #### Invariants
214 ///
215 /// Callers must have written into (at least) the `amount` many first slots that
216 /// were most recently exposed.
217 ///
218 /// Must not be called after any function of this trait has returned an error, nor after
219 /// [`close`](Consumer::close) was called.
220 fn consume_slots(&mut self, amount: usize) -> impl Future<Output = Result<(), Self::Error>>;
221
222 /// Consumes a non-zero number of items by reading them from a given buffer and returning how
223 /// many items were consumed.
224 ///
225 /// After this function returns an error, no further functions of this trait may be invoked.
226 ///
227 /// #### Invariants
228 ///
229 /// Must not be called after any function of this trait has returned an error, nor after
230 /// [`close`](Consumer::close) was called.
231 ///
232 /// #### Implementation Notes
233 ///
234 /// The default implementation orchestrates [`expose_slots`](BulkConsumer::expose_slots) and [`consume_slots`](BulkConsumer::consume_slots) in a
235 /// straightforward manner. Only provide your own implementation if you can do better
236 /// than that.
237 fn bulk_consume(
238 &mut self,
239 buf: &[Self::Item],
240 ) -> impl Future<Output = Result<usize, Self::Error>>
241 where
242 Self::Item: Clone,
243 {
244 async {
245 let slots = self.expose_slots().await?;
246 let amount = min(slots.len(), buf.len());
247 slots[0..amount].clone_from_slice(&buf[0..amount]);
248 self.consume_slots(amount).await?;
249
250 Ok(amount)
251 }
252 }
253
254 /// Tries to bulk-consume (copies of) *all* items in the given slice.
255 /// Reports an error if the slice could not be consumed completely.
256 ///
257 /// This is a trait method for convenience, you should never need to
258 /// replace the default implementation.
259 ///
260 /// #### Invariants
261 ///
262 /// Must not be called after any function of this trait has returned an error,
263 /// nor after [`close`](Consumer::close) was called.
264 ///
265 /// #### Implementation Notes
266 ///
267 /// This is a trait method for convenience, you should never need to
268 /// replace the default implementation.
269 fn bulk_consume_full_slice(
270 &mut self,
271 buf: &[Self::Item],
272 ) -> impl Future<Output = Result<(), ConsumeAtLeastError<Self::Error>>>
273 where
274 Self::Item: Clone,
275 {
276 async {
277 let mut consumed_so_far = 0;
278
279 while consumed_so_far < buf.len() {
280 match self.bulk_consume(&buf[consumed_so_far..]).await {
281 Ok(consumed_count) => consumed_so_far += consumed_count,
282 Err(err) => {
283 return Err(ConsumeAtLeastError {
284 count: consumed_so_far,
285 reason: err,
286 });
287 }
288 }
289 }
290
291 Ok(())
292 }
293 }
294}
295
296/// A [`Producer`] produces a potentially infinite sequence, one item at a time.
297///
298/// The sequence consists of an arbitrary number of values of type [`Self::Item`], followed by
299/// up to one value of type [`Self::Final`]. If you intend for the sequence to be infinite, use
300/// [`Infallible`](core::convert::Infallible) for [`Self::Final`].
301///
302/// A producer may signal an error of type [`Self::Error`] instead of producing an item (whether repeated or final).
303pub trait Producer {
304 /// The sequence produced by this producer starts with *arbitrarily many* values of this type.
305 type Item;
306 /// The sequence produced by this producer ends with *up to one* value of this type.
307 type Final;
308 /// The type of errors the producer can emit instead of doing its job.
309 type Error;
310
311 /// Attempts to produce the next item, which is either a regular repeated item or the final item.
312 ///
313 /// After this function returns the final item, or after it returns an error, no further
314 /// functions of this trait may be invoked.
315 ///
316 /// #### Invariants
317 ///
318 /// Must not be called after any function of this trait has returned a final item or an error.
319 fn produce(
320 &mut self,
321 ) -> impl Future<Output = Result<Either<Self::Item, Self::Final>, Self::Error>>;
322
323 /// Tries to produce a regular item, and reports an error if the final item was produced instead.
324 ///
325 /// This is a trait method for convenience, you should never need to
326 /// replace the default implementation.
327 ///
328 /// #### Invariants
329 ///
330 /// Must not be called after any function of this trait has returned an error,
331 /// nor after [`close`](Consumer::close) was called.
332 ///
333 /// #### Implementation Notes
334 ///
335 /// This is a trait method for convenience, you should never need to
336 /// replace the default implementation.
337 fn produce_item(
338 &mut self,
339 ) -> impl Future<Output = Result<Self::Item, ProduceAtLeastError<Self::Final, Self::Error>>>
340 {
341 async {
342 match self.produce().await {
343 Ok(Left(item)) => Ok(item),
344 Ok(Right(fin)) => Err(ProduceAtLeastError {
345 count: 0,
346 reason: Left(fin),
347 }),
348 Err(err) => Err(ProduceAtLeastError {
349 count: 0,
350 reason: Right(err),
351 }),
352 }
353 }
354 }
355
356 /// Tries to completely overwrite a slice with items from a producer.
357 /// Reports an error if the slice could not be overwritten completely.
358 ///
359 /// This is a trait method for convenience, you should never need to
360 /// replace the default implementation.
361 ///
362 /// #### Invariants
363 ///
364 /// Must not be called after any function of this trait has returned an error,
365 /// nor after [`close`](Consumer::close) was called.
366 ///
367 /// #### Implementation Notes
368 ///
369 /// This is a trait method for convenience, you should never need to
370 /// replace the default implementation.
371 fn overwrite_full_slice(
372 &mut self,
373 buf: &mut [Self::Item],
374 ) -> impl Future<Output = Result<(), ProduceAtLeastError<Self::Final, Self::Error>>> {
375 async {
376 for i in 0..buf.len() {
377 match self.produce().await {
378 Ok(Left(item)) => buf[i] = item,
379 Ok(Right(fin)) => {
380 return Err(ProduceAtLeastError {
381 count: i,
382 reason: Left(fin),
383 })
384 }
385 Err(err) => {
386 return Err(ProduceAtLeastError {
387 count: i,
388 reason: Right(err),
389 })
390 }
391 }
392 }
393
394 Ok(())
395 }
396 }
397}
398
399/// A [`Producer`] that can eagerly perform side-effects to prepare values for later yielding.
400pub trait BufferedProducer: Producer {
401 /// Asks the producer to prepare some values for yielding.
402 ///
403 /// This function allows the [`Producer`] to perform side effects that it would otherwise
404 /// have to do just-in-time when [`produce`](Producer::produce) gets called.
405 ///
406 /// After this function returns an error, no further functions of this trait may be invoked.
407 ///
408 /// #### Invariants
409 ///
410 /// Must not be called after any function of this trait has returned a final item or an error.
411 fn slurp(&mut self) -> impl Future<Output = Result<(), Self::Error>>;
412}
413
414/// A [`Producer`] that is able to produce several items with a single function call, in order to
415/// improve on the efficiency of the [`Producer`] trait. Semantically, there must be no difference
416/// between producing items in bulk or one item at a time.
417pub trait BulkProducer: BufferedProducer {
418 /// A low-level method for producing multiple items at a time. If you are only *working* with producers (rather than *implementing* them), you will probably want to ignore this method and use [BulkProducer::bulk_produce] instead.
419 ///
420 /// Exposes a non-empty slice of items to be produced (or the final value, or an error).
421 /// The items in the slice must not have been emitted by [`produce`](Producer::produce) before.
422 ///
423 /// The producer should expose the largest contiguous slice it can expose efficiently.
424 /// It should not perform copies to increase the size of the slice. Client code must be
425 /// able to make progress even if this function always returns slices of size one.
426 ///
427 /// After this function returns the final item, or after it returns an error, no further
428 /// functions of this trait may be invoked.
429 ///
430 /// #### Invariants
431 ///
432 /// Must not be called after any function of this trait has returned a final item or an error.
433 fn expose_items<'a>(
434 &'a mut self,
435 ) -> impl Future<Output = Result<Either<&'a [Self::Item], Self::Final>, Self::Error>>
436 where
437 Self::Item: 'a;
438
439 /// A low-level method for producing multiple items at a time. If you are only *working* with producers (rather than *implementing* them), you will probably want to ignore this method and use [BulkProducer::bulk_produce] instead.
440 ///
441 /// Marks `amount` many items as having been produced. Future calls to [`produce`](Producer::produce) and to
442 /// [`expose_items`](BulkProducer::expose_items) must act as if [`produce`](Producer::produce) had been called `amount` many times.
443 ///
444 /// After this function returns an error, no further functions of this trait may be invoked.
445 ///
446 /// #### Invariants
447 ///
448 /// Callers must not mark items as produced that had not previously been exposed by [`expose_items`](BulkProducer::expose_items).
449 ///
450 /// Must not be called after any function of this trait returned a final item or an error.
451 fn consider_produced(&mut self, amount: usize)
452 -> impl Future<Output = Result<(), Self::Error>>;
453
454 /// Produces a non-zero number of items by writing them into a given buffer and returning how
455 /// many items were produced.
456 ///
457 /// After this function returns the final item, or after it returns an error, no further
458 /// functions of this trait may be invoked.
459 ///
460 /// #### Invariants
461 ///
462 /// Must not be called after any function of this trait has returned a final item or an error.
463 ///
464 /// #### Implementation Notes
465 ///
466 /// The default implementation orchestrates [`expose_items`](BulkProducer::expose_items) and [`consider_produced`](BulkProducer::consider_produced) in a
467 /// straightforward manner. Only provide your own implementation if you can do better
468 /// than that.
469 fn bulk_produce(
470 &mut self,
471 buf: &mut [Self::Item],
472 ) -> impl Future<Output = Result<Either<usize, Self::Final>, Self::Error>>
473 where
474 Self::Item: Clone,
475 {
476 async {
477 match self.expose_items().await? {
478 Either::Left(slots) => {
479 let amount = min(slots.len(), buf.len());
480 buf[0..amount].clone_from_slice(&slots[0..amount]);
481
482 self.consider_produced(amount).await?;
483
484 Ok(Either::Left(amount))
485 }
486 Either::Right(final_value) => Ok(Either::Right(final_value)),
487 }
488 }
489 }
490
491 /// Tries to completely overwrite a slice with items from a bulk producer.
492 /// Reports an error if the slice could not be overwritten completely.
493 ///
494 /// This is a trait method for convenience, you should never need to
495 /// replace the default implementation.
496 ///
497 /// #### Invariants
498 ///
499 /// Must not be called after any function of this trait has returned an error,
500 /// nor after [`close`](Consumer::close) was called.
501 ///
502 /// #### Implementation Notes
503 ///
504 /// This is a trait method for convenience, you should never need to
505 /// replace the default implementation.
506 fn bulk_overwrite_full_slice(
507 &mut self,
508 buf: &mut [Self::Item],
509 ) -> impl Future<Output = Result<(), ProduceAtLeastError<Self::Final, Self::Error>>>
510 where
511 Self::Item: Clone,
512 {
513 async {
514 let mut produced_so_far = 0;
515
516 while produced_so_far < buf.len() {
517 match self.bulk_produce(&mut buf[produced_so_far..]).await {
518 Ok(Left(count)) => produced_so_far += count,
519 Ok(Right(fin)) => {
520 return Err(ProduceAtLeastError {
521 count: produced_so_far,
522 reason: Left(fin),
523 });
524 }
525 Err(err) => {
526 return Err(ProduceAtLeastError {
527 count: produced_so_far,
528 reason: Right(err),
529 });
530 }
531 }
532 }
533
534 Ok(())
535 }
536 }
537}
538
539/// Pipes as many items as possible from a [`Producer`] into a [`Consumer`]. Then calls [`close`](Consumer::close)
540/// on the consumer with the final value emitted by the producer.
541pub async fn pipe<P, C>(
542 producer: &mut P,
543 consumer: &mut C,
544) -> Result<(), PipeError<P::Error, C::Error>>
545where
546 P: Producer,
547 C: Consumer<Item = P::Item, Final = P::Final>,
548{
549 loop {
550 match producer.produce().await {
551 Ok(Either::Left(item)) => {
552 match consumer.consume(item).await {
553 Ok(()) => {
554 // No-op, continues with next loop iteration.
555 }
556 Err(consumer_error) => {
557 return Err(PipeError::Consumer(consumer_error));
558 }
559 }
560 }
561 Ok(Either::Right(final_value)) => match consumer.close(final_value).await {
562 Ok(()) => {
563 return Ok(());
564 }
565 Err(consumer_error) => {
566 return Err(PipeError::Consumer(consumer_error));
567 }
568 },
569 Err(producer_error) => {
570 return Err(PipeError::Producer(producer_error));
571 }
572 }
573 }
574}
575
576/// Efficiently pipes as many items as possible from a [`BulkProducer`] into a [`BulkConsumer`]
577/// using [`consumer.bulk_consume`](BulkConsumer::bulk_consume). Then calls [`close`](Consumer::close) on the consumer with the final value
578/// emitted by the producer.
579pub async fn bulk_pipe<P, C>(
580 producer: &mut P,
581 consumer: &mut C,
582) -> Result<(), PipeError<P::Error, C::Error>>
583where
584 P: BulkProducer,
585 P::Item: Clone,
586 C: BulkConsumer<Item = P::Item, Final = P::Final>,
587{
588 loop {
589 match producer.expose_items().await {
590 Ok(Either::Left(slots)) => {
591 let amount = match consumer.bulk_consume(slots).await {
592 Ok(amount) => amount,
593 Err(consumer_error) => return Err(PipeError::Consumer(consumer_error)),
594 };
595 match producer.consider_produced(amount).await {
596 Ok(()) => {
597 // No-op, continues with next loop iteration.
598 }
599 Err(producer_error) => return Err(PipeError::Producer(producer_error)),
600 };
601 }
602 Ok(Either::Right(final_value)) => {
603 match consumer.close(final_value).await {
604 Ok(()) => return Ok(()),
605 Err(consumer_error) => return Err(PipeError::Consumer(consumer_error)),
606 };
607 }
608 Err(producer_error) => {
609 return Err(PipeError::Producer(producer_error));
610 }
611 }
612 }
613}
614
615/// Pipes at most `count` many items from a [`Producer`] into a [`Consumer`],
616/// and reports how many items were piped.
617/// The producer has emitted its final item (which was then used to close the
618/// consumer) if and only if the number of returned items is strictly less
619/// than `count`.
620pub async fn pipe_at_most<P, C>(
621 producer: &mut P,
622 consumer: &mut C,
623 count: usize,
624) -> Result<usize, PipeError<P::Error, C::Error>>
625where
626 P: Producer,
627 C: Consumer<Item = P::Item, Final = P::Final>,
628{
629 let mut piped = 0;
630 while piped < count {
631 match producer.produce().await {
632 Ok(Either::Left(item)) => {
633 match consumer.consume(item).await {
634 Ok(()) => {
635 piped += 1;
636 // Then continues with next loop iteration.
637 }
638 Err(consumer_error) => {
639 return Err(PipeError::Consumer(consumer_error));
640 }
641 }
642 }
643 Ok(Either::Right(final_value)) => match consumer.close(final_value).await {
644 Ok(()) => {
645 return Ok(piped);
646 }
647 Err(consumer_error) => {
648 return Err(PipeError::Consumer(consumer_error));
649 }
650 },
651 Err(producer_error) => {
652 return Err(PipeError::Producer(producer_error));
653 }
654 }
655 }
656
657 Ok(piped)
658}
659
660/// Efficiently pipes at most `count` many items from a [`BulkProducer`] into a [`BulkConsumer`] [`consumer.bulk_consume`](BulkConsumer::bulk_consume),
661/// and reports how many items were piped.
662/// The producer has emitted its final item (which was then used to close the
663/// consumer) if and only if the number of returned items is strictly less
664/// than `count`.
665pub async fn bulk_pipe_at_most<P, C>(
666 producer: &mut P,
667 consumer: &mut C,
668 count: usize,
669) -> Result<usize, PipeError<P::Error, C::Error>>
670where
671 P: BulkProducer,
672 P::Item: Clone,
673 C: BulkConsumer<Item = P::Item, Final = P::Final>,
674{
675 let mut piped = 0;
676 while piped < count {
677 match producer.expose_items().await {
678 Ok(Either::Left(slots)) => {
679 let max_slots = min(count - piped, slots.len());
680 let amount = match consumer.bulk_consume(&slots[..max_slots]).await {
681 Ok(amount) => amount,
682 Err(consumer_error) => return Err(PipeError::Consumer(consumer_error)),
683 };
684 match producer.consider_produced(amount).await {
685 Ok(()) => {
686 piped += amount;
687 // Then continues with next loop iteration.
688 }
689 Err(producer_error) => return Err(PipeError::Producer(producer_error)),
690 };
691 }
692 Ok(Either::Right(final_value)) => {
693 match consumer.close(final_value).await {
694 Ok(()) => return Ok(piped),
695 Err(consumer_error) => return Err(PipeError::Consumer(consumer_error)),
696 };
697 }
698 Err(producer_error) => {
699 return Err(PipeError::Producer(producer_error));
700 }
701 }
702 }
703
704 Ok(piped)
705}