Skip to main content

angzarr_client/
server.rs

1//! gRPC server utilities for running aggregate and saga services.
2//!
3//! This module provides helpers for starting gRPC servers with TCP or UDS transport.
4
5use std::env;
6use std::net::SocketAddr;
7use std::path::PathBuf;
8
9use tonic::transport::Server;
10use tracing::info;
11
12use crate::handler::{
13    CloudEventsGrpcHandler, CommandHandlerGrpc, ProcessManagerGrpcHandler, ProjectorHandler,
14    SagaHandler, UpcasterGrpcHandler,
15};
16use crate::proto::command_handler_service_server::CommandHandlerServiceServer;
17use crate::proto::process_manager_service_server::ProcessManagerServiceServer;
18use crate::proto::projector_service_server::ProjectorServiceServer;
19use crate::proto::saga_service_server::SagaServiceServer;
20use crate::proto::upcaster_service_server::UpcasterServiceServer;
21use crate::router::{
22    CloudEventsRouter, CommandHandlerDomainHandler, CommandHandlerRouter, ProcessManagerRouter,
23    SagaDomainHandler, SagaRouter,
24};
25
26/// Configuration for a gRPC server.
27pub struct ServerConfig {
28    /// Port to listen on (TCP mode).
29    pub port: u16,
30    /// Unix domain socket path (UDS mode).
31    pub uds_path: Option<PathBuf>,
32}
33
34impl ServerConfig {
35    /// Create config from environment variables.
36    ///
37    /// UDS mode (standalone):
38    /// - `UDS_BASE_PATH`: Base directory for UDS sockets
39    /// - `SERVICE_NAME`: Service name (e.g., "business")
40    /// - `DOMAIN`: Domain name (e.g., "player")
41    ///   => Socket path: `{UDS_BASE_PATH}/{SERVICE_NAME}-{DOMAIN}.sock`
42    ///
43    /// TCP mode (distributed):
44    /// - `PORT` or `GRPC_PORT`: TCP port (default: `default_port`)
45    pub fn from_env(default_port: u16) -> Self {
46        // Check for UDS mode first
47        if let (Ok(base_path), Ok(service_name), Ok(domain)) = (
48            env::var("UDS_BASE_PATH"),
49            env::var("SERVICE_NAME"),
50            env::var("DOMAIN"),
51        ) {
52            let socket_name = format!("{}-{}.sock", service_name, domain);
53            let uds_path = PathBuf::from(base_path).join(socket_name);
54            return Self {
55                port: default_port,
56                uds_path: Some(uds_path),
57            };
58        }
59
60        // Fall back to TCP mode
61        let port = env::var("PORT")
62            .or_else(|_| env::var("GRPC_PORT"))
63            .ok()
64            .and_then(|s| s.parse().ok())
65            .unwrap_or(default_port);
66
67        Self {
68            port,
69            uds_path: None,
70        }
71    }
72}
73
74/// Run a command handler service with the given router.
75///
76/// Supports both TCP and Unix domain socket (UDS) transport.
77/// UDS is used when `UDS_BASE_PATH`, `SERVICE_NAME`, and `DOMAIN` env vars are set.
78///
79/// # Example
80///
81/// ```rust,ignore
82/// use angzarr_client::{run_command_handler_server, CommandHandlerRouter};
83///
84/// #[tokio::main]
85/// async fn main() {
86///     let router = CommandHandlerRouter::new("player", "player", PlayerHandler::new());
87///
88///     run_command_handler_server("player", 50001, router).await;
89/// }
90/// ```
91pub async fn run_command_handler_server<S, H>(
92    domain: &str,
93    default_port: u16,
94    router: CommandHandlerRouter<S, H>,
95) -> Result<(), tonic::transport::Error>
96where
97    S: Default + Send + Sync + 'static,
98    H: CommandHandlerDomainHandler<State = S> + Clone + 'static,
99{
100    let config = ServerConfig::from_env(default_port);
101    let handler = CommandHandlerGrpc::new(router);
102    let service = CommandHandlerServiceServer::new(handler);
103
104    if let Some(uds_path) = &config.uds_path {
105        // UDS mode (standalone)
106        info!(
107            domain = domain,
108            path = %uds_path.display(),
109            "Starting command handler server (UDS)"
110        );
111
112        // Remove existing socket file if present
113        let _ = std::fs::remove_file(uds_path);
114
115        let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
116        let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
117
118        Server::builder()
119            .add_service(service)
120            .serve_with_incoming(incoming)
121            .await
122    } else {
123        // TCP mode (distributed)
124        let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
125
126        info!(
127            domain = domain,
128            port = config.port,
129            "Starting command handler server"
130        );
131
132        Server::builder().add_service(service).serve(addr).await
133    }
134}
135
136/// Run a saga service with the given router.
137///
138/// Supports both TCP and Unix domain socket (UDS) transport.
139///
140/// # Example
141///
142/// ```rust,ignore
143/// use angzarr_client::{run_saga_server, SagaRouter};
144///
145/// #[tokio::main]
146/// async fn main() {
147///     let router = SagaRouter::new("saga-order-fulfillment", "order", OrderHandler::new());
148///
149///     run_saga_server("saga-order-fulfillment", 50010, router).await;
150/// }
151/// ```
152pub async fn run_saga_server<H>(
153    name: &str,
154    default_port: u16,
155    router: SagaRouter<H>,
156) -> Result<(), tonic::transport::Error>
157where
158    H: SagaDomainHandler + Clone + 'static,
159{
160    let config = ServerConfig::from_env(default_port);
161    let handler = SagaHandler::new(router);
162    let service = SagaServiceServer::new(handler);
163
164    if let Some(uds_path) = &config.uds_path {
165        // UDS mode
166        info!(
167            name = name,
168            path = %uds_path.display(),
169            "Starting saga server (UDS)"
170        );
171
172        let _ = std::fs::remove_file(uds_path);
173
174        let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
175        let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
176
177        Server::builder()
178            .add_service(service)
179            .serve_with_incoming(incoming)
180            .await
181    } else {
182        // TCP mode
183        let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
184
185        info!(name = name, port = config.port, "Starting saga server");
186
187        Server::builder().add_service(service).serve(addr).await
188    }
189}
190
191/// Run a projector service with the given handler.
192///
193/// Supports both TCP and Unix domain socket (UDS) transport.
194///
195/// # Example
196///
197/// ```rust,ignore
198/// use angzarr_client::{run_projector_server, ProjectorHandler};
199///
200/// #[tokio::main]
201/// async fn main() {
202///     let handler = ProjectorHandler::new("output").with_handle(handle_events);
203///
204///     run_projector_server("output", 9090, handler).await;
205/// }
206/// ```
207pub async fn run_projector_server(
208    name: &str,
209    default_port: u16,
210    handler: ProjectorHandler,
211) -> Result<(), tonic::transport::Error> {
212    let config = ServerConfig::from_env(default_port);
213    let service = ProjectorServiceServer::new(handler);
214
215    if let Some(uds_path) = &config.uds_path {
216        // UDS mode
217        info!(
218            name = name,
219            path = %uds_path.display(),
220            "Starting projector server (UDS)"
221        );
222
223        let _ = std::fs::remove_file(uds_path);
224
225        let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
226        let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
227
228        Server::builder()
229            .add_service(service)
230            .serve_with_incoming(incoming)
231            .await
232    } else {
233        // TCP mode
234        let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
235
236        info!(name = name, port = config.port, "Starting projector server");
237
238        Server::builder().add_service(service).serve(addr).await
239    }
240}
241
242/// Run a process manager service with the given router.
243///
244/// Supports both TCP and Unix domain socket (UDS) transport.
245///
246/// # Example
247///
248/// ```rust,ignore
249/// use angzarr_client::{run_process_manager_server, ProcessManagerRouter};
250///
251/// #[tokio::main]
252/// async fn main() {
253///     let router = ProcessManagerRouter::new("hand-flow", "hand-flow", rebuild_state)
254///         .domain("table", TablePmHandler::new());
255///
256///     run_process_manager_server("hand-flow", 9091, router).await;
257/// }
258/// ```
259pub async fn run_process_manager_server<S: Default + Send + Sync + 'static>(
260    name: &str,
261    default_port: u16,
262    router: ProcessManagerRouter<S>,
263) -> Result<(), tonic::transport::Error> {
264    let config = ServerConfig::from_env(default_port);
265    let handler = ProcessManagerGrpcHandler::new(router);
266    let service = ProcessManagerServiceServer::new(handler);
267
268    if let Some(uds_path) = &config.uds_path {
269        // UDS mode
270        info!(
271            name = name,
272            path = %uds_path.display(),
273            "Starting process manager server (UDS)"
274        );
275
276        let _ = std::fs::remove_file(uds_path);
277
278        let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
279        let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
280
281        Server::builder()
282            .add_service(service)
283            .serve_with_incoming(incoming)
284            .await
285    } else {
286        // TCP mode
287        let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
288
289        info!(
290            name = name,
291            port = config.port,
292            "Starting process manager server"
293        );
294
295        Server::builder().add_service(service).serve(addr).await
296    }
297}
298
299/// Run an upcaster service with the given handler.
300///
301/// Supports both TCP and Unix domain socket (UDS) transport.
302///
303/// # Example
304///
305/// ```rust,ignore
306/// use angzarr_client::{run_upcaster_server, UpcasterGrpcHandler, UpcasterRouter};
307///
308/// fn upcast_events(events: &[EventPage]) -> Vec<EventPage> {
309///     let router = UpcasterRouter::new("player")
310///         .on("PlayerRegisteredV1", |old| {
311///             // Transform old event to new version
312///             old.clone()
313///         });
314///     router.upcast(events)
315/// }
316///
317/// #[tokio::main]
318/// async fn main() {
319///     let handler = UpcasterGrpcHandler::new("upcaster-player", "player")
320///         .with_handle(upcast_events);
321///
322///     run_upcaster_server("upcaster-player", 50401, handler).await;
323/// }
324/// ```
325pub async fn run_upcaster_server(
326    name: &str,
327    default_port: u16,
328    handler: UpcasterGrpcHandler,
329) -> Result<(), tonic::transport::Error> {
330    let config = ServerConfig::from_env(default_port);
331    let service = UpcasterServiceServer::new(handler);
332
333    if let Some(uds_path) = &config.uds_path {
334        // UDS mode
335        info!(
336            name = name,
337            path = %uds_path.display(),
338            "Starting upcaster server (UDS)"
339        );
340
341        let _ = std::fs::remove_file(uds_path);
342
343        let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
344        let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
345
346        Server::builder()
347            .add_service(service)
348            .serve_with_incoming(incoming)
349            .await
350    } else {
351        // TCP mode
352        let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
353
354        info!(name = name, port = config.port, "Starting upcaster server");
355
356        Server::builder().add_service(service).serve(addr).await
357    }
358}
359
360/// Run a CloudEvents projector service with the given router.
361///
362/// CloudEvents projectors transform domain events into CloudEvents 1.0 format
363/// for external consumption via HTTP webhooks or Kafka.
364///
365/// Supports both TCP and Unix domain socket (UDS) transport.
366///
367/// # Example
368///
369/// ```rust,ignore
370/// use angzarr_client::{run_cloudevents_projector, CloudEventsRouter};
371/// use angzarr_client::proto::angzarr::CloudEvent;
372/// use angzarr_client::proto::examples::PlayerRegistered;
373///
374/// fn handle_player_registered(event: &PlayerRegistered) -> Option<CloudEvent> {
375///     Some(CloudEvent {
376///         r#type: "com.poker.player.registered".into(),
377///         ..Default::default()
378///     })
379/// }
380///
381/// #[tokio::main]
382/// async fn main() {
383///     let router = CloudEventsRouter::new("prj-player-cloudevents", "player")
384///         .on::<PlayerRegistered>(handle_player_registered);
385///
386///     run_cloudevents_projector("prj-player-cloudevents", 50091, router).await;
387/// }
388/// ```
389pub async fn run_cloudevents_projector(
390    name: &str,
391    default_port: u16,
392    router: CloudEventsRouter,
393) -> Result<(), tonic::transport::Error> {
394    let config = ServerConfig::from_env(default_port);
395    let handler = CloudEventsGrpcHandler::new(router);
396    let service = ProjectorServiceServer::new(handler);
397
398    if let Some(uds_path) = &config.uds_path {
399        // UDS mode
400        info!(
401            name = name,
402            path = %uds_path.display(),
403            "Starting CloudEvents projector server (UDS)"
404        );
405
406        let _ = std::fs::remove_file(uds_path);
407
408        let uds = tokio::net::UnixListener::bind(uds_path).expect("Failed to bind UDS socket");
409        let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
410
411        Server::builder()
412            .add_service(service)
413            .serve_with_incoming(incoming)
414            .await
415    } else {
416        // TCP mode
417        let addr: SocketAddr = format!("0.0.0.0:{}", config.port).parse().unwrap();
418
419        info!(
420            name = name,
421            port = config.port,
422            "Starting CloudEvents projector server"
423        );
424
425        Server::builder().add_service(service).serve(addr).await
426    }
427}