Skip to main content

crabka_client_core/
client.rs

1//! Top-level [`Client`]. Wraps a [`BrokerPool`] and
2//! exposes a typed-request `send` API.
3
4use std::sync::Arc;
5
6use crate::bootstrap;
7use crate::connection::ConnectionOptions;
8use crate::error::ClientError;
9use crate::pool::{BrokerInfo, BrokerPool};
10use crate::request::ProtocolRequest;
11
12/// A Kafka client backed by a [`BrokerPool`].
13///
14/// Construct via [`Client::builder`].
15///
16/// Cloning a `Client` is cheap — it shares the underlying [`BrokerPool`] via
17/// an `Arc` and the connection options via a value clone.
18#[derive(Clone)]
19pub struct Client {
20    pool: Arc<BrokerPool>,
21    #[allow(dead_code)]
22    options: ConnectionOptions,
23}
24
25#[bon::bon]
26impl Client {
27    /// Build a [`Client`] pointed at the given bootstrap address.
28    #[builder(start_fn = builder, finish_fn = build)]
29    pub async fn start(
30        #[builder(into)] bootstrap: String,
31        #[builder(into, default = "crabka".to_string())] client_id: String,
32        #[builder(default = std::time::Duration::from_secs(30))]
33        connect_timeout: std::time::Duration,
34        #[builder(default = std::time::Duration::from_secs(30))]
35        request_timeout: std::time::Duration,
36        security: Option<crate::security::ClientSecurity>,
37    ) -> Result<Self, ClientError> {
38        let options = ConnectionOptions {
39            client_id,
40            connect_timeout,
41            request_timeout,
42            security: security.map(Box::new),
43        };
44        Self::start_with_options(bootstrap, options).await
45    }
46}
47
48impl Client {
49    async fn start_with_options(
50        bootstrap: String,
51        options: ConnectionOptions,
52    ) -> Result<Self, ClientError> {
53        let addrs = bootstrap::resolve(&bootstrap).await?;
54        let pool = Arc::new(BrokerPool::new(addrs, options.clone()));
55        Ok(Client { pool, options })
56    }
57
58    /// Send a request to the bootstrap broker (or any cached open connection).
59    pub async fn send<R: ProtocolRequest>(&self, req: R) -> Result<R::Response, ClientError> {
60        let conn = self.pool.bootstrap_connection().await?;
61        conn.send(req).await
62    }
63
64    /// Whether the pool knows a dialable address for `broker_id` (learned via
65    /// [`refresh_metadata`](Client::refresh_metadata), port not `0`). Lets a
66    /// caller choose between [`broker`](Client::broker) routing and the
67    /// bootstrap [`send`](Client::send) without a speculative connect.
68    #[must_use]
69    pub fn knows_broker(&self, broker_id: i32) -> bool {
70        self.pool.knows_broker(broker_id)
71    }
72
73    /// Return a [`BrokerHandle`] that routes requests to a specific broker by id.
74    ///
75    /// The broker must have been registered via [`refresh_metadata`] first.
76    ///
77    /// [`refresh_metadata`]: Client::refresh_metadata
78    #[must_use]
79    pub fn broker(&self, broker_id: i32) -> BrokerHandle<'_> {
80        BrokerHandle {
81            pool: &self.pool,
82            broker_id,
83        }
84    }
85
86    /// Send a default `MetadataRequest`, parse the broker list from the response,
87    /// refresh the pool's address registry, and return the typed response.
88    pub async fn refresh_metadata(
89        &self,
90    ) -> Result<crabka_protocol::owned::metadata_response::MetadataResponse, ClientError> {
91        use crabka_protocol::owned::metadata_request::MetadataRequest;
92        let resp = self.send(MetadataRequest::default()).await?;
93        let brokers: Vec<BrokerInfo> = resp
94            .brokers
95            .iter()
96            .map(|b| BrokerInfo {
97                id: b.node_id,
98                host: b.host.clone(),
99                port: b.port,
100                rack: b.rack.clone(),
101            })
102            .collect();
103        self.pool.refresh_brokers(&brokers);
104        Ok(resp)
105    }
106
107    /// Send a single-partition `OffsetForLeaderEpoch` (`api_key=23`) via the
108    /// bootstrap connection. Thin wrapper over the free
109    /// [`offset_for_leader_epoch`](crate::offset_for_leader_epoch) helper used
110    /// by the consumer's KIP-320 position-validation pass; `Client` does not
111    /// otherwise expose its connection, so this borrows the same bootstrap
112    /// connection `send` uses.
113    ///
114    /// # Errors
115    /// Transport / version-negotiation failure, or a partition not present in
116    /// the response.
117    pub async fn offset_for_leader_epoch(
118        &self,
119        topic: &str,
120        partition: i32,
121        current_leader_epoch: i32,
122        leader_epoch: i32,
123    ) -> Result<crate::offset_for_leader_epoch::EpochEndOffset, ClientError> {
124        let conn = self.pool.bootstrap_connection().await?;
125        crate::offset_for_leader_epoch::offset_for_leader_epoch(
126            &conn,
127            topic,
128            partition,
129            current_leader_epoch,
130            leader_epoch,
131        )
132        .await
133    }
134
135    /// Send a single-partition `OffsetForLeaderEpoch` (`api_key=23`) to a
136    /// *specific* broker by id, via [`BrokerPool::get`]. Mirrors
137    /// [`offset_for_leader_epoch`](Client::offset_for_leader_epoch) but targets
138    /// the partition leader instead of the bootstrap connection — KIP-320
139    /// requires the validation RPC reach the partition leader, which is the
140    /// only replica with the authoritative epoch→end-offset history.
141    ///
142    /// The broker must already be in the pool's registry (populated by
143    /// [`refresh_metadata`](Client::refresh_metadata)).
144    ///
145    /// # Errors
146    /// `Disconnected` if `broker_id` is not in the registry; transport /
147    /// version-negotiation failure; or a partition not present in the response.
148    pub async fn offset_for_leader_epoch_on(
149        &self,
150        broker_id: i32,
151        topic: &str,
152        partition: i32,
153        current_leader_epoch: i32,
154        leader_epoch: i32,
155    ) -> Result<crate::offset_for_leader_epoch::EpochEndOffset, ClientError> {
156        let conn = self.pool.get(broker_id).await?;
157        crate::offset_for_leader_epoch::offset_for_leader_epoch(
158            &conn,
159            topic,
160            partition,
161            current_leader_epoch,
162            leader_epoch,
163        )
164        .await
165    }
166
167    /// Close the client and all pooled connections.
168    pub fn close(self) {
169        if let Some(pool) = Arc::into_inner(self.pool) {
170            pool.close_all();
171        }
172    }
173}
174
175/// A handle to a specific broker within a [`Client`]'s pool.
176///
177/// Obtained via [`Client::broker`].
178pub struct BrokerHandle<'a> {
179    pool: &'a BrokerPool,
180    broker_id: i32,
181}
182
183impl BrokerHandle<'_> {
184    /// Send a request to this specific broker.
185    pub async fn send<R: ProtocolRequest>(&self, req: R) -> Result<R::Response, ClientError> {
186        let conn = self.pool.get(self.broker_id).await?;
187        conn.send(req).await
188    }
189}