1use std::env;
6use std::net::SocketAddr;
7use std::path::PathBuf;
8
9use tonic::transport::Server;
10use tracing::info;
11
12use crate::handler::{
13 CloudEventsGrpcHandler, CommandHandlerGrpc, ProcessManagerGrpcHandler, ProjectorHandler,
14 SagaHandler, UpcasterGrpcHandler,
15};
16use crate::proto::command_handler_service_server::CommandHandlerServiceServer;
17use crate::proto::process_manager_service_server::ProcessManagerServiceServer;
18use crate::proto::projector_service_server::ProjectorServiceServer;
19use crate::proto::saga_service_server::SagaServiceServer;
20use crate::proto::upcaster_service_server::UpcasterServiceServer;
21use crate::router::{
22 CloudEventsRouter, CommandHandlerDomainHandler, CommandHandlerRouter, ProcessManagerRouter,
23 SagaDomainHandler, SagaRouter,
24};
25
26pub struct ServerConfig {
28 pub port: u16,
30 pub uds_path: Option<PathBuf>,
32}
33
34impl ServerConfig {
35 pub fn from_env(default_port: u16) -> Self {
46 if let (Ok(base_path), Ok(service_name), Ok(domain)) = (
48 env::var("UDS_BASE_PATH"),
49 env::var("SERVICE_NAME"),
50 env::var("DOMAIN"),
51 ) {
52 let socket_name = format!("{}-{}.sock", service_name, domain);
53 let uds_path = PathBuf::from(base_path).join(socket_name);
54 return Self {
55 port: default_port,
56 uds_path: Some(uds_path),
57 };
58 }
59
60 let port = env::var("PORT")
62 .or_else(|_| env::var("GRPC_PORT"))
63 .ok()
64 .and_then(|s| s.parse().ok())
65 .unwrap_or(default_port);
66
67 Self {
68 port,
69 uds_path: None,
70 }
71 }
72}
73
74pub async fn run_command_handler_server<S, H>(
92 domain: &str,
93 default_port: u16,
94 router: CommandHandlerRouter<S, H>,
95) -> Result<(), tonic::transport::Error>
96where
97 S: Default + Send + Sync + 'static,
98 H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
99{
100 let config = ServerConfig::from_env(default_port);
101 let handler = CommandHandlerGrpc::new(router);
102 let service = CommandHandlerServiceServer::new(handler);
103
104 if let Some(uds_path) = &config.uds_path {
105 info!(
107 domain = domain,
108 path = %uds_path.display(),
109 "Starting command handler server (UDS)"
110 );
111
112 let _ = std::fs::remove_file(uds_path);
114
115 let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
116 let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
117
118 Server::builder()
119 .add_service(service)
120 .serve_with_incoming(incoming)
121 .await
122 } else {
123 let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
125
126 info!(
127 domain = domain,
128 port = config.port,
129 "Starting command handler server"
130 );
131
132 Server::builder().add_service(service).serve(addr).await
133 }
134}
135
136pub async fn run_saga_server<H>(
153 name: &str,
154 default_port: u16,
155 router: SagaRouter<H>,
156) -> Result<(), tonic::transport::Error>
157where
158 H: SagaDomainHandler + Clone + 'static,
159{
160 let config = ServerConfig::from_env(default_port);
161 let handler = SagaHandler::new(router);
162 let service = SagaServiceServer::new(handler);
163
164 if let Some(uds_path) = &config.uds_path {
165 info!(
167 name = name,
168 path = %uds_path.display(),
169 "Starting saga server (UDS)"
170 );
171
172 let _ = std::fs::remove_file(uds_path);
173
174 let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
175 let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
176
177 Server::builder()
178 .add_service(service)
179 .serve_with_incoming(incoming)
180 .await
181 } else {
182 let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
184
185 info!(name = name, port = config.port, "Starting saga server");
186
187 Server::builder().add_service(service).serve(addr).await
188 }
189}
190
191pub async fn run_projector_server(
208 name: &str,
209 default_port: u16,
210 handler: ProjectorHandler,
211) -> Result<(), tonic::transport::Error> {
212 let config = ServerConfig::from_env(default_port);
213 let service = ProjectorServiceServer::new(handler);
214
215 if let Some(uds_path) = &config.uds_path {
216 info!(
218 name = name,
219 path = %uds_path.display(),
220 "Starting projector server (UDS)"
221 );
222
223 let _ = std::fs::remove_file(uds_path);
224
225 let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
226 let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
227
228 Server::builder()
229 .add_service(service)
230 .serve_with_incoming(incoming)
231 .await
232 } else {
233 let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
235
236 info!(name = name, port = config.port, "Starting projector server");
237
238 Server::builder().add_service(service).serve(addr).await
239 }
240}
241
242pub async fn run_process_manager_server<S: Default + Send + Sync + 'static>(
260 name: &str,
261 default_port: u16,
262 router: ProcessManagerRouter<S>,
263) -> Result<(), tonic::transport::Error> {
264 let config = ServerConfig::from_env(default_port);
265 let handler = ProcessManagerGrpcHandler::new(router);
266 let service = ProcessManagerServiceServer::new(handler);
267
268 if let Some(uds_path) = &config.uds_path {
269 info!(
271 name = name,
272 path = %uds_path.display(),
273 "Starting process manager server (UDS)"
274 );
275
276 let _ = std::fs::remove_file(uds_path);
277
278 let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
279 let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
280
281 Server::builder()
282 .add_service(service)
283 .serve_with_incoming(incoming)
284 .await
285 } else {
286 let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
288
289 info!(
290 name = name,
291 port = config.port,
292 "Starting process manager server"
293 );
294
295 Server::builder().add_service(service).serve(addr).await
296 }
297}
298
299pub async fn run_upcaster_server(
326 name: &str,
327 default_port: u16,
328 handler: UpcasterGrpcHandler,
329) -> Result<(), tonic::transport::Error> {
330 let config = ServerConfig::from_env(default_port);
331 let service = UpcasterServiceServer::new(handler);
332
333 if let Some(uds_path) = &config.uds_path {
334 info!(
336 name = name,
337 path = %uds_path.display(),
338 "Starting upcaster server (UDS)"
339 );
340
341 let _ = std::fs::remove_file(uds_path);
342
343 let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
344 let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
345
346 Server::builder()
347 .add_service(service)
348 .serve_with_incoming(incoming)
349 .await
350 } else {
351 let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
353
354 info!(name = name, port = config.port, "Starting upcaster server");
355
356 Server::builder().add_service(service).serve(addr).await
357 }
358}
359
360pub async fn run_cloudevents_projector(
390 name: &str,
391 default_port: u16,
392 router: CloudEventsRouter,
393) -> Result<(), tonic::transport::Error> {
394 let config = ServerConfig::from_env(default_port);
395 let handler = CloudEventsGrpcHandler::new(router);
396 let service = ProjectorServiceServer::new(handler);
397
398 if let Some(uds_path) = &config.uds_path {
399 info!(
401 name = name,
402 path = %uds_path.display(),
403 "Starting CloudEvents projector server (UDS)"
404 );
405
406 let _ = std::fs::remove_file(uds_path);
407
408 let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
409 let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
410
411 Server::builder()
412 .add_service(service)
413 .serve_with_incoming(incoming)
414 .await
415 } else {
416 let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
418
419 info!(
420 name = name,
421 port = config.port,
422 "Starting CloudEvents projector server"
423 );
424
425 Server::builder().add_service(service).serve(addr).await
426 }
427}