use crate::common::{CdcError, CdcEvent, Result};
use apache_avro::Schema as AvroSchema;
use rivven_core::{schema_registry::SchemaRegistry, Partition};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
/// Boxed, thread-safe callback that builds an Avro schema for one table.
///
/// `CdcConnector::get_or_infer_schema` invokes it as
/// `generator(schema, table, columns)`: the first argument is the schema
/// (namespace) name, the second the table name, the third the column list.
///
/// NOTE(review): the meaning of each `(String, i32, String)` tuple is defined by
/// the source-specific type mapper, not here — presumably
/// (column name, type id, type name); confirm against the mappers.
pub type SchemaGenerator =
Box<dyn Fn(&str, &str, &[(String, i32, String)]) -> Result<AvroSchema> + Send + Sync>;
/// Routes CDC events to registered partitions and caches inferred Avro schemas.
///
/// Built with `new()` (or a source-specific constructor) and configured through
/// the `with_*` builder methods.
pub struct CdcConnector {
/// Inferred Avro schemas, keyed by `"{schema}.{table}"`.
schema_cache: Arc<RwLock<HashMap<String, AvroSchema>>>,
/// Registered partitions, keyed by topic name. `route_event` looks topics up
/// as `"cdc.{database}.{schema}.{table}"`.
partitions: Arc<RwLock<HashMap<String, Arc<Partition>>>>,
/// Optional registry; when set, newly inferred schemas are registered with it.
schema_registry: Option<Arc<dyn SchemaRegistry>>,
/// Optional schema generator; schema inference fails without one.
schema_generator: Option<SchemaGenerator>,
/// Label for the source database kind; `"unknown"` unless overridden.
source_type: String,
}
impl CdcConnector {
/// Creates a connector with an empty schema cache, no registered topics,
/// no schema registry, no schema generator, and source type `"unknown"`.
pub fn new() -> Self {
    let schema_cache = Arc::new(RwLock::new(HashMap::new()));
    let partitions = Arc::new(RwLock::new(HashMap::new()));
    Self {
        schema_cache,
        partitions,
        schema_registry: None,
        schema_generator: None,
        source_type: String::from("unknown"),
    }
}
/// Creates a connector preconfigured for PostgreSQL.
///
/// The source type is `"postgres"` and schemas are generated by
/// `PostgresTypeMapper::generate_avro_schema`; its errors are converted to
/// `CdcError::Schema` carrying the error's string form.
#[cfg(feature = "postgres")]
pub fn for_postgres() -> Self {
    use crate::postgres::PostgresTypeMapper;

    let generator: SchemaGenerator = Box::new(|namespace, table_name, cols| {
        PostgresTypeMapper::generate_avro_schema(namespace, table_name, cols)
            .map_err(|err| CdcError::Schema(err.to_string()))
    });

    Self::new()
        .with_source_type("postgres")
        .with_schema_generator(generator)
}
/// Creates a connector preconfigured for MySQL.
///
/// The source type is `"mysql"` and schemas are generated by
/// `MySqlTypeMapper::generate_avro_schema`; its errors are converted to
/// `CdcError::Schema` carrying the error's string form.
#[cfg(feature = "mysql")]
pub fn for_mysql() -> Self {
    use crate::mysql::MySqlTypeMapper;

    let generator: SchemaGenerator = Box::new(|namespace, table_name, cols| {
        MySqlTypeMapper::generate_avro_schema(namespace, table_name, cols)
            .map_err(|err| CdcError::Schema(err.to_string()))
    });

    Self::new()
        .with_source_type("mysql")
        .with_schema_generator(generator)
}
/// Creates a connector preconfigured for MariaDB.
///
/// Reuses the MySQL configuration (including its schema generator) and only
/// relabels the source type as `"mariadb"`.
///
/// NOTE(review): this calls `for_mysql`, which is compiled only with the
/// `mysql` feature. Confirm that the crate's feature table makes `mariadb`
/// imply `mysql`.
#[cfg(feature = "mariadb")]
pub fn for_mariadb() -> Self {
    let mysql_connector = Self::for_mysql();
    mysql_connector.with_source_type("mariadb")
}
pub fn with_source_type(mut self, source_type: &str) -> Self {
self.source_type = source_type.into();
self
}
pub fn with_schema_registry(mut self, registry: Arc<dyn SchemaRegistry>) -> Self {
self.schema_registry = Some(registry);
self
}
pub fn with_schema_generator(mut self, generator: SchemaGenerator) -> Self {
self.schema_generator = Some(generator);
self
}
/// Returns the source type label this connector was configured with.
pub fn source_type(&self) -> &str {
    self.source_type.as_str()
}
/// Registers `partition` as the destination for `topic`.
///
/// An existing registration under the same topic name is replaced.
pub async fn register_topic(&self, topic: String, partition: Arc<Partition>) {
    let topic_label = topic.clone();
    self.partitions.write().await.insert(topic, partition);
    info!("Registered CDC topic: {}", topic_label);
}
/// Removes the registration for `topic`.
///
/// Returns the partition that was registered, or `None` if the topic was not
/// known. A log line is emitted only when something was actually removed.
pub async fn unregister_topic(&self, topic: &str) -> Option<Arc<Partition>> {
    let removed = {
        let mut registry = self.partitions.write().await;
        registry.remove(topic)
    };
    if removed.is_none() {
        return None;
    }
    info!("Unregistered CDC topic: {}", topic);
    removed
}
/// Returns the names of all currently registered topics.
///
/// The order follows the underlying `HashMap` iteration order.
pub async fn topics(&self) -> Vec<String> {
    let registry = self.partitions.read().await;
    let mut names = Vec::with_capacity(registry.len());
    for name in registry.keys() {
        names.push(name.clone());
    }
    names
}
/// Returns the Avro schema for a table, generating and caching it on first use.
///
/// Lookup order:
/// 1. Return the cached schema for `"{schema}.{table}"` if present.
/// 2. Otherwise call the configured schema generator with
///    `(schema, table, columns)`.
/// 3. If a schema registry is configured, register the schema's canonical
///    form under the subject `"cdc.{database}.{schema}.{table}"`. A failed
///    registration is logged as a warning and does not fail the call.
/// 4. Store the schema in the cache and return it.
///
/// # Errors
/// Returns `CdcError::Schema` when no generator is configured, and propagates
/// any error returned by the generator.
///
/// NOTE(review): the cache key does not include `database`, while the
/// registry subject does; tables with the same schema and table name in
/// different databases would share one cache entry — confirm this is intended.
pub async fn get_or_infer_schema(
    &self,
    database: &str,
    schema: &str,
    table: &str,
    columns: &[(String, i32, String)],
) -> Result<AvroSchema> {
    let cache_key = format!("{}.{}", schema, table);

    // Fast path: the read guard is a temporary dropped at the end of this statement.
    let cached = self.schema_cache.read().await.get(&cache_key).cloned();
    if let Some(hit) = cached {
        return Ok(hit);
    }

    info!(
        "Inferring Avro schema for {}.{}.{} ({})",
        database, schema, table, self.source_type
    );

    let Some(generator) = self.schema_generator.as_ref() else {
        return Err(CdcError::Schema("No schema generator configured".into()));
    };
    let avro_schema = generator(schema, table, columns)?;

    // Registration is best-effort: a failure is only logged.
    if let Some(registry) = self.schema_registry.as_ref() {
        let subject = format!("cdc.{}.{}.{}", database, schema, table);
        let canonical = avro_schema.canonical_form();
        match registry.register(&subject, &canonical).await {
            Err(e) => {
                warn!("Failed to register schema {}: {}", subject, e);
            }
            Ok(schema_id) => {
                info!("Registered schema {} with ID {}", subject, schema_id);
            }
        }
    }

    self.schema_cache
        .write()
        .await
        .insert(cache_key, avro_schema.clone());
    Ok(avro_schema)
}
/// Reports whether a schema for `"{schema}.{table}"` is currently cached.
pub async fn has_schema(&self, schema: &str, table: &str) -> bool {
    let lookup_key = format!("{}.{}", schema, table);
    self.schema_cache.read().await.contains_key(&lookup_key)
}
/// Drops every cached schema; later lookups will regenerate them.
pub async fn clear_schema_cache(&self) {
    self.schema_cache.write().await.clear();
    info!("Cleared schema cache");
}
pub async fn route_event(&self, event: &CdcEvent) -> Result<u64> {
let topic_name = format!("cdc.{}.{}.{}", event.database, event.schema, event.table);
let partitions = self.partitions.read().await;
let partition = partitions.get(&topic_name).ok_or_else(|| {
CdcError::Topic(format!("No partition registered for topic: {}", topic_name))
})?;
let message = event
.to_message()
.map_err(|e| CdcError::Serialization(e.to_string()))?;
let offset = partition
.append(message)
.await
.map_err(|e| CdcError::Io(std::io::Error::other(e.to_string())))?;
debug!(
"Routed {:?} event for {}.{} to topic {} at offset {}",
event.op, event.schema, event.table, topic_name, offset
);
Ok(offset)
}
/// Routes each event in order and returns how many were routed successfully.
///
/// A failure to route one event is logged as a warning and does not stop the
/// batch; this function itself always returns `Ok`.
pub async fn process_events(&self, events: Vec<CdcEvent>) -> Result<usize> {
    let mut routed = 0;
    for event in &events {
        if let Err(e) = self.route_event(event).await {
            warn!("Failed to route event: {}", e);
            continue;
        }
        routed += 1;
    }
    Ok(routed)
}
/// Routes each event in order, reporting every outcome to `callback`.
///
/// For each event the callback receives the event and either `Ok(offset)` or
/// `Err(&error)`. The callback runs before the failure is logged. Failures do
/// not stop the batch; the return value is the number of events routed
/// successfully and is always `Ok`.
pub async fn process_events_with_callback<F>(
    &self,
    events: Vec<CdcEvent>,
    mut callback: F,
) -> Result<usize>
where
    F: FnMut(&CdcEvent, std::result::Result<u64, &CdcError>),
{
    let mut routed = 0;
    for event in &events {
        let outcome = self.route_event(event).await;
        callback(event, outcome.as_ref().copied());
        match outcome {
            Ok(_) => routed += 1,
            Err(e) => {
                warn!("Failed to route event: {}", e);
            }
        }
    }
    Ok(routed)
}
}
impl Default for CdcConnector {
fn default() -> Self {
Self::new()
}
}
/// Manual `Debug`: prints the source type and whether a registry and a
/// generator are configured. The cache and partition map are not printed.
impl std::fmt::Debug for CdcConnector {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_registry = self.schema_registry.is_some();
        let has_generator = self.schema_generator.is_some();

        let mut repr = f.debug_struct("CdcConnector");
        repr.field("source_type", &self.source_type);
        repr.field("has_schema_registry", &has_registry);
        repr.field("has_schema_generator", &has_generator);
        repr.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::common::CdcOp;

    /// The topic name is `cdc.<database>.<schema>.<table>`.
    #[test]
    fn test_topic_naming() {
        let insert_event = CdcEvent {
            source_type: "postgres".into(),
            database: "mydb".into(),
            schema: "public".into(),
            table: "users".into(),
            op: CdcOp::Insert,
            before: None,
            after: Some(serde_json::json!({"id": 1, "name": "Alice"})),
            timestamp: 0,
            transaction: None,
        };

        let topic = format!(
            "cdc.{}.{}.{}",
            insert_event.database, insert_event.schema, insert_event.table
        );

        assert_eq!(topic, "cdc.mydb.public.users");
    }

    /// A fresh connector reports the placeholder source type.
    #[test]
    fn test_connector_creation() {
        assert_eq!(CdcConnector::new().source_type(), "unknown");
    }

    /// The builder overrides the source type.
    #[test]
    fn test_connector_with_source_type() {
        let configured = CdcConnector::new().with_source_type("postgres");
        assert_eq!(configured.source_type(), "postgres");
    }

    /// The PostgreSQL constructor sets both the label and a generator.
    #[cfg(feature = "postgres")]
    #[test]
    fn test_postgres_connector() {
        let pg = CdcConnector::for_postgres();
        assert!(pg.schema_generator.is_some());
        assert_eq!(pg.source_type(), "postgres");
    }

    /// The MySQL constructor sets both the label and a generator.
    #[cfg(feature = "mysql")]
    #[test]
    fn test_mysql_connector() {
        let my = CdcConnector::for_mysql();
        assert!(my.schema_generator.is_some());
        assert_eq!(my.source_type(), "mysql");
    }

    /// An empty cache has no entries, and clearing it is harmless.
    #[tokio::test]
    async fn test_schema_cache_operations() {
        let connector = CdcConnector::new();
        let is_cached = connector.has_schema("public", "users").await;
        assert!(!is_cached);
        connector.clear_schema_cache().await;
    }

    /// A fresh connector has no registered topics.
    #[tokio::test]
    async fn test_topic_registration() {
        let connector = CdcConnector::new();
        let registered = connector.topics().await;
        assert!(registered.is_empty());
    }
}