Skip to main content

futuresdr_macros/
lib.rs

1//! Procedural macros for FutureSDR applications and custom blocks.
2//!
3//! The main entry points are:
4//!
5//! - `connect!`, which adds blocks to a flowgraph and wires stream, local
6//!   stream, message, and circuit connections.
7//! - `#[derive(Block)]`, which generates the runtime interface for block kernels.
8use proc_macro::TokenStream;
9use quote::quote;
10use syn::Attribute;
11use syn::Data;
12use syn::DeriveInput;
13use syn::Fields;
14use syn::GenericArgument;
15use syn::GenericParam;
16use syn::Ident;
17use syn::Index;
18use syn::Meta;
19use syn::PathArguments;
20use syn::Result;
21use syn::Token;
22use syn::Type;
23use syn::bracketed;
24use syn::parse::Parse;
25use syn::parse::ParseStream;
26use syn::parse_macro_input;
27use syn::parse_quote;
28use syn::punctuated::Punctuated;
29use syn::token;
30
31/// Avoid boilerplate when setting up the flowgraph.
32///
33/// `connect!` adds all mentioned blocks to the flowgraph if needed and then
34/// records the requested connections. It leaves the local variable names bound
35/// to typed block references, so they can be used later for inspection or
36/// message calls.
37///
38/// ```ignore
39/// let mut fg = Flowgraph::new();
40///
41/// connect!(fg,
42///     src.out > shift.in;
43///     shift > resamp1 > demod;
44///     demod > resamp2 > snk;
45/// );
46/// ```
47///
48/// It roughly generates code like:
49///
50/// ```ignore
51/// // Add all the blocks to the `Flowgraph`...
52/// let src = fg.add(src);
53/// let shift = fg.add(shift);
54/// let resamp1 = fg.add(resamp1);
55/// let demod = fg.add(demod);
56/// let resamp2 = fg.add(resamp2);
57/// let snk = fg.add(snk);
58///
59/// // ... and connect the ports appropriately
60/// fg.stream(&src, |b| b.output(), &shift, |b| b.input())?;
61/// fg.stream(&shift, |b| b.output(), &resamp1, |b| b.input())?;
62/// fg.stream(&resamp1, |b| b.output(), &demod, |b| b.input())?;
63/// fg.stream(&demod, |b| b.output(), &resamp2, |b| b.input())?;
64/// fg.stream(&resamp2, |b| b.output(), &snk, |b| b.input())?;
65/// ```
66///
67/// Connection endpoints are defined by `block.port_name`. Standard stream port
68/// names can be omitted: a missing source port means `output()`, and a missing
69/// destination port means `input()`. Message endpoints default to `"out"` and
70/// `"in"`.
71///
72/// Send-capable stream connections are indicated as `>`, while local-domain-only
73/// stream connections for non-`Send` buffers are indicated as `~>`. Message
74/// connections are indicated as `|`.
75///
76/// Circuit-capable buffers are still connected with normal stream connections.
77/// The `<` operator performs the additional circuit-closing step that sends
78/// buffers back from the downstream end to the upstream start.
79///
80/// If a block uses non-standard port names it is possible to use triples, e.g.:
81///
82/// ```ignore
83/// connect!(fg, src > input.foo.output > snk);
84/// ```
85///
86/// Indexed stream ports generated from `Vec<T>` or `[T; N]` fields can be
87/// selected with `field[index]`:
88///
89/// ```ignore
90/// connect!(fg, src.output[0] > input.snk);
91/// ```
92///
93/// It is possible to add blocks that have no connections by just putting them
94/// on a line separately.
95///
96/// ```ignore
97/// connect!(fg, dummy);
98/// ```
99#[proc_macro]
100pub fn connect(input: TokenStream) -> TokenStream {
101    let connect_input = parse_macro_input!(input as ConnectInput);
102    generate_connect(connect_input, ConnectMode::BlockingNative).into()
103}
104
105/// Async counterpart to [`connect!`].
106///
107/// This macro emits `.await` points for the generated connection operations and
108/// is the required flowgraph-construction macro on WASM targets.
109#[proc_macro]
110pub fn connect_async(input: TokenStream) -> TokenStream {
111    let connect_input = parse_macro_input!(input as ConnectInput);
112    generate_connect(connect_input, ConnectMode::Async).into()
113}
114
115#[derive(Clone, Copy)]
116enum ConnectMode {
117    Async,
118    BlockingNative,
119}
120
121fn generate_connect(connect_input: ConnectInput, mode: ConnectMode) -> proc_macro2::TokenStream {
122    // dbg!(&connect_input);
123    let fg = connect_input.flowgraph;
124
125    let mut blocks: Vec<Ident> = Vec::new();
126    let mut connections = Vec::new();
127
128    // Collect all blocks and generate connections
129    for conn in connect_input.connection_strings.iter() {
130        let src_block = &conn.source.block;
131        blocks.push(src_block.clone());
132
133        let mut src_block = &conn.source.block;
134        let mut src_port = &conn.source.output;
135
136        for (connection_type, dst) in &conn.connections {
137            blocks.push(dst.block.clone());
138
139            let out = match connection_type {
140                ConnectionType::Stream | ConnectionType::LocalStream => {
141                    let src_port = port_method(src_port, quote!(output()));
142                    let dst_port = port_method(&dst.input, quote!(input()));
143                    let dst_block = &dst.block;
144                    let method = match connection_type {
145                        ConnectionType::Stream => quote! { stream_async },
146                        ConnectionType::LocalStream => quote! { stream_local_async },
147                        _ => unreachable!(),
148                    };
149                    quote! {
150                        #fg.#method(
151                            &#src_block,
152                            |b| b.#src_port,
153                            &#dst_block,
154                            |b| b.#dst_port,
155                        ).await?;
156                    }
157                }
158                ConnectionType::Circuit => {
159                    let src_port = port_method(src_port, quote!(output()));
160                    let dst_port = port_method(&dst.input, quote!(input()));
161                    let dst_block = &dst.block;
162                    quote! {
163                        #fg.close_circuit_async(
164                            &#src_block,
165                            |b| b.#src_port,
166                            &#dst_block,
167                            |b| b.#dst_port,
168                        ).await?;
169                    }
170                }
171                ConnectionType::Message => {
172                    let src_port = if let Some(p) = &src_port {
173                        let src_port = p.name.to_string();
174                        quote! { #src_port }
175                    } else {
176                        quote!("out")
177                    };
178                    let dst_port = if let Some(p) = &dst.input {
179                        let dst_port = p.name.to_string();
180                        quote! { #dst_port }
181                    } else {
182                        quote!("in")
183                    };
184                    let dest_block = &dst.block;
185                    quote! {
186                        #fg.message_async(
187                            #src_block,
188                            #src_port,
189                            #dest_block,
190                            #dst_port,
191                        ).await?;
192                    }
193                }
194            };
195            connections.push(out);
196            src_block = &dst.block;
197            src_port = &dst.output;
198        }
199    }
200
201    // Deduplicate blocks
202    blocks.sort_by_key(|b| b.to_string());
203    blocks.dedup();
204
205    let block_decls = blocks.iter().map(|block| match mode {
206        ConnectMode::Async => quote! {
207            let #block = #fg.connect_add_async(#block).await?;
208        },
209        ConnectMode::BlockingNative => quote! {
210            let #block = #fg.connect_add(#block)?;
211        },
212    });
213
214    let connect_add_trait = match mode {
215        ConnectMode::Async => quote! { use ::futuresdr::runtime::__private::ConnectAddAsync as _; },
216        ConnectMode::BlockingNative => {
217            quote! { use ::futuresdr::runtime::__private::ConnectAdd as _; }
218        }
219    };
220
221    let body = quote! {
222        #connect_add_trait
223        #(#block_decls)*
224        #(#connections)*
225        ::core::result::Result::Ok::<_, ::futuresdr::runtime::Error>((#(#blocks),*))
226    };
227
228    match mode {
229        ConnectMode::Async => quote![
230            #[allow(unused_variables)]
231            let (#(#blocks),*) = {
232                #body
233            }?;
234        ],
235        ConnectMode::BlockingNative => quote![
236            #[cfg(target_arch = "wasm32")]
237            compile_error!("connect! is synchronous and unavailable on wasm32; use connect_async!(...).await instead");
238
239            #[cfg(not(target_arch = "wasm32"))]
240            #[allow(unused_variables)]
241            let (#(#blocks),*) = ::futuresdr::runtime::block_on(async {
242                #body
243            })?;
244        ],
245    }
246}
247
248fn port_method(port: &Option<Port>, default: proc_macro2::TokenStream) -> proc_macro2::TokenStream {
249    match port {
250        Some(Port { name, index: None }) => {
251            quote! { #name() }
252        }
253        Some(Port {
254            name,
255            index: Some(i),
256        }) => {
257            quote! { #name().get_mut(#i).unwrap() }
258        }
259        None => default,
260    }
261}
262
263// full macro input
264#[derive(Debug)]
265struct ConnectInput {
266    flowgraph: Ident,
267    _comma: Token![,],
268    connection_strings: Punctuated<ConnectionString, Token![;]>,
269}
270impl Parse for ConnectInput {
271    fn parse(input: ParseStream) -> Result<Self> {
272        Ok(ConnectInput {
273            flowgraph: input.parse()?,
274            _comma: input.parse()?,
275            connection_strings: Punctuated::parse_terminated(input)?,
276        })
277    }
278}
279
280// connection line in the macro input
281#[derive(Debug)]
282struct ConnectionString {
283    source: Source,
284    connections: Vec<(ConnectionType, Endpoint)>,
285}
286impl Parse for ConnectionString {
287    fn parse(input: ParseStream) -> Result<Self> {
288        let source: Source = input.parse()?;
289        let mut connections = Vec::new();
290
291        while let Ok(ct) = input.parse::<ConnectionType>() {
292            let dest: Endpoint = input.parse()?;
293            connections.push((ct, dest));
294        }
295
296        Ok(ConnectionString {
297            source,
298            connections,
299        })
300    }
301}
302
303#[derive(Debug)]
304enum ConnectionType {
305    Stream,
306    LocalStream,
307    Message,
308    Circuit,
309}
310
311impl Parse for ConnectionType {
312    fn parse(input: ParseStream) -> Result<Self> {
313        if input.peek(Token![>]) {
314            input.parse::<Token![>]>()?;
315            Ok(Self::Stream)
316        } else if input.peek(Token![~]) {
317            input.parse::<Token![~]>()?;
318            input.parse::<Token![>]>()?;
319            Ok(Self::LocalStream)
320        } else if input.peek(Token![|]) {
321            input.parse::<Token![|]>()?;
322            Ok(Self::Message)
323        } else if input.peek(Token![<]) {
324            input.parse::<Token![<]>()?;
325            Ok(Self::Circuit)
326        } else {
327            Err(input.error("expected `>`, `~>`, `|`, or `<` to specify the connection type"))
328        }
329    }
330}
331
332#[derive(Debug)]
333struct Source {
334    block: Ident,
335    output: Option<Port>,
336}
337impl Parse for Source {
338    fn parse(input: ParseStream) -> Result<Self> {
339        let block: Ident = input.parse()?;
340        if input.peek(Token![.]) {
341            input.parse::<Token![.]>()?;
342            let port: Port = input.parse()?;
343            Ok(Self {
344                block,
345                output: Some(port),
346            })
347        } else {
348            Ok(Self {
349                block,
350                output: None,
351            })
352        }
353    }
354}
355
356// connection endpoint is a block with input and output ports
357#[derive(Debug)]
358struct Endpoint {
359    block: Ident,
360    input: Option<Port>,
361    output: Option<Port>,
362}
363impl Parse for Endpoint {
364    fn parse(input: ParseStream) -> Result<Self> {
365        let first: Port = input.parse()?;
366
367        // there is only one identifier, it has to be the block
368        if !input.peek(Token![.]) {
369            if first.index.is_none() {
370                return Ok(Self {
371                    block: first.name,
372                    input: None,
373                    output: None,
374                });
375            } else {
376                return Err(input.error("expected endpoint, got only port"));
377            }
378        }
379
380        input.parse::<Token![.]>()?;
381        let block: Ident = input.parse()?;
382
383        if !input.peek(Token![.]) {
384            return Ok(Self {
385                block,
386                input: Some(first),
387                output: None,
388            });
389        }
390
391        input.parse::<Token![.]>()?;
392        let second: Port = input.parse()?;
393
394        Ok(Self {
395            block,
396            input: Some(first),
397            output: Some(second),
398        })
399    }
400}
401
402// input or output port
403#[derive(Debug)]
404struct Port {
405    name: Ident,
406    index: Option<Index>,
407}
408impl Parse for Port {
409    fn parse(input: ParseStream) -> Result<Self> {
410        let name: Ident = input.parse()?;
411        let index = if input.peek(token::Bracket) {
412            let content;
413            bracketed!(content in input);
414            Some(content.parse()?)
415        } else {
416            None
417        };
418        Ok(Port { name, index })
419    }
420}
421
422/// Check for  `#[input]` attribute
423fn has_input_attr(attrs: &[Attribute]) -> bool {
424    attrs.iter().any(|attr| attr.path().is_ident("input"))
425}
426/// Check for  `#[output]` attribute
427fn has_output_attr(attrs: &[Attribute]) -> bool {
428    attrs.iter().any(|attr| attr.path().is_ident("output"))
429}
430/// Check if parameter is a Vec
431fn is_vec(type_path: &syn::TypePath) -> bool {
432    if type_path.path.segments.len() != 1 {
433        return false;
434    }
435
436    let segment = &type_path.path.segments[0];
437    if segment.ident != "Vec" {
438        return false;
439    }
440
441    matches!(segment.arguments, PathArguments::AngleBracketed(_))
442}
443
444fn port_bound_types(ty: &Type) -> Vec<Type> {
445    match ty {
446        Type::Path(type_path) if is_vec(type_path) => {
447            if let PathArguments::AngleBracketed(args) = &type_path.path.segments[0].arguments {
448                args.args
449                    .iter()
450                    .filter_map(|arg| match arg {
451                        GenericArgument::Type(ty) => Some(ty.clone()),
452                        _ => None,
453                    })
454                    .collect()
455            } else {
456                Vec::new()
457            }
458        }
459        Type::Array(array) => vec![(*array.elem).clone()],
460        Type::Tuple(tuple) => tuple.elems.iter().cloned().collect(),
461        _ => vec![ty.clone()],
462    }
463}
464
465//=========================================================================
466// BLOCK MACRO
467//=========================================================================
468/// Derive the runtime interface for a block kernel.
469///
470/// `#[derive(Block)]` is used on a struct that implements `Kernel`. Fields
471/// marked with `#[input]` or `#[output]` become stream ports. Struct-level
472/// `#[message_inputs(...)]` and `#[message_outputs(...)]` attributes declare
473/// message ports.
474///
475/// ```ignore
476/// #[derive(Block)]
477/// #[message_inputs(set_gain)]
478/// #[message_outputs(done)]
479/// struct Scale {
480///     #[input]
481///     input: DefaultCpuReader<f32>,
482///     #[output]
483///     output: DefaultCpuWriter<f32>,
484///     gain: f32,
485/// }
486/// ```
487///
488/// Generated stream port getter methods have the same names as the annotated
489/// fields and are used by the `connect!` macro. `Vec<T>` and arrays of buffer
490/// ports are expanded into indexed port names such as `outputs[0]`; tuples are
491/// exposed through dynamic port names such as `ports.1`.
492///
493/// Supported struct attributes:
494///
495/// - `#[message_inputs(handler)]`: call `self.handler(...)` for a message input
496///   named `handler`.
497/// - `#[message_inputs(handler = "port-name")]`: expose a message input under a
498///   name that differs from the Rust method name.
499/// - `#[message_outputs(out)]`: declare a message output port named `out`.
500/// - `#[blocking]`: run this block on the blocking/local execution path.
501/// - `#[type_name(Name)]`: override the type name exposed in runtime
502///   descriptions.
503/// - `#[null_kernel]`: generate an empty `Kernel` implementation.
504#[proc_macro_derive(
505    Block,
506    attributes(
507        input,
508        output,
509        message_inputs,
510        message_outputs,
511        blocking,
512        type_name,
513        null_kernel
514    )
515)]
516pub fn derive_block(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
517    derive_block_impl(input)
518}
519
520fn derive_block_impl(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
521    let input = parse_macro_input!(input as DeriveInput);
522    let struct_name = &input.ident;
523    let generics = &input.generics;
524    let where_clause = &input.generics.where_clause;
525
526    let mut message_inputs: Vec<Ident> = Vec::new();
527    let mut message_input_names: Vec<String> = Vec::new();
528    let mut message_output_names: Vec<String> = Vec::new();
529    let mut kernel = quote! {};
530    let mut blocking = quote! { false };
531    let mut type_name = struct_name.to_string();
532
533    // remove defaults from generics
534    let mut generics = generics.clone();
535    for param in &mut generics.params {
536        match param {
537            GenericParam::Type(type_param) => {
538                type_param.default = None;
539            }
540            GenericParam::Const(const_param) => {
541                const_param.default = None;
542            }
543            GenericParam::Lifetime(_) => {}
544        }
545    }
546
547    let unconstraint_params: Vec<proc_macro2::TokenStream> = generics
548        .params
549        .iter()
550        .map(|param| match param {
551            GenericParam::Type(ty) => {
552                let ident = &ty.ident;
553                quote! { #ident }
554            }
555            GenericParam::Lifetime(lt) => {
556                let lifetime = &lt.lifetime;
557                quote! { #lifetime }
558            }
559            GenericParam::Const(c) => {
560                let ident = &c.ident;
561                quote! { #ident }
562            }
563        })
564        .collect();
565
566    // Surround the parameters with angle brackets if they exist
567    let unconstraint_generics = if generics.params.is_empty() {
568        quote! {}
569    } else {
570        quote! { <#(#unconstraint_params),*> }
571    };
572
573    // Parse Struct
574    let struct_data = match input.data {
575        Data::Struct(data) => data,
576        _ => {
577            return syn::Error::new_spanned(input.ident, "Block can only be derived for structs")
578                .to_compile_error()
579                .into();
580        }
581    };
582
583    let stream_inputs = match struct_data.fields {
584        Fields::Named(ref fields) => {
585            fields
586                .named
587                .iter()
588                .filter_map(|field| {
589                    // Check if field has #[input] attribute
590                    if !field.attrs.iter().any(|attr| attr.path().is_ident("input")) {
591                        return None;
592                    }
593
594                    let field_name = field.ident.as_ref().unwrap();
595                    let field_name_str = field_name.to_string();
596
597                    match &field.ty {
598                        // Handle Vec<T>
599                        Type::Path(type_path) if is_vec(type_path) => {
600                            let name_code = quote! {
601                                for i in 0..self.#field_name.len() {
602                                    names.push(format!("{}[{}]", #field_name_str, i));
603                                }
604                            };
605                            let init_code = quote! {
606                                for i in 0..self.#field_name.len() {
607                                    __FsdrInput::init(&mut self.#field_name[i], block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
608                                }
609                            };
610                            let validate_code = quote! {
611                                for i in 0..self.#field_name.len() {
612                                    __FsdrInput::validate(&self.#field_name[i])?;
613                                }
614                            };
615                            let notify_code = quote! {
616                                for i in 0..self.#field_name.len() {
617                                    __FsdrInput::notify_finished(&mut self.#field_name[i]).await;
618                                }
619                            };
620                            let finish_code = quote! {
621                                for (i, _) in self.#field_name.iter_mut().enumerate() {
622                                    if port == format!("{}[{}]", #field_name_str, i) {
623                                        __FsdrInput::finish(&mut self.#field_name[i]);
624                                        return Ok(());
625                                    }
626                                }
627                            };
628                            let get_input_code = quote! {
629                                for (i, _) in self.#field_name.iter_mut().enumerate() {
630                                    if name == format!("{}[{}]", #field_name_str, i) {
631                                        return Ok(&mut self.#field_name[i]);
632                                    }
633                                }
634                            };
635                            Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
636                        }
637                        // Handle arrays [T; N]
638                        Type::Array(array) => {
639                            let len = &array.len;
640                            let name_code = quote! {
641                                for i in 0..#len {
642                                    names.push(format!("{}[{}]", #field_name_str, i));
643                                }
644                            };
645                            let init_code = quote! {
646                                for i in 0..#len {
647                                    __FsdrInput::init(&mut self.#field_name[i], block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
648                                }
649                            };
650                            let validate_code = quote! {
651                                for i in 0..#len {
652                                    __FsdrInput::validate(&self.#field_name[i])?;
653                                }
654                            };
655                            let notify_code = quote! {
656                                for i in 0..#len {
657                                    __FsdrInput::notify_finished(&mut self.#field_name[i]).await;
658                                }
659                            };
660                            let finish_code = quote! {
661                                for (i, _) in self.#field_name.iter_mut().enumerate() {
662                                    if port == format!("{}[{}]", #field_name_str, i) {
663                                        __FsdrInput::finish(&mut self.#field_name[i]);
664                                        return Ok(());
665                                    }
666                                }
667                            };
668                            let get_input_code = quote! {
669                                for (i, _) in self.#field_name.iter_mut().enumerate() {
670                                    if name == format!("{}[{}]", #field_name_str, i) {
671                                        return Ok(&mut self.#field_name[i]);
672                                    }
673                                }
674                            };
675                            Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
676                        }
677                        // Handle tuples (T1, T2, ...)
678                        Type::Tuple(tuple) => {
679                            let len = tuple.elems.len();
680                            let name_code = quote! {
681                                for i in 0..#len {
682                                    names.push(format!("{}.{}", #field_name_str, i));
683                                }
684                            };
685                            let init_code = tuple.elems.iter().enumerate().map(|(i, _)| {
686                                let index = syn::Index::from(i);
687                                quote! {
688                                    __FsdrInput::init(&mut self.#field_name.#index, block_id, PortId::new(format!("{}.{}", #field_name_str, #index)), inbox.clone());
689                                }
690                            });
691                            let init_code = quote! {
692                                #(#init_code)*
693                            };
694                            let validate_code = tuple.elems.iter().enumerate().map(|(i, _)| {
695                                let index = syn::Index::from(i);
696                                quote! {
697                                    __FsdrInput::validate(&self.#field_name.#index)?;
698                                }
699                            });
700                            let validate_code = quote! {
701                                #(#validate_code)*
702                            };
703                            let notify_code = tuple.elems.iter().enumerate().map(|(i, _)| {
704                                let index = syn::Index::from(i);
705                                quote! {
706                                    __FsdrInput::notify_finished(&mut self.#field_name.#index).await;
707                                }
708                            });
709                            let notify_code = quote! {
710                                #(#notify_code)*
711                            };
712                            let finish_code = tuple.elems.iter().enumerate().map(|(i, _)| {
713                                let index = syn::Index::from(i);
714                                quote!{
715                                    if port == format!("{}.{}", #field_name_str, #index) {
716                                        __FsdrInput::finish(&mut self.#field_name.#index);
717                                        return Ok(());
718                                    }
719                                }
720                            });
721                            let finish_code = quote! {
722                                #(#finish_code)*
723                            };
724                            let get_input_code = tuple.elems.iter().enumerate().map(|(i, _)| {
725                                let index = syn::Index::from(i);
726                                quote!{
727                                    if name == format!("{}.{}", #field_name_str, #index) {
728                                        return Ok(&mut self.#field_name.#index);
729                                    }
730                                }
731                            });
732                            let get_input_code = quote! {
733                                #(#get_input_code)*
734                            };
735                            Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
736                        }
737                        // Handle normal types
738                        _ => {
739                            let name_code = quote! {
740                                names.push(#field_name_str.to_string());
741                            };
742                            let init_code = quote! {
743                                __FsdrInput::init(&mut self.#field_name, block_id, PortId::new(#field_name_str.to_string()), inbox.clone());
744                            };
745                            let validate_code = quote! {
746                                __FsdrInput::validate(&self.#field_name)?;
747                            };
748                            let notify_code = quote! {
749                                __FsdrInput::notify_finished(&mut self.#field_name).await;
750                            };
751                            let finish_code = quote! {
752                                if port == #field_name_str {
753                                    __FsdrInput::finish(&mut self.#field_name);
754                                    return Ok(());
755                                }
756                            };
757                            let get_input_code = quote! {
758                                if name == #field_name_str {
759                                    return Ok(&mut self.#field_name)
760                                }
761                            };
762                            Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
763                        }
764                    }
765                })
766                .collect::<Vec<_>>()
767        }
768        _ => Vec::new(),
769    };
770
771    let stream_inputs_names = stream_inputs
772        .iter()
773        .map(|x| x.0.clone())
774        .collect::<Vec<_>>();
775    let stream_inputs_init = stream_inputs
776        .iter()
777        .map(|x| x.1.clone())
778        .collect::<Vec<_>>();
779    let stream_inputs_validate = stream_inputs
780        .iter()
781        .map(|x| x.2.clone())
782        .collect::<Vec<_>>();
783    let stream_inputs_notify = stream_inputs
784        .iter()
785        .map(|x| x.3.clone())
786        .collect::<Vec<_>>();
787    let stream_inputs_finish = stream_inputs
788        .iter()
789        .map(|x| x.4.clone())
790        .collect::<Vec<_>>();
791    let stream_inputs_get = stream_inputs
792        .iter()
793        .map(|x| x.5.clone())
794        .collect::<Vec<_>>();
795
796    let stream_outputs = match struct_data.fields {
797        Fields::Named(ref fields) => {
798            fields
799                .named
800                .iter()
801                .filter_map(|field| {
802                    // Check if field has #[input] attribute
803                    if !field.attrs.iter().any(|attr| attr.path().is_ident("output")) {
804                        return None;
805                    }
806
807                    let field_name = field.ident.as_ref().unwrap();
808                    let field_name_str = field_name.to_string();
809
810                    match &field.ty {
811                        // Handle Vec<T>
812                        Type::Path(type_path) if is_vec(type_path) => {
813                            let name_code = quote! {
814                                for i in 0..self.#field_name.len() {
815                                    names.push(format!("{}[{}]", #field_name_str, i));
816                                }
817                            };
818                            let init_code = quote! {
819                                for i in 0..self.#field_name.len() {
820                                    __FsdrOutput::init(&mut self.#field_name[i], block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
821                                }
822                            };
823                            let validate_code = quote! {
824                                for i in 0..self.#field_name.len() {
825                                    __FsdrOutput::validate(&self.#field_name[i])?;
826                                }
827                            };
828                            let notify_code = quote! {
829                                for i in 0..self.#field_name.len() {
830                                    __FsdrOutput::notify_finished(&mut self.#field_name[i]).await;
831                                }
832                            };
833                            let connect_code = quote! {
834                                for (i, _) in self.#field_name.iter_mut().enumerate() {
835                                    if name == format!("{}[{}]", #field_name_str, i) {
836                                        return __FsdrOutput::connect_dyn(&mut self.#field_name[i], reader);
837                                    }
838                                }
839                            };
840                            Some((name_code, init_code, validate_code, notify_code, connect_code))
841                        }
842                        // Handle arrays [T; N]
843                        Type::Array(array) => {
844                            let len = &array.len;
845                            let name_code = quote! {
846                                for i in 0..#len {
847                                    names.push(format!("{}[{}]", #field_name_str, i));
848                                }
849                            };
850                            let init_code = quote! {
851                                for i in 0..#len {
852                                    __FsdrOutput::init(&mut self.#field_name[i], block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
853                                }
854                            };
855                            let validate_code = quote! {
856                                for i in 0..#len {
857                                    __FsdrOutput::validate(&self.#field_name[i])?;
858                                }
859                            };
860                            let notify_code = quote! {
861                                for i in 0..#len {
862                                    __FsdrOutput::notify_finished(&mut self.#field_name[i]).await;
863                                }
864                            };
865                            let connect_code = quote! {
866                                for (i, _) in self.#field_name.iter_mut().enumerate() {
867                                    if name == format!("{}[{}]", #field_name_str, i) {
868                                        return __FsdrOutput::connect_dyn(&mut self.#field_name[i], reader);
869                                    }
870                                }
871                            };
872                            Some((name_code, init_code, validate_code, notify_code, connect_code))
873                        }
874                        // Handle tuples (T1, T2, ...)
875                        Type::Tuple(tuple) => {
876                            let len = tuple.elems.len();
877                            let name_code = quote! {
878                                for i in 0..#len {
879                                    names.push(format!("{}.{}", #field_name_str, i));
880                                }
881                            };
882                            let init_code = tuple.elems.iter().enumerate().map(|(i, _)| {
883                                let index = syn::Index::from(i);
884                                quote! {
885                                    __FsdrOutput::init(&mut self.#field_name.#index, block_id, PortId::new(format!("{}.{}", #field_name_str, #index)), inbox.clone());
886                                }
887                            });
888                            let init_code = quote! {
889                                #(#init_code)*
890                            };
891                            let validate_code = tuple.elems.iter().enumerate().map(|(i, _)| {
892                                let index = syn::Index::from(i);
893                                quote! {
894                                    __FsdrOutput::validate(&self.#field_name.#index)?;
895                                }
896                            });
897                            let validate_code = quote! {
898                                #(#validate_code)*
899                            };
900                            let notify_code = tuple.elems.iter().enumerate().map(|(i, _)| {
901                                let index = syn::Index::from(i);
902                                quote! {
903                                    __FsdrOutput::notify_finished(&mut self.#field_name.#index).await;
904                                }
905                            });
906                            let notify_code = quote! {
907                                #(#notify_code)*
908                            };
909                            let connect_code = tuple.elems.iter().enumerate().map(|(i, _)| {
910                                let index = syn::Index::from(i);
911                                quote!{
912                                    if name == format!("{}.{}", #field_name_str, #index) {
913                                        return __FsdrOutput::connect_dyn(&mut self.#field_name.#index, reader);
914                                    }
915                                }
916                            });
917                            let connect_code = quote! {
918                                #(#connect_code)*
919                            };
920                            Some((name_code, init_code, validate_code, notify_code, connect_code))
921                        }
922                        // Handle normal types
923                        _ => {
924                            let name_code = quote! {
925                                names.push(#field_name_str.to_string());
926                            };
927                            let init_code = quote! {
928                                __FsdrOutput::init(&mut self.#field_name, block_id, PortId::new(#field_name_str.to_string()), inbox.clone());
929                            };
930                            let validate_code = quote! {
931                                __FsdrOutput::validate(&self.#field_name)?;
932                            };
933                            let notify_code = quote! {
934                                __FsdrOutput::notify_finished(&mut self.#field_name).await;
935                            };
936                            let connect_code = quote! {
937                                if name == #field_name_str {
938                                    return __FsdrOutput::connect_dyn(&mut self.#field_name, reader);
939                                }
940                            };
941                            Some((name_code, init_code, validate_code, notify_code, connect_code))
942                        }
943                    }
944                })
945                .collect::<Vec<_>>()
946        }
947        _ => Vec::new(),
948    };
949
950    let stream_outputs_names = stream_outputs
951        .iter()
952        .map(|x| x.0.clone())
953        .collect::<Vec<_>>();
954    let stream_outputs_init = stream_outputs
955        .iter()
956        .map(|x| x.1.clone())
957        .collect::<Vec<_>>();
958    let stream_outputs_validate = stream_outputs
959        .iter()
960        .map(|x| x.2.clone())
961        .collect::<Vec<_>>();
962    let stream_outputs_notify = stream_outputs
963        .iter()
964        .map(|x| x.3.clone())
965        .collect::<Vec<_>>();
966    let stream_outputs_connect = stream_outputs
967        .iter()
968        .map(|x| x.4.clone())
969        .collect::<Vec<_>>();
970
971    // Collect the names and types of fields that have the #[input] or #[output] attribute
972    let (port_idents, port_types): (Vec<Ident>, Vec<Type>) = match struct_data.fields {
973        Fields::Named(ref fields_named) => fields_named
974            .named
975            .iter()
976            .filter_map(|field| {
977                if has_input_attr(&field.attrs) || has_output_attr(&field.attrs) {
978                    let ident = field.ident.clone().unwrap();
979                    let ty = field.ty.clone();
980                    Some((ident, ty))
981                } else {
982                    None
983                }
984            })
985            .unzip(),
986        Fields::Unnamed(_) | Fields::Unit => (Vec::new(), Vec::new()),
987    };
988    let port_getter_fns = port_idents
989        .iter()
990        .zip(port_types.iter())
991        .map(|(ident, ty)| {
992            quote! {
993                /// Getter for stream port.
994                pub fn #ident(&mut self) -> &mut #ty {
995                    &mut self.#ident
996                }
997            }
998        });
999
1000    let input_bound_types = match struct_data.fields {
1001        Fields::Named(ref fields_named) => fields_named
1002            .named
1003            .iter()
1004            .filter(|field| has_input_attr(&field.attrs))
1005            .flat_map(|field| port_bound_types(&field.ty))
1006            .collect::<Vec<_>>(),
1007        Fields::Unnamed(_) | Fields::Unit => Vec::new(),
1008    };
1009    let output_bound_types = match struct_data.fields {
1010        Fields::Named(ref fields_named) => fields_named
1011            .named
1012            .iter()
1013            .filter(|field| has_output_attr(&field.attrs))
1014            .flat_map(|field| port_bound_types(&field.ty))
1015            .collect::<Vec<_>>(),
1016        Fields::Unnamed(_) | Fields::Unit => Vec::new(),
1017    };
1018    let mut kernel_interface_generics = generics.clone();
1019    {
1020        let where_clause = kernel_interface_generics.make_where_clause();
1021        for ty in input_bound_types.iter() {
1022            where_clause
1023                .predicates
1024                .push(parse_quote!(#ty: ::futuresdr::runtime::buffer::BufferReader));
1025        }
1026        for ty in output_bound_types.iter() {
1027            where_clause
1028                .predicates
1029                .push(parse_quote!(#ty: ::futuresdr::runtime::buffer::BufferWriter));
1030        }
1031    }
1032    let (kernel_interface_impl_generics, _, kernel_interface_where_clause) =
1033        kernel_interface_generics.split_for_impl();
1034
1035    // Search for struct attributes
1036    for attr in &input.attrs {
1037        if attr.path().is_ident("message_inputs") {
1038            let nested = attr
1039                .parse_args_with(
1040                    syn::punctuated::Punctuated::<Meta, syn::Token![,]>::parse_terminated,
1041                )
1042                .unwrap();
1043            for m in nested {
1044                match m {
1045                    Meta::NameValue(m) => {
1046                        message_inputs.push(m.path.get_ident().unwrap().clone());
1047                        if let syn::Expr::Lit(syn::ExprLit {
1048                            lit: syn::Lit::Str(s),
1049                            ..
1050                        }) = m.value
1051                        {
1052                            message_input_names.push(s.value());
1053                        } else {
1054                            panic!(
1055                                "message handlers have to be an identifier or identifier = \"port name\""
1056                            );
1057                        }
1058                    }
1059                    Meta::Path(p) => {
1060                        let p = p.get_ident().unwrap();
1061                        message_inputs.push(p.clone());
1062                        message_input_names.push(p.to_string());
1063                    }
1064                    _ => {
1065                        panic!("message inputs has to be a list of name-values or paths")
1066                    }
1067                }
1068            }
1069        } else if attr.path().is_ident("message_outputs") {
1070            let nested = attr
1071                .parse_args_with(
1072                    syn::punctuated::Punctuated::<Meta, syn::Token![,]>::parse_terminated,
1073                )
1074                .unwrap();
1075            for m in nested {
1076                match m {
1077                    Meta::Path(p) => {
1078                        let p = p.get_ident().unwrap();
1079                        message_output_names.push(p.to_string());
1080                    }
1081                    _ => {
1082                        panic!("message outputs has to be a list of paths")
1083                    }
1084                }
1085            }
1086        } else if attr.path().is_ident("null_kernel") {
1087            let kernel_trait = quote! { ::futuresdr::runtime::dev::Kernel };
1088            kernel = quote! {
1089                #[doc(hidden)]
1090                impl #generics #kernel_trait for #struct_name #generics
1091                    #where_clause { }
1092
1093            }
1094        } else if attr.path().is_ident("blocking") {
1095            blocking = quote! { true }
1096        } else if attr.path().is_ident("type_name") {
1097            let nested = attr
1098                .parse_args_with(
1099                    syn::punctuated::Punctuated::<Meta, syn::Token![,]>::parse_terminated,
1100                )
1101                .unwrap();
1102            if let Some(Meta::Path(p)) = nested.get(0) {
1103                type_name = p.get_ident().unwrap().to_string();
1104            } else {
1105                panic!("type_name attribute should be in the form type_name(foo)");
1106            }
1107        }
1108    }
1109
1110    // Generate handler names as strings
1111    let message_input_names = message_input_names
1112        .into_iter()
1113        .map(|handler| {
1114            let handler = if let Some(stripped) = handler.strip_prefix("r#") {
1115                stripped.to_string()
1116            } else {
1117                handler
1118            };
1119            quote! {
1120                #handler
1121            }
1122        })
1123        .collect::<Vec<_>>();
1124
1125    // Generate match arms for the handle method
1126    let handler_matches = message_inputs
1127        .iter()
1128        .zip(message_input_names.clone())
1129        .map(|(handler, handler_name)| {
1130            quote! {
1131                #handler_name  => self.#handler(io, mo, meta, p).await,
1132            }
1133        })
1134        .collect::<Vec<_>>();
1135
1136    let interface_trait = quote! { ::futuresdr::runtime::__private::KernelInterface };
1137    let work_io_type = quote! { ::futuresdr::runtime::dev::WorkIo };
1138    let port_getters = quote! {
1139        impl #generics #struct_name #unconstraint_generics
1140            #where_clause
1141        {
1142            #(#port_getter_fns)*
1143        }
1144    };
1145
1146    let expanded = quote! {
1147
1148        #port_getters
1149
1150        impl #kernel_interface_impl_generics #interface_trait for #struct_name #unconstraint_generics
1151            #kernel_interface_where_clause
1152        {
1153            fn is_blocking() -> bool {
1154                #blocking
1155            }
1156            fn type_name() -> &'static str {
1157                static TYPE_NAME: &str = #type_name;
1158                TYPE_NAME
1159            }
1160            fn stream_inputs(&self) -> Vec<String> {
1161                let mut names = vec![];
1162                #(#stream_inputs_names)*
1163                names
1164            }
1165            fn stream_outputs(&self) -> Vec<String> {
1166                let mut names = vec![];
1167                #(#stream_outputs_names)*
1168                names
1169            }
1170
1171            fn stream_ports_init(&mut self, block_id: ::futuresdr::runtime::BlockId, inbox: ::futuresdr::runtime::dev::BlockInbox) {
1172                use ::futuresdr::runtime::buffer::BufferReader as __FsdrInput;
1173                use ::futuresdr::runtime::buffer::BufferWriter as __FsdrOutput;
1174                use ::futuresdr::runtime::PortId;
1175                #(#stream_inputs_init)*
1176                #(#stream_outputs_init)*
1177            }
1178            fn stream_ports_validate(&self) -> ::futuresdr::runtime::Result<(), ::futuresdr::runtime::Error> {
1179                use ::futuresdr::runtime::buffer::BufferReader as __FsdrInput;
1180                use ::futuresdr::runtime::buffer::BufferWriter as __FsdrOutput;
1181                use ::futuresdr::runtime::PortId;
1182                #(#stream_inputs_validate)*
1183                #(#stream_outputs_validate)*
1184                Ok(())
1185            }
1186            fn stream_input_finish(&mut self, port_id: ::futuresdr::runtime::PortId) -> ::futuresdr::runtime::Result<(), futuresdr::runtime::Error> {
1187                use ::futuresdr::runtime::buffer::BufferReader as __FsdrInput;
1188                use ::futuresdr::runtime::Error;
1189                use ::futuresdr::runtime::BlockPortCtx;
1190                let port = port_id.name();
1191                #(#stream_inputs_finish)*
1192                Err(Error::InvalidStreamPort(BlockPortCtx::None, port_id))
1193            }
1194            async fn stream_ports_notify_finished(&mut self) {
1195                use ::futuresdr::runtime::buffer::BufferReader as __FsdrInput;
1196                use ::futuresdr::runtime::buffer::BufferWriter as __FsdrOutput;
1197                #(#stream_inputs_notify)*
1198                #(#stream_outputs_notify)*
1199            }
1200            fn stream_input(
1201                &mut self,
1202                id: &::futuresdr::runtime::PortId,
1203            ) -> ::futuresdr::runtime::Result<
1204                &mut dyn ::futuresdr::runtime::buffer::BufferReader,
1205                ::futuresdr::runtime::Error,
1206            > {
1207                use ::futuresdr::runtime::Error;
1208                use ::futuresdr::runtime::BlockPortCtx;
1209                let name = id.name();
1210                #(#stream_inputs_get)*
1211                Err(Error::InvalidStreamPort(BlockPortCtx::None, id.clone()))
1212            }
1213            fn connect_stream_output(
1214                &mut self,
1215                id: &::futuresdr::runtime::PortId,
1216                reader: &mut dyn ::futuresdr::runtime::buffer::BufferReader,
1217            ) -> ::futuresdr::runtime::Result<(), ::futuresdr::runtime::Error> {
1218                use ::futuresdr::runtime::buffer::BufferWriter as __FsdrOutput;
1219                use ::futuresdr::runtime::Error;
1220                use ::futuresdr::runtime::BlockPortCtx;
1221                let name = id.name();
1222                #(#stream_outputs_connect)*
1223                Err(Error::InvalidStreamPort(BlockPortCtx::None, id.clone()))
1224            }
1225
1226            fn message_inputs() -> &'static[&'static str] {
1227                static MESSAGE_INPUTS: &[&str] = &[#(#message_input_names),*];
1228                MESSAGE_INPUTS
1229            }
1230            fn message_outputs() -> &'static[&'static str] {
1231                static MESSAGE_OUTPUTS: &[&str] = &[#(#message_output_names),*];
1232                MESSAGE_OUTPUTS
1233            }
1234            async fn call_handler(
1235                &mut self,
1236                io: &mut #work_io_type,
1237                mo: &mut ::futuresdr::runtime::dev::MessageOutputs,
1238                meta: &mut ::futuresdr::runtime::dev::BlockMeta,
1239                id: ::futuresdr::runtime::PortId,
1240                p: ::futuresdr::runtime::Pmt) ->
1241                    ::futuresdr::runtime::Result<::futuresdr::runtime::Pmt, ::futuresdr::runtime::Error> {
1242                        use ::futuresdr::runtime::BlockPortCtx;
1243                        use ::futuresdr::runtime::Error;
1244                        use ::futuresdr::runtime::Pmt;
1245                        use ::futuresdr::runtime::PortId;
1246                        use ::futuresdr::runtime::Result;
1247                        let ret: Result<Pmt> = match id.name() {
1248                                #(#handler_matches)*
1249                                _ => return Err(Error::InvalidMessagePort(
1250                                    BlockPortCtx::None,
1251                                    id)),
1252                        };
1253
1254                        #[allow(unreachable_code)]
1255                        ret.map_err(|e| Error::HandlerError(e.to_string()))
1256            }
1257        }
1258
1259        #kernel
1260    };
1261    // println!("{}", pretty_print(&expanded));
1262    proc_macro::TokenStream::from(expanded)
1263}
1264
1265#[allow(dead_code)]
1266fn pretty_print(ts: &proc_macro2::TokenStream) -> String {
1267    let syntax_tree = syn::parse2(ts.clone()).unwrap();
1268    prettyplease::unparse(&syntax_tree)
1269}