// kafkit-client 0.1.9
//
// Kafka 4.0+ pure Rust client.
// Documentation
use std::collections::HashSet;
use std::time::Duration;

use anyhow::Result as AnyResult;

use crate::config::ProducerConfig;
use crate::metadata::{BrokerAddress, MetadataCache, MetadataRefresh, refresh_metadata};

/// Producer-side metadata state: a [`MetadataCache`] together with the set of
/// topic names the producer has registered through `track`.
///
/// `Default` yields an instance with no tracked topics and a default-constructed
/// cache.
#[derive(Default)]
pub(super) struct ProducerMetadata {
    /// Cached cluster metadata; all leader/broker lookups are delegated to it.
    cache: MetadataCache,
    /// Topic names passed to `track`; these are the topics refreshed by
    /// `refresh_tracked`. A set, so tracking the same topic twice is a no-op.
    tracked_topics: HashSet<String>,
}

impl ProducerMetadata {
    /// Interval reported by `next_refresh_in` while no topic is tracked, i.e.
    /// when there is nothing to refresh yet.
    const IDLE_REFRESH_INTERVAL: Duration = Duration::from_millis(250);

    /// Registers `topic` so that it is included in `refresh_tracked`.
    ///
    /// Tracking an already-tracked topic has no further effect.
    pub(super) fn track(&mut self, topic: String) {
        self.tracked_topics.insert(topic);
    }

    /// Borrows the underlying metadata cache.
    pub(super) fn cache(&self) -> &MetadataCache {
        &self.cache
    }

    /// Looks up the leader for `topic`/`partition` in the cache.
    ///
    /// Delegates to `MetadataCache::leader_for`; presumably the returned value
    /// is the leader's broker id — confirm against the cache implementation.
    pub(super) fn leader_for(&self, topic: &str, partition: i32) -> Option<i32> {
        self.cache.leader_for(topic, partition)
    }

    /// Looks up the address of `broker_id` in the cache, if the cache has one.
    pub(super) fn broker(&self, broker_id: i32) -> Option<&BrokerAddress> {
        self.cache.broker(broker_id)
    }

    /// Reports whether the cache contains `broker_id`.
    pub(super) fn contains_broker(&self, broker_id: i32) -> bool {
        self.cache.contains_broker(broker_id)
    }

    /// Asks the cache to invalidate its entry for `topic`.
    ///
    /// The topic stays in the tracked set; only the cache is touched here.
    pub(super) fn invalidate_topic(&mut self, topic: &str) {
        self.cache.invalidate_topic(topic);
    }

    /// Returns how long to wait before the next periodic refresh is due.
    ///
    /// * No tracked topics: a fixed 250 ms interval.
    /// * Tracked topics but the cache reports no previous refresh:
    ///   `Duration::ZERO` (refresh immediately).
    /// * Otherwise: whatever is left of `max_age` since the last refresh,
    ///   saturating at zero once `max_age` has been exceeded.
    pub(super) fn next_refresh_in(&self, max_age: Duration) -> Duration {
        if self.tracked_topics.is_empty() {
            return Self::IDLE_REFRESH_INTERVAL;
        }

        match self.cache.last_refresh() {
            Some(refreshed_at) => max_age.saturating_sub(refreshed_at.elapsed()),
            None => Duration::ZERO,
        }
    }

    /// Returns `true` when at least one topic is tracked and the refresh
    /// deadline computed by `next_refresh_in` has already been reached.
    pub(super) fn needs_periodic_refresh(&self, max_age: Duration) -> bool {
        if self.tracked_topics.is_empty() {
            return false;
        }

        self.next_refresh_in(max_age).is_zero()
    }

    /// Refreshes metadata for every tracked topic.
    ///
    /// The tracked names are copied into a `Vec` first so the set is not
    /// borrowed while `refresh_topics` mutates the cache. Returns `Ok(())`
    /// without doing anything when no topic is tracked.
    pub(super) async fn refresh_tracked(&mut self, config: &ProducerConfig) -> AnyResult<()> {
        let tracked: Vec<String> = self.tracked_topics.iter().cloned().collect();
        self.refresh_topics(config, tracked).await
    }

    /// Refreshes metadata for the given `topics` into the cache.
    ///
    /// An empty `topics` list is a no-op that returns `Ok(())`. Otherwise the
    /// connection-related settings from `config` are handed to
    /// `refresh_metadata` together with a mutable borrow of the cache, and its
    /// result is returned unchanged.
    pub(super) async fn refresh_topics(
        &mut self,
        config: &ProducerConfig,
        topics: Vec<String>,
    ) -> AnyResult<()> {
        if topics.is_empty() {
            return Ok(());
        }

        let request = MetadataRefresh {
            bootstrap_servers: &config.bootstrap_servers,
            client_id: &config.client_id,
            request_timeout: config.request_timeout,
            security_protocol: config.security_protocol,
            tls: &config.tls,
            sasl: &config.sasl,
            tcp_connector: &config.tcp_connector,
            metadata: &mut self.cache,
            topics: &topics,
        };

        refresh_metadata(request).await
    }
}