use crate::grpc::services::{
AnalyticsServiceImpl, ConfigServiceImpl, DlqServiceImpl, HealthServiceImpl, StepServiceImpl,
TaskServiceImpl, TemplateServiceImpl,
};
use crate::grpc::state::GrpcState;
use std::net::SocketAddr;
use std::time::Duration;
use tasker_shared::config::GrpcConfig;
use tasker_shared::proto::v1::{
analytics_service_server::AnalyticsServiceServer, config_service_server::ConfigServiceServer,
dlq_service_server::DlqServiceServer, health_service_server::HealthServiceServer,
step_service_server::StepServiceServer, task_service_server::TaskServiceServer,
template_service_server::TemplateServiceServer, FILE_DESCRIPTOR_SET,
};
use tonic::transport::Server;
use tracing::{error, info};
#[derive(Debug)]
pub struct GrpcServer {
config: GrpcConfig,
state: GrpcState,
}
impl GrpcServer {
pub fn new(config: GrpcConfig, state: GrpcState) -> Self {
Self { config, state }
}
pub async fn serve(self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let addr: SocketAddr = self.config.bind_address.parse().map_err(|e| {
format!(
"Invalid gRPC bind address '{}': {}",
self.config.bind_address, e
)
})?;
info!(
address = %addr,
reflection = self.config.enable_reflection,
health = self.config.enable_health_service,
"Starting gRPC server"
);
let health_service = HealthServiceImpl::new(self.state.clone());
let task_service = TaskServiceImpl::new(self.state.clone());
let step_service = StepServiceImpl::new(self.state.clone());
let config_service = ConfigServiceImpl::new(self.state.clone());
let analytics_service = AnalyticsServiceImpl::new(self.state.clone());
let dlq_service = DlqServiceImpl::new(self.state.clone());
let template_service = TemplateServiceImpl::new(self.state.clone());
let mut server = Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(
self.config.keepalive_interval_seconds as u64,
)))
.http2_keepalive_timeout(Some(Duration::from_secs(
self.config.keepalive_timeout_seconds as u64,
)))
.max_concurrent_streams(Some(self.config.max_concurrent_streams))
.max_frame_size(Some(self.config.max_frame_size));
let mut router = server
.add_service(HealthServiceServer::new(health_service))
.add_service(TaskServiceServer::new(task_service))
.add_service(StepServiceServer::new(step_service))
.add_service(ConfigServiceServer::new(config_service))
.add_service(AnalyticsServiceServer::new(analytics_service))
.add_service(DlqServiceServer::new(dlq_service))
.add_service(TemplateServiceServer::new(template_service));
if self.config.enable_reflection {
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
.build_v1()
.map_err(|e| format!("Failed to build reflection service: {}", e))?;
router = router.add_service(reflection_service);
info!("gRPC reflection service enabled");
}
if self.config.enable_health_service {
let (health_reporter, grpc_health_service) = tonic_health::server::health_reporter();
health_reporter
.set_serving::<TaskServiceServer<TaskServiceImpl>>()
.await;
health_reporter
.set_serving::<StepServiceServer<StepServiceImpl>>()
.await;
health_reporter
.set_serving::<HealthServiceServer<HealthServiceImpl>>()
.await;
health_reporter
.set_serving::<ConfigServiceServer<ConfigServiceImpl>>()
.await;
health_reporter
.set_serving::<AnalyticsServiceServer<AnalyticsServiceImpl>>()
.await;
health_reporter
.set_serving::<DlqServiceServer<DlqServiceImpl>>()
.await;
health_reporter
.set_serving::<TemplateServiceServer<TemplateServiceImpl>>()
.await;
router = router.add_service(grpc_health_service);
info!("gRPC health service (grpc.health.v1) enabled");
}
router.serve(addr).await.map_err(|e| {
error!(error = %e, "gRPC server error");
e
})?;
Ok(())
}
pub fn spawn(self) -> GrpcServerHandle {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let addr = self.config.bind_address.clone();
let handle = tokio::spawn(async move {
tokio::select! {
result = self.serve() => {
if let Err(e) = result {
error!(error = %e, "gRPC server error");
}
}
_ = shutdown_rx => {
info!("gRPC server shutting down");
}
}
});
GrpcServerHandle {
shutdown_tx: Some(shutdown_tx),
handle,
bind_address: addr,
}
}
}
#[derive(Debug)]
pub struct GrpcServerHandle {
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
handle: tokio::task::JoinHandle<()>,
bind_address: String,
}
impl GrpcServerHandle {
pub fn bind_address(&self) -> &str {
&self.bind_address
}
pub async fn stop(mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
self.handle.await?;
Ok(())
}
}
#[expect(
dead_code,
reason = "pub(crate) gRPC infrastructure used by tonic server"
)]
pub async fn start_grpc_server(
config: GrpcConfig,
state: GrpcState,
) -> Result<GrpcServerHandle, Box<dyn std::error::Error + Send + Sync>> {
if !config.enabled {
return Err("gRPC server is disabled in configuration".into());
}
let server = GrpcServer::new(config, state);
Ok(server.spawn())
}