stream_future_impl/
lib.rs1use proc_macro::TokenStream;
2use proc_macro2::Ident;
3use quote::{quote, ToTokens};
4use syn::{
5 parse::{Parse, Parser},
6 parse_macro_input,
7 visit_mut::{visit_expr_mut, visit_stmt_mut, VisitMut},
8 Block, Expr, ItemFn, Lifetime, ReturnType, Type,
9};
10
11fn stream_impl(attr: TokenStream, input: TokenStream, gen_ty: &str, is_try: bool) -> TokenStream {
13 let gen_ty = Ident::parse.parse_str(gen_ty).unwrap();
14 let mut p_type = Type::parse.parse2(quote!(())).unwrap();
15 let mut lifetime = Lifetime::parse.parse2(quote!('static)).unwrap();
16 if !attr.is_empty() {
17 let args_parser = syn::meta::parser(|meta| {
18 if meta.path.is_ident("lifetime") {
19 lifetime = meta.value()?.parse()?;
20 } else {
21 p_type = Type::parse.parse2(meta.path.into_token_stream())?;
22 }
23 Ok(())
24 });
25 parse_macro_input!(attr with args_parser);
26 }
27 let mut func = parse_macro_input!(input as ItemFn);
28 func.sig.asyncness = None;
29 let future_return_type = match func.sig.output {
30 ReturnType::Default => Box::new(Type::parse.parse2(quote!(())).unwrap()),
31 ReturnType::Type(_, t) => t,
32 };
33 func.sig.output = ReturnType::parse
34 .parse2(if !is_try {
35 quote! {
36 -> impl ::core::future::Future<Output = #future_return_type> + ::stream_future::Stream<Item = #p_type> + #lifetime
37 }
38 } else {
39 quote! {
40 -> impl ::core::future::Future<Output = #future_return_type> + ::stream_future::Stream<Item = ::stream_future::TryStreamItemType<#future_return_type, #p_type>> + #lifetime
41 }
42 })
43 .unwrap();
44 let mut old_block = func.block;
45 for stmt in old_block.stmts.iter_mut() {
46 visit_stmt_mut(&mut AwaitYieldVisitor, stmt);
47 }
48 func.block = Box::new(
49 Block::parse
50 .parse2(quote! {{
51 ::stream_future::#gen_ty::<#p_type, _>::new(#[coroutine] static move |#[allow(unused_mut)] mut __cx: ::stream_future::ResumeTy| {
52 #old_block
53 })
54 }})
55 .unwrap(),
56 );
57
58 func.to_token_stream().into()
59}
60
61struct AwaitYieldVisitor;
62
63impl VisitMut for AwaitYieldVisitor {
64 fn visit_expr_mut(&mut self, i: &mut Expr) {
65 match i {
66 Expr::Await(expr_await) => {
67 let attrs = &expr_await.attrs;
68 let mut inner_expr = expr_await.base.clone();
69 self.visit_expr_mut(&mut inner_expr);
70 *i = Expr::parse
71 .parse2(quote! {
72 #(#attrs)*
73 {
74 let mut __future = #inner_expr;
75 loop {
76 #[allow(unsafe_code)]
77 let mut __future = unsafe { ::core::pin::Pin::new_unchecked(&mut __future) };
78 match __cx.poll_future(__future) {
79 ::core::task::Poll::Ready(__ret) => {
80 break __ret;
81 }
82 ::core::task::Poll::Pending => {
83 yield ::core::task::Poll::Pending;
84 }
85 }
86 }
87 }
88 })
89 .unwrap();
90 }
91 Expr::Yield(expr_yield) => {
92 let mut inner_expr = expr_yield
93 .expr
94 .take()
95 .unwrap_or_else(|| Box::new(Expr::parse.parse2(quote!(())).unwrap()));
96 self.visit_expr_mut(&mut inner_expr);
97 expr_yield.expr = Some(Box::new(
98 Expr::parse
99 .parse2(quote!(::core::task::Poll::Ready(
100 #[allow(unused_parens)]
101 #inner_expr
102 )))
103 .unwrap(),
104 ));
105 *i = Expr::parse
106 .parse2(quote! {
107 __cx = #expr_yield
108 })
109 .unwrap()
110 }
111 _ => visit_expr_mut(self, i),
112 }
113 }
114
115 fn visit_expr_async_mut(&mut self, _i: &mut syn::ExprAsync) {}
116}
117
118#[proc_macro_attribute]
119pub fn stream(attr: TokenStream, input: TokenStream) -> TokenStream {
120 stream_impl(attr, input, "GenStreamFuture", false)
121}
122
123#[proc_macro_attribute]
124pub fn try_stream(attr: TokenStream, input: TokenStream) -> TokenStream {
125 stream_impl(attr, input, "GenTryStreamFuture", true)
126}