completion/stream/
mod.rs

1//! Utilities for the [`CompletionStream`] trait.
2
3#[cfg(feature = "alloc")]
4use alloc::boxed::Box;
5use core::cmp;
6#[cfg(feature = "std")]
7use core::iter::FusedIterator;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use completion_core::CompletionFuture;
12#[doc(no_inline)]
13pub use completion_core::CompletionStream;
14use futures_core::Stream;
15
16use super::{Adapter, MustComplete};
17
18mod adapters;
19pub use adapters::*;
20
21mod futures;
22pub use futures::*;
23
24mod unfold;
25pub use unfold::*;
26
27mod from_completion_stream;
28pub use from_completion_stream::FromCompletionStream;
29#[cfg(feature = "std")]
30pub(crate) use from_completion_stream::FromCompletionStreamInner;
31
32/// Extension trait for [`CompletionStream`].
33pub trait CompletionStreamExt: CompletionStream {
34    /// A convenience for calling [`CompletionStream::poll_next`] on [`Unpin`] streams.
35    ///
36    /// # Safety
37    ///
38    /// Identical to [`CompletionStream::poll_next`].
39    unsafe fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
40    where
41        Self: Unpin,
42    {
        // `Self: Unpin` is what makes the safe `Pin::new` constructor usable on `&mut Self`.
        // The call is still unsafe: the caller must uphold the contract named in `# Safety`.
43        Pin::new(self).poll_next(cx)
44    }
45
46    /// A convenience for calling [`CompletionStream::poll_cancel`] on [`Unpin`] streams.
47    ///
48    /// # Safety
49    ///
50    /// Identical to [`CompletionStream::poll_cancel`].
51    unsafe fn poll_cancel(&mut self, cx: &mut Context<'_>) -> Poll<()>
52    where
53        Self: Unpin,
54    {
        // `Self: Unpin` is what makes the safe `Pin::new` constructor usable on `&mut Self`.
        // The call is still unsafe: the caller must uphold the contract named in `# Safety`.
55        Pin::new(self).poll_cancel(cx)
56    }
57
58    /// Make sure that the stream will complete. Any requests to cancel the stream through
59    /// [`poll_cancel`](CompletionStream::poll_cancel) will be ignored.
60    fn must_complete(self) -> MustComplete<Self>
61    where
62        Self: Sized,
63    {
        // Built with a struct literal rather than a constructor: the stream is moved into the
        // wrapper's `inner` field.
64        MustComplete { inner: self }
65    }
66
67    /// Get the next item in the stream.
68    ///
69    /// Be aware that if you cancel the returned future, the stream itself will be cancelled and so
70    /// any further attempts to use it may panic, block forever, or do other unexpected things.
71    ///
72    /// # Examples
73    ///
74    /// ```
75    /// use completion::{CompletionStreamExt, StreamExt};
76    /// use futures_lite::stream;
77    ///
78    /// # completion::future::block_on(completion::completion_async! {
79    /// let mut stream = stream::iter(0..3).into_completion();
80    /// assert_eq!(stream.next().await, Some(0));
81    /// assert_eq!(stream.next().await, Some(1));
82    /// assert_eq!(stream.next().await, Some(2));
83    /// assert_eq!(stream.next().await, None);
84    /// # });
85    /// ```
86    fn next(&mut self) -> Next<'_, Self>
87    where
88        Self: Unpin,
89    {
        // Only a mutable borrow is handed to `Next`, so the caller keeps the stream and can
        // call `next` again once the returned future has finished.
90        Next::new(self)
91    }
92
93    /// Count the number of items in the stream.
94    ///
95    /// # Examples
96    ///
97    /// ```
98    /// use completion::{CompletionStreamExt, StreamExt};
99    /// use futures_lite::stream;
100    ///
101    /// # completion::future::block_on(completion::completion_async! {
102    /// let stream_1 = stream::iter(3..7).into_completion();
103    /// let stream_2 = stream::iter(&[8, 2, 4]).into_completion();
104    ///
105    /// assert_eq!(stream_1.count().await, 4);
106    /// assert_eq!(stream_2.count().await, 3);
107    /// # });
108    /// ```
109    fn count(self) -> Count<Self>
110    where
111        Self: Sized,
112    {
        // The stream is consumed: it is moved by value (hence `Self: Sized`) into `Count`.
113        Count::new(self)
114    }
115
116    /// Get the last element in the stream.
117    ///
118    /// # Examples
119    ///
120    /// ```
121    /// use completion::{CompletionStreamExt, StreamExt};
122    /// use futures_lite::stream;
123    ///
124    /// # completion::future::block_on(completion::completion_async! {
125    /// assert_eq!(stream::iter(3..7).into_completion().last().await, Some(6));
126    /// assert_eq!(stream::empty::<String>().into_completion().last().await, None);
127    /// # });
128    /// ```
129    fn last(self) -> Last<Self>
130    where
131        Self: Sized,
132    {
        // The stream is consumed: it is moved by value (hence `Self: Sized`) into `Last`.
133        Last::new(self)
134    }
135
136    /// Get the nth element in the stream.
137    ///
138    /// # Examples
139    ///
140    /// ```
141    /// use completion::{CompletionStreamExt, StreamExt};
142    /// use futures_lite::stream;
143    ///
144    /// # completion::future::block_on(completion::completion_async! {
145    /// assert_eq!(stream::iter(3..7).into_completion().nth(2).await, Some(5));
146    /// assert_eq!(stream::iter(3..7).into_completion().nth(10).await, None);
147    /// # });
148    /// ```
149    fn nth(&mut self, n: usize) -> Nth<'_, Self>
150    where
151        Self: Unpin,
152    {
        // `n` is forwarded unchanged and is zero-based: in the doc example, `nth(2)` on `3..7`
        // gives `5`. The stream is only borrowed, not consumed.
153        Nth::new(self, n)
154    }
155
156    /// Create a stream starting at the same point, but stepping by the given amount each
157    /// iteration.
158    ///
159    /// # Panics
160    ///
161    /// This will panic if `step` is `0`.
162    ///
163    /// # Examples
164    ///
165    /// ```
166    /// use completion::{CompletionStreamExt, StreamExt};
167    /// use futures_lite::stream;
168    ///
169    /// # completion::future::block_on(completion::completion_async! {
170    /// let a = [0, 1, 2, 3, 4, 5];
171    /// let mut stream = stream::iter(&a).into_completion().step_by(2);
172    ///
173    /// assert_eq!(stream.next().await, Some(&0));
174    /// assert_eq!(stream.next().await, Some(&2));
175    /// assert_eq!(stream.next().await, Some(&4));
176    /// assert_eq!(stream.next().await, None);
177    /// # });
178    /// ```
179    fn step_by(self, step: usize) -> StepBy<Self>
180    where
181        Self: Sized,
182    {
        // No `step == 0` check happens in this wrapper; `step` is passed through as-is.
        // NOTE(review): the documented panic presumably comes from `StepBy` - confirm in `adapters`.
183        StepBy::new(self, step)
184    }
185
186    /// Chain this stream with another.
187    ///
188    /// # Examples
189    ///
190    /// ```
191    /// use completion::{CompletionStreamExt, StreamExt};
192    /// use futures_lite::stream;
193    ///
194    /// # completion::future::block_on(completion::completion_async! {
195    /// let mut stream = stream::iter(4..6).into_completion()
196    ///     .chain(stream::iter(6..10).into_completion());
197    ///
198    /// assert_eq!(stream.next().await, Some(4));
199    /// assert_eq!(stream.next().await, Some(5));
200    /// assert_eq!(stream.next().await, Some(6));
201    /// assert_eq!(stream.next().await, Some(7));
202    /// assert_eq!(stream.next().await, Some(8));
203    /// assert_eq!(stream.next().await, Some(9));
204    /// assert_eq!(stream.next().await, None);
205    /// # });
206    /// ```
207    fn chain<U: CompletionStream<Item = Self::Item>>(self, other: U) -> Chain<Self, U>
208    where
209        Self: Sized,
210    {
        // The bound on `U` forces both streams to share one `Item` type; both are moved into
        // the returned `Chain`.
211        Chain::new(self, other)
212    }
213
214    // TODO: zip
215
216    /// Map this stream's items with a closure.
217    ///
218    /// # Examples
219    ///
220    /// ```
221    /// use completion::{CompletionStreamExt, StreamExt};
222    /// use futures_lite::stream;
223    ///
224    /// # completion::future::block_on(completion::completion_async! {
225    /// let mut stream = stream::iter(0..5).into_completion().map(|x| x * 2 + 4);
226    ///
227    /// assert_eq!(stream.next().await, Some(4));
228    /// assert_eq!(stream.next().await, Some(6));
229    /// assert_eq!(stream.next().await, Some(8));
230    /// assert_eq!(stream.next().await, Some(10));
231    /// assert_eq!(stream.next().await, Some(12));
232    /// assert_eq!(stream.next().await, None);
233    /// # });
234    /// ```
235    fn map<T, F: FnMut(Self::Item) -> T>(self, f: F) -> Map<Self, F>
236    where
237        Self: Sized,
238    {
        // `T` exists only to name the closure's return type; it is not a parameter of the
        // returned `Map<Self, F>`.
239        Map::new(self, f)
240    }
241
242    /// Map this stream's items with an asynchronous closure.
243    ///
244    /// # Examples
245    ///
246    /// ```
247    /// use completion::{CompletionStreamExt, StreamExt, completion_async_move};
248    /// use futures_lite::stream;
249    ///
250    /// # completion::future::block_on(completion_async_move! {
251    /// let mut stream = stream::iter(0..5)
252    ///     .into_completion()
253    ///     .then(|x| completion_async_move!(x * 2 + 4));
254    ///
255    /// futures_lite::pin!(stream);
256    ///
257    /// assert_eq!(stream.next().await, Some(4));
258    /// assert_eq!(stream.next().await, Some(6));
259    /// assert_eq!(stream.next().await, Some(8));
260    /// assert_eq!(stream.next().await, Some(10));
261    /// assert_eq!(stream.next().await, Some(12));
262    /// assert_eq!(stream.next().await, None);
263    /// # });
264    /// ```
265    fn then<F: FnMut(Self::Item) -> Fut, Fut: CompletionFuture>(self, f: F) -> Then<Self, F, Fut>
266    where
267        Self: Sized,
268    {
        // `Fut` is the future type produced by `f` for each item and is part of the adapter's
        // type. The doc example pins the result before calling `next` on it.
269        Then::new(self, f)
270    }
271
272    /// Call a closure on each item of the stream.
273    ///
274    /// # Examples
275    ///
276    /// ```
277    /// use completion::{CompletionStreamExt, StreamExt};
278    /// use futures_lite::stream;
279    ///
280    /// # completion::future::block_on(completion::completion_async! {
281    /// stream::iter(0..8).into_completion().for_each(|num| println!("{}", num)).await;
282    /// # });
283    /// ```
284    fn for_each<F: FnMut(Self::Item)>(self, f: F) -> ForEach<Self, F>
285    where
286        Self: Sized,
287    {
        // `f` returns `()`, so it is invoked purely for its side effects; items are passed to
        // it by value.
288        ForEach::new(self, f)
289    }
290
291    /// Keep the values in the stream for which the predicate resolves to `true`.
292    ///
293    /// # Examples
294    ///
295    /// ```
296    /// use completion::{CompletionStreamExt, StreamExt};
297    /// use futures_lite::stream;
298    ///
299    /// # completion::future::block_on(completion::completion_async! {
300    /// let mut stream = stream::iter(1..=10).into_completion().filter(|n| n % 3 == 0);
301    ///
302    /// assert_eq!(stream.next().await, Some(3));
303    /// assert_eq!(stream.next().await, Some(6));
304    /// assert_eq!(stream.next().await, Some(9));
305    /// assert_eq!(stream.next().await, None);
306    /// # });
307    /// ```
308    fn filter<F>(self, f: F) -> Filter<Self, F>
309    where
310        F: FnMut(&Self::Item) -> bool,
311        Self: Sized,
312    {
        // The predicate only sees `&Self::Item`, so it can inspect but not take each item.
313        Filter::new(self, f)
314    }
315
316    /// Filter and map the items of the stream with a closure.
317    ///
318    /// This will yield any values for which the closure returns [`Some`] and will discard any
319    /// values for which the closure returns [`None`].
320    ///
321    /// # Examples
322    ///
323    /// ```
324    /// use completion::{CompletionStreamExt, StreamExt};
325    /// use futures_lite::stream;
326    ///
327    /// # completion::future::block_on(completion::completion_async! {
328    /// let strings = ["5", "!", "2", "NaN", "6", ""];
329    /// let stream = stream::iter(&strings).into_completion();
330    /// let mut stream = stream.filter_map(|s| s.parse::<i32>().ok());
331    ///
332    /// assert_eq!(stream.next().await, Some(5));
333    /// assert_eq!(stream.next().await, Some(2));
334    /// assert_eq!(stream.next().await, Some(6));
335    /// assert_eq!(stream.next().await, None);
336    /// # });
337    /// ```
338    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
339    where
340        F: FnMut(Self::Item) -> Option<T>,
341        Self: Sized,
342    {
        // The closure takes each item by value and answers with `Option<T>`; `T` only names
        // that payload type and is not a parameter of `FilterMap<Self, F>`.
343        FilterMap::new(self, f)
344    }
345
346    /// Yield the current iteration count as well as the next value.
347    ///
348    /// The returned stream yields pairs `(i, val)` where `i` is the current index of iteration and
349    /// `val` is the value returned by the stream.
350    ///
351    /// # Overflow Behaviour
352    ///
353    /// The method does no guarding against overflows, so enumerating more than [`usize::MAX`]
354    /// elements either produces the wrong result or panics. If debug assertions are enabled, a
355    /// panic is guaranteed.
356    ///
357    /// # Panics
358    ///
359    /// The returned stream might panic if the to-be-returned index would overflow a [`usize`].
360    ///
361    /// # Examples
362    ///
363    /// ```
364    /// use completion::{CompletionStreamExt, StreamExt};
365    /// use futures_lite::stream;
366    ///
367    /// # completion::future::block_on(completion::completion_async! {
368    /// let string = "Hello";
369    /// let mut stream = stream::iter(string.chars()).into_completion().enumerate();
370    ///
371    /// assert_eq!(stream.next().await, Some((0, 'H')));
372    /// assert_eq!(stream.next().await, Some((1, 'e')));
373    /// assert_eq!(stream.next().await, Some((2, 'l')));
374    /// assert_eq!(stream.next().await, Some((3, 'l')));
375    /// assert_eq!(stream.next().await, Some((4, 'o')));
376    /// assert_eq!(stream.next().await, None);
377    /// # });
378    /// ```
379    fn enumerate(self) -> Enumerate<Self>
380    where
381        Self: Sized,
382    {
        // No starting index is supplied here; `Enumerate::new` receives only the stream. The
        // doc example shows the first yielded index is `0`.
383        Enumerate::new(self)
384    }
385
386    /// Create a stream which can use [`peek`](Peekable::peek) to look at the next element of the
387    /// stream without consuming it.
388    ///
389    /// Note that the underlying stream is still advanced when [`peek`](Peekable::peek) is called
390    /// for the first time after [`next`](Self::next); in order to retrieve the next element,
391    /// [`next`](Self::next) is called on the underlying stream, hence any side effects of the
392    /// method will occur.
393    ///
394    /// # Examples
395    ///
396    /// ```
397    /// use completion::{CompletionStreamExt, StreamExt};
398    /// use futures_lite::{stream, pin};
399    ///
400    /// # completion::future::block_on(completion::completion_async! {
401    /// let mut stream = stream::iter("Hello!\n".chars()).into_completion().peekable();
402    ///
403    /// let mut s = String::new();
404    /// while stream.peek_unpin().await != Some(&'\n') {
405    ///     s.push(stream.next().await.unwrap());
406    /// }
407    /// assert_eq!(s, "Hello!");
408    /// assert_eq!(stream.next().await, Some('\n'));
409    /// assert_eq!(stream.next().await, None);
410    /// # });
411    /// ```
412    fn peekable(self) -> Peekable<Self>
413    where
414        Self: Sized,
415    {
        // The stream is moved into `Peekable`; nothing is polled at construction time by this
        // wrapper itself.
416        Peekable::new(self)
417    }
418
419    /// Skip items while the predicate returns `true`.
420    ///
421    /// # Examples
422    ///
423    /// ```
424    /// use completion::{CompletionStreamExt, StreamExt};
425    /// use futures_lite::stream;
426    ///
427    /// # completion::future::block_on(completion::completion_async! {
428    /// let stream = stream::iter("   Hello world!".chars()).into_completion();
429    /// let text: String = stream.skip_while(char::is_ascii_whitespace).collect().await;
430    /// assert_eq!(text, "Hello world!");
431    /// # });
432    /// ```
433    fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
434    where
435        P: FnMut(&Self::Item) -> bool,
436        Self: Sized,
437    {
        // The predicate borrows each item (`&Self::Item`), which is why the doc example can
        // pass `char::is_ascii_whitespace` directly.
438        SkipWhile::new(self, predicate)
439    }
440
441    /// Take items while the predicate returns `true`.
442    ///
443    /// # Examples
444    ///
445    /// ```
446    /// use completion::{CompletionStreamExt, StreamExt};
447    /// use futures_lite::stream;
448    ///
449    /// # completion::future::block_on(completion::completion_async! {
450    /// let stream = stream::iter("Hello world!\nFoo bar".chars()).into_completion();
451    /// let text: String = stream.take_while(|&c| c != '\n').collect().await;
452    /// assert_eq!(text, "Hello world!");
453    /// # });
454    /// ```
455    fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
456    where
457        P: FnMut(&Self::Item) -> bool,
458        Self: Sized,
459    {
        // The predicate borrows each item (`&Self::Item`); the doc example destructures that
        // reference with `|&c|`.
460        TakeWhile::new(self, predicate)
461    }
462
463    /// Skip the first `n` items in the stream.
464    ///
465    /// # Examples
466    ///
467    /// ```
468    /// use completion::{CompletionStreamExt, StreamExt};
469    /// use futures_lite::stream;
470    ///
471    /// # completion::future::block_on(completion::completion_async! {
472    /// let stream = stream::iter(0..10).into_completion();
473    /// assert_eq!(stream.skip(5).collect::<Vec<_>>().await, [5, 6, 7, 8, 9]);
474    /// # });
475    /// ```
476    fn skip(self, n: usize) -> Skip<Self>
477    where
478        Self: Sized,
479    {
        // `n` is forwarded unchanged; the stream is moved into `Skip`.
480        Skip::new(self, n)
481    }
482
483    /// Takes the first `n` items of the stream. All other items will be ignored.
484    ///
485    /// # Examples
486    ///
487    /// ```
488    /// use completion::{CompletionStreamExt, StreamExt};
489    /// use futures_lite::stream;
490    ///
491    /// # completion::future::block_on(completion::completion_async! {
492    /// let stream = stream::repeat(19).into_completion();
493    /// assert_eq!(stream.take(5).collect::<Vec<_>>().await, [19, 19, 19, 19, 19]);
494    /// # });
495    /// ```
496    fn take(self, n: usize) -> Take<Self>
497    where
498        Self: Sized,
499    {
        // `n` is forwarded unchanged; the stream is moved into `Take`. The doc example relies
        // on this to bound an otherwise endless `repeat` stream.
500        Take::new(self, n)
501    }
502
503    // TODO: scan
504
505    /// Map the stream, flattening nested structure.
506    ///
507    /// `.flat_map(f)` is equivalent to `.[`map`](Self::map)`(f).`[`flatten`](Self::flatten)`()`.
508    ///
509    /// # Examples
510    ///
511    /// ```
512    /// use completion::{CompletionStreamExt, StreamExt};
513    /// use futures_lite::stream;
514    ///
515    /// # completion::future::block_on(completion::completion_async! {
516    /// let s: String = stream::iter(&["alpha", "beta", "gamma"])
517    ///     .into_completion()
518    ///     .flat_map(|s| stream::iter(s.chars()).into_completion())
519    ///     .collect()
520    ///     .await;
521    ///
522    /// assert_eq!(s, "alphabetagamma");
523    /// # });
524    /// ```
525    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
526    where
527        Self: Sized,
528        U: CompletionStream,
529        F: FnMut(Self::Item) -> U,
530    {
        // `U` is the inner stream type that `f` produces for each item; it appears in the
        // returned adapter's type as `FlatMap<Self, U, F>`.
531        FlatMap::new(self, f)
532    }
533
534    /// Flatten nested structure in the stream.
535    ///
536    /// This converts a stream of streams to a stream.
537    ///
538    /// # Examples
539    ///
540    /// ```
541    /// use completion::{CompletionStreamExt, StreamExt};
542    /// use futures_lite::stream;
543    ///
544    /// # completion::future::block_on(completion::completion_async! {
545    /// let streams = vec![
546    ///     stream::iter(0..5).into_completion(),
547    ///     stream::iter(5..7).into_completion(),
548    /// ];
549    /// let v: Vec<u8> = stream::iter(streams).into_completion().flatten().collect().await;
550    /// assert_eq!(v, &[0, 1, 2, 3, 4, 5, 6]);
551    /// # });
552    /// ```
553    fn flatten(self) -> Flatten<Self>
554    where
555        Self: Sized,
556        Self::Item: CompletionStream,
557    {
        // Only available when the items are themselves streams (`Self::Item: CompletionStream`).
558        Flatten::new(self)
559    }
560
561    /// Fuse the stream so that it is guaranteed to continue to yield [`None`] when exhausted.
562    ///
563    /// If the stream is cancelled, it is also guaranteed to continue to yield [`None`].
564    ///
565    /// # Examples
566    ///
567    /// ```
568    /// use completion::{CompletionStreamExt, StreamExt};
569    /// use futures_lite::stream;
570    ///
571    /// # completion::future::block_on(completion::completion_async! {
572    /// let mut stream = stream::once(5).into_completion().fuse();
573    /// assert_eq!(stream.next().await, Some(5));
574    /// assert_eq!(stream.next().await, None);
575    /// assert_eq!(stream.next().await, None);
576    /// assert_eq!(stream.next().await, None);
577    /// # });
578    /// ```
579    fn fuse(self) -> Fuse<Self>
580    where
581        Self: Sized,
582    {
        // The stream is moved into `Fuse`; the repeat-`None` guarantee is provided by that
        // adapter, not by this wrapper.
583        Fuse::new(self)
584    }
585
586    /// Do something with each element in the stream, passing the value on.
587    ///
588    /// # Examples
589    ///
590    /// ```
591    /// use completion::{CompletionStreamExt, StreamExt};
592    /// use futures_lite::stream;
593    ///
594    /// # completion::future::block_on(completion::completion_async! {
595    /// let sum = stream::iter(0..16)
596    ///     .into_completion()
597    ///     .inspect(|x| println!("about to filter: {}", x))
598    ///     .filter(|x| x % 2 == 0)
599    ///     .inspect(|x| println!("made it through filter: {}", x))
600    ///     .fold(0_u32, |sum, i| sum + i)
601    ///     .await;
602    ///
603    /// assert_eq!(sum, 56);
604    /// # });
605    /// ```
606    fn inspect<F>(self, f: F) -> Inspect<Self, F>
607    where
608        Self: Sized,
609        F: FnMut(&Self::Item),
610    {
        // `f` gets `&Self::Item` and returns `()`: it can observe each item but cannot take
        // ownership of it.
611        Inspect::new(self, f)
612    }
613
614    // TODO: by_ref
615
616    /// Collect all the items in the stream into a collection.
617    ///
618    /// # Examples
619    ///
620    /// ```
621    /// use completion::{CompletionStreamExt, completion_stream};
622    ///
623    /// # completion::future::block_on(completion::completion_async! {
624    /// let stream = completion_stream! {
625    ///     for i in 0..5 {
626    ///         yield i;
627    ///     }
628    /// };
629    ///
630    /// let items: Vec<_> = stream.collect().await;
631    /// assert_eq!(items, [0, 1, 2, 3, 4]);
632    /// # });
633    /// ```
634    ///
635    /// You can also collect into [`Result`]s or [`Option`]s.
636    ///
637    /// ```
638    /// use completion::{CompletionStreamExt, completion_stream};
639    ///
640    /// # completion::future::block_on(completion::completion_async! {
641    /// let success_stream = completion_stream! {
642    ///     for i in 0..5 {
643    ///         yield Some(i);
644    ///     }
645    /// };
646    /// let failure_stream = completion_stream! {
647    ///     for i in 0..5 {
648    ///         yield Some(i);
649    ///     }
650    ///     yield None;
651    /// };
652    ///
653    /// assert_eq!(success_stream.collect::<Option<Vec<_>>>().await, Some(vec![0, 1, 2, 3, 4]));
654    /// assert_eq!(failure_stream.collect::<Option<Vec<_>>>().await, None);
655    /// # });
656    /// ```
657    fn collect<C: FromCompletionStream<Self::Item>>(self) -> Collect<Self, C>
658    where
659        Self: Sized,
660    {
        // `C` is not inferable from the arguments, so callers pick it with a type annotation
        // or turbofish; it must implement `FromCompletionStream<Self::Item>`.
661        Collect::new(self)
662    }
663
664    // TODO: partition
665    // TODO: try_fold
666    // TODO: try_for_each
667
668    /// Accumulate a value over a stream.
669    ///
670    /// `Fold` stores an accumulator that is initially set to `init`. Whenever the stream produces
671    /// a new item, it calls the function `f` with the accumulator and new item, which then returns
672    /// the new value of the accumulator. Once the stream is finished, it returns the accumulator.
673    ///
674    /// # Examples
675    ///
676    /// ```
677    /// use completion::{CompletionStreamExt, completion_stream};
678    ///
679    /// # completion::future::block_on(completion::completion_async! {
680    /// let stream = completion_stream! {
681    ///     yield 1;
682    ///     yield 8;
683    ///     yield 2;
684    /// };
685    /// assert_eq!(stream.fold(0, |acc, x| acc + x).await, 11);
686    /// # });
687    /// ```
688    fn fold<T, F>(self, init: T, f: F) -> Fold<Self, F, T>
689    where
690        F: FnMut(T, Self::Item) -> T,
691        Self: Sized,
692    {
        // Mind the ordering: the constructor takes `(stream, init, f)` while the return type
        // lists its parameters as `Fold<Self, F, T>`.
693        Fold::new(self, init, f)
694    }
695
696    /// Check if all the elements in the stream match a predicate.
697    ///
698    /// This is short-circuiting; it will stop once it finds a `false`.
699    ///
700    /// An empty stream returns `true`.
701    ///
702    /// # Examples
703    ///
704    /// ```
705    /// use completion::{CompletionStreamExt, StreamExt};
706    /// use futures_lite::stream;
707    ///
708    /// # completion::future::block_on(completion::completion_async! {
709    /// assert!(stream::iter(0..10).into_completion().all(|x| x < 10).await);
710    ///
711    /// assert!(!stream::iter(0..8).into_completion().all(|x| x < 7).await);
712    ///
713    /// assert!(stream::empty::<()>().into_completion().all(|_| false).await);
714    /// # });
715    /// ```
716    fn all<F: FnMut(Self::Item) -> bool>(&mut self, f: F) -> All<'_, Self, F>
717    where
718        Self: Unpin,
719    {
        // The stream is only borrowed (`&mut self`), and the predicate receives each item by
        // value.
720        All::new(self, f)
721    }
722
723    /// Check if any of the elements in the stream match a predicate.
724    ///
725    /// This is short-circuiting; it will stop once it finds a `true`.
726    ///
727    /// An empty stream returns `false`.
728    ///
729    /// # Examples
730    ///
731    /// ```
732    /// use completion::{CompletionStreamExt, StreamExt};
733    /// use futures_lite::stream;
734    ///
735    /// # completion::future::block_on(completion::completion_async! {
736    /// assert!(stream::iter(0..10).into_completion().any(|x| x == 9).await);
737    ///
738    /// assert!(!stream::iter(0..8).into_completion().any(|x| x == 9).await);
739    ///
740    /// assert!(!stream::empty::<()>().into_completion().any(|_| true).await);
741    /// # });
742    /// ```
743    fn any<F: FnMut(Self::Item) -> bool>(&mut self, f: F) -> Any<'_, Self, F>
744    where
745        Self: Unpin,
746    {
        // The stream is only borrowed (`&mut self`), and the predicate receives each item by
        // value.
747        Any::new(self, f)
748    }
749
750    /// Search for an element in the stream that satisfies a predicate.
751    ///
752    /// `find()` is short-circuiting, it will stop processing as soon as the closure returns
753    /// `true`.
754    ///
755    /// # Examples
756    ///
757    /// ```
758    /// use completion::{CompletionStreamExt, StreamExt};
759    /// use futures_lite::stream;
760    ///
761    /// # completion::future::block_on(completion::completion_async! {
762    /// let mut stream = stream::iter(1..10).into_completion();
763    ///
764    /// // Find the first even number.
765    /// assert_eq!(stream.find(|x| x % 2 == 0).await, Some(2));
766    ///
767    /// // Find the next odd number.
768    /// assert_eq!(stream.find(|x| x % 2 == 1).await, Some(3));
769    ///
770    /// // Find short-circuits.
771    /// assert_eq!(stream.next().await, Some(4));
772    /// # });
773    /// ```
774    fn find<P>(&mut self, predicate: P) -> Find<'_, Self, P>
775    where
776        Self: Unpin,
777        P: FnMut(&Self::Item) -> bool,
778    {
        // The stream is only borrowed, so the caller can keep pulling items after a match, as
        // the doc example does. The predicate sees each item by reference.
779        Find::new(self, predicate)
780    }
781
782    /// Finds the first element in a stream for which a function returns [`Some`].
783    ///
784    /// `stream.find_map(f).await` is equivalent to `stream.filter_map(f).next().await`.
785    ///
786    /// # Examples
787    ///
788    /// ```
789    /// use completion::{CompletionStreamExt, StreamExt};
790    /// use futures_lite::stream;
791    ///
792    /// # completion::future::block_on(completion::completion_async! {
793    /// let mut stream = stream::iter(&["lol", "NaN", "2", "5"]).into_completion();
794    ///
795    /// assert_eq!(stream.find_map(|s| s.parse().ok()).await, Some(2));
796    /// assert_eq!(stream.find_map(|s| s.parse().ok()).await, Some(5));
797    /// assert_eq!(stream.next().await, None);
798    /// # });
799    /// ```
800    fn find_map<B, F>(&mut self, f: F) -> FindMap<'_, Self, F>
801    where
802        Self: Unpin,
803        F: FnMut(Self::Item) -> Option<B>,
804    {
        // The stream is only borrowed, so it can be searched again afterwards, as the doc
        // example does. `B` names the closure's payload type and is not part of `FindMap`.
805        FindMap::new(self, f)
806    }
807
808    /// Get the index of an element in the stream.
809    ///
810    /// `position()` is short-circuiting, it will stop processing as soon as the closure returns
811    /// `true`.
812    ///
813    /// # Overflow Behaviour
814    ///
815    /// The method does no guarding against overflows, so enumerating more than [`usize::MAX`]
816    /// elements either produces the wrong result or panics. If debug assertions are enabled, a
817    /// panic is guaranteed.
818    ///
819    /// # Panics
820    ///
821    /// The returned future might panic if the to-be-returned index would overflow a [`usize`].
822    ///
823    /// # Examples
824    ///
825    /// ```
826    /// use completion::{CompletionStreamExt, StreamExt};
827    /// use futures_lite::stream;
828    ///
829    /// # completion::future::block_on(completion::completion_async! {
830    /// let mut stream = stream::iter(1..10).into_completion();
831    ///
832    /// assert_eq!(stream.position(|x| x == 4).await, Some(3));
833    /// assert_eq!(stream.position(|x| x == 11).await, None);
834    /// # });
835    /// ```
836    fn position<P>(&mut self, predicate: P) -> Position<'_, Self, P>
837    where
838        Self: Unpin,
839        P: FnMut(Self::Item) -> bool,
840    {
        // The predicate receives each item by value (`Self::Item`), not by reference. The
        // stream itself is only borrowed.
841        Position::new(self, predicate)
842    }
843
844    /// Find the maximum value in the stream.
845    ///
846    /// If several elements are equally maximum, the last element is returned. If the stream is
847    /// empty, [`None`] is returned.
848    ///
849    /// # Examples
850    ///
851    /// ```
852    /// use completion::{CompletionStreamExt, StreamExt};
853    /// use futures_lite::stream;
854    ///
855    /// # completion::future::block_on(completion::completion_async! {
856    /// assert_eq!(stream::iter(&[1, 5, 2, 7, 4]).into_completion().max().await, Some(&7));
857    /// assert_eq!(stream::iter(<Vec<()>>::new()).into_completion().max().await, None);
858    ///
859    /// let first = 5;
860    /// let second = 5;
861    /// let r = stream::iter(vec![&first, &second]).into_completion().max().await.unwrap();
862    /// // `first` and `second` are equal in value, but `second` is chosen because it is later.
863    /// assert_eq!(r as *const _, &second as *const _);
864    /// # });
865    /// ```
866    fn max(self) -> Max<Self>
867    where
868        Self: Sized,
869        Self::Item: Ord,
870    {
        // No comparator is passed: the ordering comes from the `Self::Item: Ord` bound.
871        Max::new(self)
872    }
873
874    /// Find the maximum value in the stream using the specified comparison function.
875    ///
876    /// If several elements are equally maximum, the last element is returned. If the stream is
877    /// empty, [`None`] is returned.
878    ///
879    /// # Examples
880    ///
881    /// ```
882    /// use completion::{CompletionStreamExt, StreamExt};
883    /// use futures_lite::stream;
884    ///
885    /// # completion::future::block_on(completion::completion_async! {
886    /// let a = [("a", 1), ("b", 7), ("c", 2), ("d", 7), ("e", 4)];
887    /// let max = stream::iter(&a).into_completion().max_by(|(_, x), (_, y)| x.cmp(y)).await;
888    /// assert_eq!(max, Some(&("d", 7)));
889    /// # });
890    /// ```
891    fn max_by<F>(self, compare: F) -> MaxBy<Self, F>
892    where
893        Self: Sized,
894        F: FnMut(&Self::Item, &Self::Item) -> cmp::Ordering,
895    {
        // Items need not be `Ord` here; `compare` supplies the ordering from two borrowed items.
896        MaxBy::new(self, compare)
897    }
898
899    /// Find the element that gives the maximum value from the specified function.
900    ///
901    /// If several elements are equally maximum, the last element is returned. If the stream is
902    /// empty, [`None`] is returned.
903    ///
904    /// # Examples
905    ///
906    /// ```
907    /// use completion::{CompletionStreamExt, StreamExt};
908    /// use futures_lite::stream;
909    ///
910    /// # completion::future::block_on(completion::completion_async! {
911    /// let a: &[i32] = &[-3, 2, 10, 5, -10];
912    /// let max = stream::iter(a).into_completion().max_by_key(|x| x.abs()).await;
913    /// assert_eq!(max, Some(&-10));
914    /// # });
915    /// ```
916    fn max_by_key<B, F>(self, f: F) -> MaxByKey<Self, B, F>
917    where
918        Self: Sized,
919        B: Ord,
920        F: FnMut(&Self::Item) -> B,
921    {
        // `B` is the key type computed from a borrowed item; it must be `Ord` and is part of
        // the returned `MaxByKey<Self, B, F>` type.
922        MaxByKey::new(self, f)
923    }
924
925    /// Find the minimum value in the stream.
926    ///
927    /// If several elements are equally minimum, the first element is returned. If the stream is
928    /// empty, [`None`] is returned.
929    ///
930    /// # Examples
931    ///
932    /// ```
933    /// use completion::{CompletionStreamExt, StreamExt};
934    /// use futures_lite::stream;
935    ///
936    /// # completion::future::block_on(completion::completion_async! {
937    /// assert_eq!(stream::iter(&[5, 7, 1, 3, 2]).into_completion().min().await, Some(&1));
938    /// assert_eq!(stream::iter(<Vec<()>>::new()).into_completion().min().await, None);
939    ///
940    /// let first = 5;
941    /// let second = 5;
942    /// let r = stream::iter(vec![&first, &second]).into_completion().min().await.unwrap();
943    /// // `first` and `second` are equal in value, but `first` is chosen because it is earlier.
944    /// assert_eq!(r as *const _, &first as *const _);
945    /// # });
946    /// ```
947    fn min(self) -> Min<Self>
948    where
949        Self: Sized,
950        Self::Item: Ord,
951    {
952        Min::new(self)
953    }
954
955    /// Find the minimum value in the stream using the specified comparison function.
956    ///
    /// If several elements are equally minimum, the first element is returned. If the stream is
958    /// empty, [`None`] is returned.
959    ///
960    /// # Examples
961    ///
962    /// ```
963    /// use completion::{CompletionStreamExt, StreamExt};
964    /// use futures_lite::stream;
965    ///
966    /// # completion::future::block_on(completion::completion_async! {
967    /// let a = [("a", 3), ("b", 7), ("c", 1), ("d", 1), ("e", 4)];
968    /// let min = stream::iter(&a).into_completion().min_by(|(_, x), (_, y)| x.cmp(y)).await;
969    /// assert_eq!(min, Some(&("c", 1)));
970    /// # });
971    /// ```
972    fn min_by<F>(self, compare: F) -> MinBy<Self, F>
973    where
974        Self: Sized,
975        F: FnMut(&Self::Item, &Self::Item) -> cmp::Ordering,
976    {
977        MinBy::new(self, compare)
978    }
979
980    /// Find the element that gives the minimum value from the specified function.
981    ///
    /// If several elements are equally minimum, the first element is returned. If the stream is
983    /// empty, [`None`] is returned.
984    ///
985    /// # Examples
986    ///
987    /// ```
988    /// use completion::{CompletionStreamExt, StreamExt};
989    /// use futures_lite::stream;
990    ///
991    /// # completion::future::block_on(completion::completion_async! {
992    /// let a: &[i32] = &[-10, 9, 23, -2, 8, 2];
993    /// let min = stream::iter(a).into_completion().min_by_key(|x| x.abs()).await;
994    /// assert_eq!(min, Some(&-2));
995    /// # });
996    /// ```
997    fn min_by_key<B, F>(self, f: F) -> MinByKey<Self, B, F>
998    where
999        Self: Sized,
1000        B: Ord,
1001        F: FnMut(&Self::Item) -> B,
1002    {
1003        MinByKey::new(self, f)
1004    }
1005
1006    // TODO: unzip
1007
1008    /// Copy all of the elements in the stream.
1009    ///
1010    /// This is useful when you have a stream over `&T`, but you need a stream over `T`.
1011    ///
1012    /// # Examples
1013    ///
1014    /// ```
1015    /// use completion::{CompletionStreamExt, StreamExt};
1016    /// use futures_lite::stream;
1017    ///
1018    /// # completion::future::block_on(completion::completion_async! {
1019    /// let a = [1, 2, 3];
1020    /// let v: Vec<i32> = stream::iter(&a).into_completion().copied().collect().await;
1021    /// assert_eq!(v, [1, 2, 3]);
1022    /// # });
1023    /// ```
1024    fn copied<'a, T: Copy + 'a>(self) -> Copied<Self>
1025    where
1026        Self: CompletionStream<Item = &'a T> + Sized,
1027    {
1028        Copied::new(self)
1029    }
1030
1031    /// Clone all of the elements in the stream.
1032    ///
1033    /// This is useful when you have a stream over `&T`, but you need a stream over `T`.
1034    ///
1035    /// # Examples
1036    ///
1037    /// ```
1038    /// use completion::{CompletionStreamExt, StreamExt};
1039    /// use futures_lite::stream;
1040    ///
1041    /// # completion::future::block_on(completion::completion_async! {
1042    /// let a = ["1".to_owned(), "2".to_owned(), "3".to_owned()];
1043    /// let v: Vec<String> = stream::iter(&a).into_completion().cloned().collect().await;
1044    /// assert_eq!(v, ["1".to_owned(), "2".to_owned(), "3".to_owned()]);
1045    /// # });
1046    /// ```
1047    fn cloned<'a, T: Clone + 'a>(self) -> Cloned<Self>
1048    where
1049        Self: CompletionStream<Item = &'a T> + Sized,
1050    {
1051        Cloned::new(self)
1052    }
1053
1054    /// Repeat the stream endlessly.
1055    ///
1056    /// Instead of stopping at [`None`], this stream will start again, from the beginning. The
1057    /// returned stream will only return [`None`] when the underlying stream is empty.
1058    ///
1059    /// # Examples
1060    ///
1061    /// ```
1062    /// use completion::{CompletionStreamExt, StreamExt};
1063    /// use futures_lite::stream;
1064    ///
1065    /// # completion::future::block_on(completion::completion_async! {
1066    /// let mut stream = stream::iter(0..3).into_completion().cycle();
1067    ///
1068    /// assert_eq!(stream.next().await, Some(0));
1069    /// assert_eq!(stream.next().await, Some(1));
1070    /// assert_eq!(stream.next().await, Some(2));
1071    /// assert_eq!(stream.next().await, Some(0));
1072    /// assert_eq!(stream.next().await, Some(1));
1073    /// assert_eq!(stream.next().await, Some(2));
1074    /// assert_eq!(stream.next().await, Some(0));
1075    /// // And so on...
1076    /// # });
1077    /// ```
1078    fn cycle(self) -> Cycle<Self>
1079    where
1080        Self: Sized + Clone,
1081    {
1082        Cycle::new(self)
1083    }
1084
1085    // TODO: cmp
1086    // TODO: partial_cmp
1087    // TODO: eq
1088    // TODO: ne
1089    // TODO: lt
1090    // TODO: le
1091    // TODO: gt
1092    // TODO: ge
1093
1094    /// Box the stream, erasing its type.
1095    ///
1096    /// # Examples
1097    ///
1098    /// ```
1099    /// use completion::{CompletionStreamExt, StreamExt};
1100    /// use futures_lite::stream;
1101    ///
1102    /// # let some_condition = true;
1103    /// // These streams are different types, but boxing them makes them the same type.
1104    /// let stream = if some_condition {
1105    ///     stream::iter(2..18).into_completion().boxed()
1106    /// } else {
1107    ///     stream::iter(vec![5, 3, 7, 8, 2]).into_completion().boxed()
1108    /// };
1109    /// ```
1110    #[cfg(feature = "alloc")]
1111    #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
1112    fn boxed<'a>(self) -> BoxCompletionStream<'a, Self::Item>
1113    where
1114        Self: Sized + Send + 'a,
1115    {
1116        Box::pin(self)
1117    }
1118
1119    /// Box the stream locally, erasing its type.
1120    ///
1121    /// # Examples
1122    ///
1123    /// ```
1124    /// use completion::{CompletionStreamExt, StreamExt};
1125    /// use futures_lite::stream;
1126    ///
1127    /// # let some_condition = true;
1128    /// // These streams are different types, but boxing them makes them the same type.
1129    /// let stream = if some_condition {
1130    ///     stream::iter(2..18).into_completion().boxed_local()
1131    /// } else {
1132    ///     stream::iter(vec![5, 3, 7, 8, 2]).into_completion().boxed_local()
1133    /// };
1134    /// ```
1135    #[cfg(feature = "alloc")]
1136    #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
1137    fn boxed_local<'a>(self) -> LocalBoxCompletionStream<'a, Self::Item>
1138    where
1139        Self: Sized + 'a,
1140    {
1141        Box::pin(self)
1142    }
1143}
1144impl<T: CompletionStream + ?Sized> CompletionStreamExt for T {}
1145
/// A type-erased completion stream.
///
/// This is a pinned, boxed [`CompletionStream`] trait object that is also [`Send`]. It is the
/// return type of [`CompletionStreamExt::boxed`].
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub type BoxCompletionStream<'a, T> = Pin<Box<dyn CompletionStream<Item = T> + Send + 'a>>;
1150
/// A type-erased completion stream that cannot be sent across threads.
///
/// This is a pinned, boxed [`CompletionStream`] trait object without a [`Send`] bound. It is the
/// return type of [`CompletionStreamExt::boxed_local`].
#[cfg(feature = "alloc")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
pub type LocalBoxCompletionStream<'a, T> = Pin<Box<dyn CompletionStream<Item = T> + 'a>>;
1155
1156/// Extension trait for converting [`Stream`]s to [`CompletionStream`]s.
1157pub trait StreamExt: Stream + Sized {
1158    /// Convert this stream into a [`CompletionStream`].
1159    ///
1160    /// # Examples
1161    ///
1162    /// ```
1163    /// use completion::StreamExt;
1164    /// use futures_lite::stream;
1165    ///
1166    /// let completion_stream = stream::iter(&[1, 1, 2, 3, 5]).into_completion();
1167    /// ```
1168    fn into_completion(self) -> Adapter<Self> {
1169        Adapter(self)
1170    }
1171}
1172impl<T: Stream> StreamExt for T {}
1173
/// Wrap a completion stream in a blocking [`Iterator`].
///
/// Each call to [`Iterator::next`] on the returned [`BlockOn`] blocks on the next item of the
/// stream.
///
/// # Examples
///
/// ```
/// use completion::{completion_stream, stream, StreamExt};
///
/// let stream = completion_stream! {
///     yield '!';
///     yield '~';
/// };
/// futures_lite::pin!(stream);
/// assert_eq!(stream::block_on(stream).collect::<Vec<_>>(), ['!', '~']);
/// ```
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
pub fn block_on<S>(stream: S) -> BlockOn<S>
where
    S: CompletionStream + Unpin,
{
    BlockOn { stream }
}
1193
/// The blocking iterator returned by [`block_on`].
#[cfg(feature = "std")]
#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
#[derive(Debug)]
pub struct BlockOn<S> {
    // The wrapped stream, polled to completion one item at a time by `Iterator::next`.
    stream: S,
}
1201
#[cfg(feature = "std")]
impl<S> Iterator for BlockOn<S>
where
    S: CompletionStream + Unpin,
{
    type Item = S::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Build the future for the next item, then block on it.
        let next_item = self.stream.next();
        crate::future::block_on(next_item)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // The iterator yields exactly what the stream yields, so the hint is forwarded as-is.
        let inner = &self.stream;
        inner.size_hint()
    }
}
// Only a `BlockOn` over a `Fuse`d stream is marked as a fused iterator.
#[cfg(feature = "std")]
impl<S> FusedIterator for BlockOn<Fuse<S>> where S: CompletionStream + Unpin {}