crabka_client_core/
client.rs1use std::sync::Arc;
5
6use crate::bootstrap;
7use crate::connection::ConnectionOptions;
8use crate::error::ClientError;
9use crate::pool::{BrokerInfo, BrokerPool};
10use crate::request::ProtocolRequest;
11
12#[derive(Clone)]
19pub struct Client {
20 pool: Arc<BrokerPool>,
21 #[allow(dead_code)]
22 options: ConnectionOptions,
23}
24
25#[bon::bon]
26impl Client {
27 #[builder(start_fn = builder, finish_fn = build)]
29 pub async fn start(
30 #[builder(into)] bootstrap: String,
31 #[builder(into, default = "crabka".to_string())] client_id: String,
32 #[builder(default = std::time::Duration::from_secs(30))]
33 connect_timeout: std::time::Duration,
34 #[builder(default = std::time::Duration::from_secs(30))]
35 request_timeout: std::time::Duration,
36 security: Option<crate::security::ClientSecurity>,
37 ) -> Result<Self, ClientError> {
38 let options = ConnectionOptions {
39 client_id,
40 connect_timeout,
41 request_timeout,
42 security: security.map(Box::new),
43 };
44 Self::start_with_options(bootstrap, options).await
45 }
46}
47
48impl Client {
49 async fn start_with_options(
50 bootstrap: String,
51 options: ConnectionOptions,
52 ) -> Result<Self, ClientError> {
53 let addrs = bootstrap::resolve(&bootstrap).await?;
54 let pool = Arc::new(BrokerPool::new(addrs, options.clone()));
55 Ok(Client { pool, options })
56 }
57
58 pub async fn send<R: ProtocolRequest>(&self, req: R) -> Result<R::Response, ClientError> {
60 let conn = self.pool.bootstrap_connection().await?;
61 conn.send(req).await
62 }
63
64 #[must_use]
69 pub fn knows_broker(&self, broker_id: i32) -> bool {
70 self.pool.knows_broker(broker_id)
71 }
72
73 #[must_use]
79 pub fn broker(&self, broker_id: i32) -> BrokerHandle<'_> {
80 BrokerHandle {
81 pool: &self.pool,
82 broker_id,
83 }
84 }
85
86 pub async fn refresh_metadata(
89 &self,
90 ) -> Result<crabka_protocol::owned::metadata_response::MetadataResponse, ClientError> {
91 use crabka_protocol::owned::metadata_request::MetadataRequest;
92 let resp = self.send(MetadataRequest::default()).await?;
93 let brokers: Vec<BrokerInfo> = resp
94 .brokers
95 .iter()
96 .map(|b| BrokerInfo {
97 id: b.node_id,
98 host: b.host.clone(),
99 port: b.port,
100 rack: b.rack.clone(),
101 })
102 .collect();
103 self.pool.refresh_brokers(&brokers);
104 Ok(resp)
105 }
106
107 pub async fn offset_for_leader_epoch(
118 &self,
119 topic: &str,
120 partition: i32,
121 current_leader_epoch: i32,
122 leader_epoch: i32,
123 ) -> Result<crate::offset_for_leader_epoch::EpochEndOffset, ClientError> {
124 let conn = self.pool.bootstrap_connection().await?;
125 crate::offset_for_leader_epoch::offset_for_leader_epoch(
126 &conn,
127 topic,
128 partition,
129 current_leader_epoch,
130 leader_epoch,
131 )
132 .await
133 }
134
135 pub async fn offset_for_leader_epoch_on(
149 &self,
150 broker_id: i32,
151 topic: &str,
152 partition: i32,
153 current_leader_epoch: i32,
154 leader_epoch: i32,
155 ) -> Result<crate::offset_for_leader_epoch::EpochEndOffset, ClientError> {
156 let conn = self.pool.get(broker_id).await?;
157 crate::offset_for_leader_epoch::offset_for_leader_epoch(
158 &conn,
159 topic,
160 partition,
161 current_leader_epoch,
162 leader_epoch,
163 )
164 .await
165 }
166
167 pub fn close(self) {
169 if let Some(pool) = Arc::into_inner(self.pool) {
170 pool.close_all();
171 }
172 }
173}
174
175pub struct BrokerHandle<'a> {
179 pool: &'a BrokerPool,
180 broker_id: i32,
181}
182
183impl BrokerHandle<'_> {
184 pub async fn send<R: ProtocolRequest>(&self, req: R) -> Result<R::Response, ClientError> {
186 let conn = self.pool.get(self.broker_id).await?;
187 conn.send(req).await
188 }
189}