type_flow_proc_macros/
lib.rs

1use proc_macro::TokenStream;
2use std::cmp::Ordering;
3use quote::{quote, format_ident};
4use syn::{parse_macro_input, Ident, Type, parse::Parse, parse::ParseStream, Token, ItemStruct, FieldsNamed, Fields};
5
6struct ProcessorPipeline {
7    pipeline_name: Ident,
8    data_type: Type,
9    processors: Vec<Type>,
10}
11
12impl Parse for ProcessorPipeline {
13    fn parse(input: ParseStream) -> syn::Result<Self> {
14        let pipeline_name: Ident = input.parse()?;
15        input.parse::<Token![,]>()?;
16        let data_type: Type = input.parse()?;
17        input.parse::<Token![,]>()?;
18        let mut processors = Vec::new();
19        // Parse the remaining types as processor types
20        while !input.is_empty() {
21            let processor_type: Type = input.parse()?;
22            processors.push(processor_type);
23            if input.is_empty() {
24                break;
25            }
26            input.parse::<Token![,]>()?;
27        }
28        Ok(ProcessorPipeline {
29            pipeline_name,
30            data_type,
31            processors,
32        })
33    }
34}
35
36#[proc_macro]
37pub fn stateful_processor_pipeline_with_index(input: TokenStream) -> TokenStream {
38    let ProcessorPipeline { pipeline_name, data_type, processors } = parse_macro_input!(input as ProcessorPipeline);
39    let struct_fields = processors.iter().enumerate().map(|(idx, processor_type)| {
40        // Get a string representation of the type for the field name
41        let type_str = get_type_name(processor_type);
42        let field_name = format_ident!("processor_{}_{}", type_str, idx);
43        quote! { #field_name: #processor_type }
44    });
45    let constructor_params = processors.iter().enumerate().map(|(idx, processor_type)| {
46        let type_str = get_type_name(processor_type);
47        let param_name = format_ident!("processor_{}_{}", type_str, idx);
48        quote! { #param_name: #processor_type }
49    });
50    let field_initializers = processors.iter().enumerate().map(|(idx, processor_type)| {
51        let type_str = get_type_name(processor_type);
52        let field_name = format_ident!("processor_{}_{}", type_str, idx);
53        quote! { #field_name }
54    });
55    let process_implementation = processors.iter().enumerate().map(|(idx, processor_type)| {
56        let type_str = get_type_name(processor_type);
57        let field_name = format_ident!("processor_{}_{}", type_str, idx);
58        quote! { data = self.#field_name.process(data); }
59    });
60    let expanded = quote! {
61        pub struct #pipeline_name {
62            #(#struct_fields,)*
63        }
64        impl #pipeline_name {
65            pub fn new(#(#constructor_params,)*) -> Self {
66                Self {
67                    #(#field_initializers,)*
68                }
69            }
70        }
71        impl StatefulProcessor<#data_type> for #pipeline_name {
72            fn process(&mut self, mut data: #data_type) -> #data_type {
73                #(#process_implementation)*
74                data
75            }
76        }
77    };
78    TokenStream::from(expanded)
79}
80
81struct ProcessorInplacePipeline {
82    pipeline_name: Ident,
83    data_type: Type,
84    error_type: Type,
85    processors: Vec<Type>,
86}
87
88impl Parse for ProcessorInplacePipeline {
89    fn parse(input: ParseStream) -> syn::Result<Self> {
90        let pipeline_name: Ident = input.parse()?;
91        input.parse::<Token![,]>()?;
92        let data_type: Type = input.parse()?;
93        input.parse::<Token![,]>()?;
94        let error_type: Type = input.parse()?;
95        input.parse::<Token![,]>()?;
96        let mut processors = Vec::new();
97        // Parse the remaining types as processor types
98        while !input.is_empty() {
99            let processor_type: Type = input.parse()?;
100            processors.push(processor_type);
101
102            if input.is_empty() {
103                break;
104            }
105            input.parse::<Token![,]>()?;
106        }
107        Ok(ProcessorInplacePipeline{
108            pipeline_name,
109            data_type,
110            error_type,
111            processors,
112        })
113    }
114}
115
116
117#[proc_macro_attribute]
118pub fn implement_processor_swapping(_attr: proc_macro::TokenStream, item: proc_macro::TokenStream) -> proc_macro::TokenStream {
119    let input_struct_item = parse_macro_input!(item as ItemStruct);
120    let struct_name = &input_struct_item.ident;
121    let generics = &input_struct_item.generics;
122    let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();
123    let type_param_idents: Vec<&Ident> = generics.type_params().map(|tp| &tp.ident).collect();
124    let fields = match &input_struct_item.fields {
125        Fields::Named(FieldsNamed { named, .. }) => named,
126        _ => panic!("#[implement_processor_swapping] only works on structs with named fields."),
127    };
128    let processor_field_idents: Vec<&Ident> = fields.iter()
129        .filter_map(|f| f.ident.as_ref())
130        .filter(|&id| id.to_string() != "_marker")
131        .collect();
132    if type_param_idents.len() != processor_field_idents.len() {
133        panic!(
134            "#[implement_processor_swapping] detected a mismatch between the number of generic type parameters ({}) and processor fields ({}). Expected them to be equal.",
135            type_param_idents.len(),
136            processor_field_idents.len()
137        );
138    }
139    // --- Trait Impls ---
140    let mut trait_impls = Vec::new();
141    if !type_param_idents.is_empty() {
142        // Reverse
143        let mut rev_types = type_param_idents.clone();
144        rev_types.reverse();
145        let mut rev_source_fields = processor_field_idents.clone();
146        rev_source_fields.reverse();
147        let rev_assignments = processor_field_idents.iter().zip(rev_source_fields.iter()).map(|(dest, src)| {
148            quote!{ #dest: self.#src }
149        });
150        trait_impls.push(quote!{
151            impl #impl_generics type_flow_traits::Reverse for #struct_name #ty_generics #where_clause {
152                type Output = #struct_name<#(#rev_types),*>;
153                fn reverse(self) -> Self::Output {
154                    Self::Output {
155                        _marker: std::marker::PhantomData,
156                        #(#rev_assignments),*
157                    }
158                }
159            }
160        });
161        // ShiftLeft
162        let mut sl_types = type_param_idents.clone();
163        sl_types.rotate_left(1);
164        let mut sl_source_fields = processor_field_idents.clone();
165        sl_source_fields.rotate_left(1);
166        let sl_assignments = processor_field_idents.iter().zip(sl_source_fields.iter()).map(|(dest, src)| {
167            quote!{ #dest: self.#src }
168        });
169        trait_impls.push(quote!{
170            impl #impl_generics type_flow_traits::ShiftLeft for #struct_name #ty_generics #where_clause {
171                type ShiftedLeft = #struct_name<#(#sl_types),*>;
172                fn shift_left(self) -> Self::ShiftedLeft {
173                    Self::ShiftedLeft {
174                        _marker: std::marker::PhantomData,
175                        #(#sl_assignments),*
176                    }
177                }
178            }
179        });
180        // ShiftRight
181        let mut sr_types = type_param_idents.clone();
182        sr_types.rotate_right(1);
183        let mut sr_source_fields = processor_field_idents.clone();
184        sr_source_fields.rotate_right(1);
185        let sr_assignments = processor_field_idents.iter().zip(sr_source_fields.iter()).map(|(dest, src)| {
186            quote!{ #dest: self.#src }
187        });
188        trait_impls.push(quote!{
189            impl #impl_generics type_flow_traits::ShiftRight for #struct_name #ty_generics #where_clause {
190                type ShiftedRight = #struct_name<#(#sr_types),*>;
191                fn shift_right(self) -> Self::ShiftedRight {
192                    Self::ShiftedRight {
193                        _marker: std::marker::PhantomData,
194                        #(#sr_assignments),*
195                    }
196                }
197            }
198        });
199        // SwapStartEnd
200        let mut sse_types = type_param_idents.clone();
201        let sse_length = sse_types.len();
202        if sse_length > 1 { sse_types.swap(0, sse_length - 1); }
203        let mut sse_source_fields = processor_field_idents.clone();
204        let sse_source_length = sse_source_fields.len();
205        if sse_source_length > 1 { sse_source_fields.swap(0, sse_source_length - 1); }
206        let sse_assignments = processor_field_idents.iter().zip(sse_source_fields.iter()).map(|(dest, src)| {
207            quote!{ #dest: self.#src }
208        });
209        trait_impls.push(quote!{
210            impl #impl_generics type_flow_traits::SwapStartEnd for #struct_name #ty_generics #where_clause {
211                type Output = #struct_name<#(#sse_types),*>;
212                fn swap(self) -> Self::Output {
213                    Self::Output {
214                        _marker: std::marker::PhantomData,
215                        #(#sse_assignments),*
216                    }
217                }
218            }
219        });
220    }
221    let num_processors = type_param_idents.len();
222    for i in 0..num_processors {
223        let mut process_indexes_before_pivot = Vec::new();
224        let mut process_indexes_after_pivot = Vec::new();
225        let mut process_indexes_before_interleave_index = Vec::new();
226        let mut process_indexes_after_interleave_index = Vec::new();
227        for j in 0..num_processors {
228            if i != j {
229                let mut swapped_generic_params_for_type = type_param_idents.clone();
230                swapped_generic_params_for_type.swap(i, j);
231                let field_i_name = processor_field_idents[i];
232                let field_j_name = processor_field_idents[j];
233                let mut current_field_initializers = Vec::new();
234                for k in 0..num_processors {
235                    let current_field_name = processor_field_idents[k];
236                    if k == i {
237                        current_field_initializers.push(quote! { #current_field_name: self.#field_j_name });
238                    } else if k == j {
239                        current_field_initializers.push(quote! { #current_field_name: self.#field_i_name });
240                    } else {
241                        current_field_initializers.push(quote! { #current_field_name: self.#current_field_name });
242                    }
243                }
244                // SwapArbitraryProcessors
245                trait_impls.push(quote! {
246                    impl #impl_generics type_flow_traits::SwapArbitraryProcessors<#i, #j> for #struct_name #ty_generics #where_clause {
247                        type SwappedOutput = #struct_name<#(#swapped_generic_params_for_type),*>;
248                        fn swap_processors(self) -> Self::SwappedOutput {
249                            #struct_name {
250                                _marker: std::marker::PhantomData,
251                                #( #current_field_initializers ),*
252                            }
253                        }
254                    }
255                });
256                // PivotSpin picks an index of a processor to keep in its current location and spins all the other processors around its index.
257                // PivotSwap picks an index of a processor to keep in its current location and swaps the two sections to either side, preserving their order within their section.
258                match j.cmp(&i) {
259                    Ordering::Equal => {},
260                    Ordering::Less => { process_indexes_before_pivot.push(j); },
261                    Ordering::Greater => { process_indexes_after_pivot.push(j); },
262                }
263            }
264            // InterleavePivotSpin picks an index between processors and reorders the processors in the pipeline by spinning around the interleaved index.
265            // InterleavePivotSwap picks an index between processors and reorders the processors in the pipeline by swapping the two sections to either side of the interleaved index, preserving their order within their section.
266            if i != num_processors - 1 {
267                let even_j_index = j * 2;
268                let odd_i_index = i * 2 + 1;
269                match even_j_index.cmp(&odd_i_index) {
270                    Ordering::Equal => {},
271                    Ordering::Less => { process_indexes_before_interleave_index.push(j); },
272                    Ordering::Greater => { process_indexes_after_interleave_index.push(j); },
273                }
274            }
275        }
276        //Implement the Pivot Interleave Spin and swap traits
277        //for swap you need reverse order of the indexes because you are using them like a stack
278        let mut swap_indexes_before_pivot = process_indexes_before_pivot.clone();
279        swap_indexes_before_pivot.reverse();
280        let mut swap_indexes_after_pivot = process_indexes_after_pivot.clone();
281        swap_indexes_after_pivot.reverse();
282        let mut swap_assignments = Vec::new();
283        let mut spin_assignments = Vec::new();
284        let pivot_side_length_difference = process_indexes_before_pivot.len() as isize - process_indexes_after_pivot.len() as isize;
285        if 0 == pivot_side_length_difference {
286            for _ in 0..process_indexes_after_pivot.len() {
287                swap_assignments.push(swap_indexes_after_pivot.pop().unwrap());
288                spin_assignments.push(process_indexes_after_pivot.pop().unwrap());
289            }
290            swap_assignments.push(swap_indexes_before_pivot.len());
291            spin_assignments.push(process_indexes_before_pivot.len());
292            for _ in 0..process_indexes_before_pivot.len() {
293                swap_assignments.push(swap_indexes_before_pivot.pop().unwrap());
294                spin_assignments.push(process_indexes_before_pivot.pop().unwrap());
295            }
296        } else if 0 > pivot_side_length_difference {
297            let mut swap_hanging_off_front = Vec::new();
298            let mut spin_hanging_off_front = Vec::new();
299            for _ in 0..pivot_side_length_difference.abs() as usize {
300                swap_hanging_off_front.push(swap_indexes_after_pivot.pop().unwrap());
301                spin_hanging_off_front.push(process_indexes_after_pivot.pop().unwrap());
302            }
303            swap_hanging_off_front.reverse();
304            spin_hanging_off_front.reverse();
305            for _ in 0..process_indexes_after_pivot.len() {
306                swap_assignments.push(swap_indexes_after_pivot.pop().unwrap());
307                spin_assignments.push(process_indexes_after_pivot.pop().unwrap());
308            }
309            swap_assignments.push(swap_indexes_before_pivot.len());
310            spin_assignments.push(process_indexes_before_pivot.len());
311            for _ in 0..process_indexes_before_pivot.len() {
312                swap_assignments.push(swap_indexes_before_pivot.pop().unwrap());
313                spin_assignments.push(process_indexes_before_pivot.pop().unwrap());
314            }
315            for _ in 0..spin_hanging_off_front.len() {
316                swap_assignments.push(swap_hanging_off_front.pop().unwrap());
317                spin_assignments.push(spin_hanging_off_front.pop().unwrap());
318            }
319        } else {
320            let mut swap_hanging_off_back = Vec::new();
321            let mut spin_hanging_off_back = Vec::new();
322            swap_indexes_before_pivot.reverse();
323            process_indexes_before_pivot.reverse();
324            for _ in 0..pivot_side_length_difference.abs() as usize {
325                swap_hanging_off_back.push(swap_indexes_before_pivot.pop().unwrap());
326                spin_hanging_off_back.push(process_indexes_before_pivot.pop().unwrap());
327            }
328            swap_indexes_before_pivot.reverse();
329            process_indexes_before_pivot.reverse();
330            for _ in 0..spin_hanging_off_back.len() {
331                swap_assignments.push(swap_hanging_off_back.pop().unwrap());
332                spin_assignments.push(spin_hanging_off_back.pop().unwrap());
333            }
334            for _ in 0..process_indexes_after_pivot.len() {
335                swap_assignments.push(swap_indexes_after_pivot.pop().unwrap());
336                spin_assignments.push(process_indexes_after_pivot.pop().unwrap());
337            }
338            swap_assignments.push(swap_assignments.len());
339            spin_assignments.push(spin_assignments.len());
340            for _ in 0..process_indexes_before_pivot.len() {
341                swap_assignments.push(swap_indexes_before_pivot.pop().unwrap());
342                spin_assignments.push(process_indexes_before_pivot.pop().unwrap());
343            }
344        }
345        if num_processors != spin_assignments.len() {
346            panic!("swap_assignments.len() != num_processors");
347        }
348        let mut spun_generic_params_for_type = Vec::new();
349        let mut swapped_generic_params_for_type = Vec::new();
350        let mut spun_field_initializers = Vec::new();
351        let mut swapped_field_initializes = Vec::new();
352        for k in 0..num_processors {
353            spun_generic_params_for_type.push(type_param_idents[spin_assignments[k]]);
354            swapped_generic_params_for_type.push(type_param_idents[swap_assignments[k]]);
355            let current_field_name = processor_field_idents[k];
356            let spun_name = processor_field_idents[spin_assignments[k]];
357            let swapped_name = processor_field_idents[swap_assignments[k]];
358            spun_field_initializers.push(quote! { #current_field_name: self.#spun_name });
359            swapped_field_initializes.push(quote! { #current_field_name: self.#swapped_name });
360        }
361        trait_impls.push(quote! {
362            impl #impl_generics type_flow_traits::PivotSpin<#i> for #struct_name #ty_generics #where_clause {
363                type PivotedSpinOutput = #struct_name<#(#spun_generic_params_for_type),*>;
364                fn pivot_spin(self) -> Self::PivotedSpinOutput {
365                    #struct_name {
366                        _marker: std::marker::PhantomData,
367                        #( #spun_field_initializers ),*
368                    }
369                }
370            }
371        });
372        if i < num_processors - 1 && i > 0 {
373            trait_impls.push(quote! {
374                impl #impl_generics type_flow_traits::PivotSwap<#i> for #struct_name #ty_generics #where_clause {
375                    type PivotedSwapOutput = #struct_name<#(#swapped_generic_params_for_type),*>;
376                    fn pivot_swap(self) -> Self::PivotedSwapOutput {
377                        #struct_name {
378                            _marker: std::marker::PhantomData,
379                            #( #swapped_field_initializes ),*
380                        }
381                    }
382                }
383            });
384        }
385        if i != num_processors - 1 {
386            let total_length = process_indexes_before_interleave_index.len() + process_indexes_after_interleave_index.len();
387            if num_processors != total_length {
388                panic!("Wrong Total Length For Interleave");
389            }
390
391            let mut swap_indexes_before_interleave = process_indexes_before_interleave_index.clone();
392            swap_indexes_before_interleave.reverse();
393            let mut swap_indexes_after_interleave = process_indexes_after_interleave_index.clone();
394            swap_indexes_after_interleave.reverse();
395            let mut swap_assignments_interleave = Vec::new();
396            let mut spin_assignments_interleave = Vec::new();
397            let interleave_side_length_difference = swap_indexes_before_interleave.len() as isize - swap_indexes_after_interleave.len() as isize;
398            if 0 == interleave_side_length_difference {
399                for _ in 0..process_indexes_after_interleave_index.len() {
400                    swap_assignments_interleave.push(swap_indexes_after_interleave.pop().unwrap());
401                    spin_assignments_interleave.push(process_indexes_after_interleave_index.pop().unwrap());
402                }
403                for _ in 0..process_indexes_before_interleave_index.len() {
404                    swap_assignments_interleave.push(swap_indexes_before_interleave.pop().unwrap());
405                    spin_assignments_interleave.push(process_indexes_before_interleave_index.pop().unwrap());
406                }
407            }else if 0 > interleave_side_length_difference {
408                let mut swap_hanging_off_front = Vec::new();
409                let mut spin_hanging_off_front = Vec::new();
410                for _ in 0..interleave_side_length_difference.abs() as usize {
411                    swap_hanging_off_front.push(swap_indexes_after_interleave.pop().unwrap());
412                    spin_hanging_off_front.push(process_indexes_after_interleave_index.pop().unwrap());
413                }
414                swap_hanging_off_front.reverse();
415                spin_hanging_off_front.reverse();
416                for _ in 0..process_indexes_after_interleave_index.len() {
417                    swap_assignments_interleave.push(swap_indexes_after_interleave.pop().unwrap());
418                    spin_assignments_interleave.push(process_indexes_after_interleave_index.pop().unwrap());
419                }
420                for _ in 0..process_indexes_before_interleave_index.len() {
421                    swap_assignments_interleave.push(swap_indexes_before_interleave.pop().unwrap());
422                    spin_assignments_interleave.push(process_indexes_before_interleave_index.pop().unwrap());
423                }
424                for _ in 0..spin_hanging_off_front.len() {
425                    swap_assignments_interleave.push(swap_hanging_off_front.pop().unwrap());
426                    spin_assignments_interleave.push(spin_hanging_off_front.pop().unwrap());
427                }
428            } else {
429                let mut swap_hanging_off_back = Vec::new();
430                let mut spin_hanging_off_back = Vec::new();
431                swap_indexes_before_interleave.reverse();
432                process_indexes_before_interleave_index.reverse();
433                for _ in 0..interleave_side_length_difference.abs() as usize {
434                    swap_hanging_off_back.push(swap_indexes_before_interleave.pop().unwrap());
435                    spin_hanging_off_back.push(process_indexes_before_interleave_index.pop().unwrap());
436                }
437                swap_indexes_before_interleave.reverse();
438                process_indexes_before_interleave_index.reverse();
439                for _ in 0..spin_hanging_off_back.len() {
440                    swap_assignments_interleave.push(swap_hanging_off_back.pop().unwrap());
441                    spin_assignments_interleave.push(spin_hanging_off_back.pop().unwrap());
442                }
443                for _ in 0..process_indexes_after_interleave_index.len() {
444                    swap_assignments_interleave.push(swap_indexes_after_interleave.pop().unwrap());
445                    spin_assignments_interleave.push(process_indexes_after_interleave_index.pop().unwrap());
446                }
447                for _ in 0..process_indexes_before_interleave_index.len() {
448                    swap_assignments_interleave.push(swap_indexes_before_interleave.pop().unwrap());
449                    spin_assignments_interleave.push(process_indexes_before_interleave_index.pop().unwrap());
450                }
451            }
452            if num_processors != spin_assignments_interleave.len() || num_processors != swap_assignments_interleave.len(){
453                panic!("swap_assignments_interleave.len() != num_processors");
454            }
455            let mut spun_interleave_generic_params_for_type = Vec::new();
456            let mut swapped_interleave_generic_params_for_type = Vec::new();
457            let mut spun_interleave_field_initializers = Vec::new();
458            let mut swapped_interleave_field_initializes = Vec::new();
459            for k in 0..num_processors {
460                spun_interleave_generic_params_for_type.push(type_param_idents[spin_assignments_interleave[k]]);
461                swapped_interleave_generic_params_for_type.push(type_param_idents[swap_assignments_interleave[k]]);
462                let current_field_name = processor_field_idents[k];
463                let spun_name = processor_field_idents[spin_assignments_interleave[k]];
464                let swapped_name = processor_field_idents[swap_assignments_interleave[k]];
465                spun_interleave_field_initializers.push(quote! { #current_field_name: self.#spun_name });
466                swapped_interleave_field_initializes.push(quote! { #current_field_name: self.#swapped_name });
467            }
468            trait_impls.push(quote! {
469                impl #impl_generics type_flow_traits::InterleavePivotSpin<#i> for #struct_name #ty_generics #where_clause {
470                    type InterleavePivotSpinOutput = #struct_name<#(#spun_interleave_generic_params_for_type),*>;
471                    fn interleaved_pivot_spin(self) -> Self::InterleavePivotSpinOutput {
472                        #struct_name {
473                            _marker: std::marker::PhantomData,
474                            #( #spun_interleave_field_initializers ),*
475                        }
476                    }
477                }
478            });
479            trait_impls.push(quote! {
480                impl #impl_generics type_flow_traits::InterleavePivotSwap<#i> for #struct_name #ty_generics #where_clause {
481                    type InterleavePivotSwapOutput = #struct_name<#(#swapped_interleave_generic_params_for_type),*>;
482                    fn interleaved_pivot_swap(self) -> Self::InterleavePivotSwapOutput {
483                        #struct_name {
484                            _marker: std::marker::PhantomData,
485                            #( #swapped_interleave_field_initializes ),*
486                        }
487                    }
488                }
489            });
490        }
491    }
492    let expanded = quote! {
493        #input_struct_item
494        #(#trait_impls)*
495    };
496    proc_macro::TokenStream::from(expanded)
497}
498
499struct TypeFLowInplaceStatefulProcessorPipeline {
500    pipeline_name: Ident,
501    data_type: Type,
502    error_type: Type,
503    number_of_processors: usize,
504}
505impl Parse for TypeFLowInplaceStatefulProcessorPipeline {
506    fn parse(input: ParseStream) -> syn::Result<Self> {
507        let pipeline_name: Ident = input.parse()?;
508        input.parse::<Token![,]>()?;
509        let data_type: Type = input.parse()?;
510        input.parse::<Token![,]>()?;
511        let error_type: Type = input.parse()?;
512        input.parse::<Token![,]>()?;
513        let lit_int: syn::LitInt = input.parse()?;
514        let number_of_processors = lit_int.base10_parse::<usize>()?;
515        Ok(TypeFLowInplaceStatefulProcessorPipeline{
516            pipeline_name,
517            data_type,
518            error_type,
519            number_of_processors,
520        })
521    }
522}
523
524#[proc_macro]
525pub fn type_flow_inplace_stateful_processor_pipeline_by_count(input: TokenStream) -> TokenStream {
526    let TypeFLowInplaceStatefulProcessorPipeline { pipeline_name, data_type, error_type, number_of_processors } = parse_macro_input!(input as TypeFLowInplaceStatefulProcessorPipeline);
527    let generic_params: Vec<_> = (0..number_of_processors)
528        .map(|i| format_ident!("P{}", i))
529        .collect();
530    let field_names: Vec<_> = (0..number_of_processors)
531        .map(|i| format_ident!("p{}", i))
532        .collect();
533    let fields_with_types: Vec<_> = field_names.iter().zip(generic_params.iter())
534        .map(|(field, param)| quote! { #field: #param })
535        .collect();
536    let processor_args = fields_with_types.into_iter().reduce(|acc, item| {
537        quote! { #acc, #item }
538    });
539    let expanded = quote! {
540        type_flow_macros::type_flow_inplace_stateful_processor_pipeline!(
541            #pipeline_name,
542            #data_type,
543            #error_type,
544            #processor_args
545        );
546    };
547    TokenStream::from(expanded)
548}
549
550
551#[proc_macro]
552pub fn inplace_stateful_processor_pipeline_with_index(input: TokenStream) -> TokenStream { 
553    let ProcessorInplacePipeline { pipeline_name, data_type, error_type, processors } = parse_macro_input!(input as ProcessorInplacePipeline);
554    let struct_fields = processors.iter().enumerate().map(|(idx, processor_type)| {
555        let type_str = get_type_name(processor_type);
556        let field_name = format_ident!("processor_{}_{}", type_str, idx);
557        quote! { #field_name: #processor_type }
558    });
559    let constructor_params = processors.iter().enumerate().map(|(idx, processor_type)| {
560        let type_str = get_type_name(processor_type);
561        let param_name = format_ident!("processor_{}_{}", type_str, idx);
562        quote! { #param_name: #processor_type }
563    });
564    let field_initializers = processors.iter().enumerate().map(|(idx, processor_type)| {
565        let type_str = get_type_name(processor_type);
566        let field_name = format_ident!("processor_{}_{}", type_str, idx);
567        quote! { #field_name }
568    });
569    let process_implementation = processors.iter().enumerate().map(|(idx, processor_type)| {
570        let type_str = get_type_name(processor_type);
571        let field_name = format_ident!("processor_{}_{}", type_str, idx);
572        quote! { self.#field_name.process(data)?; }
573    });
574    let expanded = quote! {
575        pub struct #pipeline_name {
576            #(#struct_fields,)*
577        }
578        impl #pipeline_name {
579            pub fn new(#(#constructor_params,)*) -> Self {
580                Self {
581                    #(#field_initializers,)*
582                }
583            }
584        }
585        impl crate::InPlaceStatefulProcessor<#data_type, #error_type> for #pipeline_name {
586            fn process(&mut self, data: &mut #data_type) -> Result<(), #error_type> {
587                #(#process_implementation)*
588                Ok(())
589            }
590        }
591    };
592    TokenStream::from(expanded)
593}
594
595// Helper function to extract a usable identifier from a Type
596fn get_type_name(ty: &syn::Type) -> String {
597    match ty {
598        syn::Type::Path(type_path) if !type_path.path.segments.is_empty() => {
599            // Get the last segment of the path (e.g., for std::string::String, get "String")
600            let segment = type_path.path.segments.last().unwrap();
601            let name = segment.ident.to_string();
602
603            // Filter out any characters that aren't valid in an identifier
604            name.chars()
605                .map(|c| if c.is_alphanumeric() { c } else { '_' })
606                .collect()
607        },
608        // Handle other types (arrays, references, etc.) by using a generic name
609        _ => "unknown_type".to_string(),
610    }
611}