kafka_rs/
client.rs

//! Kafka client implementation.

use std::{sync::Arc, time::Duration};

use bytes::{Bytes, BytesMut};
use kafka_protocol::{
    indexmap::IndexMap,
    messages::{
        create_topics_request::CreateTopicsRequest,
        create_topics_response::CreateTopicsResponse,
        delete_topics_request::DeleteTopicsRequest,
        delete_topics_response::DeleteTopicsResponse,
        fetch_request::{FetchPartition, FetchTopic},
        list_offsets_request::{ListOffsetsPartition, ListOffsetsTopic},
        produce_request::PartitionProduceData,
        FetchRequest, FindCoordinatorRequest, FindCoordinatorResponse, ListOffsetsRequest, MetadataResponse, ProduceRequest, ResponseHeader, ResponseKind,
    },
    protocol::StrBytes,
    records::{Compression, Record, RecordBatchDecoder, RecordBatchEncoder, RecordEncodeOptions, TimestampType},
    ResponseError,
};
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::{CancellationToken, DropGuard};

use crate::clitask::{ClientTask, Cluster, ClusterMeta, MetadataPolicy, Msg};
use crate::error::ClientError;
#[cfg(feature = "internal")]
use crate::internal::InternalClient;
use crate::{broker::BrokerResponse, error::ClientResult};

/*
- TODO: build a TopicBatcher on top of a TopicProducer which works like a sink,
  batches based on configured max size, will handle retries, and the other good stuff.
*/

/// The default timeout used for requests (10s).
pub const DEFAULT_TIMEOUT: i32 = 10 * 1000;

/// Headers of a message.
pub type MessageHeaders = IndexMap<StrBytes, Option<Bytes>>;

/// A Kafka client.
///
/// This client is `Send + Sync + Clone`, and cloning this client to share it among application
/// tasks is encouraged.
///
/// This client spawns a task which manages the full lifecycle of interaction with a Kafka cluster,
/// including initial broker connections based on the seed list, cluster metadata discovery,
/// connections to new brokers, negotiation of broker API versions, reconnects, and anything else
/// related to maintaining connections to a Kafka cluster.
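///
/// # Example
///
/// A minimal usage sketch; the broker address is a placeholder for your own cluster:
///
/// ```ignore
/// let client = Client::new(vec!["localhost:9092".to_string()]);
/// // Clones are cheap and share the same underlying task and connections.
/// let handle = client.clone();
/// ```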
#[derive(Clone)]
pub struct Client {
    /// The channel used for communicating with the client task.
    pub(crate) tx: mpsc::Sender<Msg>,
    /// Discovered metadata on a Kafka cluster along with broker connections.
    ///
    /// NOTE WELL: this value should never be updated outside of the `ClientTask`.
    pub(crate) cluster: ClusterMeta,
    /// The shutdown signal for this client, which will be triggered once all client handles are dropped.
    _shutdown: Arc<DropGuard>,
}

impl Client {
    /// Construct a new instance.
    pub fn new(seed_list: Vec<String>) -> Self {
        let (tx, rx) = mpsc::channel(1_000);
        let shutdown = CancellationToken::new();
        let task = ClientTask::new(seed_list, None, MetadataPolicy::default(), rx, false, shutdown.clone());
        let cluster = task.cluster.clone();
        tokio::spawn(task.run());
        Self {
            tx,
            cluster,
            _shutdown: Arc::new(shutdown.drop_guard()),
        }
    }

    /// Construct a new instance from a seed list and a block list.
    ///
    /// - `seed_list`: URLs of the initial brokers to attempt to connect to in order to gather
    ///   cluster metadata. The metadata will be used to establish connections to all other brokers.
    /// - `block_list`: an optional set of broker IDs for which connections should not be established.
    ///   This is typically not needed. This only applies after cluster metadata has been gathered.
    ///
    /// The metadata policy is set to `Manual` for this constructor.
    #[cfg(feature = "internal")]
    pub fn new_internal(seed_list: Vec<String>, block_list: Vec<i32>) -> InternalClient {
        let (tx, rx) = mpsc::channel(1_000);
        let shutdown = CancellationToken::new();
        let task = ClientTask::new(seed_list, Some(block_list), MetadataPolicy::Manual, rx, true, shutdown.clone());
        let cluster = task.cluster.clone();
        tokio::spawn(task.run());
        let cli = Self {
            tx,
            cluster,
            _shutdown: Arc::new(shutdown.drop_guard()),
        };
        InternalClient::new(cli)
    }

    /// Get cluster metadata from the broker specified by ID.
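    ///
    /// A minimal sketch (the broker ID is a placeholder; IDs come from discovered metadata):
    ///
    /// ```ignore
    /// let metadata = client.get_metadata(1).await?;
    /// println!("controller: {:?}", metadata.controller_id);
    /// ```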
    pub async fn get_metadata(&self, broker_id: i32) -> ClientResult<MetadataResponse> {
        let cluster = self.get_cluster_metadata_cache().await?;
        let broker = cluster
            .brokers
            .get(&broker_id)
            .ok_or(ClientError::Other("broker does not exist in currently discovered metadata".into()))?;

        // Send request.
        let uid = uuid::Uuid::new_v4();
        let (tx, rx) = oneshot::channel();
        broker.conn.get_metadata(uid, tx.into(), true).await;

        // Await response.
        let res = unpack_broker_response(rx).await.and_then(|(_, res)| {
            if let ResponseKind::MetadataResponse(res) = res {
                Ok(res)
            } else {
                Err(ClientError::MalformedResponse)
            }
        })?;

        Ok(res)
    }

    /// List topic partition offsets.
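    ///
    /// A minimal sketch (hypothetical topic and partition):
    ///
    /// ```ignore
    /// let topic = StrBytes::from_string("my-topic".into());
    /// // Fetch the next offset after the last record of partition 0.
    /// let latest = client.list_offsets(topic, 0, ListOffsetsPosition::Latest).await?;
    /// ```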
    pub async fn list_offsets(&self, topic: StrBytes, ptn: i32, pos: ListOffsetsPosition) -> ClientResult<i64> {
        let cluster = self.get_cluster_metadata_cache().await?;

        // Get the broker responsible for the target topic/partition.
        let topic_ptns = cluster.topics.get(&topic).ok_or(ClientError::UnknownTopic(topic.to_string()))?;
        let ptn_meta = topic_ptns.get(&ptn).ok_or(ClientError::UnknownPartition(topic.to_string(), ptn))?;
        let broker = ptn_meta.leader.clone().ok_or(ClientError::NoPartitionLeader(topic.to_string(), ptn))?;

        // Build request.
        let uid = uuid::Uuid::new_v4();
        let mut req = ListOffsetsRequest::default();
        // req.isolation_level = 0; // TODO: update this.
        let mut req_topic = ListOffsetsTopic::default();
        req_topic.name = topic.clone().into();
        let mut req_ptn = ListOffsetsPartition::default();
        req_ptn.partition_index = ptn;
        req_ptn.timestamp = match pos {
            ListOffsetsPosition::Earliest => -2,
            ListOffsetsPosition::Latest => -1,
            ListOffsetsPosition::Timestamp(val) => val,
        };
        req_topic.partitions.push(req_ptn);
        req.topics.push(req_topic);

        // Send request.
        let (tx, rx) = oneshot::channel();
        broker.conn.list_offsets(uid, req, tx).await;

        // Unpack response & handle errors.
        // TODO: check for error codes in response.
        let offset = unpack_broker_response(rx)
            .await
            .and_then(|(_, res)| {
                if let ResponseKind::ListOffsetsResponse(res) = res {
                    Ok(res)
                } else {
                    Err(ClientError::MalformedResponse)
                }
            })
            .and_then(|res| {
                res.topics
                    .iter()
                    .find(|topic_res| topic_res.name.0 == topic)
                    .and_then(|topic_res| topic_res.partitions.iter().find(|ptn_res| ptn_res.partition_index == ptn).map(|ptn_res| ptn_res.offset))
                    .ok_or(ClientError::MalformedResponse)
            })?;

        Ok(offset)
    }

    /// Fetch a batch of records from the target topic partition.
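    ///
    /// Returns `Ok(None)` when the response contains no record batch for the partition.
    ///
    /// A minimal sketch (hypothetical topic; fetches from the start of partition 0):
    ///
    /// ```ignore
    /// let topic = StrBytes::from_string("my-topic".into());
    /// if let Some(records) = client.fetch(topic, 0, 0).await? {
    ///     for record in records {
    ///         println!("offset={} value={:?}", record.offset, record.value);
    ///     }
    /// }
    /// ```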
    pub async fn fetch(&self, topic: StrBytes, ptn: i32, start: i64) -> ClientResult<Option<Vec<Record>>> {
        let cluster = self.get_cluster_metadata_cache().await?;

        // Get the broker responsible for the target topic/partition.
        let topic_ptns = cluster.topics.get(&topic).ok_or(ClientError::UnknownTopic(topic.to_string()))?;
        let ptn_meta = topic_ptns.get(&ptn).ok_or(ClientError::UnknownPartition(topic.to_string(), ptn))?;
        let broker = ptn_meta.leader.clone().ok_or(ClientError::NoPartitionLeader(topic.to_string(), ptn))?;

        // Build request.
        let uid = uuid::Uuid::new_v4();
        let mut req = FetchRequest::default();
        req.max_bytes = 1024i32.pow(2);
        req.max_wait_ms = 10_000;
        // req.isolation_level = 0; // TODO: update this.
        let mut req_topic = FetchTopic::default();
        req_topic.topic = topic.clone().into();
        let mut req_ptn = FetchPartition::default();
        req_ptn.partition = ptn;
        req_ptn.partition_max_bytes = 1024i32.pow(2);
        req_ptn.fetch_offset = start;
        req_topic.partitions.push(req_ptn);
        req.topics.push(req_topic);
        tracing::debug!("about to send request: {:?}", req);

        // Send request.
        let (tx, rx) = oneshot::channel();
        broker.conn.fetch(uid, req, tx).await;

        // Unpack response & handle errors.
        // TODO: check for error codes in response.
        let batch_opt = unpack_broker_response(rx)
            .await
            .and_then(|(_, res)| {
                tracing::debug!("res: {:?}", res);
                if let ResponseKind::FetchResponse(res) = res {
                    Ok(res)
                } else {
                    Err(ClientError::MalformedResponse)
                }
            })
            .and_then(|res| {
                res.responses
                    .iter()
                    .find(|topic_res| topic_res.topic.0 == topic)
                    .and_then(|topic_res| topic_res.partitions.iter().find(|ptn_res| ptn_res.partition_index == ptn).map(|ptn_res| ptn_res.records.clone()))
                    .ok_or(ClientError::MalformedResponse)
            })?;

        // If some data was returned, then decode the batch.
        let Some(mut batch) = batch_opt else { return Ok(None) };
        let records = RecordBatchDecoder::decode(&mut batch).map_err(|_| ClientError::MalformedResponse)?;

        Ok(Some(records))
    }

    /// Get the cached cluster metadata.
    ///
    /// If the cluster metadata has not yet been bootstrapped, this routine will wait up to 10s
    /// for bootstrapping to complete before timing out.
    pub(crate) async fn get_cluster_metadata_cache(&self) -> ClientResult<Arc<Cluster>> {
        let mut cluster = self.cluster.load();
        if !*cluster.bootstrap.borrow() {
            let mut sig = cluster.bootstrap.clone();
            let _ = tokio::time::timeout(Duration::from_secs(10), sig.wait_for(|val| *val))
                .await
                .map_err(|_err| ClientError::ClusterMetadataTimeout)?
                .map_err(|_err| ClientError::ClusterMetadataTimeout)?;
            cluster = self.cluster.load();
        }
        Ok(cluster.clone())
    }

    /// Find the coordinator for the given group.
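    ///
    /// A minimal sketch (hypothetical group ID; per the Kafka protocol, `key_type` `0` requests
    /// a consumer group coordinator and `1` a transaction coordinator):
    ///
    /// ```ignore
    /// let group = StrBytes::from_string("my-group".into());
    /// let coordinator = client.find_coordinator(group, 0, None).await?;
    /// ```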
    pub async fn find_coordinator(&self, key: StrBytes, key_type: i8, broker_id: Option<i32>) -> ClientResult<FindCoordinatorResponse> {
        let cluster = self.get_cluster_metadata_cache().await?;

        // Get the specified broker connection, else get the first available.
        let broker = broker_id
            .and_then(|id| cluster.brokers.get(&id).cloned())
            .or_else(|| {
                // TODO: get random broker.
                cluster.brokers.first_key_value().map(|(_, broker)| broker.clone())
            })
            .ok_or_else(|| ClientError::NoBrokerFound)?;

        // Build request.
        let uid = uuid::Uuid::new_v4();
        let mut req = FindCoordinatorRequest::default();
        req.key = key;
        req.key_type = key_type;

        // Send request.
        let (tx, rx) = oneshot::channel();
        broker.conn.find_coordinator(uid, req, tx).await;

        // Unpack response & handle errors.
        unpack_broker_response(rx)
            .await
            .and_then(|(_, res)| {
                tracing::debug!("res: {:?}", res);
                if let ResponseKind::FindCoordinatorResponse(res) = res {
                    Ok(res)
                } else {
                    Err(ClientError::MalformedResponse)
                }
            })
            .and_then(|res| {
                // Handle broker response codes.
                if res.error_code != 0 {
                    return Err(ClientError::ResponseError(res.error_code, ResponseError::try_from_code(res.error_code), res.error_message));
                }
                Ok(res)
            })
    }

    /// Build a producer for a topic.
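    ///
    /// A minimal sketch (hypothetical topic; default timeout and no compression):
    ///
    /// ```ignore
    /// use bytes::Bytes;
    ///
    /// let mut producer = client.topic_producer("my-topic", Acks::All, None, None);
    /// let msg = Message::new(None, Some(Bytes::from_static(b"hello")), Default::default());
    /// let (base_offset, last_offset) = producer.produce(&[msg]).await?;
    /// ```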
    pub fn topic_producer(&self, topic: &str, acks: Acks, timeout_ms: Option<i32>, compression: Option<Compression>) -> TopicProducer {
        let (tx, rx) = mpsc::unbounded_channel();
        let compression = compression.unwrap_or(Compression::None);
        let encode_opts = RecordEncodeOptions { version: 2, compression };
        TopicProducer {
            _client: self.clone(),
            tx,
            rx,
            cluster: self.cluster.clone(),
            topic: StrBytes::from_string(topic.into()),
            acks,
            timeout_ms: timeout_ms.unwrap_or(DEFAULT_TIMEOUT),
            encode_opts,
            buf: BytesMut::with_capacity(1024 * 1024),
            batch_buf: Vec::with_capacity(1024),
            last_ptn: -1,
        }
    }

    /// Build an admin client.
    pub fn admin(&self) -> Admin {
        Admin { _client: self.clone() }
    }
}

/// A message to be encoded as a Kafka record within a record batch.
pub struct Message {
    /// An optional key for the record.
    pub key: Option<Bytes>,
    /// An optional value as the body of the record.
    pub value: Option<Bytes>,
    /// Optional headers to be included in the record.
    pub headers: MessageHeaders,
}

impl Message {
    /// Construct a new message.
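    ///
    /// A minimal sketch (hypothetical header key and payload):
    ///
    /// ```ignore
    /// use bytes::Bytes;
    ///
    /// let mut headers = MessageHeaders::default();
    /// headers.insert(StrBytes::from_string("trace-id".into()), Some(Bytes::from_static(b"abc123")));
    /// let msg = Message::new(None, Some(Bytes::from_static(b"hello")), headers);
    /// ```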
    pub fn new(key: Option<Bytes>, value: Option<Bytes>, headers: MessageHeaders) -> Self {
        Self { key, value, headers }
    }
}

/// Write acknowledgements required for a request.
#[derive(Clone, Copy)]
#[repr(i16)]
pub enum Acks {
    /// Leader and replicas.
    All = -1,
    /// None.
    None = 0,
    /// Leader only.
    Leader = 1,
}

/// A producer for a specific topic.
pub struct TopicProducer {
    /// The client handle from which this producer was created.
    _client: Client,
    /// The sending half of the response channel, handed to brokers along with each request.
    tx: mpsc::UnboundedSender<BrokerResponse>,
    /// The receiving half of the response channel, used to await broker responses.
    rx: mpsc::UnboundedReceiver<BrokerResponse>,
    /// Discovered metadata on a Kafka cluster along with broker connections.
    ///
    /// NOTE WELL: this value should never be updated outside of the `ClientTask`.
    cluster: ClusterMeta,
    /// The topic being produced to.
    pub topic: StrBytes,
    /// Acks level to use for produce requests.
    acks: Acks,
    /// Timeout for produce requests.
    timeout_ms: i32,
    /// Record batch encoding config.
    encode_opts: RecordEncodeOptions,

    /// A buffer used for encoding batches.
    buf: BytesMut,
    /// A buffer for accumulating records to be sent to a broker.
    batch_buf: Vec<Record>,
    /// The last partition used for round-robin or uniform sticky batch assignment.
    last_ptn: i32,
}

impl TopicProducer {
    /// Produce a batch of records to the specified topic.
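    ///
    /// On success, returns the `(base_offset, last_offset)` pair assigned to the produced batch.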
    pub async fn produce(&mut self, messages: &[Message]) -> ClientResult<(i64, i64)> {
        // TODO: allow for producer to specify partition instead of using sticky rotating partitions.

        if messages.is_empty() {
            return Err(ClientError::ProducerMessagesEmpty);
        }
        self.batch_buf.clear(); // Ensure buf is clear.

        // Check for topic metadata.
        let mut cluster = self.cluster.load();
        if !*cluster.bootstrap.borrow() {
            let mut sig = cluster.bootstrap.clone();
            let _ = sig.wait_for(|val| *val).await; // Ensure the cluster metadata is bootstrapped.
            cluster = self.cluster.load();
        }
        let Some(topic_ptns) = cluster.topics.get(&self.topic) else {
            return Err(ClientError::UnknownTopic(self.topic.to_string()));
        };

        // Target the next partition of this topic for this batch.
        let Some((sticky_ptn, sticky_broker)) = topic_ptns
            .range((self.last_ptn + 1)..)
            .filter_map(|(ptn, meta)| meta.leader.clone().map(|leader| (ptn, leader)))
            .next()
            .or_else(|| topic_ptns.range(..).filter_map(|(ptn, meta)| meta.leader.clone().map(|leader| (ptn, leader))).next())
            .map(|(key, val)| (*key, val.clone()))
        else {
            return Err(ClientError::NoPartitionsAvailable(self.topic.to_string()));
        };
        self.last_ptn = sticky_ptn;

        // Transform the given messages into their record form.
        let timestamp = chrono::Utc::now().timestamp_millis();
        for msg in messages.iter() {
            self.batch_buf.push(Record {
                transactional: false,
                control: false,
                partition_leader_epoch: 0,
                producer_id: 0,
                producer_epoch: 0,
                timestamp,
                timestamp_type: TimestampType::Creation,
                offset: 0,
                sequence: 0,
                key: msg.key.clone(),
                value: msg.value.clone(),
                headers: msg.headers.clone(),
            });
        }

        // Encode the records into a request. Note that BytesMut will allocate more space whenever needed.
        let res = RecordBatchEncoder::encode(&mut self.buf, self.batch_buf.iter(), &self.encode_opts).map_err(|err| ClientError::EncodingError(format!("{:?}", err)));
        self.batch_buf.clear();
        res?;

        // Create the request object for the broker.
        let mut req = ProduceRequest::default();
        req.acks = self.acks as i16;
        req.timeout_ms = self.timeout_ms;
        let topic = req.topic_data.entry(self.topic.clone().into()).or_default();
        let mut ptn_data = PartitionProduceData::default();
        ptn_data.index = sticky_ptn;
        ptn_data.records = Some(self.buf.split().freeze());
        topic.partition_data.push(ptn_data);

        // Send off request & await response.
        let uid = uuid::Uuid::new_v4();
        sticky_broker.conn.produce(uid, req, self.tx.clone()).await;
        let res = loop {
            let Some(res) = self.rx.recv().await else {
                unreachable!("both ends of channel are held, receiving None should not be possible")
            };
            if res.id == uid {
                break res;
            }
        };

        // Handle response.
        // TODO: check for error codes in response.
        res.result
            .map_err(ClientError::BrokerError)
            .and_then(|res| {
                // Unpack the expected response type.
                if let ResponseKind::ProduceResponse(inner) = res.1 {
                    Ok(inner)
                } else {
                    tracing::error!("expected broker to return a ProduceResponse, got: {:?}", res.1);
                    Err(ClientError::MalformedResponse)
                }
            })
            .and_then(|res| {
                // Unpack the base offset & calculate the final offset.
                res.responses
                    .iter()
                    .find(|topic| topic.0 .0 == self.topic)
                    .and_then(|val| {
                        val.1.partition_responses.first().map(|val| {
                            debug_assert!(!messages.is_empty(), "messages len should always be validated at start of function");
                            let last_offset = val.base_offset + (messages.len() - 1) as i64;
                            (val.base_offset, last_offset)
                        })
                    })
                    .ok_or(ClientError::MalformedResponse)
            })
    }
}

/// The position in a log to fetch an offset for.
pub enum ListOffsetsPosition {
    /// Fetch the offset of the beginning of the partition's log.
    Earliest,
    /// Fetch the next offset after the last offset of the partition's log.
    Latest,
    /// Fetch the offset for the corresponding timestamp.
    Timestamp(i64),
}

/// Await and unpack a broker response.
///
/// TODO: turn this into a macro which can destructure to the match enum variant needed.
/// That will help reduce error handling boilerplate even further.
pub(crate) async fn unpack_broker_response(rx: oneshot::Receiver<BrokerResponse>) -> ClientResult<(ResponseHeader, ResponseKind)> {
    rx.await
        .map_err(|_| ClientError::Other("response channel dropped by broker, which should never happen".into()))?
        .result
        .map_err(ClientError::BrokerError)
}

/// An admin client for cluster management operations.
pub struct Admin {
    /// The client handle from which this admin client was created.
    _client: Client,
}

impl Admin {
    /// Create new topics in the Kafka cluster.
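    ///
    /// A minimal sketch (hypothetical topic name and settings; assumes the `kafka_protocol`
    /// version used by this crate, where the request's `topics` is a map keyed by topic name):
    ///
    /// ```ignore
    /// use kafka_protocol::messages::create_topics_request::{CreatableTopic, CreateTopicsRequest};
    ///
    /// let mut topic = CreatableTopic::default();
    /// topic.num_partitions = 3;
    /// topic.replication_factor = 1;
    /// let mut req = CreateTopicsRequest::default();
    /// req.topics.insert(StrBytes::from_string("my-topic".into()).into(), topic);
    /// let res = client.admin().create_topics(req).await?;
    /// ```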
    pub async fn create_topics(&self, request: CreateTopicsRequest) -> ClientResult<CreateTopicsResponse> {
        if request.topics.is_empty() {
            return Err(ClientError::NoTopicsSpecified);
        }
        let cluster = self._client.get_cluster_metadata_cache().await?;
        let (tx, rx) = oneshot::channel();

        let Some(leader) = &cluster.controller else {
            return Err(ClientError::NoControllerFound);
        };
        let uid = uuid::Uuid::new_v4();
        leader.conn.create_topics(uid, request, tx).await;
        unpack_broker_response(rx).await.and_then(|(_, res)| {
            if let ResponseKind::CreateTopicsResponse(inner) = res {
                Ok(inner)
            } else {
                Err(ClientError::MalformedResponse)
            }
        })
    }

    /// Delete topics from the Kafka cluster.
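    ///
    /// A minimal sketch (hypothetical topic name; populates the request's `topics` field, which
    /// is the field this method validates):
    ///
    /// ```ignore
    /// use kafka_protocol::messages::delete_topics_request::{DeleteTopicState, DeleteTopicsRequest};
    ///
    /// let mut state = DeleteTopicState::default();
    /// state.name = Some(StrBytes::from_string("my-topic".into()).into());
    /// let mut req = DeleteTopicsRequest::default();
    /// req.topics.push(state);
    /// let res = client.admin().delete_topics(req).await?;
    /// ```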
    pub async fn delete_topics(&self, request: DeleteTopicsRequest) -> ClientResult<DeleteTopicsResponse> {
        if request.topics.is_empty() {
            return Err(ClientError::NoTopicsSpecified);
        }
        let cluster = self._client.get_cluster_metadata_cache().await?;
        let (tx, rx) = oneshot::channel();

        let Some(leader) = &cluster.controller else {
            return Err(ClientError::NoControllerFound);
        };
        let uid = uuid::Uuid::new_v4();
        leader.conn.delete_topics(uid, request, tx).await;
        unpack_broker_response(rx).await.and_then(|(_, res)| {
            if let ResponseKind::DeleteTopicsResponse(inner) = res {
                Ok(inner)
            } else {
                Err(ClientError::MalformedResponse)
            }
        })
    }
}