Skip to main content

angzarr_client/
client.rs

1//! Default client implementations wrapping tonic gRPC clients.
2
3use std::time::Duration;
4
5use crate::error::{ClientError, Result};
6use crate::proto::{
7    command_handler_coordinator_service_client::CommandHandlerCoordinatorServiceClient as TonicCommandHandlerClient,
8    event_query_service_client::EventQueryServiceClient as TonicQueryClient,
9    process_manager_coordinator_service_client::ProcessManagerCoordinatorServiceClient as TonicPmClient,
10    projector_coordinator_service_client::ProjectorCoordinatorServiceClient as TonicProjectorClient,
11    saga_coordinator_service_client::SagaCoordinatorServiceClient as TonicSagaClient,
12    CascadeErrorMode, CommandBook, CommandRequest, CommandResponse, EventBook,
13    ProcessManagerHandleResponse, Projection, Query, SagaResponse, SpeculateCommandHandlerRequest,
14    SpeculatePmRequest, SpeculateProjectorRequest, SpeculateSagaRequest, SyncMode,
15};
16use crate::traits;
17use async_trait::async_trait;
18use backon::{BackoffBuilder, ExponentialBuilder};
19use tonic::transport::{Channel, Endpoint, Uri};
20use tracing::warn;
21
22/// Create a gRPC channel from an endpoint string.
23///
24/// Supports both TCP (host:port or http://host:port) and Unix Domain Sockets.
25/// UDS paths are detected by leading '/' or './' and use a custom connector.
26///
27/// Retries connection with exponential backoff (100ms-5s, 10 attempts) on failure.
28async fn create_channel(endpoint: &str) -> Result<Channel> {
29    let uds_path = if endpoint.starts_with('/') || endpoint.starts_with("./") {
30        Some(endpoint.to_string())
31    } else {
32        endpoint.strip_prefix("unix://").map(str::to_string)
33    };
34
35    let backoff = ExponentialBuilder::default()
36        .with_min_delay(Duration::from_millis(100))
37        .with_max_delay(Duration::from_secs(5))
38        .with_max_times(10)
39        .with_jitter()
40        .build();
41
42    let mut last_error: Option<ClientError> = None;
43
44    for (attempt, delay) in std::iter::once(Duration::ZERO).chain(backoff).enumerate() {
45        if attempt > 0 {
46            warn!(
47                endpoint = %endpoint,
48                attempt = attempt,
49                backoff_ms = %delay.as_millis(),
50                "gRPC connection failed, retrying after backoff"
51            );
52            tokio::time::sleep(delay).await;
53        }
54
55        let result = if let Some(ref path) = uds_path {
56            // Unix Domain Socket - use custom connector
57            // NOTE: The URI is ignored for UDS, but tonic requires a valid one.
58            // We use a dummy URI and override the connector to use UnixStream.
59            let path = path.clone();
60            Endpoint::try_from("http://[::]:50051")
61                .map_err(|e| ClientError::Connection { msg: e.to_string() })?
62                .connect_with_connector(tower::service_fn(move |_: Uri| {
63                    let path = path.clone();
64                    async move {
65                        tokio::net::UnixStream::connect(path)
66                            .await
67                            .map(hyper_util::rt::TokioIo::new)
68                    }
69                }))
70                .await
71        } else {
72            // TCP endpoint
73            match Channel::from_shared(endpoint.to_string()) {
74                Ok(ep) => ep.connect().await,
75                Err(e) => {
76                    // Invalid URI is not retryable
77                    return Err(ClientError::Connection { msg: e.to_string() });
78                }
79            }
80        };
81
82        match result {
83            Ok(channel) => return Ok(channel),
84            Err(e) => {
85                last_error = Some(ClientError::Connection {
86                    msg: format!("Connection failed: {}", e),
87                });
88            }
89        }
90    }
91
92    Err(last_error.unwrap_or_else(|| ClientError::Connection {
93        msg: "Connection failed after max retries".to_string(),
94    }))
95}
96
97/// Default event query client using tonic gRPC.
98#[derive(Clone)]
99pub struct QueryClient {
100    inner: TonicQueryClient<Channel>,
101}
102
103impl QueryClient {
104    /// Connect to an event query service at the given endpoint.
105    ///
106    /// Supports both TCP (host:port) and Unix Domain Sockets (file paths).
107    pub async fn connect(endpoint: &str) -> Result<Self> {
108        let channel = create_channel(endpoint).await?;
109        Ok(Self::from_channel(channel))
110    }
111
112    /// Connect using an endpoint from environment variable with fallback.
113    pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
114        let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
115        Self::connect(&endpoint).await
116    }
117
118    /// Create a client from an existing channel.
119    pub fn from_channel(channel: Channel) -> Self {
120        Self {
121            inner: TonicQueryClient::new(channel),
122        }
123    }
124
125    /// Query events for an aggregate.
126    pub async fn get_events(&self, query: Query) -> Result<EventBook> {
127        let response = self.inner.clone().get_event_book(query).await?;
128        Ok(response.into_inner())
129    }
130}
131
132#[async_trait]
133impl traits::QueryClient for QueryClient {
134    async fn get_events(&self, query: Query) -> Result<EventBook> {
135        self.get_events(query).await
136    }
137}
138
139/// Default command handler coordinator client using tonic gRPC.
140#[derive(Clone)]
141pub struct CommandHandlerClient {
142    inner: TonicCommandHandlerClient<Channel>,
143}
144
145impl CommandHandlerClient {
146    /// Connect to a command handler coordinator at the given endpoint.
147    ///
148    /// Supports both TCP (host:port) and Unix Domain Sockets (file paths).
149    pub async fn connect(endpoint: &str) -> Result<Self> {
150        let channel = create_channel(endpoint).await?;
151        Ok(Self::from_channel(channel))
152    }
153
154    /// Connect using an endpoint from environment variable with fallback.
155    pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
156        let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
157        Self::connect(&endpoint).await
158    }
159
160    /// Create a client from an existing channel.
161    pub fn from_channel(channel: Channel) -> Self {
162        Self {
163            inner: TonicCommandHandlerClient::new(channel),
164        }
165    }
166
167    /// Execute a command with specified sync mode.
168    ///
169    /// Use `SyncMode::Async` for fire-and-forget (default).
170    /// Use `SyncMode::Simple` to wait for sync projectors.
171    /// Use `SyncMode::Cascade` for full sync including saga cascade.
172    pub async fn handle_command(&self, command: CommandRequest) -> Result<CommandResponse> {
173        let response = self.inner.clone().handle_command(command).await?;
174        Ok(response.into_inner())
175    }
176
177    /// Execute a command asynchronously (fire-and-forget).
178    ///
179    /// Convenience method that wraps CommandBook in CommandRequest with async sync mode.
180    pub async fn handle(&self, command: CommandBook) -> Result<CommandResponse> {
181        self.handle_command(CommandRequest {
182            command: Some(command),
183            sync_mode: SyncMode::Async as i32,
184            cascade_error_mode: CascadeErrorMode::CascadeErrorFailFast as i32,
185        })
186        .await
187    }
188
189    /// Speculative execution against temporal state.
190    pub async fn handle_sync_speculative(
191        &self,
192        request: SpeculateCommandHandlerRequest,
193    ) -> Result<CommandResponse> {
194        let response = self.inner.clone().handle_sync_speculative(request).await?;
195        Ok(response.into_inner())
196    }
197}
198
199#[async_trait]
200impl traits::GatewayClient for CommandHandlerClient {
201    async fn execute(&self, command: CommandBook) -> Result<CommandResponse> {
202        self.handle(command).await
203    }
204}
205
206/// Per-domain client combining command execution, event querying, and speculative operations.
207///
208/// Connects to a single domain's endpoint and provides:
209/// - Command execution via `command_handler`
210/// - Event querying via `query`
211/// - Speculative (what-if) execution via `speculative`
212///
213/// Matches the distributed architecture where each domain has its own coordinator service.
214#[derive(Clone)]
215pub struct DomainClient {
216    /// Command handler client for command execution.
217    pub command_handler: CommandHandlerClient,
218    /// Query client for event retrieval.
219    pub query: QueryClient,
220    /// Speculative client for dry-run and what-if scenarios.
221    pub speculative: SpeculativeClient,
222}
223
224impl DomainClient {
225    /// Connect to a domain's coordinator at the given endpoint.
226    ///
227    /// Supports both TCP (host:port) and Unix Domain Sockets (file paths).
228    pub async fn connect(endpoint: &str) -> Result<Self> {
229        let channel = create_channel(endpoint).await?;
230        Ok(Self::from_channel(channel))
231    }
232
233    /// Connect using an endpoint from environment variable with fallback.
234    pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
235        let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
236        Self::connect(&endpoint).await
237    }
238
239    /// Create a client from an existing channel.
240    pub fn from_channel(channel: Channel) -> Self {
241        Self {
242            command_handler: CommandHandlerClient::from_channel(channel.clone()),
243            query: QueryClient::from_channel(channel.clone()),
244            speculative: SpeculativeClient::from_channel(channel),
245        }
246    }
247
248    /// Execute a command asynchronously (fire-and-forget).
249    ///
250    /// Use `execute_with_mode()` to specify a different sync mode.
251    pub async fn execute(&self, command: CommandBook) -> Result<CommandResponse> {
252        self.command_handler.handle(command).await
253    }
254
255    /// Execute a command with the specified sync mode.
256    ///
257    /// Use `SyncMode::Async` for fire-and-forget (default).
258    /// Use `SyncMode::Simple` to wait for sync projectors.
259    /// Use `SyncMode::Cascade` for full sync including saga cascade.
260    pub async fn execute_with_mode(
261        &self,
262        command: CommandBook,
263        sync_mode: SyncMode,
264    ) -> Result<CommandResponse> {
265        self.command_handler
266            .handle_command(CommandRequest {
267                command: Some(command),
268                sync_mode: sync_mode as i32,
269                cascade_error_mode: CascadeErrorMode::CascadeErrorFailFast as i32,
270            })
271            .await
272    }
273
274    /// Query events (delegates to query client).
275    pub async fn get_events(&self, query: Query) -> Result<EventBook> {
276        self.query.get_events(query).await
277    }
278}
279
280#[async_trait]
281impl traits::GatewayClient for DomainClient {
282    async fn execute(&self, command: CommandBook) -> Result<CommandResponse> {
283        self.execute(command).await
284    }
285}
286
287#[async_trait]
288impl traits::QueryClient for DomainClient {
289    async fn get_events(&self, query: Query) -> Result<EventBook> {
290        self.get_events(query).await
291    }
292}
293
294/// Speculative client for what-if scenarios.
295///
296/// Provides speculative execution across different coordinator types.
297/// Each method targets a specific coordinator's speculative RPC.
298#[derive(Clone)]
299pub struct SpeculativeClient {
300    command_handler: TonicCommandHandlerClient<Channel>,
301    projector: TonicProjectorClient<Channel>,
302    saga: TonicSagaClient<Channel>,
303    pm: TonicPmClient<Channel>,
304}
305
306impl SpeculativeClient {
307    /// Connect to services at the given endpoint.
308    ///
309    /// Supports both TCP (host:port) and Unix Domain Sockets (file paths).
310    pub async fn connect(endpoint: &str) -> Result<Self> {
311        let channel = create_channel(endpoint).await?;
312        Ok(Self::from_channel(channel))
313    }
314
315    /// Connect using an endpoint from environment variable with fallback.
316    pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
317        let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
318        Self::connect(&endpoint).await
319    }
320
321    /// Create a client from an existing channel.
322    pub fn from_channel(channel: Channel) -> Self {
323        Self {
324            command_handler: TonicCommandHandlerClient::new(channel.clone()),
325            projector: TonicProjectorClient::new(channel.clone()),
326            saga: TonicSagaClient::new(channel.clone()),
327            pm: TonicPmClient::new(channel),
328        }
329    }
330}
331
332#[async_trait]
333impl traits::SpeculativeClient for SpeculativeClient {
334    async fn command_handler(
335        &self,
336        request: SpeculateCommandHandlerRequest,
337    ) -> Result<CommandResponse> {
338        let response = self
339            .command_handler
340            .clone()
341            .handle_sync_speculative(request)
342            .await?;
343        Ok(response.into_inner())
344    }
345
346    async fn projector(&self, request: SpeculateProjectorRequest) -> Result<Projection> {
347        let response = self.projector.clone().handle_speculative(request).await?;
348        Ok(response.into_inner())
349    }
350
351    async fn saga(&self, request: SpeculateSagaRequest) -> Result<SagaResponse> {
352        let response = self.saga.clone().execute_speculative(request).await?;
353        Ok(response.into_inner())
354    }
355
356    async fn process_manager(
357        &self,
358        request: SpeculatePmRequest,
359    ) -> Result<ProcessManagerHandleResponse> {
360        let response = self.pm.clone().handle_speculative(request).await?;
361        Ok(response.into_inner())
362    }
363}