rustfs-kafka 1.2.0

Rust client for Apache Kafka
Documentation
//! Fetch message operations for [`KafkaClient`].
//!
//! Implements the logic for sending fetch requests to Kafka brokers,
//! grouping partitions by their leader broker, and aggregating responses.

use std::collections::HashMap;
use std::time::Instant;

use crate::error::{Error, Result};

use super::FetchPartition;
use super::config::ClientConfig;
use super::state::ClientState;
use super::transport;
use crate::network::Connections;

fn decode_fetch_response(
    conn: &mut crate::network::KafkaConnection,
    requested_version: i16,
) -> Result<kafka_protocol::messages::FetchResponse> {
    use kafka_protocol::messages::{FetchResponse, ResponseHeader};
    use kafka_protocol::protocol::{Decodable, HeaderVersion};

    let size = transport::get_response_size(conn)?;
    let resp_bytes = conn.read_exact_alloc(crate::protocol::non_negative_i32_to_u64(size)?)?;

    let mut candidates = Vec::with_capacity(1 + 18);
    candidates.push(requested_version);
    for v in (0..=17).rev() {
        if v != requested_version {
            candidates.push(v);
        }
    }

    for version in candidates {
        let mut bytes = resp_bytes.clone();
        let header_version = FetchResponse::header_version(version);
        if ResponseHeader::decode(&mut bytes, header_version).is_err() {
            continue;
        }
        if let Ok(resp) = FetchResponse::decode(&mut bytes, version) {
            return Ok(resp);
        }
    }

    Err(Error::codec())
}

#[tracing::instrument(skip(conn_pool, state, config, input))]
pub fn fetch_messages_kp<'a, I, J>(
    conn_pool: &mut Connections,
    state: &mut ClientState,
    config: &ClientConfig,
    correlation: i32,
    input: I,
) -> Result<Vec<super::fetch_kp::OwnedFetchResponse>>
where
    J: AsRef<FetchPartition<'a>>,
    I: IntoIterator<Item = J>,
{
    #[cfg(feature = "metrics")]
    let start = Instant::now();

    let mut broker_partitions: HashMap<&str, Vec<(&str, i32, i64, i32)>> = HashMap::new();
    for inp in input {
        let inp = inp.as_ref();
        if let Some(broker) = state.find_broker(inp.topic, inp.partition) {
            broker_partitions.entry(broker).or_default().push((
                inp.topic,
                inp.partition,
                inp.offset,
                if inp.max_bytes > 0 {
                    inp.max_bytes
                } else {
                    config.fetch_max_bytes_per_partition()
                },
            ));
        }
    }

    let result = fetch_messages_inner(
        conn_pool,
        correlation,
        &config.client_id,
        config.fetch_max_wait_time(),
        config.fetch_min_bytes(),
        broker_partitions,
    );

    #[cfg(feature = "metrics")]
    {
        let elapsed = start.elapsed().as_secs_f64() * 1000.0;
        match &result {
            Ok(responses) => {
                let mut total_bytes: usize = 0;
                let mut total_messages: usize = 0;
                for resp in responses {
                    for t in &resp.topics {
                        for p in &t.partitions {
                            if let Ok(data) = p.data() {
                                total_messages += data.messages.len();
                                for msg in &data.messages {
                                    total_bytes += msg.key.len() + msg.value.len();
                                }
                            }
                        }
                        crate::metrics::record_fetch(
                            &t.topic,
                            total_bytes,
                            total_messages,
                            elapsed,
                        );
                    }
                }
            }
            Err(e) => {
                let error_type = format!("{e:?}");
                crate::metrics::record_fetch_error("_unknown", &error_type);
            }
        }
    }

    result
}

fn fetch_messages_inner(
    conn_pool: &mut Connections,
    correlation_id: i32,
    client_id: &str,
    max_wait_ms: i32,
    min_bytes: i32,
    broker_partitions: HashMap<&str, Vec<(&str, i32, i64, i32)>>,
) -> Result<Vec<crate::protocol::fetch::OwnedFetchResponse>> {
    let now = Instant::now();
    let mut res = Vec::with_capacity(broker_partitions.len());
    for (host, partitions) in broker_partitions {
        let conn = conn_pool
            .get_conn(host, now)
            .map_err(|e| e.with_broker_context(host, "Fetch"))?;
        let (header, request) = crate::protocol::fetch::build_fetch_request(
            correlation_id,
            client_id,
            -1,
            max_wait_ms,
            min_bytes,
            0x7fff_ffff,
            &partitions,
        );
        transport::kp_send_request(conn, &header, &request, crate::protocol::API_VERSION_FETCH)
            .map_err(|e| e.with_broker_context(host, "Fetch"))?;
        let kp_resp = decode_fetch_response(conn, crate::protocol::API_VERSION_FETCH)
            .map_err(|e| e.with_broker_context(host, "Fetch"))?;
        let owned = crate::protocol::fetch::convert_fetch_response(kp_resp, correlation_id);
        res.push(owned);
    }
    Ok(res)
}