1#![allow(clippy::upper_case_acronyms)]
14#![recursion_limit = "512"]
15
16extern crate base64;
17extern crate darling;
18extern crate proc_macro;
19extern crate proc_macro2;
20extern crate quote;
21extern crate serde;
22extern crate syn;
23
24use darling::FromMeta;
25use inflector::cases::snakecase::to_snake_case;
26use proc_macro::TokenStream;
27use proc_macro2::{Span, TokenStream as TokenStream2};
28use quote::{format_ident, quote, quote_spanned, ToTokens};
29use std::str::FromStr;
30use syn::visit_mut::VisitMut;
31use syn::{
32 braced,
33 ext::IdentExt,
34 parenthesized,
35 parse::{Parse, ParseStream},
36 parse_macro_input, parse_quote,
37 spanned::Spanned,
38 token::Comma,
39 Attribute, AttributeArgs, Block, FnArg, Ident, ImplItem, ImplItemMethod, ImplItemType,
40 ItemImpl, Pat, PatIdent, PatType, Receiver, ReturnType, Signature, Token, Type, Visibility,
41};
42use syn_serde::json;
43use zenoh::prelude::ZenohId;
44
45mod receiver;
46use receiver::ReplaceReceiver;
47
48macro_rules! extend_errors {
49 ($errors: ident, $e: expr) => {
50 match $errors {
51 Ok(_) => $errors = Err($e),
52 Err(ref mut errors) => errors.extend($e),
53 }
54 };
55}
56
57#[derive(Debug, FromMeta)]
58struct ZServiceMacroArgs {
59 timeout_s: u16,
60 #[darling(default)]
61 prefix: Option<String>,
62 #[darling(default)]
63 service_uuid: Option<String>,
64}
65
66struct ZService {
67 attrs: Vec<Attribute>,
68 vis: Visibility,
69 ident: Ident,
70 evals: Vec<EvalMethod>,
71}
72
73impl Parse for ZService {
74 fn parse(input: ParseStream) -> syn::Result<Self> {
75 let attrs = input.call(Attribute::parse_outer)?;
76 let vis = input.parse()?;
77 input.parse::<Token![trait]>()?;
78 let ident: Ident = input.parse()?;
79 let content;
80 braced!(content in input);
81 let mut evals = Vec::<EvalMethod>::new();
82 while !content.is_empty() {
83 evals.push(content.parse()?);
84 }
85 let mut ident_errors = Ok(());
86 for eval in &evals {
87 if eval.ident == "new" {
88 extend_errors!(
89 ident_errors,
90 syn::Error::new(
91 eval.ident.span(),
92 format!(
93 "method name conflicts with generated fn `{}Client::new`",
94 ident.unraw()
95 )
96 )
97 );
98 }
99 if eval.ident == "serve" {
100 extend_errors!(
101 ident_errors,
102 syn::Error::new(
103 eval.ident.span(),
104 format!("method name conflicts with generated fn `{}::serve`", ident)
105 )
106 );
107 }
108 }
109 ident_errors?;
110
111 Ok(Self {
112 attrs,
113 vis,
114 ident,
115 evals,
116 })
117 }
118}
119
120struct EvalMethod {
121 attrs: Vec<Attribute>,
122 ident: Ident,
123 receiver: Receiver,
124 args: Vec<PatType>,
125 output: ReturnType,
126}
127
128impl Parse for EvalMethod {
129 fn parse(input: ParseStream) -> syn::Result<Self> {
130 let attrs = input.call(Attribute::parse_outer)?;
131 input.parse::<Token![async]>()?;
132 input.parse::<Token![fn]>()?;
133 let ident = input.parse()?;
134 let content;
135 let mut recv: Option<Receiver> = None;
136 parenthesized!(content in input);
137 let mut args = Vec::new();
138 let mut errors = Ok(());
139 for arg in content.parse_terminated::<FnArg, Comma>(FnArg::parse)? {
140 match arg {
141 FnArg::Typed(captured) if matches!(&*captured.pat, Pat::Ident(_)) => {
142 args.push(captured);
143 }
144 FnArg::Typed(captured) => {
145 extend_errors!(
146 errors,
147 syn::Error::new(captured.pat.span(), "patterns aren't allowed in RPC args")
148 );
149 }
150 FnArg::Receiver(receiver) => {
151 recv = Some(receiver)
153 }
158 }
159 }
160 match recv {
161 None => extend_errors!(
162 errors,
163 syn::Error::new(
164 recv.span(),
165 "Missing any receiver in method declaration, please add one!"
166 )
167 ),
168 Some(_) => (),
169 }
170
171 errors?;
172 let output = input.parse()?;
173 input.parse::<Token![;]>()?;
174 let receiver = recv.unwrap();
175 Ok(Self {
176 attrs,
177 ident,
178 receiver,
179 args,
180 output,
181 })
182 }
183}
184
185#[proc_macro_derive(Ast)]
186pub fn derive_ast(item: TokenStream) -> TokenStream {
187 let ast: syn::DeriveInput = syn::parse(item).unwrap();
188 let exp: syn::File = syn::parse_quote! {
189 #ast
190 };
191
192 println!("{}", json::to_string_pretty(&exp));
193 TokenStream::new()
194}
195
196#[proc_macro_attribute]
203pub fn zservice(attr: TokenStream, input: TokenStream) -> TokenStream {
204 let unit_type: &Type = &parse_quote!(());
205
206 let ZService {
208 ref attrs,
209 ref vis,
210 ref ident,
211 ref evals,
212 } = parse_macro_input!(input as ZService);
213
214 let attr_args = parse_macro_input!(attr as AttributeArgs);
216 let macro_args = match ZServiceMacroArgs::from_list(&attr_args) {
217 Ok(v) => v,
218 Err(e) => {
219 return TokenStream::from(e.write_errors());
220 }
221 };
222
223 let camel_case_fn_names: &Vec<_> = &evals
225 .iter()
226 .map(|eval| snake_to_camel(&eval.ident.unraw().to_string()))
227 .collect();
228
229 let snake_case_ident = to_snake_case(&ident.unraw().to_string());
230
231 let args: &[&[PatType]] = &evals.iter().map(|eval| &*eval.args).collect::<Vec<_>>();
233
234 let service_uuid = match macro_args.service_uuid {
235 Some(u) => ZenohId::from_str(&u).unwrap(),
236 None => ZenohId::rand(),
237 };
238
239 let path = match macro_args.prefix {
241 Some(prefix) => format!("{}/zservice/{}/{}/", prefix, ident, service_uuid),
242 None => format!("zservice/{}/{}/", ident, service_uuid),
243 };
244
245 let service_name = format!("{}Service", ident);
246 let ts: TokenStream = ZServiceGenerator {
248 service_ident: ident,
249 server_ident: &format_ident!("Serve{}", ident), client_ident: &format_ident!("{}Client", ident), request_ident: &format_ident!("{}Request", ident), response_ident: &format_ident!("{}Response", ident), vis,
254 args,
255 method_attrs: &evals.iter().map(|eval| &*eval.attrs).collect::<Vec<_>>(), method_idents: &evals.iter().map(|eval| &eval.ident).collect::<Vec<_>>(), attrs,
258 evals,
259 return_types: &evals .iter()
261 .map(|eval| match eval.output {
262 ReturnType::Type(_, ref ty) => ty,
263 ReturnType::Default => unit_type,
264 })
265 .collect::<Vec<_>>(),
266 arg_pats: &args
267 .iter()
268 .map(|args| args.iter().map(|arg| &*arg.pat).collect())
269 .collect::<Vec<_>>(),
270 camel_case_idents: &evals
271 .iter()
272 .zip(camel_case_fn_names.iter())
273 .map(|(eval, name)| Ident::new(name, eval.ident.span()))
274 .collect::<Vec<_>>(),
275 timeout: ¯o_args.timeout_s,
276 eval_path: &path,
277 service_name: &service_name,
278 service_get_server_ident: &format_ident!("get_{}_server", snake_case_ident),
279 }
280 .into_token_stream()
281 .into();
282 ts
283}
284
285#[proc_macro_attribute]
308pub fn zserver(_attr: TokenStream, input: TokenStream) -> TokenStream {
309 let mut item = syn::parse_macro_input!(input as ItemImpl);
310 let span = item.span();
311
312 let mut expected_non_async_types: Vec<(&ImplItemMethod, String)> = Vec::new();
315 let mut found_non_async_types: Vec<&ImplItemType> = Vec::new();
316
317 for inner in &mut item.items {
318 match inner {
319 ImplItem::Method(method) => {
320 if method.sig.asyncness.is_some() {
321 let sig = &mut method.sig;
341 let block = &mut method.block;
342 transform_server_method_block(sig, block, &item.self_ty);
343 transform_server_method_sig(sig);
344 method
345 .attrs
346 .push(parse_quote!(#[allow(unused,clippy::manual_async_fn)]));
347 } else {
348 expected_non_async_types.push((method, associated_type_for_eval(method)));
351 }
352 }
353 ImplItem::Type(typedecl) => found_non_async_types.push(typedecl),
354 _ => {}
355 }
356 }
357
358 if let Err(e) =
359 verify_types_were_provided(span, &expected_non_async_types, &found_non_async_types)
360 {
361 return TokenStream::from(e.to_compile_error());
362 }
363
364 TokenStream::from(quote!(#item))
365}
366
367fn transform_server_method_sig(sig: &mut Signature) {
370 let ret = match &sig.output {
371 ReturnType::Default => quote!(()),
372 ReturnType::Type(_, ret) => quote!(#ret),
373 };
374
375 sig.output = parse_quote! {
376 -> ::core::pin::Pin<Box<dyn ::core::future::Future<Output = #ret> + core::marker::Send + '_ >>
377 };
378 sig.asyncness = None;
379}
380
381fn transform_server_method_block(sig: &mut Signature, block: &mut Block, receiver: &Type) {
382 let inner_ident = format_ident!("__{}", sig.ident);
383
384 let args = sig.inputs.iter().enumerate().map(|(i, arg)| match arg {
385 FnArg::Receiver(Receiver { self_token, .. }) => quote!(#self_token),
386 FnArg::Typed(arg) => {
387 if let Pat::Ident(PatIdent { ident, .. }) = &*arg.pat {
388 quote!(#ident)
389 } else {
390 format_ident!("__arg{}", i).into_token_stream()
391 }
392 }
393 });
394
395 let mut standalone = sig.clone();
396 standalone.ident = inner_ident.clone();
397
398 match standalone.inputs.iter_mut().next() {
399 Some(
400 arg @ FnArg::Receiver(Receiver {
401 reference: Some(_), ..
402 }),
403 ) => {
404 let (self_token, mutability) = match arg {
405 FnArg::Receiver(Receiver {
406 mutability,
407 self_token,
408 ..
409 }) => (self_token, mutability),
410 _ => unreachable!(),
411 };
412 let under_self = Ident::new("_self", self_token.span);
413 *arg = parse_quote! {
414 #mutability #under_self: &#receiver
415 };
416 }
417 Some(arg @ FnArg::Receiver(_)) => {
418 let (self_token, mutability) = match arg {
419 FnArg::Receiver(Receiver {
420 mutability,
421 self_token,
422 ..
423 }) => (self_token, mutability),
424 _ => unreachable!(),
425 };
426 let under_self = Ident::new("_self", self_token.span);
427 *arg = parse_quote! {
428 #mutability #under_self: #receiver
429 };
430 }
431 Some(FnArg::Typed(arg)) => {
432 if let Pat::Ident(arg) = &mut *arg.pat {
433 if arg.ident == "self" {
434 arg.ident = Ident::new("_self", arg.ident.span());
435 }
436 }
437 }
438 _ => {}
439 }
440
441 let mut replace = ReplaceReceiver::with_as_trait(receiver.clone(), None);
442 replace.visit_signature_mut(&mut standalone);
443 replace.visit_block_mut(block);
444
445 let brace = block.brace_token;
446 let box_pin = quote_spanned!(
447 brace.span => {
448 #standalone #block
449 Box::pin(#inner_ident(#(#args),*))
450 });
451 *block = parse_quote!(#box_pin);
452 block.brace_token = brace;
453}
454
455fn associated_type_for_eval(method: &ImplItemMethod) -> String {
457 snake_to_camel(&method.sig.ident.unraw().to_string()) + "Fut"
458}
459
460fn verify_types_were_provided(
462 span: Span,
463 expected: &[(&ImplItemMethod, String)],
464 provided: &[&ImplItemType],
465) -> syn::Result<()> {
466 let mut result = Ok(());
467 for (method, expected) in expected {
468 if !provided.iter().any(|typedecl| typedecl.ident == expected) {
469 let mut e = syn::Error::new(
470 span,
471 format!("not all trait items implemented, missing: `{}`", expected),
472 );
473 let fn_span = method.sig.fn_token.span();
474 e.extend(syn::Error::new(
475 fn_span.join(method.sig.ident.span()).unwrap_or(fn_span),
476 format!(
477 "hint: `#[zerver]` only rewrites async fns, and `fn {}` is not async",
478 method.sig.ident
479 ),
480 ));
481 match result {
482 Ok(_) => result = Err(e),
483 Err(ref mut error) => error.extend(Some(e)),
484 }
485 }
486 }
487 result
488}
489
490struct ZServiceGenerator<'a> {
492 service_ident: &'a Ident, server_ident: &'a Ident, client_ident: &'a Ident, request_ident: &'a Ident, response_ident: &'a Ident, vis: &'a Visibility, attrs: &'a [Attribute], evals: &'a [EvalMethod], camel_case_idents: &'a [Ident], method_idents: &'a [&'a Ident], method_attrs: &'a [&'a [Attribute]], args: &'a [&'a [PatType]], return_types: &'a [&'a Type], arg_pats: &'a [Vec<&'a Pat>], timeout: &'a u16, eval_path: &'a String, service_name: &'a String, service_get_server_ident: &'a Ident, }
511
512impl<'a> ZServiceGenerator<'a> {
513 fn trait_service(&self) -> TokenStream2 {
515 let &Self {
516 attrs,
517 evals,
518 vis,
519 return_types,
520 service_ident,
521 server_ident,
522 service_get_server_ident,
523 ..
524 } = self;
525
526 let fns = evals.iter().zip(return_types.iter()).map(
527 |(
528 EvalMethod {
529 attrs,
530 ident,
531 receiver,
532 args,
533 ..
534 },
535 output,
536 )| {
537 quote! {
538
539 #(#attrs)*
540 fn #ident(#receiver, #(#args),*) -> ::core::pin::Pin<Box<dyn ::core::future::Future<Output = #output> + core::marker::Send + '_ >>;
542 }
543 },
544 );
545
546 quote! {
547
548 #(#attrs)*
549 #vis trait #service_ident : Clone{
550 #(#fns)*
551
552 fn #service_get_server_ident(self, z : async_std::sync::Arc<zenoh::Session>, id : Option<zenoh::prelude::ZenohId>) -> #server_ident<Self>{
554 let id = id.unwrap_or_else(zenoh::prelude::ZenohId::rand);
555 log::trace!("Getting Server with ID {}", id);
556 #server_ident::new(z,self, id)
557 }
558 }
559
560 }
561 }
562
563 fn struct_server(&self) -> TokenStream2 {
565 let &Self {
566 vis,
567 server_ident,
568 service_name,
569 ..
570 } = self;
571
572 quote! {
573 #[derive(Clone)]
574 #vis struct #server_ident<S> {
575 z : async_std::sync::Arc<zenoh::Session>,
576 server: S,
577 instance_id: zenoh::prelude::ZenohId,
578 state : async_std::sync::Arc<async_std::sync::RwLock<zrpc::ComponentState>>
579 }
580
581 impl<S> #server_ident<S> {
582 pub fn new(z : async_std::sync::Arc<zenoh::Session>, server : S, id : zenoh::prelude::ZenohId) -> Self {
583
584 let ci = zrpc::ComponentState{
585 uuid : id,
586 name : format!("{}", #service_name),
587 routerid : "".to_string(),
588 peerid : "".to_string(),
589 status : zrpc::ComponentStatus::HALTED,
590 };
591
592 Self {
593 z,
594 server,
595 instance_id : id,
596 state : async_std::sync::Arc::new(async_std::sync::RwLock::new(ci))
597 }
598 }
599 }
600
601 }
602 }
603
604 fn impl_serve_for_server(&self) -> TokenStream2 {
606 let &Self {
607 request_ident,
608 server_ident,
609 service_ident,
610 response_ident,
611 camel_case_idents,
612 arg_pats,
613 method_idents,
614 eval_path,
615 service_name,
616 ..
617 } = self;
618
619 quote! {
620
621
622 impl<S> zrpc::ZServe<#request_ident> for #server_ident<S>
623 where S: #service_ident + Send +'static
624 {
625 type Resp = #response_ident;
626
627 fn instance_uuid(&self) -> zenoh::prelude::ZenohId {
628 self.instance_id
629 }
630
631 #[allow(clippy::type_complexity,clippy::manual_async_fn)]
632 fn connect(&'_ self) ->
633 ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<(
634 zrpc::AbortHandle,
635 async_std::task::JoinHandle<Result<ZRPCResult<()>, zrpc::Aborted>>,
636 )>> + '_>> {
637 log::trace!("Connect Service {} Instance {}", #service_name, self.instance_uuid());
638
639 async fn __connect<S>(_self: &#server_ident<S>) -> ZRPCResult<(
640 zrpc::AbortHandle,
641 async_std::task::JoinHandle<Result<ZRPCResult<()>, zrpc::Aborted>>,
642 )>
643 where
644 S: #service_ident + Send + 'static,
645 {
646 use futures::prelude::*;
647 use std::convert::TryInto;
648 use zenoh::prelude::r#async::*;
649 use zenoh::prelude::*;
650
651
652 let zinfo = _self.z.info();
653 let pid = zinfo.zid().res().await.to_string().to_uppercase();
654
655 let rid = match zinfo
656 .routers_zid()
657 .res()
658 .await
659 .collect::<Vec<ZenohId>>()
660 .first()
661 {
662 Some(head) => head.to_string().to_uppercase(),
663 None => "".to_string(),
664 };
665
666 let mut ci = _self.state.write().await;
667 ci.peerid = pid.clone().to_uppercase();
668 drop(ci);
669
670 let zsession = async_std::sync::Arc::clone(&_self.z);
671
672 let state = _self.state.clone();
673 let path = format!("{}{}/state",#eval_path,_self.instance_uuid());
674
675 let run_loop = async move {
676 let mut queryable = zsession
677 .declare_queryable(&path)
678 .res()
679 .await?;
680
681 let kexpr: KeyExpr = (path.clone().try_into())
682 .map_err(|e| zrpc::zrpcresult::ZRPCError::ZenohError(format!("{:?}",e)))?;
683
684
685 loop {
686 let query = queryable
687 .recv_async()
688 .await
689 .map_err(|_| zrpc::zrpcresult::ZRPCError::MissingValue)?;
690 let ci = state.read().await;
691 let data = zrpc::serialize::serialize_state(&*ci)?;
692 drop(ci);
693 let value = Value::new(data.into())
694 .encoding(Encoding::APP_OCTET_STREAM);
695 let sample = Sample::new(kexpr.clone(), value);
696 query.reply(Ok(sample)).res().await.map_err(|e| {
697 zrpc::zrpcresult::ZRPCError::ZenohError(format!("{:?}",e))
698 })?;
699 }
700 };
701
702
703 let (abort_handle, abort_registration) = zrpc::AbortHandle::new_pair();
704
705 log::trace!("Spawning state responder task");
706 let task_handle =
707 async_std::task::spawn(zrpc::Abortable::new(run_loop, abort_registration));
708
709 Ok((abort_handle, task_handle))
710 }
711 Box::pin(__connect(self))
712 }
713
714
715 #[allow(clippy::type_complexity,clippy::manual_async_fn)]
716 fn initialize(&self) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
717 log::trace!("Initialize Service {} Instance {}", #service_name, self.instance_uuid());
718 async fn __initialize<S>(_self: &#server_ident<S>) -> ZRPCResult<()>
719 where
720 S: #service_ident + Send + 'static,
721 {
722 let mut ci = _self.state.write().await;
723 match ci.status {
724 zrpc::ComponentStatus::HALTED =>{
725 ci.status = zrpc::ComponentStatus::INITIALIZING;
726 Ok(())
727 },
728 _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot initialize a component in a state different than HALTED".to_string())),
729 }
730
731 }
732 Box::pin(__initialize(self))
733 }
734
735
736 #[allow(clippy::type_complexity,clippy::manual_async_fn)]
737 fn register(&self) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>>{
738 log::trace!("Register Service {} Instance {}", #service_name, self.instance_uuid());
739 async fn __register<S>(_self: &#server_ident<S>) -> ZRPCResult<()>
740 where
741 S: #service_ident + Send + 'static,
742 {
743 let mut ci = _self.state.write().await;
744 match ci.status {
745 zrpc::ComponentStatus::INITIALIZING => {
746 ci.status = zrpc::ComponentStatus::REGISTERED;
747 Ok(())
748 },
749 _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot register a component in a state different than INITIALIZING".to_string())),
750 }
751 }
752 Box::pin(__register(self))
753 }
754
755 #[allow(clippy::type_complexity,clippy::manual_async_fn, clippy::needless_question_mark)]
756 fn start(
757 &self,
758 ) -> ::core::pin::Pin<
759 Box<
760 dyn std::future::Future<
761 Output = ZRPCResult<(
762 zrpc::AbortHandle,
763 async_std::task::JoinHandle<Result<ZRPCResult<()>, zrpc::Aborted>>,
764 )>> + '_>>
765 {
766
767 log::trace!("Start Service {} Instance {}", #service_name, self.instance_uuid());
768
769 async fn __start<S>(
770 _self: &#server_ident<S>,
771 ) -> ZRPCResult<(
772 zrpc::AbortHandle,
773 async_std::task::JoinHandle<Result<ZRPCResult<()>, zrpc::Aborted>>,
774 )>
775 where
776 S: #service_ident + Send + 'static,
777 {
778 let (s, r) = async_std::channel::bounded::<()>(1);
779 let barrier = async_std::sync::Arc::new(async_std::sync::Barrier::new(2));
780 let ci = _self.state.read().await;
781 match ci.status {
782 zrpc::ComponentStatus::REGISTERED => {
783 drop(ci);
784
785
786 let server = _self.clone();
787 let b = barrier.clone();
788 let (abort_handle, abort_registration) = zrpc::AbortHandle::new_pair();
789
790 log::trace!("Spawning serving loop");
791 let task_handle = async_std::task::spawn_blocking(move || {
792 async_std::task::block_on(zrpc::Abortable::new(
793 async { server.serve(b).await },
794 abort_registration,
795 ))
796 });
797
798 log::trace!("Waiting for serving loop to be ready");
799 barrier.wait().await;
800
801 let mut ci = _self.state.write().await;
803 ci.status = zrpc::ComponentStatus::SERVING;
804 drop(ci);
805
806 Ok((abort_handle, task_handle))
807
808 }
809 _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot start a component in a state different than REGISTERED".to_string())),
810 }
811
812 }
813 Box::pin(__start(self))
814 }
815
816 #[allow(clippy::type_complexity,clippy::manual_async_fn)]
817 fn run(&self) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
818 log::trace!("Run Service {} Instance {}", #service_name, self.instance_uuid());
819 async fn __run<S>(_self: &#server_ident<S>) -> ZRPCResult<()>
820 where
821 S: #service_ident + Send + 'static,
822 {
823 use std::convert::TryInto;
824 use zenoh::prelude::r#async::*;
825 use zenoh::prelude::*;
826
827 let path = format!("{}{}/eval",#eval_path, _self.instance_uuid());
828 log::trace!("Registering eval on {:?}", path);
829 let mut queryable = _self
830 .z
831 .declare_queryable(&path)
832 .res()
833 .await?;
834
835 let kexpr: KeyExpr = (path.clone().try_into())
836 .map_err(|e| zrpc::zrpcresult::ZRPCError::ZenohError(format!("{:?}",e)))?;
837
838 log::trace!("Registered on {:?}", path);
839 loop {
840 let query = queryable.recv_async().await.map_err(|_| zrpc::zrpcresult::ZRPCError::MissingValue)?;
841 log::trace!("Received query {:?}", query);
842 let query_selector = query.selector();
843
844 match query.value(){
845 Some(value) => {
846 let req = zrpc::serialize::deserialize_request::<#request_ident>(&value.payload.contiguous())?;
847 log::trace!("Received on {:?} {:?}", path, req);
848
849 let mut ser = _self.server.clone();
850
851 let encoded_resp = match req.clone() {
852 #(
853 #request_ident::#camel_case_idents{#(#arg_pats),*} => {
854 let resp = #response_ident::#camel_case_idents(ser.#method_idents( #(#arg_pats),*).await);
855 log::trace!("Reply to {:?} {:?} with {:?}", path, req, resp);
856 zrpc::serialize::serialize_response(&resp)
857 }
858 )*
859 }?;
860 let value = Value::new(encoded_resp.into()).encoding(Encoding::APP_OCTET_STREAM);
861 let sample = Sample::new(kexpr.clone(), value);
862 query
863 .reply(Ok(sample))
864 .res()
865 .await
866 .map_err(|e| zrpc::zrpcresult::ZRPCError::ZenohError(format!("{:?}",e)))?;
867 }
868 None => log::error!("Received query on {:?} without value, not replying!", query_selector)
869 }
870 }
871 }
872 Box::pin( __run(self))
873 }
874
875 #[allow(clippy::type_complexity,clippy::manual_async_fn)]
876 fn serve(
877 &self,
878 barrier : async_std::sync::Arc<async_std::sync::Barrier>,
879 ) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
880 log::trace!("Serve Service {} Instance {}", #service_name, self.instance_uuid());
881 async fn __serve<S>(_self: &#server_ident<S>, _barrier : async_std::sync::Arc<async_std::sync::Barrier>) -> ZRPCResult<()>
882 where
883 S: #service_ident + Send + 'static,
884 {
885 use futures::prelude::*;
886 use async_std::prelude::FutureExt;
887
888 let ci = _self.state.read().await;
889 match ci.status {
890 zrpc::ComponentStatus::REGISTERED => {
891 drop(ci);
892
893 _barrier.wait().await;
894
895 log::trace!("RPC Receiver loop started...");
896 loop {
897 match _self.run().await {
898 Err(e) => {
899 log::error!("The run loop existed with {:?}, restaring...", e);
900 }
901 Ok(_) => {
902 log::warn!("The run loop existed with unit restaring...");
903 }
904
905 }
906 }
907 }
908 _ => Err(ZRPCError::StateTransitionNotAllowed("State is not WORK, serve called directly? serve is called by calling work!".to_string())),
909 }
910 }
911 let res = __serve(self, barrier);
912 Box::pin(res)
913 }
914
915 #[allow(clippy::type_complexity,clippy::manual_async_fn)]
916 fn stop(
917 &self,
918 stop: zrpc::AbortHandle,
919 ) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
920 log::trace!("Stop Service {} Instance {}", #service_name, self.instance_uuid());
921 async fn __stop<S>(_self: &#server_ident<S>, _stop: zrpc::AbortHandle) -> ZRPCResult<()>
922 where
923 S: #service_ident + Send + 'static,
924 {
925 let mut ci = _self.state.write().await;
926 match ci.status {
927 zrpc::ComponentStatus::SERVING => {
928 ci.status = zrpc::ComponentStatus::REGISTERED;
929 drop(ci);
930 _stop.abort();
931 Ok(())
932 },
933 _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot stop a component in a state different than WORK".to_string())),
934 }
935 }
936 Box::pin(__stop(self, stop))
937 }
938
939 #[allow(clippy::type_complexity,clippy::manual_async_fn)]
940 fn unregister(&self) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
941 log::trace!("Unregister Service {} Instance {}", #service_name, self.instance_uuid());
942 async fn __unregister<S>(_self: &#server_ident<S>) -> ZRPCResult<()>
943 where
944 S: #service_ident + Send + 'static,
945 {
946 let mut ci = _self.state.write().await;
947 match ci.status {
948 zrpc::ComponentStatus::REGISTERED =>{
949 ci.status = zrpc::ComponentStatus::HALTED;
950 Ok(())
951 },
952 _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot unregister a component in a state different than REGISTERED".to_string())),
953 }
954 }
955 Box::pin(__unregister(self))
956 }
957
958 #[allow(clippy::type_complexity,clippy::manual_async_fn)]
959 fn disconnect(&self, stop: zrpc::AbortHandle,) -> ::core::pin::Pin<Box<dyn std::future::Future<Output = ZRPCResult<()>> + '_>> {
960 log::trace!("Disconnect Service {} Instance {}", #service_name, self.instance_uuid());
961 async fn __disconnect<S>(_self: &#server_ident<S>, _stop: zrpc::AbortHandle) -> ZRPCResult<()>
962 where
963 S: #service_ident + Send + 'static,
964 {
965 let mut ci = _self.state.write().await;
966 match ci.status {
967 zrpc::ComponentStatus::HALTED => {
968 ci.status = zrpc::ComponentStatus::HALTED;
969 drop(ci);
970 _stop.abort();
971 Ok(())
972 },
973 _ => Err(ZRPCError::StateTransitionNotAllowed("Cannot disconnect a component in a state different than HALTED".to_string())),
974 }
975 }
976 Box::pin(__disconnect(self,stop))
977 }
978
979 }
980 }
981 }
982
983 fn enum_request(&self) -> TokenStream2 {
985 let &Self {
986 vis,
987 request_ident,
988 camel_case_idents,
989 args,
990 ..
991 } = self;
992
993 quote! {
994 #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
996 #vis enum #request_ident {
997 #( #camel_case_idents{ #( #args ),* } ),*
998 }
999 }
1000 }
1001
1002 fn enum_response(&self) -> TokenStream2 {
1004 let &Self {
1005 vis,
1006 response_ident,
1007 camel_case_idents,
1008 return_types,
1009 ..
1010 } = self;
1011
1012 quote! {
1013 #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
1015 #vis enum #response_ident {
1016 #( #camel_case_idents(#return_types) ),*
1017 }
1018 }
1019 }
1020
1021 fn struct_client(&self) -> TokenStream2 {
1023 let &Self {
1024 vis,
1025 client_ident,
1026 request_ident,
1027 response_ident,
1028 ..
1029 } = self;
1030
1031 quote! {
1032 #[allow(unused)]
1033 #[derive(Clone, Debug)]
1034 #vis struct #client_ident<C = zrpc::ZClientChannel<#request_ident, #response_ident>>{
1035 ch : C,
1036 server_uuid : zenoh::prelude::ZenohId,
1037 }
1038 }
1039 }
1040
1041 fn impl_client_new_find_servers(&self) -> TokenStream2 {
1043 let &Self {
1044 client_ident,
1045 vis,
1046 eval_path,
1047 ..
1048 } = self;
1049
1050 quote! {
1051 impl #client_ident {
1052 #vis fn new(
1053 z : async_std::sync::Arc<zenoh::Session>,
1054 instance_id : zenoh::prelude::ZenohId
1055 ) -> #client_ident {
1056 let new_client = zrpc::ZClientChannel::new(z, format!("{}",#eval_path), Some(instance_id));
1057 #client_ident{
1058 ch : new_client,
1059 server_uuid : instance_id,
1060 }
1061
1062 }
1063
1064 #vis fn get_server_uuid(&self) -> zenoh::prelude::ZenohId {
1065 self.server_uuid
1066 }
1067
1068 #vis fn find_servers(
1069 z : async_std::sync::Arc<zenoh::Session>
1070 ) -> impl std::future::Future<Output = ZRPCResult<Vec<zenoh::prelude::ZenohId>>> + 'static
1071 {
1072 async move {
1073 use zenoh::prelude::r#async::*;
1074 use zenoh::query::*;
1075 use zenoh::prelude::*;
1076
1077
1078 let selector = format!("{}*/state",#eval_path);
1079 log::trace!("Find servers selector {}", selector);
1080 let mut servers = Vec::new();
1081
1082 let replies = z.get(&selector).target(QueryTarget::All).res().await?;
1083
1084 while let Ok(d) = replies.recv_async().await {
1085 match d.sample {
1086 Ok(sample) => match sample.value.encoding {
1087 Encoding::APP_OCTET_STREAM => {
1088 let ca = zrpc::serialize::deserialize_state::<zrpc::ComponentState>(
1089 &sample.value.payload.contiguous(),
1090 )?;
1091 servers.push(ca.uuid);
1092 }
1093 _ => {
1094 return Err(ZRPCError::ZenohError(
1095 "Server information is not correctly encoded".to_string(),
1096 ))
1097 }
1098 },
1099 Err(e) => {
1100 return Err(ZRPCError::ZenohError(format!(
1101 "Unable to get sample from {:?}",e
1102 )))
1103 }
1104 }
1105 }
1106 Ok(servers)
1107 }
1108 }
1109
1110 #vis fn find_servers_info(
1111 z : async_std::sync::Arc<zenoh::Session>
1112 ) -> impl std::future::Future<Output = ZRPCResult<Vec<zrpc::ComponentState>>> + 'static
1113 {
1114 async move {
1115 use zenoh::prelude::r#async::*;
1116 use zenoh::query::*;
1117 use zenoh::prelude::*;
1118
1119 let selector = format!("{}*/state",#eval_path);
1120 log::trace!("Find servers selector {}", selector);
1121 let mut servers = Vec::new();
1122
1123 let replies = z.get(&selector).target(QueryTarget::All).res().await?;
1124
1125 while let Ok(d) = replies.recv_async().await {
1126 match d.sample {
1127 Ok(sample) => match sample.value.encoding {
1128 Encoding::APP_OCTET_STREAM => {
1129 let ca = zrpc::serialize::deserialize_state::<zrpc::ComponentState>(
1130 &sample.value.payload.contiguous(),
1131 )?;
1132 servers.push(ca);
1133 }
1134 _ => {
1135 return Err(ZRPCError::ZenohError(
1136 "Server information is not correctly encoded".to_string(),
1137 ))
1138 }
1139 },
1140 Err(e) => {
1141 return Err(ZRPCError::ZenohError(format!(
1142 "Unable to get sample from {:?}",e
1143 )))
1144 }
1145 }
1146 }
1147 Ok(servers)
1148 }
1149 }
1150
1151 #vis fn find_local_servers(
1152 z : async_std::sync::Arc<zenoh::Session>
1153 ) -> impl std::future::Future<Output = ZRPCResult<Vec<zenoh::prelude::ZenohId>>> + 'static
1154 {
1155 async move {
1156 use zenoh::prelude::r#async::*;
1157 use zenoh::query::*;
1158 use zenoh::prelude::*;
1159 use zrpc::zrpcresult::ZRPCError;
1160
1161
1162 let servers = Self::find_servers_info(async_std::sync::Arc::clone(&z)).await?;
1163
1164 let zinfo = z.info();
1165
1166 let rid = match zinfo
1167 .routers_zid()
1168 .res()
1169 .await
1170 .collect::<Vec<ZenohId>>()
1171 .first()
1172 {
1173 Some(head) => head.to_string().to_uppercase(),
1174 None => "".to_string(),
1175 };
1176 if rid == "" {
1177 return Ok(vec![])
1178 }
1179 log::trace!("Router ID is {}", rid);
1180
1181 let selector = format!("@/router/{}", rid);
1183
1184 let mut rdata: Vec<Reply> = z.get(&selector).res().await?.into_iter().collect();
1185
1186 if rdata.is_empty() {
1187 return Err(ZRPCError::NotFound);
1188 }
1189
1190 let router_data = rdata.remove(0);
1191 match router_data.sample {
1192 Ok(sample) => match sample.value.encoding {
1193 Encoding::APP_JSON => {
1194 let ri = zrpc::serialize::deserialize_router_info(
1195 &sample.value.payload.contiguous(),
1196 )?;
1197 let r: Vec<zenoh::prelude::ZenohId> = servers
1198 .into_iter()
1199 .filter_map(|ci| {
1200 let pid = String::from(&ci.peerid).to_uppercase();
1201 let mut it = ri.clone().sessions.into_iter();
1202 let f = it.find(|x| x.peer == pid.clone());
1203 if f.is_none() {
1204 None
1205 } else {
1206 Some(ci.uuid)
1207 }
1208 })
1209 .collect();
1210
1211 Ok(r)
1212 }
1213 _ => Err(ZRPCError::ZenohError(
1214 "Router information is not encoded in JSON".to_string(),
1215 )),
1216 },
1217 Err(e) => Err(ZRPCError::ZenohError(format!(
1218 "Unable to get sample from {:?}",e
1219 ))),
1220 }
1221 }
1222 }
1223 }
1224 }
1225 }
1226
1227 fn impl_client_eval_methods(&self) -> TokenStream2 {
1229 let &Self {
1230 client_ident,
1231 request_ident,
1232 response_ident,
1233 method_attrs,
1234 vis,
1235 method_idents,
1236 args,
1237 return_types,
1238 arg_pats,
1239 camel_case_idents,
1240 timeout,
1241 ..
1242 } = self;
1243
1244 quote! {
1245
1246 impl #client_ident {
1247 #vis fn verify_server(&self
1248 ) -> impl std::future::Future<Output = ZRPCResult<bool>> + '_ {
1249 async move {
1250 self.ch.verify_server().await
1251 }
1252 }
1253
1254 #(
1255
1256 #[allow(unused,clippy::manual_async_fn)]
1257 #( #method_attrs )*
1258 #vis fn #method_idents(&self, #( #args ),*)
1259 -> impl std::future::Future<Output = ZRPCResult<#return_types>> + '_ {
1260 let request = #request_ident::#camel_case_idents { #( #arg_pats ),* };
1261 log::trace!("Sending {:?}", request);
1262 async move {
1263 let resp = self.ch.call_fun(request);
1264 let dur = std::time::Duration::from_secs(#timeout as u64);
1265 match async_std::future::timeout(dur, resp).await {
1266 Ok(r) => match r {
1267 Ok(zr) => match zr {
1268 #response_ident::#camel_case_idents(msg) => std::result::Result::Ok(msg),
1269 _ => Err(ZRPCError::Unreachable),
1270 },
1271 Err(e) => Err(e),
1272 },
1273 Err(e) => Err(ZRPCError::TimedOut),
1274 }
1275 }
1276 }
1277 )*
1278 }
1279 }
1280 }
1281}
1282
1283impl<'a> ToTokens for ZServiceGenerator<'a> {
1285 fn to_tokens(&self, output: &mut TokenStream2) {
1286 output.extend(vec![
1287 self.trait_service(),
1288 self.struct_server(),
1289 self.impl_serve_for_server(),
1290 self.enum_request(),
1291 self.enum_response(),
1292 self.struct_client(),
1293 self.impl_client_new_find_servers(),
1294 self.impl_client_eval_methods(),
1295 ])
1296 }
1297}
1298
1299fn snake_to_camel(ident_str: &str) -> String {
1301 let mut camel_ty = String::with_capacity(ident_str.len());
1302
1303 let mut last_char_was_underscore = true;
1304 for c in ident_str.chars() {
1305 match c {
1306 '_' => last_char_was_underscore = true,
1307 c if last_char_was_underscore => {
1308 camel_ty.extend(c.to_uppercase());
1309 last_char_was_underscore = false;
1310 }
1311 c => camel_ty.extend(c.to_lowercase()),
1312 }
1313 }
1314
1315 camel_ty.shrink_to_fit();
1316 camel_ty
1317}