use futures_util::future::join_all;
use crate::core::{Error, Event, Result};
use crate::sink::{BoxedSink, SinkAdapter, SinkDeliveryGuarantee, SinkDeliveryMetrics};
/// Tests a single name segment `s` against the glob `pattern`.
///
/// * A pattern of exactly `*` accepts every segment, including the empty one.
/// * A pattern containing neither `*` nor `?` is compared for plain equality.
/// * Anything else is handed to [`glob_match`] on the raw bytes of both strings.
fn glob_segment_matches(pattern: &str, s: &str) -> bool {
    match pattern {
        // Lone star: nothing to inspect.
        "*" => true,
        // No wildcard characters at all: a straight string comparison suffices.
        literal if !literal.contains(['*', '?']) => literal == s,
        // Mixed literal/wildcard pattern: run the full matcher.
        wildcard => glob_match(wildcard.as_bytes(), s.as_bytes()),
    }
}
/// Byte-wise glob matcher.
///
/// Supported syntax:
/// * `*` matches any run of bytes, including the empty run;
/// * `?` matches exactly one byte;
/// * every other byte must match literally.
///
/// Returns `true` only when the whole of `s` is consumed by the whole of `pat`.
///
/// The scan is iterative and remembers only the most recent `*`. On a mismatch
/// it rewinds to that star and lets it absorb one more byte of `s`. This bounds
/// the work at `O(pat.len() * s.len())` with constant stack usage. A naive
/// recursive formulation explores every split for every star, which is
/// exponential in the number of `*` wildcards (e.g. `a*a*a*…b` against a long
/// run of `a`).
///
/// NOTE(review): matching is per byte, so for non-ASCII input `?` consumes one
/// byte rather than one Unicode scalar value.
fn glob_match(pat: &[u8], s: &[u8]) -> bool {
    let mut pi = 0usize; // cursor into `pat`
    let mut si = 0usize; // cursor into `s`
    // (pattern index just after the last `*`, subject index that star currently ends at)
    let mut last_star: Option<(usize, usize)> = None;

    while si < s.len() {
        match pat.get(pi).copied() {
            Some(b'*') => {
                // Tentatively let the star match nothing; remember where to resume.
                last_star = Some((pi + 1, si));
                pi += 1;
            }
            Some(p) if p == b'?' || p == s[si] => {
                pi += 1;
                si += 1;
            }
            _ => match last_star {
                // Mismatch (or pattern exhausted): widen the last star by one byte.
                Some((resume_pi, star_si)) => {
                    pi = resume_pi;
                    si = star_si + 1;
                    last_star = Some((resume_pi, si));
                }
                // No star to fall back on: definite mismatch.
                None => return false,
            },
        }
    }

    // Subject exhausted: only trailing stars may remain in the pattern.
    pat[pi..].iter().all(|&b| b == b'*')
}
/// Decides whether a route `pattern` applies to `table_key`.
///
/// Both arguments are split at their first `.` into an optional schema part
/// and a table part, and each part is compared with [`glob_segment_matches`]:
///
/// * `*` on its own matches every key;
/// * `schema.table` pattern vs `schema.table` key: both parts must match;
/// * `schema.table` pattern vs bare key: never matches;
/// * bare pattern vs `schema.table` key: the pattern is tested against the
///   table part only;
/// * bare pattern vs bare key: compared directly.
pub fn table_matches(pattern: &str, table_key: &str) -> bool {
    if pattern == "*" {
        return true;
    }

    match (pattern.split_once('.'), table_key.split_once('.')) {
        (Some((pat_schema, pat_table)), Some((key_schema, key_table))) => {
            glob_segment_matches(pat_schema, key_schema)
                && glob_segment_matches(pat_table, key_table)
        }
        // A qualified pattern requires a qualified key.
        (Some(_), None) => false,
        // An unqualified pattern ignores the key's schema.
        (None, Some((_, key_table))) => glob_segment_matches(pattern, key_table),
        (None, None) => glob_segment_matches(pattern, table_key),
    }
}
/// A [`TableRouter`] whose route and default sinks are all [`BoxedSink`]s.
///
/// NOTE(review): presumably `BoxedSink` is a type-erased sink, so different
/// routes can target different concrete sink types — confirm in `crate::sink`.
pub type HeterogeneousTableRouter = TableRouter<BoxedSink>;
/// One routing rule: a table-name pattern together with the sink that receives
/// events whose qualified table name matches it.
#[derive(Debug)]
pub struct TableRoute<S> {
/// Pattern tested against an event's qualified table name via [`table_matches`].
pub pattern: String,
/// Sink that events matching `pattern` are sent to.
pub sink: S,
}
impl<S> TableRoute<S> {
pub fn new(pattern: impl Into<String>, sink: S) -> Self {
Self {
pattern: pattern.into(),
sink,
}
}
}
/// Incrementally assembles a [`TableRouter`].
pub struct TableRouterBuilder<S> {
// Router name; included in the error message produced by `build`.
name: String,
// Routes in the order they were added.
routes: Vec<TableRoute<S>>,
// Optional fallback sink handed to the built router.
default: Option<S>,
// Copied into the built router's `drop_unrouted` flag.
drop_unrouted: bool,
}
impl<S: SinkAdapter> TableRouterBuilder<S> {
fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
routes: Vec::new(),
default: None,
drop_unrouted: false,
}
}
pub fn route(mut self, pattern: impl Into<String>, sink: S) -> Self {
self.routes.push(TableRoute::new(pattern, sink));
self
}
pub fn default(mut self, sink: S) -> Self {
self.default = Some(sink);
self
}
pub fn drop_unrouted(mut self, drop: bool) -> Self {
self.drop_unrouted = drop;
self
}
pub fn build(self) -> Result<TableRouter<S>> {
let mut errors: Vec<String> = Vec::new();
let mut seen = std::collections::HashSet::new();
for route in &self.routes {
let pat = route.pattern.as_str();
if pat.is_empty() {
errors.push("empty pattern".into());
} else if !seen.insert(pat) {
errors.push(format!("duplicate pattern '{pat}'"));
} else if let Some(dot) = pat.find('.') {
let (schema, rest) = pat.split_at(dot);
let table = &rest[1..];
if schema.is_empty() || table.is_empty() {
errors.push(format!(
"malformed pattern '{pat}': schema and table segments must both be non-empty"
));
}
}
}
if !errors.is_empty() {
return Err(Error::ConfigError(format!(
"TableRouterBuilder '{}': invalid route pattern(s): {}",
self.name,
errors.join(", "),
)));
}
Ok(self.build_unchecked())
}
pub fn build_unchecked(self) -> TableRouter<S> {
TableRouter {
name: self.name,
routes: self.routes,
default: self.default,
drop_unrouted: self.drop_unrouted,
closed: false,
}
}
}
/// Sink that forwards each event to the first route whose pattern matches the
/// event's qualified table name, falling back to an optional default sink.
#[derive(Debug)]
pub struct TableRouter<S> {
// Name reported by `SinkAdapter::name` and used in routing error messages.
name: String,
// Routes, consulted in order; the first match wins.
routes: Vec<TableRoute<S>>,
// Fallback sink used when no route matches.
default: Option<S>,
// When true, `send` returns `Ok(())` for events with no route and no default
// instead of an error.
drop_unrouted: bool,
// Set by `close_all`; `send` rejects events once it is true.
closed: bool,
}
impl<S: SinkAdapter> TableRouter<S> {
pub fn builder(name: impl Into<String>) -> TableRouterBuilder<S> {
TableRouterBuilder::new(name)
}
pub fn new(name: impl Into<String>, routes: Vec<TableRoute<S>>, default: Option<S>) -> Self {
Self {
name: name.into(),
routes,
default,
drop_unrouted: false,
closed: false,
}
}
pub fn routes(&self) -> &[TableRoute<S>] {
&self.routes
}
pub fn default_sink(&self) -> Option<&S> {
self.default.as_ref()
}
pub fn route_for(&self, event: &Event) -> Option<&S> {
let key = event.qualified_table_name();
for route in &self.routes {
if table_matches(&route.pattern, &key) {
return Some(&route.sink);
}
}
self.default.as_ref()
}
pub fn matched_pattern_for(&self, event: &Event) -> Option<&str> {
let key = event.qualified_table_name();
for route in &self.routes {
if table_matches(&route.pattern, &key) {
return Some(&route.pattern);
}
}
if self.default.is_some() {
Some("*")
} else {
None
}
}
pub fn pattern_names(&self) -> impl Iterator<Item = &str> {
self.routes.iter().map(|r| r.pattern.as_str())
}
pub async fn flush_all(&mut self) -> Result<()> {
let mut errors: Vec<String> = Vec::new();
for route in &mut self.routes {
if let Err(e) = route.sink.flush().await {
errors.push(format!("route '{}': {e}", route.pattern));
}
}
if let Some(ref mut d) = self.default {
if let Err(e) = d.flush().await {
errors.push(format!("default: {e}"));
}
}
if errors.is_empty() {
Ok(())
} else {
Err(Error::StateError(errors.join("; ")))
}
}
pub async fn close_all(&mut self) -> Result<()> {
let mut errors: Vec<String> = Vec::new();
for route in &mut self.routes {
if let Err(e) = route.sink.close().await {
errors.push(format!("route '{}': {e}", route.pattern));
}
}
if let Some(ref mut d) = self.default {
if let Err(e) = d.close().await {
errors.push(format!("default: {e}"));
}
}
self.closed = true;
if errors.is_empty() {
Ok(())
} else {
Err(Error::StateError(errors.join("; ")))
}
}
}
impl<S: SinkAdapter> SinkAdapter for TableRouter<S> {
async fn send(&mut self, event: &Event) -> Result<()> {
if self.closed {
return Err(Error::StateError("TableRouter is closed".into()));
}
let key = event.qualified_table_name();
for route in &mut self.routes {
if table_matches(&route.pattern, &key) {
return route.sink.send(event).await;
}
}
if let Some(ref mut default_sink) = self.default {
return default_sink.send(event).await;
}
if self.drop_unrouted {
Ok(())
} else {
Err(Error::StateError(format!(
"TableRouter '{}': no route matched '{key}' and no default sink is configured",
self.name,
)))
}
}
async fn flush(&mut self) -> Result<()> {
self.flush_all().await
}
async fn close(&mut self) -> Result<()> {
self.close_all().await
}
fn name(&self) -> &str {
&self.name
}
fn is_closed(&self) -> bool {
self.closed
}
fn delivery_guarantee(&self) -> SinkDeliveryGuarantee {
let mut g = SinkDeliveryGuarantee::EffectivelyOnce;
let mut has_sink = false;
for route in &self.routes {
g = g.weakest(route.sink.delivery_guarantee());
has_sink = true;
}
if let Some(ref d) = self.default {
g = g.weakest(d.delivery_guarantee());
has_sink = true;
}
if has_sink {
g
} else {
SinkDeliveryGuarantee::default()
}
}
fn idempotent_delivery_capable(&self) -> bool {
self.routes
.iter()
.all(|r| r.sink.idempotent_delivery_capable())
&& self
.default
.as_ref()
.is_none_or(|d| d.idempotent_delivery_capable())
}
fn transactional_checkpoint_barrier_capable(&self) -> bool {
!self.routes.is_empty()
&& self
.routes
.iter()
.all(|r| r.sink.transactional_checkpoint_barrier_capable())
&& self
.default
.as_ref()
.is_none_or(|d| d.transactional_checkpoint_barrier_capable())
}
fn queue_depth(&self) -> Option<usize> {
let mut total: usize = 0;
let mut any = false;
for route in &self.routes {
if let Some(d) = route.sink.queue_depth() {
total = total.saturating_add(d);
any = true;
}
}
if let Some(ref d) = self.default {
if let Some(depth) = d.queue_depth() {
total = total.saturating_add(depth);
any = true;
}
}
if any {
Some(total)
} else {
None
}
}
fn flush_tick_interval(&self) -> Option<std::time::Duration> {
self.routes
.iter()
.filter_map(|r| r.sink.flush_tick_interval())
.chain(self.default.iter().filter_map(|d| d.flush_tick_interval()))
.min()
}
fn delivery_metrics(&self) -> Option<SinkDeliveryMetrics> {
let mut any = false;
let mut agg = SinkDeliveryMetrics::default();
for route in &self.routes {
if let Some(m) = route.sink.delivery_metrics() {
agg.merge(&m);
any = true;
}
}
if let Some(ref d) = self.default {
if let Some(m) = d.delivery_metrics() {
agg.merge(&m);
any = true;
}
}
if any {
Some(agg)
} else {
None
}
}
async fn begin_checkpoint_barrier(&mut self) -> Result<()> {
let n = self.routes.len();
for i in 0..n {
if let Err(e) = self.routes[i].sink.begin_checkpoint_barrier().await {
for j in (0..i).rev() {
let _ = self.routes[j].sink.abort_checkpoint_barrier().await;
}
return Err(e);
}
}
if let Some(ref mut d) = self.default {
if let Err(e) = d.begin_checkpoint_barrier().await {
for j in (0..n).rev() {
let _ = self.routes[j].sink.abort_checkpoint_barrier().await;
}
return Err(e);
}
}
Ok(())
}
async fn commit_checkpoint_barrier(&mut self) -> Result<()> {
type B<'a> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>>;
let mut futs: Vec<B<'_>> = self
.routes
.iter_mut()
.map(|r| -> B<'_> { Box::pin(r.sink.commit_checkpoint_barrier()) })
.collect();
if let Some(ref mut d) = self.default {
futs.push(Box::pin(d.commit_checkpoint_barrier()));
}
join_all(futs)
.await
.into_iter()
.find(|r| r.is_err())
.unwrap_or(Ok(()))
}
async fn abort_checkpoint_barrier(&mut self) -> Result<()> {
type B<'a> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>>;
let mut futs: Vec<B<'_>> = self
.routes
.iter_mut()
.map(|r| -> B<'_> { Box::pin(r.sink.abort_checkpoint_barrier()) })
.collect();
if let Some(ref mut d) = self.default {
futs.push(Box::pin(d.abort_checkpoint_barrier()));
}
join_all(futs)
.await
.into_iter()
.find(|r| r.is_err())
.unwrap_or(Ok(()))
}
async fn preflight_check(&mut self) -> Result<()> {
type B<'a> = std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>>;
let mut futs: Vec<B<'_>> = self
.routes
.iter_mut()
.map(|r| -> B<'_> { Box::pin(r.sink.preflight_check()) })
.collect();
if let Some(ref mut d) = self.default {
futs.push(Box::pin(d.preflight_check()));
}
join_all(futs)
.await
.into_iter()
.find(|r| r.is_err())
.unwrap_or(Ok(()))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{Operation, SourceMetadata, EVENT_ENVELOPE_VERSION};
    use crate::sink::MemorySinkAdapter;

    /// Builds a minimal insert event for `table`, optionally schema-qualified.
    fn make_event(schema: Option<&str>, table: &str) -> Event {
        let source = SourceMetadata {
            source_name: "test".into(),
            offset: "0".into(),
            timestamp: 0,
        };
        Event {
            before: None,
            after: Some(serde_json::json!({"id": 1})),
            op: Operation::Insert,
            source,
            ts: 0,
            schema: schema.map(Into::into),
            table: table.into(),
            primary_key: None,
            snapshot: None,
            transaction: None,
            envelope_version: EVENT_ENVELOPE_VERSION,
            before_is_key_only: false,
        }
    }

    // ---- glob_segment_matches ------------------------------------------

    #[test]
    fn glob_star_matches_anything() {
        assert!(glob_segment_matches("*", "foo"));
        assert!(glob_segment_matches("*", ""));
    }

    #[test]
    fn glob_exact_matches() {
        assert!(glob_segment_matches("orders", "orders"));
        assert!(!glob_segment_matches("orders", "order"));
    }

    #[test]
    fn glob_suffix_wildcard() {
        assert!(glob_segment_matches("order*", "orders"));
        assert!(glob_segment_matches("order*", "order_items"));
        assert!(!glob_segment_matches("order*", "my_orders"));
    }

    #[test]
    fn glob_prefix_wildcard() {
        assert!(glob_segment_matches("*_audit", "user_audit"));
        assert!(!glob_segment_matches("*_audit", "audit_log"));
    }

    #[test]
    fn glob_question_mark() {
        assert!(glob_segment_matches("t_?", "t_1"));
        assert!(glob_segment_matches("t_?", "t_a"));
        assert!(!glob_segment_matches("t_?", "t_12"));
    }

    // ---- table_matches ---------------------------------------------------

    #[test]
    fn catch_all_matches_qualified() {
        assert!(table_matches("*", "public.orders"));
    }

    #[test]
    fn catch_all_matches_bare() {
        assert!(table_matches("*", "orders"));
    }

    #[test]
    fn schema_wildcard_matches_correct_schema() {
        assert!(table_matches("public.*", "public.orders"));
        assert!(!table_matches("public.*", "private.orders"));
    }

    #[test]
    fn table_wildcard_matches_correct_table() {
        assert!(table_matches("*.orders", "public.orders"));
        assert!(!table_matches("*.orders", "public.products"));
    }

    #[test]
    fn exact_qualified_match() {
        assert!(table_matches("public.orders", "public.orders"));
        assert!(!table_matches("public.orders", "public.products"));
    }

    #[test]
    fn bare_pattern_matches_table_part_of_qualified() {
        assert!(table_matches("orders", "public.orders"));
    }

    #[test]
    fn qualified_pattern_does_not_match_bare_table() {
        assert!(!table_matches("public.orders", "orders"));
    }

    // ---- TableRouter -----------------------------------------------------

    #[tokio::test]
    async fn routes_event_to_matching_sink() {
        let mut router: TableRouter<MemorySinkAdapter> = TableRouter::builder("test")
            .route("public.orders", MemorySinkAdapter::new("orders"))
            .route("public.products", MemorySinkAdapter::new("products"))
            .build()
            .expect("valid test routes");

        let order_event = make_event(Some("public"), "orders");
        router.send(&order_event).await.unwrap();

        let orders = router.routes()[0].sink.events();
        let products = router.routes()[1].sink.events();
        assert_eq!(orders.len(), 1);
        assert_eq!(products.len(), 0);
    }

    #[tokio::test]
    async fn unmatched_event_goes_to_default_sink() {
        let mut router: TableRouter<MemorySinkAdapter> = TableRouter::builder("test")
            .route("public.orders", MemorySinkAdapter::new("orders"))
            .default(MemorySinkAdapter::new("fallback"))
            .build()
            .expect("valid test routes");

        let stray = make_event(Some("public"), "customers");
        router.send(&stray).await.unwrap();

        assert_eq!(router.routes()[0].sink.events().len(), 0);
        assert_eq!(
            router
                .default_sink()
                .unwrap()
                .exported_events()
                .unwrap()
                .len(),
            1
        );
    }

    #[tokio::test]
    async fn drop_unrouted_silently_discards_event() {
        let mut router: TableRouter<MemorySinkAdapter> = TableRouter::builder("test")
            .route("public.orders", MemorySinkAdapter::new("orders"))
            .drop_unrouted(true)
            .build()
            .expect("valid test routes");

        let stray = make_event(Some("public"), "unrelated");
        router.send(&stray).await.unwrap();

        assert_eq!(router.routes()[0].sink.events().len(), 0);
    }

    #[tokio::test]
    async fn no_route_and_no_default_returns_error_when_strict() {
        let mut router: TableRouter<MemorySinkAdapter> = TableRouter::builder("strict")
            .route("public.orders", MemorySinkAdapter::new("orders"))
            .drop_unrouted(false)
            .build()
            .expect("valid test routes");

        let stray = make_event(Some("public"), "unrelated");
        let outcome = router.send(&stray).await;

        assert!(
            outcome.is_err(),
            "should error on unmatched event in strict mode"
        );
    }

    #[tokio::test]
    async fn first_matching_route_wins() {
        // The catch-all is registered first, so it shadows the exact route.
        let mut router: TableRouter<MemorySinkAdapter> = TableRouter::builder("test")
            .route("*", MemorySinkAdapter::new("catch-all"))
            .route("public.orders", MemorySinkAdapter::new("orders"))
            .build()
            .expect("valid test routes");

        let order_event = make_event(Some("public"), "orders");
        router.send(&order_event).await.unwrap();

        assert_eq!(router.routes()[0].sink.events().len(), 1);
        assert_eq!(router.routes()[1].sink.events().len(), 0);
    }

    // NOTE(review): despite its name this only exercises the success path of
    // `flush_all`; no failing sink is involved.
    #[tokio::test]
    async fn flush_all_propagates_errors() {
        let mut router: TableRouter<MemorySinkAdapter> = TableRouter::builder("test")
            .route("*", MemorySinkAdapter::new("a"))
            .build()
            .expect("valid test routes");

        router.flush_all().await.unwrap();
    }

    #[tokio::test]
    async fn sink_adapter_send_after_close_returns_error() {
        let mut router: TableRouter<MemorySinkAdapter> = TableRouter::builder("test")
            .route("*", MemorySinkAdapter::new("a"))
            .build()
            .expect("valid test routes");

        router.close().await.unwrap();

        let late_event = make_event(Some("public"), "orders");
        assert!(router.send(&late_event).await.is_err());
    }

    #[tokio::test]
    async fn route_for_returns_correct_sink() {
        let router: TableRouter<MemorySinkAdapter> = TableRouter::builder("test")
            .route("public.orders", MemorySinkAdapter::new("orders"))
            .default(MemorySinkAdapter::new("fallback"))
            .build()
            .expect("valid test routes");

        let orders_event = make_event(Some("public"), "orders");
        let other_event = make_event(Some("public"), "customers");

        let orders_sink = router.route_for(&orders_event).unwrap();
        assert_eq!(orders_sink.name(), "orders");

        let fallback_sink = router.route_for(&other_event).unwrap();
        assert_eq!(fallback_sink.name(), "fallback");
    }

    #[tokio::test]
    async fn glob_prefix_route_matches_multiple_tables() {
        let mut router: TableRouter<MemorySinkAdapter> = TableRouter::builder("test")
            .route("public.order*", MemorySinkAdapter::new("orders"))
            .build()
            .expect("valid test routes");

        for table in ["orders", "order_items"] {
            router
                .send(&make_event(Some("public"), table))
                .await
                .unwrap();
        }

        assert_eq!(router.routes()[0].sink.events().len(), 2);
    }
}