use async_trait::async_trait;
use chrono::{DateTime, Utc};
use sea_orm::entity::prelude::*;
use sea_orm::{ActiveValue, Database, QueryOrder, QuerySelect, TransactionTrait};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
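/// Sync status values stored in the `sync_status` column of `global_index`.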
pub const SYNC_STATUS_PENDING: &str = "pending";
pub const SYNC_STATUS_SYNCED: &str = "synced";
pub const SYNC_STATUS_FAILED: &str = "failed";
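/// Transient status set by the background worker while an event is being
/// applied. Rows left in this state by a crash are not re-polled, since the
/// worker only selects `pending` rows.
pub const SYNC_STATUS_PROCESSING: &str = "processing";

/// SeaORM entity for the `global_index` table. Timestamp columns are stored
/// as RFC 3339 strings so the entity maps cleanly across database backends.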
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "global_index")]
pub struct Model {
    #[sea_orm(primary_key, auto_increment = false)]
pub id: String,
pub table_name: String,
pub record_id: String,
pub shard_id: i32,
pub index_key: String,
pub index_value: String,
pub created_at: String,
pub updated_at: String,
pub last_modified: String,
pub sync_status: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}
impl ActiveModelBehavior for ActiveModel {}
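/// A single logical index entry: which shard a record lives on and the
/// key/value pair under which it is indexed.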
#[derive(Debug, Clone)]
pub struct IndexEntry {
pub table_name: String,
pub record_id: String,
pub shard_id: u32,
pub index_key: String,
pub index_value: String,
}
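/// A change captured from a shard, to be applied to the global index.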
#[derive(Debug, Clone)]
pub enum SyncEvent {
Insert {
table_name: String,
record_id: String,
shard_id: u32,
index_key: String,
index_value: String,
},
Update {
table_name: String,
record_id: String,
shard_id: u32,
old_index_key: String,
old_index_value: String,
new_index_key: String,
new_index_value: String,
},
Delete {
table_name: String,
record_id: String,
shard_id: u32,
index_key: String,
index_value: String,
},
}
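/// Tuning knobs for the background worker, retry policy, and cache TTL.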
#[derive(Debug, Clone)]
pub struct ChangeCaptureConfig {
pub batch_size: usize,
pub poll_interval_ms: u64,
pub max_retries: u32,
pub retry_interval_ms: u64,
pub cache_ttl_seconds: u64,
}
impl Default for ChangeCaptureConfig {
fn default() -> Self {
Self {
batch_size: 1000,
poll_interval_ms: 1000,
max_retries: 3,
retry_interval_ms: 5000,
            cache_ttl_seconds: 300,
        }
}
}
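/// Forward cache: table_name -> index_key -> index_value -> matching entries.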
type IndexCache = HashMap<String, HashMap<String, HashMap<String, Vec<IndexEntry>>>>;
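/// Reverse lookup: record_id -> every cache location that holds that record,
/// used to evict stale entries without scanning the forward cache.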
type ReverseIndex = HashMap<String, Vec<IndexLocation>>;
#[derive(Debug, Clone, PartialEq, Eq)]
struct IndexLocation {
table_name: String,
index_key: String,
index_value: String,
created_at: std::time::Instant,
}
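// `Hash` deliberately excludes `created_at` so that logically identical
// locations hash alike; the derived `PartialEq`/`Eq` still distinguish them
// by instant, which keeps the `Eq`/`Hash` contract intact.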
impl std::hash::Hash for IndexLocation {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.table_name.hash(state);
self.index_key.hash(state);
self.index_value.hash(state);
}
}
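/// A cache-backed global secondary index for locating records across shards.
///
/// A minimal usage sketch (the database URL is an illustrative assumption,
/// hence `ignore`):
///
/// ```ignore
/// let index = GlobalIndex::new("sqlite::memory:").await?;
/// let _worker = index.start_background_worker();
/// index.register_entry(IndexEntry {
///     table_name: "orders".into(),
///     record_id: "order_123".into(),
///     shard_id: 4,
///     index_key: "user_id".into(),
///     index_value: "user_456".into(),
/// }).await?;
/// let hits = index.query_by_index("orders", "user_id", "user_456").await?;
/// ```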
#[derive(Debug)]
pub struct GlobalIndex {
conn: DatabaseConnection,
cache: Arc<RwLock<IndexCache>>,
reverse_index: Arc<RwLock<ReverseIndex>>,
config: ChangeCaptureConfig,
}
impl GlobalIndex {
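    /// Connects to `database_url` and creates the `global_index` table and
    /// its indexes if they do not already exist.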
pub async fn new(database_url: &str) -> Result<Self, DbErr> {
let conn = Database::connect(database_url).await?;
Self::init_schema(&conn).await?;
Ok(Self {
conn,
cache: Arc::new(RwLock::new(HashMap::new())),
reverse_index: Arc::new(RwLock::new(HashMap::new())),
config: ChangeCaptureConfig::default(),
})
}
async fn init_schema(conn: &DatabaseConnection) -> Result<(), DbErr> {
let create_sql = r#"
CREATE TABLE IF NOT EXISTS global_index (
id VARCHAR(64) PRIMARY KEY,
table_name VARCHAR(128) NOT NULL,
record_id VARCHAR(128) NOT NULL,
shard_id INTEGER NOT NULL,
index_key VARCHAR(128) NOT NULL,
index_value VARCHAR(512) NOT NULL,
                created_at VARCHAR(64) NOT NULL, -- RFC 3339 string, matches Model::created_at
                updated_at VARCHAR(64) NOT NULL, -- RFC 3339 string, matches Model::updated_at
last_modified VARCHAR(32) NOT NULL,
sync_status VARCHAR(20) DEFAULT 'synced',
CONSTRAINT uk_table_record UNIQUE (table_name, record_id)
);
CREATE INDEX IF NOT EXISTS idx_global_index_key ON global_index (table_name, index_key, index_value);
CREATE INDEX IF NOT EXISTS idx_global_index_shard ON global_index (table_name, shard_id);
CREATE INDEX IF NOT EXISTS idx_global_index_modified ON global_index (table_name, last_modified);
"#;
conn.execute_unprepared(create_sql).await?;
Ok(())
}
pub fn get_connection(&self) -> &DatabaseConnection {
&self.conn
}
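    /// Spawns a task that polls for `pending` rows, marks each `processing`,
    /// applies it in a transaction, and records `synced` or `failed`.
    ///
    /// `model_to_sync_event` always yields `Insert`, and the underlying row
    /// already exists, so a successful pass normally reduces to flipping the
    /// row to `synced`. Abort the returned `JoinHandle` to stop the loop;
    /// dropping it merely detaches the task.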
pub fn start_background_worker(&self) -> tokio::task::JoinHandle<()> {
let conn = self.conn.clone();
let config = self.config.clone();
let poll_interval = std::time::Duration::from_millis(config.poll_interval_ms);
tokio::spawn(async move {
tracing::info!("Global index background worker started");
loop {
let pending_events = Entity::find()
.filter(Column::SyncStatus.eq(SYNC_STATUS_PENDING))
.limit(config.batch_size as u64)
.all(&conn)
.await;
if let Ok(events) = pending_events {
if !events.is_empty() {
tracing::info!("Processing {} pending sync events", events.len());
for event in events {
let id = event.id.clone();
if let Err(e) = Entity::update(ActiveModel {
id: ActiveValue::Set(id.clone()),
sync_status: ActiveValue::Set("processing".to_string()),
..Default::default()
})
.exec(&conn)
.await
{
tracing::error!("Failed to update sync status for event {}: {}", id, e);
continue;
}
let sync_event = Self::model_to_sync_event(&event);
let process_result = async {
let txn: sea_orm::DatabaseTransaction = conn.begin().await?;
match &sync_event {
SyncEvent::Insert {
table_name,
record_id,
shard_id,
index_key,
index_value,
} => {
let entry_id = Self::generate_id(table_name, record_id);
let existing = Entity::find_by_id(entry_id.clone()).one(&txn).await?;
if existing.is_none() {
let now = chrono::Utc::now().to_rfc3339();
let active = ActiveModel {
id: ActiveValue::Set(entry_id),
table_name: ActiveValue::Set(table_name.clone()),
record_id: ActiveValue::Set(record_id.clone()),
shard_id: ActiveValue::Set(*shard_id as i32),
index_key: ActiveValue::Set(index_key.clone()),
index_value: ActiveValue::Set(index_value.clone()),
created_at: ActiveValue::Set(now.clone()),
updated_at: ActiveValue::Set(now.clone()),
last_modified: ActiveValue::Set(now),
sync_status: ActiveValue::Set(SYNC_STATUS_SYNCED.to_string()),
};
Entity::insert(active).exec(&txn).await?;
}
}
SyncEvent::Update {
table_name,
record_id,
shard_id,
old_index_key: _,
old_index_value: _,
new_index_key,
new_index_value,
} => {
let entry_id = Self::generate_id(table_name, record_id);
Entity::delete_by_id(entry_id.clone()).exec(&txn).await?;
let now = chrono::Utc::now().to_rfc3339();
let active = ActiveModel {
id: ActiveValue::Set(entry_id),
table_name: ActiveValue::Set(table_name.clone()),
record_id: ActiveValue::Set(record_id.clone()),
shard_id: ActiveValue::Set(*shard_id as i32),
index_key: ActiveValue::Set(new_index_key.clone()),
index_value: ActiveValue::Set(new_index_value.clone()),
created_at: ActiveValue::Set(now.clone()),
updated_at: ActiveValue::Set(now.clone()),
last_modified: ActiveValue::Set(now),
sync_status: ActiveValue::Set(SYNC_STATUS_SYNCED.to_string()),
};
Entity::insert(active).exec(&txn).await?;
}
SyncEvent::Delete {
table_name, record_id, ..
} => {
let entry_id = Self::generate_id(table_name, record_id);
Entity::delete_by_id(entry_id).exec(&txn).await?;
}
}
txn.commit().await?;
Ok::<(), DbErr>(())
}
.await;
match process_result {
Ok(()) => {
let update_result = Entity::update(ActiveModel {
id: ActiveValue::Set(id.clone()),
sync_status: ActiveValue::Set(SYNC_STATUS_SYNCED.to_string()),
..Default::default()
})
.exec(&conn)
.await;
if let Err(e) = update_result {
tracing::error!("Failed to update sync status for event {}: {}", id, e);
}
}
Err(e) => {
tracing::error!("Failed to process sync event {}: {}", id, e);
let update_result = Entity::update(ActiveModel {
id: ActiveValue::Set(id.clone()),
sync_status: ActiveValue::Set(SYNC_STATUS_FAILED.to_string()),
..Default::default()
})
.exec(&conn)
.await;
if let Err(e) = update_result {
tracing::error!("Failed to update sync status for event {}: {}", id, e);
}
}
}
}
}
}
tokio::time::sleep(poll_interval).await;
}
})
}
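    /// Inserts a single index entry in a transaction and then updates the
    /// in-memory cache. Registering a `(table_name, record_id)` pair that is
    /// already indexed violates the unique constraint and returns `Err`.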
pub async fn register_entry(&self, entry: IndexEntry) -> Result<(), DbErr> {
let txn = self.conn.begin().await?;
let id = Self::generate_id(&entry.table_name, &entry.record_id);
let now = chrono::Utc::now().to_rfc3339();
        let active = ActiveModel {
            id: ActiveValue::Set(id),
            table_name: ActiveValue::Set(entry.table_name.clone()),
            record_id: ActiveValue::Set(entry.record_id.clone()),
            shard_id: ActiveValue::Set(entry.shard_id as i32),
            index_key: ActiveValue::Set(entry.index_key.clone()),
            index_value: ActiveValue::Set(entry.index_value.clone()),
            created_at: ActiveValue::Set(now.clone()),
            updated_at: ActiveValue::Set(now.clone()),
            last_modified: ActiveValue::Set(now),
sync_status: ActiveValue::Set(SYNC_STATUS_SYNCED.to_string()),
};
Entity::insert(active).exec(&txn).await?;
txn.commit().await?;
self.update_cache(&entry).await;
Ok(())
}
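    /// Batch variant of `register_entry`: a no-op on empty input, otherwise
    /// inserts all rows in one statement and then updates the cache entry by
    /// entry (the two steps are not atomic).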
pub async fn register_entries(&self, entries: Vec<IndexEntry>) -> Result<(), DbErr> {
        if entries.is_empty() {
            return Ok(());
        }
        let now = chrono::Utc::now().to_rfc3339();
        let sync_status = SYNC_STATUS_SYNCED.to_string();
let active_models: Vec<ActiveModel> = entries
.iter()
.map(|entry| {
let id = Self::generate_id(&entry.table_name, &entry.record_id);
ActiveModel {
id: ActiveValue::Set(id),
table_name: ActiveValue::Set(entry.table_name.clone()),
record_id: ActiveValue::Set(entry.record_id.clone()),
shard_id: ActiveValue::Set(entry.shard_id as i32),
index_key: ActiveValue::Set(entry.index_key.clone()),
index_value: ActiveValue::Set(entry.index_value.clone()),
                created_at: ActiveValue::Set(now.clone()),
updated_at: ActiveValue::Set(now.clone()),
last_modified: ActiveValue::Set(now.clone()),
sync_status: ActiveValue::Set(sync_status.clone()),
}
})
.collect();
Entity::insert_many(active_models).exec(&self.conn).await?;
for entry in entries {
self.update_cache(&entry).await;
}
Ok(())
}
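    /// Cache-aside lookup: serves from the in-memory cache when possible,
    /// otherwise queries the database and populates the cache. Empty results
    /// are not negatively cached, so repeated misses always hit the database.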
pub async fn query_by_index(
&self,
table_name: &str,
index_key: &str,
index_value: &str,
) -> Result<Vec<IndexEntry>, DbErr> {
{
let cache = self.cache.read().await;
if let Some(table_cache) = cache.get(table_name) {
if let Some(key_cache) = table_cache.get(index_key) {
if let Some(entries) = key_cache.get(index_value) {
return Ok(entries.clone());
}
}
}
}
let result = Entity::find()
.filter(Column::TableName.eq(table_name))
.filter(Column::IndexKey.eq(index_key))
.filter(Column::IndexValue.eq(index_value))
.all(&self.conn)
.await?;
let entries: Vec<IndexEntry> = result
.iter()
.map(|m| IndexEntry {
table_name: m.table_name.clone(),
record_id: m.record_id.clone(),
shard_id: m.shard_id as u32,
index_key: m.index_key.clone(),
index_value: m.index_value.clone(),
})
.collect();
for entry in &entries {
self.update_cache(entry).await;
}
Ok(entries)
}
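    /// Returns every entry for `(table_name, index_key)` across all shards
    /// and index values, always reading from the database (no cache).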
pub async fn query_all_shards(&self, table_name: &str, index_key: &str) -> Result<Vec<IndexEntry>, DbErr> {
let result = Entity::find()
.filter(Column::TableName.eq(table_name))
.filter(Column::IndexKey.eq(index_key))
.all(&self.conn)
.await?;
Ok(result
.iter()
.map(|m| IndexEntry {
table_name: m.table_name.clone(),
record_id: m.record_id.clone(),
shard_id: m.shard_id as u32,
index_key: m.index_key.clone(),
index_value: m.index_value.clone(),
})
.collect())
}
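    /// Applies `event`, retrying up to `max_retries` additional times with a
    /// fixed `retry_interval_ms` pause between attempts.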
pub async fn process_sync_event(&self, event: SyncEvent) -> Result<(), DbErr> {
let max_retries = self.config.max_retries;
let retry_interval = std::time::Duration::from_millis(self.config.retry_interval_ms);
let mut last_error = None;
for attempt in 0..=max_retries {
match self.process_sync_event_internal(&event).await {
Ok(()) => {
tracing::debug!("Processed sync event successfully on attempt {}", attempt + 1);
return Ok(());
}
Err(e) => {
last_error = Some(e);
if attempt < max_retries {
tracing::warn!(
"Failed to process sync event (attempt {}/{}), retrying in {:?}: {}",
attempt + 1,
max_retries,
retry_interval,
last_error.as_ref().unwrap()
);
tokio::time::sleep(retry_interval).await;
}
}
}
}
Err(last_error.unwrap_or_else(|| DbErr::Custom("Unknown error".to_string())))
}
async fn process_sync_event_internal(&self, event: &SyncEvent) -> Result<(), DbErr> {
match event {
SyncEvent::Insert {
table_name,
record_id,
shard_id,
index_key,
index_value,
} => {
let entry_id = Self::generate_id(table_name, record_id);
let existing = Entity::find_by_id(entry_id.clone()).one(&self.conn).await?;
if existing.is_none() {
let entry = IndexEntry {
table_name: table_name.clone(),
record_id: record_id.clone(),
shard_id: *shard_id,
index_key: index_key.clone(),
index_value: index_value.clone(),
};
self.register_entry(entry).await?;
}
}
SyncEvent::Update {
table_name,
record_id,
shard_id,
old_index_key: _,
old_index_value: _,
new_index_key,
new_index_value,
} => {
let txn = self.conn.begin().await?;
let id = Self::generate_id(table_name, record_id);
Entity::delete_by_id(id.clone()).exec(&txn).await?;
let now = chrono::Utc::now().to_rfc3339();
let active = ActiveModel {
id: ActiveValue::Set(id),
table_name: ActiveValue::Set(table_name.clone()),
record_id: ActiveValue::Set(record_id.clone()),
shard_id: ActiveValue::Set(*shard_id as i32),
index_key: ActiveValue::Set(new_index_key.clone()),
index_value: ActiveValue::Set(new_index_value.clone()),
created_at: ActiveValue::Set(now.clone()),
updated_at: ActiveValue::Set(now.clone()),
last_modified: ActiveValue::Set(now),
sync_status: ActiveValue::Set(SYNC_STATUS_SYNCED.to_string()),
};
Entity::insert(active).exec(&txn).await?;
txn.commit().await?;
let entry = IndexEntry {
table_name: table_name.clone(),
record_id: record_id.clone(),
shard_id: *shard_id,
index_key: new_index_key.clone(),
index_value: new_index_value.clone(),
};
self.update_cache(&entry).await;
}
SyncEvent::Delete {
table_name, record_id, ..
} => {
self.delete_entry(table_name, record_id).await?;
}
}
Ok(())
}
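    /// Deletes the row and evicts the record from both the forward cache and
    /// the reverse index. Lock order: `cache` before `reverse_index`.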
async fn delete_entry(&self, table_name: &str, record_id: &str) -> Result<(), DbErr> {
let id = Self::generate_id(table_name, record_id);
Entity::delete_by_id(id).exec(&self.conn).await?;
let mut cache = self.cache.write().await;
let mut reverse_index = self.reverse_index.write().await;
if let Some(locations) = reverse_index.get(record_id) {
for location in locations {
if let Some(table_cache) = cache.get_mut(&location.table_name) {
if let Some(key_cache) = table_cache.get_mut(&location.index_key) {
if let Some(entries) = key_cache.get_mut(&location.index_value) {
entries.retain(|e| e.record_id != record_id);
}
if key_cache
.get(&location.index_value)
.map(|v| v.is_empty())
.unwrap_or(false)
{
key_cache.remove(&location.index_value);
}
}
}
}
reverse_index.remove(record_id);
}
Ok(())
}
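    /// Stable primary key: the 64-character SHA-256 hex digest of
    /// `"{table_name}:{record_id}"`.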
fn generate_id(table_name: &str, record_id: &str) -> String {
use sha2::{Digest, Sha256};
let mut hasher = Sha256::default();
hasher.update(format!("{}:{}", table_name, record_id));
format!("{:x}", hasher.finalize())
}
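    /// The table stores no event type, so replaying a row always yields an
    /// `Insert`; `Update` and `Delete` events must arrive via `ChangeCapture`.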
fn model_to_sync_event(record: &Model) -> SyncEvent {
SyncEvent::Insert {
table_name: record.table_name.clone(),
record_id: record.record_id.clone(),
shard_id: record.shard_id as u32,
index_key: record.index_key.clone(),
index_value: record.index_value.clone(),
}
}
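    /// Evicts any stale cache locations for the record, then inserts the
    /// entry into the forward cache and records its location in the reverse
    /// index. Lock order: `cache` before `reverse_index`.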
async fn update_cache(&self, entry: &IndexEntry) {
let table_name = &entry.table_name;
let index_key = &entry.index_key;
let index_value = &entry.index_value;
let record_id = &entry.record_id;
let mut cache = self.cache.write().await;
let mut reverse_index = self.reverse_index.write().await;
if let Some(locations) = reverse_index.get(record_id) {
for location in locations {
if let Some(table_cache) = cache.get_mut(&location.table_name) {
if let Some(key_cache) = table_cache.get_mut(&location.index_key) {
if let Some(entries) = key_cache.get_mut(&location.index_value) {
entries.retain(|e| e.record_id.as_str() != record_id);
}
if key_cache
.get(&location.index_value)
.map(|v| v.is_empty())
.unwrap_or(false)
{
key_cache.remove(&location.index_value);
}
}
}
}
reverse_index.remove(record_id);
}
let table_cache = cache.entry(table_name.clone()).or_default();
let key_cache = table_cache.entry(index_key.clone()).or_default();
let entries = key_cache.entry(index_value.clone()).or_default();
if !entries.iter().any(|e| e.record_id == *record_id) {
entries.push(IndexEntry {
table_name: table_name.clone(),
record_id: record_id.clone(),
shard_id: entry.shard_id,
index_key: index_key.clone(),
index_value: index_value.clone(),
});
let location = IndexLocation {
table_name: table_name.clone(),
index_key: index_key.clone(),
index_value: index_value.clone(),
created_at: std::time::Instant::now(),
};
reverse_index.entry(record_id.clone()).or_default().push(location);
}
}
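    /// Drops reverse-index locations older than `cache_ttl_seconds`, then
    /// removes forward-cache entries that no longer have a fresh location.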
pub async fn invalidate_cache(&self) {
        let ttl_duration = std::time::Duration::from_secs(self.config.cache_ttl_seconds);
        // Take the locks in the same order as update_cache and delete_entry
        // (cache first, then reverse_index) to avoid lock-order inversion.
        let mut cache = self.cache.write().await;
        let mut reverse_index = self.reverse_index.write().await;
        reverse_index.retain(|_record_id, locations| {
            locations.retain(|location| location.created_at.elapsed() < ttl_duration);
            !locations.is_empty()
        });
cache.retain(|_table_name, table_cache| {
table_cache.retain(|_index_key, key_cache| {
key_cache.retain(|_index_value, entries| {
entries.retain(|entry| {
if let Some(locations) = reverse_index.get(&entry.record_id) {
locations.iter().any(|loc| loc.created_at.elapsed() < ttl_duration)
} else {
false
}
});
!entries.is_empty()
});
!key_cache.is_empty()
});
!table_cache.is_empty()
});
tracing::debug!("Cache invalidation completed");
}
pub fn get_config(&self) -> &ChangeCaptureConfig {
&self.config
}
pub fn set_config(&mut self, config: ChangeCaptureConfig) {
self.config = config;
}
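    /// Bounds the reverse index to `max_entries`: first drops records absent
    /// from the forward cache, then evicts arbitrary entries (HashMap
    /// iteration order) until the limit is met.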
pub async fn cleanup_reverse_index(&self, max_entries: usize) {
        // A read lock suffices here; the acquisition order (cache, then
        // reverse_index) matches the other methods.
        let cache = self.cache.read().await;
let mut reverse_index = self.reverse_index.write().await;
if reverse_index.len() > max_entries {
tracing::warn!(
"Reverse index size {} exceeds limit {}, cleaning up...",
reverse_index.len(),
max_entries
);
let valid_record_ids: std::collections::HashSet<String> = cache
.values()
.flat_map(|table_cache| {
table_cache.values().flat_map(|key_cache| {
key_cache
.values()
.flat_map(|entries| entries.iter().map(|e| e.record_id.clone()))
})
})
.collect();
reverse_index.retain(|record_id, _| valid_record_ids.contains(record_id));
if reverse_index.len() > max_entries {
let to_remove = reverse_index.len() - max_entries;
let keys: Vec<_> = reverse_index.keys().take(to_remove).cloned().collect();
for key in keys {
reverse_index.remove(&key);
}
}
}
}
pub async fn reverse_index_size(&self) -> usize {
self.reverse_index.read().await.len()
}
}
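/// A source of `SyncEvent`s feeding the global index. `next_event` returns
/// `None` when the capture is stopped or no new changes are available.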
#[async_trait]
pub trait ChangeCapture: Send + Sync {
async fn start(&mut self) -> Result<(), DbErr>;
async fn stop(&mut self) -> Result<(), DbErr>;
async fn next_event(&mut self) -> Option<SyncEvent>;
fn is_running(&self) -> bool;
}
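/// Configuration for `PollingChangeCapture`: poll cadence, batch size, and an
/// optional table allowlist (an empty list watches all tables).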
#[derive(Debug, Clone)]
pub struct PollingCaptureConfig {
pub interval_ms: u64,
pub batch_size: usize,
pub watched_tables: Vec<String>,
}
impl Default for PollingCaptureConfig {
fn default() -> Self {
Self {
interval_ms: 1000,
batch_size: 100,
watched_tables: Vec::new(),
}
}
}
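/// A `ChangeCapture` that polls the `global_index` table for rows whose
/// `last_modified` advanced since the previous poll, buffering them in a
/// FIFO queue so events are delivered in `last_modified` order.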
#[derive(Debug)]
pub struct PollingChangeCapture {
config: PollingCaptureConfig,
global_index: Arc<GlobalIndex>,
running: bool,
last_poll: RwLock<DateTime<Utc>>,
    event_queue: RwLock<VecDeque<SyncEvent>>,
}
impl PollingChangeCapture {
pub fn new(global_index: Arc<GlobalIndex>, config: Option<PollingCaptureConfig>) -> Self {
Self {
config: config.unwrap_or_default(),
global_index,
running: false,
last_poll: RwLock::new(Utc::now()),
            event_queue: RwLock::new(VecDeque::new()),
}
}
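    /// Fetches rows whose `last_modified` is newer than the previous poll and
    /// advances the high-water mark. The comparison is lexicographic on
    /// RFC 3339 strings, which matches chronological order here because all
    /// writers use `Utc::now().to_rfc3339()` with a fixed `+00:00` offset.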
async fn fetch_changes(&self) -> Result<Vec<Model>, DbErr> {
let last_poll_time = *self.last_poll.read().await;
let mut query = Entity::find().filter(Column::LastModified.gt(last_poll_time.to_rfc3339()));
if !self.config.watched_tables.is_empty() {
            query = query.filter(Column::TableName.is_in(self.config.watched_tables.clone()));
}
query = query.order_by(Column::LastModified, sea_orm::Order::Asc);
query = query.limit(self.config.batch_size as u64);
let results = query.all(self.global_index.get_connection()).await?;
if let Some(latest) = results.last() {
let latest_modified = DateTime::parse_from_rfc3339(&latest.last_modified)
.map(|dt| dt.into())
.unwrap_or_else(|_| Utc::now());
let mut last_poll = self.last_poll.write().await;
if latest_modified > *last_poll {
*last_poll = latest_modified;
}
}
Ok(results)
}
    fn model_to_event(model: &Model, _previous_value: Option<&Model>) -> Option<SyncEvent> {
Some(SyncEvent::Insert {
table_name: model.table_name.clone(),
record_id: model.record_id.clone(),
shard_id: model.shard_id as u32,
index_key: model.index_key.clone(),
index_value: model.index_value.clone(),
})
}
}
#[async_trait]
impl ChangeCapture for PollingChangeCapture {
async fn start(&mut self) -> Result<(), DbErr> {
self.running = true;
*self.last_poll.write().await = Utc::now();
tracing::info!(
"PollingChangeCapture started with interval {}ms",
self.config.interval_ms
);
Ok(())
}
async fn stop(&mut self) -> Result<(), DbErr> {
self.running = false;
tracing::info!("PollingChangeCapture stopped");
Ok(())
}
async fn next_event(&mut self) -> Option<SyncEvent> {
if !self.running {
return None;
}
{
let mut queue = self.event_queue.write().await;
            if let Some(event) = queue.pop_front() {
return Some(event);
}
}
match self.fetch_changes().await {
Ok(changes) => {
for change in changes {
if let Some(event) = Self::model_to_event(&change, None) {
let mut queue = self.event_queue.write().await;
                        queue.push_back(event);
}
}
let mut queue = self.event_queue.write().await;
                queue.pop_front()
}
Err(e) => {
tracing::error!("Failed to fetch changes: {}", e);
None
}
}
}
fn is_running(&self) -> bool {
self.running
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_generate_id() {
let id1 = GlobalIndex::generate_id("orders", "order_123");
let id2 = GlobalIndex::generate_id("orders", "order_123");
let id3 = GlobalIndex::generate_id("orders", "order_456");
assert_eq!(id1, id2);
assert_ne!(id1, id3);
assert_eq!(id1.len(), 64);
}
#[test]
fn test_index_entry() {
let entry = IndexEntry {
table_name: "orders".to_string(),
record_id: "order_123".to_string(),
shard_id: 4,
index_key: "user_id".to_string(),
index_value: "user_456".to_string(),
};
assert_eq!(entry.table_name, "orders");
assert_eq!(entry.shard_id, 4);
}
#[test]
fn test_sync_event_variants() {
let insert = SyncEvent::Insert {
table_name: "orders".to_string(),
record_id: "order_123".to_string(),
shard_id: 4,
index_key: "user_id".to_string(),
index_value: "user_456".to_string(),
};
let update = SyncEvent::Update {
table_name: "orders".to_string(),
record_id: "order_123".to_string(),
shard_id: 4,
old_index_key: "user_id".to_string(),
old_index_value: "user_456".to_string(),
new_index_key: "user_id".to_string(),
new_index_value: "user_789".to_string(),
};
let delete = SyncEvent::Delete {
table_name: "orders".to_string(),
record_id: "order_123".to_string(),
shard_id: 4,
index_key: "user_id".to_string(),
index_value: "user_456".to_string(),
};
match insert {
SyncEvent::Insert { table_name, .. } => assert_eq!(table_name, "orders"),
_ => panic!("Expected Insert variant"),
}
match update {
SyncEvent::Update { new_index_value, .. } => assert_eq!(new_index_value, "user_789"),
_ => panic!("Expected Update variant"),
}
match delete {
SyncEvent::Delete { record_id, .. } => assert_eq!(record_id, "order_123"),
_ => panic!("Expected Delete variant"),
}
}
#[test]
fn test_change_capture_config_defaults() {
let config = ChangeCaptureConfig::default();
assert_eq!(config.batch_size, 1000);
assert_eq!(config.poll_interval_ms, 1000);
assert_eq!(config.max_retries, 3);
        assert_eq!(config.retry_interval_ms, 5000);
        assert_eq!(config.cache_ttl_seconds, 300);
}
#[test]
fn test_sync_status_constants() {
assert_eq!(SYNC_STATUS_PENDING, "pending");
assert_eq!(SYNC_STATUS_SYNCED, "synced");
assert_eq!(SYNC_STATUS_FAILED, "failed");
}
#[test]
fn test_model_to_sync_event_insert() {
let model = Model {
id: "test_id".to_string(),
table_name: "orders".to_string(),
record_id: "order_123".to_string(),
shard_id: 1,
index_key: "user_id".to_string(),
index_value: "user_456".to_string(),
created_at: "2024-01-01T00:00:00Z".to_string(),
updated_at: "2024-01-01T00:00:00Z".to_string(),
last_modified: "2024-01-01T00:00:00Z".to_string(),
sync_status: "synced".to_string(),
};
let event = GlobalIndex::model_to_sync_event(&model);
match event {
SyncEvent::Insert {
table_name,
record_id,
shard_id,
index_key,
index_value,
} => {
assert_eq!(table_name, "orders");
assert_eq!(record_id, "order_123");
assert_eq!(shard_id, 1);
assert_eq!(index_key, "user_id");
assert_eq!(index_value, "user_456");
}
_ => panic!("Expected Insert event"),
}
}
#[test]
fn test_index_location_hash() {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let location1 = IndexLocation {
table_name: "orders".to_string(),
index_key: "user_id".to_string(),
index_value: "user_123".to_string(),
created_at: std::time::Instant::now(),
};
std::thread::sleep(std::time::Duration::from_millis(10));
let location2 = IndexLocation {
table_name: "orders".to_string(),
index_key: "user_id".to_string(),
index_value: "user_123".to_string(),
created_at: std::time::Instant::now(),
};
let mut hasher1 = DefaultHasher::new();
location1.hash(&mut hasher1);
let hash1 = hasher1.finish();
let mut hasher2 = DefaultHasher::new();
location2.hash(&mut hasher2);
let hash2 = hasher2.finish();
assert_eq!(hash1, hash2, "Hash should be the same for same location data");
}
#[test]
fn test_index_location_equality() {
let location1 = IndexLocation {
table_name: "orders".to_string(),
index_key: "user_id".to_string(),
index_value: "user_123".to_string(),
created_at: std::time::Instant::now(),
};
std::thread::sleep(std::time::Duration::from_millis(10));
let location2 = IndexLocation {
table_name: "orders".to_string(),
index_key: "user_id".to_string(),
index_value: "user_123".to_string(),
created_at: std::time::Instant::now(),
};
assert_ne!(
location1, location2,
"Locations with different created_at should not be equal"
);
}
}