join_impl/join/
join_output.rs

//!
//! Definition and implementation of the `join!` macro output generator.
//!
4
5use proc_macro2::TokenStream;
6use quote::{quote, ToTokens};
7use syn::{parse_quote, Ident, Index, PatIdent, Path};
8
9use super::{config::Config, name_constructors::*};
10use crate::{
11    action_expr_chain::ActionExprChain,
12    chain::{
13        expr::{ActionExpr, InnerExpr, ProcessExpr},
14        group::{ApplicationType, ExprGroup, MoveType},
15        Chain,
16    },
17    handler::Handler,
18    parse::utils::is_block_expr,
19};
20
21struct ActionExprPos<'a> {
22    pub expr: &'a ExprGroup<ActionExpr>,
23    pub branch_index: usize,
24    pub expr_index: usize,
25}
26
27impl<'a> ActionExprPos<'a> {
28    fn new(
29        expr: &'a ExprGroup<ActionExpr>,
30        branch_index: impl Into<usize>,
31        expr_index: impl Into<usize>,
32    ) -> Self {
33        Self {
34            expr,
35            branch_index: branch_index.into(),
36            expr_index: expr_index.into(),
37        }
38    }
39}
40
///
/// Accumulator used while generating the code of one branch step.
///
struct StepAcc<'a> {
    /// Definitions which are emitted before the `let` binding of the step results.
    def_stream: Option<TokenStream>,
    /// Stack of partially built streams, each optionally tagged with the position of the
    /// action expression it belongs to. It is collapsed into a single stream before use.
    step_streams: Vec<(TokenStream, Option<ActionExprPos<'a>>)>,
}
45
///
/// Generator of `join!` macro output.
///
pub struct JoinOutput<'a> {
    ///
    /// Total branch count.
    ///
    branch_count: usize,
    ///
    /// Result names provided by the user for branches (`None` if a branch has no explicit name).
    ///
    branch_pats: Vec<Option<&'a PatIdent>>,
    ///
    /// Steps of every branch. Each step is a list of `ExprGroup<ActionExpr>` which are applied one
    /// after another, and a new step is started by every `Deferred` member of the branch.
    /// [[map, or_else, map, and_then], [map, and_then]] =>
    /// it will be interpreted as `expr.map().or_else().map().and_then()` and, once the first step has
    /// finished, `expr.map().and_then()`.
    ///
    chains: Vec<Vec<Vec<&'a ExprGroup<ActionExpr>>>>,
    ///
    /// Macro call params.
    ///
    config: Config,
    ///
    /// Provided custom joiner function.
    ///
    custom_joiner: Option<&'a TokenStream>,
    ///
    /// Step count of every branch. Used to calculate the max step count and to determine whether
    /// a branch is still active in a given step.
    ///
    depths: Vec<usize>,
    ///
    /// Provided futures crate path. It prefixes `join!`/`try_join!` in `async` output.
    ///
    futures_crate_path: Option<&'a Path>,
    ///
    /// Optional final handler.
    ///
    handler: Option<&'a Handler>,
    ///
    /// Wrap branches into closure `move || {...}`. Defaults to `is_spawn && !is_async`.
    ///
    lazy_branches: bool,
    ///
    /// Max step count of all branches.
    ///
    max_step_count: usize,
    ///
    /// Transform tuple of `Result`s (`Option`s) into `Result`/`Option` of tuple.
    /// Defaults to `is_try && !is_async`.
    ///
    transpose: bool,
}
97
98impl<'a> JoinOutput<'a> {
99    ///
100    /// Creates new `Join` with given branches - `ActionExprChain`s, optional handler,
101    /// optional `futures_crate_path` and `Config`.
102    /// Will return `Err` if macro isn't `try` but `map` or `and_then` handler provided or if `then` handler provided for `try` macro,
103    /// if `futures_crate_path` provided for non `async` macro, if branches count is equal to 0.
104    ///
105    pub fn new(
106        branches: &'a [ActionExprChain],
107        handler: Option<&'a Handler>,
108        futures_crate_path: Option<&'a Path>,
109        custom_joiner: Option<&'a TokenStream>,
110        custom_transpose_results: Option<bool>,
111        lazy_branches: Option<bool>,
112        config: Config,
113    ) -> Result<Self, &'static str>
114    where
115        Self: Sized,
116    {
117        let Config {
118            is_async,
119            is_try,
120            is_spawn,
121        } = config;
122
123        let branch_count = branches.len();
124
125        if !is_try
126            && (handler.map(Handler::is_map).unwrap_or(false)
127                || handler.map(Handler::is_and_then).unwrap_or(false))
128        {
129            Err("`and_then` or `map` handler should be only provided for `try` `join!`")
130        } else if is_try && handler.map(Handler::is_then).unwrap_or(false) {
131            Err("`then` handler should be only provided for `join!` but not for `try` `join!`")
132        } else if !config.is_async && futures_crate_path.is_some() {
133            Err("futures_crate_path should be only provided for `async` `join!`")
134        } else if branch_count == 0 {
135            Err("join should have at least one branch")
136        } else {
137            Ok({
138                let (depths_and_paths, chains): (Vec<_>, Vec<_>) = branches
139                    .iter()
140                    .map(|expr_chain| {
141                        let (depth, steps) = expr_chain.members().iter().fold(
142                            (1, vec![Vec::new()]),
143                            |(depth, mut chain_acc), member| match *member.application_type() {
144                                ApplicationType::Deferred => {
145                                    chain_acc.push(vec![member]);
146                                    (depth + 1, chain_acc)
147                                }
148                                _ => {
149                                    chain_acc.last_mut().unwrap().push(member);
150                                    (depth, chain_acc)
151                                }
152                            },
153                        );
154                        ((depth, expr_chain.id()), steps)
155                    })
156                    .unzip();
157
158                let (depths, branch_pats): (Vec<usize>, Vec<Option<&PatIdent>>) =
159                    depths_and_paths.into_iter().unzip();
160
161                Self {
162                    futures_crate_path,
163                    custom_joiner,
164                    lazy_branches: lazy_branches.unwrap_or(is_spawn && !is_async),
165                    config,
166                    //
167                    // Max step count is max depth of branches.
168                    //
169                    max_step_count: *depths.iter().max().unwrap(),
170                    handler,
171                    branch_pats,
172                    depths,
173                    chains,
174                    branch_count,
175                    //
176                    // In case of `try` `async` `::futures::try_join!` macro will be used, so we don't need to
177                    // transpose results.
178                    //
179                    transpose: custom_transpose_results.unwrap_or(is_try && !is_async),
180                }
181            })
182        }
183    }
184
185    ///
186    /// Generates `TokenStream` which contains all steps. `result_pats` will be used in `let` destructuring patterns and
187    /// `result_vars` will be placed in actual step streams. They both are needed to join steps and make results of prev
188    /// values in next.
189    ///
190    pub fn generate_steps<TPat: ToTokens + Clone, TVar: ToTokens + Clone>(
191        &self,
192        result_pats: &[TPat],
193        result_vars: &[TVar],
194    ) -> TokenStream {
195        (0..self.max_step_count)
196            .map(|step_number| {
197                let step_results_name = construct_step_results_name(step_number);
198                (
199                    step_number,
200                    self.generate_step(step_number, result_vars, &step_results_name),
201                    step_results_name,
202                )
203            })
204            .rev()
205            .fold(
206                None,
207                |next_step_stream, (step_number, step_stream, step_results_name)| {
208                    self.join_steps(
209                        step_number,
210                        step_stream,
211                        next_step_stream,
212                        result_pats,
213                        result_vars,
214                        &step_results_name,
215                    )
216                    .into()
217                },
218            )
219            .unwrap()
220    }
221
222    ///
223    /// Generates token stream which contains handler call with final results (if handler exists) or returns final results.
224    ///
225    pub fn generate_handle(&self, results_var: &Ident, handler_name: &Ident) -> TokenStream {
226        let Self {
227            handler, config, ..
228        } = self;
229        let &Config { is_async, .. } = config;
230
231        let await_handler = if is_async {
232            Some(quote! { .await })
233        } else {
234            None
235        };
236
237        //
238        // Defines variable names to be used when destructuring results.
239        //
240        let result_vars: Vec<_> = (0..self.branch_count).map(construct_result_name).collect();
241
242        match handler {
243            Some(Handler::Then(_)) => {
244                let call_handler =
245                    self.extract_results_tuple(results_var, &result_vars, handler_name, None);
246                quote! {
247                    #call_handler#await_handler
248                }
249            }
250            Some(handler @ Handler::Map(_)) | Some(handler @ Handler::AndThen(_)) => {
251                let call_handler =
252                    self.extract_results_tuple(results_var, &result_vars, handler_name, None);
253
254                let handler_closure_body = if is_async && handler.is_map() {
255                    quote! {
256                        #results_var.map(|#results_var| #call_handler)
257                    }
258                } else {
259                    call_handler
260                };
261
262                let call_method = match handler {
263                    Handler::Map(_) => quote! { .map },
264                    Handler::AndThen(_) => quote! { .and_then },
265                    _ => unreachable!(),
266                };
267
268                let wrapped_results_var = self.wrap_into_block(results_var);
269
270                quote! {
271                    #wrapped_results_var#call_method(
272                        |#results_var| {
273                            #handler_closure_body
274                        }
275                    )#await_handler
276                }
277            }
278            None => {
279                quote! { #results_var }
280            }
281        }
282    }
283
284    ///
285    /// Generates `TokenStream` for step with given index. Result vars are variables with prev step results.
286    /// `result_vars` will be used as source values for step and `step_result_name` will contain tuple of step results.
287    ///
288    pub fn generate_step<TVar: ToTokens, TName: ToTokens>(
289        &self,
290        step_number: impl Into<usize>,
291        result_vars: &[TVar],
292        step_results_name: &TName,
293    ) -> TokenStream {
294        let step_number = step_number.into();
295        let Config {
296            is_async,
297            is_spawn,
298            is_try,
299        } = self.config;
300
301        let (def_streams, step_streams): (Vec<_>, Vec<_>) = self.chains
302            .iter()
303            .map(|chain| chain.get(step_number))
304            .enumerate()
305            .filter_map(
306                |(branch_index, chain_step_actions)|
307                    chain_step_actions.and_then(
308                        |chain_step_actions|
309                        chain_step_actions
310                        .iter()
311                        .enumerate()
312                        .fold(
313                            None,
314                            |acc: Option<StepAcc>,
315                             (expr_index, &action_expr)| {
316                                acc.map(|step_acc|
317                                    self.process_step_action_expr(
318                                        ActionExprPos::new(
319                                            action_expr,
320                                            branch_index,
321                                            expr_index
322                                        ),
323                                        step_acc
324                                    )
325                                )
326                                .or_else(|| {
327                                    let prev_result_name = &result_vars[branch_index];
328
329                                    let wrapped_prev_result = self.wrap_into_block(prev_result_name);
330
331                                    let step_acc = StepAcc {
332                                        def_stream: None,
333                                        step_streams: vec![(wrapped_prev_result, None)]
334                                    };
335
336                                    Some(
337                                        self.process_step_action_expr(
338                                            ActionExprPos::new(
339                                                action_expr,
340                                                branch_index,
341                                                expr_index
342                                            ),
343                                            step_acc
344                                        )
345                                    )
346                                })
347                            },
348                        )
349                        .map(|StepAcc { mut def_stream, mut step_streams }| loop {
350                            if step_streams.len() > 1 {
351                                let result = self.wrap_last_step_stream(StepAcc { def_stream, step_streams }, None);
352                                def_stream = result.def_stream;
353                                step_streams = result.step_streams;
354                            } else {
355                                break (def_stream, step_streams.pop().expect("join: Unexpected step streams length 0. This's a bug, please report it.").0);
356                            }
357                        })
358                        .map(|(def_stream, chain)| {
359                            (
360                                def_stream,
361                                if self.active_step_branch_count(step_number) > 1 {
362                                    let chain = if self.lazy_branches {
363                                        quote! { move || #chain }
364                                    } else {
365                                        chain
366                                    };
367                                    if is_spawn {
368                                        if is_async {
369                                            let spawn_tokio_fn_name = construct_spawn_tokio_fn_name();
370                                            quote! {
371                                                { #spawn_tokio_fn_name(Box::pin(#chain)) }
372                                            }
373                                        } else {
374                                            let thread_builder_name =
375                                                construct_thread_builder_name(branch_index);
376                                            quote! {
377                                                { #thread_builder_name.spawn(#chain).unwrap() }
378                                            }
379                                        }
380                                    } else {
381                                        chain
382                                    }
383                                } else {
384                                    chain
385                                },
386                            )
387                        }),
388                    )
389            )
390            .unzip();
391
392        let joiner = if self.active_step_branch_count(step_number) > 1 {
393            self.custom_joiner.cloned().or_else(|| {
394                if is_async {
395                    let futures_crate_path = self.futures_crate_path;
396                    if is_try {
397                        Some(quote! { #futures_crate_path::try_join! })
398                    } else {
399                        Some(quote! { #futures_crate_path::join! })
400                    }
401                } else {
402                    None
403                }
404            })
405        } else {
406            None
407        };
408
409        if is_async {
410            let join_results = joiner
411                .map(|joiner| quote! { #joiner(#( #step_streams ),*) })
412                .unwrap_or_else(|| quote! { #( #step_streams )*.await });
413
414            quote! {
415                #( #def_streams )*
416                let #step_results_name = #join_results;
417            }
418        } else {
419            //
420            // In case of sync spawn generates thread builder for every branch wich needs it and joins it.
421            //
422            let (thread_builders, spawn_joiners) = self
423                .generate_thread_builders_and_spawn_joiners(step_number, step_results_name)
424                .map(|(a, b)| (a.into(), b.into()))
425                .unwrap_or((None, None));
426
427            quote! {
428                #thread_builders
429                #( #def_streams )*
430                let #step_results_name = #joiner(#( #step_streams ),*);
431                #spawn_joiners
432            }
433        }
434    }
435
436    ///
437    /// Joins step steam with next step stream (if `Some`), returning `TokenStream` which contains all code.
438    /// `result_pats` will be used for destructuring `let` patterns while `result_vars` will be source values for next step.
439    /// `result_vars` are also used in results transposer.
440    ///
441    pub fn join_steps<TPat: ToTokens + Clone, TVar: ToTokens + Clone, TName: ToTokens>(
442        &self,
443        step_number: impl Into<usize>,
444        step_stream: TokenStream,
445        next_step_stream: impl Into<Option<TokenStream>>,
446        result_pats: &[TPat],
447        result_vars: &[TVar],
448        step_results_name: &TName,
449    ) -> TokenStream {
450        let step_number = step_number.into();
451        let next_step_stream = next_step_stream.into();
452        let &Self {
453            transpose,
454            max_step_count,
455            branch_count,
456            ..
457        } = self;
458        let Config {
459            is_try, is_async, ..
460        } = self.config;
461
462        let extracted_results =
463            self.extract_results_tuple(&step_results_name, result_pats, None, step_number);
464        let err_to_err = quote! { Err(err) => Err(err) };
465
466        if is_try && (step_number) < max_step_count - 1 {
467            if transpose {
468                let (is_result_successful, result_vars_matcher): (Vec<_>, Vec<_>) = result_vars
469                    .iter()
470                    .enumerate()
471                    .filter_map(|(index, result_var)| {
472                        if self.is_branch_active_in_step(step_number, index) {
473                            (
474                                quote! { #result_var.as_ref().map(|_| true).unwrap_or(false) },
475                                quote! {
476                                    #index => #result_var.map(|_| unreachable!())
477                                },
478                            )
479                                .into()
480                        } else {
481                            None
482                        }
483                    })
484                    .unzip();
485                let value_name = construct_internal_value_name();
486
487                quote! {
488                    #step_stream
489                    #extracted_results
490                    if let Some(__fail_index) = [#( #is_result_successful ),*].iter().position(|#value_name| !#value_name) {
491                        match __fail_index {
492                            #( #result_vars_matcher ),*,
493                            _ => unreachable!()
494                        }
495                    } else {
496                        #next_step_stream
497                    }
498                }
499            } else {
500                let current_step_results = if is_async {
501                    let mut index: usize = 0;
502                    let ok_result_vars = (0..branch_count).filter_map(|branch_index| {
503                        if self.is_branch_active_in_step(step_number, branch_index) {
504                            let result_var = self.generate_indexed_step_results_name(
505                                step_results_name,
506                                step_number,
507                                index,
508                            );
509                            index += 1;
510
511                            Some(quote! { Ok(#result_var) })
512                        } else {
513                            None
514                        }
515                    });
516
517                    quote! {
518                        let #step_results_name = (#(# ok_result_vars ),*);
519                        #extracted_results
520                    }
521                } else {
522                    extracted_results
523                };
524
525                quote! {
526                    #step_stream
527                    match #step_results_name {
528                        Ok(#step_results_name) => {
529                            #current_step_results
530                            #next_step_stream
531                        },
532                        #err_to_err
533                    }
534                }
535            }
536        } else if transpose && is_try {
537            let transposer = self.generate_results_transposer(result_vars, None);
538
539            quote! {
540                #step_stream
541                #extracted_results
542                #transposer
543            }
544        } else if is_try {
545            let final_stream = if self.branch_count > 1 {
546                let results: Vec<_> = result_vars
547                    .iter()
548                    .enumerate()
549                    .filter_map(|(index, result_var)| {
550                        if !self.is_branch_active_in_step(step_number, index) {
551                            Some(result_var.clone())
552                        } else {
553                            None
554                        }
555                    })
556                    .collect();
557
558                if !results.is_empty() {
559                    let transposer = self.generate_results_transposer(&results, &result_vars);
560
561                    quote! {
562                        match #step_results_name {
563                            Ok(#step_results_name) => {
564                                #extracted_results
565                                #transposer
566                            },
567                            #err_to_err
568                        }
569                    }
570                } else {
571                    quote! {
572                        match #step_results_name {
573                            Ok(#step_results_name) => {
574                                #extracted_results
575                                Ok((#(# result_vars ),*))
576                            },
577                            #err_to_err
578                        }
579                    }
580                }
581            } else {
582                let value_name = construct_internal_value_name();
583                quote! {
584                    match #step_results_name {
585                        Ok(#value_name) => Ok((#value_name)),
586                        #err_to_err
587                    }
588                }
589            };
590
591            quote! {
592                #step_stream
593                #final_stream
594            }
595        } else {
596            let final_stream = next_step_stream.unwrap_or_else(|| quote! { (#( #result_vars ),*) });
597
598            quote! {
599                #step_stream
600                #extracted_results
601                #final_stream
602            }
603        }
604    }
605
606    ///
607    /// Returns provided `PatIdent` or autogenerated branch name for use in `let` bindings.
608    ///
609    pub fn branch_result_pat(&self, branch_index: impl Into<usize>) -> TokenStream {
610        let branch_index = branch_index.into();
611        self.branch_pats[branch_index]
612            .map(ToTokens::into_token_stream)
613            .unwrap_or_else(|| construct_result_name(branch_index).into_token_stream())
614    }
615
616    ///
617    /// Returns provided or autogenerated `Ident` for use in expressions.
618    ///
619    pub fn branch_result_name(&self, branch_index: impl Into<usize>) -> Ident {
620        let branch_index = branch_index.into();
621        self.branch_pats[branch_index]
622            .map(|pat| pat.ident.clone())
623            .unwrap_or_else(|| construct_result_name(branch_index))
624    }
625
626    ///
627    /// Wraps given value into `{...}` if sync or into `async move {...}` if async.
628    ///
629    pub fn wrap_into_block<TVal: ToTokens>(&self, value: &TVal) -> TokenStream {
630        if self.config.is_async {
631            //
632            // In case of async `join` we should wrap given value into `Future` with `move`.
633            //
634            quote! { async move { #value } }
635        } else {
636            //
637            // Otherwise it will be enough to just wrap in `{...}`.
638            //
639            quote! { { #value } }
640        }
641    }
642
643    ///
644    /// Returns count of active branches for given step.
645    ///
646    pub fn active_step_branch_count(&self, step_number: impl Into<usize>) -> usize {
647        let step_number = step_number.into();
648        self.depths
649            .iter()
650            .filter(|&&branch_depth| branch_depth > step_number)
651            .count()
652    }
653
654    ///
655    /// Returns `true` if branch with given index is active in provided step.
656    ///
657    pub fn is_branch_active_in_step(
658        &self,
659        step_number: impl Into<usize>,
660        branch_index: impl Into<usize>,
661    ) -> bool {
662        self.depths[branch_index.into()] > step_number.into()
663    }
664
665    ///
666    /// Generates indexed step result name if branch count for step is greater than 1, otherwise
667    /// returns current step results name.
668    ///
669    fn generate_indexed_step_results_name<TName: ToTokens>(
670        &self,
671        step_results_name: &TName,
672        step_number: impl Into<usize>,
673        index: impl Into<usize>,
674    ) -> TokenStream {
675        let step_number = step_number.into();
676        let index = index.into();
677
678        if self.active_step_branch_count(step_number) > 1 {
679            let index = Index::from(index);
680            quote! { #step_results_name.#index }
681        } else {
682            step_results_name.into_token_stream()
683        }
684    }
685
686    ///
687    /// Generates thread builders and result joiners for step if `join_spawn!` or `spawn!` macro call was used.
688    ///
689    fn generate_thread_builders_and_spawn_joiners<TName: ToTokens>(
690        &self,
691        step_number: impl Into<usize>,
692        step_results_name: &TName,
693    ) -> Option<(TokenStream, TokenStream)> {
694        let step_number = step_number.into();
695        let Config {
696            is_async, is_spawn, ..
697        } = self.config;
698
699        if is_async || !is_spawn || self.active_step_branch_count(step_number) < 2 {
700            None
701        } else {
702            let thread_builders = (0..self.branch_count).filter_map(|branch_index| {
703                if self.is_branch_active_in_step(step_number, branch_index) {
704                    let thread_builder_name = construct_thread_builder_name(branch_index);
705                    let construct_thread_builder_fn_name = construct_thread_builder_fn_name();
706                    Some(quote! { let #thread_builder_name = #construct_thread_builder_fn_name(#branch_index); })
707                } else {
708                    None
709                }
710            });
711
712            let mut index: usize = 0;
713            let result_joiners = (0..self.branch_count).filter_map(|branch_index| {
714                if self.is_branch_active_in_step(step_number, branch_index) {
715                    let step_result = self.generate_indexed_step_results_name(
716                        step_results_name,
717                        step_number,
718                        index,
719                    );
720                    index += 1;
721                    Some(quote! { #step_result.join().unwrap() })
722                } else {
723                    None
724                }
725            });
726
727            Some((
728                quote! { #( #thread_builders )* },
729                quote! { let #step_results_name = (#( #result_joiners ),*); },
730            ))
731        }
732    }
733
734    ///
735    /// Generates token stream which transposes tuple of `Result`s into `Result` of tuple.
736    /// Optional `return_vars` may be specified to return something other than `result_vars`.
737    ///
738    /// (Result<A, Error>, Result<B, Error>, Result<C, Error>) => Result<(A, B, C), Error>
739    ///
740    /// # Example:
741    ///
742    /// ```
743    /// let result0 = Ok::<_,()>(0);
744    /// let result1 = Ok::<_,()>(1);
745    /// let result2 = Ok::<_,()>(2);
746    /// let final_result = result0.and_then(|value0| result1.and_then(|value1| result2.map(|value2| (value0, value1, value2))));
747    /// assert_eq!(final_result, Ok((0,1,2)));
748    /// ```
749    ///
750    ///
751    fn generate_results_transposer<'b, T: ToTokens + 'b>(
752        &self,
753        result_vars: &'b [T],
754        return_vars: impl Into<Option<&'b &'b [T]>>,
755    ) -> TokenStream {
756        let return_vars = return_vars.into();
757
758        result_vars
759            .iter()
760            .rev()
761            .fold(None, |acc, result_var_name| {
762                acc.map_or_else(
763                    || {
764                        let return_value = return_vars.map_or_else(
765                            || quote! { (#( #result_vars ),*) },
766                            |return_vars| quote! { (#( #return_vars ),*) },
767                        );
768                        Some(quote! { #result_var_name.map(|#result_var_name| #return_value ) })
769                    },
770                    |acc| Some(quote! { #result_var_name.and_then(|#result_var_name| #acc) }),
771                )
772            })
773            .unwrap()
774    }
775
776    ///
777    /// Expands process expr with given prev result.
778    ///
779    fn expand_process_expr(&self, prev_result: TokenStream, expr: &ProcessExpr) -> TokenStream {
780        match expr {
781            ProcessExpr::Then(_) => {
782                quote! { (#expr(#prev_result)) }
783            }
784            ProcessExpr::Inspect([expr]) => {
785                let inspect_fn_name = construct_inspect_fn_name();
786                if self.config.is_async {
787                    quote! { #prev_result.inspect(#expr) }
788                } else {
789                    //
790                    // Define custom `into_token_stream` converter because `inspect` fn signature accepts two params.
791                    //
792                    quote! { #inspect_fn_name(#expr, #prev_result) }
793                }
794            }
795            _ => {
796                quote! { #prev_result#expr }
797            }
798        }
799    }
800
801    ///
802    /// Separates definition stream from step stream for given expression.
803    ///
804    fn separate_block_expr<ExprType: InnerExpr + Clone>(
805        &self,
806        inner_expr: &ExprType,
807        branch_index: impl Into<usize>,
808        expr_index: impl Into<usize>,
809    ) -> (Option<TokenStream>, Option<ExprType>) {
810        let branch_index = branch_index.into();
811        let expr_index = expr_index.into();
812
813        if inner_expr.is_replaceable() {
814            inner_expr.inner_exprs().and_then(|exprs| {
815                let (def, replace_exprs): (Option<_>, Vec<_>) = exprs
816                    .iter()
817                    .enumerate()
818                    .map(|(index, expr)| {
819                        if is_block_expr(expr) {
820                            let wrapper_name =
821                                construct_expr_wrapper_name(branch_index, expr_index, index);
822                            (
823                                Some((
824                                    quote! { let #wrapper_name = #expr; },
825                                    parse_quote! { #wrapper_name },
826                                )),
827                                None,
828                            )
829                        } else {
830                            (None, Some(expr))
831                        }
832                    })
833                    .fold(
834                        (None, Vec::new()),
835                        |(def_acc, mut replace_acc), (def_with_expr, expr)| {
836                            if let Some((def, expr)) = def_with_expr {
837                                replace_acc.push(expr);
838                                (
839                                    def_acc
840                                        .map(|def_acc| quote! { #def_acc #def })
841                                        .or(Some(def)),
842                                    replace_acc,
843                                )
844                            } else {
845                                replace_acc.push(expr.unwrap().clone());
846                                (def_acc, replace_acc)
847                            }
848                        },
849                    );
850                def.map(|def| (def, inner_expr.clone().replace_inner_exprs(&replace_exprs)))
851            })
852        } else {
853            None
854        }
855        .map_or((None, None), |(def_stream, replaced_expr)| {
856            (Some(def_stream), replaced_expr)
857        })
858    }
859
860    ///
861    /// Extracts elements from tuple of values. Applies handler, if provided.
862    ///
863    fn extract_results_tuple<'b, TRes: ToTokens + 'b, TVar: ToTokens + 'b>(
864        &self,
865        results_var: &'b TRes,
866        result_vars: &'b [TVar],
867        handler: impl Into<Option<&'b Ident>>,
868        step_number: impl Into<Option<usize>>,
869    ) -> TokenStream {
870        let handler = handler.into();
871        let step_number = step_number.into();
872
873        let extracted = step_number.map_or_else(
874            || quote! { let (#( #result_vars ),*) = #results_var; },
875            |step_number| {
876                let mut index: usize = 0;
877                let result_vars = result_vars.iter().filter(|_| {
878                    let result = self.is_branch_active_in_step(step_number, index);
879                    index += 1;
880                    result
881                });
882                quote! { let (#( #result_vars ),*) = #results_var; }
883            },
884        );
885
886        if let Some(handler) = handler {
887            quote! {{
888                #extracted
889                #handler(#( #result_vars ),*)
890            }}
891        } else {
892            extracted
893        }
894    }
895
896    ///
897    /// Pops top stream of step_streams and wraps it into prev expr, adds next expr (if `Some`) to the result chain.
898    ///
899    fn wrap_last_step_stream<'b, 'c>(
900        &self,
901        StepAcc {
902            def_stream: prev_def_stream,
903            mut step_streams,
904        }: StepAcc<'c>,
905        action_expr_pos: impl Into<Option<ActionExprPos<'b>>>,
906    ) -> StepAcc<'c> {
907        let (prev_step_stream, _) = step_streams.pop().expect(
908            "join: Unexpected error on attempt to get last step stream. This's a bug, please report it.",
909        );
910
911        let (current_step_steam, action_expr_wrapper) = step_streams.pop().expect("join: Step expressions length is zero while it should be >=1. This's a bug, please report it.");
912
913        let action_expr_wrapper = action_expr_wrapper
914            .expect("join: Expected wrapper, found `None`. This's a bug, please report it.");
915
916        let internal_value_name = construct_internal_value_name();
917
918        let replaced_expr = action_expr_wrapper
919            .expr
920            .clone()
921            .replace_inner_exprs(&[parse_quote! { |#internal_value_name| #prev_step_stream }])
922            .expect("join: Failed to replace expr in unwrap expr. This's a bug, please report it.");
923
924        let replaced_action_expr_position = ActionExprPos {
925            expr: &replaced_expr,
926            expr_index: action_expr_wrapper.expr_index,
927            branch_index: action_expr_wrapper.branch_index,
928        };
929
930        let (def_stream, step_stream) = self.generate_def_and_step_streams(
931            prev_def_stream,
932            current_step_steam,
933            replaced_action_expr_position,
934        );
935
936        let (def_stream, step_stream) =
937            self.generate_def_and_step_streams(def_stream, step_stream, action_expr_pos.into());
938
939        step_streams.push((step_stream, None));
940
941        StepAcc {
942            def_stream,
943            step_streams,
944        }
945    }
946
947    ///
948    /// Produces definition stream and step stream for given `ExprGroup<ActionExpr>` (if `Some`).
949    ///
950    fn generate_def_and_step_streams<'b>(
951        &self,
952        prev_def_stream: impl Into<Option<TokenStream>>,
953        prev_step_stream: TokenStream,
954        action_expr_pos: impl Into<Option<ActionExprPos<'b>>>,
955    ) -> (Option<TokenStream>, TokenStream) {
956        let prev_def_stream = prev_def_stream.into();
957
958        if let Some(ActionExprPos {
959            expr: action_expr,
960            branch_index,
961            expr_index,
962        }) = action_expr_pos.into()
963        {
964            match action_expr.expr() {
965                ActionExpr::Process(process_expr) => {
966                    let (def_stream, replaced_expr) =
967                        self.separate_block_expr(process_expr, branch_index, expr_index);
968
969                    let step_stream = self.expand_process_expr(
970                        prev_step_stream,
971                        replaced_expr.as_ref().unwrap_or(process_expr),
972                    );
973
974                    (
975                        prev_def_stream
976                            .map(|prev| quote! { #prev #def_stream })
977                            .or(def_stream),
978                        step_stream,
979                    )
980                }
981                ActionExpr::Err(err_expr) => {
982                    let (def_stream, replaced_expr) =
983                        self.separate_block_expr(err_expr, branch_index, expr_index);
984
985                    let err_expr = replaced_expr.as_ref().unwrap_or(err_expr);
986
987                    (
988                        prev_def_stream
989                            .map(|prev| quote! { #prev #def_stream })
990                            .or(def_stream),
991                        quote! { #prev_step_stream#err_expr },
992                    )
993                }
994                ActionExpr::Initial(initial_expr) => {
995                    let (def_stream, replaced_expr) =
996                        self.separate_block_expr(initial_expr, branch_index, expr_index);
997
998                    let initial_expr = replaced_expr.as_ref().unwrap_or(initial_expr);
999
1000                    (
1001                        prev_def_stream
1002                            .map(|prev| quote! { #prev #def_stream })
1003                            .or(def_stream),
1004                        quote! { #initial_expr },
1005                    )
1006                }
1007            }
1008        } else {
1009            (prev_def_stream, prev_step_stream)
1010        }
1011    }
1012
1013    ///
1014    /// Generates code based on `MoveType` of expr. If `MoveType` is `Wrap`, it will generate nested exprs, if `Unwrap`,
1015    /// it will got one step up, otherwise continue current chain.
1016    ///
1017    fn process_step_action_expr<'b>(
1018        &self,
1019        action_expr_pos: impl Into<Option<ActionExprPos<'b>>>,
1020        step_acc: StepAcc<'b>,
1021    ) -> StepAcc<'b> {
1022        let ActionExprPos {
1023            expr: action_expr,
1024            branch_index,
1025            expr_index,
1026        } = action_expr_pos.into().expect("join: Unexpected `None` `ActionExprPos` in `process_step_action_expr`. This's a bug, please report it.");
1027
1028        match action_expr.move_type() {
1029            MoveType::Unwrap => self.wrap_last_step_stream(
1030                step_acc,
1031                None, // Because now only `Combinator::UNWRAP` can have this `MoveType`
1032            ),
1033            MoveType::Wrap => {
1034                let StepAcc {
1035                    def_stream: prev_def_stream,
1036                    mut step_streams,
1037                } = step_acc;
1038
1039                step_streams.last_mut().expect("join: Unexpected 0 length of step streams. This's a bug, please report it.").1 =
1040                    ActionExprPos::new(
1041                        action_expr,
1042                        branch_index,
1043                        expr_index,
1044                    ).into();
1045                step_streams.push((construct_internal_value_name().into_token_stream(), None));
1046
1047                StepAcc {
1048                    def_stream: prev_def_stream,
1049                    step_streams,
1050                }
1051            }
1052            MoveType::None => {
1053                let StepAcc {
1054                    def_stream: prev_def_stream,
1055                    mut step_streams,
1056                } = step_acc;
1057
1058                let (prev_step_stream, _) =
1059                    step_streams.pop().expect("join: Unexpected `None` when pop step streams. This's a bug, please report it.");
1060                let (def_stream, step_stream) = self.generate_def_and_step_streams(
1061                    prev_def_stream,
1062                    prev_step_stream,
1063                    ActionExprPos::new(action_expr, branch_index, expr_index),
1064                );
1065
1066                step_streams.push((step_stream, None));
1067
1068                StepAcc {
1069                    def_stream,
1070                    step_streams,
1071                }
1072            }
1073        }
1074    }
1075}
1076
1077impl<'a> ToTokens for JoinOutput<'a> {
1078    fn to_tokens(&self, output: &mut TokenStream) {
1079        let Config {
1080            is_async, is_spawn, ..
1081        } = self.config;
1082
1083        let results_var = construct_results_name();
1084        let handler_name = construct_handler_name();
1085        let (result_pats, result_vars): (Vec<_>, Vec<_>) = (0..self.branch_count)
1086            .map(|branch_index| {
1087                (
1088                    self.branch_result_pat(branch_index),
1089                    self.branch_result_name(branch_index),
1090                )
1091            })
1092            .unzip();
1093
1094        //
1095        // All generated code to be executed step by step and returns final step results.
1096        //
1097        let steps_stream = self.generate_steps(&result_pats, &result_vars);
1098
1099        //
1100        // Handle results based on user input or returns result of tuple of values
1101        // [or single result in case of one branch].
1102        //
1103        let handle_results = self.generate_handle(&results_var, &handler_name);
1104
1105        let handler_definition = self
1106            .handler
1107            .map(Handler::extract_expr)
1108            .map(|handler_expr| quote! { let #handler_name = #handler_expr; });
1109
1110        output.extend(
1111           if is_async {
1112                let futures_crate_path = self.futures_crate_path;
1113                let async_spawn_fn_definition = if is_spawn {
1114                    let spawn_tokio_fn_name = construct_spawn_tokio_fn_name();
1115                    let value_name = construct_internal_value_name();
1116
1117                    Some(
1118                        quote! {
1119                            fn #spawn_tokio_fn_name<T, F>(__future: F) -> impl #futures_crate_path::future::Future<Output=T>
1120                            where
1121                                F: #futures_crate_path::future::Future<Output = T> + Send + 'static,
1122                                T: Send + 'static,
1123                            {
1124                                ::tokio::spawn(__future).map(|#value_name| #value_name.unwrap_or_else(|err| panic!("tokio JoinHandle failed: {:#?}", err)))
1125                            }
1126                        }
1127                    )
1128                } else {
1129                    None
1130                };
1131
1132                quote! {
1133                    Box::pin(
1134                        async move {
1135                            use #futures_crate_path::{FutureExt, TryFutureExt, StreamExt, TryStreamExt};
1136                            #async_spawn_fn_definition
1137                            #handler_definition
1138                            let #results_var = { #steps_stream };
1139                            #handle_results
1140                        }
1141                    )
1142                }
1143            } else {
1144                let inspect_fn_definition = {
1145                    let inspect_fn_name = construct_inspect_fn_name();
1146                    let handler_name = construct_handler_name();
1147                    let value_name = construct_internal_value_name();
1148
1149                    quote! {
1150                        fn #inspect_fn_name<I>(#handler_name: impl Fn(&I) -> (), #value_name: I) -> I {
1151                            #handler_name(&#value_name);
1152                            #value_name
1153                        }
1154                    }
1155                };
1156                let thread_builder_fn_definition = if is_spawn {
1157                    let construct_thread_builder_fn_name = construct_thread_builder_fn_name();
1158
1159                    Some(
1160                       quote! {
1161                            fn #construct_thread_builder_fn_name(branch_index: usize) -> ::std::thread::Builder {
1162                                let thread_name = format!("join_{}", branch_index);
1163                                ::std::thread::Builder::new().name(
1164                                    ::std::thread::current().name()
1165                                        .map(
1166                                            |current_thread_name|
1167                                                format!("{current_thread_name}_{new_thread_name}",
1168                                                    current_thread_name=current_thread_name,
1169                                                    new_thread_name=thread_name
1170                                                )
1171                                        )
1172                                        .unwrap_or(thread_name)
1173                                )
1174                            }
1175                        }
1176                   )
1177                } else {
1178                    None
1179                };
1180
1181                quote! {{
1182                    #inspect_fn_definition
1183                    #thread_builder_fn_definition
1184                    #handler_definition
1185                    let #results_var = { #steps_stream };
1186                    #handle_results
1187                }}
1188            }
1189       );
1190    }
1191}