use std::collections::HashSet;
use std::time::Duration;
use anyhow::Result as AnyResult;
use crate::config::ProducerConfig;
use crate::metadata::{BrokerAddress, MetadataCache, MetadataRefresh, refresh_metadata};
/// Producer-side metadata state: a [`MetadataCache`] together with the set of
/// topic names the producer has asked to keep track of.
///
/// The `Default` value holds a default cache and an empty tracked-topic set.
#[derive(Default)]
pub(super) struct ProducerMetadata {
// Backing cache; answers the leader/broker lookups and is the target of refreshes.
cache: MetadataCache,
// Topic names registered through `track`; this is the set `refresh_tracked` refreshes.
tracked_topics: HashSet<String>,
}
impl ProducerMetadata {
    /// Delay reported by [`Self::next_refresh_in`] while no topics are tracked.
    const IDLE_REFRESH_DELAY: Duration = Duration::from_millis(250);

    /// Adds `topic` to the tracked set. Tracking the same topic twice is a no-op.
    pub(super) fn track(&mut self, topic: String) {
        self.tracked_topics.insert(topic);
    }

    /// Borrows the underlying metadata cache.
    pub(super) fn cache(&self) -> &MetadataCache {
        &self.cache
    }

    /// Looks up the leader recorded in the cache for `topic`/`partition`.
    ///
    /// Returns whatever the cache reports, `None` included.
    pub(super) fn leader_for(&self, topic: &str, partition: i32) -> Option<i32> {
        self.cache.leader_for(topic, partition)
    }

    /// Looks up the address the cache holds for `broker_id`, if any.
    pub(super) fn broker(&self, broker_id: i32) -> Option<&BrokerAddress> {
        self.cache.broker(broker_id)
    }

    /// Reports whether the cache has an entry for `broker_id`.
    pub(super) fn contains_broker(&self, broker_id: i32) -> bool {
        self.cache.contains_broker(broker_id)
    }

    /// Asks the cache to invalidate what it holds for `topic`.
    pub(super) fn invalidate_topic(&mut self, topic: &str) {
        self.cache.invalidate_topic(topic);
    }

    /// Computes how long to wait before the next periodic refresh.
    ///
    /// * No tracked topics: a fixed 250 ms.
    /// * Tracked topics but the cache reports no previous refresh: zero.
    /// * Otherwise: `max_age` minus the time elapsed since the last refresh,
    ///   clamped at zero.
    pub(super) fn next_refresh_in(&self, max_age: Duration) -> Duration {
        if self.tracked_topics.is_empty() {
            return Self::IDLE_REFRESH_DELAY;
        }

        match self.cache.last_refresh() {
            Some(refreshed_at) => max_age.saturating_sub(refreshed_at.elapsed()),
            None => Duration::ZERO,
        }
    }

    /// Returns `true` when at least one topic is tracked and
    /// [`Self::next_refresh_in`] says the refresh is due now.
    pub(super) fn needs_periodic_refresh(&self, max_age: Duration) -> bool {
        // Without tracked topics there is nothing to refresh, so skip the timing check.
        if self.tracked_topics.is_empty() {
            return false;
        }
        self.next_refresh_in(max_age).is_zero()
    }

    /// Refreshes metadata for every tracked topic.
    ///
    /// Delegates to [`Self::refresh_topics`], so an empty tracked set returns
    /// `Ok(())` without issuing a refresh.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Self::refresh_topics`].
    pub(super) async fn refresh_tracked(&mut self, config: &ProducerConfig) -> AnyResult<()> {
        // Snapshot the names into an owned list; `refresh_topics` takes a `Vec<String>`.
        let tracked: Vec<String> = self.tracked_topics.iter().cloned().collect();
        self.refresh_topics(config, tracked).await
    }

    /// Refreshes the cache for the given `topics`, taking connection settings
    /// from `config`.
    ///
    /// An empty `topics` list returns `Ok(())` immediately.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `refresh_metadata`.
    pub(super) async fn refresh_topics(
        &mut self,
        config: &ProducerConfig,
        topics: Vec<String>,
    ) -> AnyResult<()> {
        if topics.is_empty() {
            return Ok(());
        }

        let request = MetadataRefresh {
            bootstrap_servers: &config.bootstrap_servers,
            client_id: &config.client_id,
            request_timeout: config.request_timeout,
            security_protocol: config.security_protocol,
            tls: &config.tls,
            sasl: &config.sasl,
            tcp_connector: &config.tcp_connector,
            metadata: &mut self.cache,
            topics: &topics,
        };
        refresh_metadata(request).await
    }
}