1use proc_macro::TokenStream;
9use quote::quote;
10use syn::Attribute;
11use syn::Data;
12use syn::DeriveInput;
13use syn::Fields;
14use syn::GenericArgument;
15use syn::GenericParam;
16use syn::Ident;
17use syn::Index;
18use syn::Meta;
19use syn::PathArguments;
20use syn::Result;
21use syn::Token;
22use syn::Type;
23use syn::bracketed;
24use syn::parse::Parse;
25use syn::parse::ParseStream;
26use syn::parse_macro_input;
27use syn::parse_quote;
28use syn::punctuated::Punctuated;
29use syn::token;
30
31#[proc_macro]
100pub fn connect(input: TokenStream) -> TokenStream {
101 let connect_input = parse_macro_input!(input as ConnectInput);
102 generate_connect(connect_input, ConnectMode::BlockingNative).into()
103}
104
105#[proc_macro]
110pub fn connect_async(input: TokenStream) -> TokenStream {
111 let connect_input = parse_macro_input!(input as ConnectInput);
112 generate_connect(connect_input, ConnectMode::Async).into()
113}
114
115#[derive(Clone, Copy)]
116enum ConnectMode {
117 Async,
118 BlockingNative,
119}
120
121fn generate_connect(connect_input: ConnectInput, mode: ConnectMode) -> proc_macro2::TokenStream {
122 let fg = connect_input.flowgraph;
124
125 let mut blocks: Vec<Ident> = Vec::new();
126 let mut connections = Vec::new();
127
128 for conn in connect_input.connection_strings.iter() {
130 let src_block = &conn.source.block;
131 blocks.push(src_block.clone());
132
133 let mut src_block = &conn.source.block;
134 let mut src_port = &conn.source.output;
135
136 for (connection_type, dst) in &conn.connections {
137 blocks.push(dst.block.clone());
138
139 let out = match connection_type {
140 ConnectionType::Stream | ConnectionType::LocalStream => {
141 let src_port = port_method(src_port, quote!(output()));
142 let dst_port = port_method(&dst.input, quote!(input()));
143 let dst_block = &dst.block;
144 let method = match connection_type {
145 ConnectionType::Stream => quote! { stream_async },
146 ConnectionType::LocalStream => quote! { stream_local_async },
147 _ => unreachable!(),
148 };
149 quote! {
150 #fg.#method(
151 &#src_block,
152 |b| b.#src_port,
153 &#dst_block,
154 |b| b.#dst_port,
155 ).await?;
156 }
157 }
158 ConnectionType::Circuit => {
159 let src_port = port_method(src_port, quote!(output()));
160 let dst_port = port_method(&dst.input, quote!(input()));
161 let dst_block = &dst.block;
162 quote! {
163 #fg.close_circuit_async(
164 &#src_block,
165 |b| b.#src_port,
166 &#dst_block,
167 |b| b.#dst_port,
168 ).await?;
169 }
170 }
171 ConnectionType::Message => {
172 let src_port = if let Some(p) = &src_port {
173 let src_port = p.name.to_string();
174 quote! { #src_port }
175 } else {
176 quote!("out")
177 };
178 let dst_port = if let Some(p) = &dst.input {
179 let dst_port = p.name.to_string();
180 quote! { #dst_port }
181 } else {
182 quote!("in")
183 };
184 let dest_block = &dst.block;
185 quote! {
186 #fg.message_async(
187 #src_block,
188 #src_port,
189 #dest_block,
190 #dst_port,
191 ).await?;
192 }
193 }
194 };
195 connections.push(out);
196 src_block = &dst.block;
197 src_port = &dst.output;
198 }
199 }
200
201 blocks.sort_by_key(|b| b.to_string());
203 blocks.dedup();
204
205 let block_decls = blocks.iter().map(|block| match mode {
206 ConnectMode::Async => quote! {
207 let #block = #fg.connect_add_async(#block).await?;
208 },
209 ConnectMode::BlockingNative => quote! {
210 let #block = #fg.connect_add(#block)?;
211 },
212 });
213
214 let connect_add_trait = match mode {
215 ConnectMode::Async => quote! { use ::futuresdr::runtime::__private::ConnectAddAsync as _; },
216 ConnectMode::BlockingNative => {
217 quote! { use ::futuresdr::runtime::__private::ConnectAdd as _; }
218 }
219 };
220
221 let body = quote! {
222 #connect_add_trait
223 #(#block_decls)*
224 #(#connections)*
225 ::core::result::Result::Ok::<_, ::futuresdr::runtime::Error>((#(#blocks),*))
226 };
227
228 match mode {
229 ConnectMode::Async => quote![
230 #[allow(unused_variables)]
231 let (#(#blocks),*) = {
232 #body
233 }?;
234 ],
235 ConnectMode::BlockingNative => quote![
236 #[cfg(target_arch = "wasm32")]
237 compile_error!("connect! is synchronous and unavailable on wasm32; use connect_async!(...).await instead");
238
239 #[cfg(not(target_arch = "wasm32"))]
240 #[allow(unused_variables)]
241 let (#(#blocks),*) = ::futuresdr::runtime::block_on(async {
242 #body
243 })?;
244 ],
245 }
246}
247
248fn port_method(port: &Option<Port>, default: proc_macro2::TokenStream) -> proc_macro2::TokenStream {
249 match port {
250 Some(Port { name, index: None }) => {
251 quote! { #name() }
252 }
253 Some(Port {
254 name,
255 index: Some(i),
256 }) => {
257 quote! { #name().get_mut(#i).unwrap() }
258 }
259 None => default,
260 }
261}
262
263#[derive(Debug)]
265struct ConnectInput {
266 flowgraph: Ident,
267 _comma: Token![,],
268 connection_strings: Punctuated<ConnectionString, Token![;]>,
269}
270impl Parse for ConnectInput {
271 fn parse(input: ParseStream) -> Result<Self> {
272 Ok(ConnectInput {
273 flowgraph: input.parse()?,
274 _comma: input.parse()?,
275 connection_strings: Punctuated::parse_terminated(input)?,
276 })
277 }
278}
279
280#[derive(Debug)]
282struct ConnectionString {
283 source: Source,
284 connections: Vec<(ConnectionType, Endpoint)>,
285}
286impl Parse for ConnectionString {
287 fn parse(input: ParseStream) -> Result<Self> {
288 let source: Source = input.parse()?;
289 let mut connections = Vec::new();
290
291 while let Ok(ct) = input.parse::<ConnectionType>() {
292 let dest: Endpoint = input.parse()?;
293 connections.push((ct, dest));
294 }
295
296 Ok(ConnectionString {
297 source,
298 connections,
299 })
300 }
301}
302
303#[derive(Debug)]
304enum ConnectionType {
305 Stream,
306 LocalStream,
307 Message,
308 Circuit,
309}
310
311impl Parse for ConnectionType {
312 fn parse(input: ParseStream) -> Result<Self> {
313 if input.peek(Token![>]) {
314 input.parse::<Token![>]>()?;
315 Ok(Self::Stream)
316 } else if input.peek(Token![~]) {
317 input.parse::<Token![~]>()?;
318 input.parse::<Token![>]>()?;
319 Ok(Self::LocalStream)
320 } else if input.peek(Token![|]) {
321 input.parse::<Token![|]>()?;
322 Ok(Self::Message)
323 } else if input.peek(Token![<]) {
324 input.parse::<Token![<]>()?;
325 Ok(Self::Circuit)
326 } else {
327 Err(input.error("expected `>`, `~>`, `|`, or `<` to specify the connection type"))
328 }
329 }
330}
331
332#[derive(Debug)]
333struct Source {
334 block: Ident,
335 output: Option<Port>,
336}
337impl Parse for Source {
338 fn parse(input: ParseStream) -> Result<Self> {
339 let block: Ident = input.parse()?;
340 if input.peek(Token![.]) {
341 input.parse::<Token![.]>()?;
342 let port: Port = input.parse()?;
343 Ok(Self {
344 block,
345 output: Some(port),
346 })
347 } else {
348 Ok(Self {
349 block,
350 output: None,
351 })
352 }
353 }
354}
355
356#[derive(Debug)]
358struct Endpoint {
359 block: Ident,
360 input: Option<Port>,
361 output: Option<Port>,
362}
363impl Parse for Endpoint {
364 fn parse(input: ParseStream) -> Result<Self> {
365 let first: Port = input.parse()?;
366
367 if !input.peek(Token![.]) {
369 if first.index.is_none() {
370 return Ok(Self {
371 block: first.name,
372 input: None,
373 output: None,
374 });
375 } else {
376 return Err(input.error("expected endpoint, got only port"));
377 }
378 }
379
380 input.parse::<Token![.]>()?;
381 let block: Ident = input.parse()?;
382
383 if !input.peek(Token![.]) {
384 return Ok(Self {
385 block,
386 input: Some(first),
387 output: None,
388 });
389 }
390
391 input.parse::<Token![.]>()?;
392 let second: Port = input.parse()?;
393
394 Ok(Self {
395 block,
396 input: Some(first),
397 output: Some(second),
398 })
399 }
400}
401
402#[derive(Debug)]
404struct Port {
405 name: Ident,
406 index: Option<Index>,
407}
408impl Parse for Port {
409 fn parse(input: ParseStream) -> Result<Self> {
410 let name: Ident = input.parse()?;
411 let index = if input.peek(token::Bracket) {
412 let content;
413 bracketed!(content in input);
414 Some(content.parse()?)
415 } else {
416 None
417 };
418 Ok(Port { name, index })
419 }
420}
421
422fn has_input_attr(attrs: &[Attribute]) -> bool {
424 attrs.iter().any(|attr| attr.path().is_ident("input"))
425}
426fn has_output_attr(attrs: &[Attribute]) -> bool {
428 attrs.iter().any(|attr| attr.path().is_ident("output"))
429}
430fn is_vec(type_path: &syn::TypePath) -> bool {
432 if type_path.path.segments.len() != 1 {
433 return false;
434 }
435
436 let segment = &type_path.path.segments[0];
437 if segment.ident != "Vec" {
438 return false;
439 }
440
441 matches!(segment.arguments, PathArguments::AngleBracketed(_))
442}
443
444fn port_bound_types(ty: &Type) -> Vec<Type> {
445 match ty {
446 Type::Path(type_path) if is_vec(type_path) => {
447 if let PathArguments::AngleBracketed(args) = &type_path.path.segments[0].arguments {
448 args.args
449 .iter()
450 .filter_map(|arg| match arg {
451 GenericArgument::Type(ty) => Some(ty.clone()),
452 _ => None,
453 })
454 .collect()
455 } else {
456 Vec::new()
457 }
458 }
459 Type::Array(array) => vec![(*array.elem).clone()],
460 Type::Tuple(tuple) => tuple.elems.iter().cloned().collect(),
461 _ => vec![ty.clone()],
462 }
463}
464
465#[proc_macro_derive(
505 Block,
506 attributes(
507 input,
508 output,
509 message_inputs,
510 message_outputs,
511 blocking,
512 type_name,
513 null_kernel
514 )
515)]
516pub fn derive_block(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
517 derive_block_impl(input)
518}
519
520fn derive_block_impl(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
521 let input = parse_macro_input!(input as DeriveInput);
522 let struct_name = &input.ident;
523 let generics = &input.generics;
524 let where_clause = &input.generics.where_clause;
525
526 let mut message_inputs: Vec<Ident> = Vec::new();
527 let mut message_input_names: Vec<String> = Vec::new();
528 let mut message_output_names: Vec<String> = Vec::new();
529 let mut kernel = quote! {};
530 let mut blocking = quote! { false };
531 let mut type_name = struct_name.to_string();
532
533 let mut generics = generics.clone();
535 for param in &mut generics.params {
536 match param {
537 GenericParam::Type(type_param) => {
538 type_param.default = None;
539 }
540 GenericParam::Const(const_param) => {
541 const_param.default = None;
542 }
543 GenericParam::Lifetime(_) => {}
544 }
545 }
546
547 let unconstraint_params: Vec<proc_macro2::TokenStream> = generics
548 .params
549 .iter()
550 .map(|param| match param {
551 GenericParam::Type(ty) => {
552 let ident = &ty.ident;
553 quote! { #ident }
554 }
555 GenericParam::Lifetime(lt) => {
556 let lifetime = <.lifetime;
557 quote! { #lifetime }
558 }
559 GenericParam::Const(c) => {
560 let ident = &c.ident;
561 quote! { #ident }
562 }
563 })
564 .collect();
565
566 let unconstraint_generics = if generics.params.is_empty() {
568 quote! {}
569 } else {
570 quote! { <#(#unconstraint_params),*> }
571 };
572
573 let struct_data = match input.data {
575 Data::Struct(data) => data,
576 _ => {
577 return syn::Error::new_spanned(input.ident, "Block can only be derived for structs")
578 .to_compile_error()
579 .into();
580 }
581 };
582
583 let stream_inputs = match struct_data.fields {
584 Fields::Named(ref fields) => {
585 fields
586 .named
587 .iter()
588 .filter_map(|field| {
589 if !field.attrs.iter().any(|attr| attr.path().is_ident("input")) {
591 return None;
592 }
593
594 let field_name = field.ident.as_ref().unwrap();
595 let field_name_str = field_name.to_string();
596
597 match &field.ty {
598 Type::Path(type_path) if is_vec(type_path) => {
600 let name_code = quote! {
601 for i in 0..self.#field_name.len() {
602 names.push(format!("{}[{}]", #field_name_str, i));
603 }
604 };
605 let init_code = quote! {
606 for i in 0..self.#field_name.len() {
607 __FsdrInput::init(&mut self.#field_name[i], block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
608 }
609 };
610 let validate_code = quote! {
611 for i in 0..self.#field_name.len() {
612 __FsdrInput::validate(&self.#field_name[i])?;
613 }
614 };
615 let notify_code = quote! {
616 for i in 0..self.#field_name.len() {
617 __FsdrInput::notify_finished(&mut self.#field_name[i]).await;
618 }
619 };
620 let finish_code = quote! {
621 for (i, _) in self.#field_name.iter_mut().enumerate() {
622 if port == format!("{}[{}]", #field_name_str, i) {
623 __FsdrInput::finish(&mut self.#field_name[i]);
624 return Ok(());
625 }
626 }
627 };
628 let get_input_code = quote! {
629 for (i, _) in self.#field_name.iter_mut().enumerate() {
630 if name == format!("{}[{}]", #field_name_str, i) {
631 return Ok(&mut self.#field_name[i]);
632 }
633 }
634 };
635 Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
636 }
637 Type::Array(array) => {
639 let len = &array.len;
640 let name_code = quote! {
641 for i in 0..#len {
642 names.push(format!("{}[{}]", #field_name_str, i));
643 }
644 };
645 let init_code = quote! {
646 for i in 0..#len {
647 __FsdrInput::init(&mut self.#field_name[i], block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
648 }
649 };
650 let validate_code = quote! {
651 for i in 0..#len {
652 __FsdrInput::validate(&self.#field_name[i])?;
653 }
654 };
655 let notify_code = quote! {
656 for i in 0..#len {
657 __FsdrInput::notify_finished(&mut self.#field_name[i]).await;
658 }
659 };
660 let finish_code = quote! {
661 for (i, _) in self.#field_name.iter_mut().enumerate() {
662 if port == format!("{}[{}]", #field_name_str, i) {
663 __FsdrInput::finish(&mut self.#field_name[i]);
664 return Ok(());
665 }
666 }
667 };
668 let get_input_code = quote! {
669 for (i, _) in self.#field_name.iter_mut().enumerate() {
670 if name == format!("{}[{}]", #field_name_str, i) {
671 return Ok(&mut self.#field_name[i]);
672 }
673 }
674 };
675 Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
676 }
677 Type::Tuple(tuple) => {
679 let len = tuple.elems.len();
680 let name_code = quote! {
681 for i in 0..#len {
682 names.push(format!("{}.{}", #field_name_str, i));
683 }
684 };
685 let init_code = tuple.elems.iter().enumerate().map(|(i, _)| {
686 let index = syn::Index::from(i);
687 quote! {
688 __FsdrInput::init(&mut self.#field_name.#index, block_id, PortId::new(format!("{}.{}", #field_name_str, #index)), inbox.clone());
689 }
690 });
691 let init_code = quote! {
692 #(#init_code)*
693 };
694 let validate_code = tuple.elems.iter().enumerate().map(|(i, _)| {
695 let index = syn::Index::from(i);
696 quote! {
697 __FsdrInput::validate(&self.#field_name.#index)?;
698 }
699 });
700 let validate_code = quote! {
701 #(#validate_code)*
702 };
703 let notify_code = tuple.elems.iter().enumerate().map(|(i, _)| {
704 let index = syn::Index::from(i);
705 quote! {
706 __FsdrInput::notify_finished(&mut self.#field_name.#index).await;
707 }
708 });
709 let notify_code = quote! {
710 #(#notify_code)*
711 };
712 let finish_code = tuple.elems.iter().enumerate().map(|(i, _)| {
713 let index = syn::Index::from(i);
714 quote!{
715 if port == format!("{}.{}", #field_name_str, #index) {
716 __FsdrInput::finish(&mut self.#field_name.#index);
717 return Ok(());
718 }
719 }
720 });
721 let finish_code = quote! {
722 #(#finish_code)*
723 };
724 let get_input_code = tuple.elems.iter().enumerate().map(|(i, _)| {
725 let index = syn::Index::from(i);
726 quote!{
727 if name == format!("{}.{}", #field_name_str, #index) {
728 return Ok(&mut self.#field_name.#index);
729 }
730 }
731 });
732 let get_input_code = quote! {
733 #(#get_input_code)*
734 };
735 Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
736 }
737 _ => {
739 let name_code = quote! {
740 names.push(#field_name_str.to_string());
741 };
742 let init_code = quote! {
743 __FsdrInput::init(&mut self.#field_name, block_id, PortId::new(#field_name_str.to_string()), inbox.clone());
744 };
745 let validate_code = quote! {
746 __FsdrInput::validate(&self.#field_name)?;
747 };
748 let notify_code = quote! {
749 __FsdrInput::notify_finished(&mut self.#field_name).await;
750 };
751 let finish_code = quote! {
752 if port == #field_name_str {
753 __FsdrInput::finish(&mut self.#field_name);
754 return Ok(());
755 }
756 };
757 let get_input_code = quote! {
758 if name == #field_name_str {
759 return Ok(&mut self.#field_name)
760 }
761 };
762 Some((name_code, init_code, validate_code, notify_code, finish_code, get_input_code))
763 }
764 }
765 })
766 .collect::<Vec<_>>()
767 }
768 _ => Vec::new(),
769 };
770
771 let stream_inputs_names = stream_inputs
772 .iter()
773 .map(|x| x.0.clone())
774 .collect::<Vec<_>>();
775 let stream_inputs_init = stream_inputs
776 .iter()
777 .map(|x| x.1.clone())
778 .collect::<Vec<_>>();
779 let stream_inputs_validate = stream_inputs
780 .iter()
781 .map(|x| x.2.clone())
782 .collect::<Vec<_>>();
783 let stream_inputs_notify = stream_inputs
784 .iter()
785 .map(|x| x.3.clone())
786 .collect::<Vec<_>>();
787 let stream_inputs_finish = stream_inputs
788 .iter()
789 .map(|x| x.4.clone())
790 .collect::<Vec<_>>();
791 let stream_inputs_get = stream_inputs
792 .iter()
793 .map(|x| x.5.clone())
794 .collect::<Vec<_>>();
795
796 let stream_outputs = match struct_data.fields {
797 Fields::Named(ref fields) => {
798 fields
799 .named
800 .iter()
801 .filter_map(|field| {
802 if !field.attrs.iter().any(|attr| attr.path().is_ident("output")) {
804 return None;
805 }
806
807 let field_name = field.ident.as_ref().unwrap();
808 let field_name_str = field_name.to_string();
809
810 match &field.ty {
811 Type::Path(type_path) if is_vec(type_path) => {
813 let name_code = quote! {
814 for i in 0..self.#field_name.len() {
815 names.push(format!("{}[{}]", #field_name_str, i));
816 }
817 };
818 let init_code = quote! {
819 for i in 0..self.#field_name.len() {
820 __FsdrOutput::init(&mut self.#field_name[i], block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
821 }
822 };
823 let validate_code = quote! {
824 for i in 0..self.#field_name.len() {
825 __FsdrOutput::validate(&self.#field_name[i])?;
826 }
827 };
828 let notify_code = quote! {
829 for i in 0..self.#field_name.len() {
830 __FsdrOutput::notify_finished(&mut self.#field_name[i]).await;
831 }
832 };
833 let connect_code = quote! {
834 for (i, _) in self.#field_name.iter_mut().enumerate() {
835 if name == format!("{}[{}]", #field_name_str, i) {
836 return __FsdrOutput::connect_dyn(&mut self.#field_name[i], reader);
837 }
838 }
839 };
840 Some((name_code, init_code, validate_code, notify_code, connect_code))
841 }
842 Type::Array(array) => {
844 let len = &array.len;
845 let name_code = quote! {
846 for i in 0..#len {
847 names.push(format!("{}[{}]", #field_name_str, i));
848 }
849 };
850 let init_code = quote! {
851 for i in 0..#len {
852 __FsdrOutput::init(&mut self.#field_name[i], block_id, PortId::new(format!("{}[{}]", #field_name_str, i)), inbox.clone());
853 }
854 };
855 let validate_code = quote! {
856 for i in 0..#len {
857 __FsdrOutput::validate(&self.#field_name[i])?;
858 }
859 };
860 let notify_code = quote! {
861 for i in 0..#len {
862 __FsdrOutput::notify_finished(&mut self.#field_name[i]).await;
863 }
864 };
865 let connect_code = quote! {
866 for (i, _) in self.#field_name.iter_mut().enumerate() {
867 if name == format!("{}[{}]", #field_name_str, i) {
868 return __FsdrOutput::connect_dyn(&mut self.#field_name[i], reader);
869 }
870 }
871 };
872 Some((name_code, init_code, validate_code, notify_code, connect_code))
873 }
874 Type::Tuple(tuple) => {
876 let len = tuple.elems.len();
877 let name_code = quote! {
878 for i in 0..#len {
879 names.push(format!("{}.{}", #field_name_str, i));
880 }
881 };
882 let init_code = tuple.elems.iter().enumerate().map(|(i, _)| {
883 let index = syn::Index::from(i);
884 quote! {
885 __FsdrOutput::init(&mut self.#field_name.#index, block_id, PortId::new(format!("{}.{}", #field_name_str, #index)), inbox.clone());
886 }
887 });
888 let init_code = quote! {
889 #(#init_code)*
890 };
891 let validate_code = tuple.elems.iter().enumerate().map(|(i, _)| {
892 let index = syn::Index::from(i);
893 quote! {
894 __FsdrOutput::validate(&self.#field_name.#index)?;
895 }
896 });
897 let validate_code = quote! {
898 #(#validate_code)*
899 };
900 let notify_code = tuple.elems.iter().enumerate().map(|(i, _)| {
901 let index = syn::Index::from(i);
902 quote! {
903 __FsdrOutput::notify_finished(&mut self.#field_name.#index).await;
904 }
905 });
906 let notify_code = quote! {
907 #(#notify_code)*
908 };
909 let connect_code = tuple.elems.iter().enumerate().map(|(i, _)| {
910 let index = syn::Index::from(i);
911 quote!{
912 if name == format!("{}.{}", #field_name_str, #index) {
913 return __FsdrOutput::connect_dyn(&mut self.#field_name.#index, reader);
914 }
915 }
916 });
917 let connect_code = quote! {
918 #(#connect_code)*
919 };
920 Some((name_code, init_code, validate_code, notify_code, connect_code))
921 }
922 _ => {
924 let name_code = quote! {
925 names.push(#field_name_str.to_string());
926 };
927 let init_code = quote! {
928 __FsdrOutput::init(&mut self.#field_name, block_id, PortId::new(#field_name_str.to_string()), inbox.clone());
929 };
930 let validate_code = quote! {
931 __FsdrOutput::validate(&self.#field_name)?;
932 };
933 let notify_code = quote! {
934 __FsdrOutput::notify_finished(&mut self.#field_name).await;
935 };
936 let connect_code = quote! {
937 if name == #field_name_str {
938 return __FsdrOutput::connect_dyn(&mut self.#field_name, reader);
939 }
940 };
941 Some((name_code, init_code, validate_code, notify_code, connect_code))
942 }
943 }
944 })
945 .collect::<Vec<_>>()
946 }
947 _ => Vec::new(),
948 };
949
950 let stream_outputs_names = stream_outputs
951 .iter()
952 .map(|x| x.0.clone())
953 .collect::<Vec<_>>();
954 let stream_outputs_init = stream_outputs
955 .iter()
956 .map(|x| x.1.clone())
957 .collect::<Vec<_>>();
958 let stream_outputs_validate = stream_outputs
959 .iter()
960 .map(|x| x.2.clone())
961 .collect::<Vec<_>>();
962 let stream_outputs_notify = stream_outputs
963 .iter()
964 .map(|x| x.3.clone())
965 .collect::<Vec<_>>();
966 let stream_outputs_connect = stream_outputs
967 .iter()
968 .map(|x| x.4.clone())
969 .collect::<Vec<_>>();
970
971 let (port_idents, port_types): (Vec<Ident>, Vec<Type>) = match struct_data.fields {
973 Fields::Named(ref fields_named) => fields_named
974 .named
975 .iter()
976 .filter_map(|field| {
977 if has_input_attr(&field.attrs) || has_output_attr(&field.attrs) {
978 let ident = field.ident.clone().unwrap();
979 let ty = field.ty.clone();
980 Some((ident, ty))
981 } else {
982 None
983 }
984 })
985 .unzip(),
986 Fields::Unnamed(_) | Fields::Unit => (Vec::new(), Vec::new()),
987 };
988 let port_getter_fns = port_idents
989 .iter()
990 .zip(port_types.iter())
991 .map(|(ident, ty)| {
992 quote! {
993 pub fn #ident(&mut self) -> &mut #ty {
995 &mut self.#ident
996 }
997 }
998 });
999
1000 let input_bound_types = match struct_data.fields {
1001 Fields::Named(ref fields_named) => fields_named
1002 .named
1003 .iter()
1004 .filter(|field| has_input_attr(&field.attrs))
1005 .flat_map(|field| port_bound_types(&field.ty))
1006 .collect::<Vec<_>>(),
1007 Fields::Unnamed(_) | Fields::Unit => Vec::new(),
1008 };
1009 let output_bound_types = match struct_data.fields {
1010 Fields::Named(ref fields_named) => fields_named
1011 .named
1012 .iter()
1013 .filter(|field| has_output_attr(&field.attrs))
1014 .flat_map(|field| port_bound_types(&field.ty))
1015 .collect::<Vec<_>>(),
1016 Fields::Unnamed(_) | Fields::Unit => Vec::new(),
1017 };
1018 let mut kernel_interface_generics = generics.clone();
1019 {
1020 let where_clause = kernel_interface_generics.make_where_clause();
1021 for ty in input_bound_types.iter() {
1022 where_clause
1023 .predicates
1024 .push(parse_quote!(#ty: ::futuresdr::runtime::buffer::BufferReader));
1025 }
1026 for ty in output_bound_types.iter() {
1027 where_clause
1028 .predicates
1029 .push(parse_quote!(#ty: ::futuresdr::runtime::buffer::BufferWriter));
1030 }
1031 }
1032 let (kernel_interface_impl_generics, _, kernel_interface_where_clause) =
1033 kernel_interface_generics.split_for_impl();
1034
1035 for attr in &input.attrs {
1037 if attr.path().is_ident("message_inputs") {
1038 let nested = attr
1039 .parse_args_with(
1040 syn::punctuated::Punctuated::<Meta, syn::Token![,]>::parse_terminated,
1041 )
1042 .unwrap();
1043 for m in nested {
1044 match m {
1045 Meta::NameValue(m) => {
1046 message_inputs.push(m.path.get_ident().unwrap().clone());
1047 if let syn::Expr::Lit(syn::ExprLit {
1048 lit: syn::Lit::Str(s),
1049 ..
1050 }) = m.value
1051 {
1052 message_input_names.push(s.value());
1053 } else {
1054 panic!(
1055 "message handlers have to be an identifier or identifier = \"port name\""
1056 );
1057 }
1058 }
1059 Meta::Path(p) => {
1060 let p = p.get_ident().unwrap();
1061 message_inputs.push(p.clone());
1062 message_input_names.push(p.to_string());
1063 }
1064 _ => {
1065 panic!("message inputs has to be a list of name-values or paths")
1066 }
1067 }
1068 }
1069 } else if attr.path().is_ident("message_outputs") {
1070 let nested = attr
1071 .parse_args_with(
1072 syn::punctuated::Punctuated::<Meta, syn::Token![,]>::parse_terminated,
1073 )
1074 .unwrap();
1075 for m in nested {
1076 match m {
1077 Meta::Path(p) => {
1078 let p = p.get_ident().unwrap();
1079 message_output_names.push(p.to_string());
1080 }
1081 _ => {
1082 panic!("message outputs has to be a list of paths")
1083 }
1084 }
1085 }
1086 } else if attr.path().is_ident("null_kernel") {
1087 let kernel_trait = quote! { ::futuresdr::runtime::dev::Kernel };
1088 kernel = quote! {
1089 #[doc(hidden)]
1090 impl #generics #kernel_trait for #struct_name #generics
1091 #where_clause { }
1092
1093 }
1094 } else if attr.path().is_ident("blocking") {
1095 blocking = quote! { true }
1096 } else if attr.path().is_ident("type_name") {
1097 let nested = attr
1098 .parse_args_with(
1099 syn::punctuated::Punctuated::<Meta, syn::Token![,]>::parse_terminated,
1100 )
1101 .unwrap();
1102 if let Some(Meta::Path(p)) = nested.get(0) {
1103 type_name = p.get_ident().unwrap().to_string();
1104 } else {
1105 panic!("type_name attribute should be in the form type_name(foo)");
1106 }
1107 }
1108 }
1109
1110 let message_input_names = message_input_names
1112 .into_iter()
1113 .map(|handler| {
1114 let handler = if let Some(stripped) = handler.strip_prefix("r#") {
1115 stripped.to_string()
1116 } else {
1117 handler
1118 };
1119 quote! {
1120 #handler
1121 }
1122 })
1123 .collect::<Vec<_>>();
1124
1125 let handler_matches = message_inputs
1127 .iter()
1128 .zip(message_input_names.clone())
1129 .map(|(handler, handler_name)| {
1130 quote! {
1131 #handler_name => self.#handler(io, mo, meta, p).await,
1132 }
1133 })
1134 .collect::<Vec<_>>();
1135
1136 let interface_trait = quote! { ::futuresdr::runtime::__private::KernelInterface };
1137 let work_io_type = quote! { ::futuresdr::runtime::dev::WorkIo };
1138 let port_getters = quote! {
1139 impl #generics #struct_name #unconstraint_generics
1140 #where_clause
1141 {
1142 #(#port_getter_fns)*
1143 }
1144 };
1145
1146 let expanded = quote! {
1147
1148 #port_getters
1149
1150 impl #kernel_interface_impl_generics #interface_trait for #struct_name #unconstraint_generics
1151 #kernel_interface_where_clause
1152 {
1153 fn is_blocking() -> bool {
1154 #blocking
1155 }
1156 fn type_name() -> &'static str {
1157 static TYPE_NAME: &str = #type_name;
1158 TYPE_NAME
1159 }
1160 fn stream_inputs(&self) -> Vec<String> {
1161 let mut names = vec![];
1162 #(#stream_inputs_names)*
1163 names
1164 }
1165 fn stream_outputs(&self) -> Vec<String> {
1166 let mut names = vec![];
1167 #(#stream_outputs_names)*
1168 names
1169 }
1170
1171 fn stream_ports_init(&mut self, block_id: ::futuresdr::runtime::BlockId, inbox: ::futuresdr::runtime::dev::BlockInbox) {
1172 use ::futuresdr::runtime::buffer::BufferReader as __FsdrInput;
1173 use ::futuresdr::runtime::buffer::BufferWriter as __FsdrOutput;
1174 use ::futuresdr::runtime::PortId;
1175 #(#stream_inputs_init)*
1176 #(#stream_outputs_init)*
1177 }
1178 fn stream_ports_validate(&self) -> ::futuresdr::runtime::Result<(), ::futuresdr::runtime::Error> {
1179 use ::futuresdr::runtime::buffer::BufferReader as __FsdrInput;
1180 use ::futuresdr::runtime::buffer::BufferWriter as __FsdrOutput;
1181 use ::futuresdr::runtime::PortId;
1182 #(#stream_inputs_validate)*
1183 #(#stream_outputs_validate)*
1184 Ok(())
1185 }
1186 fn stream_input_finish(&mut self, port_id: ::futuresdr::runtime::PortId) -> ::futuresdr::runtime::Result<(), futuresdr::runtime::Error> {
1187 use ::futuresdr::runtime::buffer::BufferReader as __FsdrInput;
1188 use ::futuresdr::runtime::Error;
1189 use ::futuresdr::runtime::BlockPortCtx;
1190 let port = port_id.name();
1191 #(#stream_inputs_finish)*
1192 Err(Error::InvalidStreamPort(BlockPortCtx::None, port_id))
1193 }
1194 async fn stream_ports_notify_finished(&mut self) {
1195 use ::futuresdr::runtime::buffer::BufferReader as __FsdrInput;
1196 use ::futuresdr::runtime::buffer::BufferWriter as __FsdrOutput;
1197 #(#stream_inputs_notify)*
1198 #(#stream_outputs_notify)*
1199 }
1200 fn stream_input(
1201 &mut self,
1202 id: &::futuresdr::runtime::PortId,
1203 ) -> ::futuresdr::runtime::Result<
1204 &mut dyn ::futuresdr::runtime::buffer::BufferReader,
1205 ::futuresdr::runtime::Error,
1206 > {
1207 use ::futuresdr::runtime::Error;
1208 use ::futuresdr::runtime::BlockPortCtx;
1209 let name = id.name();
1210 #(#stream_inputs_get)*
1211 Err(Error::InvalidStreamPort(BlockPortCtx::None, id.clone()))
1212 }
1213 fn connect_stream_output(
1214 &mut self,
1215 id: &::futuresdr::runtime::PortId,
1216 reader: &mut dyn ::futuresdr::runtime::buffer::BufferReader,
1217 ) -> ::futuresdr::runtime::Result<(), ::futuresdr::runtime::Error> {
1218 use ::futuresdr::runtime::buffer::BufferWriter as __FsdrOutput;
1219 use ::futuresdr::runtime::Error;
1220 use ::futuresdr::runtime::BlockPortCtx;
1221 let name = id.name();
1222 #(#stream_outputs_connect)*
1223 Err(Error::InvalidStreamPort(BlockPortCtx::None, id.clone()))
1224 }
1225
1226 fn message_inputs() -> &'static[&'static str] {
1227 static MESSAGE_INPUTS: &[&str] = &[#(#message_input_names),*];
1228 MESSAGE_INPUTS
1229 }
1230 fn message_outputs() -> &'static[&'static str] {
1231 static MESSAGE_OUTPUTS: &[&str] = &[#(#message_output_names),*];
1232 MESSAGE_OUTPUTS
1233 }
1234 async fn call_handler(
1235 &mut self,
1236 io: &mut #work_io_type,
1237 mo: &mut ::futuresdr::runtime::dev::MessageOutputs,
1238 meta: &mut ::futuresdr::runtime::dev::BlockMeta,
1239 id: ::futuresdr::runtime::PortId,
1240 p: ::futuresdr::runtime::Pmt) ->
1241 ::futuresdr::runtime::Result<::futuresdr::runtime::Pmt, ::futuresdr::runtime::Error> {
1242 use ::futuresdr::runtime::BlockPortCtx;
1243 use ::futuresdr::runtime::Error;
1244 use ::futuresdr::runtime::Pmt;
1245 use ::futuresdr::runtime::PortId;
1246 use ::futuresdr::runtime::Result;
1247 let ret: Result<Pmt> = match id.name() {
1248 #(#handler_matches)*
1249 _ => return Err(Error::InvalidMessagePort(
1250 BlockPortCtx::None,
1251 id)),
1252 };
1253
1254 #[allow(unreachable_code)]
1255 ret.map_err(|e| Error::HandlerError(e.to_string()))
1256 }
1257 }
1258
1259 #kernel
1260 };
1261 proc_macro::TokenStream::from(expanded)
1263}
1264
1265#[allow(dead_code)]
1266fn pretty_print(ts: &proc_macro2::TokenStream) -> String {
1267 let syntax_tree = syn::parse2(ts.clone()).unwrap();
1268 prettyplease::unparse(&syntax_tree)
1269}