use super::unified_client::{ClientStats, Headers, RequestPlaneClient};
use crate::error::{DynamoError, ErrorType};
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
/// Request-plane client that sends requests over NATS.
///
/// Thin wrapper around an [`async_nats::Client`]. The wrapped client is itself
/// `Clone` and `Debug`, so both are derived here as well.
#[derive(Debug, Clone)]
pub struct NatsRequestClient {
    /// Underlying NATS client used to issue requests.
    client: async_nats::Client,
}
impl NatsRequestClient {
pub fn new(client: async_nats::Client) -> Self {
Self { client }
}
}
#[async_trait]
impl RequestPlaneClient for NatsRequestClient {
    /// Sends `payload` to the NATS subject `address` and waits for the reply.
    ///
    /// Every entry of `headers` is copied into a NATS header map that is
    /// attached to the request. On success the reply's payload is returned.
    ///
    /// # Errors
    ///
    /// Any failure reported by the NATS request is wrapped in a `DynamoError`
    /// of type `ErrorType::CannotConnect`, with the original error as cause.
    async fn send_request(
        &self,
        address: String,
        payload: Bytes,
        headers: Headers,
    ) -> Result<Bytes> {
        let mut nats_headers = async_nats::HeaderMap::new();
        for (key, value) in headers {
            nats_headers.insert(key.as_str(), value.as_str());
        }

        // `address` is cloned because the error closure below still needs it
        // for the message after the subject has been handed to the client.
        let response = self
            .client
            .request_with_headers(address.clone(), nats_headers, payload)
            .await
            .map_err(|e| {
                anyhow::anyhow!(
                    DynamoError::builder()
                        .error_type(ErrorType::CannotConnect)
                        .message(format!("NATS request to {address} failed"))
                        .cause(e)
                        .build()
                )
            })?;

        Ok(response.payload)
    }

    /// Returns the transport identifier, always `"nats"`.
    fn transport_name(&self) -> &'static str {
        "nats"
    }

    /// Reports whether the underlying NATS client is currently connected.
    ///
    /// Returns `true` only while the client's connection state is
    /// `Connected`; pending and disconnected states report `false`.
    fn is_healthy(&self) -> bool {
        matches!(
            self.client.connection_state(),
            async_nats::connection::State::Connected
        )
    }

    /// Returns a statistics snapshot for this client.
    ///
    /// This client does not track request, byte, error or latency counters,
    /// so those fields are reported as zero. `active_connections` is `1`
    /// when [`is_healthy`](Self::is_healthy) returns `true` and `0` otherwise.
    fn stats(&self) -> ClientStats {
        ClientStats {
            requests_sent: 0,
            responses_received: 0,
            errors: 0,
            bytes_sent: 0,
            bytes_received: 0,
            active_connections: if self.is_healthy() { 1 } else { 0 },
            idle_connections: 0,
            avg_latency_us: 0,
        }
    }

    /// No-op close: performs no work and always returns `Ok(())`.
    async fn close(&self) -> Result<()> {
        Ok(())
    }
}