stream_future_impl/
lib.rs

1use proc_macro::TokenStream;
2use proc_macro2::Ident;
3use quote::{quote, ToTokens};
4use syn::{
5    parse::{Parse, Parser},
6    parse_macro_input,
7    visit_mut::{visit_expr_mut, visit_stmt_mut, VisitMut},
8    Block, Expr, ItemFn, Lifetime, ReturnType, Type,
9};
10
11/// See the crate document of [`stream-future`].
12fn stream_impl(attr: TokenStream, input: TokenStream, gen_ty: &str, is_try: bool) -> TokenStream {
13    let gen_ty = Ident::parse.parse_str(gen_ty).unwrap();
14    let mut p_type = Type::parse.parse2(quote!(())).unwrap();
15    let mut lifetime = Lifetime::parse.parse2(quote!('static)).unwrap();
16    if !attr.is_empty() {
17        let args_parser = syn::meta::parser(|meta| {
18            if meta.path.is_ident("lifetime") {
19                lifetime = meta.value()?.parse()?;
20            } else {
21                p_type = Type::parse.parse2(meta.path.into_token_stream())?;
22            }
23            Ok(())
24        });
25        parse_macro_input!(attr with args_parser);
26    }
27    let mut func = parse_macro_input!(input as ItemFn);
28    func.sig.asyncness = None;
29    let future_return_type = match func.sig.output {
30        ReturnType::Default => Box::new(Type::parse.parse2(quote!(())).unwrap()),
31        ReturnType::Type(_, t) => t,
32    };
33    func.sig.output = ReturnType::parse
34        .parse2(if !is_try {
35            quote! {
36                -> impl ::core::future::Future<Output = #future_return_type> + ::stream_future::Stream<Item = #p_type> + #lifetime
37            }
38        } else {
39            quote! {
40                -> impl ::core::future::Future<Output = #future_return_type> + ::stream_future::Stream<Item = ::stream_future::TryStreamItemType<#future_return_type, #p_type>> + #lifetime
41            }
42        })
43        .unwrap();
44    let mut old_block = func.block;
45    for stmt in old_block.stmts.iter_mut() {
46        visit_stmt_mut(&mut AwaitYieldVisitor, stmt);
47    }
48    func.block = Box::new(
49        Block::parse
50            .parse2(quote! {{
51                ::stream_future::#gen_ty::<#p_type, _>::new(#[coroutine] static move |#[allow(unused_mut)] mut __cx: ::stream_future::ResumeTy| {
52                    #old_block
53                })
54            }})
55            .unwrap(),
56    );
57
58    func.to_token_stream().into()
59}
60
61struct AwaitYieldVisitor;
62
63impl VisitMut for AwaitYieldVisitor {
64    fn visit_expr_mut(&mut self, i: &mut Expr) {
65        match i {
66            Expr::Await(expr_await) => {
67                let attrs = &expr_await.attrs;
68                let mut inner_expr = expr_await.base.clone();
69                self.visit_expr_mut(&mut inner_expr);
70                *i = Expr::parse
71                    .parse2(quote! {
72                        #(#attrs)*
73                        {
74                            let mut __future = #inner_expr;
75                            loop {
76                                #[allow(unsafe_code)]
77                                let mut __future = unsafe { ::core::pin::Pin::new_unchecked(&mut __future) };
78                                match __cx.poll_future(__future) {
79                                    ::core::task::Poll::Ready(__ret) => {
80                                        break __ret;
81                                    }
82                                    ::core::task::Poll::Pending => {
83                                        yield ::core::task::Poll::Pending;
84                                    }
85                                }
86                            }
87                        }
88                    })
89                    .unwrap();
90            }
91            Expr::Yield(expr_yield) => {
92                let mut inner_expr = expr_yield
93                    .expr
94                    .take()
95                    .unwrap_or_else(|| Box::new(Expr::parse.parse2(quote!(())).unwrap()));
96                self.visit_expr_mut(&mut inner_expr);
97                expr_yield.expr = Some(Box::new(
98                    Expr::parse
99                        .parse2(quote!(::core::task::Poll::Ready(
100                            #[allow(unused_parens)]
101                            #inner_expr
102                        )))
103                        .unwrap(),
104                ));
105                *i = Expr::parse
106                    .parse2(quote! {
107                        __cx = #expr_yield
108                    })
109                    .unwrap()
110            }
111            _ => visit_expr_mut(self, i),
112        }
113    }
114
115    fn visit_expr_async_mut(&mut self, _i: &mut syn::ExprAsync) {}
116}
117
118#[proc_macro_attribute]
119pub fn stream(attr: TokenStream, input: TokenStream) -> TokenStream {
120    stream_impl(attr, input, "GenStreamFuture", false)
121}
122
123#[proc_macro_attribute]
124pub fn try_stream(attr: TokenStream, input: TokenStream) -> TokenStream {
125    stream_impl(attr, input, "GenTryStreamFuture", true)
126}