pub mod http_bridge;
pub mod proto_parser;
pub mod router;
pub mod service_generator;
use crate::reflection::{MockReflectionProxy, ProxyConfig};
use proto_parser::ProtoParser;
use service_generator::DynamicGrpcService;
use std::collections::HashMap;
use std::sync::Arc;
use tonic::transport::Server;
use tonic_reflection::server::Builder as ReflectionBuilder;
use tracing::*;
/// TLS settings for the gRPC server.
///
/// Only file paths are stored here; the PEM files are read when the server
/// is started.
#[derive(Debug, Clone)]
pub struct GrpcTlsConfig {
    /// Path to the server certificate (PEM).
    pub cert_path: String,
    /// Path to the server private key (PEM).
    pub key_path: String,
    /// Path to the CA certificate used to verify client certificates.
    /// `Some` turns on mutual TLS, `None` leaves client verification off.
    pub client_ca_path: Option<String>,
}

impl GrpcTlsConfig {
    /// Creates a server-only TLS configuration (no client certificate
    /// verification).
    pub fn new(cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
        let cert_path = cert_path.into();
        let key_path = key_path.into();
        Self {
            cert_path,
            key_path,
            client_ca_path: None,
        }
    }

    /// Creates a mutual-TLS configuration: same as [`GrpcTlsConfig::new`] plus
    /// a client CA path.
    pub fn with_mtls(
        cert_path: impl Into<String>,
        key_path: impl Into<String>,
        client_ca_path: impl Into<String>,
    ) -> Self {
        let server_only = Self::new(cert_path, key_path);
        Self {
            client_ca_path: Some(client_ca_path.into()),
            ..server_only
        }
    }

    /// Builds a configuration from environment variables.
    ///
    /// * `GRPC_TLS_CERT` and `GRPC_TLS_KEY` are both required; if either one
    ///   cannot be read (unset or not valid Unicode) the result is `None`.
    /// * `GRPC_TLS_CLIENT_CA` is optional and, when readable, enables mTLS.
    pub fn from_env() -> Option<Self> {
        let read = |name: &str| std::env::var(name).ok();
        // Arguments are evaluated left to right, so a missing certificate
        // short-circuits before the key is looked up.
        let server_only = Self::new(read("GRPC_TLS_CERT")?, read("GRPC_TLS_KEY")?);
        Some(Self {
            client_ca_path: read("GRPC_TLS_CLIENT_CA"),
            ..server_only
        })
    }
}
/// Settings for dynamic gRPC service discovery and for the server built on
/// top of it.
#[derive(Debug, Clone)]
pub struct DynamicGrpcConfig {
    /// Directory handed to the proto parser during service discovery.
    pub proto_dir: String,
    /// When `true`, the gRPC reflection service is added to the server.
    pub enable_reflection: bool,
    /// Service names that are skipped (not registered) during discovery.
    pub excluded_services: Vec<String>,
    /// Optional HTTP bridge settings.
    pub http_bridge: Option<http_bridge::HttpBridgeConfig>,
    /// TLS settings; with `None` the server runs in plaintext.
    pub tls: Option<GrpcTlsConfig>,
}

impl Default for DynamicGrpcConfig {
    /// Defaults: proto directory `"proto"`, reflection off, no excluded
    /// services, HTTP bridge enabled, and TLS taken from the environment via
    /// [`GrpcTlsConfig::from_env`].
    fn default() -> Self {
        let bridge = http_bridge::HttpBridgeConfig {
            enabled: true,
            ..Default::default()
        };
        Self {
            proto_dir: String::from("proto"),
            enable_reflection: false,
            excluded_services: Vec::new(),
            http_bridge: Some(bridge),
            tls: GrpcTlsConfig::from_env(),
        }
    }
}
/// Collection of dynamically created gRPC services, keyed by service name,
/// plus the descriptor pool that belongs to them.
#[derive(Clone)]
pub struct ServiceRegistry {
    /// Registered services, shared through `Arc`.
    services: HashMap<String, Arc<DynamicGrpcService>>,
    /// Descriptor pool associated with the registered services.
    descriptor_pool: prost_reflect::DescriptorPool,
}

impl Default for ServiceRegistry {
    /// Same as [`ServiceRegistry::new`].
    fn default() -> Self {
        ServiceRegistry::new()
    }
}

impl ServiceRegistry {
    /// Returns the descriptor pool currently held by the registry.
    pub fn descriptor_pool(&self) -> &prost_reflect::DescriptorPool {
        &self.descriptor_pool
    }

    /// Creates a registry with no services and a fresh descriptor pool.
    pub fn new() -> Self {
        Self::with_descriptor_pool(prost_reflect::DescriptorPool::new())
    }

    /// Creates a registry with no services that uses `descriptor_pool`.
    pub fn with_descriptor_pool(descriptor_pool: prost_reflect::DescriptorPool) -> Self {
        let services = HashMap::new();
        Self {
            services,
            descriptor_pool,
        }
    }

    /// Replaces the registry's descriptor pool with `pool`.
    pub fn set_descriptor_pool(&mut self, pool: prost_reflect::DescriptorPool) {
        self.descriptor_pool = pool;
    }

    /// Stores `service` under `name`; an entry already stored under the same
    /// name is replaced.
    pub fn register(&mut self, name: String, service: DynamicGrpcService) {
        let shared = Arc::new(service);
        self.services.insert(name, shared);
    }

    /// Looks up the service registered under `name`.
    pub fn get(&self, name: &str) -> Option<&Arc<DynamicGrpcService>> {
        self.services.get(name)
    }

    /// Returns the names of all registered services, in the map's iteration
    /// order (which is unspecified for `HashMap`).
    pub fn service_names(&self) -> Vec<String> {
        let mut names = Vec::with_capacity(self.services.len());
        names.extend(self.services.keys().cloned());
        names
    }
}
/// Parses the proto files under `config.proto_dir` and builds a
/// [`ServiceRegistry`] from the services found there.
///
/// Every parsed service whose name is not listed in
/// `config.excluded_services` is wrapped in a [`DynamicGrpcService`] and
/// registered. The parser's descriptor pool becomes the registry's pool.
/// Timing of each phase is reported through `tracing`.
///
/// # Errors
///
/// Returns the error produced by `ProtoParser::parse_directory` if parsing
/// fails.
pub async fn discover_services(
    config: &DynamicGrpcConfig,
) -> Result<ServiceRegistry, Box<dyn std::error::Error + Send + Sync>> {
    use std::time::Instant;

    let discovery_start = Instant::now();
    info!("Discovering gRPC services from proto directory: {}", config.proto_dir);

    let parse_start = Instant::now();
    let mut parser = ProtoParser::new();
    parser.parse_directory(&config.proto_dir).await?;
    let parse_duration = parse_start.elapsed();
    info!("Proto file parsing completed (took {:?})", parse_duration);

    let registry_start = Instant::now();
    // `into_pool` consumes the parser, so the service definitions have to be
    // copied out first.
    let services = parser.services().clone();
    let mut registry = ServiceRegistry::with_descriptor_pool(parser.into_pool());
    let registry_duration = registry_start.elapsed();
    debug!("Registry creation completed (took {:?})", registry_duration);

    let service_reg_start = Instant::now();
    for (service_name, proto_service) in services {
        if config.excluded_services.contains(&service_name) {
            info!("Skipping excluded service: {}", service_name);
            continue;
        }
        // `services` is already an owned copy, so each definition is moved
        // into its service instead of being cloned a second time.
        let dynamic_service = DynamicGrpcService::new(proto_service, None);
        registry.register(service_name.clone(), dynamic_service);
        debug!("Registered service: {}", service_name);
    }
    let service_reg_duration = service_reg_start.elapsed();
    info!(
        "Service registration completed for {} services (took {:?})",
        registry.service_names().len(),
        service_reg_duration
    );

    let total_discovery_duration = discovery_start.elapsed();
    info!("Service discovery completed (total time: {:?})", total_discovery_duration);
    Ok(registry)
}
pub async fn start_dynamic_server(
port: u16,
config: DynamicGrpcConfig,
latency_profile: Option<mockforge_foundation::latency::LatencyProfile>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use std::time::Instant;
let startup_start = Instant::now();
#[cfg(feature = "data-faker")]
mockforge_data::provider::register_core_faker_provider();
let _latency_injector = latency_profile.map(|profile| {
mockforge_foundation::latency::LatencyInjector::new(profile, Default::default())
});
let registry = discover_services(&config).await?;
let registry_arc = Arc::new(registry);
let addr = mockforge_core::wildcard_socket_addr(port);
info!(
"Dynamic server listening on {} with {} services",
addr,
registry_arc.service_names().len()
);
let proxy_config = ProxyConfig::default();
let reflection_start = Instant::now();
let mock_proxy = MockReflectionProxy::new(proxy_config, registry_arc.clone()).await?;
let reflection_duration = reflection_start.elapsed();
info!("gRPC reflection proxy created (took {:?})", reflection_duration);
let total_startup_duration = startup_start.elapsed();
info!("gRPC server startup completed (total time: {:?})", total_startup_duration);
start_grpc_only_server(port, &config, registry_arc.clone(), mock_proxy).await?;
Ok(())
}
pub async fn start_dynamic_grpc_server(
port: u16,
config: DynamicGrpcConfig,
latency_profile: Option<mockforge_foundation::latency::LatencyProfile>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut grpc_only_config = config;
grpc_only_config.http_bridge = None;
start_dynamic_server(port, grpc_only_config, latency_profile).await
}
async fn start_grpc_only_server(
port: u16,
config: &DynamicGrpcConfig,
registry_arc: Arc<ServiceRegistry>,
_mock_proxy: MockReflectionProxy,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use tonic::transport::{Certificate, Identity, ServerTlsConfig};
let mut server_builder = if let Some(tls_config) = &config.tls {
info!("Configuring gRPC server with TLS");
let cert = tokio::fs::read(&tls_config.cert_path).await.map_err(|e| {
error!("Failed to read TLS certificate from {}: {}", tls_config.cert_path, e);
Box::<dyn std::error::Error + Send + Sync>::from(format!(
"Failed to read TLS certificate: {}",
e
))
})?;
let key = tokio::fs::read(&tls_config.key_path).await.map_err(|e| {
error!("Failed to read TLS key from {}: {}", tls_config.key_path, e);
Box::<dyn std::error::Error + Send + Sync>::from(format!(
"Failed to read TLS key: {}",
e
))
})?;
let identity = Identity::from_pem(cert, key);
let mut tls = ServerTlsConfig::new().identity(identity);
if let Some(client_ca_path) = &tls_config.client_ca_path {
info!("Configuring mutual TLS (mTLS) with client certificate verification");
let client_ca = tokio::fs::read(client_ca_path).await.map_err(|e| {
error!("Failed to read client CA from {}: {}", client_ca_path, e);
Box::<dyn std::error::Error + Send + Sync>::from(format!(
"Failed to read client CA: {}",
e
))
})?;
tls = tls.client_ca_root(Certificate::from_pem(client_ca));
}
Server::builder().tls_config(tls).map_err(|e| {
error!("Failed to configure TLS: {}", e);
Box::<dyn std::error::Error + Send + Sync>::from(format!(
"Failed to configure TLS: {}",
e
))
})?
} else {
info!("gRPC server running in plaintext mode (no TLS configured)");
Server::builder()
};
use std::net::SocketAddr;
let grpc_addr: SocketAddr = mockforge_core::wildcard_socket_addr(port);
info!(
"Starting gRPC server on {} with {} discovered services",
grpc_addr,
registry_arc.service_names().len()
);
for service_name in registry_arc.service_names() {
info!(" - Dynamic service: {}", service_name);
}
use crate::generated::greeter_server::{Greeter, GreeterServer};
use crate::generated::{HelloReply, HelloRequest};
use tonic::{Request, Response, Status};
#[derive(Debug, Default)]
struct MockGreeterService;
use futures::StreamExt;
use std::pin::Pin;
use tokio_stream::wrappers::ReceiverStream;
#[tonic::async_trait]
impl Greeter for MockGreeterService {
type SayHelloStreamStream =
Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
type ChatStream = Pin<Box<dyn futures::Stream<Item = Result<HelloReply, Status>> + Send>>;
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
info!("gRPC say_hello request: {:?}", request);
let req = request.into_inner();
let reply = HelloReply {
message: format!("Hello {}! This is a mock response from MockForge", req.name),
metadata: None,
items: vec![],
};
Ok(Response::new(reply))
}
async fn say_hello_stream(
&self,
request: Request<HelloRequest>,
) -> Result<Response<Self::SayHelloStreamStream>, Status> {
info!("gRPC say_hello_stream request: {:?}", request);
let name = request.into_inner().name;
let (tx, rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(async move {
for i in 1..=5 {
let reply = HelloReply {
message: format!(
"Hello {}! Stream message {} of 5 from MockForge",
name, i
),
metadata: None,
items: vec![],
};
if tx.send(Ok(reply)).await.is_err() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
});
let stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(stream) as Self::SayHelloStreamStream))
}
async fn say_hello_client_stream(
&self,
request: Request<tonic::Streaming<HelloRequest>>,
) -> Result<Response<HelloReply>, Status> {
info!("gRPC say_hello_client_stream started");
let mut stream = request.into_inner();
let mut names = Vec::new();
let mut count = 0;
while let Some(req) = stream.next().await {
match req {
Ok(hello_request) => {
names.push(hello_request.name);
count += 1;
}
Err(e) => {
error!("Error receiving client stream message: {}", e);
return Err(Status::internal(format!("Stream error: {}", e)));
}
}
}
let message = if names.is_empty() {
"Hello! No names received in the stream.".to_string()
} else {
format!(
"Hello {}! Received {} messages from MockForge client stream.",
names.join(", "),
count
)
};
Ok(Response::new(HelloReply {
message,
metadata: None,
items: vec![],
}))
}
async fn chat(
&self,
request: Request<tonic::Streaming<HelloRequest>>,
) -> Result<Response<Self::ChatStream>, Status> {
info!("gRPC chat (bidirectional streaming) started");
let mut stream = request.into_inner();
let (tx, rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(async move {
let mut message_count = 0;
while let Some(req) = stream.next().await {
match req {
Ok(hello_request) => {
message_count += 1;
let reply = HelloReply {
message: format!(
"Chat response {}: Hello {}! from MockForge",
message_count, hello_request.name
),
metadata: None,
items: vec![],
};
if tx.send(Ok(reply)).await.is_err() {
break;
}
}
Err(e) => {
error!("Chat stream error: {}", e);
let _ = tx
.send(Err(Status::internal(format!("Stream error: {}", e))))
.await;
break;
}
}
}
info!("Chat session ended after {} messages", message_count);
});
let output_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(output_stream) as Self::ChatStream))
}
}
let mut routes_builder = tonic::service::RoutesBuilder::default();
routes_builder.add_service(GreeterServer::new(MockGreeterService));
info!("Registered built-in Greeter service");
if config.enable_reflection {
let encoded_fd_set = registry_arc.descriptor_pool().encode_to_vec();
let reflection_service = ReflectionBuilder::configure()
.register_encoded_file_descriptor_set(&encoded_fd_set)
.build_v1()
.map_err(|e| {
error!("Failed to build reflection service: {}", e);
Box::<dyn std::error::Error + Send + Sync>::from(format!(
"Failed to build reflection service: {}",
e
))
})?;
routes_builder.add_service(reflection_service);
info!("gRPC reflection service enabled");
}
let registry_for_fallback = registry_arc.clone();
let axum_router =
routes_builder
.routes()
.into_axum_router()
.fallback(move |req: axum::extract::Request| {
let registry = registry_for_fallback.clone();
async move {
let path = req.uri().path().to_string();
match router::parse_grpc_path(&path) {
Some((service_name, method_name)) => {
let body = axum::body::to_bytes(req.into_body(), 4 * 1024 * 1024)
.await
.unwrap_or_default();
match router::handle_dynamic_grpc_request(
®istry,
service_name,
method_name,
body,
)
.await
{
Ok(response) => response,
Err(status) => router::create_grpc_error_response(status),
}
}
None => router::create_grpc_error_response(Status::unimplemented(
"Unknown path",
)),
}
}
});
let routes = tonic::service::Routes::from(axum_router);
server_builder.add_routes(routes).serve(grpc_addr).await?;
info!("gRPC server stopped");
Ok(())
}
#[cfg(test)]
mod tests {
// Placeholder test: it contains no assertions, so it passes whenever this
// module compiles under the test configuration.
#[test]
fn test_module_compiles() {
}
}