use std::sync::{Arc, RwLock};
use tracing::warn;
use crate::metadata_group::cache::MetadataCache;
use crate::metadata_group::codec::decode_entry;
/// Sink for batches of metadata log entries.
///
/// Implementors must be `Send + Sync + 'static`, as required by the
/// supertrait bounds below.
pub trait MetadataApplier: Send + Sync + 'static {
/// Consumes a batch of `(index, payload)` pairs and returns a `u64`.
///
/// Both implementations in this file return the index of the last pair in
/// `entries`, or `0` when the slice is empty.
fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64;
}
/// Applier that holds a shared, lock-protected [`MetadataCache`].
///
/// The derived `Clone` clones the inner `Arc`, so every clone refers to the
/// same underlying cache.
#[derive(Clone)]
pub struct CacheApplier {
/// Shared handle to the cache, guarded by a read/write lock.
cache: Arc<RwLock<MetadataCache>>,
}
impl CacheApplier {
pub fn new(cache: Arc<RwLock<MetadataCache>>) -> Self {
Self { cache }
}
pub fn cache(&self) -> Arc<RwLock<MetadataCache>> {
self.cache.clone()
}
}
impl MetadataApplier for CacheApplier {
    /// Applies a batch of `(index, payload)` pairs to the shared cache.
    ///
    /// The write lock is taken once for the whole batch. A poisoned lock is
    /// recovered with `into_inner` instead of panicking.
    ///
    /// Entries with an empty payload are skipped without touching the cache.
    /// Payloads that fail to decode are logged at `warn` level and skipped.
    ///
    /// Returns the index of the last pair in `entries`, whether or not that
    /// pair was applied, or `0` when `entries` is empty.
    ///
    /// NOTE(review): skipped entries still advance the returned index while
    /// the cache itself is not told about them — confirm this is intended.
    fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
        let mut cache = match self.cache.write() {
            Ok(held) => held,
            Err(poisoned) => poisoned.into_inner(),
        };

        // Empty payloads carry nothing to decode, so they never reach the cache.
        for (index, data) in entries.iter().filter(|(_, bytes)| !bytes.is_empty()) {
            match decode_entry(data) {
                Err(e) => warn!(index = *index, error = %e, "metadata decode failed"),
                Ok(entry) => cache.apply(*index, &entry),
            }
        }

        // The result is taken from the raw batch, so skipped entries count too.
        entries.last().map_or(0, |(index, _)| *index)
    }
}
/// Stateless applier: its [`MetadataApplier`] impl stores nothing and only
/// reports the index of the last entry in a batch.
///
/// Being a unit struct, it derives the common traits so it can be freely
/// copied, default-constructed, and embedded in `Debug`-printable types.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopMetadataApplier;
impl MetadataApplier for NoopMetadataApplier {
    /// Ignores every payload and reports the index of the final entry.
    ///
    /// Returns `0` when `entries` is empty.
    fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
        match entries.last() {
            Some((index, _)) => *index,
            None => 0,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata_group::codec::encode_entry;
    use crate::metadata_group::entry::{MetadataEntry, TopologyChange};

    /// A catalog DDL entry and a topology change in one batch are both
    /// reflected in the cache, and the batch's last index is returned.
    #[test]
    fn cache_applier_counts_catalog_ddl() {
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let applier = CacheApplier::new(cache.clone());
        let ddl = encode_entry(&MetadataEntry::CatalogDdl {
            payload: vec![1, 2, 3],
        })
        .unwrap();
        let topo = encode_entry(&MetadataEntry::TopologyChange(TopologyChange::Join {
            node_id: 7,
            addr: "10.0.0.7:9000".into(),
        }))
        .unwrap();
        let last = applier.apply(&[(1, ddl), (2, topo)]);
        assert_eq!(last, 2);
        let guard = cache.read().unwrap();
        assert_eq!(guard.applied_index, 2);
        assert_eq!(guard.catalog_entries_applied, 1);
        assert_eq!(guard.topology_log.len(), 1);
    }

    /// Re-delivering the same payload at a lower index leaves the cache's
    /// applied index and catalog counter unchanged.
    #[test]
    fn cache_applier_idempotent() {
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let applier = CacheApplier::new(cache.clone());
        let bytes = encode_entry(&MetadataEntry::CatalogDdl {
            payload: vec![9, 9],
        })
        .unwrap();
        applier.apply(&[(5, bytes.clone())]);
        applier.apply(&[(3, bytes)]);
        let guard = cache.read().unwrap();
        assert_eq!(guard.applied_index, 5);
        assert_eq!(guard.catalog_entries_applied, 1);
    }

    /// Empty payloads and empty batches never reach the cache, yet the
    /// returned index still follows the batch contents.
    #[test]
    fn cache_applier_skips_empty_payloads() {
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let applier = CacheApplier::new(cache.clone());
        // Read the baseline in its own scope so the read guard is released
        // before `apply` asks for the write lock.
        let (catalog_before, topology_before) = {
            let guard = cache.read().unwrap();
            (guard.catalog_entries_applied, guard.topology_log.len())
        };
        assert_eq!(applier.apply(&[(4, Vec::new())]), 4);
        assert_eq!(applier.apply(&[]), 0);
        let guard = cache.read().unwrap();
        assert_eq!(guard.catalog_entries_applied, catalog_before);
        assert_eq!(guard.topology_log.len(), topology_before);
    }

    /// A lock poisoned by a panicking writer must not stop later batches
    /// from being applied.
    #[test]
    fn cache_applier_recovers_from_poisoned_lock() {
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let applier = CacheApplier::new(cache.clone());

        // Panic while holding the write guard to poison the lock.
        let poisoner = cache.clone();
        let outcome = std::thread::spawn(move || {
            let _held = poisoner.write().unwrap();
            panic!("poisoning the metadata cache lock on purpose");
        })
        .join();
        assert!(outcome.is_err());
        assert!(cache.is_poisoned());

        let bytes = encode_entry(&MetadataEntry::CatalogDdl {
            payload: vec![9, 9],
        })
        .unwrap();
        assert_eq!(applier.apply(&[(5, bytes)]), 5);

        let guard = cache.read().unwrap_or_else(|poison| poison.into_inner());
        assert_eq!(guard.applied_index, 5);
        assert_eq!(guard.catalog_entries_applied, 1);
    }

    /// The no-op applier reports the last index of a batch and `0` for an
    /// empty batch.
    #[test]
    fn noop_applier_advances_watermark() {
        let noop = NoopMetadataApplier;
        assert_eq!(noop.apply(&[(7, b"x".to_vec()), (9, b"y".to_vec())]), 9);
        assert_eq!(noop.apply(&[]), 0);
    }
}