1use proc_macro::TokenStream;
2use quote::quote;
3use syn::ItemTrait;
4use syn::ReturnType;
5use syn::TraitItem;
6use syn::parse_macro_input;
7
8#[proc_macro_attribute]
9pub fn service(args: TokenStream, input: TokenStream) -> TokenStream {
10 let mut trait_item = parse_macro_input!(input as ItemTrait);
11
12 let mut service_name = None;
13 let mut service_version = None;
14 let parser = syn::meta::parser(|meta| {
15 if meta.path.is_ident("name") {
16 service_name = Some(meta.value()?.parse::<syn::LitStr>()?.value());
17 Ok(())
18 } else if meta.path.is_ident("version") {
19 service_version = Some(meta.value()?.parse::<syn::LitStr>()?.value());
20 Ok(())
21 } else {
22 Err(meta.error("unsupported service attribute"))
23 }
24 });
25 parse_macro_input!(args with parser);
26 let service_name_template = service_name.expect("service attribute requires 'name' parameter");
27 let service_version = service_version.expect("service attribute requires 'version' parameter");
28
29 let service_template_params = extract_template_params(&service_name_template);
30
31 let param_idents: Vec<syn::Ident> = service_template_params
33 .iter()
34 .map(|p| syn::Ident::new(p, proc_macro2::Span::call_site()))
35 .collect();
36
37 let service_name = service_name_template
40 .split('.')
41 .next()
42 .unwrap_or(&service_name_template);
43
44 let trait_name = &trait_item.ident;
45 let ext_trait_name = syn::Ident::new(&format!("{}Ext", trait_name), trait_name.span());
46
47 let where_clause = trait_item.generics.make_where_clause();
49 where_clause
50 .predicates
51 .push(syn::parse_quote!(Self::Context: ::rofr::ServiceContext));
52
53 let mut endpoint_methods = Vec::new();
54 let mut stream_methods = Vec::new();
55
56 for item in &mut trait_item.items {
57 if let TraitItem::Fn(method) = item {
58 let mut stream_name = None;
60 let mut stream_subject = None;
61 let mut stream_storage = None;
62 let mut stream_message = None;
63
64 let mut endpoint_subject = None;
66 method.attrs.retain(|attr| {
67 if attr.path().is_ident("stream") {
68 let _ = attr.parse_nested_meta(|meta| {
69 if meta.path.is_ident("name") {
70 let value = meta.value()?;
71 let s: syn::LitStr = value.parse()?;
72 stream_name = Some(s.value());
73 Ok(())
74 } else if meta.path.is_ident("subject") {
75 let value = meta.value()?;
76 let s: syn::LitStr = value.parse()?;
77 stream_subject = Some(s.value());
78 Ok(())
79 } else if meta.path.is_ident("storage") {
80 let value = meta.value()?;
81 let path: syn::Path = value.parse()?;
82 stream_storage = Some(path);
83 Ok(())
84 } else if meta.path.is_ident("message") {
85 let value = meta.value()?;
86 let ty: syn::Type = value.parse()?;
87 stream_message = Some(ty);
88 Ok(())
89 } else {
90 Err(meta.error("unsupported stream attribute"))
91 }
92 });
93 false } else if attr.path().is_ident("endpoint") {
95 let _ = attr.parse_nested_meta(|meta| {
96 if meta.path.is_ident("subject") {
97 let value = meta.value()?;
98 let s: syn::LitStr = value.parse()?;
99 endpoint_subject = Some(s.value());
100 Ok(())
101 } else {
102 Err(meta.error("unsupported endpoint attribute"))
103 }
104 });
105 false } else {
107 true }
109 });
110
111 if let (Some(name), Some(subject)) = (stream_name, stream_subject) {
112 let method_name = method.sig.ident.clone();
113
114 if method.sig.asyncness.is_some()
116 && let ReturnType::Type(_, ref mut ty) = method.sig.output
117 {
118 let original_ty = (**ty).clone();
120 **ty = syn::parse_quote!(
121 impl ::std::future::Future<Output = #original_ty> + Send
122 );
123 method.sig.asyncness = None;
125 }
126
127 stream_methods.push((method_name, name, subject, stream_storage, stream_message));
128 }
129
130 if let Some(subject) = endpoint_subject {
131 let method_name = method.sig.ident.clone();
132 let has_body_param = method.sig.inputs.len() > 1;
133
134 let request_type = if has_body_param
136 && let syn::FnArg::Typed(arg) = &method.sig.inputs[1]
137 && let syn::Type::Path(type_path) = &*arg.ty
138 && let Some(segment) = type_path.path.segments.last()
139 && segment.ident == "Request"
140 && let syn::PathArguments::AngleBracketed(args) = &segment.arguments
141 && let Some(syn::GenericArgument::Type(ty)) = args.args.first()
142 {
143 ty.clone()
144 } else {
145 syn::parse_str("()").unwrap()
146 };
147
148 let response_type = if let ReturnType::Type(_, ref ty) = method.sig.output {
150 extract_response_type(ty).unwrap_or(syn::parse_str("()").unwrap())
151 } else {
152 syn::parse_str("()").unwrap()
153 };
154
155 if method.sig.asyncness.is_some()
157 && let ReturnType::Type(_, ref mut ty) = method.sig.output
158 {
159 let original_ty = (**ty).clone();
161 **ty = syn::parse_quote!(
162 impl ::std::future::Future<Output = #original_ty> + Send
163 );
164 method.sig.asyncness = None;
166 }
167
168 endpoint_methods.push((
169 method_name,
170 subject,
171 has_body_param,
172 request_type,
173 response_type,
174 ));
175 }
176 }
177 }
178
179 let mut handler_structs = Vec::new();
181 let mut handler_debug_impls = Vec::new();
182 let mut handler_impls = Vec::new();
183 let mut endpoint_registrations = Vec::new();
184
185 let mut stream_handler_structs = Vec::new();
187 let mut stream_handler_debug_impls = Vec::new();
188 let mut stream_handler_impls = Vec::new();
189
190 for (method_name, subject, has_body_param, _request_type, _response_type) in &endpoint_methods {
191 let handler_name = syn::Ident::new(
193 &format!("{}Handler", snake_to_pascal(&method_name.to_string())),
194 method_name.span(),
195 );
196
197 handler_structs.push(quote! {
199 struct #handler_name<T>(::std::marker::PhantomData<T>);
200 });
201
202 handler_debug_impls.push(quote! {
204 impl<T> ::std::fmt::Debug for #handler_name<T> {
205 fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
206 f.debug_struct(stringify!(#handler_name)).finish()
207 }
208 }
209 });
210
211 let handler_impl = if *has_body_param {
213 quote! {
214 #[::rofr::async_trait::async_trait]
215 impl<T> ::rofr::EndpointHandler<T::Context> for #handler_name<T>
216 where
217 T: #trait_name + Send + Sync + 'static,
218 T::Context: ::rofr::ServiceContext,
219 {
220 async fn handle_request(
221 &self,
222 rqctx: ::rofr::RequestContext<T::Context>,
223 body: ::rofr::Bytes,
224 ) -> Result<::rofr::Bytes, Box<dyn std::error::Error + Send + Sync>> {
225 let request = ::rofr::Request::from_bytes(&body)?;
226 Ok(T::#method_name(rqctx, request).await?.into_bytes()?)
227 }
228 }
229 }
230 } else {
231 quote! {
232 #[::rofr::async_trait::async_trait]
233 impl<T> ::rofr::EndpointHandler<T::Context> for #handler_name<T>
234 where
235 T: #trait_name + Send + Sync + 'static,
236 T::Context: ::rofr::ServiceContext,
237 {
238 async fn handle_request(
239 &self,
240 rqctx: ::rofr::RequestContext<T::Context>,
241 _body: ::rofr::Bytes,
242 ) -> Result<::rofr::Bytes, Box<dyn std::error::Error + Send + Sync>> {
243 Ok(T::#method_name(rqctx).await?.into_bytes()?)
244 }
245 }
246 }
247 };
248
249 handler_impls.push(handler_impl);
250
251 let subject_expr = build_subject_expr(subject, &service_template_params);
253
254 endpoint_registrations.push(quote! {
255 endpoints.push(::rofr::Endpoint {
256 subject: #subject_expr,
257 handler: ::std::sync::Arc::new(#handler_name::<Self>(::std::marker::PhantomData)),
258 });
259 });
260 }
261
262 let param_types: Vec<proc_macro2::TokenStream> = param_idents
265 .iter()
266 .map(|_| quote! { impl ::std::fmt::Display })
267 .collect();
268
269 let service_fn_signature = if service_template_params.is_empty() {
270 quote! {
271 fn service(context: Self::Context) -> ::rofr::Service<Self::Context>
272 }
273 } else {
274 quote! {
275 fn service(context: Self::Context, params: (#(#param_types,)*)) -> ::rofr::Service<Self::Context>
276 }
277 };
278
279 let service_fn_body_prelude = if param_idents.is_empty() {
281 quote! {}
282 } else {
283 quote! { let (#(#param_idents,)*) = params; }
284 };
285
286 for (method_name, _stream_name, _stream_subject, _storage_type, _message_type) in
288 &stream_methods
289 {
290 let handler_name = syn::Ident::new(
291 &format!("{}StreamHandler", snake_to_pascal(&method_name.to_string())),
292 method_name.span(),
293 );
294
295 stream_handler_structs.push(quote! {
296 struct #handler_name<T>(::std::marker::PhantomData<T>);
297 });
298
299 stream_handler_debug_impls.push(quote! {
300 impl<T> ::std::fmt::Debug for #handler_name<T> {
301 fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
302 f.debug_struct(stringify!(#handler_name)).finish()
303 }
304 }
305 });
306
307 stream_handler_impls.push(quote! {
308 #[::rofr::async_trait::async_trait]
309 impl<T> ::rofr::StreamHandler<T::Context> for #handler_name<T>
310 where
311 T: #trait_name + Send + Sync + 'static,
312 T::Context: ::rofr::ServiceContext,
313 {
314 async fn handle_stream(
315 &self,
316 ctx: ::rofr::StreamContext<T::Context>,
317 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
318 T::#method_name(ctx).await?;
319 Ok(())
320 }
321 }
322 });
323 }
324
325 let mut stream_registrations = Vec::new();
327 for (method_name, stream_name, stream_subject, storage_type, _message_type) in &stream_methods {
328 let handler_name = syn::Ident::new(
329 &format!("{}StreamHandler", snake_to_pascal(&method_name.to_string())),
330 method_name.span(),
331 );
332
333 let storage_expr = if let Some(storage) = storage_type {
334 quote! { #storage }
335 } else {
336 quote! { ::async_nats::jetstream::stream::StorageType::File }
337 };
338
339 let subject_prefix_expr = build_subject_prefix_expr(&service_template_params);
340
341 stream_registrations.push(quote! {
342 streams.push(::rofr::Stream {
343 subject_prefix: {
344 let __prefix = #subject_prefix_expr;
345 if __prefix.is_empty() {
346 #service_name.to_string()
347 } else {
348 format!("{}.{}", #service_name, __prefix)
349 }
350 },
351 config: ::async_nats::jetstream::stream::Config {
352 name: format!("{}_{}", #service_name.to_string().to_uppercase(), #stream_name.to_string()),
353 subjects: vec![{
354 let __prefix = #subject_prefix_expr;
355 if __prefix.is_empty() {
356 format!("{}.{}", #service_name, #stream_subject)
357 } else {
358 format!("{}.{}.{}", #service_name, __prefix, #stream_subject)
359 }
360 }],
361 storage: #storage_expr,
362 ..Default::default()
363 },
364 handler: ::std::sync::Arc::new(#handler_name::<Self>(::std::marker::PhantomData)),
365 });
366 });
367 }
368
369 let client_name = syn::Ident::new(&format!("{}Client", trait_name), trait_name.span());
370
371 let client_param_fields: Vec<proc_macro2::TokenStream> =
372 param_idents.iter().map(|p| quote! { #p: String }).collect();
373
374 let client_new_params = if param_idents.is_empty() {
375 quote! { nats: ::async_nats::Client }
376 } else {
377 quote! { nats: ::async_nats::Client, params: (#(#param_types,)*) }
378 };
379
380 let client_params_destructure = if param_idents.is_empty() {
382 quote! {}
383 } else {
384 quote! { let (#(#param_idents,)*) = params; }
385 };
386
387 let client_field_inits: Vec<proc_macro2::TokenStream> = param_idents
388 .iter()
389 .map(|p| quote! { #p: #p.to_string() })
390 .collect();
391
392 let client_methods: Vec<proc_macro2::TokenStream> = endpoint_methods
393 .iter()
394 .map(|(method_name, subject, has_body_param, request_type, response_type)| {
395 let subject_expr =
396 build_client_subject_expr(service_name, subject, &service_template_params);
397 let header_block = quote! {
398 let request_id = ::rofr::generate_request_id();
399 let mut headers = ::async_nats::HeaderMap::new();
400 headers.insert(::rofr::header::REQUEST_ID, request_id.as_str());
401 let subject = #subject_expr;
402 };
403 let status_check = quote! {
404 if let Some(status) = msg.status {
405 if status.as_u16() != 200 {
406 let err = msg.description
407 .unwrap_or_else(|| String::from_utf8_lossy(&msg.payload).to_string());
408 return Err(::rofr::ClientError::ServiceError(err));
409 }
410 }
411 let result = ::rofr::Response::<#response_type>::from_bytes(&msg.payload)
412 .map_err(::rofr::ClientError::Deserialize)?;
413 Ok(result.0)
414 };
415 if *has_body_param {
416 quote! {
417 pub async fn #method_name(&self, body: #request_type) -> Result<#response_type, ::rofr::ClientError> {
418 #header_block
419 let payload = ::rofr::Request { inner: body }
420 .into_bytes()
421 .map_err(::rofr::ClientError::Serialize)?;
422 let msg = self.nats
423 .request_with_headers(subject, headers, ::rofr::Bytes::from(payload))
424 .await
425 .map_err(|e| ::rofr::ClientError::Request(Box::new(e)))?;
426 #status_check
427 }
428 }
429 } else {
430 quote! {
431 pub async fn #method_name(&self) -> Result<#response_type, ::rofr::ClientError> {
432 #header_block
433 let msg = self.nats
434 .request_with_headers(subject, headers, ::rofr::Bytes::new())
435 .await
436 .map_err(|e| ::rofr::ClientError::Request(Box::new(e)))?;
437 #status_check
438 }
439 }
440 }
441 })
442 .collect();
443
444 let stream_client_methods: Vec<proc_macro2::TokenStream> = stream_methods
445 .iter()
446 .map(
447 |(method_name, stream_name, _stream_subject, _storage_type, message_type)| {
448 let nats_stream_name = format!("{}_{}", service_name.to_uppercase(), stream_name);
450 let msg_type: proc_macro2::TokenStream = message_type
451 .as_ref()
452 .map(|t| quote! { #t })
453 .unwrap_or_else(|| quote! { ::rofr::Bytes });
454 quote! {
455 pub async fn #method_name(
456 &self,
457 ) -> Result<
458 impl ::rofr::futures::Stream<Item = Result<#msg_type, ::rofr::ClientError>>,
459 ::rofr::ClientError,
460 > {
461 use ::rofr::futures::StreamExt;
462 let jetstream = ::async_nats::jetstream::new(self.nats.clone());
463 let nats_stream = jetstream
464 .get_stream(#nats_stream_name)
465 .await
466 .map_err(|e| ::rofr::ClientError::Request(Box::new(e)))?;
467 let consumer = nats_stream
468 .create_consumer(
469 ::async_nats::jetstream::consumer::push::OrderedConfig {
470 deliver_subject: self.nats.new_inbox(),
471 ..Default::default()
472 },
473 )
474 .await
475 .map_err(|e| ::rofr::ClientError::Request(Box::new(e)))?;
476 let messages = consumer
477 .messages()
478 .await
479 .map_err(|e| ::rofr::ClientError::Request(Box::new(e)))?;
480 Ok(messages.map(|msg| {
481 let msg =
482 msg.map_err(|e| ::rofr::ClientError::Request(Box::new(e)))?;
483 ::rofr::Response::<#msg_type>::from_bytes(&msg.payload)
484 .map_err(::rofr::ClientError::Deserialize)
485 .map(|r| r.0)
486 }))
487 }
488 }
489 },
490 )
491 .collect();
492
493 let expanded = quote! {
494 #trait_item
495
496 #(#handler_structs)*
498
499 #(#handler_debug_impls)*
501
502 #(#handler_impls)*
504
505 #(#stream_handler_structs)*
507
508 #(#stream_handler_debug_impls)*
510
511 #(#stream_handler_impls)*
513
514 pub trait #ext_trait_name: #trait_name + Sized
516 where
517 Self: Send + Sync + 'static,
518 Self::Context: ::rofr::ServiceContext,
519 {
520 #service_fn_signature;
521 }
522
523 impl<T> #ext_trait_name for T
525 where
526 T: #trait_name + Send + Sync + 'static,
527 T::Context: ::rofr::ServiceContext,
528 {
529 #service_fn_signature {
530 #service_fn_body_prelude
531 let mut endpoints = Vec::new();
532 let mut streams = Vec::new();
533
534 #(#endpoint_registrations)*
535
536 #(#stream_registrations)*
537
538 ::rofr::Service {
539 name: #service_name.to_string(),
540 version: #service_version.to_string(),
541 endpoints,
542 streams,
543 context,
544 }
545 }
546 }
547
548 pub struct #client_name {
550 nats: ::async_nats::Client,
551 #(#client_param_fields,)*
552 }
553
554 impl #client_name {
555 pub fn new(#client_new_params) -> Self {
556 #client_params_destructure
557 Self {
558 nats,
559 #(#client_field_inits,)*
560 }
561 }
562
563 #(#client_methods)*
564
565 #(#stream_client_methods)*
566 }
567 };
568
569 TokenStream::from(expanded)
570}
571
/// Pass-through attribute macro for `#[endpoint(...)]`.
///
/// The arguments are ignored and the annotated item is returned unchanged.
/// The `service` macro in this file reads and strips `endpoint` attributes on
/// trait methods itself.
#[proc_macro_attribute]
pub fn endpoint(_args: TokenStream, input: TokenStream) -> TokenStream {
    // Returned untouched; no code is generated here.
    input
}
577
/// Pass-through attribute macro for `#[stream(...)]`.
///
/// The arguments are ignored and the annotated item is returned unchanged.
/// The `service` macro in this file reads and strips `stream` attributes on
/// trait methods itself.
#[proc_macro_attribute]
pub fn stream(_args: TokenStream, input: TokenStream) -> TokenStream {
    // Returned untouched; no code is generated here.
    input
}
583
584fn extract_response_type(ty: &syn::Type) -> Option<syn::Type> {
586 if let syn::Type::Path(type_path) = ty {
587 if let Some(segment) = type_path.path.segments.last()
589 && segment.ident == "Result"
590 && let syn::PathArguments::AngleBracketed(args) = &segment.arguments
591 {
592 if let Some(syn::GenericArgument::Type(syn::Type::Path(response_path))) =
594 args.args.first()
595 && let Some(response_segment) = response_path.path.segments.last()
596 && response_segment.ident == "Response"
597 && let syn::PathArguments::AngleBracketed(response_args) =
598 &response_segment.arguments
599 {
600 if let Some(syn::GenericArgument::Type(inner_ty)) = response_args.args.first() {
602 return Some(inner_ty.clone());
603 }
604 }
605 }
606 }
607
608 None
609}
610
/// Converts a `snake_case` identifier into `PascalCase`.
///
/// The input is split on `_`; empty pieces (from leading, trailing or doubled
/// underscores) are dropped. The first character of each remaining piece is
/// upper-cased and the rest of the piece is kept as written.
fn snake_to_pascal(s: &str) -> String {
    let mut pascal = String::with_capacity(s.len());
    for piece in s.split('_') {
        let mut rest = piece.chars();
        // An empty piece has no first character and contributes nothing.
        if let Some(initial) = rest.next() {
            pascal.extend(initial.to_uppercase());
            pascal.push_str(rest.as_str());
        }
    }
    pascal
}
623
/// Collects the names written between `{` and `}` in `template`, in order.
///
/// Empty placeholders (`{}`) are skipped. Scanning resumes after each closing
/// `}`; if a `{` is never closed, everything after it is taken as the final
/// name.
fn extract_template_params(template: &str) -> Vec<String> {
    let mut params = Vec::new();
    let mut remaining = template;

    while let Some(open) = remaining.find('{') {
        // `{` and `}` are single-byte, so stepping one byte past them stays
        // on a character boundary.
        let after_open = &remaining[open + 1..];
        let (name, tail) = match after_open.find('}') {
            Some(close) => (&after_open[..close], &after_open[close + 1..]),
            None => (after_open, ""),
        };
        if !name.is_empty() {
            params.push(name.to_string());
        }
        remaining = tail;
    }

    params
}
647
648fn build_subject_expr(subject: &str, service_params: &[String]) -> proc_macro2::TokenStream {
652 if service_params.is_empty() {
653 return quote! { #subject.to_string() };
655 }
656
657 let mut format_str = String::new();
659 for _ in service_params {
660 format_str.push_str("{}.");
661 }
662 format_str.push_str(subject);
663
664 let param_idents: Vec<proc_macro2::TokenStream> = service_params
666 .iter()
667 .map(|p| {
668 let ident = syn::Ident::new(p, proc_macro2::Span::call_site());
669 quote! { #ident }
670 })
671 .collect();
672
673 quote! {
674 format!(#format_str, #(#param_idents),*)
675 }
676}
677
678fn build_subject_prefix_expr(service_params: &[String]) -> proc_macro2::TokenStream {
681 let format_str = service_params
682 .iter()
683 .map(|_| "{}")
684 .collect::<Vec<_>>()
685 .join(".");
686
687 let param_idents: Vec<proc_macro2::TokenStream> = service_params
689 .iter()
690 .map(|p| {
691 let ident = syn::Ident::new(p, proc_macro2::Span::call_site());
692 quote! { #ident }
693 })
694 .collect();
695
696 quote! {
697 format!(#format_str, #(#param_idents),*)
698 }
699}
700
701fn build_client_subject_expr(
706 service_name: &str,
707 subject: &str,
708 service_params: &[String],
709) -> proc_macro2::TokenStream {
710 if service_params.is_empty() {
711 let full = format!("{}.{}", service_name, subject);
712 return quote! { #full.to_string() };
713 }
714
715 let mut fmt = format!("{}.", service_name);
716 for _ in service_params {
717 fmt.push_str("{}.");
718 }
719 fmt.push_str(subject);
720
721 let param_exprs: Vec<proc_macro2::TokenStream> = service_params
722 .iter()
723 .map(|p| {
724 let ident = syn::Ident::new(p, proc_macro2::Span::call_site());
725 quote! { &self.#ident }
726 })
727 .collect();
728
729 quote! { format!(#fmt, #(#param_exprs),*) }
730}
731
// Unit tests for the string and token-building helpers in this file.
// Generated token streams are compared through their `to_string()` form.
#[cfg(test)]
mod tests {
    use super::*;
    use quote::quote;

    // ---- snake_to_pascal ----

    #[test]
    fn test_snake_to_pascal_simple() {
        assert_eq!(snake_to_pascal("hello_world"), "HelloWorld");
    }

    #[test]
    fn test_snake_to_pascal_single_word() {
        assert_eq!(snake_to_pascal("hello"), "Hello");
    }

    #[test]
    fn test_snake_to_pascal_empty_string() {
        assert_eq!(snake_to_pascal(""), "");
    }

    // Consecutive underscores produce an empty piece, which is dropped.
    #[test]
    fn test_snake_to_pascal_multiple_underscores() {
        assert_eq!(snake_to_pascal("hello__world"), "HelloWorld");
    }

    #[test]
    fn test_snake_to_pascal_leading_underscore() {
        assert_eq!(snake_to_pascal("_hello_world"), "HelloWorld");
    }

    #[test]
    fn test_snake_to_pascal_trailing_underscore() {
        assert_eq!(snake_to_pascal("hello_world_"), "HelloWorld");
    }

    #[test]
    fn test_snake_to_pascal_many_words() {
        assert_eq!(snake_to_pascal("this_is_a_long_name"), "ThisIsALongName");
    }

    #[test]
    fn test_snake_to_pascal_single_char_words() {
        assert_eq!(snake_to_pascal("a_b_c"), "ABC");
    }

    // Only the first character of each piece is changed; the rest is kept.
    #[test]
    fn test_snake_to_pascal_already_capitalized() {
        assert_eq!(snake_to_pascal("Hello_World"), "HelloWorld");
    }

    // ---- extract_template_params ----

    #[test]
    fn test_extract_template_params_none() {
        assert_eq!(extract_template_params("wind_speed"), Vec::<String>::new());
    }

    #[test]
    fn test_extract_template_params_single() {
        assert_eq!(extract_template_params("weather.{id}"), vec!["id"]);
    }

    #[test]
    fn test_extract_template_params_multiple() {
        assert_eq!(
            extract_template_params("weather.{id}.{zone}"),
            vec!["id", "zone"]
        );
    }

    // `{}` carries no name and is therefore not reported as a parameter.
    #[test]
    fn test_extract_template_params_empty_braces() {
        assert_eq!(extract_template_params("weather.{}"), Vec::<String>::new());
    }

    #[test]
    fn test_extract_template_params_mixed() {
        assert_eq!(
            extract_template_params("prefix.{param1}.middle.{param2}.suffix"),
            vec!["param1", "param2"]
        );
    }

    // ---- build_subject_expr ----

    // Without parameters the subject is emitted as a plain string conversion.
    #[test]
    fn test_build_subject_expr_no_params() {
        let subject = "wind_speed";
        let service_params: Vec<String> = vec![];

        let result = build_subject_expr(subject, &service_params);
        let expected = quote! { "wind_speed".to_string() };

        assert_eq!(result.to_string(), expected.to_string());
    }

    #[test]
    fn test_build_subject_expr_single_param() {
        let subject = "sensor_data";
        let service_params = vec!["id".to_string()];

        let result = build_subject_expr(subject, &service_params);
        let expected = quote! {
            format!("{}.sensor_data", id)
        };

        assert_eq!(result.to_string(), expected.to_string());
    }

    // Parameters appear in the format arguments in declaration order.
    #[test]
    fn test_build_subject_expr_multiple_params() {
        let subject = "wind_speed";
        let service_params = vec!["region".to_string(), "id".to_string()];

        let result = build_subject_expr(subject, &service_params);
        let expected = quote! {
            format!("{}.{}.wind_speed", region, id)
        };

        assert_eq!(result.to_string(), expected.to_string());
    }

    #[test]
    fn test_build_subject_expr_three_params() {
        let subject = "data";
        let service_params = vec![
            "namespace".to_string(),
            "service".to_string(),
            "id".to_string(),
        ];

        let result = build_subject_expr(subject, &service_params);
        let expected = quote! {
            format!("{}.{}.{}.data", namespace, service, id)
        };

        assert_eq!(result.to_string(), expected.to_string());
    }

    // Dots and underscores inside the subject are copied through verbatim.
    #[test]
    fn test_build_subject_expr_subject_with_special_chars() {
        let subject = "sensor.temperature_reading";
        let service_params = vec!["id".to_string()];

        let result = build_subject_expr(subject, &service_params);
        let expected = quote! {
            format!("{}.sensor.temperature_reading", id)
        };

        assert_eq!(result.to_string(), expected.to_string());
    }

    // An empty subject still leaves the trailing '.' after the placeholder.
    #[test]
    fn test_build_subject_expr_empty_subject() {
        let subject = "";
        let service_params = vec!["id".to_string()];

        let result = build_subject_expr(subject, &service_params);
        let expected = quote! {
            format!("{}.", id)
        };

        assert_eq!(result.to_string(), expected.to_string());
    }
}