asynk_strim/
lib.rs

1#![no_std]
2#![doc = include_str!("../README.md")]
3#![forbid(rust_2018_idioms)]
4#![deny(missing_docs, unsafe_code)]
5#![warn(clippy::all, clippy::pedantic)]
6#![allow(forbidden_lint_groups)]
7
8use core::{future::Future, pin::pin, task};
9use futures_core::Stream;
10
11mod stream;
12mod try_yielder;
13mod waker;
14mod yielder;
15
16pub use self::try_yielder::TryYielder;
17pub use self::yielder::Yielder;
18
19/// Unwrap the waker
20///
21/// This is an escape hatch if a library depends on a similar hack we do,
22/// where we wrap the waker in a struct to store additional data.
23///
24/// An example is the [`embassy`](https://embassy.dev/) crate.
25///
26/// # Panics
27///
28/// The future will panic if the waker is not found.
29/// This happens if you use this function outside of the context of a stream generator.
30#[inline]
31pub async fn unwrap_waker<Fut>(future: Fut) -> Fut::Output
32where
33    Fut: Future,
34{
35    let mut future = pin!(future);
36    core::future::poll_fn(|cx| {
37        let unwrapped = crate::waker::unwrap_inner(cx.waker()).expect("waker not found");
38        let mut cx = task::Context::from_waker(unwrapped);
39        future.as_mut().poll(&mut cx)
40    })
41    .await
42}
43
44/// Create a new stream
45///
46/// # Example
47///
48/// Let's yield some lyrics (Song: "Verdächtig" by Systemabsturz):
49///
50/// ```
51/// # use futures_lite::StreamExt;
52/// # use std::pin::pin;
53/// # futures_lite::future::block_on(async {
54/// let stream = asynk_strim::stream_fn(|mut yielder| async move {
55///    yielder.yield_item("Fahr den Imsi-Catcher hoch").await;
56///    yielder.yield_item("Mach das Richtmikro an").await;
57///    yielder.yield_item("Bring Alexa auf den Markt").await;
58///    yielder.yield_item("Zapf den Netzknoten an").await;
59///    yielder.yield_item("Fahr den Ü-Wagen vor").await;
60///    yielder.yield_item("Kauf den Staatstrojaner ein").await;
61///    yielder.yield_item("Fake die Exit-Nodes bei Tor").await;
62///    yielder.yield_item("Ihr wollt doch alle sicher sein").await;
63/// });
64///
65/// let mut stream = pin!(stream);
66/// while let Some(item) = stream.next().await {
67///    println!("{item}");
68/// }
69/// # });
70#[inline]
71pub fn stream_fn<F, Item, Fut>(func: F) -> impl Stream<Item = Item>
72where
73    F: FnOnce(Yielder<Item>) -> Fut,
74    Fut: Future<Output = ()>,
75{
76    crate::stream::init(func)
77}
78
79/// Jokey alias for [`stream_fn`]
80///
81/// For more elaborate documentation, see [`stream_fn`]
82#[inline]
83pub fn strim_fn<F, Item, Fut>(func: F) -> impl Stream<Item = Item>
84where
85    F: FnOnce(Yielder<Item>) -> Fut,
86    Fut: Future<Output = ()>,
87{
88    stream_fn(func)
89}
90
91/// Create a new try stream
92///
93/// # Example
94///
95/// Let's yield some lyrics (Song: "Archbombe" by Systemabsturz):
96///
97/// ```
98/// # use futures_lite::StreamExt;
99/// # use std::pin::pin;
100/// # use std::convert::Infallible;
101/// # futures_lite::future::block_on(async {
102/// let stream = asynk_strim::try_stream_fn(|mut yielder| async move {
103///   yielder.yield_ok("Meine Programme habe ich mal ausgecheckt").await;
104///   yielder.yield_ok("Dass ich mit Zündern reden kann finde ich suspekt").await;
105///   yielder.yield_ok("Meine Codezeilen haben anfangs Hippies geschrieben").await;
106///   yielder.yield_error("Von ihrem Pazifismus ist nicht viel geblieben").await;
107///   yielder.yield_ok("Ich bin echt nicht glücklich und nicht einverstanden").await;
108///
109///   Err("Ich als Bombensteuerung soll auf Menschen landen")
110/// });
111///
112/// let mut stream = pin!(stream);
113/// while let Some(item) = stream.next().await {
114///   println!("{item:?}");
115/// }
116/// # });
117/// ```
118#[inline]
119pub fn try_stream_fn<F, Ok, Error, Fut>(func: F) -> impl Stream<Item = Result<Ok, Error>>
120where
121    F: FnOnce(TryYielder<Ok, Error>) -> Fut,
122    Fut: Future<Output = Result<(), Error>>,
123{
124    crate::stream::init(|mut yielder: TryYielder<_, _>| async move {
125        if let Err(err) = func(yielder.internal_clone()).await {
126            yielder.yield_error(err).await;
127        }
128    })
129}
130
131/// Jokey alias for [`try_stream_fn`]
132///
133/// For more elaborate documentation, see [`try_stream_fn`]
134#[inline]
135pub fn try_strim_fn<F, Ok, Error, Fut>(func: F) -> impl Stream<Item = Result<Ok, Error>>
136where
137    F: FnOnce(TryYielder<Ok, Error>) -> Fut,
138    Fut: Future<Output = Result<(), Error>>,
139{
140    try_stream_fn(func)
141}