1use proc_macro2::TokenStream;
6use quote::{quote, ToTokens};
7use syn::{parse_quote, Ident, Index, PatIdent, Path};
8
9use super::{config::Config, name_constructors::*};
10use crate::{
11 action_expr_chain::ActionExprChain,
12 chain::{
13 expr::{ActionExpr, InnerExpr, ProcessExpr},
14 group::{ApplicationType, ExprGroup, MoveType},
15 Chain,
16 },
17 handler::Handler,
18 parse::utils::is_block_expr,
19};
20
21struct ActionExprPos<'a> {
22 pub expr: &'a ExprGroup<ActionExpr>,
23 pub branch_index: usize,
24 pub expr_index: usize,
25}
26
27impl<'a> ActionExprPos<'a> {
28 fn new(
29 expr: &'a ExprGroup<ActionExpr>,
30 branch_index: impl Into<usize>,
31 expr_index: impl Into<usize>,
32 ) -> Self {
33 Self {
34 expr,
35 branch_index: branch_index.into(),
36 expr_index: expr_index.into(),
37 }
38 }
39}
40
41struct StepAcc<'a> {
42 def_stream: Option<TokenStream>,
43 step_streams: Vec<(TokenStream, Option<ActionExprPos<'a>>)>,
44}
45
46pub struct JoinOutput<'a> {
50 branch_count: usize,
54 branch_pats: Vec<Option<&'a PatIdent>>,
58 chains: Vec<Vec<Vec<&'a ExprGroup<ActionExpr>>>>,
64 config: Config,
68 custom_joiner: Option<&'a TokenStream>,
72 depths: Vec<usize>,
76 futures_crate_path: Option<&'a Path>,
80 handler: Option<&'a Handler>,
84 lazy_branches: bool,
88 max_step_count: usize,
92 transpose: bool,
96}
97
98impl<'a> JoinOutput<'a> {
99 pub fn new(
106 branches: &'a [ActionExprChain],
107 handler: Option<&'a Handler>,
108 futures_crate_path: Option<&'a Path>,
109 custom_joiner: Option<&'a TokenStream>,
110 custom_transpose_results: Option<bool>,
111 lazy_branches: Option<bool>,
112 config: Config,
113 ) -> Result<Self, &'static str>
114 where
115 Self: Sized,
116 {
117 let Config {
118 is_async,
119 is_try,
120 is_spawn,
121 } = config;
122
123 let branch_count = branches.len();
124
125 if !is_try
126 && (handler.map(Handler::is_map).unwrap_or(false)
127 || handler.map(Handler::is_and_then).unwrap_or(false))
128 {
129 Err("`and_then` or `map` handler should be only provided for `try` `join!`")
130 } else if is_try && handler.map(Handler::is_then).unwrap_or(false) {
131 Err("`then` handler should be only provided for `join!` but not for `try` `join!`")
132 } else if !config.is_async && futures_crate_path.is_some() {
133 Err("futures_crate_path should be only provided for `async` `join!`")
134 } else if branch_count == 0 {
135 Err("join should have at least one branch")
136 } else {
137 Ok({
138 let (depths_and_paths, chains): (Vec<_>, Vec<_>) = branches
139 .iter()
140 .map(|expr_chain| {
141 let (depth, steps) = expr_chain.members().iter().fold(
142 (1, vec![Vec::new()]),
143 |(depth, mut chain_acc), member| match *member.application_type() {
144 ApplicationType::Deferred => {
145 chain_acc.push(vec![member]);
146 (depth + 1, chain_acc)
147 }
148 _ => {
149 chain_acc.last_mut().unwrap().push(member);
150 (depth, chain_acc)
151 }
152 },
153 );
154 ((depth, expr_chain.id()), steps)
155 })
156 .unzip();
157
158 let (depths, branch_pats): (Vec<usize>, Vec<Option<&PatIdent>>) =
159 depths_and_paths.into_iter().unzip();
160
161 Self {
162 futures_crate_path,
163 custom_joiner,
164 lazy_branches: lazy_branches.unwrap_or(is_spawn && !is_async),
165 config,
166 max_step_count: *depths.iter().max().unwrap(),
170 handler,
171 branch_pats,
172 depths,
173 chains,
174 branch_count,
175 transpose: custom_transpose_results.unwrap_or(is_try && !is_async),
180 }
181 })
182 }
183 }
184
185 pub fn generate_steps<TPat: ToTokens + Clone, TVar: ToTokens + Clone>(
191 &self,
192 result_pats: &[TPat],
193 result_vars: &[TVar],
194 ) -> TokenStream {
195 (0..self.max_step_count)
196 .map(|step_number| {
197 let step_results_name = construct_step_results_name(step_number);
198 (
199 step_number,
200 self.generate_step(step_number, result_vars, &step_results_name),
201 step_results_name,
202 )
203 })
204 .rev()
205 .fold(
206 None,
207 |next_step_stream, (step_number, step_stream, step_results_name)| {
208 self.join_steps(
209 step_number,
210 step_stream,
211 next_step_stream,
212 result_pats,
213 result_vars,
214 &step_results_name,
215 )
216 .into()
217 },
218 )
219 .unwrap()
220 }
221
222 pub fn generate_handle(&self, results_var: &Ident, handler_name: &Ident) -> TokenStream {
226 let Self {
227 handler, config, ..
228 } = self;
229 let &Config { is_async, .. } = config;
230
231 let await_handler = if is_async {
232 Some(quote! { .await })
233 } else {
234 None
235 };
236
237 let result_vars: Vec<_> = (0..self.branch_count).map(construct_result_name).collect();
241
242 match handler {
243 Some(Handler::Then(_)) => {
244 let call_handler =
245 self.extract_results_tuple(results_var, &result_vars, handler_name, None);
246 quote! {
247 #call_handler#await_handler
248 }
249 }
250 Some(handler @ Handler::Map(_)) | Some(handler @ Handler::AndThen(_)) => {
251 let call_handler =
252 self.extract_results_tuple(results_var, &result_vars, handler_name, None);
253
254 let handler_closure_body = if is_async && handler.is_map() {
255 quote! {
256 #results_var.map(|#results_var| #call_handler)
257 }
258 } else {
259 call_handler
260 };
261
262 let call_method = match handler {
263 Handler::Map(_) => quote! { .map },
264 Handler::AndThen(_) => quote! { .and_then },
265 _ => unreachable!(),
266 };
267
268 let wrapped_results_var = self.wrap_into_block(results_var);
269
270 quote! {
271 #wrapped_results_var#call_method(
272 |#results_var| {
273 #handler_closure_body
274 }
275 )#await_handler
276 }
277 }
278 None => {
279 quote! { #results_var }
280 }
281 }
282 }
283
284 pub fn generate_step<TVar: ToTokens, TName: ToTokens>(
289 &self,
290 step_number: impl Into<usize>,
291 result_vars: &[TVar],
292 step_results_name: &TName,
293 ) -> TokenStream {
294 let step_number = step_number.into();
295 let Config {
296 is_async,
297 is_spawn,
298 is_try,
299 } = self.config;
300
301 let (def_streams, step_streams): (Vec<_>, Vec<_>) = self.chains
302 .iter()
303 .map(|chain| chain.get(step_number))
304 .enumerate()
305 .filter_map(
306 |(branch_index, chain_step_actions)|
307 chain_step_actions.and_then(
308 |chain_step_actions|
309 chain_step_actions
310 .iter()
311 .enumerate()
312 .fold(
313 None,
314 |acc: Option<StepAcc>,
315 (expr_index, &action_expr)| {
316 acc.map(|step_acc|
317 self.process_step_action_expr(
318 ActionExprPos::new(
319 action_expr,
320 branch_index,
321 expr_index
322 ),
323 step_acc
324 )
325 )
326 .or_else(|| {
327 let prev_result_name = &result_vars[branch_index];
328
329 let wrapped_prev_result = self.wrap_into_block(prev_result_name);
330
331 let step_acc = StepAcc {
332 def_stream: None,
333 step_streams: vec![(wrapped_prev_result, None)]
334 };
335
336 Some(
337 self.process_step_action_expr(
338 ActionExprPos::new(
339 action_expr,
340 branch_index,
341 expr_index
342 ),
343 step_acc
344 )
345 )
346 })
347 },
348 )
349 .map(|StepAcc { mut def_stream, mut step_streams }| loop {
350 if step_streams.len() > 1 {
351 let result = self.wrap_last_step_stream(StepAcc { def_stream, step_streams }, None);
352 def_stream = result.def_stream;
353 step_streams = result.step_streams;
354 } else {
355 break (def_stream, step_streams.pop().expect("join: Unexpected step streams length 0. This's a bug, please report it.").0);
356 }
357 })
358 .map(|(def_stream, chain)| {
359 (
360 def_stream,
361 if self.active_step_branch_count(step_number) > 1 {
362 let chain = if self.lazy_branches {
363 quote! { move || #chain }
364 } else {
365 chain
366 };
367 if is_spawn {
368 if is_async {
369 let spawn_tokio_fn_name = construct_spawn_tokio_fn_name();
370 quote! {
371 { #spawn_tokio_fn_name(Box::pin(#chain)) }
372 }
373 } else {
374 let thread_builder_name =
375 construct_thread_builder_name(branch_index);
376 quote! {
377 { #thread_builder_name.spawn(#chain).unwrap() }
378 }
379 }
380 } else {
381 chain
382 }
383 } else {
384 chain
385 },
386 )
387 }),
388 )
389 )
390 .unzip();
391
392 let joiner = if self.active_step_branch_count(step_number) > 1 {
393 self.custom_joiner.cloned().or_else(|| {
394 if is_async {
395 let futures_crate_path = self.futures_crate_path;
396 if is_try {
397 Some(quote! { #futures_crate_path::try_join! })
398 } else {
399 Some(quote! { #futures_crate_path::join! })
400 }
401 } else {
402 None
403 }
404 })
405 } else {
406 None
407 };
408
409 if is_async {
410 let join_results = joiner
411 .map(|joiner| quote! { #joiner(#( #step_streams ),*) })
412 .unwrap_or_else(|| quote! { #( #step_streams )*.await });
413
414 quote! {
415 #( #def_streams )*
416 let #step_results_name = #join_results;
417 }
418 } else {
419 let (thread_builders, spawn_joiners) = self
423 .generate_thread_builders_and_spawn_joiners(step_number, step_results_name)
424 .map(|(a, b)| (a.into(), b.into()))
425 .unwrap_or((None, None));
426
427 quote! {
428 #thread_builders
429 #( #def_streams )*
430 let #step_results_name = #joiner(#( #step_streams ),*);
431 #spawn_joiners
432 }
433 }
434 }
435
436 pub fn join_steps<TPat: ToTokens + Clone, TVar: ToTokens + Clone, TName: ToTokens>(
442 &self,
443 step_number: impl Into<usize>,
444 step_stream: TokenStream,
445 next_step_stream: impl Into<Option<TokenStream>>,
446 result_pats: &[TPat],
447 result_vars: &[TVar],
448 step_results_name: &TName,
449 ) -> TokenStream {
450 let step_number = step_number.into();
451 let next_step_stream = next_step_stream.into();
452 let &Self {
453 transpose,
454 max_step_count,
455 branch_count,
456 ..
457 } = self;
458 let Config {
459 is_try, is_async, ..
460 } = self.config;
461
462 let extracted_results =
463 self.extract_results_tuple(&step_results_name, result_pats, None, step_number);
464 let err_to_err = quote! { Err(err) => Err(err) };
465
466 if is_try && (step_number) < max_step_count - 1 {
467 if transpose {
468 let (is_result_successful, result_vars_matcher): (Vec<_>, Vec<_>) = result_vars
469 .iter()
470 .enumerate()
471 .filter_map(|(index, result_var)| {
472 if self.is_branch_active_in_step(step_number, index) {
473 (
474 quote! { #result_var.as_ref().map(|_| true).unwrap_or(false) },
475 quote! {
476 #index => #result_var.map(|_| unreachable!())
477 },
478 )
479 .into()
480 } else {
481 None
482 }
483 })
484 .unzip();
485 let value_name = construct_internal_value_name();
486
487 quote! {
488 #step_stream
489 #extracted_results
490 if let Some(__fail_index) = [#( #is_result_successful ),*].iter().position(|#value_name| !#value_name) {
491 match __fail_index {
492 #( #result_vars_matcher ),*,
493 _ => unreachable!()
494 }
495 } else {
496 #next_step_stream
497 }
498 }
499 } else {
500 let current_step_results = if is_async {
501 let mut index: usize = 0;
502 let ok_result_vars = (0..branch_count).filter_map(|branch_index| {
503 if self.is_branch_active_in_step(step_number, branch_index) {
504 let result_var = self.generate_indexed_step_results_name(
505 step_results_name,
506 step_number,
507 index,
508 );
509 index += 1;
510
511 Some(quote! { Ok(#result_var) })
512 } else {
513 None
514 }
515 });
516
517 quote! {
518 let #step_results_name = (#(# ok_result_vars ),*);
519 #extracted_results
520 }
521 } else {
522 extracted_results
523 };
524
525 quote! {
526 #step_stream
527 match #step_results_name {
528 Ok(#step_results_name) => {
529 #current_step_results
530 #next_step_stream
531 },
532 #err_to_err
533 }
534 }
535 }
536 } else if transpose && is_try {
537 let transposer = self.generate_results_transposer(result_vars, None);
538
539 quote! {
540 #step_stream
541 #extracted_results
542 #transposer
543 }
544 } else if is_try {
545 let final_stream = if self.branch_count > 1 {
546 let results: Vec<_> = result_vars
547 .iter()
548 .enumerate()
549 .filter_map(|(index, result_var)| {
550 if !self.is_branch_active_in_step(step_number, index) {
551 Some(result_var.clone())
552 } else {
553 None
554 }
555 })
556 .collect();
557
558 if !results.is_empty() {
559 let transposer = self.generate_results_transposer(&results, &result_vars);
560
561 quote! {
562 match #step_results_name {
563 Ok(#step_results_name) => {
564 #extracted_results
565 #transposer
566 },
567 #err_to_err
568 }
569 }
570 } else {
571 quote! {
572 match #step_results_name {
573 Ok(#step_results_name) => {
574 #extracted_results
575 Ok((#(# result_vars ),*))
576 },
577 #err_to_err
578 }
579 }
580 }
581 } else {
582 let value_name = construct_internal_value_name();
583 quote! {
584 match #step_results_name {
585 Ok(#value_name) => Ok((#value_name)),
586 #err_to_err
587 }
588 }
589 };
590
591 quote! {
592 #step_stream
593 #final_stream
594 }
595 } else {
596 let final_stream = next_step_stream.unwrap_or_else(|| quote! { (#( #result_vars ),*) });
597
598 quote! {
599 #step_stream
600 #extracted_results
601 #final_stream
602 }
603 }
604 }
605
606 pub fn branch_result_pat(&self, branch_index: impl Into<usize>) -> TokenStream {
610 let branch_index = branch_index.into();
611 self.branch_pats[branch_index]
612 .map(ToTokens::into_token_stream)
613 .unwrap_or_else(|| construct_result_name(branch_index).into_token_stream())
614 }
615
616 pub fn branch_result_name(&self, branch_index: impl Into<usize>) -> Ident {
620 let branch_index = branch_index.into();
621 self.branch_pats[branch_index]
622 .map(|pat| pat.ident.clone())
623 .unwrap_or_else(|| construct_result_name(branch_index))
624 }
625
626 pub fn wrap_into_block<TVal: ToTokens>(&self, value: &TVal) -> TokenStream {
630 if self.config.is_async {
631 quote! { async move { #value } }
635 } else {
636 quote! { { #value } }
640 }
641 }
642
643 pub fn active_step_branch_count(&self, step_number: impl Into<usize>) -> usize {
647 let step_number = step_number.into();
648 self.depths
649 .iter()
650 .filter(|&&branch_depth| branch_depth > step_number)
651 .count()
652 }
653
654 pub fn is_branch_active_in_step(
658 &self,
659 step_number: impl Into<usize>,
660 branch_index: impl Into<usize>,
661 ) -> bool {
662 self.depths[branch_index.into()] > step_number.into()
663 }
664
665 fn generate_indexed_step_results_name<TName: ToTokens>(
670 &self,
671 step_results_name: &TName,
672 step_number: impl Into<usize>,
673 index: impl Into<usize>,
674 ) -> TokenStream {
675 let step_number = step_number.into();
676 let index = index.into();
677
678 if self.active_step_branch_count(step_number) > 1 {
679 let index = Index::from(index);
680 quote! { #step_results_name.#index }
681 } else {
682 step_results_name.into_token_stream()
683 }
684 }
685
686 fn generate_thread_builders_and_spawn_joiners<TName: ToTokens>(
690 &self,
691 step_number: impl Into<usize>,
692 step_results_name: &TName,
693 ) -> Option<(TokenStream, TokenStream)> {
694 let step_number = step_number.into();
695 let Config {
696 is_async, is_spawn, ..
697 } = self.config;
698
699 if is_async || !is_spawn || self.active_step_branch_count(step_number) < 2 {
700 None
701 } else {
702 let thread_builders = (0..self.branch_count).filter_map(|branch_index| {
703 if self.is_branch_active_in_step(step_number, branch_index) {
704 let thread_builder_name = construct_thread_builder_name(branch_index);
705 let construct_thread_builder_fn_name = construct_thread_builder_fn_name();
706 Some(quote! { let #thread_builder_name = #construct_thread_builder_fn_name(#branch_index); })
707 } else {
708 None
709 }
710 });
711
712 let mut index: usize = 0;
713 let result_joiners = (0..self.branch_count).filter_map(|branch_index| {
714 if self.is_branch_active_in_step(step_number, branch_index) {
715 let step_result = self.generate_indexed_step_results_name(
716 step_results_name,
717 step_number,
718 index,
719 );
720 index += 1;
721 Some(quote! { #step_result.join().unwrap() })
722 } else {
723 None
724 }
725 });
726
727 Some((
728 quote! { #( #thread_builders )* },
729 quote! { let #step_results_name = (#( #result_joiners ),*); },
730 ))
731 }
732 }
733
734 fn generate_results_transposer<'b, T: ToTokens + 'b>(
752 &self,
753 result_vars: &'b [T],
754 return_vars: impl Into<Option<&'b &'b [T]>>,
755 ) -> TokenStream {
756 let return_vars = return_vars.into();
757
758 result_vars
759 .iter()
760 .rev()
761 .fold(None, |acc, result_var_name| {
762 acc.map_or_else(
763 || {
764 let return_value = return_vars.map_or_else(
765 || quote! { (#( #result_vars ),*) },
766 |return_vars| quote! { (#( #return_vars ),*) },
767 );
768 Some(quote! { #result_var_name.map(|#result_var_name| #return_value ) })
769 },
770 |acc| Some(quote! { #result_var_name.and_then(|#result_var_name| #acc) }),
771 )
772 })
773 .unwrap()
774 }
775
776 fn expand_process_expr(&self, prev_result: TokenStream, expr: &ProcessExpr) -> TokenStream {
780 match expr {
781 ProcessExpr::Then(_) => {
782 quote! { (#expr(#prev_result)) }
783 }
784 ProcessExpr::Inspect([expr]) => {
785 let inspect_fn_name = construct_inspect_fn_name();
786 if self.config.is_async {
787 quote! { #prev_result.inspect(#expr) }
788 } else {
789 quote! { #inspect_fn_name(#expr, #prev_result) }
793 }
794 }
795 _ => {
796 quote! { #prev_result#expr }
797 }
798 }
799 }
800
801 fn separate_block_expr<ExprType: InnerExpr + Clone>(
805 &self,
806 inner_expr: &ExprType,
807 branch_index: impl Into<usize>,
808 expr_index: impl Into<usize>,
809 ) -> (Option<TokenStream>, Option<ExprType>) {
810 let branch_index = branch_index.into();
811 let expr_index = expr_index.into();
812
813 if inner_expr.is_replaceable() {
814 inner_expr.inner_exprs().and_then(|exprs| {
815 let (def, replace_exprs): (Option<_>, Vec<_>) = exprs
816 .iter()
817 .enumerate()
818 .map(|(index, expr)| {
819 if is_block_expr(expr) {
820 let wrapper_name =
821 construct_expr_wrapper_name(branch_index, expr_index, index);
822 (
823 Some((
824 quote! { let #wrapper_name = #expr; },
825 parse_quote! { #wrapper_name },
826 )),
827 None,
828 )
829 } else {
830 (None, Some(expr))
831 }
832 })
833 .fold(
834 (None, Vec::new()),
835 |(def_acc, mut replace_acc), (def_with_expr, expr)| {
836 if let Some((def, expr)) = def_with_expr {
837 replace_acc.push(expr);
838 (
839 def_acc
840 .map(|def_acc| quote! { #def_acc #def })
841 .or(Some(def)),
842 replace_acc,
843 )
844 } else {
845 replace_acc.push(expr.unwrap().clone());
846 (def_acc, replace_acc)
847 }
848 },
849 );
850 def.map(|def| (def, inner_expr.clone().replace_inner_exprs(&replace_exprs)))
851 })
852 } else {
853 None
854 }
855 .map_or((None, None), |(def_stream, replaced_expr)| {
856 (Some(def_stream), replaced_expr)
857 })
858 }
859
860 fn extract_results_tuple<'b, TRes: ToTokens + 'b, TVar: ToTokens + 'b>(
864 &self,
865 results_var: &'b TRes,
866 result_vars: &'b [TVar],
867 handler: impl Into<Option<&'b Ident>>,
868 step_number: impl Into<Option<usize>>,
869 ) -> TokenStream {
870 let handler = handler.into();
871 let step_number = step_number.into();
872
873 let extracted = step_number.map_or_else(
874 || quote! { let (#( #result_vars ),*) = #results_var; },
875 |step_number| {
876 let mut index: usize = 0;
877 let result_vars = result_vars.iter().filter(|_| {
878 let result = self.is_branch_active_in_step(step_number, index);
879 index += 1;
880 result
881 });
882 quote! { let (#( #result_vars ),*) = #results_var; }
883 },
884 );
885
886 if let Some(handler) = handler {
887 quote! {{
888 #extracted
889 #handler(#( #result_vars ),*)
890 }}
891 } else {
892 extracted
893 }
894 }
895
896 fn wrap_last_step_stream<'b, 'c>(
900 &self,
901 StepAcc {
902 def_stream: prev_def_stream,
903 mut step_streams,
904 }: StepAcc<'c>,
905 action_expr_pos: impl Into<Option<ActionExprPos<'b>>>,
906 ) -> StepAcc<'c> {
907 let (prev_step_stream, _) = step_streams.pop().expect(
908 "join: Unexpected error on attempt to get last step stream. This's a bug, please report it.",
909 );
910
911 let (current_step_steam, action_expr_wrapper) = step_streams.pop().expect("join: Step expressions length is zero while it should be >=1. This's a bug, please report it.");
912
913 let action_expr_wrapper = action_expr_wrapper
914 .expect("join: Expected wrapper, found `None`. This's a bug, please report it.");
915
916 let internal_value_name = construct_internal_value_name();
917
918 let replaced_expr = action_expr_wrapper
919 .expr
920 .clone()
921 .replace_inner_exprs(&[parse_quote! { |#internal_value_name| #prev_step_stream }])
922 .expect("join: Failed to replace expr in unwrap expr. This's a bug, please report it.");
923
924 let replaced_action_expr_position = ActionExprPos {
925 expr: &replaced_expr,
926 expr_index: action_expr_wrapper.expr_index,
927 branch_index: action_expr_wrapper.branch_index,
928 };
929
930 let (def_stream, step_stream) = self.generate_def_and_step_streams(
931 prev_def_stream,
932 current_step_steam,
933 replaced_action_expr_position,
934 );
935
936 let (def_stream, step_stream) =
937 self.generate_def_and_step_streams(def_stream, step_stream, action_expr_pos.into());
938
939 step_streams.push((step_stream, None));
940
941 StepAcc {
942 def_stream,
943 step_streams,
944 }
945 }
946
947 fn generate_def_and_step_streams<'b>(
951 &self,
952 prev_def_stream: impl Into<Option<TokenStream>>,
953 prev_step_stream: TokenStream,
954 action_expr_pos: impl Into<Option<ActionExprPos<'b>>>,
955 ) -> (Option<TokenStream>, TokenStream) {
956 let prev_def_stream = prev_def_stream.into();
957
958 if let Some(ActionExprPos {
959 expr: action_expr,
960 branch_index,
961 expr_index,
962 }) = action_expr_pos.into()
963 {
964 match action_expr.expr() {
965 ActionExpr::Process(process_expr) => {
966 let (def_stream, replaced_expr) =
967 self.separate_block_expr(process_expr, branch_index, expr_index);
968
969 let step_stream = self.expand_process_expr(
970 prev_step_stream,
971 replaced_expr.as_ref().unwrap_or(process_expr),
972 );
973
974 (
975 prev_def_stream
976 .map(|prev| quote! { #prev #def_stream })
977 .or(def_stream),
978 step_stream,
979 )
980 }
981 ActionExpr::Err(err_expr) => {
982 let (def_stream, replaced_expr) =
983 self.separate_block_expr(err_expr, branch_index, expr_index);
984
985 let err_expr = replaced_expr.as_ref().unwrap_or(err_expr);
986
987 (
988 prev_def_stream
989 .map(|prev| quote! { #prev #def_stream })
990 .or(def_stream),
991 quote! { #prev_step_stream#err_expr },
992 )
993 }
994 ActionExpr::Initial(initial_expr) => {
995 let (def_stream, replaced_expr) =
996 self.separate_block_expr(initial_expr, branch_index, expr_index);
997
998 let initial_expr = replaced_expr.as_ref().unwrap_or(initial_expr);
999
1000 (
1001 prev_def_stream
1002 .map(|prev| quote! { #prev #def_stream })
1003 .or(def_stream),
1004 quote! { #initial_expr },
1005 )
1006 }
1007 }
1008 } else {
1009 (prev_def_stream, prev_step_stream)
1010 }
1011 }
1012
1013 fn process_step_action_expr<'b>(
1018 &self,
1019 action_expr_pos: impl Into<Option<ActionExprPos<'b>>>,
1020 step_acc: StepAcc<'b>,
1021 ) -> StepAcc<'b> {
1022 let ActionExprPos {
1023 expr: action_expr,
1024 branch_index,
1025 expr_index,
1026 } = action_expr_pos.into().expect("join: Unexpected `None` `ActionExprPos` in `process_step_action_expr`. This's a bug, please report it.");
1027
1028 match action_expr.move_type() {
1029 MoveType::Unwrap => self.wrap_last_step_stream(
1030 step_acc,
1031 None, ),
1033 MoveType::Wrap => {
1034 let StepAcc {
1035 def_stream: prev_def_stream,
1036 mut step_streams,
1037 } = step_acc;
1038
1039 step_streams.last_mut().expect("join: Unexpected 0 length of step streams. This's a bug, please report it.").1 =
1040 ActionExprPos::new(
1041 action_expr,
1042 branch_index,
1043 expr_index,
1044 ).into();
1045 step_streams.push((construct_internal_value_name().into_token_stream(), None));
1046
1047 StepAcc {
1048 def_stream: prev_def_stream,
1049 step_streams,
1050 }
1051 }
1052 MoveType::None => {
1053 let StepAcc {
1054 def_stream: prev_def_stream,
1055 mut step_streams,
1056 } = step_acc;
1057
1058 let (prev_step_stream, _) =
1059 step_streams.pop().expect("join: Unexpected `None` when pop step streams. This's a bug, please report it.");
1060 let (def_stream, step_stream) = self.generate_def_and_step_streams(
1061 prev_def_stream,
1062 prev_step_stream,
1063 ActionExprPos::new(action_expr, branch_index, expr_index),
1064 );
1065
1066 step_streams.push((step_stream, None));
1067
1068 StepAcc {
1069 def_stream,
1070 step_streams,
1071 }
1072 }
1073 }
1074 }
1075}
1076
1077impl<'a> ToTokens for JoinOutput<'a> {
1078 fn to_tokens(&self, output: &mut TokenStream) {
1079 let Config {
1080 is_async, is_spawn, ..
1081 } = self.config;
1082
1083 let results_var = construct_results_name();
1084 let handler_name = construct_handler_name();
1085 let (result_pats, result_vars): (Vec<_>, Vec<_>) = (0..self.branch_count)
1086 .map(|branch_index| {
1087 (
1088 self.branch_result_pat(branch_index),
1089 self.branch_result_name(branch_index),
1090 )
1091 })
1092 .unzip();
1093
1094 let steps_stream = self.generate_steps(&result_pats, &result_vars);
1098
1099 let handle_results = self.generate_handle(&results_var, &handler_name);
1104
1105 let handler_definition = self
1106 .handler
1107 .map(Handler::extract_expr)
1108 .map(|handler_expr| quote! { let #handler_name = #handler_expr; });
1109
1110 output.extend(
1111 if is_async {
1112 let futures_crate_path = self.futures_crate_path;
1113 let async_spawn_fn_definition = if is_spawn {
1114 let spawn_tokio_fn_name = construct_spawn_tokio_fn_name();
1115 let value_name = construct_internal_value_name();
1116
1117 Some(
1118 quote! {
1119 fn #spawn_tokio_fn_name<T, F>(__future: F) -> impl #futures_crate_path::future::Future<Output=T>
1120 where
1121 F: #futures_crate_path::future::Future<Output = T> + Send + 'static,
1122 T: Send + 'static,
1123 {
1124 ::tokio::spawn(__future).map(|#value_name| #value_name.unwrap_or_else(|err| panic!("tokio JoinHandle failed: {:#?}", err)))
1125 }
1126 }
1127 )
1128 } else {
1129 None
1130 };
1131
1132 quote! {
1133 Box::pin(
1134 async move {
1135 use #futures_crate_path::{FutureExt, TryFutureExt, StreamExt, TryStreamExt};
1136 #async_spawn_fn_definition
1137 #handler_definition
1138 let #results_var = { #steps_stream };
1139 #handle_results
1140 }
1141 )
1142 }
1143 } else {
1144 let inspect_fn_definition = {
1145 let inspect_fn_name = construct_inspect_fn_name();
1146 let handler_name = construct_handler_name();
1147 let value_name = construct_internal_value_name();
1148
1149 quote! {
1150 fn #inspect_fn_name<I>(#handler_name: impl Fn(&I) -> (), #value_name: I) -> I {
1151 #handler_name(&#value_name);
1152 #value_name
1153 }
1154 }
1155 };
1156 let thread_builder_fn_definition = if is_spawn {
1157 let construct_thread_builder_fn_name = construct_thread_builder_fn_name();
1158
1159 Some(
1160 quote! {
1161 fn #construct_thread_builder_fn_name(branch_index: usize) -> ::std::thread::Builder {
1162 let thread_name = format!("join_{}", branch_index);
1163 ::std::thread::Builder::new().name(
1164 ::std::thread::current().name()
1165 .map(
1166 |current_thread_name|
1167 format!("{current_thread_name}_{new_thread_name}",
1168 current_thread_name=current_thread_name,
1169 new_thread_name=thread_name
1170 )
1171 )
1172 .unwrap_or(thread_name)
1173 )
1174 }
1175 }
1176 )
1177 } else {
1178 None
1179 };
1180
1181 quote! {{
1182 #inspect_fn_definition
1183 #thread_builder_fn_definition
1184 #handler_definition
1185 let #results_var = { #steps_stream };
1186 #handle_results
1187 }}
1188 }
1189 );
1190 }
1191}