use std::env;
use std::net::SocketAddr;
use std::path::PathBuf;
use tonic::transport::Server;
use tracing::info;
use crate::handler::{
CloudEventsGrpcHandler, CommandHandlerGrpc, ProcessManagerGrpcHandler, ProjectorHandler,
SagaHandler, UpcasterGrpcHandler,
};
use crate::proto::command_handler_service_server::CommandHandlerServiceServer;
use crate::proto::process_manager_service_server::ProcessManagerServiceServer;
use crate::proto::projector_service_server::ProjectorServiceServer;
use crate::proto::saga_service_server::SagaServiceServer;
use crate::proto::upcaster_service_server::UpcasterServiceServer;
use crate::router::{
CloudEventsRouter, CommandHandlerDomainHandler, CommandHandlerRouter, ProcessManagerRouter,
SagaDomainHandler, SagaRouter,
};
/// Listener settings for a gRPC server, resolved from the process environment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// TCP port to listen on when no Unix domain socket path is configured.
    pub port: u16,
    /// Path of the Unix domain socket to listen on, if one was configured.
    pub uds_path: Option<PathBuf>,
}

impl ServerConfig {
    /// Builds the configuration from environment variables.
    ///
    /// * When `UDS_BASE_PATH`, `SERVICE_NAME` and `DOMAIN` are all set, `uds_path` is
    ///   `<UDS_BASE_PATH>/<SERVICE_NAME>-<DOMAIN>.sock` and `port` is `default_port`
    ///   (`PORT` / `GRPC_PORT` are not consulted).
    /// * Otherwise `uds_path` is `None` and `port` is taken from the first of `PORT`,
    ///   `GRPC_PORT` that holds a valid `u16` (surrounding whitespace is ignored),
    ///   falling back to `default_port` when neither does.
    pub fn from_env(default_port: u16) -> Self {
        let uds_vars = (
            env::var("UDS_BASE_PATH"),
            env::var("SERVICE_NAME"),
            env::var("DOMAIN"),
        );
        if let (Ok(base_path), Ok(service_name), Ok(domain)) = uds_vars {
            let socket_name = format!("{}-{}.sock", service_name, domain);
            return Self {
                port: default_port,
                uds_path: Some(PathBuf::from(base_path).join(socket_name)),
            };
        }

        // Try every candidate variable in order: an empty or malformed `PORT` must not
        // hide a valid `GRPC_PORT`.
        let port = ["PORT", "GRPC_PORT"]
            .iter()
            .filter_map(|name| env::var(name).ok())
            .find_map(|raw| raw.trim().parse::<u16>().ok())
            .unwrap_or(default_port);

        Self {
            port,
            uds_path: None,
        }
    }
}
/// Serves a command handler router over gRPC until the server stops.
///
/// The listener is chosen by [`ServerConfig::from_env`]: a Unix domain socket when the
/// resulting config carries a `uds_path`, otherwise TCP on `0.0.0.0` at the configured
/// port. `domain` is only used as a field on the startup log line.
///
/// # Errors
///
/// Returns the [`tonic::transport::Error`] reported by the underlying tonic server.
///
/// # Panics
///
/// Panics if the Unix domain socket cannot be bound.
pub async fn run_command_handler_server<S, H>(
    domain: &str,
    default_port: u16,
    router: CommandHandlerRouter<S, H>,
) -> Result<(), tonic::transport::Error>
where
    S: Default + Send + Sync + 'static,
    H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
{
    let config = ServerConfig::from_env(default_port);
    let service = CommandHandlerServiceServer::new(CommandHandlerGrpc::new(router));
    let grpc = Server::builder().add_service(service);

    match &config.uds_path {
        Some(uds_path) => {
            info!(
                domain = domain,
                path = %uds_path.display(),
                "Starting command handler server (UDS)"
            );
            // A socket file left behind by a previous run would make `bind` fail, so it
            // is cleared first. A missing file is the normal case; any other removal
            // error is kept and reported if the bind itself fails.
            let cleanup_err = std::fs::remove_file(uds_path)
                .err()
                .filter(|err| err.kind() != std::io::ErrorKind::NotFound);
            let uds = tokio::net::UnixListener::bind(uds_path).unwrap_or_else(|err| {
                panic!(
                    "Failed to bind UDS socket {}: {} (stale socket cleanup error: {:?})",
                    uds_path.display(),
                    err,
                    cleanup_err
                )
            });
            let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
            grpc.serve_with_incoming(incoming).await
        }
        None => {
            // Built from integers rather than parsed from a string, so it cannot fail.
            let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
            info!(
                domain = domain,
                port = config.port,
                "Starting command handler server"
            );
            grpc.serve(addr).await
        }
    }
}
/// Serves a saga router over gRPC until the server stops.
///
/// The listener is chosen by [`ServerConfig::from_env`]: a Unix domain socket when the
/// resulting config carries a `uds_path`, otherwise TCP on `0.0.0.0` at the configured
/// port. `name` is only used as a field on the startup log line.
///
/// # Errors
///
/// Returns the [`tonic::transport::Error`] reported by the underlying tonic server.
///
/// # Panics
///
/// Panics if the Unix domain socket cannot be bound.
pub async fn run_saga_server<H>(
    name: &str,
    default_port: u16,
    router: SagaRouter<H>,
) -> Result<(), tonic::transport::Error>
where
    H: SagaDomainHandler + Clone + 'static,
{
    let config = ServerConfig::from_env(default_port);
    let service = SagaServiceServer::new(SagaHandler::new(router));
    let grpc = Server::builder().add_service(service);

    match &config.uds_path {
        Some(uds_path) => {
            info!(
                name = name,
                path = %uds_path.display(),
                "Starting saga server (UDS)"
            );
            // A socket file left behind by a previous run would make `bind` fail, so it
            // is cleared first. A missing file is the normal case; any other removal
            // error is kept and reported if the bind itself fails.
            let cleanup_err = std::fs::remove_file(uds_path)
                .err()
                .filter(|err| err.kind() != std::io::ErrorKind::NotFound);
            let uds = tokio::net::UnixListener::bind(uds_path).unwrap_or_else(|err| {
                panic!(
                    "Failed to bind UDS socket {}: {} (stale socket cleanup error: {:?})",
                    uds_path.display(),
                    err,
                    cleanup_err
                )
            });
            let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
            grpc.serve_with_incoming(incoming).await
        }
        None => {
            // Built from integers rather than parsed from a string, so it cannot fail.
            let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
            info!(name = name, port = config.port, "Starting saga server");
            grpc.serve(addr).await
        }
    }
}
/// Serves a projector handler over gRPC until the server stops.
///
/// The listener is chosen by [`ServerConfig::from_env`]: a Unix domain socket when the
/// resulting config carries a `uds_path`, otherwise TCP on `0.0.0.0` at the configured
/// port. `name` is only used as a field on the startup log line.
///
/// # Errors
///
/// Returns the [`tonic::transport::Error`] reported by the underlying tonic server.
///
/// # Panics
///
/// Panics if the Unix domain socket cannot be bound.
pub async fn run_projector_server(
    name: &str,
    default_port: u16,
    handler: ProjectorHandler,
) -> Result<(), tonic::transport::Error> {
    let config = ServerConfig::from_env(default_port);
    let grpc = Server::builder().add_service(ProjectorServiceServer::new(handler));

    match &config.uds_path {
        Some(uds_path) => {
            info!(
                name = name,
                path = %uds_path.display(),
                "Starting projector server (UDS)"
            );
            // A socket file left behind by a previous run would make `bind` fail, so it
            // is cleared first. A missing file is the normal case; any other removal
            // error is kept and reported if the bind itself fails.
            let cleanup_err = std::fs::remove_file(uds_path)
                .err()
                .filter(|err| err.kind() != std::io::ErrorKind::NotFound);
            let uds = tokio::net::UnixListener::bind(uds_path).unwrap_or_else(|err| {
                panic!(
                    "Failed to bind UDS socket {}: {} (stale socket cleanup error: {:?})",
                    uds_path.display(),
                    err,
                    cleanup_err
                )
            });
            let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
            grpc.serve_with_incoming(incoming).await
        }
        None => {
            // Built from integers rather than parsed from a string, so it cannot fail.
            let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
            info!(name = name, port = config.port, "Starting projector server");
            grpc.serve(addr).await
        }
    }
}
/// Serves a process manager router over gRPC until the server stops.
///
/// The listener is chosen by [`ServerConfig::from_env`]: a Unix domain socket when the
/// resulting config carries a `uds_path`, otherwise TCP on `0.0.0.0` at the configured
/// port. `name` is only used as a field on the startup log line.
///
/// # Errors
///
/// Returns the [`tonic::transport::Error`] reported by the underlying tonic server.
///
/// # Panics
///
/// Panics if the Unix domain socket cannot be bound.
pub async fn run_process_manager_server<S: Default + Send + Sync + 'static>(
    name: &str,
    default_port: u16,
    router: ProcessManagerRouter<S>,
) -> Result<(), tonic::transport::Error> {
    let config = ServerConfig::from_env(default_port);
    let service = ProcessManagerServiceServer::new(ProcessManagerGrpcHandler::new(router));
    let grpc = Server::builder().add_service(service);

    match &config.uds_path {
        Some(uds_path) => {
            info!(
                name = name,
                path = %uds_path.display(),
                "Starting process manager server (UDS)"
            );
            // A socket file left behind by a previous run would make `bind` fail, so it
            // is cleared first. A missing file is the normal case; any other removal
            // error is kept and reported if the bind itself fails.
            let cleanup_err = std::fs::remove_file(uds_path)
                .err()
                .filter(|err| err.kind() != std::io::ErrorKind::NotFound);
            let uds = tokio::net::UnixListener::bind(uds_path).unwrap_or_else(|err| {
                panic!(
                    "Failed to bind UDS socket {}: {} (stale socket cleanup error: {:?})",
                    uds_path.display(),
                    err,
                    cleanup_err
                )
            });
            let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
            grpc.serve_with_incoming(incoming).await
        }
        None => {
            // Built from integers rather than parsed from a string, so it cannot fail.
            let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
            info!(
                name = name,
                port = config.port,
                "Starting process manager server"
            );
            grpc.serve(addr).await
        }
    }
}
/// Serves an upcaster handler over gRPC until the server stops.
///
/// The listener is chosen by [`ServerConfig::from_env`]: a Unix domain socket when the
/// resulting config carries a `uds_path`, otherwise TCP on `0.0.0.0` at the configured
/// port. `name` is only used as a field on the startup log line.
///
/// # Errors
///
/// Returns the [`tonic::transport::Error`] reported by the underlying tonic server.
///
/// # Panics
///
/// Panics if the Unix domain socket cannot be bound.
pub async fn run_upcaster_server(
    name: &str,
    default_port: u16,
    handler: UpcasterGrpcHandler,
) -> Result<(), tonic::transport::Error> {
    let config = ServerConfig::from_env(default_port);
    let grpc = Server::builder().add_service(UpcasterServiceServer::new(handler));

    match &config.uds_path {
        Some(uds_path) => {
            info!(
                name = name,
                path = %uds_path.display(),
                "Starting upcaster server (UDS)"
            );
            // A socket file left behind by a previous run would make `bind` fail, so it
            // is cleared first. A missing file is the normal case; any other removal
            // error is kept and reported if the bind itself fails.
            let cleanup_err = std::fs::remove_file(uds_path)
                .err()
                .filter(|err| err.kind() != std::io::ErrorKind::NotFound);
            let uds = tokio::net::UnixListener::bind(uds_path).unwrap_or_else(|err| {
                panic!(
                    "Failed to bind UDS socket {}: {} (stale socket cleanup error: {:?})",
                    uds_path.display(),
                    err,
                    cleanup_err
                )
            });
            let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
            grpc.serve_with_incoming(incoming).await
        }
        None => {
            // Built from integers rather than parsed from a string, so it cannot fail.
            let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
            info!(name = name, port = config.port, "Starting upcaster server");
            grpc.serve(addr).await
        }
    }
}
/// Serves a CloudEvents router through the projector gRPC service until the server stops.
///
/// The router is wrapped in a [`CloudEventsGrpcHandler`] and exposed via
/// [`ProjectorServiceServer`]. The listener is chosen by [`ServerConfig::from_env`]: a
/// Unix domain socket when the resulting config carries a `uds_path`, otherwise TCP on
/// `0.0.0.0` at the configured port. `name` is only used as a field on the startup log
/// line.
///
/// # Errors
///
/// Returns the [`tonic::transport::Error`] reported by the underlying tonic server.
///
/// # Panics
///
/// Panics if the Unix domain socket cannot be bound.
pub async fn run_cloudevents_projector(
    name: &str,
    default_port: u16,
    router: CloudEventsRouter,
) -> Result<(), tonic::transport::Error> {
    let config = ServerConfig::from_env(default_port);
    let service = ProjectorServiceServer::new(CloudEventsGrpcHandler::new(router));
    let grpc = Server::builder().add_service(service);

    match &config.uds_path {
        Some(uds_path) => {
            info!(
                name = name,
                path = %uds_path.display(),
                "Starting CloudEvents projector server (UDS)"
            );
            // A socket file left behind by a previous run would make `bind` fail, so it
            // is cleared first. A missing file is the normal case; any other removal
            // error is kept and reported if the bind itself fails.
            let cleanup_err = std::fs::remove_file(uds_path)
                .err()
                .filter(|err| err.kind() != std::io::ErrorKind::NotFound);
            let uds = tokio::net::UnixListener::bind(uds_path).unwrap_or_else(|err| {
                panic!(
                    "Failed to bind UDS socket {}: {} (stale socket cleanup error: {:?})",
                    uds_path.display(),
                    err,
                    cleanup_err
                )
            });
            let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
            grpc.serve_with_incoming(incoming).await
        }
        None => {
            // Built from integers rather than parsed from a string, so it cannot fail.
            let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
            info!(
                name = name,
                port = config.port,
                "Starting CloudEvents projector server"
            );
            grpc.serve(addr).await
        }
    }
}