1use crate::reflection::{
4 cache::DescriptorCache, client::ReflectionClient, config::ProxyConfig,
5 connection_pool::ConnectionPool,
6};
7use futures_util::Stream;
8#[cfg(feature = "data-faker")]
9use mockforge_data::{DataConfig, DataGenerator, SchemaDefinition};
10use prost_reflect::{DynamicMessage, ReflectMessage};
11use std::pin::Pin;
12use std::time::Duration;
13use tokio::sync::mpsc;
14use tokio_stream::wrappers::ReceiverStream;
15use tokio_stream::StreamExt;
16use tonic::{transport::Endpoint, Request, Response, Status, Streaming};
17use tracing::{debug, warn};
18
19pub struct ReflectionProxy {
21 _client: ReflectionClient,
23 cache: DescriptorCache,
25 config: ProxyConfig,
27 timeout_duration: Duration,
29 #[allow(dead_code)]
31 connection_pool: ConnectionPool,
32}
33
34impl ReflectionProxy {
35 pub async fn new(endpoint: Endpoint, config: ProxyConfig) -> Result<Self, Status> {
37 debug!("Creating reflection proxy for endpoint: {:?}", endpoint.uri());
38
39 let client = ReflectionClient::new(endpoint).await?;
40 let cache = DescriptorCache::new();
41
42 cache.populate_from_pool(Some(client.pool())).await;
44
45 Ok(Self {
46 _client: client,
47 cache,
48 config,
49 timeout_duration: Duration::from_secs(30),
50 connection_pool: ConnectionPool::new(),
51 })
52 }
53
54 pub fn with_timeout(mut self, timeout: Duration) -> Self {
56 self.timeout_duration = timeout;
57 self
58 }
59
60 pub async fn forward_unary(
62 &self,
63 service_name: &str,
64 method_name: &str,
65 request: Request<DynamicMessage>,
66 ) -> Result<Response<DynamicMessage>, Status> {
67 if !self.config.is_service_allowed(service_name) {
69 return Err(Status::permission_denied(format!(
70 "Service {} is not allowed",
71 service_name
72 )));
73 }
74
75 let method = self.cache.get_method(service_name, method_name).await?;
77
78 if !method.is_server_streaming() && !method.is_client_streaming() {
80 self.forward_unary_impl(method, request).await
81 } else {
82 Err(Status::invalid_argument(format!(
83 "Method {}::{} is not a unary method",
84 service_name, method_name
85 )))
86 }
87 }
88
89 pub async fn forward_server_streaming(
91 &self,
92 service_name: &str,
93 method_name: &str,
94 request: Request<DynamicMessage>,
95 ) -> Result<Response<Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>>, Status>
96 {
97 if !self.config.is_service_allowed(service_name) {
99 return Err(Status::permission_denied(format!(
100 "Service {} is not allowed",
101 service_name
102 )));
103 }
104
105 let method = self.cache.get_method(service_name, method_name).await?;
107
108 if method.is_server_streaming() && !method.is_client_streaming() {
110 self.forward_server_streaming_impl(method, request).await
111 } else {
112 Err(Status::invalid_argument(format!(
113 "Method {}::{} is not a server streaming method",
114 service_name, method_name
115 )))
116 }
117 }
118
119 pub async fn forward_client_streaming(
121 &self,
122 service_name: &str,
123 method_name: &str,
124 request: Request<Streaming<DynamicMessage>>,
125 ) -> Result<Response<DynamicMessage>, Status> {
126 if !self.config.is_service_allowed(service_name) {
128 return Err(Status::permission_denied(format!(
129 "Service {} is not allowed",
130 service_name
131 )));
132 }
133
134 let method = self.cache.get_method(service_name, method_name).await?;
136
137 if method.is_client_streaming() && !method.is_server_streaming() {
139 self.forward_client_streaming_impl(method, request).await
140 } else {
141 Err(Status::invalid_argument(format!(
142 "Method {}::{} is not a client streaming method",
143 service_name, method_name
144 )))
145 }
146 }
147
148 pub async fn forward_bidirectional_streaming(
150 &self,
151 service_name: &str,
152 method_name: &str,
153 request: Request<Streaming<DynamicMessage>>,
154 ) -> Result<Response<Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>>, Status>
155 {
156 if !self.config.is_service_allowed(service_name) {
158 return Err(Status::permission_denied(format!(
159 "Service {} is not allowed",
160 service_name
161 )));
162 }
163
164 let method = self.cache.get_method(service_name, method_name).await?;
166
167 if method.is_client_streaming() && method.is_server_streaming() {
169 self.forward_bidirectional_streaming_impl(method, request).await
170 } else {
171 Err(Status::invalid_argument(format!(
172 "Method {}::{} is not a bidirectional streaming method",
173 service_name, method_name
174 )))
175 }
176 }
177
178 async fn forward_unary_impl(
180 &self,
181 method: prost_reflect::MethodDescriptor,
182 request: Request<DynamicMessage>,
183 ) -> Result<Response<DynamicMessage>, Status> {
184 debug!("Generating mock response for method: {}", method.name());
191
192 let service_name = method.parent_service().name();
194 let method_name = method.name();
195
196 let mock_response = self.generate_mock_response(service_name, method_name, &method).await?;
198
199 let mut response = Response::new(mock_response);
201
202 let request_metadata = request.metadata();
204 for entry in request_metadata.iter() {
205 if let tonic::metadata::KeyAndValueRef::Ascii(key, value) = entry {
206 if !key.as_str().starts_with(':')
208 && !key.as_str().starts_with("grpc-")
209 && !key.as_str().starts_with("te")
210 && !key.as_str().starts_with("content-type")
211 {
212 response.metadata_mut().insert(key.clone(), value.clone());
213 }
214 }
215 }
216
217 response
219 .metadata_mut()
220 .insert("x-mockforge-service", service_name.parse().unwrap());
221 response
222 .metadata_mut()
223 .insert("x-mockforge-method", method_name.parse().unwrap());
224 response
225 .metadata_mut()
226 .insert("x-mockforge-timestamp", chrono::Utc::now().to_rfc3339().parse().unwrap());
227
228 Ok(response)
229 }
230
231 async fn generate_mock_response(
233 &self,
234 service_name: &str,
235 method_name: &str,
236 method_descriptor: &prost_reflect::MethodDescriptor,
237 ) -> Result<DynamicMessage, Status> {
238 debug!("Generating mock response for {}.{}", service_name, method_name);
239
240 let output_descriptor = method_descriptor.output();
242
243 let mut response = DynamicMessage::new(output_descriptor.clone());
245
246 self.populate_dynamic_mock_response(
248 &mut response,
249 service_name,
250 method_name,
251 &output_descriptor,
252 )?;
253
254 Ok(response)
255 }
256
257 fn populate_dynamic_mock_response(
259 &self,
260 response: &mut DynamicMessage,
261 service_name: &str,
262 method_name: &str,
263 output_descriptor: &prost_reflect::MessageDescriptor,
264 ) -> Result<(), Status> {
265 debug!("Generating dynamic mock response for {}.{}", service_name, method_name);
266
267 for field in output_descriptor.fields() {
269 let field_name = field.name();
270 let field_type = field.kind();
271
272 debug!("Processing field: {} of type: {:?}", field_name, field_type);
273
274 let mock_value = self.generate_mock_value_for_field(&field, service_name, method_name);
276
277 response.set_field(&field, mock_value);
279 }
280
281 let metadata_fields = vec![
283 ("mockforge_service", prost_reflect::Value::String(service_name.to_string())),
284 ("mockforge_method", prost_reflect::Value::String(method_name.to_string())),
285 (
286 "mockforge_timestamp",
287 prost_reflect::Value::String(chrono::Utc::now().to_rfc3339()),
288 ),
289 (
290 "mockforge_source",
291 prost_reflect::Value::String("MockForge Reflection Proxy".to_string()),
292 ),
293 ];
294
295 for (field_name, value) in metadata_fields {
296 response.set_field_by_name(field_name, value);
297 }
298
299 Ok(())
300 }
301
302 fn generate_mock_value_for_field(
304 &self,
305 field: &prost_reflect::FieldDescriptor,
306 service_name: &str,
307 method_name: &str,
308 ) -> prost_reflect::Value {
309 self.generate_mock_value_for_field_with_depth(field, service_name, method_name, 0)
310 }
311
312 fn generate_mock_value_for_field_with_depth(
314 &self,
315 field: &prost_reflect::FieldDescriptor,
316 service_name: &str,
317 method_name: &str,
318 depth: usize,
319 ) -> prost_reflect::Value {
320 const MAX_DEPTH: usize = 5;
322 if depth >= MAX_DEPTH {
323 return prost_reflect::Value::String(format!("max_depth_reached_{}", field.name()));
324 }
325
326 if field.is_list() {
328 let mut list_values = Vec::new();
329 let field_name_lower = field.name().to_lowercase();
331 let num_items =
332 if field_name_lower.contains("list") || field_name_lower.contains("items") {
333 3
334 } else {
335 1
336 };
337
338 for _ in 0..num_items {
339 let item_value =
340 self.generate_single_field_value(field, service_name, method_name, depth);
341 list_values.push(item_value);
342 }
343
344 return prost_reflect::Value::List(list_values);
345 }
346
347 self.generate_single_field_value(field, service_name, method_name, depth)
348 }
349
350 fn generate_single_field_value(
352 &self,
353 field: &prost_reflect::FieldDescriptor,
354 service_name: &str,
355 method_name: &str,
356 depth: usize,
357 ) -> prost_reflect::Value {
358 let field_name = field.name().to_lowercase();
359 let field_type = field.kind();
360
361 if field_name.contains("message")
363 || field_name.contains("text")
364 || field_name.contains("content")
365 {
366 return prost_reflect::Value::String(format!(
367 "Mock response from {} for method {} at {}",
368 service_name,
369 method_name,
370 chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC")
371 ));
372 }
373
374 if field_name.contains("id") {
375 return prost_reflect::Value::String(format!(
376 "mock_{}",
377 chrono::Utc::now().timestamp()
378 ));
379 }
380
381 if field_name.contains("status") || field_name.contains("state") {
382 return prost_reflect::Value::String("success".to_string());
383 }
384
385 if field_name.contains("count") || field_name.contains("number") {
386 return prost_reflect::Value::I64(42);
387 }
388
389 if field_name.contains("timestamp") || field_name.contains("time") {
390 return prost_reflect::Value::String(chrono::Utc::now().to_rfc3339());
391 }
392
393 if field_name.contains("enabled") || field_name.contains("active") {
394 return prost_reflect::Value::Bool(true);
395 }
396
397 match field_type {
399 prost_reflect::Kind::String => {
400 prost_reflect::Value::String(format!("mock_{}_{}", service_name, method_name))
401 }
402 prost_reflect::Kind::Int32 => prost_reflect::Value::I32(42),
403 prost_reflect::Kind::Int64 => prost_reflect::Value::I64(42),
404 prost_reflect::Kind::Float => prost_reflect::Value::F32(std::f32::consts::PI),
405 prost_reflect::Kind::Double => prost_reflect::Value::F64(std::f64::consts::PI),
406 prost_reflect::Kind::Bool => prost_reflect::Value::Bool(true),
407 prost_reflect::Kind::Bytes => prost_reflect::Value::Bytes(b"mock_data".to_vec().into()),
408 prost_reflect::Kind::Enum(enum_descriptor) => {
409 if let Some(first_value) = enum_descriptor.values().next() {
411 prost_reflect::Value::EnumNumber(first_value.number())
413 } else {
414 prost_reflect::Value::EnumNumber(0)
416 }
417 }
418 prost_reflect::Kind::Message(message_descriptor) => {
419 let mut nested_message = DynamicMessage::new(message_descriptor.clone());
421
422 for nested_field in message_descriptor.fields() {
424 let mock_value = self.generate_mock_value_for_field_with_depth(
425 &nested_field,
426 service_name,
427 method_name,
428 depth + 1,
429 );
430 nested_message.set_field(&nested_field, mock_value);
431 }
432
433 prost_reflect::Value::Message(nested_message)
434 }
435 _ => prost_reflect::Value::String("mock_value".to_string()),
436 }
437 }
438
439 async fn forward_server_streaming_impl(
441 &self,
442 method: prost_reflect::MethodDescriptor,
443 request: Request<DynamicMessage>,
444 ) -> Result<Response<Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>>, Status>
445 {
446 let metadata = request.metadata();
448 debug!(
449 "Forwarding server streaming request for method: {} with {} metadata entries",
450 method.name(),
451 metadata.len()
452 );
453
454 #[cfg(feature = "data-faker")]
455 {
456 let output_descriptor = method.output();
458 let messages = self.generate_mock_stream_messages(&output_descriptor, 5).await?;
459
460 let (tx, rx) = mpsc::channel(32);
462 let stream = Box::pin(ReceiverStream::new(rx))
463 as Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>;
464
465 tokio::spawn(async move {
467 for message in messages {
468 if tx.send(Ok(message)).await.is_err() {
469 break;
470 }
471 }
472 });
473
474 let mut response = Response::new(stream);
476
477 for entry in metadata.iter() {
479 if let tonic::metadata::KeyAndValueRef::Ascii(key, value) = entry {
480 if !key.as_str().starts_with(':')
482 && !key.as_str().starts_with("grpc-")
483 && !key.as_str().starts_with("te")
484 && !key.as_str().starts_with("content-type")
485 {
486 response.metadata_mut().insert(key.clone(), value.clone());
487 }
488 }
489 }
490
491 response
493 .metadata_mut()
494 .insert("x-mockforge-service", method.parent_service().name().parse().unwrap());
495 response
496 .metadata_mut()
497 .insert("x-mockforge-method", method.name().parse().unwrap());
498 response
499 .metadata_mut()
500 .insert("x-mockforge-timestamp", chrono::Utc::now().to_rfc3339().parse().unwrap());
501 response.metadata_mut().insert("x-mockforge-stream-count", "5".parse().unwrap());
502
503 debug!("Generated server streaming response with {} messages", 5);
504 Ok(response)
505 }
506
507 #[cfg(not(feature = "data-faker"))]
508 {
509 debug!("Data faker feature not enabled, returning unimplemented for server streaming");
510 Err(Status::unimplemented("Server streaming requires data-faker feature"))
511 }
512 }
513
514 async fn forward_client_streaming_impl(
516 &self,
517 method: prost_reflect::MethodDescriptor,
518 request: Request<Streaming<DynamicMessage>>,
519 ) -> Result<Response<DynamicMessage>, Status> {
520 debug!("Forwarding client streaming request for method: {}", method.name());
521
522 #[cfg(feature = "data-faker")]
523 {
524 let request_metadata = request.metadata().clone();
526
527 let mut stream = request.into_inner();
529 let mut message_count = 0;
530 let mut processed_names = Vec::new();
531 let mut user_ids = Vec::new();
532 let mut all_tags = Vec::new();
533
534 while let Some(message_result) = stream.next().await {
535 match message_result {
536 Ok(message) => {
537 message_count += 1;
538 debug!(
539 "Processing client streaming message {} for method: {}",
540 message_count,
541 method.name()
542 );
543
544 let input_descriptor = method.input();
546
547 if let Some(name_field) = input_descriptor.get_field_by_name("name") {
549 let field_value = message.get_field(&name_field);
550 if let prost_reflect::Value::String(name) = field_value.into_owned() {
551 processed_names.push(name.clone());
552 debug!(" - Name: {}", name);
553 }
554 }
555
556 if let Some(user_info_field) =
558 input_descriptor.get_field_by_name("user_info")
559 {
560 let field_value = message.get_field(&user_info_field);
561 if let prost_reflect::Value::Message(user_info_msg) =
562 field_value.into_owned()
563 {
564 if let Some(user_id_field) =
566 user_info_msg.descriptor().get_field_by_name("user_id")
567 {
568 let user_id_value = user_info_msg.get_field(&user_id_field);
569 if let prost_reflect::Value::String(user_id) =
570 user_id_value.into_owned()
571 {
572 user_ids.push(user_id.clone());
573 debug!(" - User ID: {}", user_id);
574 }
575 }
576 }
577 }
578
579 if let Some(tags_field) = input_descriptor.get_field_by_name("tags") {
581 let field_value = message.get_field(&tags_field);
582 if let prost_reflect::Value::List(tags_list) = field_value.into_owned()
583 {
584 for tag_value in tags_list {
585 if let prost_reflect::Value::String(tag) = tag_value {
586 all_tags.push(tag.clone());
587 debug!(" - Tag: {}", tag);
588 }
589 }
590 }
591 }
592 }
593 Err(e) => {
594 warn!("Error receiving client streaming message: {}", e);
595 return Err(Status::internal(format!(
596 "Error processing streaming request: {}",
597 e
598 )));
599 }
600 }
601 }
602
603 debug!("Processed {} messages in client streaming request", message_count);
604 debug!(
605 "Collected data - Names: {:?}, User IDs: {:?}, Tags: {:?}",
606 processed_names, user_ids, all_tags
607 );
608
609 let output_descriptor = method.output();
611 let mut mock_response = self.generate_mock_message(&output_descriptor).await?;
612
613 if let Some(message_field) = output_descriptor.get_field_by_name("message") {
615 let personalized_message = if !processed_names.is_empty() {
617 format!("Hello to all {} senders! Processed names: {}, with {} unique tags from {} users",
618 message_count, processed_names.join(", "), all_tags.len(), user_ids.len())
619 } else {
620 format!(
621 "Hello! Processed {} messages with {} tags",
622 message_count,
623 all_tags.len()
624 )
625 };
626
627 mock_response
629 .set_field(&message_field, prost_reflect::Value::String(personalized_message));
630 }
631
632 let mut response = Response::new(mock_response);
634
635 for entry in request_metadata.iter() {
637 if let tonic::metadata::KeyAndValueRef::Ascii(key, value) = entry {
638 if !key.as_str().starts_with(':')
640 && !key.as_str().starts_with("grpc-")
641 && !key.as_str().starts_with("te")
642 && !key.as_str().starts_with("content-type")
643 {
644 response.metadata_mut().insert(key.clone(), value.clone());
645 }
646 }
647 }
648
649 response
651 .metadata_mut()
652 .insert("x-mockforge-service", method.parent_service().name().parse().unwrap());
653 response
654 .metadata_mut()
655 .insert("x-mockforge-method", method.name().parse().unwrap());
656 response
657 .metadata_mut()
658 .insert("x-mockforge-timestamp", chrono::Utc::now().to_rfc3339().parse().unwrap());
659 response
660 .metadata_mut()
661 .insert("x-mockforge-message-count", message_count.to_string().parse().unwrap());
662
663 let response = response;
664
665 debug!(
666 "Generated enhanced client streaming response with {} processed messages",
667 message_count
668 );
669 Ok(response)
670 }
671
672 #[cfg(not(feature = "data-faker"))]
673 {
674 debug!("Data faker feature not enabled, returning unimplemented for client streaming");
675 Err(Status::unimplemented("Client streaming requires data-faker feature"))
676 }
677 }
678
679 async fn forward_bidirectional_streaming_impl(
681 &self,
682 method: prost_reflect::MethodDescriptor,
683 request: Request<Streaming<DynamicMessage>>,
684 ) -> Result<Response<Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>>, Status>
685 {
686 debug!("Forwarding bidirectional streaming request for method: {}", method.name());
687
688 #[cfg(feature = "data-faker")]
689 {
690 let metadata = request.metadata();
692 debug!("Forwarding bidirectional streaming request for method: {} with {} metadata entries",
693 method.name(), metadata.len());
694
695 let output_descriptor = method.output();
697 let messages = self.generate_mock_stream_messages(&output_descriptor, 10).await?;
698
699 let (tx, rx) = mpsc::channel(32);
701 let stream = Box::pin(ReceiverStream::new(rx))
702 as Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>;
703
704 tokio::spawn(async move {
706 for message in messages {
707 if tx.send(Ok(message)).await.is_err() {
708 break;
709 }
710 }
711 });
712
713 let mut response = Response::new(stream);
715
716 for entry in metadata.iter() {
718 if let tonic::metadata::KeyAndValueRef::Ascii(key, value) = entry {
719 if !key.as_str().starts_with(':')
721 && !key.as_str().starts_with("grpc-")
722 && !key.as_str().starts_with("te")
723 && !key.as_str().starts_with("content-type")
724 {
725 response.metadata_mut().insert(key.clone(), value.clone());
726 }
727 }
728 }
729
730 response
732 .metadata_mut()
733 .insert("x-mockforge-service", method.parent_service().name().parse().unwrap());
734 response
735 .metadata_mut()
736 .insert("x-mockforge-method", method.name().parse().unwrap());
737 response
738 .metadata_mut()
739 .insert("x-mockforge-timestamp", chrono::Utc::now().to_rfc3339().parse().unwrap());
740 response
741 .metadata_mut()
742 .insert("x-mockforge-stream-count", "10".parse().unwrap());
743
744 let mut incoming_stream = request.into_inner();
746 tokio::spawn(async move {
747 let mut count = 0;
748 while let Some(message_result) = incoming_stream.next().await {
749 match message_result {
750 Ok(_) => {
751 count += 1;
752 debug!(
753 "Processed bidirectional message {} for method: {}",
754 count,
755 method.name()
756 );
757 }
758 Err(e) => {
759 warn!("Error processing bidirectional message: {}", e);
760 break;
761 }
762 }
763 }
764 debug!("Finished processing {} bidirectional messages", count);
765 });
766
767 debug!("Generated bidirectional streaming response with {} messages", 10);
768 Ok(response)
769 }
770
771 #[cfg(not(feature = "data-faker"))]
772 {
773 debug!("Data faker feature not enabled, returning unimplemented for bidirectional streaming");
774 Err(Status::unimplemented("Bidirectional streaming requires data-faker feature"))
775 }
776 }
777
778 #[cfg(feature = "data-faker")]
780 async fn generate_mock_message(
781 &self,
782 descriptor: &prost_reflect::MessageDescriptor,
783 ) -> Result<DynamicMessage, Status> {
784 let schema_def = self.create_schema_from_protobuf_descriptor(descriptor);
786
787 let config = DataConfig {
788 rows: 1,
789 ..Default::default()
790 };
791
792 let mut generator = DataGenerator::new(schema_def, config)
793 .map_err(|e| Status::internal(format!("Failed to create data generator: {}", e)))?;
794
795 let result = generator
796 .generate()
797 .await
798 .map_err(|e| Status::internal(format!("Failed to generate mock data: {}", e)))?;
799
800 if let Some(data) = result.data.first() {
801 self.json_to_dynamic_message(descriptor, data)
803 } else {
804 Err(Status::internal("No mock data generated"))
805 }
806 }
807
808 #[cfg(feature = "data-faker")]
810 async fn generate_mock_stream_messages(
811 &self,
812 descriptor: &prost_reflect::MessageDescriptor,
813 count: usize,
814 ) -> Result<Vec<DynamicMessage>, Status> {
815 let schema_def = self.create_schema_from_protobuf_descriptor(descriptor);
816
817 let config = DataConfig {
818 rows: count,
819 ..Default::default()
820 };
821
822 let mut generator = DataGenerator::new(schema_def, config)
823 .map_err(|e| Status::internal(format!("Failed to create data generator: {}", e)))?;
824
825 let result = generator
826 .generate()
827 .await
828 .map_err(|e| Status::internal(format!("Failed to generate mock data: {}", e)))?;
829
830 result
831 .data
832 .iter()
833 .map(|data| self.json_to_dynamic_message(descriptor, data))
834 .collect()
835 }
836
837 #[cfg(feature = "data-faker")]
839 fn json_to_dynamic_message(
840 &self,
841 descriptor: &prost_reflect::MessageDescriptor,
842 json_data: &serde_json::Value,
843 ) -> Result<DynamicMessage, Status> {
844 let mut message = DynamicMessage::new(descriptor.clone());
845
846 if let serde_json::Value::Object(obj) = json_data {
847 for (key, value) in obj {
848 if let Some(field) = descriptor.get_field_by_name(key) {
849 let field_value = self.convert_json_value_to_protobuf_value(&field, value)?;
850 message.set_field(&field, field_value);
851 }
852 }
853 }
854
855 Ok(message)
856 }
857
858 #[cfg(feature = "data-faker")]
860 fn convert_json_value_to_protobuf_value(
861 &self,
862 field: &prost_reflect::FieldDescriptor,
863 json_value: &serde_json::Value,
864 ) -> Result<prost_reflect::Value, Status> {
865 use prost_reflect::Kind;
866
867 match json_value {
868 serde_json::Value::Null => {
869 match field.kind() {
871 Kind::Message(message_descriptor) => Ok(prost_reflect::Value::Message(
872 DynamicMessage::new(message_descriptor.clone()),
873 )),
874 Kind::Enum(enum_descriptor) => {
875 if let Some(first_value) = enum_descriptor.values().next() {
877 Ok(prost_reflect::Value::EnumNumber(first_value.number()))
878 } else {
879 Ok(prost_reflect::Value::EnumNumber(0))
880 }
881 }
882 Kind::Int32 | Kind::Sint32 | Kind::Sfixed32 => Ok(prost_reflect::Value::I32(0)),
883 Kind::Int64 | Kind::Sint64 | Kind::Sfixed64 => Ok(prost_reflect::Value::I64(0)),
884 Kind::Uint32 | Kind::Fixed32 => Ok(prost_reflect::Value::U32(0)),
885 Kind::Uint64 | Kind::Fixed64 => Ok(prost_reflect::Value::U64(0)),
886 Kind::Float => Ok(prost_reflect::Value::F32(0.0)),
887 Kind::Double => Ok(prost_reflect::Value::F64(0.0)),
888 Kind::Bool => Ok(prost_reflect::Value::Bool(false)),
889 Kind::String => Ok(prost_reflect::Value::String(String::new())),
890 Kind::Bytes => Ok(prost_reflect::Value::Bytes(b"".to_vec().into())),
891 }
892 }
893 serde_json::Value::Bool(b) => Ok(prost_reflect::Value::Bool(*b)),
894 serde_json::Value::Number(n) => {
895 match field.kind() {
896 Kind::Int32 | Kind::Sint32 | Kind::Sfixed32 => {
897 if let Some(i) = n.as_i64() {
898 Ok(prost_reflect::Value::I32(i as i32))
899 } else {
900 Err(Status::invalid_argument(format!(
901 "Cannot convert number {} to int32",
902 n
903 )))
904 }
905 }
906 Kind::Int64 | Kind::Sint64 | Kind::Sfixed64 => {
907 if let Some(i) = n.as_i64() {
908 Ok(prost_reflect::Value::I64(i))
909 } else {
910 Err(Status::invalid_argument(format!(
911 "Cannot convert number {} to int64",
912 n
913 )))
914 }
915 }
916 Kind::Uint32 | Kind::Fixed32 => {
917 if let Some(i) = n.as_u64() {
918 Ok(prost_reflect::Value::U32(i as u32))
919 } else {
920 Err(Status::invalid_argument(format!(
921 "Cannot convert number {} to uint32",
922 n
923 )))
924 }
925 }
926 Kind::Uint64 | Kind::Fixed64 => {
927 if let Some(i) = n.as_u64() {
928 Ok(prost_reflect::Value::U64(i))
929 } else {
930 Err(Status::invalid_argument(format!(
931 "Cannot convert number {} to uint64",
932 n
933 )))
934 }
935 }
936 Kind::Float => {
937 if let Some(f) = n.as_f64() {
938 Ok(prost_reflect::Value::F32(f as f32))
939 } else {
940 Err(Status::invalid_argument(format!(
941 "Cannot convert number {} to float",
942 n
943 )))
944 }
945 }
946 Kind::Double => {
947 if let Some(f) = n.as_f64() {
948 Ok(prost_reflect::Value::F64(f))
949 } else {
950 Err(Status::invalid_argument(format!(
951 "Cannot convert number {} to double",
952 n
953 )))
954 }
955 }
956 _ => {
957 if let Some(i) = n.as_i64() {
959 Ok(prost_reflect::Value::I64(i))
960 } else {
961 Err(Status::invalid_argument(format!(
962 "Cannot convert number {} to numeric type",
963 n
964 )))
965 }
966 }
967 }
968 }
969 serde_json::Value::String(s) => {
970 match field.kind() {
971 Kind::String => Ok(prost_reflect::Value::String(s.clone())),
972 Kind::Bytes => Ok(prost_reflect::Value::Bytes(s.as_bytes().to_vec().into())),
973 Kind::Enum(enum_descriptor) => {
974 if let Some(enum_value) = enum_descriptor.get_value_by_name(s) {
976 Ok(prost_reflect::Value::EnumNumber(enum_value.number()))
977 } else {
978 if let Ok(num) = s.parse::<i32>() {
980 Ok(prost_reflect::Value::EnumNumber(num))
981 } else {
982 warn!(
983 "Unknown enum value '{}' for field '{}', using default",
984 s,
985 field.name()
986 );
987 Ok(prost_reflect::Value::EnumNumber(0))
988 }
989 }
990 }
991 _ => {
992 Ok(prost_reflect::Value::String(s.clone()))
994 }
995 }
996 }
997 serde_json::Value::Array(arr) => {
998 let mut list_values = Vec::new();
999
1000 for item in arr {
1001 let item_value = self.convert_json_value_to_protobuf_value(field, item)?;
1002 list_values.push(item_value);
1003 }
1004
1005 Ok(prost_reflect::Value::List(list_values))
1006 }
1007 serde_json::Value::Object(_obj) => match field.kind() {
1008 Kind::Message(message_descriptor) => self
1009 .json_to_dynamic_message(&message_descriptor, json_value)
1010 .map(prost_reflect::Value::Message),
1011 _ => Err(Status::invalid_argument(format!(
1012 "Cannot convert object to field {} of type {:?}",
1013 field.name(),
1014 field.kind()
1015 ))),
1016 },
1017 }
1018 }
1019
1020 #[cfg(feature = "data-faker")]
1022 fn create_schema_from_protobuf_descriptor(
1023 &self,
1024 descriptor: &prost_reflect::MessageDescriptor,
1025 ) -> SchemaDefinition {
1026 use mockforge_data::schema::FieldDefinition;
1027
1028 let mut schema = SchemaDefinition::new(descriptor.name().to_string());
1029
1030 for field in descriptor.fields() {
1031 let field_name = field.name().to_string();
1032 let field_type = match field.kind() {
1033 prost_reflect::Kind::Message(_) => {
1034 "object".to_string()
1036 }
1037 prost_reflect::Kind::Enum(_) => "string".to_string(),
1038 _ => "string".to_string(),
1040 };
1041
1042 let mut field_def = FieldDefinition::new(field_name, field_type);
1043
1044 if field.supports_presence() && !field.is_list() {
1048 field_def = field_def.optional();
1050 }
1051
1052 schema = schema.with_field(field_def);
1053 }
1054
1055 schema
1056 }
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061
1062 #[test]
1063 fn test_module_compiles() {}
1064}