1use std::time::Duration;
4
5use crate::error::{ClientError, Result};
6use crate::proto::{
7 command_handler_coordinator_service_client::CommandHandlerCoordinatorServiceClient as TonicCommandHandlerClient,
8 event_query_service_client::EventQueryServiceClient as TonicQueryClient,
9 process_manager_coordinator_service_client::ProcessManagerCoordinatorServiceClient as TonicPmClient,
10 projector_coordinator_service_client::ProjectorCoordinatorServiceClient as TonicProjectorClient,
11 saga_coordinator_service_client::SagaCoordinatorServiceClient as TonicSagaClient,
12 CascadeErrorMode, CommandBook, CommandRequest, CommandResponse, EventBook,
13 ProcessManagerHandleResponse, Projection, Query, SagaResponse, SpeculateCommandHandlerRequest,
14 SpeculatePmRequest, SpeculateProjectorRequest, SpeculateSagaRequest, SyncMode,
15};
16use crate::traits;
17use async_trait::async_trait;
18use backon::{BackoffBuilder, ExponentialBuilder};
19use tonic::transport::{Channel, Endpoint, Uri};
20use tracing::warn;
21
22async fn create_channel(endpoint: &str) -> Result<Channel> {
29 let uds_path = if endpoint.starts_with('/') || endpoint.starts_with("./") {
30 Some(endpoint.to_string())
31 } else {
32 endpoint.strip_prefix("unix://").map(str::to_string)
33 };
34
35 let backoff = ExponentialBuilder::default()
36 .with_min_delay(Duration::from_millis(100))
37 .with_max_delay(Duration::from_secs(5))
38 .with_max_times(10)
39 .with_jitter()
40 .build();
41
42 let mut last_error: Option<ClientError> = None;
43
44 for (attempt, delay) in std::iter::once(Duration::ZERO).chain(backoff).enumerate() {
45 if attempt > 0 {
46 warn!(
47 endpoint = %endpoint,
48 attempt = attempt,
49 backoff_ms = %delay.as_millis(),
50 "gRPC connection failed, retrying after backoff"
51 );
52 tokio::time::sleep(delay).await;
53 }
54
55 let result = if let Some(ref path) = uds_path {
56 let path = path.clone();
60 Endpoint::try_from("http://[::]:50051")
61 .map_err(|e| ClientError::Connection { msg: e.to_string() })?
62 .connect_with_connector(tower::service_fn(move |_: Uri| {
63 let path = path.clone();
64 async move {
65 tokio::net::UnixStream::connect(path)
66 .await
67 .map(hyper_util::rt::TokioIo::new)
68 }
69 }))
70 .await
71 } else {
72 match Channel::from_shared(endpoint.to_string()) {
74 Ok(ep) => ep.connect().await,
75 Err(e) => {
76 return Err(ClientError::Connection { msg: e.to_string() });
78 }
79 }
80 };
81
82 match result {
83 Ok(channel) => return Ok(channel),
84 Err(e) => {
85 last_error = Some(ClientError::Connection {
86 msg: format!("Connection failed: {}", e),
87 });
88 }
89 }
90 }
91
92 Err(last_error.unwrap_or_else(|| ClientError::Connection {
93 msg: "Connection failed after max retries".to_string(),
94 }))
95}
96
97#[derive(Clone)]
99pub struct QueryClient {
100 inner: TonicQueryClient<Channel>,
101}
102
103impl QueryClient {
104 pub async fn connect(endpoint: &str) -> Result<Self> {
108 let channel = create_channel(endpoint).await?;
109 Ok(Self::from_channel(channel))
110 }
111
112 pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
114 let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
115 Self::connect(&endpoint).await
116 }
117
118 pub fn from_channel(channel: Channel) -> Self {
120 Self {
121 inner: TonicQueryClient::new(channel),
122 }
123 }
124
125 pub async fn get_events(&self, query: Query) -> Result<EventBook> {
127 let response = self.inner.clone().get_event_book(query).await?;
128 Ok(response.into_inner())
129 }
130}
131
132#[async_trait]
133impl traits::QueryClient for QueryClient {
134 async fn get_events(&self, query: Query) -> Result<EventBook> {
135 self.get_events(query).await
136 }
137}
138
139#[derive(Clone)]
141pub struct CommandHandlerClient {
142 inner: TonicCommandHandlerClient<Channel>,
143}
144
145impl CommandHandlerClient {
146 pub async fn connect(endpoint: &str) -> Result<Self> {
150 let channel = create_channel(endpoint).await?;
151 Ok(Self::from_channel(channel))
152 }
153
154 pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
156 let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
157 Self::connect(&endpoint).await
158 }
159
160 pub fn from_channel(channel: Channel) -> Self {
162 Self {
163 inner: TonicCommandHandlerClient::new(channel),
164 }
165 }
166
167 pub async fn handle_command(&self, command: CommandRequest) -> Result<CommandResponse> {
173 let response = self.inner.clone().handle_command(command).await?;
174 Ok(response.into_inner())
175 }
176
177 pub async fn handle(&self, command: CommandBook) -> Result<CommandResponse> {
181 self.handle_command(CommandRequest {
182 command: Some(command),
183 sync_mode: SyncMode::Async as i32,
184 cascade_error_mode: CascadeErrorMode::CascadeErrorFailFast as i32,
185 })
186 .await
187 }
188
189 pub async fn handle_sync_speculative(
191 &self,
192 request: SpeculateCommandHandlerRequest,
193 ) -> Result<CommandResponse> {
194 let response = self.inner.clone().handle_sync_speculative(request).await?;
195 Ok(response.into_inner())
196 }
197}
198
199#[async_trait]
200impl traits::GatewayClient for CommandHandlerClient {
201 async fn execute(&self, command: CommandBook) -> Result<CommandResponse> {
202 self.handle(command).await
203 }
204}
205
206#[derive(Clone)]
215pub struct DomainClient {
216 pub command_handler: CommandHandlerClient,
218 pub query: QueryClient,
220 pub speculative: SpeculativeClient,
222}
223
224impl DomainClient {
225 pub async fn connect(endpoint: &str) -> Result<Self> {
229 let channel = create_channel(endpoint).await?;
230 Ok(Self::from_channel(channel))
231 }
232
233 pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
235 let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
236 Self::connect(&endpoint).await
237 }
238
239 pub fn from_channel(channel: Channel) -> Self {
241 Self {
242 command_handler: CommandHandlerClient::from_channel(channel.clone()),
243 query: QueryClient::from_channel(channel.clone()),
244 speculative: SpeculativeClient::from_channel(channel),
245 }
246 }
247
248 pub async fn execute(&self, command: CommandBook) -> Result<CommandResponse> {
252 self.command_handler.handle(command).await
253 }
254
255 pub async fn execute_with_mode(
261 &self,
262 command: CommandBook,
263 sync_mode: SyncMode,
264 ) -> Result<CommandResponse> {
265 self.command_handler
266 .handle_command(CommandRequest {
267 command: Some(command),
268 sync_mode: sync_mode as i32,
269 cascade_error_mode: CascadeErrorMode::CascadeErrorFailFast as i32,
270 })
271 .await
272 }
273
274 pub async fn get_events(&self, query: Query) -> Result<EventBook> {
276 self.query.get_events(query).await
277 }
278}
279
280#[async_trait]
281impl traits::GatewayClient for DomainClient {
282 async fn execute(&self, command: CommandBook) -> Result<CommandResponse> {
283 self.execute(command).await
284 }
285}
286
287#[async_trait]
288impl traits::QueryClient for DomainClient {
289 async fn get_events(&self, query: Query) -> Result<EventBook> {
290 self.get_events(query).await
291 }
292}
293
294#[derive(Clone)]
299pub struct SpeculativeClient {
300 command_handler: TonicCommandHandlerClient<Channel>,
301 projector: TonicProjectorClient<Channel>,
302 saga: TonicSagaClient<Channel>,
303 pm: TonicPmClient<Channel>,
304}
305
306impl SpeculativeClient {
307 pub async fn connect(endpoint: &str) -> Result<Self> {
311 let channel = create_channel(endpoint).await?;
312 Ok(Self::from_channel(channel))
313 }
314
315 pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
317 let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
318 Self::connect(&endpoint).await
319 }
320
321 pub fn from_channel(channel: Channel) -> Self {
323 Self {
324 command_handler: TonicCommandHandlerClient::new(channel.clone()),
325 projector: TonicProjectorClient::new(channel.clone()),
326 saga: TonicSagaClient::new(channel.clone()),
327 pm: TonicPmClient::new(channel),
328 }
329 }
330}
331
332#[async_trait]
333impl traits::SpeculativeClient for SpeculativeClient {
334 async fn command_handler(
335 &self,
336 request: SpeculateCommandHandlerRequest,
337 ) -> Result<CommandResponse> {
338 let response = self
339 .command_handler
340 .clone()
341 .handle_sync_speculative(request)
342 .await?;
343 Ok(response.into_inner())
344 }
345
346 async fn projector(&self, request: SpeculateProjectorRequest) -> Result<Projection> {
347 let response = self.projector.clone().handle_speculative(request).await?;
348 Ok(response.into_inner())
349 }
350
351 async fn saga(&self, request: SpeculateSagaRequest) -> Result<SagaResponse> {
352 let response = self.saga.clone().execute_speculative(request).await?;
353 Ok(response.into_inner())
354 }
355
356 async fn process_manager(
357 &self,
358 request: SpeculatePmRequest,
359 ) -> Result<ProcessManagerHandleResponse> {
360 let response = self.pm.clone().handle_speculative(request).await?;
361 Ok(response.into_inner())
362 }
363}