use std::time::Duration;
use crate::error::{ClientError, Result};
use crate::proto::{
command_handler_coordinator_service_client::CommandHandlerCoordinatorServiceClient as TonicCommandHandlerClient,
event_query_service_client::EventQueryServiceClient as TonicQueryClient,
process_manager_coordinator_service_client::ProcessManagerCoordinatorServiceClient as TonicPmClient,
projector_coordinator_service_client::ProjectorCoordinatorServiceClient as TonicProjectorClient,
saga_coordinator_service_client::SagaCoordinatorServiceClient as TonicSagaClient,
CascadeErrorMode, CommandBook, CommandRequest, CommandResponse, EventBook,
ProcessManagerHandleResponse, Projection, Query, SagaResponse, SpeculateCommandHandlerRequest,
SpeculatePmRequest, SpeculateProjectorRequest, SpeculateSagaRequest, SyncMode,
};
use crate::traits;
use async_trait::async_trait;
use backon::{BackoffBuilder, ExponentialBuilder};
use tonic::transport::{Channel, Endpoint, Uri};
use tracing::warn;
async fn create_channel(endpoint: &str) -> Result<Channel> {
let uds_path = if endpoint.starts_with('/') || endpoint.starts_with("./") {
Some(endpoint.to_string())
} else {
endpoint.strip_prefix("unix://").map(str::to_string)
};
let backoff = ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(100))
.with_max_delay(Duration::from_secs(5))
.with_max_times(10)
.with_jitter()
.build();
let mut last_error: Option<ClientError> = None;
for (attempt, delay) in std::iter::once(Duration::ZERO).chain(backoff).enumerate() {
if attempt > 0 {
warn!(
endpoint = %endpoint,
attempt = attempt,
backoff_ms = %delay.as_millis(),
"gRPC connection failed, retrying after backoff"
);
tokio::time::sleep(delay).await;
}
let result = if let Some(ref path) = uds_path {
let path = path.clone();
Endpoint::try_from("http://[::]:50051")
.map_err(|e| ClientError::Connection { msg: e.to_string() })?
.connect_with_connector(tower::service_fn(move |_: Uri| {
let path = path.clone();
async move {
tokio::net::UnixStream::connect(path)
.await
.map(hyper_util::rt::TokioIo::new)
}
}))
.await
} else {
match Channel::from_shared(endpoint.to_string()) {
Ok(ep) => ep.connect().await,
Err(e) => {
return Err(ClientError::Connection { msg: e.to_string() });
}
}
};
match result {
Ok(channel) => return Ok(channel),
Err(e) => {
last_error = Some(ClientError::Connection {
msg: format!("Connection failed: {}", e),
});
}
}
}
Err(last_error.unwrap_or_else(|| ClientError::Connection {
msg: "Connection failed after max retries".to_string(),
}))
}
#[derive(Clone)]
pub struct QueryClient {
inner: TonicQueryClient<Channel>,
}
impl QueryClient {
pub async fn connect(endpoint: &str) -> Result<Self> {
let channel = create_channel(endpoint).await?;
Ok(Self::from_channel(channel))
}
pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
Self::connect(&endpoint).await
}
pub fn from_channel(channel: Channel) -> Self {
Self {
inner: TonicQueryClient::new(channel),
}
}
pub async fn get_events(&self, query: Query) -> Result<EventBook> {
let response = self.inner.clone().get_event_book(query).await?;
Ok(response.into_inner())
}
}
#[async_trait]
impl traits::QueryClient for QueryClient {
async fn get_events(&self, query: Query) -> Result<EventBook> {
self.get_events(query).await
}
}
#[derive(Clone)]
pub struct CommandHandlerClient {
inner: TonicCommandHandlerClient<Channel>,
}
impl CommandHandlerClient {
pub async fn connect(endpoint: &str) -> Result<Self> {
let channel = create_channel(endpoint).await?;
Ok(Self::from_channel(channel))
}
pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
Self::connect(&endpoint).await
}
pub fn from_channel(channel: Channel) -> Self {
Self {
inner: TonicCommandHandlerClient::new(channel),
}
}
pub async fn handle_command(&self, command: CommandRequest) -> Result<CommandResponse> {
let response = self.inner.clone().handle_command(command).await?;
Ok(response.into_inner())
}
pub async fn handle(&self, command: CommandBook) -> Result<CommandResponse> {
self.handle_command(CommandRequest {
command: Some(command),
sync_mode: SyncMode::Async as i32,
cascade_error_mode: CascadeErrorMode::CascadeErrorFailFast as i32,
})
.await
}
pub async fn handle_sync_speculative(
&self,
request: SpeculateCommandHandlerRequest,
) -> Result<CommandResponse> {
let response = self.inner.clone().handle_sync_speculative(request).await?;
Ok(response.into_inner())
}
}
#[async_trait]
impl traits::GatewayClient for CommandHandlerClient {
async fn execute(&self, command: CommandBook) -> Result<CommandResponse> {
self.handle(command).await
}
}
#[derive(Clone)]
pub struct DomainClient {
pub command_handler: CommandHandlerClient,
pub query: QueryClient,
pub speculative: SpeculativeClient,
}
impl DomainClient {
pub async fn connect(endpoint: &str) -> Result<Self> {
let channel = create_channel(endpoint).await?;
Ok(Self::from_channel(channel))
}
pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
Self::connect(&endpoint).await
}
pub fn from_channel(channel: Channel) -> Self {
Self {
command_handler: CommandHandlerClient::from_channel(channel.clone()),
query: QueryClient::from_channel(channel.clone()),
speculative: SpeculativeClient::from_channel(channel),
}
}
pub async fn execute(&self, command: CommandBook) -> Result<CommandResponse> {
self.command_handler.handle(command).await
}
pub async fn execute_with_mode(
&self,
command: CommandBook,
sync_mode: SyncMode,
) -> Result<CommandResponse> {
self.command_handler
.handle_command(CommandRequest {
command: Some(command),
sync_mode: sync_mode as i32,
cascade_error_mode: CascadeErrorMode::CascadeErrorFailFast as i32,
})
.await
}
pub async fn get_events(&self, query: Query) -> Result<EventBook> {
self.query.get_events(query).await
}
}
#[async_trait]
impl traits::GatewayClient for DomainClient {
async fn execute(&self, command: CommandBook) -> Result<CommandResponse> {
self.execute(command).await
}
}
#[async_trait]
impl traits::QueryClient for DomainClient {
async fn get_events(&self, query: Query) -> Result<EventBook> {
self.get_events(query).await
}
}
#[derive(Clone)]
pub struct SpeculativeClient {
command_handler: TonicCommandHandlerClient<Channel>,
projector: TonicProjectorClient<Channel>,
saga: TonicSagaClient<Channel>,
pm: TonicPmClient<Channel>,
}
impl SpeculativeClient {
pub async fn connect(endpoint: &str) -> Result<Self> {
let channel = create_channel(endpoint).await?;
Ok(Self::from_channel(channel))
}
pub async fn from_env(env_var: &str, default: &str) -> Result<Self> {
let endpoint = std::env::var(env_var).unwrap_or_else(|_| default.to_string());
Self::connect(&endpoint).await
}
pub fn from_channel(channel: Channel) -> Self {
Self {
command_handler: TonicCommandHandlerClient::new(channel.clone()),
projector: TonicProjectorClient::new(channel.clone()),
saga: TonicSagaClient::new(channel.clone()),
pm: TonicPmClient::new(channel),
}
}
}
#[async_trait]
impl traits::SpeculativeClient for SpeculativeClient {
async fn command_handler(
&self,
request: SpeculateCommandHandlerRequest,
) -> Result<CommandResponse> {
let response = self
.command_handler
.clone()
.handle_sync_speculative(request)
.await?;
Ok(response.into_inner())
}
async fn projector(&self, request: SpeculateProjectorRequest) -> Result<Projection> {
let response = self.projector.clone().handle_speculative(request).await?;
Ok(response.into_inner())
}
async fn saga(&self, request: SpeculateSagaRequest) -> Result<SagaResponse> {
let response = self.saga.clone().execute_speculative(request).await?;
Ok(response.into_inner())
}
async fn process_manager(
&self,
request: SpeculatePmRequest,
) -> Result<ProcessManagerHandleResponse> {
let response = self.pm.clone().handle_speculative(request).await?;
Ok(response.into_inner())
}
}