futuresdr_macros/
lib.rs

1//! Macros to make working with FutureSDR a bit nicer.
2use proc_macro::TokenStream;
3use quote::quote;
4use syn::Attribute;
5use syn::Data;
6use syn::DeriveInput;
7use syn::Fields;
8use syn::GenericParam;
9use syn::Ident;
10use syn::Index;
11use syn::Meta;
12use syn::PathArguments;
13use syn::Result;
14use syn::Token;
15use syn::Type;
16use syn::bracketed;
17use syn::parse::Parse;
18use syn::parse::ParseStream;
19use syn::parse_macro_input;
20use syn::punctuated::Punctuated;
21use syn::token;
22
23/// Avoid boilerplate when setting up the flowgraph.
24///
25/// This macro simplifies adding blocks to the flowgraph and connecting them.
26/// Assume you have created a flowgraph `fg` and several blocks (`src`, `shift`,
27/// ...) and need to add the block to the flowgraph and connect them. Using the
28/// `connect!` macro, this can be done with:
29///
30/// ```ignore
31/// connect!(fg,
32///     src.out > shift.in;
33///     shift > resamp1 > demod;
34///     demod > resamp2 > snk;
35/// );
36/// ```
37///
38/// It generates the following code:
39///
40/// ```ignore
41/// // Add all the blocks to the `Flowgraph`...
42/// let src = fg.add_block(src)?;
43/// let shift = fg.add_block(shift)?;
44/// let resamp1 = fg.add_block(resamp1)?;
45/// let demod = fg.add_block(demod)?;
46/// let resamp2 = fg.add_block(resamp2)?;
47/// let snk = fg.add_block(snk)?;
48///
49/// // ... and connect the ports appropriately
50/// fg.connect_stream(src, "out", shift, "in")?;
51/// fg.connect_stream(shift, "out", resamp1, "in")?;
52/// fg.connect_stream(resamp1, "out", demod, "in")?;
53/// fg.connect_stream(demod, "out", resamp2, "in")?;
54/// fg.connect_stream(resamp2, "out", snk, "in")?;
55/// ```
56///
57/// Connections endpoints are defined by `block.port_name`. Standard names
58/// (i.e., `out`/`in`) can be omitted. When ports have different name than
59/// standard `in` and `out`, one can use following notation.
60///
61/// Stream connections are indicated as `>`, while message connections are
62/// indicated as `|`.
63///
64/// If a block uses non-standard port names it is possible to use triples, e.g.:
65///
66/// ```ignore
67/// connect!(fg, src > input.foo.output > snk);
68/// ```
69///
70/// It is possible to add blocks that have no connections by just putting them
71/// on a line separately.
72///
73/// ```ignore
74/// connect!(fg, dummy);
75/// ```
76#[proc_macro]
77pub fn connect(input: TokenStream) -> TokenStream {
78    let connect_input = parse_macro_input!(input as ConnectInput);
79    // dbg!(&connect_input);
80    let fg = connect_input.flowgraph;
81
82    let mut blocks: Vec<Ident> = Vec::new();
83    let mut connections = Vec::new();
84
85    // Collect all blocks and generate connections
86    for conn in connect_input.connection_strings.iter() {
87        let src_block = &conn.source.block;
88        blocks.push(src_block.clone());
89
90        let mut src_block = &conn.source.block;
91        let mut src_port = &conn.source.output;
92
93        for (connection_type, dst) in &conn.connections {
94            blocks.push(dst.block.clone());
95
96            let out = match connection_type {
97                ConnectionType::Stream => {
98                    let src_port = match src_port {
99                        Some(Port { name, index: None }) => {
100                            quote! { #name() }
101                        }
102                        Some(Port {
103                            name,
104                            index: Some(i),
105                        }) => {
106                            quote! { #name().get_mut(#i).unwrap() }
107                        }
108                        None => {
109                            quote!(output())
110                        }
111                    };
112                    let dst_port = match &dst.input {
113                        Some(Port { name, index: None }) => {
114                            quote! { #name() }
115                        }
116                        Some(Port {
117                            name,
118                            index: Some(i),
119                        }) => {
120                            quote! { #name().get_mut(#i).unwrap() }
121                        }
122                        None => {
123                            quote!(input())
124                        }
125                    };
126                    let dst_block = &dst.block;
127                    quote! {
128                        #fg.connect_stream(#src_block.get()?.#src_port, #dst_block.get()?.#dst_port);
129                    }
130                }
131                ConnectionType::Circuit => {
132                    let src_port = match src_port {
133                        Some(Port { name, index: None }) => {
134                            quote! { #name() }
135                        }
136                        Some(Port {
137                            name,
138                            index: Some(i),
139                        }) => {
140                            quote! { #name().get_mut(#i).unwrap() }
141                        }
142                        None => {
143                            quote!(output())
144                        }
145                    };
146                    let dst_port = match &dst.input {
147                        Some(Port { name, index: None }) => {
148                            quote! { #name() }
149                        }
150                        Some(Port {
151                            name,
152                            index: Some(i),
153                        }) => {
154                            quote! { #name().get_mut(#i).unwrap() }
155                        }
156                        None => {
157                            quote!(input())
158                        }
159                    };
160                    let dst_block = &dst.block;
161                    quote! {
162                        #src_block.get()?.#src_port.close_circuit(#dst_block.get()?.#dst_port);
163                    }
164                }
165                ConnectionType::Message => {
166                    let src_port = if let Some(p) = &src_port {
167                        let src_port = p.name.to_string();
168                        quote! { #src_port }
169                    } else {
170                        quote!("out")
171                    };
172                    let dst_port = if let Some(p) = &dst.input {
173                        let dst_port = p.name.to_string();
174                        quote! { #dst_port }
175                    } else {
176                        quote!("in")
177                    };
178                    let dest_block = &dst.block;
179                    quote! {
180                        #fg.connect_message(&#src_block, #src_port, &#dest_block, #dst_port)?;
181                    }
182                }
183            };
184            connections.push(out);
185            src_block = &dst.block;
186            src_port = &dst.output;
187        }
188    }
189
190    // Deduplicate blocks
191    blocks.sort_by_key(|b| b.to_string());
192    blocks.dedup();
193
194    // Generate block declarations
195    let block_decls = blocks.iter().map(|block| {
196        quote! {
197            let #block = #fg.add(#block);
198        }
199    });
200
201    let out = quote! {
202        use futuresdr::runtime::BlockId;
203        use futuresdr::runtime::BlockRef;
204        use futuresdr::runtime::Flowgraph;
205        use futuresdr::runtime::Kernel;
206        use futuresdr::runtime::KernelInterface;
207        use std::result::Result;
208
209        pub trait AddToFg<K: Kernel + KernelInterface + 'static> {
210            fn add_to_fg(self, fg: &mut Flowgraph) -> BlockRef<K>;
211        }
212        impl<K: Kernel + KernelInterface + 'static> AddToFg<K> for K {
213            fn add_to_fg(self, fg: &mut Flowgraph) -> BlockRef<K> {
214                fg.add_block(self)
215            }
216        }
217        impl<K: Kernel + KernelInterface + 'static> AddToFg<K> for BlockRef<K> {
218            fn add_to_fg(self, _fg: &mut Flowgraph) -> BlockRef<K> {
219                self
220            }
221        }
222        pub trait FgOps {
223            fn add<T, K>(&mut self, item: T) -> BlockRef<K>
224            where
225                T: AddToFg<K>,
226                K: Kernel + KernelInterface + 'static;
227        }
228        impl FgOps for Flowgraph {
229            fn add<T, K>(&mut self, item: T) -> BlockRef<K>
230            where
231                T: AddToFg<K>,
232                K: Kernel + KernelInterface + 'static,
233            {
234                item.add_to_fg(self)
235            }
236        }
237
238        #(#block_decls)*
239        #(#connections)*
240        (#(#blocks),*)
241    };
242
243    let out = quote![
244        #[allow(unused_variables)]
245        let (#(#blocks),*) = {
246            #out
247        };
248    ];
249
250    // let tmp = quote!(fn foo() { #out });
251    // println!("{}", pretty_print(&tmp));
252    // println!("{}", &out);
253    out.into()
254}
255
256// full macro input
257#[derive(Debug)]
258struct ConnectInput {
259    flowgraph: Ident,
260    _comma: Token![,],
261    connection_strings: Punctuated<ConnectionString, Token![;]>,
262}
263impl Parse for ConnectInput {
264    fn parse(input: ParseStream) -> Result<Self> {
265        Ok(ConnectInput {
266            flowgraph: input.parse()?,
267            _comma: input.parse()?,
268            connection_strings: Punctuated::parse_terminated(input)?,
269        })
270    }
271}
272
273// connection line in the macro input
274#[derive(Debug)]
275struct ConnectionString {
276    source: Source,
277    connections: Vec<(ConnectionType, Endpoint)>,
278}
279impl Parse for ConnectionString {
280    fn parse(input: ParseStream) -> Result<Self> {
281        let source: Source = input.parse()?;
282        let mut connections = Vec::new();
283
284        while let Ok(ct) = input.parse::<ConnectionType>() {
285            let dest: Endpoint = input.parse()?;
286            connections.push((ct, dest));
287        }
288
289        Ok(ConnectionString {
290            source,
291            connections,
292        })
293    }
294}
295
296#[derive(Debug)]
297enum ConnectionType {
298    Stream,
299    Message,
300    Circuit,
301}
302
303impl Parse for ConnectionType {
304    fn parse(input: ParseStream) -> Result<Self> {
305        if input.peek(Token![>]) {
306            input.parse::<Token![>]>()?;
307            Ok(Self::Stream)
308        } else if input.peek(Token![|]) {
309            input.parse::<Token![|]>()?;
310            Ok(Self::Message)
311        } else if input.peek(Token![<]) {
312            input.parse::<Token![<]>()?;
313            Ok(Self::Circuit)
314        } else {
315            Err(input.error("expected `>` or `|` to specify the connection type"))
316        }
317    }
318}
319
320#[derive(Debug)]
321struct Source {
322    block: Ident,
323    output: Option<Port>,
324}
325impl Parse for Source {
326    fn parse(input: ParseStream) -> Result<Self> {
327        let block: Ident = input.parse()?;
328        if input.peek(Token![.]) {
329            input.parse::<Token![.]>()?;
330            let port: Port = input.parse()?;
331            Ok(Self {
332                block,
333                output: Some(port),
334            })
335        } else {
336            Ok(Self {
337                block,
338                output: None,
339            })
340        }
341    }
342}
343
344// connection endpoint is a block with input and output ports
345#[derive(Debug)]
346struct Endpoint {
347    block: Ident,
348    input: Option<Port>,
349    output: Option<Port>,
350}
351impl Parse for Endpoint {
352    fn parse(input: ParseStream) -> Result<Self> {
353        let first: Port = input.parse()?;
354
355        // there is only one identifier, it has to be the block
356        if !input.peek(Token![.]) {
357            if first.index.is_none() {
358                return Ok(Self {
359                    block: first.name,
360                    input: None,
361                    output: None,
362                });
363            } else {
364                return Err(input.error("expected endpoint, got only port"));
365            }
366        }
367
368        input.parse::<Token![.]>()?;
369        let block: Ident = input.parse()?;
370
371        if !input.peek(Token![.]) {
372            return Ok(Self {
373                block,
374                input: Some(first),
375                output: None,
376            });
377        }
378
379        input.parse::<Token![.]>()?;
380        let second: Port = input.parse()?;
381
382        Ok(Self {
383            block,
384            input: Some(first),
385            output: Some(second),
386        })
387    }
388}
389
390// input or output port
391#[derive(Debug)]
392struct Port {
393    name: Ident,
394    index: Option<Index>,
395}
396impl Parse for Port {
397    fn parse(input: ParseStream) -> Result<Self> {
398        let name: Ident = input.parse()?;
399        let index = if input.peek(token::Bracket) {
400            let content;
401            bracketed!(content in input);
402            Some(content.parse()?)
403        } else {
404            None
405        };
406        Ok(Port { name, index })
407    }
408}
409
410/// Check for  `#[input]` attribute
411fn has_input_attr(attrs: &[Attribute]) -> bool {
412    attrs.iter().any(|attr| attr.path().is_ident("input"))
413}
414/// Check for  `#[output]` attribute
415fn has_output_attr(attrs: &[Attribute]) -> bool {
416    attrs.iter().any(|attr| attr.path().is_ident("output"))
417}
418/// Check if parameter is a Vec
419fn is_vec(type_path: &syn::TypePath) -> bool {
420    if type_path.path.segments.len() != 1 {
421        return false;
422    }
423
424    let segment = &type_path.path.segments[0];
425    if segment.ident != "Vec" {
426        return false;
427    }
428
429    matches!(segment.arguments, PathArguments::AngleBracketed(_))
430}
431
432//=========================================================================
433// BLOCK MACRO
434//=========================================================================
435/// Block Macro
436#[proc_macro_derive(
437    Block,
438    attributes(
439        input,
440        output,
441        message_inputs,
442        message_outputs,
443        blocking,
444        type_name,
445        null_kernel
446    )
447)]
448pub fn derive_block(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
449    let input = parse_macro_input!(input as DeriveInput);
450    let struct_name = &input.ident;
451    let generics = &input.generics;
452    let where_clause = &input.generics.where_clause;
453
454    let mut message_inputs: Vec<Ident> = Vec::new();
455    let mut message_input_names: Vec<String> = Vec::new();
456    let mut message_output_names: Vec<String> = Vec::new();
457    let mut kernel = quote! {};
458    let mut blocking = quote! { false };
459    let mut type_name = struct_name.to_string();
460
461    // remove defaults from generics
462    let mut generics = generics.clone();
463    for param in &mut generics.params {
464        match param {
465            GenericParam::Type(type_param) => {
466                type_param.default = None;
467            }
468            GenericParam::Const(const_param) => {
469                const_param.default = None;
470            }
471            GenericParam::Lifetime(_) => {}
472        }
473    }
474
475    let unconstraint_params = generics.params.iter().map(|param| match param {
476        GenericParam::Type(ty) => {
477            let ident = &ty.ident;
478            quote! { #ident }
479        }
480        GenericParam::Lifetime(lt) => {
481            let lifetime = &lt.lifetime;
482            quote! { #lifetime }
483        }
484        GenericParam::Const(c) => {
485            let ident = &c.ident;
486            quote! { #ident }
487        }
488    });
489
490    // Surround the parameters with angle brackets if they exist
491    let unconstraint_generics = if generics.params.is_empty() {
492        quote! {}
493    } else {
494        quote! { <#(#unconstraint_params),*> }
495    };
496
497    // Parse Struct
498    let struct_data = match input.data {
499        Data::Struct(data) => data,
500        _ => {
501            return syn::Error::new_spanned(input.ident, "Block can only be derived for structs")
502                .to_compile_error()
503                .into();
504        }
505    };
506
507    let stream_inputs = match struct_data.fields {
508        Fields::Named(ref fields) => {
509            fields
510                .named
511                .iter()
512                .filter_map(|field| {
513                    // Check if field has #[input] attribute
514                    if !field.attrs.iter().any(|attr| attr.path().is_ident("input")) {
515                        return None;
516                    }
517
518                    let field_name = field.ident.as_ref().unwrap();
519                    let field_name_str = field_name.to_string();
520
521                    match &field.ty {
522                        // Handle Vec<T>
523                        Type::Path(type_path) if is_vec(type_path) => {
524                            let name_code = quote! {
525                                for i in 0..self.#field_name.len() {
526                                    names.push(format!("{}[{}]", #field_name_str, i));
527                                }
528                            };
529                            let init_code = quote! {
530                                for i in 0..self.#field_name.len() {
531                                    self.#field_name[i].init(block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
532                                }
533                            };
534                            let validate_code = quote! {
535                                for i in 0..self.#field_name.len() {
536                                    self.#field_name[i].validate()?;
537                                }
538                            };
539                            let notify_code = quote! {
540                                for i in 0..self.#field_name.len() {
541                                    self.#field_name[i].notify_finished().await;
542                                }
543                            };
544                            let finish_code = quote! {
545                                for (i, _) in self.#field_name.iter_mut().enumerate() {
546                                    if port == format!("{}[{}]", #field_name_str, i) {
547                                        self.#field_name[i].finish();
548                                        return Ok(());
549                                    }
550                                }
551                            };
552                            let get_input_code = quote! {
553                                for (i, _) in self.#field_name.iter_mut().enumerate() {
554                                    if name == format!("{}[{}]", #field_name_str, i) {
555                                        return Some(&mut self.#field_name[i]);
556                                    }
557                                }
558                            };
559                            Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
560                        }
561                        // Handle arrays [T; N]
562                        Type::Array(array) => {
563                            let len = &array.len;
564                            let name_code = quote! {
565                                for i in 0..#len {
566                                    names.push(format!("{}[{}]", #field_name_str, i));
567                                }
568                            };
569                            let init_code = quote! {
570                                for i in 0..#len {
571                                    self.#field_name[i].init(block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
572                                }
573                            };
574                            let validate_code = quote! {
575                                for i in 0..#len {
576                                    self.#field_name[i].validate()?;
577                                }
578                            };
579                            let notify_code = quote! {
580                                for i in 0..#len {
581                                    self.#field_name[i].notify_finished().await;
582                                }
583                            };
584                            let finish_code = quote! {
585                                for (i, _) in self.#field_name.iter_mut().enumerate() {
586                                    if port == format!("{}[{}]", #field_name_str, i) {
587                                        self.#field_name[i].finish();
588                                        return Ok(());
589                                    }
590                                }
591                            };
592                            let get_input_code = quote! {
593                                for (i, _) in self.#field_name.iter_mut().enumerate() {
594                                    if name == format!("{}[{}]", #field_name_str, i) {
595                                        return Some(&mut self.#field_name[i]);
596                                    }
597                                }
598                            };
599                            Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
600                        }
601                        // Handle tuples (T1, T2, ...)
602                        Type::Tuple(tuple) => {
603                            let len = tuple.elems.len();
604                            let name_code = quote! {
605                                for i in 0..#len {
606                                    names.push(format!("{}.{}", #field_name_str, i));
607                                }
608                            };
609                            let init_code = tuple.elems.iter().enumerate().map(|(i, _)| {
610                                let index = syn::Index::from(i);
611                                quote! {
612                                    self.#field_name.#index.init(block_id, PortId::new(format!("{}.{}", #field_name_str, #index)), inbox.clone());
613                                }
614                            });
615                            let init_code = quote! {
616                                #(#init_code)*
617                            };
618                            let validate_code = tuple.elems.iter().enumerate().map(|(i, _)| {
619                                let index = syn::Index::from(i);
620                                quote! {
621                                    self.#field_name.#index.validate()?;
622                                }
623                            });
624                            let validate_code = quote! {
625                                #(#validate_code)*
626                            };
627                            let notify_code = tuple.elems.iter().enumerate().map(|(i, _)| {
628                                let index = syn::Index::from(i);
629                                quote! {
630                                    self.#field_name.#index.notify_finished().await;
631                                }
632                            });
633                            let notify_code = quote! {
634                                #(#notify_code)*
635                            };
636                            let finish_code = tuple.elems.iter().enumerate().map(|(i, _)| {
637                                let index = syn::Index::from(i);
638                                quote!{
639                                    if port == format!("{}.{}", #field_name_str, #index) {
640                                        self.#field_name.#index.finish();
641                                        return Ok(());
642                                    }
643                                }
644                            });
645                            let finish_code = quote! {
646                                #(#finish_code)*
647                            };
648                            let get_input_code = tuple.elems.iter().enumerate().map(|(i, _)| {
649                                let index = syn::Index::from(i);
650                                quote!{
651                                    if name == format!("{}.{}", #field_name_str, #index) {
652                                        return Some(&mut self.#field_name.#index);
653                                    }
654                                }
655                            });
656                            let get_input_code = quote! {
657                                #(#get_input_code)*
658                            };
659                            Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
660                        }
661                        // Handle normal types
662                        _ => {
663                            let name_code = quote! {
664                                names.push(#field_name_str.to_string());
665                            };
666                            let init_code = quote! {
667                                self.#field_name.init(block_id, PortId::new(#field_name_str.to_string()), inbox.clone());
668                            };
669                            let validate_code = quote! {
670                                self.#field_name.validate()?;
671                            };
672                            let notify_code = quote! {
673                                self.#field_name.notify_finished().await;
674                            };
675                            let finish_code = quote! {
676                                if port == #field_name_str {
677                                    self.#field_name.finish();
678                                    return Ok(());
679                                }
680                            };
681                            let get_input_code = quote! {
682                                if name == #field_name_str {
683                                    return Some(&mut self.#field_name)
684                                }
685                            };
686                            Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
687                        }
688                    }
689                })
690                .collect::<Vec<_>>()
691        }
692        _ => Vec::new(),
693    };
694
695    let stream_inputs_names = stream_inputs
696        .iter()
697        .map(|x| x.0.clone())
698        .collect::<Vec<_>>();
699    let stream_inputs_init = stream_inputs
700        .iter()
701        .map(|x| x.1.clone())
702        .collect::<Vec<_>>();
703    let stream_inputs_validate = stream_inputs
704        .iter()
705        .map(|x| x.2.clone())
706        .collect::<Vec<_>>();
707    let stream_inputs_notify = stream_inputs
708        .iter()
709        .map(|x| x.3.clone())
710        .collect::<Vec<_>>();
711    let stream_inputs_finish = stream_inputs
712        .iter()
713        .map(|x| x.4.clone())
714        .collect::<Vec<_>>();
715    let stream_inputs_get = stream_inputs
716        .iter()
717        .map(|x| x.5.clone())
718        .collect::<Vec<_>>();
719
720    let stream_outputs = match struct_data.fields {
721        Fields::Named(ref fields) => {
722            fields
723                .named
724                .iter()
725                .filter_map(|field| {
726                    // Check if field has #[input] attribute
727                    if !field.attrs.iter().any(|attr| attr.path().is_ident("output")) {
728                        return None;
729                    }
730
731                    let field_name = field.ident.as_ref().unwrap();
732                    let field_name_str = field_name.to_string();
733
734                    match &field.ty {
735                        // Handle Vec<T>
736                        Type::Path(type_path) if is_vec(type_path) => {
737                            let name_code = quote! {
738                                for i in 0..self.#field_name.len() {
739                                    names.push(format!("{}[{}]", #field_name_str, i));
740                                }
741                            };
742                            let init_code = quote! {
743                                for i in 0..self.#field_name.len() {
744                                    self.#field_name[i].init(block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
745                                }
746                            };
747                            let validate_code = quote! {
748                                for i in 0..self.#field_name.len() {
749                                    self.#field_name[i].validate()?;
750                                }
751                            };
752                            let notify_code = quote! {
753                                for i in 0..self.#field_name.len() {
754                                    self.#field_name[i].notify_finished().await;
755                                }
756                            };
757                            let connect_code = quote! {
758                                for (i, _) in self.#field_name.iter_mut().enumerate() {
759                                    if name == format!("{}[{}]", #field_name_str, i) {
760                                        return self.#field_name[i].connect_dyn(reader);
761                                    }
762                                }
763                            };
764                            Some((name_code, init_code, validate_code, notify_code, connect_code))
765                        }
766                        // Handle arrays [T; N]
767                        Type::Array(array) => {
768                            let len = &array.len;
769                            let name_code = quote! {
770                                for i in 0..#len {
771                                    names.push(format!("{}[{}]", #field_name_str, i));
772                                }
773                            };
774                            let init_code = quote! {
775                                for i in 0..#len {
776                                    self.#field_name[i].init(block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
777                                }
778                            };
779                            let validate_code = quote! {
780                                for i in 0..#len {
781                                    self.#field_name[i].validate()?;
782                                }
783                            };
784                            let notify_code = quote! {
785                                for i in 0..#len {
786                                    self.#field_name[i].notify_finished().await;
787                                }
788                            };
789                            let connect_code = quote! {
790                                for (i, _) in self.#field_name.iter_mut().enumerate() {
791                                    if name == format!("{}[{}]", #field_name_str, i) {
792                                        return self.#field_name[i].connect_dyn(reader);
793                                    }
794                                }
795                            };
796                            Some((name_code, init_code, validate_code, notify_code, connect_code))
797                        }
798                        // Handle tuples (T1, T2, ...)
799                        Type::Tuple(tuple) => {
800                            let len = tuple.elems.len();
801                            let name_code = quote! {
802                                for i in 0..#len {
803                                    names.push(format!("{}.{}", #field_name_str, i));
804                                }
805                            };
806                            let init_code = tuple.elems.iter().enumerate().map(|(i, _)| {
807                                let index = syn::Index::from(i);
808                                quote! {
809                                    self.#field_name.#index.init(block_id, PortId::new(format!("{}.{}", #field_name_str, #index)), inbox.clone());
810                                }
811                            });
812                            let init_code = quote! {
813                                #(#init_code)*
814                            };
815                            let validate_code = tuple.elems.iter().enumerate().map(|(i, _)| {
816                                let index = syn::Index::from(i);
817                                quote! {
818                                    self.#field_name.#index.validate()?;
819                                }
820                            });
821                            let validate_code = quote! {
822                                #(#validate_code)*
823                            };
824                            let notify_code = tuple.elems.iter().enumerate().map(|(i, _)| {
825                                let index = syn::Index::from(i);
826                                quote! {
827                                    self.#field_name.#index.notify_finished().await;
828                                }
829                            });
830                            let notify_code = quote! {
831                                #(#notify_code)*
832                            };
833                            let connect_code = tuple.elems.iter().enumerate().map(|(i, _)| {
834                                let index = syn::Index::from(i);
835                                quote!{
836                                    if name == format!("{}.{}", #field_name_str, #index) {
837                                        return self.#field_name.#index.connect_dyn(reader);
838                                    }
839                                }
840                            });
841                            let connect_code = quote! {
842                                #(#connect_code)*
843                            };
844                            Some((name_code, init_code, validate_code, notify_code, connect_code))
845                        }
846                        // Handle normal types
847                        _ => {
848                            let name_code = quote! {
849                                names.push(#field_name_str.to_string());
850                            };
851                            let init_code = quote! {
852                                self.#field_name.init(block_id, PortId::new(#field_name_str.to_string()), inbox.clone());
853                            };
854                            let validate_code = quote! {
855                                self.#field_name.validate()?;
856                            };
857                            let notify_code = quote! {
858                                self.#field_name.notify_finished().await;
859                            };
860                            let connect_code = quote! {
861                                if name == #field_name_str {
862                                    return self.#field_name.connect_dyn(reader);
863                                }
864                            };
865                            Some((name_code, init_code, validate_code, notify_code, connect_code))
866                        }
867                    }
868                })
869                .collect::<Vec<_>>()
870        }
871        _ => Vec::new(),
872    };
873
874    let stream_outputs_names = stream_outputs
875        .iter()
876        .map(|x| x.0.clone())
877        .collect::<Vec<_>>();
878    let stream_outputs_init = stream_outputs
879        .iter()
880        .map(|x| x.1.clone())
881        .collect::<Vec<_>>();
882    let stream_outputs_validate = stream_outputs
883        .iter()
884        .map(|x| x.2.clone())
885        .collect::<Vec<_>>();
886    let stream_outputs_notify = stream_outputs
887        .iter()
888        .map(|x| x.3.clone())
889        .collect::<Vec<_>>();
890    let stream_outputs_connect = stream_outputs
891        .iter()
892        .map(|x| x.4.clone())
893        .collect::<Vec<_>>();
894
895    // Collect the names and types of fields that have the #[input] or #[output] attribute
896    let (port_idents, port_types): (Vec<Ident>, Vec<Type>) = match struct_data.fields {
897        Fields::Named(ref fields_named) => fields_named
898            .named
899            .iter()
900            .filter_map(|field| {
901                if has_input_attr(&field.attrs) || has_output_attr(&field.attrs) {
902                    let ident = field.ident.clone().unwrap();
903                    let ty = field.ty.clone();
904                    Some((ident, ty))
905                } else {
906                    None
907                }
908            })
909            .unzip(),
910        Fields::Unnamed(_) | Fields::Unit => (Vec::new(), Vec::new()),
911    };
912    let port_getter_fns = port_idents
913        .iter()
914        .zip(port_types.iter())
915        .map(|(ident, ty)| {
916            quote! {
917                /// Getter for stream port.
918                pub fn #ident(&mut self) -> &mut #ty {
919                    &mut self.#ident
920                }
921            }
922        });
923
924    // Search for struct attributes
925    for attr in &input.attrs {
926        if attr.path().is_ident("message_inputs") {
927            let nested = attr
928                .parse_args_with(
929                    syn::punctuated::Punctuated::<Meta, syn::Token![,]>::parse_terminated,
930                )
931                .unwrap();
932            for m in nested {
933                match m {
934                    Meta::NameValue(m) => {
935                        message_inputs.push(m.path.get_ident().unwrap().clone());
936                        if let syn::Expr::Lit(syn::ExprLit {
937                            lit: syn::Lit::Str(s),
938                            ..
939                        }) = m.value
940                        {
941                            message_input_names.push(s.value());
942                        } else {
943                            panic!(
944                                "message handlers have to be an identifier or identifier = \"port name\""
945                            );
946                        }
947                    }
948                    Meta::Path(p) => {
949                        let p = p.get_ident().unwrap();
950                        message_inputs.push(p.clone());
951                        message_input_names.push(p.to_string());
952                    }
953                    _ => {
954                        panic!("message inputs has to be a list of name-values or paths")
955                    }
956                }
957            }
958        } else if attr.path().is_ident("message_outputs") {
959            let nested = attr
960                .parse_args_with(
961                    syn::punctuated::Punctuated::<Meta, syn::Token![,]>::parse_terminated,
962                )
963                .unwrap();
964            for m in nested {
965                match m {
966                    Meta::Path(p) => {
967                        let p = p.get_ident().unwrap();
968                        message_output_names.push(p.to_string());
969                    }
970                    _ => {
971                        panic!("message outputs has to be a list of paths")
972                    }
973                }
974            }
975        } else if attr.path().is_ident("null_kernel") {
976            kernel = quote! {
977                #[doc(hidden)]
978                impl #generics ::futuresdr::runtime::Kernel for #struct_name #generics
979                    #where_clause { }
980
981            }
982        } else if attr.path().is_ident("blocking") {
983            blocking = quote! { true }
984        } else if attr.path().is_ident("type_name") {
985            let nested = attr
986                .parse_args_with(
987                    syn::punctuated::Punctuated::<Meta, syn::Token![,]>::parse_terminated,
988                )
989                .unwrap();
990            if let Some(Meta::Path(p)) = nested.get(0) {
991                type_name = p.get_ident().unwrap().to_string();
992            } else {
993                panic!("type_name attribute should be in the form type_name(foo)");
994            }
995        }
996    }
997
998    // Generate handler names as strings
999    let message_input_names = message_input_names.into_iter().map(|handler| {
1000        let handler = if let Some(stripped) = handler.strip_prefix("r#") {
1001            stripped.to_string()
1002        } else {
1003            handler
1004        };
1005        quote! {
1006            #handler
1007        }
1008    });
1009
1010    // Generate match arms for the handle method
1011    let handler_matches =
1012        message_inputs
1013            .iter()
1014            .zip(message_input_names.clone())
1015            .map(|(handler, handler_name)| {
1016                quote! {
1017                    #handler_name  => self.#handler(io, mio, meta, p).await,
1018                }
1019            });
1020
1021    let expanded = quote! {
1022
1023        impl #generics #struct_name #unconstraint_generics
1024            #where_clause
1025        {
1026            #(#port_getter_fns)*
1027        }
1028
1029        impl #generics ::futuresdr::runtime::KernelInterface for #struct_name #unconstraint_generics
1030            #where_clause
1031        {
1032            fn is_blocking() -> bool {
1033                #blocking
1034            }
1035            fn type_name() -> &'static str {
1036                static TYPE_NAME: &str = #type_name;
1037                TYPE_NAME
1038            }
1039            fn stream_inputs(&self) -> Vec<String> {
1040                let mut names = vec![];
1041                #(#stream_inputs_names)*
1042                names
1043            }
1044            fn stream_outputs(&self) -> Vec<String> {
1045                let mut names = vec![];
1046                #(#stream_outputs_names)*
1047                names
1048            }
1049
1050            fn stream_ports_init(&mut self, block_id: ::futuresdr::runtime::BlockId, inbox: ::futuresdr::channel::mpsc::Sender<::futuresdr::runtime::BlockMessage>) {
1051                use ::futuresdr::runtime::PortId;
1052                #(#stream_inputs_init)*
1053                #(#stream_outputs_init)*
1054            }
1055            fn stream_ports_validate(&self) -> ::futuresdr::runtime::Result<(), ::futuresdr::runtime::Error> {
1056                use ::futuresdr::runtime::PortId;
1057                #(#stream_inputs_validate)*
1058                #(#stream_outputs_validate)*
1059                Ok(())
1060            }
1061            fn stream_input_finish(&mut self, port_id: ::futuresdr::runtime::PortId) -> ::futuresdr::runtime::Result<(), futuresdr::runtime::Error> {
1062                use ::futuresdr::runtime::Error;
1063                use ::futuresdr::runtime::BlockPortCtx;
1064                let port = port_id.name();
1065                #(#stream_inputs_finish)*
1066                Err(Error::InvalidMessagePort(BlockPortCtx::None, port_id))
1067            }
1068            async fn stream_ports_notify_finished(&mut self) {
1069                #(#stream_inputs_notify)*
1070                #(#stream_outputs_notify)*
1071            }
1072            fn stream_input(&mut self, name: &str) -> Option<&mut dyn ::futuresdr::runtime::buffer::BufferReader> {
1073                #(#stream_inputs_get)*
1074                None
1075            }
1076            fn connect_stream_output(&mut self, name: &str, reader: &mut dyn ::futuresdr::runtime::buffer::BufferReader) -> ::futuresdr::runtime::Result<(), ::futuresdr::runtime::Error> {
1077                use ::futuresdr::runtime::Error;
1078                use ::futuresdr::runtime::BlockPortCtx;
1079                #(#stream_outputs_connect)*
1080                Err(Error::InvalidStreamPort(BlockPortCtx::None, name.into()))
1081            }
1082
1083            fn message_inputs() -> &'static[&'static str] {
1084                static MESSAGE_INPUTS: &[&str] = &[#(#message_input_names),*];
1085                MESSAGE_INPUTS
1086            }
1087            fn message_outputs() -> &'static[&'static str] {
1088                static MESSAGE_OUTPUTS: &[&str] = &[#(#message_output_names),*];
1089                MESSAGE_OUTPUTS
1090            }
1091            async fn call_handler(
1092                &mut self,
1093                io: &mut ::futuresdr::runtime::WorkIo,
1094                mio: &mut ::futuresdr::runtime::MessageOutputs,
1095                meta: &mut ::futuresdr::runtime::BlockMeta,
1096                id: ::futuresdr::runtime::PortId,
1097                p: ::futuresdr::runtime::Pmt) ->
1098                    ::futuresdr::runtime::Result<::futuresdr::runtime::Pmt, ::futuresdr::runtime::Error> {
1099                        use ::futuresdr::runtime::BlockPortCtx;
1100                        use ::futuresdr::runtime::Error;
1101                        use ::futuresdr::runtime::Pmt;
1102                        use ::futuresdr::runtime::PortId;
1103                        use ::futuresdr::runtime::Result;
1104                        let ret: Result<Pmt> = match id.name() {
1105                                #(#handler_matches)*
1106                                _ => return Err(Error::InvalidMessagePort(
1107                                    BlockPortCtx::None,
1108                                    id)),
1109                        };
1110
1111                        #[allow(unreachable_code)]
1112                        ret.map_err(|e| Error::HandlerError(e.to_string()))
1113            }
1114        }
1115
1116        #kernel
1117    };
1118    // println!("{}", pretty_print(&expanded));
1119    proc_macro::TokenStream::from(expanded)
1120}
1121
1122#[allow(dead_code)]
1123fn pretty_print(ts: &proc_macro2::TokenStream) -> String {
1124    let syntax_tree = syn::parse2(ts.clone()).unwrap();
1125    prettyplease::unparse(&syntax_tree)
1126}
1127
1128//=========================================================================
1129// ASYNC_TRAIT
1130//=========================================================================
1131
1132/// Custom version of async_trait that uses non-send futures for WASM.
1133#[proc_macro_attribute]
1134pub fn async_trait(
1135    _attr: proc_macro::TokenStream,
1136    fun: proc_macro::TokenStream,
1137) -> proc_macro::TokenStream {
1138    let fun: proc_macro2::TokenStream = fun.into();
1139    quote!(
1140        #[cfg_attr(not(target_arch = "wasm32"), futuresdr::macros::async_trait_orig)]
1141        #[cfg_attr(target_arch = "wasm32", futuresdr::macros::async_trait_orig(?Send))]
1142        #fun
1143    )
1144    .into()
1145}