use std::ops::Deref;
use parking_lot::Mutex;
use tracing::{debug, info};
use crate::protocol::messages::MetadataResponse;
/// Generation of the metadata cache contents.
///
/// `MetadataCache::update` increments the generation, `MetadataCache::get` returns the
/// generation the response was read at, and `MetadataCache::invalidate` compares the
/// generation it is given against the current one and ignores the request on mismatch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MetadataCacheGeneration(usize);
/// Cache holding at most one `MetadataResponse` together with its generation.
#[derive(Debug)]
pub(crate) struct MetadataCache {
// Cached response (`None` when nothing is cached or after an invalidation) and the
// current generation, kept under one lock so they are read and written together.
cache: Mutex<(Option<MetadataResponse>, MetadataCacheGeneration)>,
}
impl Default for MetadataCache {
    /// Creates a cache with no stored response, starting at generation `0`.
    fn default() -> Self {
        let initial_state = (None, MetadataCacheGeneration(0));
        Self {
            cache: Mutex::new(initial_state),
        }
    }
}
impl MetadataCache {
    /// Returns a copy of the cached metadata response and the generation it was read at.
    ///
    /// When `topics` is `Some`, the returned response only contains the topics whose
    /// name appears in the list. If at least one requested topic is not present in the
    /// cached response, the cache is invalidated (for the generation that was read) and
    /// `None` is returned.
    ///
    /// Returns `None` when nothing is cached.
    pub(crate) fn get(
        &self,
        topics: &Option<Vec<String>>,
    ) -> Option<(MetadataResponse, MetadataCacheGeneration)> {
        // The lock guard is a temporary of this `let` statement, so the lock is
        // released again before `invalidate` (which acquires it itself) is called below.
        let (mut m, r#gen) = match self.cache.lock().deref() {
            (Some(m), r#gen) => (m.clone(), *r#gen),
            (None, _) => return None,
        };

        if let Some(want) = topics {
            m.topics.retain(|t| want.contains(&t.name.0));

            // Check presence by name instead of comparing lengths: a request that
            // lists the same topic more than once must not be mistaken for a request
            // for an unknown topic.
            let all_cached = want
                .iter()
                .all(|name| m.topics.iter().any(|t| t.name.0 == *name));
            if !all_cached {
                debug!("cached metadata query for unknown topic");
                self.invalidate("get from metadata cache: unknown topic", r#gen);
                return None;
            }
        }

        debug!(?m, "using cached metadata response");
        Some((m, r#gen))
    }

    /// Drops the cached response if `gen` is still the current generation.
    ///
    /// A request carrying a different generation is logged and ignored, so a caller
    /// holding an old generation cannot discard a response stored by a later `update`.
    /// `reason` is only used for logging.
    pub(crate) fn invalidate(&self, reason: &'static str, r#gen: MetadataCacheGeneration) {
        let mut guard = self.cache.lock();
        if guard.1 != r#gen {
            debug!(
                reason,
                current_gen = guard.1.0,
                request_gen = r#gen.0,
                "stale invalidation request for metadata cache",
            );
            return;
        }
        // The generation is left untouched; only `update` advances it.
        guard.0 = None;
        info!(reason, "invalidated metadata cache",);
    }

    /// Stores `m` as the cached response and advances the generation by one.
    pub(crate) fn update(&self, m: MetadataResponse) {
        let mut guard = self.cache.lock();
        guard.0 = Some(m);
        guard.1.0 += 1;
        debug!("updated metadata cache");
    }
}
#[cfg(test)]
mod tests {
    use crate::protocol::{
        messages::MetadataResponseTopic,
        primitives::{Int32, String_},
    };

    use super::*;

    /// Builds a response containing one topic entry per given name; `None` yields a
    /// response without topics. All other topic fields are left at their defaults.
    fn response_with_topics(topics: Option<&'static [&'static str]>) -> MetadataResponse {
        let topics = topics
            .unwrap_or_default()
            .iter()
            .map(|name| MetadataResponseTopic {
                name: String_(name.to_string()),
                error: Default::default(),
                is_internal: Default::default(),
                partitions: Default::default(),
            })
            .collect();

        MetadataResponse {
            throttle_time_ms: Some(Int32(42)),
            brokers: Default::default(),
            cluster_id: Default::default(),
            controller_id: Default::default(),
            topics,
        }
    }

    /// Builds a response with every field left at its default value.
    fn empty_response() -> MetadataResponse {
        MetadataResponse {
            throttle_time_ms: Default::default(),
            brokers: Default::default(),
            cluster_id: Default::default(),
            controller_id: Default::default(),
            topics: Default::default(),
        }
    }

    #[test]
    fn test_get() {
        let cache = MetadataCache::default();
        // Nothing has been stored yet.
        assert!(cache.get(&None).is_none());

        let expected = response_with_topics(None);
        cache.update(expected.clone());

        let (cached, _) = cache.get(&None).expect("should have cached entry");
        assert_eq!(expected, cached);
    }

    #[test]
    fn test_get_topic_subset_filtered() {
        let cache = MetadataCache::default();
        cache.update(response_with_topics(Some(&["bananas", "platanos"])));

        // Requesting a single cached topic returns only that topic.
        let only_bananas = Some(vec![String::from("bananas")]);
        let (cached, _) = cache
            .get(&only_bananas)
            .expect("should have cached entry");
        assert_eq!(response_with_topics(Some(&["bananas"])), cached);

        // Requesting an empty topic list returns a response without topics.
        let (cached, _) = cache.get(&Some(vec![])).expect("should have cached entry");
        assert_eq!(response_with_topics(Some(&[])), cached);

        // Requesting without a filter returns everything.
        let (cached, _) = cache.get(&None).expect("should have cached entry");
        assert_eq!(
            response_with_topics(Some(&["bananas", "platanos"])),
            cached
        );
    }

    #[test]
    fn test_get_missing_topic_invalidate() {
        let cache = MetadataCache::default();
        cache.update(response_with_topics(Some(&["bananas", "platanos"])));

        let known = Some(vec![String::from("bananas")]);
        let unknown = Some(vec![String::from("goats")]);

        assert!(cache.get(&known).is_some());
        // Asking for a topic that is not cached invalidates the whole entry ...
        assert!(cache.get(&unknown).is_none());
        // ... so the previously available topic is gone as well.
        assert!(cache.get(&known).is_none());
    }

    #[test]
    fn test_explicit_invalidate() {
        let cache = MetadataCache::default();

        cache.update(empty_response());
        let (_, first_gen) = cache.get(&None).unwrap();
        cache.invalidate("test", first_gen);
        assert!(cache.get(&None).is_none());

        cache.update(empty_response());
        let (_, second_gen) = cache.get(&None).unwrap();

        // An invalidation carrying the outdated generation must be ignored.
        cache.invalidate("test", first_gen);
        assert!(cache.get(&None).is_some());

        // The current generation does invalidate.
        cache.invalidate("test", second_gen);
        assert!(cache.get(&None).is_none());
    }
}