completion/stream/mod.rs
1//! Utilities for the [`CompletionStream`] trait.
2
3#[cfg(feature = "alloc")]
4use alloc::boxed::Box;
5use core::cmp;
6#[cfg(feature = "std")]
7use core::iter::FusedIterator;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use completion_core::CompletionFuture;
12#[doc(no_inline)]
13pub use completion_core::CompletionStream;
14use futures_core::Stream;
15
16use super::{Adapter, MustComplete};
17
18mod adapters;
19pub use adapters::*;
20
21mod futures;
22pub use futures::*;
23
24mod unfold;
25pub use unfold::*;
26
27mod from_completion_stream;
28pub use from_completion_stream::FromCompletionStream;
29#[cfg(feature = "std")]
30pub(crate) use from_completion_stream::FromCompletionStreamInner;
31
32/// Extension trait for [`CompletionStream`].
33pub trait CompletionStreamExt: CompletionStream {
34 /// A convenience for calling [`CompletionStream::poll_next`] on [`Unpin`] streams.
35 ///
36 /// # Safety
37 ///
38 /// Identical to [`CompletionStream::poll_next`].
39 unsafe fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
40 where
41 Self: Unpin,
42 {
43 Pin::new(self).poll_next(cx)
44 }
45
46 /// A convenience for calling [`CompletionStream::poll_cancel`] on [`Unpin`] streams.
47 ///
48 /// # Safety
49 ///
50 /// Identical to [`CompletionStream::poll_cancel`].
51 unsafe fn poll_cancel(&mut self, cx: &mut Context<'_>) -> Poll<()>
52 where
53 Self: Unpin,
54 {
55 Pin::new(self).poll_cancel(cx)
56 }
57
58 /// Make sure that the stream will complete. Any requests to cancel the stream through
59 /// [`poll_cancel`](CompletionStream::poll_cancel) will be ignored.
60 fn must_complete(self) -> MustComplete<Self>
61 where
62 Self: Sized,
63 {
64 MustComplete { inner: self }
65 }
66
67 /// Get the next item in the stream.
68 ///
69 /// Be aware that if you cancel the returned future, the stream itself will be cancelled and so
70 /// any further attempts to use it may panic, block forever, or do other unexpected things.
71 ///
72 /// # Examples
73 ///
74 /// ```
75 /// use completion::{CompletionStreamExt, StreamExt};
76 /// use futures_lite::stream;
77 ///
78 /// # completion::future::block_on(completion::completion_async! {
79 /// let mut stream = stream::iter(0..3).into_completion();
80 /// assert_eq!(stream.next().await, Some(0));
81 /// assert_eq!(stream.next().await, Some(1));
82 /// assert_eq!(stream.next().await, Some(2));
83 /// assert_eq!(stream.next().await, None);
84 /// # });
85 /// ```
86 fn next(&mut self) -> Next<'_, Self>
87 where
88 Self: Unpin,
89 {
90 Next::new(self)
91 }
92
93 /// Count the number of items in the stream.
94 ///
95 /// # Examples
96 ///
97 /// ```
98 /// use completion::{CompletionStreamExt, StreamExt};
99 /// use futures_lite::stream;
100 ///
101 /// # completion::future::block_on(completion::completion_async! {
102 /// let stream_1 = stream::iter(3..7).into_completion();
103 /// let stream_2 = stream::iter(&[8, 2, 4]).into_completion();
104 ///
105 /// assert_eq!(stream_1.count().await, 4);
106 /// assert_eq!(stream_2.count().await, 3);
107 /// # });
108 /// ```
109 fn count(self) -> Count<Self>
110 where
111 Self: Sized,
112 {
113 Count::new(self)
114 }
115
116 /// Get the last element in the stream.
117 ///
118 /// # Examples
119 ///
120 /// ```
121 /// use completion::{CompletionStreamExt, StreamExt};
122 /// use futures_lite::stream;
123 ///
124 /// # completion::future::block_on(completion::completion_async! {
125 /// assert_eq!(stream::iter(3..7).into_completion().last().await, Some(6));
126 /// assert_eq!(stream::empty::<String>().into_completion().last().await, None);
127 /// # });
128 /// ```
129 fn last(self) -> Last<Self>
130 where
131 Self: Sized,
132 {
133 Last::new(self)
134 }
135
136 /// Get the nth element in the stream.
137 ///
138 /// # Examples
139 ///
140 /// ```
141 /// use completion::{CompletionStreamExt, StreamExt};
142 /// use futures_lite::stream;
143 ///
144 /// # completion::future::block_on(completion::completion_async! {
145 /// assert_eq!(stream::iter(3..7).into_completion().nth(2).await, Some(5));
146 /// assert_eq!(stream::iter(3..7).into_completion().nth(10).await, None);
147 /// # });
148 /// ```
149 fn nth(&mut self, n: usize) -> Nth<'_, Self>
150 where
151 Self: Unpin,
152 {
153 Nth::new(self, n)
154 }
155
156 /// Create a stream starting at the same point, but stepping by the given amount each
157 /// iteration.
158 ///
159 /// # Panics
160 ///
161 /// This will panic if `step` is `0`.
162 ///
163 /// # Examples
164 ///
165 /// ```
166 /// use completion::{CompletionStreamExt, StreamExt};
167 /// use futures_lite::stream;
168 ///
169 /// # completion::future::block_on(completion::completion_async! {
170 /// let a = [0, 1, 2, 3, 4, 5];
171 /// let mut stream = stream::iter(&a).into_completion().step_by(2);
172 ///
173 /// assert_eq!(stream.next().await, Some(&0));
174 /// assert_eq!(stream.next().await, Some(&2));
175 /// assert_eq!(stream.next().await, Some(&4));
176 /// assert_eq!(stream.next().await, None);
177 /// # });
178 /// ```
179 fn step_by(self, step: usize) -> StepBy<Self>
180 where
181 Self: Sized,
182 {
183 StepBy::new(self, step)
184 }
185
186 /// Chain this stream with another.
187 ///
188 /// # Examples
189 ///
190 /// ```
191 /// use completion::{CompletionStreamExt, StreamExt};
192 /// use futures_lite::stream;
193 ///
194 /// # completion::future::block_on(completion::completion_async! {
195 /// let mut stream = stream::iter(4..6).into_completion()
196 /// .chain(stream::iter(6..10).into_completion());
197 ///
198 /// assert_eq!(stream.next().await, Some(4));
199 /// assert_eq!(stream.next().await, Some(5));
200 /// assert_eq!(stream.next().await, Some(6));
201 /// assert_eq!(stream.next().await, Some(7));
202 /// assert_eq!(stream.next().await, Some(8));
203 /// assert_eq!(stream.next().await, Some(9));
204 /// assert_eq!(stream.next().await, None);
205 /// # });
206 /// ```
207 fn chain<U: CompletionStream<Item = Self::Item>>(self, other: U) -> Chain<Self, U>
208 where
209 Self: Sized,
210 {
211 Chain::new(self, other)
212 }
213
214 // TODO: zip
215
216 /// Map this stream's items with a closure.
217 ///
218 /// # Examples
219 ///
220 /// ```
221 /// use completion::{CompletionStreamExt, StreamExt};
222 /// use futures_lite::stream;
223 ///
224 /// # completion::future::block_on(completion::completion_async! {
225 /// let mut stream = stream::iter(0..5).into_completion().map(|x| x * 2 + 4);
226 ///
227 /// assert_eq!(stream.next().await, Some(4));
228 /// assert_eq!(stream.next().await, Some(6));
229 /// assert_eq!(stream.next().await, Some(8));
230 /// assert_eq!(stream.next().await, Some(10));
231 /// assert_eq!(stream.next().await, Some(12));
232 /// assert_eq!(stream.next().await, None);
233 /// # });
234 /// ```
235 fn map<T, F: FnMut(Self::Item) -> T>(self, f: F) -> Map<Self, F>
236 where
237 Self: Sized,
238 {
239 Map::new(self, f)
240 }
241
242 /// Map this stream's items with an asynchronous closure.
243 ///
244 /// # Examples
245 ///
246 /// ```
247 /// use completion::{CompletionStreamExt, StreamExt, completion_async_move};
248 /// use futures_lite::stream;
249 ///
250 /// # completion::future::block_on(completion_async_move! {
251 /// let mut stream = stream::iter(0..5)
252 /// .into_completion()
253 /// .then(|x| completion_async_move!(x * 2 + 4));
254 ///
255 /// futures_lite::pin!(stream);
256 ///
257 /// assert_eq!(stream.next().await, Some(4));
258 /// assert_eq!(stream.next().await, Some(6));
259 /// assert_eq!(stream.next().await, Some(8));
260 /// assert_eq!(stream.next().await, Some(10));
261 /// assert_eq!(stream.next().await, Some(12));
262 /// assert_eq!(stream.next().await, None);
263 /// # });
264 /// ```
265 fn then<F: FnMut(Self::Item) -> Fut, Fut: CompletionFuture>(self, f: F) -> Then<Self, F, Fut>
266 where
267 Self: Sized,
268 {
269 Then::new(self, f)
270 }
271
272 /// Call a closure on each item the stream.
273 ///
274 /// # Examples
275 ///
276 /// ```
277 /// use completion::{CompletionStreamExt, StreamExt};
278 /// use futures_lite::stream;
279 ///
280 /// # completion::future::block_on(completion::completion_async! {
281 /// stream::iter(0..8).into_completion().for_each(|num| println!("{}", num)).await;
282 /// # });
283 /// ```
284 fn for_each<F: FnMut(Self::Item)>(self, f: F) -> ForEach<Self, F>
285 where
286 Self: Sized,
287 {
288 ForEach::new(self, f)
289 }
290
291 /// Keep the values in the stream for which the predicate resolves to `true`.
292 ///
293 /// # Examples
294 ///
295 /// ```
296 /// use completion::{CompletionStreamExt, StreamExt};
297 /// use futures_lite::stream;
298 ///
299 /// # completion::future::block_on(completion::completion_async! {
300 /// let mut stream = stream::iter(1..=10).into_completion().filter(|n| n % 3 == 0);
301 ///
302 /// assert_eq!(stream.next().await, Some(3));
303 /// assert_eq!(stream.next().await, Some(6));
304 /// assert_eq!(stream.next().await, Some(9));
305 /// assert_eq!(stream.next().await, None);
306 /// # });
307 /// ```
308 fn filter<F>(self, f: F) -> Filter<Self, F>
309 where
310 F: FnMut(&Self::Item) -> bool,
311 Self: Sized,
312 {
313 Filter::new(self, f)
314 }
315
316 /// Filter and map the items of the stream with a closure.
317 ///
318 /// This will yield any values for which the closure returns [`Some`] and will discard any
319 /// values for which the closure returns [`None`].
320 ///
321 /// # Examples
322 ///
323 /// ```
324 /// use completion::{CompletionStreamExt, StreamExt};
325 /// use futures_lite::stream;
326 ///
327 /// # completion::future::block_on(completion::completion_async! {
328 /// let strings = ["5", "!", "2", "NaN", "6", ""];
329 /// let stream = stream::iter(&strings).into_completion();
330 /// let mut stream = stream.filter_map(|s| s.parse::<i32>().ok());
331 ///
332 /// assert_eq!(stream.next().await, Some(5));
333 /// assert_eq!(stream.next().await, Some(2));
334 /// assert_eq!(stream.next().await, Some(6));
335 /// assert_eq!(stream.next().await, None);
336 /// # });
337 /// ```
338 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
339 where
340 F: FnMut(Self::Item) -> Option<T>,
341 Self: Sized,
342 {
343 FilterMap::new(self, f)
344 }
345
346 /// Yield the current iteration count as well as the next value.
347 ///
348 /// The returned stream yields pairs `(i, val)` where `i` is the current index of iteration and
349 /// `val` is the value returned by the stream.
350 ///
351 /// # Overflow Behaviour
352 ///
353 /// The method does no guarding against overflows, so enumerating more than [`usize::MAX`]
354 /// elements either produces the wrong result or panics. If debug assertions are enabled, a
355 /// panic is guaranteed.
356 ///
357 /// # Panics
358 ///
359 /// The returned stream might panic if the to-be-returned index would overflow a [`usize`].
360 ///
361 /// # Examples
362 ///
363 /// ```
364 /// use completion::{CompletionStreamExt, StreamExt};
365 /// use futures_lite::stream;
366 ///
367 /// # completion::future::block_on(completion::completion_async! {
368 /// let string = "Hello";
369 /// let mut stream = stream::iter(string.chars()).into_completion().enumerate();
370 ///
371 /// assert_eq!(stream.next().await, Some((0, 'H')));
372 /// assert_eq!(stream.next().await, Some((1, 'e')));
373 /// assert_eq!(stream.next().await, Some((2, 'l')));
374 /// assert_eq!(stream.next().await, Some((3, 'l')));
375 /// assert_eq!(stream.next().await, Some((4, 'o')));
376 /// assert_eq!(stream.next().await, None);
377 /// # });
378 /// ```
379 fn enumerate(self) -> Enumerate<Self>
380 where
381 Self: Sized,
382 {
383 Enumerate::new(self)
384 }
385
386 /// Create a stream which can use [`peek`](Peekable::peek) to look at the next element of the
387 /// stream without consuming it.
388 ///
389 /// Note that the underlying stream is still advanced when [`peek`](Peekable::peek) is called
390 /// for the first time after [`next`](Self::next); in order to retrieve the next element,
391 /// [`next`](Self::next) is called on the underlying stream, hence any side effects of the
392 /// method with occur.
393 ///
394 /// # Examples
395 ///
396 /// ```
397 /// use completion::{CompletionStreamExt, StreamExt};
398 /// use futures_lite::{stream, pin};
399 ///
400 /// # completion::future::block_on(completion::completion_async! {
401 /// let mut stream = stream::iter("Hello!\n".chars()).into_completion().peekable();
402 ///
403 /// let mut s = String::new();
404 /// while stream.peek_unpin().await != Some(&'\n') {
405 /// s.push(stream.next().await.unwrap());
406 /// }
407 /// assert_eq!(s, "Hello!");
408 /// assert_eq!(stream.next().await, Some('\n'));
409 /// assert_eq!(stream.next().await, None);
410 /// # });
411 /// ```
412 fn peekable(self) -> Peekable<Self>
413 where
414 Self: Sized,
415 {
416 Peekable::new(self)
417 }
418
419 /// Skip items while the predicate returns `true`.
420 ///
421 /// # Examples
422 ///
423 /// ```
424 /// use completion::{CompletionStreamExt, StreamExt};
425 /// use futures_lite::stream;
426 ///
427 /// # completion::future::block_on(completion::completion_async! {
428 /// let stream = stream::iter(" Hello world!".chars()).into_completion();
429 /// let text: String = stream.skip_while(char::is_ascii_whitespace).collect().await;
430 /// assert_eq!(text, "Hello world!");
431 /// # });
432 /// ```
433 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
434 where
435 P: FnMut(&Self::Item) -> bool,
436 Self: Sized,
437 {
438 SkipWhile::new(self, predicate)
439 }
440
441 /// Take items while the predicate returns `true`.
442 ///
443 /// # Examples
444 ///
445 /// ```
446 /// use completion::{CompletionStreamExt, StreamExt};
447 /// use futures_lite::stream;
448 ///
449 /// # completion::future::block_on(completion::completion_async! {
450 /// let stream = stream::iter("Hello world!\nFoo bar".chars()).into_completion();
451 /// let text: String = stream.take_while(|&c| c != '\n').collect().await;
452 /// assert_eq!(text, "Hello world!");
453 /// # });
454 /// ```
455 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
456 where
457 P: FnMut(&Self::Item) -> bool,
458 Self: Sized,
459 {
460 TakeWhile::new(self, predicate)
461 }
462
463 /// Skip the first `n` items in the stream.
464 ///
465 /// # Examples
466 ///
467 /// ```
468 /// use completion::{CompletionStreamExt, StreamExt};
469 /// use futures_lite::stream;
470 ///
471 /// # completion::future::block_on(completion::completion_async! {
472 /// let stream = stream::iter(0..10).into_completion();
473 /// assert_eq!(stream.skip(5).collect::<Vec<_>>().await, [5, 6, 7, 8, 9]);
474 /// # });
475 /// ```
476 fn skip(self, n: usize) -> Skip<Self>
477 where
478 Self: Sized,
479 {
480 Skip::new(self, n)
481 }
482
483 /// Takes the first `n` items of the stream. All other items will be ignored.
484 ///
485 /// # Examples
486 ///
487 /// ```
488 /// use completion::{CompletionStreamExt, StreamExt};
489 /// use futures_lite::stream;
490 ///
491 /// # completion::future::block_on(completion::completion_async! {
492 /// let stream = stream::repeat(19).into_completion();
493 /// assert_eq!(stream.take(5).collect::<Vec<_>>().await, [19, 19, 19, 19, 19]);
494 /// # });
495 /// ```
496 fn take(self, n: usize) -> Take<Self>
497 where
498 Self: Sized,
499 {
500 Take::new(self, n)
501 }
502
503 // TODO: scan
504
505 /// Map the stream, flattening nested structure.
506 ///
507 /// `.flat_map(f)` is equivalent to `.[`map`](Self::map)`(f).`[`flatten`](Self::flatten)`()`.
508 ///
509 /// # Examples
510 ///
511 /// ```
512 /// use completion::{CompletionStreamExt, StreamExt};
513 /// use futures_lite::stream;
514 ///
515 /// # completion::future::block_on(completion::completion_async! {
516 /// let s: String = stream::iter(&["alpha", "beta", "gamma"])
517 /// .into_completion()
518 /// .flat_map(|s| stream::iter(s.chars()).into_completion())
519 /// .collect()
520 /// .await;
521 ///
522 /// assert_eq!(s, "alphabetagamma");
523 /// # });
524 /// ```
525 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
526 where
527 Self: Sized,
528 U: CompletionStream,
529 F: FnMut(Self::Item) -> U,
530 {
531 FlatMap::new(self, f)
532 }
533
534 /// Flatten nested structure in the stream.
535 ///
536 /// This converts a stream of streams to a stream.
537 ///
538 /// # Examples
539 ///
540 /// ```
541 /// use completion::{CompletionStreamExt, StreamExt};
542 /// use futures_lite::stream;
543 ///
544 /// # completion::future::block_on(completion::completion_async! {
545 /// let streams = vec![
546 /// stream::iter(0..5).into_completion(),
547 /// stream::iter(5..7).into_completion(),
548 /// ];
549 /// let v: Vec<u8> = stream::iter(streams).into_completion().flatten().collect().await;
550 /// assert_eq!(v, &[0, 1, 2, 3, 4, 5, 6]);
551 /// # });
552 /// ```
553 fn flatten(self) -> Flatten<Self>
554 where
555 Self: Sized,
556 Self::Item: CompletionStream,
557 {
558 Flatten::new(self)
559 }
560
561 /// Fuse the stream so that it is guaranteed to continue to yield [`None`] when exhausted.
562 ///
563 /// If the stream is cancelled, it is also guaranteed to continue to yield [`None`].
564 ///
565 /// # Examples
566 ///
567 /// ```
568 /// use completion::{CompletionStreamExt, StreamExt};
569 /// use futures_lite::stream;
570 ///
571 /// # completion::future::block_on(completion::completion_async! {
572 /// let mut stream = stream::once(5).into_completion().fuse();
573 /// assert_eq!(stream.next().await, Some(5));
574 /// assert_eq!(stream.next().await, None);
575 /// assert_eq!(stream.next().await, None);
576 /// assert_eq!(stream.next().await, None);
577 /// # });
578 /// ```
579 fn fuse(self) -> Fuse<Self>
580 where
581 Self: Sized,
582 {
583 Fuse::new(self)
584 }
585
586 /// Do something with each element in the stream, passing the value on.
587 ///
588 /// # Examples
589 ///
590 /// ```
591 /// use completion::{CompletionStreamExt, StreamExt};
592 /// use futures_lite::stream;
593 ///
594 /// # completion::future::block_on(completion::completion_async! {
595 /// let sum = stream::iter(0..16)
596 /// .into_completion()
597 /// .inspect(|x| println!("about to filter: {}", x))
598 /// .filter(|x| x % 2 == 0)
599 /// .inspect(|x| println!("made it through filter: {}", x))
600 /// .fold(0_u32, |sum, i| sum + i)
601 /// .await;
602 ///
603 /// assert_eq!(sum, 56);
604 /// # });
605 /// ```
606 fn inspect<F>(self, f: F) -> Inspect<Self, F>
607 where
608 Self: Sized,
609 F: FnMut(&Self::Item),
610 {
611 Inspect::new(self, f)
612 }
613
614 // TODO: by_ref
615
616 /// Collect all the items in the stream into a collection.
617 ///
618 /// # Examples
619 ///
620 /// ```
621 /// use completion::{CompletionStreamExt, completion_stream};
622 ///
623 /// # completion::future::block_on(completion::completion_async! {
624 /// let stream = completion_stream! {
625 /// for i in 0..5 {
626 /// yield i;
627 /// }
628 /// };
629 ///
630 /// let items: Vec<_> = stream.collect().await;
631 /// assert_eq!(items, [0, 1, 2, 3, 4]);
632 /// # });
633 /// ```
634 ///
635 /// You can also collect into [`Result`]s or [`Option`]s.
636 ///
637 /// ```
638 /// use completion::{CompletionStreamExt, completion_stream};
639 ///
640 /// # completion::future::block_on(completion::completion_async! {
641 /// let success_stream = completion_stream! {
642 /// for i in 0..5 {
643 /// yield Some(i);
644 /// }
645 /// };
646 /// let failure_stream = completion_stream! {
647 /// for i in 0..5 {
648 /// yield Some(i);
649 /// }
650 /// yield None;
651 /// };
652 ///
653 /// assert_eq!(success_stream.collect::<Option<Vec<_>>>().await, Some(vec![0, 1, 2, 3, 4]));
654 /// assert_eq!(failure_stream.collect::<Option<Vec<_>>>().await, None);
655 /// # });
656 /// ```
657 fn collect<C: FromCompletionStream<Self::Item>>(self) -> Collect<Self, C>
658 where
659 Self: Sized,
660 {
661 Collect::new(self)
662 }
663
664 // TODO: partition
665 // TODO: try_fold
666 // TODO: try_for_each
667
668 /// Accumulate a value over a stream.
669 ///
670 /// `Fold` stores an accumulator that is initially set to `init`. Whenever the stream produces
671 /// a new item, it calls the function `f` with the accumulator and new item, which then returns
672 /// the new value of the accumulator. Once the stream is finished, it returns the accumulator.
673 ///
674 /// # Examples
675 ///
676 /// ```
677 /// use completion::{CompletionStreamExt, completion_stream};
678 ///
679 /// # completion::future::block_on(completion::completion_async! {
680 /// let stream = completion_stream! {
681 /// yield 1;
682 /// yield 8;
683 /// yield 2;
684 /// };
685 /// assert_eq!(stream.fold(0, |acc, x| acc + x).await, 11);
686 /// # });
687 /// ```
688 fn fold<T, F>(self, init: T, f: F) -> Fold<Self, F, T>
689 where
690 F: FnMut(T, Self::Item) -> T,
691 Self: Sized,
692 {
693 Fold::new(self, init, f)
694 }
695
696 /// Check if all the elements in the stream match a predicate.
697 ///
698 /// This is short-circuiting; it will stop once it finds a `false`.
699 ///
700 /// An empty stream returns `true`.
701 ///
702 /// # Examples
703 ///
704 /// ```
705 /// use completion::{CompletionStreamExt, StreamExt};
706 /// use futures_lite::stream;
707 ///
708 /// # completion::future::block_on(completion::completion_async! {
709 /// assert!(stream::iter(0..10).into_completion().all(|x| x < 10).await);
710 ///
711 /// assert!(!stream::iter(0..8).into_completion().all(|x| x < 7).await);
712 ///
713 /// assert!(stream::empty::<()>().into_completion().all(|_| false).await);
714 /// # });
715 /// ```
716 fn all<F: FnMut(Self::Item) -> bool>(&mut self, f: F) -> All<'_, Self, F>
717 where
718 Self: Unpin,
719 {
720 All::new(self, f)
721 }
722
723 /// Check if any of the elements in the stream match a predicate.
724 ///
725 /// This is short-circuiting; it will stop once it finds a `true`.
726 ///
727 /// An empty stream returns `false`.
728 ///
729 /// # Examples
730 ///
731 /// ```
732 /// use completion::{CompletionStreamExt, StreamExt};
733 /// use futures_lite::stream;
734 ///
735 /// # completion::future::block_on(completion::completion_async! {
736 /// assert!(stream::iter(0..10).into_completion().any(|x| x == 9).await);
737 ///
738 /// assert!(!stream::iter(0..8).into_completion().all(|x| x == 9).await);
739 ///
740 /// assert!(!stream::empty::<()>().into_completion().any(|_| true).await);
741 /// # });
742 /// ```
743 fn any<F: FnMut(Self::Item) -> bool>(&mut self, f: F) -> Any<'_, Self, F>
744 where
745 Self: Unpin,
746 {
747 Any::new(self, f)
748 }
749
750 /// Search for an element in the stream that satisfies a predicate.
751 ///
752 /// `find()` is short-circuiting, it will stop processing as soon as the closure returns
753 /// `true`.
754 ///
755 /// # Examples
756 ///
757 /// ```
758 /// use completion::{CompletionStreamExt, StreamExt};
759 /// use futures_lite::stream;
760 ///
761 /// # completion::future::block_on(completion::completion_async! {
762 /// let mut stream = stream::iter(1..10).into_completion();
763 ///
764 /// // Find the first even number.
765 /// assert_eq!(stream.find(|x| x % 2 == 0).await, Some(2));
766 ///
767 /// // Find the next odd number.
768 /// assert_eq!(stream.find(|x| x % 2 == 1).await, Some(3));
769 ///
770 /// // Find short-circuits.
771 /// assert_eq!(stream.next().await, Some(4));
772 /// # });
773 /// ```
774 fn find<P>(&mut self, predicate: P) -> Find<'_, Self, P>
775 where
776 Self: Unpin,
777 P: FnMut(&Self::Item) -> bool,
778 {
779 Find::new(self, predicate)
780 }
781
782 /// Finds the first element in a stream for which a function returns [`Some`].
783 ///
784 /// `stream.find_map(f).await` is equivalent to `stream.filter_map(f).await.next().await`.
785 ///
786 /// # Examples
787 ///
788 /// ```
789 /// use completion::{CompletionStreamExt, StreamExt};
790 /// use futures_lite::stream;
791 ///
792 /// # completion::future::block_on(completion::completion_async! {
793 /// let mut stream = stream::iter(&["lol", "NaN", "2", "5"]).into_completion();
794 ///
795 /// assert_eq!(stream.find_map(|s| s.parse().ok()).await, Some(2));
796 /// assert_eq!(stream.find_map(|s| s.parse().ok()).await, Some(5));
797 /// assert_eq!(stream.next().await, None);
798 /// # });
799 /// ```
800 fn find_map<B, F>(&mut self, f: F) -> FindMap<'_, Self, F>
801 where
802 Self: Unpin,
803 F: FnMut(Self::Item) -> Option<B>,
804 {
805 FindMap::new(self, f)
806 }
807
808 /// Get the index of an element in the stream.
809 ///
810 /// `position()` is short-circuiting, it will stop processing as soon as the closure returns
811 /// `true`.
812 ///
813 /// # Overflow Behaviour
814 ///
815 /// The method does no guarding against overflows, so enumerating more than [`usize::MAX`]
816 /// elements either produces the wrong result or panics. If debug assertions are enabled, a
817 /// panic is guaranteed.
818 ///
819 /// # Panics
820 ///
821 /// The returned stream might panic if the to-be-returned index would overflow a [`usize`].
822 ///
823 /// # Examples
824 ///
825 /// ```
826 /// use completion::{CompletionStreamExt, StreamExt};
827 /// use futures_lite::stream;
828 ///
829 /// # completion::future::block_on(completion::completion_async! {
830 /// let mut stream = stream::iter(1..10).into_completion();
831 ///
832 /// assert_eq!(stream.position(|x| x == 4).await, Some(3));
833 /// assert_eq!(stream.position(|x| x == 11).await, None);
834 /// # });
835 /// ```
836 fn position<P>(&mut self, predicate: P) -> Position<'_, Self, P>
837 where
838 Self: Unpin,
839 P: FnMut(Self::Item) -> bool,
840 {
841 Position::new(self, predicate)
842 }
843
844 /// Find the maximum value in the stream.
845 ///
846 /// If several elements are equally maximum, the last element is returned. If the stream is
847 /// empty, [`None`] is returned.
848 ///
849 /// # Examples
850 ///
851 /// ```
852 /// use completion::{CompletionStreamExt, StreamExt};
853 /// use futures_lite::stream;
854 ///
855 /// # completion::future::block_on(completion::completion_async! {
856 /// assert_eq!(stream::iter(&[1, 5, 2, 7, 4]).into_completion().max().await, Some(&7));
857 /// assert_eq!(stream::iter(<Vec<()>>::new()).into_completion().max().await, None);
858 ///
859 /// let first = 5;
860 /// let second = 5;
861 /// let r = stream::iter(vec![&first, &second]).into_completion().max().await.unwrap();
862 /// // `first` and `second` are equal in value, but `second` is chosen because it is later.
863 /// assert_eq!(r as *const _, &second as *const _);
864 /// # });
865 /// ```
866 fn max(self) -> Max<Self>
867 where
868 Self: Sized,
869 Self::Item: Ord,
870 {
871 Max::new(self)
872 }
873
874 /// Find the maximum value in the stream using the specified comparison function.
875 ///
876 /// If several elements are equally maximum, the last element is returned. If the stream is
877 /// empty, [`None`] is returned.
878 ///
879 /// # Examples
880 ///
881 /// ```
882 /// use completion::{CompletionStreamExt, StreamExt};
883 /// use futures_lite::stream;
884 ///
885 /// # completion::future::block_on(completion::completion_async! {
886 /// let a = [("a", 1), ("b", 7), ("c", 2), ("d", 7), ("e", 4)];
887 /// let max = stream::iter(&a).into_completion().max_by(|(_, x), (_, y)| x.cmp(y)).await;
888 /// assert_eq!(max, Some(&("d", 7)));
889 /// # });
890 /// ```
891 fn max_by<F>(self, compare: F) -> MaxBy<Self, F>
892 where
893 Self: Sized,
894 F: FnMut(&Self::Item, &Self::Item) -> cmp::Ordering,
895 {
896 MaxBy::new(self, compare)
897 }
898
899 /// Find the element that gives the maximum value from the specified function.
900 ///
901 /// If several elements are equally maximum, the last element is returned. If the stream is
902 /// empty, [`None`] is returned.
903 ///
904 /// # Examples
905 ///
906 /// ```
907 /// use completion::{CompletionStreamExt, StreamExt};
908 /// use futures_lite::stream;
909 ///
910 /// # completion::future::block_on(completion::completion_async! {
911 /// let a: &[i32] = &[-3, 2, 10, 5, -10];
912 /// let max = stream::iter(a).into_completion().max_by_key(|x| x.abs()).await;
913 /// assert_eq!(max, Some(&-10));
914 /// # });
915 /// ```
916 fn max_by_key<B, F>(self, f: F) -> MaxByKey<Self, B, F>
917 where
918 Self: Sized,
919 B: Ord,
920 F: FnMut(&Self::Item) -> B,
921 {
922 MaxByKey::new(self, f)
923 }
924
925 /// Find the minimum value in the stream.
926 ///
927 /// If several elements are equally minimum, the first element is returned. If the stream is
928 /// empty, [`None`] is returned.
929 ///
930 /// # Examples
931 ///
932 /// ```
933 /// use completion::{CompletionStreamExt, StreamExt};
934 /// use futures_lite::stream;
935 ///
936 /// # completion::future::block_on(completion::completion_async! {
937 /// assert_eq!(stream::iter(&[5, 7, 1, 3, 2]).into_completion().min().await, Some(&1));
938 /// assert_eq!(stream::iter(<Vec<()>>::new()).into_completion().min().await, None);
939 ///
940 /// let first = 5;
941 /// let second = 5;
942 /// let r = stream::iter(vec![&first, &second]).into_completion().min().await.unwrap();
943 /// // `first` and `second` are equal in value, but `first` is chosen because it is earlier.
944 /// assert_eq!(r as *const _, &first as *const _);
945 /// # });
946 /// ```
947 fn min(self) -> Min<Self>
948 where
949 Self: Sized,
950 Self::Item: Ord,
951 {
952 Min::new(self)
953 }
954
955 /// Find the minimum value in the stream using the specified comparison function.
956 ///
957 /// If several elements are equally minimum, the last element is returned. If the stream is
958 /// empty, [`None`] is returned.
959 ///
960 /// # Examples
961 ///
962 /// ```
963 /// use completion::{CompletionStreamExt, StreamExt};
964 /// use futures_lite::stream;
965 ///
966 /// # completion::future::block_on(completion::completion_async! {
967 /// let a = [("a", 3), ("b", 7), ("c", 1), ("d", 1), ("e", 4)];
968 /// let min = stream::iter(&a).into_completion().min_by(|(_, x), (_, y)| x.cmp(y)).await;
969 /// assert_eq!(min, Some(&("c", 1)));
970 /// # });
971 /// ```
972 fn min_by<F>(self, compare: F) -> MinBy<Self, F>
973 where
974 Self: Sized,
975 F: FnMut(&Self::Item, &Self::Item) -> cmp::Ordering,
976 {
977 MinBy::new(self, compare)
978 }
979
980 /// Find the element that gives the minimum value from the specified function.
981 ///
982 /// If several elements are equally minimum, the last element is returned. If the stream is
983 /// empty, [`None`] is returned.
984 ///
985 /// # Examples
986 ///
987 /// ```
988 /// use completion::{CompletionStreamExt, StreamExt};
989 /// use futures_lite::stream;
990 ///
991 /// # completion::future::block_on(completion::completion_async! {
992 /// let a: &[i32] = &[-10, 9, 23, -2, 8, 2];
993 /// let min = stream::iter(a).into_completion().min_by_key(|x| x.abs()).await;
994 /// assert_eq!(min, Some(&-2));
995 /// # });
996 /// ```
997 fn min_by_key<B, F>(self, f: F) -> MinByKey<Self, B, F>
998 where
999 Self: Sized,
1000 B: Ord,
1001 F: FnMut(&Self::Item) -> B,
1002 {
1003 MinByKey::new(self, f)
1004 }
1005
1006 // TODO: unzip
1007
1008 /// Copy all of the elements in the stream.
1009 ///
1010 /// This is useful when you have a stream over `&T`, but you need a stream over `T`.
1011 ///
1012 /// # Examples
1013 ///
1014 /// ```
1015 /// use completion::{CompletionStreamExt, StreamExt};
1016 /// use futures_lite::stream;
1017 ///
1018 /// # completion::future::block_on(completion::completion_async! {
1019 /// let a = [1, 2, 3];
1020 /// let v: Vec<i32> = stream::iter(&a).into_completion().copied().collect().await;
1021 /// assert_eq!(v, [1, 2, 3]);
1022 /// # });
1023 /// ```
1024 fn copied<'a, T: Copy + 'a>(self) -> Copied<Self>
1025 where
1026 Self: CompletionStream<Item = &'a T> + Sized,
1027 {
1028 Copied::new(self)
1029 }
1030
1031 /// Clone all of the elements in the stream.
1032 ///
1033 /// This is useful when you have a stream over `&T`, but you need a stream over `T`.
1034 ///
1035 /// # Examples
1036 ///
1037 /// ```
1038 /// use completion::{CompletionStreamExt, StreamExt};
1039 /// use futures_lite::stream;
1040 ///
1041 /// # completion::future::block_on(completion::completion_async! {
1042 /// let a = ["1".to_owned(), "2".to_owned(), "3".to_owned()];
1043 /// let v: Vec<String> = stream::iter(&a).into_completion().cloned().collect().await;
1044 /// assert_eq!(v, ["1".to_owned(), "2".to_owned(), "3".to_owned()]);
1045 /// # });
1046 /// ```
1047 fn cloned<'a, T: Clone + 'a>(self) -> Cloned<Self>
1048 where
1049 Self: CompletionStream<Item = &'a T> + Sized,
1050 {
1051 Cloned::new(self)
1052 }
1053
1054 /// Repeat the stream endlessly.
1055 ///
1056 /// Instead of stopping at [`None`], this stream will start again, from the beginning. The
1057 /// returned stream will only return [`None`] when the underlying stream is empty.
1058 ///
1059 /// # Examples
1060 ///
1061 /// ```
1062 /// use completion::{CompletionStreamExt, StreamExt};
1063 /// use futures_lite::stream;
1064 ///
1065 /// # completion::future::block_on(completion::completion_async! {
1066 /// let mut stream = stream::iter(0..3).into_completion().cycle();
1067 ///
1068 /// assert_eq!(stream.next().await, Some(0));
1069 /// assert_eq!(stream.next().await, Some(1));
1070 /// assert_eq!(stream.next().await, Some(2));
1071 /// assert_eq!(stream.next().await, Some(0));
1072 /// assert_eq!(stream.next().await, Some(1));
1073 /// assert_eq!(stream.next().await, Some(2));
1074 /// assert_eq!(stream.next().await, Some(0));
1075 /// // And so on...
1076 /// # });
1077 /// ```
1078 fn cycle(self) -> Cycle<Self>
1079 where
1080 Self: Sized + Clone,
1081 {
1082 Cycle::new(self)
1083 }
1084
1085 // TODO: cmp
1086 // TODO: partial_cmp
1087 // TODO: eq
1088 // TODO: ne
1089 // TODO: lt
1090 // TODO: le
1091 // TODO: gt
1092 // TODO: ge
1093
1094 /// Box the stream, erasing its type.
1095 ///
1096 /// # Examples
1097 ///
1098 /// ```
1099 /// use completion::{CompletionStreamExt, StreamExt};
1100 /// use futures_lite::stream;
1101 ///
1102 /// # let some_condition = true;
1103 /// // These streams are different types, but boxing them makes them the same type.
1104 /// let stream = if some_condition {
1105 /// stream::iter(2..18).into_completion().boxed()
1106 /// } else {
1107 /// stream::iter(vec![5, 3, 7, 8, 2]).into_completion().boxed()
1108 /// };
1109 /// ```
1110 #[cfg(feature = "alloc")]
1111 #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
1112 fn boxed<'a>(self) -> BoxCompletionStream<'a, Self::Item>
1113 where
1114 Self: Sized + Send + 'a,
1115 {
1116 Box::pin(self)
1117 }
1118
1119 /// Box the stream locally, erasing its type.
1120 ///
1121 /// # Examples
1122 ///
1123 /// ```
1124 /// use completion::{CompletionStreamExt, StreamExt};
1125 /// use futures_lite::stream;
1126 ///
1127 /// # let some_condition = true;
1128 /// // These streams are different types, but boxing them makes them the same type.
1129 /// let stream = if some_condition {
1130 /// stream::iter(2..18).into_completion().boxed_local()
1131 /// } else {
1132 /// stream::iter(vec![5, 3, 7, 8, 2]).into_completion().boxed_local()
1133 /// };
1134 /// ```
1135 #[cfg(feature = "alloc")]
1136 #[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
1137 fn boxed_local<'a>(self) -> LocalBoxCompletionStream<'a, Self::Item>
1138 where
1139 Self: Sized + 'a,
1140 {
1141 Box::pin(self)
1142 }
1143}
1144impl<T: CompletionStream + ?Sized> CompletionStreamExt for T {}
1145
1146/// A type-erased completion future.
1147#[cfg(feature = "alloc")]
1148#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
1149pub type BoxCompletionStream<'a, T> = Pin<Box<dyn CompletionStream<Item = T> + Send + 'a>>;
1150
1151/// A type-erased completion future that cannot be send across threads.
1152#[cfg(feature = "alloc")]
1153#[cfg_attr(doc_cfg, doc(cfg(feature = "alloc")))]
1154pub type LocalBoxCompletionStream<'a, T> = Pin<Box<dyn CompletionStream<Item = T> + 'a>>;
1155
1156/// Extension trait for converting [`Stream`]s to [`CompletionStream`]s.
1157pub trait StreamExt: Stream + Sized {
1158 /// Convert this stream into a [`CompletionStream`].
1159 ///
1160 /// # Examples
1161 ///
1162 /// ```
1163 /// use completion::StreamExt;
1164 /// use futures_lite::stream;
1165 ///
1166 /// let completion_stream = stream::iter(&[1, 1, 2, 3, 5]).into_completion();
1167 /// ```
1168 fn into_completion(self) -> Adapter<Self> {
1169 Adapter(self)
1170 }
1171}
1172impl<T: Stream> StreamExt for T {}
1173
1174/// Convert a stream to a blocking iterator.
1175///
1176/// # Examples
1177///
1178/// ```
1179/// use completion::{completion_stream, stream, StreamExt};
1180///
1181/// let stream = completion_stream! {
1182/// yield '!';
1183/// yield '~';
1184/// };
1185/// futures_lite::pin!(stream);
1186/// assert_eq!(stream::block_on(stream).collect::<Vec<_>>(), ['!', '~']);
1187/// ```
1188#[cfg(feature = "std")]
1189#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
1190pub fn block_on<S: CompletionStream + Unpin>(stream: S) -> BlockOn<S> {
1191 BlockOn { stream }
1192}
1193
1194/// Iterator for [`block_on`].
1195#[cfg_attr(doc_cfg, doc(cfg(feature = "std")))]
1196#[derive(Debug)]
1197#[cfg(feature = "std")]
1198pub struct BlockOn<S> {
1199 stream: S,
1200}
1201
1202#[cfg(feature = "std")]
1203impl<S: CompletionStream + Unpin> Iterator for BlockOn<S> {
1204 type Item = S::Item;
1205
1206 fn next(&mut self) -> Option<Self::Item> {
1207 crate::future::block_on(self.stream.next())
1208 }
1209 fn size_hint(&self) -> (usize, Option<usize>) {
1210 self.stream.size_hint()
1211 }
1212}
1213#[cfg(feature = "std")]
1214impl<S: CompletionStream + Unpin> FusedIterator for BlockOn<Fuse<S>> {}