mockforge_grpc/reflection/
proxy.rs

1//! Main reflection proxy implementation
2
3use crate::reflection::{
4    cache::DescriptorCache, client::ReflectionClient, config::ProxyConfig,
5    connection_pool::ConnectionPool,
6};
7use futures_util::Stream;
8#[cfg(feature = "data-faker")]
9use mockforge_data::{DataConfig, DataGenerator, SchemaDefinition};
10use prost_reflect::{DynamicMessage, ReflectMessage};
11use std::pin::Pin;
12use std::time::Duration;
13use tokio::sync::mpsc;
14use tokio_stream::wrappers::ReceiverStream;
15use tokio_stream::StreamExt;
16use tonic::{transport::Endpoint, Request, Response, Status, Streaming};
17use tracing::{debug, warn};
18
/// A reflection-based gRPC proxy that can forward requests to arbitrary services
///
/// Instances are built by `ReflectionProxy::new`, which creates the reflection
/// client and fills the descriptor cache from that client's descriptor pool.
pub struct ReflectionProxy {
    /// The reflection client for discovering services.
    /// Stored at construction time; the underscore prefix suggests it is only
    /// kept alive, not read — NOTE(review): confirm against the rest of the file.
    _client: ReflectionClient,
    /// Cache of service and method descriptors, used to resolve
    /// `(service_name, method_name)` pairs before a request is dispatched
    cache: DescriptorCache,
    /// Proxy configuration; consulted to decide whether a service may be called
    config: ProxyConfig,
    /// Timeout for requests (30 seconds unless overridden via `with_timeout`)
    timeout_duration: Duration,
    /// Connection pool for gRPC channels
    // The lint is silenced because nothing reads this field yet.
    #[allow(dead_code)]
    connection_pool: ConnectionPool,
}
33
34impl ReflectionProxy {
35    /// Create a new reflection proxy
36    pub async fn new(endpoint: Endpoint, config: ProxyConfig) -> Result<Self, Status> {
37        debug!("Creating reflection proxy for endpoint: {:?}", endpoint.uri());
38
39        let client = ReflectionClient::new(endpoint).await?;
40        let cache = DescriptorCache::new();
41
42        // Populate cache from the client's descriptor pool
43        cache.populate_from_pool(Some(client.pool())).await;
44
45        Ok(Self {
46            _client: client,
47            cache,
48            config,
49            timeout_duration: Duration::from_secs(30),
50            connection_pool: ConnectionPool::new(),
51        })
52    }
53
54    /// Set the request timeout
55    pub fn with_timeout(mut self, timeout: Duration) -> Self {
56        self.timeout_duration = timeout;
57        self
58    }
59
60    /// Forward a unary request to the target service
61    pub async fn forward_unary(
62        &self,
63        service_name: &str,
64        method_name: &str,
65        request: Request<DynamicMessage>,
66    ) -> Result<Response<DynamicMessage>, Status> {
67        // Check if service is allowed
68        if !self.config.is_service_allowed(service_name) {
69            return Err(Status::permission_denied(format!(
70                "Service {} is not allowed",
71                service_name
72            )));
73        }
74
75        // Get the method descriptor
76        let method = self.cache.get_method(service_name, method_name).await?;
77
78        // Check if it's actually a unary method
79        if !method.is_server_streaming() && !method.is_client_streaming() {
80            self.forward_unary_impl(method, request).await
81        } else {
82            Err(Status::invalid_argument(format!(
83                "Method {}::{} is not a unary method",
84                service_name, method_name
85            )))
86        }
87    }
88
89    /// Forward a server-streaming request to the target service
90    pub async fn forward_server_streaming(
91        &self,
92        service_name: &str,
93        method_name: &str,
94        request: Request<DynamicMessage>,
95    ) -> Result<Response<Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>>, Status>
96    {
97        // Check if service is allowed
98        if !self.config.is_service_allowed(service_name) {
99            return Err(Status::permission_denied(format!(
100                "Service {} is not allowed",
101                service_name
102            )));
103        }
104
105        // Get the method descriptor
106        let method = self.cache.get_method(service_name, method_name).await?;
107
108        // Check if it's actually a server streaming method
109        if method.is_server_streaming() && !method.is_client_streaming() {
110            self.forward_server_streaming_impl(method, request).await
111        } else {
112            Err(Status::invalid_argument(format!(
113                "Method {}::{} is not a server streaming method",
114                service_name, method_name
115            )))
116        }
117    }
118
119    /// Forward a client-streaming request to the target service
120    pub async fn forward_client_streaming(
121        &self,
122        service_name: &str,
123        method_name: &str,
124        request: Request<Streaming<DynamicMessage>>,
125    ) -> Result<Response<DynamicMessage>, Status> {
126        // Check if service is allowed
127        if !self.config.is_service_allowed(service_name) {
128            return Err(Status::permission_denied(format!(
129                "Service {} is not allowed",
130                service_name
131            )));
132        }
133
134        // Get the method descriptor
135        let method = self.cache.get_method(service_name, method_name).await?;
136
137        // Check if it's actually a client streaming method
138        if method.is_client_streaming() && !method.is_server_streaming() {
139            self.forward_client_streaming_impl(method, request).await
140        } else {
141            Err(Status::invalid_argument(format!(
142                "Method {}::{} is not a client streaming method",
143                service_name, method_name
144            )))
145        }
146    }
147
148    /// Forward a bidirectional streaming request to the target service
149    pub async fn forward_bidirectional_streaming(
150        &self,
151        service_name: &str,
152        method_name: &str,
153        request: Request<Streaming<DynamicMessage>>,
154    ) -> Result<Response<Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>>, Status>
155    {
156        // Check if service is allowed
157        if !self.config.is_service_allowed(service_name) {
158            return Err(Status::permission_denied(format!(
159                "Service {} is not allowed",
160                service_name
161            )));
162        }
163
164        // Get the method descriptor
165        let method = self.cache.get_method(service_name, method_name).await?;
166
167        // Check if it's actually a bidirectional streaming method
168        if method.is_client_streaming() && method.is_server_streaming() {
169            self.forward_bidirectional_streaming_impl(method, request).await
170        } else {
171            Err(Status::invalid_argument(format!(
172                "Method {}::{} is not a bidirectional streaming method",
173                service_name, method_name
174            )))
175        }
176    }
177
178    /// Implementation for forwarding unary requests
179    async fn forward_unary_impl(
180        &self,
181        method: prost_reflect::MethodDescriptor,
182        request: Request<DynamicMessage>,
183    ) -> Result<Response<DynamicMessage>, Status> {
184        // Real implementation for mock server:
185        // 1. Look up mock responses based on the service/method
186        // 2. Apply any configured latency or error simulation
187        // 3. Return the appropriate mock response with preserved metadata
188        // 4. Preserve all metadata from the original request in the response
189
190        debug!("Generating mock response for method: {}", method.name());
191
192        // Extract service name from method descriptor
193        let service_name = method.parent_service().name();
194        let method_name = method.name();
195
196        // Create a mock response based on the method
197        let mock_response = self.generate_mock_response(service_name, method_name, &method).await?;
198
199        // Create response with mock data and preserve metadata
200        let mut response = Response::new(mock_response);
201
202        // Preserve original request metadata in the response (ASCII only for simplicity)
203        let request_metadata = request.metadata();
204        for entry in request_metadata.iter() {
205            if let tonic::metadata::KeyAndValueRef::Ascii(key, value) = entry {
206                // Only preserve certain metadata keys, avoiding system headers
207                if !key.as_str().starts_with(':')
208                    && !key.as_str().starts_with("grpc-")
209                    && !key.as_str().starts_with("te")
210                    && !key.as_str().starts_with("content-type")
211                {
212                    response.metadata_mut().insert(key.clone(), value.clone());
213                }
214            }
215        }
216
217        // Add mock-specific metadata
218        response
219            .metadata_mut()
220            .insert("x-mockforge-service", service_name.parse().unwrap());
221        response
222            .metadata_mut()
223            .insert("x-mockforge-method", method_name.parse().unwrap());
224        response
225            .metadata_mut()
226            .insert("x-mockforge-timestamp", chrono::Utc::now().to_rfc3339().parse().unwrap());
227
228        Ok(response)
229    }
230
231    /// Generate a mock response for a given service and method
232    async fn generate_mock_response(
233        &self,
234        service_name: &str,
235        method_name: &str,
236        method_descriptor: &prost_reflect::MethodDescriptor,
237    ) -> Result<DynamicMessage, Status> {
238        debug!("Generating mock response for {}.{}", service_name, method_name);
239
240        // Get the output message descriptor
241        let output_descriptor = method_descriptor.output();
242
243        // Create a new dynamic message with the output descriptor
244        let mut response = DynamicMessage::new(output_descriptor.clone());
245
246        // Generate mock data dynamically based on the proto structure
247        self.populate_dynamic_mock_response(
248            &mut response,
249            service_name,
250            method_name,
251            &output_descriptor,
252        )?;
253
254        Ok(response)
255    }
256
257    /// Populate a dynamic mock response based on the proto structure
258    fn populate_dynamic_mock_response(
259        &self,
260        response: &mut DynamicMessage,
261        service_name: &str,
262        method_name: &str,
263        output_descriptor: &prost_reflect::MessageDescriptor,
264    ) -> Result<(), Status> {
265        debug!("Generating dynamic mock response for {}.{}", service_name, method_name);
266
267        // Get all fields from the output message descriptor
268        for field in output_descriptor.fields() {
269            let field_name = field.name();
270            let field_type = field.kind();
271
272            debug!("Processing field: {} of type: {:?}", field_name, field_type);
273
274            // Generate appropriate mock values based on field type
275            let mock_value = self.generate_mock_value_for_field(&field, service_name, method_name);
276
277            // Try to set the field (ignore errors if field doesn't exist or is wrong type)
278            response.set_field(&field, mock_value);
279        }
280
281        // Always try to add some common metadata fields if they don't exist
282        let metadata_fields = vec![
283            ("mockforge_service", prost_reflect::Value::String(service_name.to_string())),
284            ("mockforge_method", prost_reflect::Value::String(method_name.to_string())),
285            (
286                "mockforge_timestamp",
287                prost_reflect::Value::String(chrono::Utc::now().to_rfc3339()),
288            ),
289            (
290                "mockforge_source",
291                prost_reflect::Value::String("MockForge Reflection Proxy".to_string()),
292            ),
293        ];
294
295        for (field_name, value) in metadata_fields {
296            response.set_field_by_name(field_name, value);
297        }
298
299        Ok(())
300    }
301
302    /// Generate a mock value for a specific field based on its type
303    fn generate_mock_value_for_field(
304        &self,
305        field: &prost_reflect::FieldDescriptor,
306        service_name: &str,
307        method_name: &str,
308    ) -> prost_reflect::Value {
309        self.generate_mock_value_for_field_with_depth(field, service_name, method_name, 0)
310    }
311
312    /// Generate a mock value for a specific field with recursion depth limit
313    fn generate_mock_value_for_field_with_depth(
314        &self,
315        field: &prost_reflect::FieldDescriptor,
316        service_name: &str,
317        method_name: &str,
318        depth: usize,
319    ) -> prost_reflect::Value {
320        // Prevent infinite recursion with a reasonable depth limit
321        const MAX_DEPTH: usize = 5;
322        if depth >= MAX_DEPTH {
323            return prost_reflect::Value::String(format!("max_depth_reached_{}", field.name()));
324        }
325
326        // Handle repeated fields (arrays)
327        if field.is_list() {
328            let mut list_values = Vec::new();
329            // Generate 1-3 mock values for the list
330            let field_name_lower = field.name().to_lowercase();
331            let num_items =
332                if field_name_lower.contains("list") || field_name_lower.contains("items") {
333                    3
334                } else {
335                    1
336                };
337
338            for _ in 0..num_items {
339                let item_value =
340                    self.generate_single_field_value(field, service_name, method_name, depth);
341                list_values.push(item_value);
342            }
343
344            return prost_reflect::Value::List(list_values);
345        }
346
347        self.generate_single_field_value(field, service_name, method_name, depth)
348    }
349
350    /// Generate a mock value for a single (non-repeated) field
351    fn generate_single_field_value(
352        &self,
353        field: &prost_reflect::FieldDescriptor,
354        service_name: &str,
355        method_name: &str,
356        depth: usize,
357    ) -> prost_reflect::Value {
358        let field_name = field.name().to_lowercase();
359        let field_type = field.kind();
360
361        // Generate contextual mock data based on field name patterns
362        if field_name.contains("message")
363            || field_name.contains("text")
364            || field_name.contains("content")
365        {
366            return prost_reflect::Value::String(format!(
367                "Mock response from {} for method {} at {}",
368                service_name,
369                method_name,
370                chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC")
371            ));
372        }
373
374        if field_name.contains("id") {
375            return prost_reflect::Value::String(format!(
376                "mock_{}",
377                chrono::Utc::now().timestamp()
378            ));
379        }
380
381        if field_name.contains("status") || field_name.contains("state") {
382            return prost_reflect::Value::String("success".to_string());
383        }
384
385        if field_name.contains("count") || field_name.contains("number") {
386            return prost_reflect::Value::I64(42);
387        }
388
389        if field_name.contains("timestamp") || field_name.contains("time") {
390            return prost_reflect::Value::String(chrono::Utc::now().to_rfc3339());
391        }
392
393        if field_name.contains("enabled") || field_name.contains("active") {
394            return prost_reflect::Value::Bool(true);
395        }
396
397        // Default mock values based on field type
398        match field_type {
399            prost_reflect::Kind::String => {
400                prost_reflect::Value::String(format!("mock_{}_{}", service_name, method_name))
401            }
402            prost_reflect::Kind::Int32 => prost_reflect::Value::I32(42),
403            prost_reflect::Kind::Int64 => prost_reflect::Value::I64(42),
404            prost_reflect::Kind::Float => prost_reflect::Value::F32(std::f32::consts::PI),
405            prost_reflect::Kind::Double => prost_reflect::Value::F64(std::f64::consts::PI),
406            prost_reflect::Kind::Bool => prost_reflect::Value::Bool(true),
407            prost_reflect::Kind::Bytes => prost_reflect::Value::Bytes(b"mock_data".to_vec().into()),
408            prost_reflect::Kind::Enum(enum_descriptor) => {
409                // Try to get the first enum value, or use a default
410                if let Some(first_value) = enum_descriptor.values().next() {
411                    // Use the first enum value as the default
412                    prost_reflect::Value::EnumNumber(first_value.number())
413                } else {
414                    // Fallback if no enum values are defined
415                    prost_reflect::Value::EnumNumber(0)
416                }
417            }
418            prost_reflect::Kind::Message(message_descriptor) => {
419                // Recursively generate a mock message for nested types
420                let mut nested_message = DynamicMessage::new(message_descriptor.clone());
421
422                // Populate the nested message with mock values
423                for nested_field in message_descriptor.fields() {
424                    let mock_value = self.generate_mock_value_for_field_with_depth(
425                        &nested_field,
426                        service_name,
427                        method_name,
428                        depth + 1,
429                    );
430                    nested_message.set_field(&nested_field, mock_value);
431                }
432
433                prost_reflect::Value::Message(nested_message)
434            }
435            _ => prost_reflect::Value::String("mock_value".to_string()),
436        }
437    }
438
439    /// Implementation for forwarding server streaming requests
440    async fn forward_server_streaming_impl(
441        &self,
442        method: prost_reflect::MethodDescriptor,
443        request: Request<DynamicMessage>,
444    ) -> Result<Response<Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>>, Status>
445    {
446        // Extract metadata from the original request
447        let metadata = request.metadata();
448        debug!(
449            "Forwarding server streaming request for method: {} with {} metadata entries",
450            method.name(),
451            metadata.len()
452        );
453
454        #[cfg(feature = "data-faker")]
455        {
456            // Generate mock streaming responses
457            let output_descriptor = method.output();
458            let messages = self.generate_mock_stream_messages(&output_descriptor, 5).await?;
459
460            // Create a proper streaming response using ReceiverStream
461            let (tx, rx) = mpsc::channel(32);
462            let stream = Box::pin(ReceiverStream::new(rx))
463                as Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>;
464
465            // Spawn a task to send messages
466            tokio::spawn(async move {
467                for message in messages {
468                    if tx.send(Ok(message)).await.is_err() {
469                        break;
470                    }
471                }
472            });
473
474            // Preserve original request metadata in the response
475            let mut response = Response::new(stream);
476
477            // Copy relevant metadata from the original request to the response (ASCII only for simplicity)
478            for entry in metadata.iter() {
479                if let tonic::metadata::KeyAndValueRef::Ascii(key, value) = entry {
480                    // Only preserve certain metadata keys, avoiding system headers
481                    if !key.as_str().starts_with(':')
482                        && !key.as_str().starts_with("grpc-")
483                        && !key.as_str().starts_with("te")
484                        && !key.as_str().starts_with("content-type")
485                    {
486                        response.metadata_mut().insert(key.clone(), value.clone());
487                    }
488                }
489            }
490
491            // Add mock-specific metadata
492            response
493                .metadata_mut()
494                .insert("x-mockforge-service", method.parent_service().name().parse().unwrap());
495            response
496                .metadata_mut()
497                .insert("x-mockforge-method", method.name().parse().unwrap());
498            response
499                .metadata_mut()
500                .insert("x-mockforge-timestamp", chrono::Utc::now().to_rfc3339().parse().unwrap());
501            response.metadata_mut().insert("x-mockforge-stream-count", "5".parse().unwrap());
502
503            debug!("Generated server streaming response with {} messages", 5);
504            Ok(response)
505        }
506
507        #[cfg(not(feature = "data-faker"))]
508        {
509            debug!("Data faker feature not enabled, returning unimplemented for server streaming");
510            Err(Status::unimplemented("Server streaming requires data-faker feature"))
511        }
512    }
513
514    /// Implementation for forwarding client streaming requests
515    async fn forward_client_streaming_impl(
516        &self,
517        method: prost_reflect::MethodDescriptor,
518        request: Request<Streaming<DynamicMessage>>,
519    ) -> Result<Response<DynamicMessage>, Status> {
520        debug!("Forwarding client streaming request for method: {}", method.name());
521
522        #[cfg(feature = "data-faker")]
523        {
524            // Extract metadata from the original request before consuming it
525            let request_metadata = request.metadata().clone();
526
527            // Process the streaming request and extract message data
528            let mut stream = request.into_inner();
529            let mut message_count = 0;
530            let mut processed_names = Vec::new();
531            let mut user_ids = Vec::new();
532            let mut all_tags = Vec::new();
533
534            while let Some(message_result) = stream.next().await {
535                match message_result {
536                    Ok(message) => {
537                        message_count += 1;
538                        debug!(
539                            "Processing client streaming message {} for method: {}",
540                            message_count,
541                            method.name()
542                        );
543
544                        // Extract data from the HelloRequest message
545                        let input_descriptor = method.input();
546
547                        // Extract the 'name' field
548                        if let Some(name_field) = input_descriptor.get_field_by_name("name") {
549                            let field_value = message.get_field(&name_field);
550                            if let prost_reflect::Value::String(name) = field_value.into_owned() {
551                                processed_names.push(name.clone());
552                                debug!("  - Name: {}", name);
553                            }
554                        }
555
556                        // Extract the 'user_info' field (nested message)
557                        if let Some(user_info_field) =
558                            input_descriptor.get_field_by_name("user_info")
559                        {
560                            let field_value = message.get_field(&user_info_field);
561                            if let prost_reflect::Value::Message(user_info_msg) =
562                                field_value.into_owned()
563                            {
564                                // Extract user_id from user_info
565                                if let Some(user_id_field) =
566                                    user_info_msg.descriptor().get_field_by_name("user_id")
567                                {
568                                    let user_id_value = user_info_msg.get_field(&user_id_field);
569                                    if let prost_reflect::Value::String(user_id) =
570                                        user_id_value.into_owned()
571                                    {
572                                        user_ids.push(user_id.clone());
573                                        debug!("  - User ID: {}", user_id);
574                                    }
575                                }
576                            }
577                        }
578
579                        // Extract the 'tags' field (repeated string)
580                        if let Some(tags_field) = input_descriptor.get_field_by_name("tags") {
581                            let field_value = message.get_field(&tags_field);
582                            if let prost_reflect::Value::List(tags_list) = field_value.into_owned()
583                            {
584                                for tag_value in tags_list {
585                                    if let prost_reflect::Value::String(tag) = tag_value {
586                                        all_tags.push(tag.clone());
587                                        debug!("  - Tag: {}", tag);
588                                    }
589                                }
590                            }
591                        }
592                    }
593                    Err(e) => {
594                        warn!("Error receiving client streaming message: {}", e);
595                        return Err(Status::internal(format!(
596                            "Error processing streaming request: {}",
597                            e
598                        )));
599                    }
600                }
601            }
602
603            debug!("Processed {} messages in client streaming request", message_count);
604            debug!(
605                "Collected data - Names: {:?}, User IDs: {:?}, Tags: {:?}",
606                processed_names, user_ids, all_tags
607            );
608
609            // Generate a mock response based on the output descriptor, but enhance it with processed data
610            let output_descriptor = method.output();
611            let mut mock_response = self.generate_mock_message(&output_descriptor).await?;
612
613            // Enhance the response message with aggregated data from the stream
614            if let Some(message_field) = output_descriptor.get_field_by_name("message") {
615                // Create a personalized message based on the processed data
616                let personalized_message = if !processed_names.is_empty() {
617                    format!("Hello to all {} senders! Processed names: {}, with {} unique tags from {} users",
618                           message_count, processed_names.join(", "), all_tags.len(), user_ids.len())
619                } else {
620                    format!(
621                        "Hello! Processed {} messages with {} tags",
622                        message_count,
623                        all_tags.len()
624                    )
625                };
626
627                // Update the message field in the response
628                mock_response
629                    .set_field(&message_field, prost_reflect::Value::String(personalized_message));
630            }
631
632            // Preserve original request metadata in the response
633            let mut response = Response::new(mock_response);
634
635            // Copy relevant metadata from the original request to the response (ASCII only for simplicity)
636            for entry in request_metadata.iter() {
637                if let tonic::metadata::KeyAndValueRef::Ascii(key, value) = entry {
638                    // Only preserve certain metadata keys, avoiding system headers
639                    if !key.as_str().starts_with(':')
640                        && !key.as_str().starts_with("grpc-")
641                        && !key.as_str().starts_with("te")
642                        && !key.as_str().starts_with("content-type")
643                    {
644                        response.metadata_mut().insert(key.clone(), value.clone());
645                    }
646                }
647            }
648
649            // Add mock-specific metadata
650            response
651                .metadata_mut()
652                .insert("x-mockforge-service", method.parent_service().name().parse().unwrap());
653            response
654                .metadata_mut()
655                .insert("x-mockforge-method", method.name().parse().unwrap());
656            response
657                .metadata_mut()
658                .insert("x-mockforge-timestamp", chrono::Utc::now().to_rfc3339().parse().unwrap());
659            response
660                .metadata_mut()
661                .insert("x-mockforge-message-count", message_count.to_string().parse().unwrap());
662
663            let response = response;
664
665            debug!(
666                "Generated enhanced client streaming response with {} processed messages",
667                message_count
668            );
669            Ok(response)
670        }
671
672        #[cfg(not(feature = "data-faker"))]
673        {
674            debug!("Data faker feature not enabled, returning unimplemented for client streaming");
675            Err(Status::unimplemented("Client streaming requires data-faker feature"))
676        }
677    }
678
679    /// Implementation for forwarding bidirectional streaming requests
680    async fn forward_bidirectional_streaming_impl(
681        &self,
682        method: prost_reflect::MethodDescriptor,
683        request: Request<Streaming<DynamicMessage>>,
684    ) -> Result<Response<Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>>, Status>
685    {
686        debug!("Forwarding bidirectional streaming request for method: {}", method.name());
687
688        #[cfg(feature = "data-faker")]
689        {
690            // Extract metadata from the original request before consuming it
691            let metadata = request.metadata();
692            debug!("Forwarding bidirectional streaming request for method: {} with {} metadata entries",
693                   method.name(), metadata.len());
694
695            // Generate mock bidirectional streaming responses
696            let output_descriptor = method.output();
697            let messages = self.generate_mock_stream_messages(&output_descriptor, 10).await?;
698
699            // Create streaming response using ReceiverStream
700            let (tx, rx) = mpsc::channel(32);
701            let stream = Box::pin(ReceiverStream::new(rx))
702                as Pin<Box<dyn Stream<Item = Result<DynamicMessage, Status>> + Send>>;
703
704            // Spawn a task to send messages
705            tokio::spawn(async move {
706                for message in messages {
707                    if tx.send(Ok(message)).await.is_err() {
708                        break;
709                    }
710                }
711            });
712
713            // Preserve original request metadata in the response
714            let mut response = Response::new(stream);
715
716            // Copy relevant metadata from the original request to the response (ASCII only for simplicity)
717            for entry in metadata.iter() {
718                if let tonic::metadata::KeyAndValueRef::Ascii(key, value) = entry {
719                    // Only preserve certain metadata keys, avoiding system headers
720                    if !key.as_str().starts_with(':')
721                        && !key.as_str().starts_with("grpc-")
722                        && !key.as_str().starts_with("te")
723                        && !key.as_str().starts_with("content-type")
724                    {
725                        response.metadata_mut().insert(key.clone(), value.clone());
726                    }
727                }
728            }
729
730            // Add mock-specific metadata
731            response
732                .metadata_mut()
733                .insert("x-mockforge-service", method.parent_service().name().parse().unwrap());
734            response
735                .metadata_mut()
736                .insert("x-mockforge-method", method.name().parse().unwrap());
737            response
738                .metadata_mut()
739                .insert("x-mockforge-timestamp", chrono::Utc::now().to_rfc3339().parse().unwrap());
740            response
741                .metadata_mut()
742                .insert("x-mockforge-stream-count", "10".parse().unwrap());
743
744            // Process incoming stream concurrently
745            let mut incoming_stream = request.into_inner();
746            tokio::spawn(async move {
747                let mut count = 0;
748                while let Some(message_result) = incoming_stream.next().await {
749                    match message_result {
750                        Ok(_) => {
751                            count += 1;
752                            debug!(
753                                "Processed bidirectional message {} for method: {}",
754                                count,
755                                method.name()
756                            );
757                        }
758                        Err(e) => {
759                            warn!("Error processing bidirectional message: {}", e);
760                            break;
761                        }
762                    }
763                }
764                debug!("Finished processing {} bidirectional messages", count);
765            });
766
767            debug!("Generated bidirectional streaming response with {} messages", 10);
768            Ok(response)
769        }
770
771        #[cfg(not(feature = "data-faker"))]
772        {
773            debug!("Data faker feature not enabled, returning unimplemented for bidirectional streaming");
774            Err(Status::unimplemented("Bidirectional streaming requires data-faker feature"))
775        }
776    }
777
778    /// Generate a single mock message for the given descriptor
779    #[cfg(feature = "data-faker")]
780    async fn generate_mock_message(
781        &self,
782        descriptor: &prost_reflect::MessageDescriptor,
783    ) -> Result<DynamicMessage, Status> {
784        // Create a basic schema from the descriptor for mock generation
785        let schema_def = self.create_schema_from_protobuf_descriptor(descriptor);
786
787        let config = DataConfig {
788            rows: 1,
789            ..Default::default()
790        };
791
792        let mut generator = DataGenerator::new(schema_def, config)
793            .map_err(|e| Status::internal(format!("Failed to create data generator: {}", e)))?;
794
795        let result = generator
796            .generate()
797            .await
798            .map_err(|e| Status::internal(format!("Failed to generate mock data: {}", e)))?;
799
800        if let Some(data) = result.data.first() {
801            // Convert the generated JSON to a DynamicMessage
802            self.json_to_dynamic_message(descriptor, data)
803        } else {
804            Err(Status::internal("No mock data generated"))
805        }
806    }
807
808    /// Generate multiple mock messages for streaming
809    #[cfg(feature = "data-faker")]
810    async fn generate_mock_stream_messages(
811        &self,
812        descriptor: &prost_reflect::MessageDescriptor,
813        count: usize,
814    ) -> Result<Vec<DynamicMessage>, Status> {
815        let schema_def = self.create_schema_from_protobuf_descriptor(descriptor);
816
817        let config = DataConfig {
818            rows: count,
819            ..Default::default()
820        };
821
822        let mut generator = DataGenerator::new(schema_def, config)
823            .map_err(|e| Status::internal(format!("Failed to create data generator: {}", e)))?;
824
825        let result = generator
826            .generate()
827            .await
828            .map_err(|e| Status::internal(format!("Failed to generate mock data: {}", e)))?;
829
830        result
831            .data
832            .iter()
833            .map(|data| self.json_to_dynamic_message(descriptor, data))
834            .collect()
835    }
836
837    /// Convert JSON data to a DynamicMessage
838    #[cfg(feature = "data-faker")]
839    fn json_to_dynamic_message(
840        &self,
841        descriptor: &prost_reflect::MessageDescriptor,
842        json_data: &serde_json::Value,
843    ) -> Result<DynamicMessage, Status> {
844        let mut message = DynamicMessage::new(descriptor.clone());
845
846        if let serde_json::Value::Object(obj) = json_data {
847            for (key, value) in obj {
848                if let Some(field) = descriptor.get_field_by_name(key) {
849                    let field_value = self.convert_json_value_to_protobuf_value(&field, value)?;
850                    message.set_field(&field, field_value);
851                }
852            }
853        }
854
855        Ok(message)
856    }
857
858    /// Convert a JSON value to a protobuf Value based on the field descriptor
859    #[cfg(feature = "data-faker")]
860    fn convert_json_value_to_protobuf_value(
861        &self,
862        field: &prost_reflect::FieldDescriptor,
863        json_value: &serde_json::Value,
864    ) -> Result<prost_reflect::Value, Status> {
865        use prost_reflect::Kind;
866
867        match json_value {
868            serde_json::Value::Null => {
869                // Return default value for the field type
870                match field.kind() {
871                    Kind::Message(message_descriptor) => Ok(prost_reflect::Value::Message(
872                        DynamicMessage::new(message_descriptor.clone()),
873                    )),
874                    Kind::Enum(enum_descriptor) => {
875                        // Try to get the first enum value, or use 0 as default
876                        if let Some(first_value) = enum_descriptor.values().next() {
877                            Ok(prost_reflect::Value::EnumNumber(first_value.number()))
878                        } else {
879                            Ok(prost_reflect::Value::EnumNumber(0))
880                        }
881                    }
882                    Kind::Int32 | Kind::Sint32 | Kind::Sfixed32 => Ok(prost_reflect::Value::I32(0)),
883                    Kind::Int64 | Kind::Sint64 | Kind::Sfixed64 => Ok(prost_reflect::Value::I64(0)),
884                    Kind::Uint32 | Kind::Fixed32 => Ok(prost_reflect::Value::U32(0)),
885                    Kind::Uint64 | Kind::Fixed64 => Ok(prost_reflect::Value::U64(0)),
886                    Kind::Float => Ok(prost_reflect::Value::F32(0.0)),
887                    Kind::Double => Ok(prost_reflect::Value::F64(0.0)),
888                    Kind::Bool => Ok(prost_reflect::Value::Bool(false)),
889                    Kind::String => Ok(prost_reflect::Value::String(String::new())),
890                    Kind::Bytes => Ok(prost_reflect::Value::Bytes(b"".to_vec().into())),
891                }
892            }
893            serde_json::Value::Bool(b) => Ok(prost_reflect::Value::Bool(*b)),
894            serde_json::Value::Number(n) => {
895                match field.kind() {
896                    Kind::Int32 | Kind::Sint32 | Kind::Sfixed32 => {
897                        if let Some(i) = n.as_i64() {
898                            Ok(prost_reflect::Value::I32(i as i32))
899                        } else {
900                            Err(Status::invalid_argument(format!(
901                                "Cannot convert number {} to int32",
902                                n
903                            )))
904                        }
905                    }
906                    Kind::Int64 | Kind::Sint64 | Kind::Sfixed64 => {
907                        if let Some(i) = n.as_i64() {
908                            Ok(prost_reflect::Value::I64(i))
909                        } else {
910                            Err(Status::invalid_argument(format!(
911                                "Cannot convert number {} to int64",
912                                n
913                            )))
914                        }
915                    }
916                    Kind::Uint32 | Kind::Fixed32 => {
917                        if let Some(i) = n.as_u64() {
918                            Ok(prost_reflect::Value::U32(i as u32))
919                        } else {
920                            Err(Status::invalid_argument(format!(
921                                "Cannot convert number {} to uint32",
922                                n
923                            )))
924                        }
925                    }
926                    Kind::Uint64 | Kind::Fixed64 => {
927                        if let Some(i) = n.as_u64() {
928                            Ok(prost_reflect::Value::U64(i))
929                        } else {
930                            Err(Status::invalid_argument(format!(
931                                "Cannot convert number {} to uint64",
932                                n
933                            )))
934                        }
935                    }
936                    Kind::Float => {
937                        if let Some(f) = n.as_f64() {
938                            Ok(prost_reflect::Value::F32(f as f32))
939                        } else {
940                            Err(Status::invalid_argument(format!(
941                                "Cannot convert number {} to float",
942                                n
943                            )))
944                        }
945                    }
946                    Kind::Double => {
947                        if let Some(f) = n.as_f64() {
948                            Ok(prost_reflect::Value::F64(f))
949                        } else {
950                            Err(Status::invalid_argument(format!(
951                                "Cannot convert number {} to double",
952                                n
953                            )))
954                        }
955                    }
956                    _ => {
957                        // Fallback to int64 for unknown numeric types
958                        if let Some(i) = n.as_i64() {
959                            Ok(prost_reflect::Value::I64(i))
960                        } else {
961                            Err(Status::invalid_argument(format!(
962                                "Cannot convert number {} to numeric type",
963                                n
964                            )))
965                        }
966                    }
967                }
968            }
969            serde_json::Value::String(s) => {
970                match field.kind() {
971                    Kind::String => Ok(prost_reflect::Value::String(s.clone())),
972                    Kind::Bytes => Ok(prost_reflect::Value::Bytes(s.as_bytes().to_vec().into())),
973                    Kind::Enum(enum_descriptor) => {
974                        // Try to convert string to enum value
975                        if let Some(enum_value) = enum_descriptor.get_value_by_name(s) {
976                            Ok(prost_reflect::Value::EnumNumber(enum_value.number()))
977                        } else {
978                            // Try to parse as number
979                            if let Ok(num) = s.parse::<i32>() {
980                                Ok(prost_reflect::Value::EnumNumber(num))
981                            } else {
982                                warn!(
983                                    "Unknown enum value '{}' for field '{}', using default",
984                                    s,
985                                    field.name()
986                                );
987                                Ok(prost_reflect::Value::EnumNumber(0))
988                            }
989                        }
990                    }
991                    _ => {
992                        // For other types, treat string as string
993                        Ok(prost_reflect::Value::String(s.clone()))
994                    }
995                }
996            }
997            serde_json::Value::Array(arr) => {
998                let mut list_values = Vec::new();
999
1000                for item in arr {
1001                    let item_value = self.convert_json_value_to_protobuf_value(field, item)?;
1002                    list_values.push(item_value);
1003                }
1004
1005                Ok(prost_reflect::Value::List(list_values))
1006            }
1007            serde_json::Value::Object(_obj) => match field.kind() {
1008                Kind::Message(message_descriptor) => self
1009                    .json_to_dynamic_message(&message_descriptor, json_value)
1010                    .map(prost_reflect::Value::Message),
1011                _ => Err(Status::invalid_argument(format!(
1012                    "Cannot convert object to field {} of type {:?}",
1013                    field.name(),
1014                    field.kind()
1015                ))),
1016            },
1017        }
1018    }
1019
1020    /// Create a basic schema definition from a protobuf message descriptor
1021    #[cfg(feature = "data-faker")]
1022    fn create_schema_from_protobuf_descriptor(
1023        &self,
1024        descriptor: &prost_reflect::MessageDescriptor,
1025    ) -> SchemaDefinition {
1026        use mockforge_data::schema::FieldDefinition;
1027
1028        let mut schema = SchemaDefinition::new(descriptor.name().to_string());
1029
1030        for field in descriptor.fields() {
1031            let field_name = field.name().to_string();
1032            let field_type = match field.kind() {
1033                prost_reflect::Kind::Message(_) => {
1034                    // For nested messages, use a generic object type
1035                    "object".to_string()
1036                }
1037                prost_reflect::Kind::Enum(_) => "string".to_string(),
1038                // Simplified type mapping - default to string for all scalar types
1039                _ => "string".to_string(),
1040            };
1041
1042            let mut field_def = FieldDefinition::new(field_name, field_type);
1043
1044            // Check if field is optional based on protobuf field properties
1045            // In proto3, all non-repeated fields are effectively optional
1046            // In proto2, only explicitly optional or required fields exist
1047            if field.supports_presence() && !field.is_list() {
1048                // Field supports presence detection and is not repeated, so it's optional
1049                field_def = field_def.optional();
1050            }
1051
1052            schema = schema.with_field(field_def);
1053        }
1054
1055        schema
1056    }
1057}
1058
#[cfg(test)]
mod tests {

    /// Smoke test with an empty body: it passes whenever this module builds
    /// under the test configuration.
    #[test]
    fn test_module_compiles() {}
}