use regex::Regex;
use std::collections::HashSet;
use super::admin::KafkaAdmin;
use super::config::{KafkaConfig, SuppressionRule};
use crate::transport::error::{TransportError, TransportResult};
/// Resolves the set of Kafka topics to consume: fetches the broker topic
/// list via the admin client, applies suffix suppression rules, then the
/// include/exclude regex filters.
pub struct TopicResolver {
/// Admin client used to list topics from the broker.
admin: KafkaAdmin,
/// Rules that drop a "suppressed" topic when its "preferred" sibling exists.
suppression_rules: Vec<SuppressionRule>,
/// When non-empty, a topic must match at least one of these to be kept.
include_patterns: Vec<Regex>,
/// A topic matching any of these is dropped.
exclude_patterns: Vec<Regex>,
}
impl TopicResolver {
    /// Builds a resolver from the Kafka configuration.
    ///
    /// # Errors
    /// Returns a `TransportError` if the admin client cannot be created or if
    /// any include/exclude pattern is not a valid regex.
    pub fn new(config: &KafkaConfig) -> TransportResult<Self> {
        let admin = KafkaAdmin::new(config)?;
        let include_patterns = compile_patterns(&config.topic_include)?;
        let exclude_patterns = compile_patterns(&config.topic_exclude)?;
        Ok(Self {
            admin,
            suppression_rules: config.topic_suppression_rules.clone(),
            include_patterns,
            exclude_patterns,
        })
    }

    /// Fetches the broker topic list, applies suppression rules and the
    /// include/exclude filters, and returns a sorted, de-duplicated list.
    ///
    /// # Errors
    /// Propagates any error from listing topics on the broker.
    pub fn resolve(&self) -> TransportResult<Vec<String>> {
        let all_topics = self.admin.list_topics()?;
        tracing::debug!(total = all_topics.len(), "Fetched broker topic list");
        let after_suppression = apply_suppression_rules(all_topics, &self.suppression_rules);
        let mut resolved: Vec<String> = after_suppression
            .into_iter()
            .filter(|t| passes_filters(t, &self.include_patterns, &self.exclude_patterns))
            .collect();
        // Stability is irrelevant (dedup only needs equal items adjacent), so
        // use the faster, non-allocating unstable sort.
        resolved.sort_unstable();
        resolved.dedup();
        tracing::info!(count = resolved.len(), topics = ?resolved, "Resolved Kafka topics");
        Ok(resolved)
    }
}
impl std::fmt::Debug for TopicResolver {
    /// Manual `Debug` that summarizes rules and patterns as strings and
    /// elides the remaining fields (presumably because `KafkaAdmin` does not
    /// implement `Debug` — confirm).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rules: Vec<String> = self
            .suppression_rules
            .iter()
            .map(|r| format!("{} → {}", r.preferred_suffix, r.suppressed_suffix))
            .collect();
        let includes: Vec<&str> = self.include_patterns.iter().map(Regex::as_str).collect();
        let excludes: Vec<&str> = self.exclude_patterns.iter().map(Regex::as_str).collect();
        f.debug_struct("TopicResolver")
            .field("suppression_rules", &rules)
            .field("include_patterns", &includes)
            .field("exclude_patterns", &excludes)
            .finish_non_exhaustive()
    }
}
/// Removes "suppressed" topics that have a "preferred" sibling.
///
/// For each rule, a topic ending in `suppressed_suffix` is dropped when a
/// topic with the same base name plus `preferred_suffix` is also present
/// (e.g. `auth_land` is dropped when `auth_load` exists). Topics matching
/// neither suffix are always kept. A degenerate rule whose two suffixes are
/// identical is ignored: applying it would make every matching topic
/// suppress itself (its own base would be in the preferred set) and
/// silently drop data.
#[must_use]
pub fn apply_suppression_rules(topics: Vec<String>, rules: &[SuppressionRule]) -> Vec<String> {
    if rules.is_empty() {
        return topics;
    }
    let mut result = topics;
    for rule in rules {
        // Guard against self-suppression (see doc comment above).
        if rule.preferred_suffix == rule.suppressed_suffix {
            continue;
        }
        // Base names (topic minus preferred suffix) of all preferred topics.
        // Owned strings are required: `retain` below needs `&mut result`.
        let preferred_bases: HashSet<String> = result
            .iter()
            .filter_map(|t| {
                t.strip_suffix(rule.preferred_suffix.as_str())
                    .map(String::from)
            })
            .collect();
        result.retain(|t| {
            // Drop only the suppressed variant of a base that also has the
            // preferred variant.
            if let Some(base) = t.strip_suffix(rule.suppressed_suffix.as_str()) {
                !preferred_bases.contains(base)
            } else {
                true
            }
        });
    }
    result
}
/// Returns `true` when `topic` survives the filters: it must match at least
/// one include pattern (unless no include patterns are configured) and must
/// match no exclude pattern.
#[must_use]
pub fn passes_filters(topic: &str, include: &[Regex], exclude: &[Regex]) -> bool {
    let included = include.is_empty() || include.iter().any(|r| r.is_match(topic));
    let excluded = exclude.iter().any(|r| r.is_match(topic));
    included && !excluded
}
/// Compiles each pattern string into a `Regex`, failing fast on the first
/// invalid one with a config error naming the offending pattern.
fn compile_patterns(patterns: &[String]) -> TransportResult<Vec<Regex>> {
    let mut compiled = Vec::with_capacity(patterns.len());
    for p in patterns {
        let re = Regex::new(p).map_err(|e| {
            TransportError::Config(format!("Invalid topic filter regex '{p}': {e}"))
        })?;
        compiled.push(re);
    }
    Ok(compiled)
}
/// Handle to a background topic-refresh task: exposes the most recently
/// published topic list and change detection via a watch channel.
pub struct TopicRefreshHandle {
/// Receiving side of the watch channel the refresh task publishes to.
rx: tokio::sync::watch::Receiver<Vec<String>>,
/// Last list this handle reported to the caller via `check_changed`.
last_seen: Vec<String>,
/// Join handle for the spawned refresh task (dropping it detaches the
/// task rather than aborting it).
_task: tokio::task::JoinHandle<()>,
}
impl TopicRefreshHandle {
pub fn check_changed(&mut self) -> Option<Vec<String>> {
if self.rx.has_changed().unwrap_or(false) {
self.rx.mark_changed();
let current = self.rx.borrow().clone();
if current == self.last_seen {
return None;
}
self.last_seen.clone_from(¤t);
Some(current)
} else {
None
}
}
#[must_use]
pub fn current(&self) -> Vec<String> {
self.rx.borrow().clone()
}
#[cfg(test)]
#[must_use]
pub fn new_for_test(rx: tokio::sync::watch::Receiver<Vec<String>>) -> Self {
let last_seen = rx.borrow().clone();
Self {
rx,
last_seen,
_task: tokio::task::spawn(async {}),
}
}
}
impl std::fmt::Debug for TopicRefreshHandle {
    /// Compact `Debug`: only the size of the last-seen list is rendered;
    /// channel and task handles are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("TopicRefreshHandle");
        dbg.field("last_seen_count", &self.last_seen.len());
        dbg.finish_non_exhaustive()
    }
}
impl TopicResolver {
/// Consumes the resolver and spawns a background task that re-resolves the
/// topic list every `interval`, publishing each successful result on a
/// watch channel until `shutdown` is cancelled or every receiver is
/// dropped. Returns a handle for observing updates.
#[must_use]
pub fn start_refresh_loop(
self,
interval: std::time::Duration,
shutdown: tokio_util::sync::CancellationToken,
) -> TopicRefreshHandle {
// Seed the channel with an initial resolution; on failure, start with an
// empty list and let the periodic refresh recover.
let initial = self.resolve().unwrap_or_default();
let (tx, rx) = tokio::sync::watch::channel(initial.clone());
let task = tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
// Consume the interval's immediate first tick so the loop waits a full
// `interval` before refreshing (the initial list was just published).
ticker.tick().await;
loop {
tokio::select! {
// `biased` polls branches in order, so cancellation wins over a tick.
biased;
() = shutdown.cancelled() => {
tracing::debug!("Topic refresh loop shutting down");
break;
}
_tick = ticker.tick() => {
// NOTE(review): resolve() looks like blocking broker I/O running on
// the async runtime — consider spawn_blocking; confirm KafkaAdmin.
match self.resolve() {
Ok(new_topics) => {
// send() errs only when all receivers are gone: stop refreshing.
if tx.send(new_topics).is_err() {
break; }
}
Err(e) => {
// Keep the previously published list on transient failures.
tracing::warn!(error = %e, "Topic refresh failed, retaining previous list");
}
}
}
}
}
});
TopicRefreshHandle {
rx,
last_seen: initial,
_task: task,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a suppression rule from a preferred/suppressed suffix pair.
    fn rule(preferred: &str, suppressed: &str) -> SuppressionRule {
        SuppressionRule {
            preferred_suffix: preferred.into(),
            suppressed_suffix: suppressed.into(),
        }
    }

    /// Converts string slices into an owned topic list.
    fn topics(names: &[&str]) -> Vec<String> {
        names.iter().map(|n| (*n).to_string()).collect()
    }

    #[test]
    fn default_suppression_load_over_land() {
        let rules = vec![rule("_load", "_land")];
        let input = topics(&["auth_land", "auth_load", "events_land", "syslog_load"]);
        let kept = apply_suppression_rules(input, &rules);
        // auth_land loses to auth_load; the unpaired topics survive.
        assert!(!kept.contains(&"auth_land".to_string()));
        assert!(kept.contains(&"auth_load".to_string()));
        assert!(kept.contains(&"events_land".to_string()));
        assert!(kept.contains(&"syslog_load".to_string()));
        assert_eq!(kept.len(), 3);
    }

    #[test]
    fn no_suppression_rules() {
        // With no rules the input passes through untouched.
        let input = topics(&["auth_land", "auth_load"]);
        assert_eq!(apply_suppression_rules(input.clone(), &[]), input);
    }

    #[test]
    fn custom_suppression_rule() {
        let kept = apply_suppression_rules(
            topics(&["events_raw", "events_enriched", "other_raw"]),
            &[rule("_enriched", "_raw")],
        );
        assert!(!kept.contains(&"events_raw".to_string()));
        assert!(kept.contains(&"events_enriched".to_string()));
        assert!(kept.contains(&"other_raw".to_string()));
    }

    #[test]
    fn multiple_suppression_rules() {
        let rules = vec![rule("_load", "_land"), rule("_enriched", "_raw")];
        let kept = apply_suppression_rules(
            topics(&[
                "auth_land",
                "auth_load",
                "events_raw",
                "events_enriched",
                "other_land",
            ]),
            &rules,
        );
        // Each rule suppresses its own pair; other_land has no preferred twin.
        assert_eq!(kept.len(), 3);
        assert!(kept.contains(&"auth_load".to_string()));
        assert!(kept.contains(&"events_enriched".to_string()));
        assert!(kept.contains(&"other_land".to_string()));
    }

    #[test]
    fn passes_filters_empty() {
        // No filters configured: everything passes.
        assert!(passes_filters("auth_land", &[], &[]));
    }

    #[test]
    fn passes_filters_include_only() {
        let only_auth = vec![Regex::new("^auth").unwrap()];
        assert!(passes_filters("auth_land", &only_auth, &[]));
        assert!(!passes_filters("events_land", &only_auth, &[]));
    }

    #[test]
    fn passes_filters_exclude_only() {
        let no_test = vec![Regex::new("^test_").unwrap()];
        assert!(passes_filters("auth_land", &[], &no_test));
        assert!(!passes_filters("test_land", &[], &no_test));
    }

    #[test]
    fn passes_filters_both() {
        let only_land = vec![Regex::new("_land$").unwrap()];
        let no_test = vec![Regex::new("^test_").unwrap()];
        assert!(passes_filters("auth_land", &only_land, &no_test));
        assert!(!passes_filters("test_land", &only_land, &no_test));
        assert!(!passes_filters("auth_load", &only_land, &no_test));
    }

    #[test]
    fn compile_patterns_invalid_regex() {
        // An unterminated character class must surface as a config error.
        assert!(compile_patterns(&["[invalid".to_string()]).is_err());
    }

    #[test]
    fn compile_patterns_valid() {
        let compiled = compile_patterns(&["^auth".to_string(), "_land$".to_string()]);
        assert!(compiled.is_ok());
        assert_eq!(compiled.unwrap().len(), 2);
    }

    #[test]
    fn suppression_no_matching_pairs() {
        // Suppressed topics with no preferred sibling are all retained.
        let input = topics(&["auth_land", "events_land"]);
        let kept = apply_suppression_rules(input.clone(), &[rule("_load", "_land")]);
        assert_eq!(kept, input);
    }
}