1use proc_macro::TokenStream;
2use std::cmp::Ordering;
3use quote::{quote, format_ident};
4use syn::{parse_macro_input, Ident, Type, parse::Parse, parse::ParseStream, Token, ItemStruct, FieldsNamed, Fields};
5
6struct ProcessorPipeline {
7 pipeline_name: Ident,
8 data_type: Type,
9 processors: Vec<Type>,
10}
11
12impl Parse for ProcessorPipeline {
13 fn parse(input: ParseStream) -> syn::Result<Self> {
14 let pipeline_name: Ident = input.parse()?;
15 input.parse::<Token![,]>()?;
16 let data_type: Type = input.parse()?;
17 input.parse::<Token![,]>()?;
18 let mut processors = Vec::new();
19 while !input.is_empty() {
21 let processor_type: Type = input.parse()?;
22 processors.push(processor_type);
23 if input.is_empty() {
24 break;
25 }
26 input.parse::<Token![,]>()?;
27 }
28 Ok(ProcessorPipeline {
29 pipeline_name,
30 data_type,
31 processors,
32 })
33 }
34}
35
36#[proc_macro]
37pub fn stateful_processor_pipeline_with_index(input: TokenStream) -> TokenStream {
38 let ProcessorPipeline { pipeline_name, data_type, processors } = parse_macro_input!(input as ProcessorPipeline);
39 let struct_fields = processors.iter().enumerate().map(|(idx, processor_type)| {
40 let type_str = get_type_name(processor_type);
42 let field_name = format_ident!("processor_{}_{}", type_str, idx);
43 quote! { #field_name: #processor_type }
44 });
45 let constructor_params = processors.iter().enumerate().map(|(idx, processor_type)| {
46 let type_str = get_type_name(processor_type);
47 let param_name = format_ident!("processor_{}_{}", type_str, idx);
48 quote! { #param_name: #processor_type }
49 });
50 let field_initializers = processors.iter().enumerate().map(|(idx, processor_type)| {
51 let type_str = get_type_name(processor_type);
52 let field_name = format_ident!("processor_{}_{}", type_str, idx);
53 quote! { #field_name }
54 });
55 let process_implementation = processors.iter().enumerate().map(|(idx, processor_type)| {
56 let type_str = get_type_name(processor_type);
57 let field_name = format_ident!("processor_{}_{}", type_str, idx);
58 quote! { data = self.#field_name.process(data); }
59 });
60 let expanded = quote! {
61 pub struct #pipeline_name {
62 #(#struct_fields,)*
63 }
64 impl #pipeline_name {
65 pub fn new(#(#constructor_params,)*) -> Self {
66 Self {
67 #(#field_initializers,)*
68 }
69 }
70 }
71 impl StatefulProcessor<#data_type> for #pipeline_name {
72 fn process(&mut self, mut data: #data_type) -> #data_type {
73 #(#process_implementation)*
74 data
75 }
76 }
77 };
78 TokenStream::from(expanded)
79}
80
81struct ProcessorInplacePipeline {
82 pipeline_name: Ident,
83 data_type: Type,
84 error_type: Type,
85 processors: Vec<Type>,
86}
87
88impl Parse for ProcessorInplacePipeline {
89 fn parse(input: ParseStream) -> syn::Result<Self> {
90 let pipeline_name: Ident = input.parse()?;
91 input.parse::<Token![,]>()?;
92 let data_type: Type = input.parse()?;
93 input.parse::<Token![,]>()?;
94 let error_type: Type = input.parse()?;
95 input.parse::<Token![,]>()?;
96 let mut processors = Vec::new();
97 while !input.is_empty() {
99 let processor_type: Type = input.parse()?;
100 processors.push(processor_type);
101
102 if input.is_empty() {
103 break;
104 }
105 input.parse::<Token![,]>()?;
106 }
107 Ok(ProcessorInplacePipeline{
108 pipeline_name,
109 data_type,
110 error_type,
111 processors,
112 })
113 }
114}
115
116
117#[proc_macro_attribute]
118pub fn implement_processor_swapping(_attr: proc_macro::TokenStream, item: proc_macro::TokenStream) -> proc_macro::TokenStream {
119 let input_struct_item = parse_macro_input!(item as ItemStruct);
120 let struct_name = &input_struct_item.ident;
121 let generics = &input_struct_item.generics;
122 let (impl_generics, ty_generics, where_clause) = generics.split_for_impl();
123 let type_param_idents: Vec<&Ident> = generics.type_params().map(|tp| &tp.ident).collect();
124 let fields = match &input_struct_item.fields {
125 Fields::Named(FieldsNamed { named, .. }) => named,
126 _ => panic!("#[implement_processor_swapping] only works on structs with named fields."),
127 };
128 let processor_field_idents: Vec<&Ident> = fields.iter()
129 .filter_map(|f| f.ident.as_ref())
130 .filter(|&id| id.to_string() != "_marker")
131 .collect();
132 if type_param_idents.len() != processor_field_idents.len() {
133 panic!(
134 "#[implement_processor_swapping] detected a mismatch between the number of generic type parameters ({}) and processor fields ({}). Expected them to be equal.",
135 type_param_idents.len(),
136 processor_field_idents.len()
137 );
138 }
139 let mut trait_impls = Vec::new();
141 if !type_param_idents.is_empty() {
142 let mut rev_types = type_param_idents.clone();
144 rev_types.reverse();
145 let mut rev_source_fields = processor_field_idents.clone();
146 rev_source_fields.reverse();
147 let rev_assignments = processor_field_idents.iter().zip(rev_source_fields.iter()).map(|(dest, src)| {
148 quote!{ #dest: self.#src }
149 });
150 trait_impls.push(quote!{
151 impl #impl_generics type_flow_traits::Reverse for #struct_name #ty_generics #where_clause {
152 type Output = #struct_name<#(#rev_types),*>;
153 fn reverse(self) -> Self::Output {
154 Self::Output {
155 _marker: std::marker::PhantomData,
156 #(#rev_assignments),*
157 }
158 }
159 }
160 });
161 let mut sl_types = type_param_idents.clone();
163 sl_types.rotate_left(1);
164 let mut sl_source_fields = processor_field_idents.clone();
165 sl_source_fields.rotate_left(1);
166 let sl_assignments = processor_field_idents.iter().zip(sl_source_fields.iter()).map(|(dest, src)| {
167 quote!{ #dest: self.#src }
168 });
169 trait_impls.push(quote!{
170 impl #impl_generics type_flow_traits::ShiftLeft for #struct_name #ty_generics #where_clause {
171 type ShiftedLeft = #struct_name<#(#sl_types),*>;
172 fn shift_left(self) -> Self::ShiftedLeft {
173 Self::ShiftedLeft {
174 _marker: std::marker::PhantomData,
175 #(#sl_assignments),*
176 }
177 }
178 }
179 });
180 let mut sr_types = type_param_idents.clone();
182 sr_types.rotate_right(1);
183 let mut sr_source_fields = processor_field_idents.clone();
184 sr_source_fields.rotate_right(1);
185 let sr_assignments = processor_field_idents.iter().zip(sr_source_fields.iter()).map(|(dest, src)| {
186 quote!{ #dest: self.#src }
187 });
188 trait_impls.push(quote!{
189 impl #impl_generics type_flow_traits::ShiftRight for #struct_name #ty_generics #where_clause {
190 type ShiftedRight = #struct_name<#(#sr_types),*>;
191 fn shift_right(self) -> Self::ShiftedRight {
192 Self::ShiftedRight {
193 _marker: std::marker::PhantomData,
194 #(#sr_assignments),*
195 }
196 }
197 }
198 });
199 let mut sse_types = type_param_idents.clone();
201 let sse_length = sse_types.len();
202 if sse_length > 1 { sse_types.swap(0, sse_length - 1); }
203 let mut sse_source_fields = processor_field_idents.clone();
204 let sse_source_length = sse_source_fields.len();
205 if sse_source_length > 1 { sse_source_fields.swap(0, sse_source_length - 1); }
206 let sse_assignments = processor_field_idents.iter().zip(sse_source_fields.iter()).map(|(dest, src)| {
207 quote!{ #dest: self.#src }
208 });
209 trait_impls.push(quote!{
210 impl #impl_generics type_flow_traits::SwapStartEnd for #struct_name #ty_generics #where_clause {
211 type Output = #struct_name<#(#sse_types),*>;
212 fn swap(self) -> Self::Output {
213 Self::Output {
214 _marker: std::marker::PhantomData,
215 #(#sse_assignments),*
216 }
217 }
218 }
219 });
220 }
221 let num_processors = type_param_idents.len();
222 for i in 0..num_processors {
223 let mut process_indexes_before_pivot = Vec::new();
224 let mut process_indexes_after_pivot = Vec::new();
225 let mut process_indexes_before_interleave_index = Vec::new();
226 let mut process_indexes_after_interleave_index = Vec::new();
227 for j in 0..num_processors {
228 if i != j {
229 let mut swapped_generic_params_for_type = type_param_idents.clone();
230 swapped_generic_params_for_type.swap(i, j);
231 let field_i_name = processor_field_idents[i];
232 let field_j_name = processor_field_idents[j];
233 let mut current_field_initializers = Vec::new();
234 for k in 0..num_processors {
235 let current_field_name = processor_field_idents[k];
236 if k == i {
237 current_field_initializers.push(quote! { #current_field_name: self.#field_j_name });
238 } else if k == j {
239 current_field_initializers.push(quote! { #current_field_name: self.#field_i_name });
240 } else {
241 current_field_initializers.push(quote! { #current_field_name: self.#current_field_name });
242 }
243 }
244 trait_impls.push(quote! {
246 impl #impl_generics type_flow_traits::SwapArbitraryProcessors<#i, #j> for #struct_name #ty_generics #where_clause {
247 type SwappedOutput = #struct_name<#(#swapped_generic_params_for_type),*>;
248 fn swap_processors(self) -> Self::SwappedOutput {
249 #struct_name {
250 _marker: std::marker::PhantomData,
251 #( #current_field_initializers ),*
252 }
253 }
254 }
255 });
256 match j.cmp(&i) {
259 Ordering::Equal => {},
260 Ordering::Less => { process_indexes_before_pivot.push(j); },
261 Ordering::Greater => { process_indexes_after_pivot.push(j); },
262 }
263 }
264 if i != num_processors - 1 {
267 let even_j_index = j * 2;
268 let odd_i_index = i * 2 + 1;
269 match even_j_index.cmp(&odd_i_index) {
270 Ordering::Equal => {},
271 Ordering::Less => { process_indexes_before_interleave_index.push(j); },
272 Ordering::Greater => { process_indexes_after_interleave_index.push(j); },
273 }
274 }
275 }
276 let mut swap_indexes_before_pivot = process_indexes_before_pivot.clone();
279 swap_indexes_before_pivot.reverse();
280 let mut swap_indexes_after_pivot = process_indexes_after_pivot.clone();
281 swap_indexes_after_pivot.reverse();
282 let mut swap_assignments = Vec::new();
283 let mut spin_assignments = Vec::new();
284 let pivot_side_length_difference = process_indexes_before_pivot.len() as isize - process_indexes_after_pivot.len() as isize;
285 if 0 == pivot_side_length_difference {
286 for _ in 0..process_indexes_after_pivot.len() {
287 swap_assignments.push(swap_indexes_after_pivot.pop().unwrap());
288 spin_assignments.push(process_indexes_after_pivot.pop().unwrap());
289 }
290 swap_assignments.push(swap_indexes_before_pivot.len());
291 spin_assignments.push(process_indexes_before_pivot.len());
292 for _ in 0..process_indexes_before_pivot.len() {
293 swap_assignments.push(swap_indexes_before_pivot.pop().unwrap());
294 spin_assignments.push(process_indexes_before_pivot.pop().unwrap());
295 }
296 } else if 0 > pivot_side_length_difference {
297 let mut swap_hanging_off_front = Vec::new();
298 let mut spin_hanging_off_front = Vec::new();
299 for _ in 0..pivot_side_length_difference.abs() as usize {
300 swap_hanging_off_front.push(swap_indexes_after_pivot.pop().unwrap());
301 spin_hanging_off_front.push(process_indexes_after_pivot.pop().unwrap());
302 }
303 swap_hanging_off_front.reverse();
304 spin_hanging_off_front.reverse();
305 for _ in 0..process_indexes_after_pivot.len() {
306 swap_assignments.push(swap_indexes_after_pivot.pop().unwrap());
307 spin_assignments.push(process_indexes_after_pivot.pop().unwrap());
308 }
309 swap_assignments.push(swap_indexes_before_pivot.len());
310 spin_assignments.push(process_indexes_before_pivot.len());
311 for _ in 0..process_indexes_before_pivot.len() {
312 swap_assignments.push(swap_indexes_before_pivot.pop().unwrap());
313 spin_assignments.push(process_indexes_before_pivot.pop().unwrap());
314 }
315 for _ in 0..spin_hanging_off_front.len() {
316 swap_assignments.push(swap_hanging_off_front.pop().unwrap());
317 spin_assignments.push(spin_hanging_off_front.pop().unwrap());
318 }
319 } else {
320 let mut swap_hanging_off_back = Vec::new();
321 let mut spin_hanging_off_back = Vec::new();
322 swap_indexes_before_pivot.reverse();
323 process_indexes_before_pivot.reverse();
324 for _ in 0..pivot_side_length_difference.abs() as usize {
325 swap_hanging_off_back.push(swap_indexes_before_pivot.pop().unwrap());
326 spin_hanging_off_back.push(process_indexes_before_pivot.pop().unwrap());
327 }
328 swap_indexes_before_pivot.reverse();
329 process_indexes_before_pivot.reverse();
330 for _ in 0..spin_hanging_off_back.len() {
331 swap_assignments.push(swap_hanging_off_back.pop().unwrap());
332 spin_assignments.push(spin_hanging_off_back.pop().unwrap());
333 }
334 for _ in 0..process_indexes_after_pivot.len() {
335 swap_assignments.push(swap_indexes_after_pivot.pop().unwrap());
336 spin_assignments.push(process_indexes_after_pivot.pop().unwrap());
337 }
338 swap_assignments.push(swap_assignments.len());
339 spin_assignments.push(spin_assignments.len());
340 for _ in 0..process_indexes_before_pivot.len() {
341 swap_assignments.push(swap_indexes_before_pivot.pop().unwrap());
342 spin_assignments.push(process_indexes_before_pivot.pop().unwrap());
343 }
344 }
345 if num_processors != spin_assignments.len() {
346 panic!("swap_assignments.len() != num_processors");
347 }
348 let mut spun_generic_params_for_type = Vec::new();
349 let mut swapped_generic_params_for_type = Vec::new();
350 let mut spun_field_initializers = Vec::new();
351 let mut swapped_field_initializes = Vec::new();
352 for k in 0..num_processors {
353 spun_generic_params_for_type.push(type_param_idents[spin_assignments[k]]);
354 swapped_generic_params_for_type.push(type_param_idents[swap_assignments[k]]);
355 let current_field_name = processor_field_idents[k];
356 let spun_name = processor_field_idents[spin_assignments[k]];
357 let swapped_name = processor_field_idents[swap_assignments[k]];
358 spun_field_initializers.push(quote! { #current_field_name: self.#spun_name });
359 swapped_field_initializes.push(quote! { #current_field_name: self.#swapped_name });
360 }
361 trait_impls.push(quote! {
362 impl #impl_generics type_flow_traits::PivotSpin<#i> for #struct_name #ty_generics #where_clause {
363 type PivotedSpinOutput = #struct_name<#(#spun_generic_params_for_type),*>;
364 fn pivot_spin(self) -> Self::PivotedSpinOutput {
365 #struct_name {
366 _marker: std::marker::PhantomData,
367 #( #spun_field_initializers ),*
368 }
369 }
370 }
371 });
372 if i < num_processors - 1 && i > 0 {
373 trait_impls.push(quote! {
374 impl #impl_generics type_flow_traits::PivotSwap<#i> for #struct_name #ty_generics #where_clause {
375 type PivotedSwapOutput = #struct_name<#(#swapped_generic_params_for_type),*>;
376 fn pivot_swap(self) -> Self::PivotedSwapOutput {
377 #struct_name {
378 _marker: std::marker::PhantomData,
379 #( #swapped_field_initializes ),*
380 }
381 }
382 }
383 });
384 }
385 if i != num_processors - 1 {
386 let total_length = process_indexes_before_interleave_index.len() + process_indexes_after_interleave_index.len();
387 if num_processors != total_length {
388 panic!("Wrong Total Length For Interleave");
389 }
390
391 let mut swap_indexes_before_interleave = process_indexes_before_interleave_index.clone();
392 swap_indexes_before_interleave.reverse();
393 let mut swap_indexes_after_interleave = process_indexes_after_interleave_index.clone();
394 swap_indexes_after_interleave.reverse();
395 let mut swap_assignments_interleave = Vec::new();
396 let mut spin_assignments_interleave = Vec::new();
397 let interleave_side_length_difference = swap_indexes_before_interleave.len() as isize - swap_indexes_after_interleave.len() as isize;
398 if 0 == interleave_side_length_difference {
399 for _ in 0..process_indexes_after_interleave_index.len() {
400 swap_assignments_interleave.push(swap_indexes_after_interleave.pop().unwrap());
401 spin_assignments_interleave.push(process_indexes_after_interleave_index.pop().unwrap());
402 }
403 for _ in 0..process_indexes_before_interleave_index.len() {
404 swap_assignments_interleave.push(swap_indexes_before_interleave.pop().unwrap());
405 spin_assignments_interleave.push(process_indexes_before_interleave_index.pop().unwrap());
406 }
407 }else if 0 > interleave_side_length_difference {
408 let mut swap_hanging_off_front = Vec::new();
409 let mut spin_hanging_off_front = Vec::new();
410 for _ in 0..interleave_side_length_difference.abs() as usize {
411 swap_hanging_off_front.push(swap_indexes_after_interleave.pop().unwrap());
412 spin_hanging_off_front.push(process_indexes_after_interleave_index.pop().unwrap());
413 }
414 swap_hanging_off_front.reverse();
415 spin_hanging_off_front.reverse();
416 for _ in 0..process_indexes_after_interleave_index.len() {
417 swap_assignments_interleave.push(swap_indexes_after_interleave.pop().unwrap());
418 spin_assignments_interleave.push(process_indexes_after_interleave_index.pop().unwrap());
419 }
420 for _ in 0..process_indexes_before_interleave_index.len() {
421 swap_assignments_interleave.push(swap_indexes_before_interleave.pop().unwrap());
422 spin_assignments_interleave.push(process_indexes_before_interleave_index.pop().unwrap());
423 }
424 for _ in 0..spin_hanging_off_front.len() {
425 swap_assignments_interleave.push(swap_hanging_off_front.pop().unwrap());
426 spin_assignments_interleave.push(spin_hanging_off_front.pop().unwrap());
427 }
428 } else {
429 let mut swap_hanging_off_back = Vec::new();
430 let mut spin_hanging_off_back = Vec::new();
431 swap_indexes_before_interleave.reverse();
432 process_indexes_before_interleave_index.reverse();
433 for _ in 0..interleave_side_length_difference.abs() as usize {
434 swap_hanging_off_back.push(swap_indexes_before_interleave.pop().unwrap());
435 spin_hanging_off_back.push(process_indexes_before_interleave_index.pop().unwrap());
436 }
437 swap_indexes_before_interleave.reverse();
438 process_indexes_before_interleave_index.reverse();
439 for _ in 0..spin_hanging_off_back.len() {
440 swap_assignments_interleave.push(swap_hanging_off_back.pop().unwrap());
441 spin_assignments_interleave.push(spin_hanging_off_back.pop().unwrap());
442 }
443 for _ in 0..process_indexes_after_interleave_index.len() {
444 swap_assignments_interleave.push(swap_indexes_after_interleave.pop().unwrap());
445 spin_assignments_interleave.push(process_indexes_after_interleave_index.pop().unwrap());
446 }
447 for _ in 0..process_indexes_before_interleave_index.len() {
448 swap_assignments_interleave.push(swap_indexes_before_interleave.pop().unwrap());
449 spin_assignments_interleave.push(process_indexes_before_interleave_index.pop().unwrap());
450 }
451 }
452 if num_processors != spin_assignments_interleave.len() || num_processors != swap_assignments_interleave.len(){
453 panic!("swap_assignments_interleave.len() != num_processors");
454 }
455 let mut spun_interleave_generic_params_for_type = Vec::new();
456 let mut swapped_interleave_generic_params_for_type = Vec::new();
457 let mut spun_interleave_field_initializers = Vec::new();
458 let mut swapped_interleave_field_initializes = Vec::new();
459 for k in 0..num_processors {
460 spun_interleave_generic_params_for_type.push(type_param_idents[spin_assignments_interleave[k]]);
461 swapped_interleave_generic_params_for_type.push(type_param_idents[swap_assignments_interleave[k]]);
462 let current_field_name = processor_field_idents[k];
463 let spun_name = processor_field_idents[spin_assignments_interleave[k]];
464 let swapped_name = processor_field_idents[swap_assignments_interleave[k]];
465 spun_interleave_field_initializers.push(quote! { #current_field_name: self.#spun_name });
466 swapped_interleave_field_initializes.push(quote! { #current_field_name: self.#swapped_name });
467 }
468 trait_impls.push(quote! {
469 impl #impl_generics type_flow_traits::InterleavePivotSpin<#i> for #struct_name #ty_generics #where_clause {
470 type InterleavePivotSpinOutput = #struct_name<#(#spun_interleave_generic_params_for_type),*>;
471 fn interleaved_pivot_spin(self) -> Self::InterleavePivotSpinOutput {
472 #struct_name {
473 _marker: std::marker::PhantomData,
474 #( #spun_interleave_field_initializers ),*
475 }
476 }
477 }
478 });
479 trait_impls.push(quote! {
480 impl #impl_generics type_flow_traits::InterleavePivotSwap<#i> for #struct_name #ty_generics #where_clause {
481 type InterleavePivotSwapOutput = #struct_name<#(#swapped_interleave_generic_params_for_type),*>;
482 fn interleaved_pivot_swap(self) -> Self::InterleavePivotSwapOutput {
483 #struct_name {
484 _marker: std::marker::PhantomData,
485 #( #swapped_interleave_field_initializes ),*
486 }
487 }
488 }
489 });
490 }
491 }
492 let expanded = quote! {
493 #input_struct_item
494 #(#trait_impls)*
495 };
496 proc_macro::TokenStream::from(expanded)
497}
498
499struct TypeFLowInplaceStatefulProcessorPipeline {
500 pipeline_name: Ident,
501 data_type: Type,
502 error_type: Type,
503 number_of_processors: usize,
504}
505impl Parse for TypeFLowInplaceStatefulProcessorPipeline {
506 fn parse(input: ParseStream) -> syn::Result<Self> {
507 let pipeline_name: Ident = input.parse()?;
508 input.parse::<Token![,]>()?;
509 let data_type: Type = input.parse()?;
510 input.parse::<Token![,]>()?;
511 let error_type: Type = input.parse()?;
512 input.parse::<Token![,]>()?;
513 let lit_int: syn::LitInt = input.parse()?;
514 let number_of_processors = lit_int.base10_parse::<usize>()?;
515 Ok(TypeFLowInplaceStatefulProcessorPipeline{
516 pipeline_name,
517 data_type,
518 error_type,
519 number_of_processors,
520 })
521 }
522}
523
524#[proc_macro]
525pub fn type_flow_inplace_stateful_processor_pipeline_by_count(input: TokenStream) -> TokenStream {
526 let TypeFLowInplaceStatefulProcessorPipeline { pipeline_name, data_type, error_type, number_of_processors } = parse_macro_input!(input as TypeFLowInplaceStatefulProcessorPipeline);
527 let generic_params: Vec<_> = (0..number_of_processors)
528 .map(|i| format_ident!("P{}", i))
529 .collect();
530 let field_names: Vec<_> = (0..number_of_processors)
531 .map(|i| format_ident!("p{}", i))
532 .collect();
533 let fields_with_types: Vec<_> = field_names.iter().zip(generic_params.iter())
534 .map(|(field, param)| quote! { #field: #param })
535 .collect();
536 let processor_args = fields_with_types.into_iter().reduce(|acc, item| {
537 quote! { #acc, #item }
538 });
539 let expanded = quote! {
540 type_flow_macros::type_flow_inplace_stateful_processor_pipeline!(
541 #pipeline_name,
542 #data_type,
543 #error_type,
544 #processor_args
545 );
546 };
547 TokenStream::from(expanded)
548}
549
550
551#[proc_macro]
552pub fn inplace_stateful_processor_pipeline_with_index(input: TokenStream) -> TokenStream {
553 let ProcessorInplacePipeline { pipeline_name, data_type, error_type, processors } = parse_macro_input!(input as ProcessorInplacePipeline);
554 let struct_fields = processors.iter().enumerate().map(|(idx, processor_type)| {
555 let type_str = get_type_name(processor_type);
556 let field_name = format_ident!("processor_{}_{}", type_str, idx);
557 quote! { #field_name: #processor_type }
558 });
559 let constructor_params = processors.iter().enumerate().map(|(idx, processor_type)| {
560 let type_str = get_type_name(processor_type);
561 let param_name = format_ident!("processor_{}_{}", type_str, idx);
562 quote! { #param_name: #processor_type }
563 });
564 let field_initializers = processors.iter().enumerate().map(|(idx, processor_type)| {
565 let type_str = get_type_name(processor_type);
566 let field_name = format_ident!("processor_{}_{}", type_str, idx);
567 quote! { #field_name }
568 });
569 let process_implementation = processors.iter().enumerate().map(|(idx, processor_type)| {
570 let type_str = get_type_name(processor_type);
571 let field_name = format_ident!("processor_{}_{}", type_str, idx);
572 quote! { self.#field_name.process(data)?; }
573 });
574 let expanded = quote! {
575 pub struct #pipeline_name {
576 #(#struct_fields,)*
577 }
578 impl #pipeline_name {
579 pub fn new(#(#constructor_params,)*) -> Self {
580 Self {
581 #(#field_initializers,)*
582 }
583 }
584 }
585 impl crate::InPlaceStatefulProcessor<#data_type, #error_type> for #pipeline_name {
586 fn process(&mut self, data: &mut #data_type) -> Result<(), #error_type> {
587 #(#process_implementation)*
588 Ok(())
589 }
590 }
591 };
592 TokenStream::from(expanded)
593}
594
595fn get_type_name(ty: &syn::Type) -> String {
597 match ty {
598 syn::Type::Path(type_path) if !type_path.path.segments.is_empty() => {
599 let segment = type_path.path.segments.last().unwrap();
601 let name = segment.ident.to_string();
602
603 name.chars()
605 .map(|c| if c.is_alphanumeric() { c } else { '_' })
606 .collect()
607 },
608 _ => "unknown_type".to_string(),
610 }
611}