use std::sync::Arc;
use crate::bootstrap;
use crate::connection::ConnectionOptions;
use crate::error::ClientError;
use crate::pool::{BrokerInfo, BrokerPool};
use crate::request::ProtocolRequest;
/// Client handle that talks to brokers through a [`BrokerPool`].
///
/// The pool is held behind an [`Arc`], so every clone of a `Client` refers to
/// the same underlying pool instance.
#[derive(Clone)]
pub struct Client {
/// Broker pool shared by all clones of this client.
pool: Arc<BrokerPool>,
// Stored at construction time but not read by any method visible in this part
// of the file, which is why the lint is silenced.
// NOTE(review): presumably retained for later use — confirm.
#[allow(dead_code)]
options: ConnectionOptions,
}
#[bon::bon]
impl Client {
    /// Assembles the connection options from the builder inputs and starts a client.
    ///
    /// Per the `#[builder]` attribute, the generated builder is entered through
    /// `builder` and completed with `build`.
    ///
    /// Inputs:
    /// * `bootstrap` - bootstrap string handed to `start_with_options` unchanged.
    /// * `client_id` - defaults to `"crabka"` when not set.
    /// * `connect_timeout` - defaults to 30 seconds when not set.
    /// * `request_timeout` - defaults to 30 seconds when not set.
    /// * `security` - optional security settings; boxed before being stored in
    ///   the options.
    ///
    /// # Errors
    ///
    /// Returns whatever error `start_with_options` reports.
    #[builder(start_fn = builder, finish_fn = build)]
    pub async fn start(
        #[builder(into)] bootstrap: String,
        #[builder(into, default = "crabka".to_string())] client_id: String,
        #[builder(default = std::time::Duration::from_secs(30))]
        connect_timeout: std::time::Duration,
        #[builder(default = std::time::Duration::from_secs(30))]
        request_timeout: std::time::Duration,
        security: Option<crate::security::ClientSecurity>,
    ) -> Result<Self, ClientError> {
        // The options struct stores the security settings boxed.
        let security = security.map(Box::new);
        Self::start_with_options(
            bootstrap,
            ConnectionOptions {
                client_id,
                connect_timeout,
                request_timeout,
                security,
            },
        )
        .await
    }
}
impl Client {
async fn start_with_options(
bootstrap: String,
options: ConnectionOptions,
) -> Result<Self, ClientError> {
let addrs = bootstrap::resolve(&bootstrap).await?;
let pool = Arc::new(BrokerPool::new(addrs, options.clone()));
Ok(Client { pool, options })
}
pub async fn send<R: ProtocolRequest>(&self, req: R) -> Result<R::Response, ClientError> {
let conn = self.pool.bootstrap_connection().await?;
conn.send(req).await
}
#[must_use]
pub fn knows_broker(&self, broker_id: i32) -> bool {
self.pool.knows_broker(broker_id)
}
#[must_use]
pub fn broker(&self, broker_id: i32) -> BrokerHandle<'_> {
BrokerHandle {
pool: &self.pool,
broker_id,
}
}
pub async fn refresh_metadata(
&self,
) -> Result<crabka_protocol::owned::metadata_response::MetadataResponse, ClientError> {
use crabka_protocol::owned::metadata_request::MetadataRequest;
let resp = self.send(MetadataRequest::default()).await?;
let brokers: Vec<BrokerInfo> = resp
.brokers
.iter()
.map(|b| BrokerInfo {
id: b.node_id,
host: b.host.clone(),
port: b.port,
rack: b.rack.clone(),
})
.collect();
self.pool.refresh_brokers(&brokers);
Ok(resp)
}
pub async fn offset_for_leader_epoch(
&self,
topic: &str,
partition: i32,
current_leader_epoch: i32,
leader_epoch: i32,
) -> Result<crate::offset_for_leader_epoch::EpochEndOffset, ClientError> {
let conn = self.pool.bootstrap_connection().await?;
crate::offset_for_leader_epoch::offset_for_leader_epoch(
&conn,
topic,
partition,
current_leader_epoch,
leader_epoch,
)
.await
}
pub async fn offset_for_leader_epoch_on(
&self,
broker_id: i32,
topic: &str,
partition: i32,
current_leader_epoch: i32,
leader_epoch: i32,
) -> Result<crate::offset_for_leader_epoch::EpochEndOffset, ClientError> {
let conn = self.pool.get(broker_id).await?;
crate::offset_for_leader_epoch::offset_for_leader_epoch(
&conn,
topic,
partition,
current_leader_epoch,
leader_epoch,
)
.await
}
pub fn close(self) {
if let Some(pool) = Arc::into_inner(self.pool) {
pool.close_all();
}
}
}
/// Borrowed view of a [`BrokerPool`] paired with one broker id.
///
/// Created by `Client::broker`; the lifetime `'a` ties the handle to the pool
/// it borrows from.
pub struct BrokerHandle<'a> {
/// Pool used to obtain the connection for `broker_id`.
pool: &'a BrokerPool,
/// Id passed to the pool when a connection is requested.
broker_id: i32,
}
impl BrokerHandle<'_> {
    /// Sends `req` on the connection the pool returns for this handle's broker id.
    ///
    /// # Errors
    ///
    /// Fails if the pool cannot provide a connection for the broker or if the
    /// send itself fails.
    pub async fn send<R: ProtocolRequest>(&self, req: R) -> Result<R::Response, ClientError> {
        self.pool.get(self.broker_id).await?.send(req).await
    }
}