zrpc_macros/
lib.rs

1/*********************************************************************************
2* Copyright (c) 2022 ZettaScale Technology
3*
4* This program and the accompanying materials are made available under the
5* terms of the Eclipse Public License 2.0 which is available at
6* http://www.eclipse.org/legal/epl-2.0, or the Apache Software License 2.0
7* which is available at https://www.apache.org/licenses/LICENSE-2.0.
8*
9* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10* Contributors:
11*   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
12*********************************************************************************/
13#![allow(clippy::upper_case_acronyms)]
14#![recursion_limit = "512"]
15
16extern crate base64;
17extern crate darling;
18extern crate proc_macro;
19extern crate proc_macro2;
20extern crate quote;
21extern crate serde;
22extern crate syn;
23
24use darling::FromMeta;
25use inflector::cases::snakecase::to_snake_case;
26use proc_macro::TokenStream;
27use proc_macro2::{Span, TokenStream as TokenStream2};
28use quote::{format_ident, quote, quote_spanned, ToTokens};
29use std::str::FromStr;
30use syn::visit_mut::VisitMut;
31use syn::{
32    braced,
33    ext::IdentExt,
34    parenthesized,
35    parse::{Parse, ParseStream},
36    parse_macro_input, parse_quote,
37    spanned::Spanned,
38    token::Comma,
39    Attribute, AttributeArgs, Block, FnArg, Ident, ImplItem, ImplItemMethod, ImplItemType,
40    ItemImpl, Pat, PatIdent, PatType, Receiver, ReturnType, Signature, Token, Type, Visibility,
41};
42use syn_serde::json;
43use zenoh::prelude::ZenohId;
44
45mod receiver;
46use receiver::ReplaceReceiver;
47
48macro_rules! extend_errors {
49    ($errors: ident, $e: expr) => {
50        match $errors {
51            Ok(_) => $errors = Err($e),
52            Err(ref mut errors) => errors.extend($e),
53        }
54    };
55}
56
57#[derive(Debug, FromMeta)]
58struct ZServiceMacroArgs {
59    timeout_s: u16,
60    #[darling(default)]
61    prefix: Option<String>,
62    #[darling(default)]
63    service_uuid: Option<String>,
64}
65
66struct ZService {
67    attrs: Vec<Attribute>,
68    vis: Visibility,
69    ident: Ident,
70    evals: Vec<EvalMethod>,
71}
72
73impl Parse for ZService {
74    fn parse(input: ParseStream) -> syn::Result<Self> {
75        let attrs = input.call(Attribute::parse_outer)?;
76        let vis = input.parse()?;
77        input.parse::<Token![trait]>()?;
78        let ident: Ident = input.parse()?;
79        let content;
80        braced!(content in input);
81        let mut evals = Vec::<EvalMethod>::new();
82        while !content.is_empty() {
83            evals.push(content.parse()?);
84        }
85        let mut ident_errors = Ok(());
86        for eval in &evals {
87            if eval.ident == "new" {
88                extend_errors!(
89                    ident_errors,
90                    syn::Error::new(
91                        eval.ident.span(),
92                        format!(
93                            "method name conflicts with generated fn `{}Client::new`",
94                            ident.unraw()
95                        )
96                    )
97                );
98            }
99            if eval.ident == "serve" {
100                extend_errors!(
101                    ident_errors,
102                    syn::Error::new(
103                        eval.ident.span(),
104                        format!("method name conflicts with generated fn `{}::serve`", ident)
105                    )
106                );
107            }
108        }
109        ident_errors?;
110
111        Ok(Self {
112            attrs,
113            vis,
114            ident,
115            evals,
116        })
117    }
118}
119
120struct EvalMethod {
121    attrs: Vec<Attribute>,
122    ident: Ident,
123    receiver: Receiver,
124    args: Vec<PatType>,
125    output: ReturnType,
126}
127
128impl Parse for EvalMethod {
129    fn parse(input: ParseStream) -> syn::Result<Self> {
130        let attrs = input.call(Attribute::parse_outer)?;
131        input.parse::<Token![async]>()?;
132        input.parse::<Token![fn]>()?;
133        let ident = input.parse()?;
134        let content;
135        let mut recv: Option<Receiver> = None;
136        parenthesized!(content in input);
137        let mut args = Vec::new();
138        let mut errors = Ok(());
139        for arg in content.parse_terminated::<FnArg, Comma>(FnArg::parse)? {
140            match arg {
141                FnArg::Typed(captured) if matches!(&*captured.pat, Pat::Ident(_)) => {
142                    args.push(captured);
143                }
144                FnArg::Typed(captured) => {
145                    extend_errors!(
146                        errors,
147                        syn::Error::new(captured.pat.span(), "patterns aren't allowed in RPC args")
148                    );
149                }
150                FnArg::Receiver(receiver) => {
151                    //Should take whatever used by the user and strip it for client
152                    recv = Some(receiver)
153                    // extend_errors!(
154                    //     errors,
155                    //     syn::Error::new(arg.span(), "method args cannot start with self, &mut self is added by the macro!")
156                    // );
157                }
158            }
159        }
160        match recv {
161            None => extend_errors!(
162                errors,
163                syn::Error::new(
164                    recv.span(),
165                    "Missing any receiver in method declaration, please add one!"
166                )
167            ),
168            Some(_) => (),
169        }
170
171        errors?;
172        let output = input.parse()?;
173        input.parse::<Token![;]>()?;
174        let receiver = recv.unwrap();
175        Ok(Self {
176            attrs,
177            ident,
178            receiver,
179            args,
180            output,
181        })
182    }
183}
184
185#[proc_macro_derive(Ast)]
186pub fn derive_ast(item: TokenStream) -> TokenStream {
187    let ast: syn::DeriveInput = syn::parse(item).unwrap();
188    let exp: syn::File = syn::parse_quote! {
189        #ast
190    };
191
192    println!("{}", json::to_string_pretty(&exp));
193    TokenStream::new()
194}
195
196/// Generates:
197/// - service trait
198/// - serve fn
199/// - client stub struct
200/// - new_stub client factory fn
201/// - Request and Response enums
202#[proc_macro_attribute]
203pub fn zservice(attr: TokenStream, input: TokenStream) -> TokenStream {
204    let unit_type: &Type = &parse_quote!(());
205
206    //parsing the trait body
207    let ZService {
208        ref attrs,
209        ref vis,
210        ref ident,
211        ref evals,
212    } = parse_macro_input!(input as ZService);
213
214    //parsing the attributes to the macro
215    let attr_args = parse_macro_input!(attr as AttributeArgs);
216    let macro_args = match ZServiceMacroArgs::from_list(&attr_args) {
217        Ok(v) => v,
218        Err(e) => {
219            return TokenStream::from(e.write_errors());
220        }
221    };
222
223    //converts the functions names from snake_case to CamelCase
224    let camel_case_fn_names: &Vec<_> = &evals
225        .iter()
226        .map(|eval| snake_to_camel(&eval.ident.unraw().to_string()))
227        .collect();
228
229    let snake_case_ident = to_snake_case(&ident.unraw().to_string());
230
231    // Collects the pattern for the types
232    let args: &[&[PatType]] = &evals.iter().map(|eval| &*eval.args).collect::<Vec<_>>();
233
234    let service_uuid = match macro_args.service_uuid {
235        Some(u) => ZenohId::from_str(&u).unwrap(),
236        None => ZenohId::rand(),
237    };
238
239    //service eval path
240    let path = match macro_args.prefix {
241        Some(prefix) => format!("{}/zservice/{}/{}/", prefix, ident, service_uuid),
242        None => format!("zservice/{}/{}/", ident, service_uuid),
243    };
244
245    let service_name = format!("{}Service", ident);
246    // Generates the code
247    let ts: TokenStream = ZServiceGenerator {
248        service_ident: ident,
249        server_ident: &format_ident!("Serve{}", ident), //Server is called Serve<Trait Name>
250        client_ident: &format_ident!("{}Client", ident), //Client is called <Trait Name>Client
251        request_ident: &format_ident!("{}Request", ident), //Request type is called <Trait Name>Request
252        response_ident: &format_ident!("{}Response", ident), //Response type is called <Trait Name>Response
253        vis,
254        args,
255        method_attrs: &evals.iter().map(|eval| &*eval.attrs).collect::<Vec<_>>(), //getting evals attributes
256        method_idents: &evals.iter().map(|eval| &eval.ident).collect::<Vec<_>>(), //getting evals names
257        attrs,
258        evals,
259        return_types: &evals //getting evals return type, if non present using unit
260            .iter()
261            .map(|eval| match eval.output {
262                ReturnType::Type(_, ref ty) => ty,
263                ReturnType::Default => unit_type,
264            })
265            .collect::<Vec<_>>(),
266        arg_pats: &args
267            .iter()
268            .map(|args| args.iter().map(|arg| &*arg.pat).collect())
269            .collect::<Vec<_>>(),
270        camel_case_idents: &evals
271            .iter()
272            .zip(camel_case_fn_names.iter())
273            .map(|(eval, name)| Ident::new(name, eval.ident.span()))
274            .collect::<Vec<_>>(),
275        timeout: &macro_args.timeout_s,
276        eval_path: &path,
277        service_name: &service_name,
278        service_get_server_ident: &format_ident!("get_{}_server", snake_case_ident),
279    }
280    .into_token_stream()
281    .into();
282    ts
283}
284
285/// Update the implementation of the trait
286///
287/// Modifies the method in a similar way to what async_trait does
288/// This impl:
289///
290/// #[macro@zserver]
291/// impl Hello for Server {
292///     async pub fn hello(&self, param : String) -> String {
293///         format!("Hello {} from {}", param, self)
294///     }
295/// }
296///
297/// Should become something like:
298///
299/// impl Hello for Server {
300///     pub fn hello(&self, param : String) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = String> + '_ >> {
301///         async fn __hello(_self : &Server, param : String) -> String {
302///             format!("Hello {} from {}", param, self)
303///         }
304///         Box::pin(__hello(self,param))
305///     }
306/// }
307#[proc_macro_attribute]
308pub fn zserver(_attr: TokenStream, input: TokenStream) -> TokenStream {
309    let mut item = syn::parse_macro_input!(input as ItemImpl);
310    let span = item.span();
311
312    // let attr_args = parse_macro_input!(_attr as AttributeArgs);
313
314    let mut expected_non_async_types: Vec<(&ImplItemMethod, String)> = Vec::new();
315    let mut found_non_async_types: Vec<&ImplItemType> = Vec::new();
316
317    for inner in &mut item.items {
318        match inner {
319            ImplItem::Method(method) => {
320                if method.sig.asyncness.is_some() {
321                    // if this function is declared async, transform it into a regular function
322                    //method.sig.asyncness = None;
323                    // put the body inside an task::block_on(async {})
324                    /*
325                    let content = method.block.to_token_stream();
326                    let updated_impl = TokenStream::from(quote! {
327                        {
328                            task::block_on(
329                                async move
330                                #content
331                            )
332                        }
333                    });
334                    // and add the  #[allow(unused,clippy::manual_async_fn)] attribute
335                    method
336                        .attrs
337                        .push(parse_quote!(#[allow(unused,clippy::manual_async_fn)]));
338                    method.block = parse_macro_input!(updated_impl as Block);
339                    */
340                    let sig = &mut method.sig;
341                    let block = &mut method.block;
342                    transform_server_method_block(sig, block, &item.self_ty);
343                    transform_server_method_sig(sig);
344                    method
345                        .attrs
346                        .push(parse_quote!(#[allow(unused,clippy::manual_async_fn)]));
347                } else {
348                    // If it's not async, keep track of all required associated types for better
349                    // error reporting.
350                    expected_non_async_types.push((method, associated_type_for_eval(method)));
351                }
352            }
353            ImplItem::Type(typedecl) => found_non_async_types.push(typedecl),
354            _ => {}
355        }
356    }
357
358    if let Err(e) =
359        verify_types_were_provided(span, &expected_non_async_types, &found_non_async_types)
360    {
361        return TokenStream::from(e.to_compile_error());
362    }
363
364    TokenStream::from(quote!(#item))
365}
366
367/// Transforms the method signature for the server
368/// to return a Pin<Box<dyn Future<Output = #old_return_type > + Send + '_ >>
369fn transform_server_method_sig(sig: &mut Signature) {
370    let ret = match &sig.output {
371        ReturnType::Default => quote!(()),
372        ReturnType::Type(_, ret) => quote!(#ret),
373    };
374
375    sig.output = parse_quote! {
376        -> ::core::pin::Pin<Box<dyn ::core::future::Future<Output = #ret> + core::marker::Send + '_ >>
377    };
378    sig.asyncness = None;
379}
380
381fn transform_server_method_block(sig: &mut Signature, block: &mut Block, receiver: &Type) {
382    let inner_ident = format_ident!("__{}", sig.ident);
383
384    let args = sig.inputs.iter().enumerate().map(|(i, arg)| match arg {
385        FnArg::Receiver(Receiver { self_token, .. }) => quote!(#self_token),
386        FnArg::Typed(arg) => {
387            if let Pat::Ident(PatIdent { ident, .. }) = &*arg.pat {
388                quote!(#ident)
389            } else {
390                format_ident!("__arg{}", i).into_token_stream()
391            }
392        }
393    });
394
395    let mut standalone = sig.clone();
396    standalone.ident = inner_ident.clone();
397
398    match standalone.inputs.iter_mut().next() {
399        Some(
400            arg @ FnArg::Receiver(Receiver {
401                reference: Some(_), ..
402            }),
403        ) => {
404            let (self_token, mutability) = match arg {
405                FnArg::Receiver(Receiver {
406                    mutability,
407                    self_token,
408                    ..
409                }) => (self_token, mutability),
410                _ => unreachable!(),
411            };
412            let under_self = Ident::new("_self", self_token.span);
413            *arg = parse_quote! {
414                #mutability #under_self: &#receiver
415            };
416        }
417        Some(arg @ FnArg::Receiver(_)) => {
418            let (self_token, mutability) = match arg {
419                FnArg::Receiver(Receiver {
420                    mutability,
421                    self_token,
422                    ..
423                }) => (self_token, mutability),
424                _ => unreachable!(),
425            };
426            let under_self = Ident::new("_self", self_token.span);
427            *arg = parse_quote! {
428                #mutability #under_self: #receiver
429            };
430        }
431        Some(FnArg::Typed(arg)) => {
432            if let Pat::Ident(arg) = &mut *arg.pat {
433                if arg.ident == "self" {
434                    arg.ident = Ident::new("_self", arg.ident.span());
435                }
436            }
437        }
438        _ => {}
439    }
440
441    let mut replace = ReplaceReceiver::with_as_trait(receiver.clone(), None);
442    replace.visit_signature_mut(&mut standalone);
443    replace.visit_block_mut(block);
444
445    let brace = block.brace_token;
446    let box_pin = quote_spanned!(
447        brace.span => {
448            #standalone #block
449            Box::pin(#inner_ident(#(#args),*))
450    });
451    *block = parse_quote!(#box_pin);
452    block.brace_token = brace;
453}
454
455/// Creates the type name for a future, to be removed...
456fn associated_type_for_eval(method: &ImplItemMethod) -> String {
457    snake_to_camel(&method.sig.ident.unraw().to_string()) + "Fut"
458}
459
460/// Verifies if the types are provide for each methods
461fn verify_types_were_provided(
462    span: Span,
463    expected: &[(&ImplItemMethod, String)],
464    provided: &[&ImplItemType],
465) -> syn::Result<()> {
466    let mut result = Ok(());
467    for (method, expected) in expected {
468        if !provided.iter().any(|typedecl| typedecl.ident == expected) {
469            let mut e = syn::Error::new(
470                span,
471                format!("not all trait items implemented, missing: `{}`", expected),
472            );
473            let fn_span = method.sig.fn_token.span();
474            e.extend(syn::Error::new(
475                fn_span.join(method.sig.ident.span()).unwrap_or(fn_span),
476                format!(
477                    "hint: `#[zerver]` only rewrites async fns, and `fn {}` is not async",
478                    method.sig.ident
479                ),
480            ));
481            match result {
482                Ok(_) => result = Err(e),
483                Err(ref mut error) => error.extend(Some(e)),
484            }
485        }
486    }
487    result
488}
489
490/// Generator for the ZService
491struct ZServiceGenerator<'a> {
492    service_ident: &'a Ident,            //service type
493    server_ident: &'a Ident,             //server type
494    client_ident: &'a Ident,             //client type
495    request_ident: &'a Ident,            //request type
496    response_ident: &'a Ident,           //response type
497    vis: &'a Visibility,                 //visibility
498    attrs: &'a [Attribute],              //attributes
499    evals: &'a [EvalMethod],             //functions to be exposed via evals
500    camel_case_idents: &'a [Ident],      //camel case conversion of all names
501    method_idents: &'a [&'a Ident],      //type of the methods
502    method_attrs: &'a [&'a [Attribute]], //attributes of the methods
503    args: &'a [&'a [PatType]],           // types description pattern
504    return_types: &'a [&'a Type],        // return types of functions
505    arg_pats: &'a [Vec<&'a Pat>],        // patterns for args
506    timeout: &'a u16,                    //eval timeout
507    eval_path: &'a String,               //path for evals
508    service_name: &'a String,            //service name on zenoh
509    service_get_server_ident: &'a Ident, //the ident for the get_<trait>_server
510}
511
512impl<'a> ZServiceGenerator<'a> {
513    // crates the service trait
514    fn trait_service(&self) -> TokenStream2 {
515        let &Self {
516            attrs,
517            evals,
518            vis,
519            return_types,
520            service_ident,
521            server_ident,
522            service_get_server_ident,
523            ..
524        } = self;
525
526        let fns = evals.iter().zip(return_types.iter()).map(
527            |(
528                EvalMethod {
529                    attrs,
530                    ident,
531                    receiver,
532                    args,
533                    ..
534                },
535                output,
536            )| {
537                quote! {
538
539                    #(#attrs)*
540                    //fn #ident(#receiver, #(#args),*) ->  #output;
541                    fn #ident(#receiver, #(#args),*) ->  ::core::pin::Pin<Box<dyn ::core::future::Future<Output = #output> + core::marker::Send + '_ >>;
542                }
543            },
544        );
545
546        quote! {
547
548        #(#attrs)*
549        #vis trait #service_ident : Clone{
550            #(#fns)*
551
552            /// Returns the server object
553            fn #service_get_server_ident(self, z : async_std::sync::Arc<zenoh::Session>, id : Option<zenoh::prelude::ZenohId>) -> #server_ident<Self>{
554                let id = id.unwrap_or_else(zenoh::prelude::ZenohId::rand);
555                log::trace!("Getting Server with ID {}", id);
556                #server_ident::new(z,self, id)
557                }
558            }
559
560        }
561    }
562
563    //creates the server struct
564    fn struct_server(&self) -> TokenStream2 {
565        let &Self {
566            vis,
567            server_ident,
568            service_name,
569            ..
570        } = self;
571
572        quote! {
573            #[derive(Clone)]
574            #vis struct #server_ident<S> {
575                z : async_std::sync::Arc<zenoh::Session>,
576                server: S,
577                instance_id: zenoh::prelude::ZenohId,
578                state : async_std::sync::Arc<async_std::sync::RwLock<zrpc::ComponentState>>
579            }
580
581            impl<S> #server_ident<S> {
582                pub fn new(z : async_std::sync::Arc<zenoh::Session>, server : S, id : zenoh::prelude::ZenohId) -> Self {
583
584                    let ci = zrpc::ComponentState{
585                        uuid : id,
586                        name : format!("{}", #service_name),
587                        routerid : "".to_string(),
588                        peerid : "".to_string(),
589                        status : zrpc::ComponentStatus::HALTED,
590                    };
591
592                    Self {
593                        z,
594                        server,
595                        instance_id : id,
596                        state : async_std::sync::Arc::new(async_std::sync::RwLock::new(ci))
597                    }
598                }
599            }
600
601        }
602    }
603
604    // implements ZServe for the server
605    fn impl_serve_for_server(&self) -> TokenStream2 {
606        let &Self {
607            request_ident,
608            server_ident,
609            service_ident,
610            response_ident,
611            camel_case_idents,
612            arg_pats,
613            method_idents,
614            eval_path,
615            service_name,
616            ..
617        } = self;
618
619        quote! {
620
621
622            impl<S> zrpc::ZServe<#request_ident> for #server_ident<S>
623            where S: #service_ident + Send +'static
624            {
625                type Resp = #response_ident;
626
627                fn instance_uuid(&self) -> zenoh::prelude::ZenohId {
628                    self.instance_id
629                }
630
631                #[allow(clippy::type_complexity,clippy::manual_async_fn)]
632                fn connect(&'_ self) ->
633                ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<(
634                    zrpc::AbortHandle,
635                    async_std::task::JoinHandle<Result<ZRPCResult<()>, zrpc::Aborted>>,
636                )>> + '_>> {
637                    log::trace!("Connect Service {} Instance {}", #service_name, self.instance_uuid());
638
639                    async fn __connect<S>(_self: &#server_ident<S>) -> ZRPCResult<(
640                        zrpc::AbortHandle,
641                        async_std::task::JoinHandle<Result<ZRPCResult<()>, zrpc::Aborted>>,
642                    )>
643                    where
644                        S: #service_ident + Send + 'static,
645                    {
646                        use futures::prelude::*;
647                        use std::convert::TryInto;
648                        use zenoh::prelude::r#async::*;
649                        use zenoh::prelude::*;
650
651
652                        let zinfo = _self.z.info();
653                        let pid = zinfo.zid().res().await.to_string().to_uppercase();
654
655                        let rid = match zinfo
656                            .routers_zid()
657                            .res()
658                            .await
659                            .collect::<Vec<ZenohId>>()
660                            .first()
661                        {
662                            Some(head) => head.to_string().to_uppercase(),
663                            None => "".to_string(),
664                        };
665
666                        let mut ci = _self.state.write().await;
667                        ci.peerid = pid.clone().to_uppercase();
668                        drop(ci);
669
670                        let zsession = async_std::sync::Arc::clone(&_self.z);
671
672                        let state = _self.state.clone();
673                        let path = format!("{}{}/state",#eval_path,_self.instance_uuid());
674
675                        let run_loop = async move {
676                            let mut queryable = zsession
677                                .declare_queryable(&path)
678                                .res()
679                                .await?;
680
681                            let kexpr: KeyExpr = (path.clone().try_into())
682                                .map_err(|e| zrpc::zrpcresult::ZRPCError::ZenohError(format!("{:?}",e)))?;
683
684
685                            loop {
686                                let query = queryable
687                                    .recv_async()
688                                    .await
689                                    .map_err(|_| zrpc::zrpcresult::ZRPCError::MissingValue)?;
690                                let ci = state.read().await;
691                                let data = zrpc::serialize::serialize_state(&*ci)?;
692                                drop(ci);
693                                let value = Value::new(data.into())
694                                    .encoding(Encoding::APP_OCTET_STREAM);
695                                let sample = Sample::new(kexpr.clone(), value);
696                                query.reply(Ok(sample)).res().await.map_err(|e| {
697                                    zrpc::zrpcresult::ZRPCError::ZenohError(format!("{:?}",e))
698                                })?;
699                            }
700                        };
701
702
703                        let (abort_handle, abort_registration) = zrpc::AbortHandle::new_pair();
704
705                        log::trace!("Spawning state responder task");
706                        let task_handle =
707                            async_std::task::spawn(zrpc::Abortable::new(run_loop, abort_registration));
708
709                        Ok((abort_handle, task_handle))
710                    }
711                    Box::pin(__connect(self))
712                }
713
714
715                #[allow(clippy::type_complexity,clippy::manual_async_fn)]
716                fn initialize(&self) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
717                    log::trace!("Initialize Service {} Instance {}", #service_name, self.instance_uuid());
718                    async fn __initialize<S>(_self: &#server_ident<S>) -> ZRPCResult<()>
719                    where
720                        S: #service_ident + Send + 'static,
721                    {
722                        let mut ci = _self.state.write().await;
723                        match ci.status {
724                            zrpc::ComponentStatus::HALTED =>{
725                                ci.status = zrpc::ComponentStatus::INITIALIZING;
726                                Ok(())
727                            },
728                            _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot initialize a component in a state different than HALTED".to_string())),
729                        }
730
731                    }
732                    Box::pin(__initialize(self))
733                }
734
735
736                #[allow(clippy::type_complexity,clippy::manual_async_fn)]
737                fn register(&self) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>>{
738                    log::trace!("Register Service {} Instance {}", #service_name, self.instance_uuid());
739                    async fn __register<S>(_self: &#server_ident<S>) -> ZRPCResult<()>
740                    where
741                    S: #service_ident + Send + 'static,
742                    {
743                        let mut ci = _self.state.write().await;
744                        match ci.status {
745                            zrpc::ComponentStatus::INITIALIZING => {
746                                ci.status = zrpc::ComponentStatus::REGISTERED;
747                                Ok(())
748                            },
749                            _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot register a component in a state different than INITIALIZING".to_string())),
750                        }
751                    }
752                    Box::pin(__register(self))
753                }
754
755                #[allow(clippy::type_complexity,clippy::manual_async_fn, clippy::needless_question_mark)]
756                fn start(
757                    &self,
758                ) -> ::core::pin::Pin<
759                    Box<
760                        dyn std::future::Future<
761                                Output = ZRPCResult<(
762                                    zrpc::AbortHandle,
763                                    async_std::task::JoinHandle<Result<ZRPCResult<()>, zrpc::Aborted>>,
764                                )>> + '_>>
765                    {
766
767                    log::trace!("Start Service {} Instance {}", #service_name, self.instance_uuid());
768
769                    async fn __start<S>(
770                        _self: &#server_ident<S>,
771                    ) -> ZRPCResult<(
772                        zrpc::AbortHandle,
773                        async_std::task::JoinHandle<Result<ZRPCResult<()>, zrpc::Aborted>>,
774                    )>
775                    where
776                        S: #service_ident + Send + 'static,
777                    {
778                            let (s, r) = async_std::channel::bounded::<()>(1);
779                            let barrier = async_std::sync::Arc::new(async_std::sync::Barrier::new(2));
780                            let ci = _self.state.read().await;
781                            match ci.status {
782                                zrpc::ComponentStatus::REGISTERED => {
783                                    drop(ci);
784
785
786                                    let server = _self.clone();
787                                    let b =  barrier.clone();
788                                    let (abort_handle, abort_registration) = zrpc::AbortHandle::new_pair();
789
790                                    log::trace!("Spawning serving loop");
791                                    let task_handle = async_std::task::spawn_blocking(move || {
792                                        async_std::task::block_on(zrpc::Abortable::new(
793                                            async { server.serve(b).await },
794                                            abort_registration,
795                                        ))
796                                    });
797
798                                    log::trace!("Waiting for serving loop to be ready");
799                                    barrier.wait().await;
800
801                                    // Updating status, using barrier to avoid race condition
802                                    let mut ci = _self.state.write().await;
803                                    ci.status = zrpc::ComponentStatus::SERVING;
804                                    drop(ci);
805
806                                    Ok((abort_handle, task_handle))
807
808                                }
809                                _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot start a component in a state different than REGISTERED".to_string())),
810                            }
811
812                    }
813                    Box::pin(__start(self))
814                }
815
816                #[allow(clippy::type_complexity,clippy::manual_async_fn)]
817                fn run(&self) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
818                    log::trace!("Run Service {} Instance {}", #service_name, self.instance_uuid());
819                    async fn __run<S>(_self: &#server_ident<S>) -> ZRPCResult<()>
820                    where
821                        S: #service_ident + Send + 'static,
822                    {
823                        use std::convert::TryInto;
824                        use zenoh::prelude::r#async::*;
825                        use zenoh::prelude::*;
826
827                        let path = format!("{}{}/eval",#eval_path, _self.instance_uuid());
828                        log::trace!("Registering eval on {:?}", path);
829                        let mut queryable = _self
830                            .z
831                            .declare_queryable(&path)
832                            .res()
833                            .await?;
834
835                        let kexpr: KeyExpr = (path.clone().try_into())
836                            .map_err(|e| zrpc::zrpcresult::ZRPCError::ZenohError(format!("{:?}",e)))?;
837
838                        log::trace!("Registered on {:?}", path);
839                        loop {
840                            let query = queryable.recv_async().await.map_err(|_| zrpc::zrpcresult::ZRPCError::MissingValue)?;
841                            log::trace!("Received query {:?}", query);
842                            let query_selector = query.selector();
843
844                            match query.value(){
845                                Some(value) => {
846                                    let req = zrpc::serialize::deserialize_request::<#request_ident>(&value.payload.contiguous())?;
847                                    log::trace!("Received on {:?} {:?}", path, req);
848
849                                    let mut ser = _self.server.clone();
850
851                                    let encoded_resp  = match req.clone() {
852                                        #(
853                                            #request_ident::#camel_case_idents{#(#arg_pats),*} => {
854                                                let resp = #response_ident::#camel_case_idents(ser.#method_idents( #(#arg_pats),*).await);
855                                                log::trace!("Reply to {:?} {:?} with {:?}", path, req, resp);
856                                                zrpc::serialize::serialize_response(&resp)
857                                            }
858                                        )*
859                                    }?;
860                                    let value = Value::new(encoded_resp.into()).encoding(Encoding::APP_OCTET_STREAM);
861                                    let sample = Sample::new(kexpr.clone(), value);
862                                    query
863                                        .reply(Ok(sample))
864                                        .res()
865                                        .await
866                                        .map_err(|e| zrpc::zrpcresult::ZRPCError::ZenohError(format!("{:?}",e)))?;
867                                }
868                                None => log::error!("Received query on {:?} without value, not replying!", query_selector)
869                            }
870                        }
871                    }
872                    Box::pin( __run(self))
873                }
874
875                #[allow(clippy::type_complexity,clippy::manual_async_fn)]
876                fn serve(
877                    &self,
878                    barrier : async_std::sync::Arc<async_std::sync::Barrier>,
879                ) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
880                    log::trace!("Serve Service {} Instance {}", #service_name, self.instance_uuid());
881                    async fn __serve<S>(_self: &#server_ident<S>, _barrier : async_std::sync::Arc<async_std::sync::Barrier>) -> ZRPCResult<()>
882                    where
883                        S: #service_ident + Send + 'static,
884                    {
885                        use futures::prelude::*;
886                        use async_std::prelude::FutureExt;
887
888                        let ci = _self.state.read().await;
889                        match ci.status {
890                            zrpc::ComponentStatus::REGISTERED => {
891                                drop(ci);
892
893                                _barrier.wait().await;
894
895                                log::trace!("RPC Receiver loop started...");
896                                loop {
897                                    match _self.run().await {
898                                        Err(e) => {
899                                            log::error!("The run loop existed with {:?}, restaring...", e);
900                                        }
901                                        Ok(_) => {
902                                            log::warn!("The run loop existed with unit restaring...");
903                                        }
904
905                                    }
906                                }
907                            }
908                            _ => Err(ZRPCError::StateTransitionNotAllowed("State is not WORK, serve called directly? serve is called by calling work!".to_string())),
909                        }
910                    }
911                    let res  = __serve(self, barrier);
912                    Box::pin(res)
913                }
914
915                #[allow(clippy::type_complexity,clippy::manual_async_fn)]
916                fn stop(
917                    &self,
918                    stop: zrpc::AbortHandle,
919                ) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
920                    log::trace!("Stop Service {} Instance {}", #service_name, self.instance_uuid());
921                    async fn __stop<S>(_self: &#server_ident<S>, _stop: zrpc::AbortHandle) -> ZRPCResult<()>
922                    where
923                        S: #service_ident + Send + 'static,
924                    {
925                        let mut ci = _self.state.write().await;
926                        match ci.status {
927                            zrpc::ComponentStatus::SERVING => {
928                                ci.status = zrpc::ComponentStatus::REGISTERED;
929                                drop(ci);
930                                _stop.abort();
931                                Ok(())
932                            },
933                            _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot stop a component in a state different than WORK".to_string())),
934                        }
935                    }
936                    Box::pin(__stop(self, stop))
937                }
938
939                #[allow(clippy::type_complexity,clippy::manual_async_fn)]
940                fn unregister(&self) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
941                    log::trace!("Unregister Service {} Instance {}", #service_name, self.instance_uuid());
942                    async fn __unregister<S>(_self: &#server_ident<S>) -> ZRPCResult<()>
943                    where
944                        S: #service_ident + Send + 'static,
945                    {
946                        let mut ci = _self.state.write().await;
947                        match ci.status {
948                            zrpc::ComponentStatus::REGISTERED =>{
949                                ci.status = zrpc::ComponentStatus::HALTED;
950                                Ok(())
951                            },
952                            _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot unregister a component in a state different than REGISTERED".to_string())),
953                        }
954                    }
955                    Box::pin(__unregister(self))
956                }
957
958                #[allow(clippy::type_complexity,clippy::manual_async_fn)]
959                fn disconnect(&self, stop: zrpc::AbortHandle,) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
960                    log::trace!("Disconnect Service {} Instance {}", #service_name, self.instance_uuid());
961                    async fn __disconnect<S>(_self: &#server_ident<S>, _stop: zrpc::AbortHandle) -> ZRPCResult<()>
962                    where
963                        S: #service_ident + Send + 'static,
964                        {
965                            let mut ci = _self.state.write().await;
966                            match ci.status {
967                                zrpc::ComponentStatus::HALTED => {
968                                    ci.status = zrpc::ComponentStatus::HALTED;
969                                    drop(ci);
970                                    _stop.abort();
971                                    Ok(())
972                                },
973                                _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot disconnect a component in a state different than HALTED".to_string())),
974                            }
975                        }
976                    Box::pin(__disconnect(self,stop))
977                }
978
979            }
980        }
981    }
982
983    // Generates the request enum type, and makes it derive Debug, Serialize and Deserialize
984    fn enum_request(&self) -> TokenStream2 {
985        let &Self {
986            vis,
987            request_ident,
988            camel_case_idents,
989            args,
990            ..
991        } = self;
992
993        quote! {
994            /// The request sent over the wire from the client to the server.
995            #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
996            #vis enum #request_ident {
997                #( #camel_case_idents{ #( #args ),* } ),*
998            }
999        }
1000    }
1001
1002    // Generates the response enum type, and makes it derive Debug, Serialize and Deserialize
1003    fn enum_response(&self) -> TokenStream2 {
1004        let &Self {
1005            vis,
1006            response_ident,
1007            camel_case_idents,
1008            return_types,
1009            ..
1010        } = self;
1011
1012        quote! {
1013            /// The response sent over the wire from the server to the client.
1014            #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
1015            #vis enum #response_ident {
1016                #( #camel_case_idents(#return_types) ),*
1017            }
1018        }
1019    }
1020
1021    // Generates the client struct
1022    fn struct_client(&self) -> TokenStream2 {
1023        let &Self {
1024            vis,
1025            client_ident,
1026            request_ident,
1027            response_ident,
1028            ..
1029        } = self;
1030
1031        quote! {
1032            #[allow(unused)]
1033            #[derive(Clone, Debug)]
1034            #vis struct #client_ident<C = zrpc::ZClientChannel<#request_ident, #response_ident>>{
1035                ch : C,
1036                server_uuid : zenoh::prelude::ZenohId,
1037            }
1038        }
1039    }
1040
1041    // Generates the implentation of the client
1042    fn impl_client_new_find_servers(&self) -> TokenStream2 {
1043        let &Self {
1044            client_ident,
1045            vis,
1046            eval_path,
1047            ..
1048        } = self;
1049
1050        quote! {
1051            impl #client_ident {
1052                #vis fn new(
1053                    z : async_std::sync::Arc<zenoh::Session>,
1054                    instance_id : zenoh::prelude::ZenohId
1055                ) -> #client_ident {
1056                        let new_client = zrpc::ZClientChannel::new(z, format!("{}",#eval_path), Some(instance_id));
1057                        #client_ident{
1058                            ch : new_client,
1059                            server_uuid : instance_id,
1060                        }
1061
1062                    }
1063
1064                #vis fn get_server_uuid(&self) -> zenoh::prelude::ZenohId {
1065                    self.server_uuid
1066                }
1067
1068                #vis fn find_servers(
1069                    z : async_std::sync::Arc<zenoh::Session>
1070                ) -> impl std::future::Future<Output = ZRPCResult<Vec<zenoh::prelude::ZenohId>>> + 'static
1071                {
1072                    async move {
1073                        use zenoh::prelude::r#async::*;
1074                        use zenoh::query::*;
1075                        use zenoh::prelude::*;
1076
1077
1078                        let selector = format!("{}*/state",#eval_path);
1079                        log::trace!("Find servers selector {}", selector);
1080                        let mut servers = Vec::new();
1081
1082                        let replies = z.get(&selector).target(QueryTarget::All).res().await?;
1083
1084                        while let Ok(d) = replies.recv_async().await {
1085                            match d.sample {
1086                                Ok(sample) => match sample.value.encoding {
1087                                    Encoding::APP_OCTET_STREAM => {
1088                                        let ca = zrpc::serialize::deserialize_state::<zrpc::ComponentState>(
1089                                            &sample.value.payload.contiguous(),
1090                                        )?;
1091                                        servers.push(ca.uuid);
1092                                    }
1093                                    _ => {
1094                                        return Err(ZRPCError::ZenohError(
1095                                            "Server information is not correctly encoded".to_string(),
1096                                        ))
1097                                    }
1098                                },
1099                                Err(e) => {
1100                                    return Err(ZRPCError::ZenohError(format!(
1101                                        "Unable to get sample from {:?}",e
1102                                    )))
1103                                }
1104                            }
1105                        }
1106                        Ok(servers)
1107                    }
1108                }
1109
1110                #vis fn find_servers_info(
1111                    z : async_std::sync::Arc<zenoh::Session>
1112                ) -> impl std::future::Future<Output = ZRPCResult<Vec<zrpc::ComponentState>>> + 'static
1113                {
1114                    async move {
1115                        use zenoh::prelude::r#async::*;
1116                        use zenoh::query::*;
1117                        use zenoh::prelude::*;
1118
1119                        let selector = format!("{}*/state",#eval_path);
1120                        log::trace!("Find servers selector {}", selector);
1121                        let mut servers = Vec::new();
1122
1123                        let replies = z.get(&selector).target(QueryTarget::All).res().await?;
1124
1125                        while let Ok(d) = replies.recv_async().await {
1126                            match d.sample {
1127                                Ok(sample) => match sample.value.encoding {
1128                                    Encoding::APP_OCTET_STREAM => {
1129                                        let ca = zrpc::serialize::deserialize_state::<zrpc::ComponentState>(
1130                                            &sample.value.payload.contiguous(),
1131                                        )?;
1132                                        servers.push(ca);
1133                                    }
1134                                    _ => {
1135                                        return Err(ZRPCError::ZenohError(
1136                                            "Server information is not correctly encoded".to_string(),
1137                                        ))
1138                                    }
1139                                },
1140                                Err(e) => {
1141                                    return Err(ZRPCError::ZenohError(format!(
1142                                        "Unable to get sample from {:?}",e
1143                                    )))
1144                                }
1145                            }
1146                        }
1147                        Ok(servers)
1148                    }
1149                }
1150
1151                #vis fn find_local_servers(
1152                    z : async_std::sync::Arc<zenoh::Session>
1153                ) -> impl std::future::Future<Output = ZRPCResult<Vec<zenoh::prelude::ZenohId>>> + 'static
1154                {
1155                    async move {
1156                        use zenoh::prelude::r#async::*;
1157                        use zenoh::query::*;
1158                        use zenoh::prelude::*;
1159                        use zrpc::zrpcresult::ZRPCError;
1160
1161
1162                        let servers = Self::find_servers_info(async_std::sync::Arc::clone(&z)).await?;
1163
1164                        let zinfo = z.info();
1165
1166                        let rid = match zinfo
1167                            .routers_zid()
1168                            .res()
1169                            .await
1170                            .collect::<Vec<ZenohId>>()
1171                            .first()
1172                        {
1173                            Some(head) => head.to_string().to_uppercase(),
1174                            None => "".to_string(),
1175                        };
1176                        if rid == "" {
1177                            return Ok(vec![])
1178                        }
1179                        log::trace!("Router ID is {}", rid);
1180
1181                        // This is a get from in the Router Admin space
1182                        let selector = format!("@/router/{}", rid);
1183
1184                        let mut rdata: Vec<Reply> = z.get(&selector).res().await?.into_iter().collect();
1185
1186                        if rdata.is_empty() {
1187                            return Err(ZRPCError::NotFound);
1188                        }
1189
1190                        let router_data = rdata.remove(0);
1191                        match router_data.sample {
1192                            Ok(sample) => match sample.value.encoding {
1193                                Encoding::APP_JSON => {
1194                                    let ri = zrpc::serialize::deserialize_router_info(
1195                                        &sample.value.payload.contiguous(),
1196                                    )?;
1197                                    let r: Vec<zenoh::prelude::ZenohId> = servers
1198                                        .into_iter()
1199                                        .filter_map(|ci| {
1200                                            let pid = String::from(&ci.peerid).to_uppercase();
1201                                            let mut it = ri.clone().sessions.into_iter();
1202                                            let f = it.find(|x| x.peer == pid.clone());
1203                                            if f.is_none() {
1204                                                None
1205                                            } else {
1206                                                Some(ci.uuid)
1207                                            }
1208                                        })
1209                                        .collect();
1210
1211                                    Ok(r)
1212                                }
1213                                _ => Err(ZRPCError::ZenohError(
1214                                    "Router information is not encoded in JSON".to_string(),
1215                                )),
1216                            },
1217                            Err(e) => Err(ZRPCError::ZenohError(format!(
1218                                "Unable to get sample from {:?}",e
1219                            ))),
1220                        }
1221                    }
1222                }
1223            }
1224        }
1225    }
1226
1227    // Generates the implementation of the client methods that maps to evals
1228    fn impl_client_eval_methods(&self) -> TokenStream2 {
1229        let &Self {
1230            client_ident,
1231            request_ident,
1232            response_ident,
1233            method_attrs,
1234            vis,
1235            method_idents,
1236            args,
1237            return_types,
1238            arg_pats,
1239            camel_case_idents,
1240            timeout,
1241            ..
1242        } = self;
1243
1244        quote! {
1245
1246            impl #client_ident {
1247                #vis fn verify_server(&self
1248                ) -> impl std::future::Future<Output = ZRPCResult<bool>> + '_ {
1249                    async move {
1250                        self.ch.verify_server().await
1251                    }
1252                }
1253
1254                #(
1255
1256                    #[allow(unused,clippy::manual_async_fn)]
1257                    #( #method_attrs )*
1258                    #vis fn #method_idents(&self, #( #args ),*)
1259                        -> impl std::future::Future<Output = ZRPCResult<#return_types>> + '_ {
1260                        let request = #request_ident::#camel_case_idents { #( #arg_pats ),* };
1261                        log::trace!("Sending {:?}", request);
1262                        async move {
1263                            let resp = self.ch.call_fun(request);
1264                            let dur = std::time::Duration::from_secs(#timeout as u64);
1265                            match async_std::future::timeout(dur, resp).await {
1266                                Ok(r) => match r {
1267                                    Ok(zr) => match zr {
1268                                            #response_ident::#camel_case_idents(msg) => std::result::Result::Ok(msg),
1269                                            _ => Err(ZRPCError::Unreachable),
1270                                        },
1271                                    Err(e) => Err(e),
1272                                },
1273                                Err(e) => Err(ZRPCError::TimedOut),
1274                            }
1275                        }
1276                    }
1277                )*
1278            }
1279        }
1280    }
1281}
1282
1283//Converts ZServiceGenerator to actual code
1284impl<'a> ToTokens for ZServiceGenerator<'a> {
1285    fn to_tokens(&self, output: &mut TokenStream2) {
1286        output.extend(vec![
1287            self.trait_service(),
1288            self.struct_server(),
1289            self.impl_serve_for_server(),
1290            self.enum_request(),
1291            self.enum_response(),
1292            self.struct_client(),
1293            self.impl_client_new_find_servers(),
1294            self.impl_client_eval_methods(),
1295        ])
1296    }
1297}
1298
1299//converts to snake_case to CamelCase, is used to convert functions name
1300fn snake_to_camel(ident_str: &str) -> String {
1301    let mut camel_ty = String::with_capacity(ident_str.len());
1302
1303    let mut last_char_was_underscore = true;
1304    for c in ident_str.chars() {
1305        match c {
1306            '_' => last_char_was_underscore = true,
1307            c if last_char_was_underscore => {
1308                camel_ty.extend(c.to_uppercase());
1309                last_char_was_underscore = false;
1310            }
1311            c => camel_ty.extend(c.to_lowercase()),
1312        }
1313    }
1314
1315    camel_ty.shrink_to_fit();
1316    camel_ty
1317}