use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use parking_lot::RwLock;
use std::time::Instant;
use std::collections::VecDeque;
/// Kind of row-level modification recorded by a `DirtyTracker`.
///
/// `Hash` is derived alongside `Eq` so change kinds can be used as keys in
/// hash-based collections (e.g. per-kind counters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChangeType {
    /// A row was added; `track_insert` records only `new_values`.
    Insert,
    /// A row was modified; `track_update` records both `old_values` and `new_values`.
    Update,
    /// A row was removed; `track_delete` records only `old_values`.
    Delete,
}
/// One recorded row-level change, as stored in the tracker's change log.
#[derive(Clone, Debug)]
pub struct Change {
    /// Position of this change in the log; the first change recorded gets 1.
    pub sequence_number: u64,
    /// When the change was recorded (taken with `Instant::now()` at tracking time).
    pub timestamp: Instant,
    /// Whether this change is an insert, update or delete.
    pub change_type: ChangeType,
    /// Name of the table the row belongs to (never empty).
    pub table_name: String,
    /// Key identifying the row within `table_name` (never empty).
    pub row_key: String,
    /// Serialized row contents before the change; `None` for inserts.
    pub old_values: Option<Vec<u8>>,
    /// Serialized row contents after the change; `None` for deletes.
    pub new_values: Option<Vec<u8>>,
}
/// Crate-local result type whose error is always a [`DirtyTrackerError`].
pub type Result<T> = std::result::Result<T, DirtyTrackerError>;

/// Failures reported by the dirty tracker.
#[derive(Clone, Debug)]
pub enum DirtyTrackerError {
    /// The change buffer holds more entries (`current_size`) than allowed (`max_size`).
    BufferOverflow { max_size: usize, current_size: usize },
    /// A lock-related failure, with a human-readable description.
    LockError(String),
    /// A caller passed an unacceptable argument, with a human-readable description.
    InvalidParameter(String),
}

impl std::fmt::Display for DirtyTrackerError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            DirtyTrackerError::BufferOverflow { max_size, current_size } => write!(
                f,
                "Buffer overflow: current size {current} exceeds max {max}",
                current = current_size,
                max = max_size
            ),
            DirtyTrackerError::LockError(ref msg) => write!(f, "Lock error: {}", msg),
            DirtyTrackerError::InvalidParameter(ref msg) => {
                write!(f, "Invalid parameter: {}", msg)
            }
        }
    }
}

impl std::error::Error for DirtyTrackerError {}
/// Number of changes a tracker built with `DirtyTracker::new` retains before
/// evicting the oldest.
const DEFAULT_MAX_BUFFER_SIZE: usize = 100_000;

/// Upper bound on the capacity [`ChangeBuffer::new`] reserves up front.
///
/// Reserving the full `max_size` eagerly would allocate room for up to
/// `DEFAULT_MAX_BUFFER_SIZE` `Change` records per tracker even if it only ever
/// sees a handful of writes; beyond this the deque simply grows on demand.
const INITIAL_BUFFER_CAPACITY: usize = 1024;

/// Bounded, append-only log of recorded changes, oldest first.
///
/// Once `max_size` changes are held, recording another one evicts the oldest.
struct ChangeBuffer {
    /// Retained changes in recording order (ascending sequence numbers).
    changes: VecDeque<Change>,
    /// Maximum number of changes retained at once.
    max_size: usize,
    /// Sequence number the next recorded change must carry; starts at 1.
    next_sequence: u64,
}

impl ChangeBuffer {
    /// Creates an empty buffer that retains at most `max_size` changes.
    fn new(max_size: usize) -> Self {
        Self {
            changes: VecDeque::with_capacity(max_size.min(INITIAL_BUFFER_CAPACITY)),
            max_size,
            next_sequence: 1,
        }
    }

    /// Appends `change`, evicting the oldest retained change(s) if the buffer is full.
    ///
    /// The caller must have filled `change.sequence_number` from
    /// [`next_sequence_number`](Self::next_sequence_number) while holding exclusive
    /// access; this is checked in debug builds. The sequence number is consumed even
    /// when `max_size == 0`, in which case nothing is retained.
    ///
    /// # Errors
    /// Never fails at present; the `Result` return is kept for API stability.
    fn add_change(&mut self, change: Change) -> Result<()> {
        debug_assert_eq!(
            change.sequence_number, self.next_sequence,
            "change must carry the buffer's next sequence number"
        );
        self.next_sequence += 1;
        // A zero-capacity buffer keeps nothing (previously it silently kept one entry).
        if self.max_size == 0 {
            return Ok(());
        }
        while self.changes.len() >= self.max_size {
            self.changes.pop_front();
        }
        self.changes.push_back(change);
        Ok(())
    }

    /// Returns clones of the retained changes whose sequence number is greater
    /// than `seq_num`, oldest first. Evicted changes are not returned.
    fn get_changes_since(&self, seq_num: u64) -> Vec<Change> {
        self.changes
            .iter()
            .filter(|c| c.sequence_number > seq_num)
            .cloned()
            .collect()
    }

    /// Drops every retained change; the sequence counter is not reset.
    fn clear(&mut self) {
        self.changes.clear();
    }

    /// Number of changes currently retained.
    fn len(&self) -> usize {
        self.changes.len()
    }

    /// Sequence number that the next recorded change will carry.
    fn next_sequence_number(&self) -> u64 {
        self.next_sequence
    }
}
/// Records row-level inserts, updates and deletes, and tracks which of them
/// happened since the last `clear_dirty_state` call.
///
/// All state lives behind `Arc`s, so cloned handles observe and modify the same
/// change log, dump marker and dirty flag.
pub struct DirtyTracker {
    /// Set whenever a change is recorded; reset by `clear_dirty_state`.
    is_dirty: Arc<AtomicBool>,
    /// Highest sequence number covered by the most recent `clear_dirty_state` (0 initially).
    last_dump_seq: Arc<AtomicU64>,
    /// Bounded change log, guarded by a reader-writer lock.
    changes: Arc<RwLock<ChangeBuffer>>,
}
impl DirtyTracker {
pub fn new() -> Self {
Self::with_max_buffer_size(DEFAULT_MAX_BUFFER_SIZE)
}
pub fn with_max_buffer_size(max_size: usize) -> Self {
Self {
changes: Arc::new(RwLock::new(ChangeBuffer::new(max_size))),
last_dump_seq: Arc::new(AtomicU64::new(0)),
is_dirty: Arc::new(AtomicBool::new(false)),
}
}
pub fn track_insert(&self, table: &str, row_key: &str, values: &[u8]) -> Result<()> {
if table.is_empty() || row_key.is_empty() {
return Err(DirtyTrackerError::InvalidParameter(
"Table name and row key cannot be empty".to_string()
));
}
let mut buffer = self.changes.write();
let sequence = buffer.next_sequence_number();
let change = Change {
sequence_number: sequence,
timestamp: Instant::now(),
change_type: ChangeType::Insert,
table_name: table.to_string(),
row_key: row_key.to_string(),
old_values: None,
new_values: Some(values.to_vec()),
};
buffer.add_change(change)?;
self.is_dirty.store(true, Ordering::Release);
Ok(())
}
pub fn track_update(
&self,
table: &str,
row_key: &str,
old_values: &[u8],
new_values: &[u8],
) -> Result<()> {
if table.is_empty() || row_key.is_empty() {
return Err(DirtyTrackerError::InvalidParameter(
"Table name and row key cannot be empty".to_string()
));
}
let mut buffer = self.changes.write();
let sequence = buffer.next_sequence_number();
let change = Change {
sequence_number: sequence,
timestamp: Instant::now(),
change_type: ChangeType::Update,
table_name: table.to_string(),
row_key: row_key.to_string(),
old_values: Some(old_values.to_vec()),
new_values: Some(new_values.to_vec()),
};
buffer.add_change(change)?;
self.is_dirty.store(true, Ordering::Release);
Ok(())
}
pub fn track_delete(&self, table: &str, row_key: &str, values: &[u8]) -> Result<()> {
if table.is_empty() || row_key.is_empty() {
return Err(DirtyTrackerError::InvalidParameter(
"Table name and row key cannot be empty".to_string()
));
}
let mut buffer = self.changes.write();
let sequence = buffer.next_sequence_number();
let change = Change {
sequence_number: sequence,
timestamp: Instant::now(),
change_type: ChangeType::Delete,
table_name: table.to_string(),
row_key: row_key.to_string(),
old_values: Some(values.to_vec()),
new_values: None,
};
buffer.add_change(change)?;
self.is_dirty.store(true, Ordering::Release);
Ok(())
}
pub fn is_dirty(&self) -> bool {
self.is_dirty.load(Ordering::Acquire)
}
pub fn get_dirty_count(&self) -> u64 {
let buffer = self.changes.read();
let last_dump = self.last_dump_seq.load(Ordering::Acquire);
buffer.changes
.iter()
.filter(|c| c.sequence_number > last_dump)
.count() as u64
}
pub fn get_dirty_tables(&self) -> Vec<String> {
let buffer = self.changes.read();
let last_dump = self.last_dump_seq.load(Ordering::Acquire);
let mut table_set = std::collections::HashSet::new();
for change in buffer.changes.iter() {
if change.sequence_number > last_dump {
table_set.insert(change.table_name.clone());
}
}
let mut tables: Vec<String> = table_set.into_iter().collect();
tables.sort();
tables
}
pub fn clear_dirty_state(&self) -> Result<()> {
let buffer = self.changes.read();
let last_used_seq = buffer.next_sequence_number().saturating_sub(1);
drop(buffer);
self.last_dump_seq.store(last_used_seq, Ordering::Release);
let buffer = self.changes.read();
if buffer.next_sequence_number() == last_used_seq + 1 {
self.is_dirty.store(false, Ordering::Release);
}
Ok(())
}
pub fn get_changes_since(&self, seq_num: u64) -> Vec<Change> {
let buffer = self.changes.read();
buffer.get_changes_since(seq_num)
}
pub fn current_sequence_number(&self) -> u64 {
let buffer = self.changes.read();
buffer.next_sequence_number()
}
pub fn last_dump_sequence_number(&self) -> u64 {
self.last_dump_seq.load(Ordering::Acquire)
}
}
impl Default for DirtyTracker {
fn default() -> Self {
Self::new()
}
}
/// Produces another handle onto the SAME tracker state: only the `Arc`s are
/// cloned, so changes and flag updates made through either handle are shared.
impl Clone for DirtyTracker {
    fn clone(&self) -> Self {
        let changes = Arc::clone(&self.changes);
        let last_dump_seq = Arc::clone(&self.last_dump_seq);
        let is_dirty = Arc::clone(&self.is_dirty);
        Self {
            changes,
            last_dump_seq,
            is_dirty,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_track_insert() {
        let tracker = DirtyTracker::new();
        assert!(!tracker.is_dirty());
        let values = vec![1, 2, 3, 4];
        tracker.track_insert("users", "key1", &values).unwrap();
        assert!(tracker.is_dirty());
        assert_eq!(tracker.get_dirty_count(), 1);
    }

    #[test]
    fn test_track_update() {
        let tracker = DirtyTracker::new();
        let old_values = vec![1, 2, 3];
        let new_values = vec![4, 5, 6];
        tracker.track_update("users", "key1", &old_values, &new_values).unwrap();
        assert!(tracker.is_dirty());
        assert_eq!(tracker.get_dirty_count(), 1);
        let changes = tracker.get_changes_since(0);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].change_type, ChangeType::Update);
        assert_eq!(changes[0].table_name, "users");
        assert_eq!(changes[0].row_key, "key1");
        assert_eq!(changes[0].old_values, Some(old_values));
        assert_eq!(changes[0].new_values, Some(new_values));
    }

    #[test]
    fn test_track_delete() {
        let tracker = DirtyTracker::new();
        let values = vec![1, 2, 3];
        tracker.track_delete("users", "key1", &values).unwrap();
        assert!(tracker.is_dirty());
        assert_eq!(tracker.get_dirty_count(), 1);
        let changes = tracker.get_changes_since(0);
        assert_eq!(changes.len(), 1);
        assert_eq!(changes[0].change_type, ChangeType::Delete);
        assert!(changes[0].new_values.is_none());
        assert_eq!(changes[0].old_values, Some(values));
    }

    #[test]
    fn test_dirty_state_flag() {
        let tracker = DirtyTracker::new();
        assert!(!tracker.is_dirty());
        tracker.track_insert("users", "key1", &[1, 2, 3]).unwrap();
        assert!(tracker.is_dirty());
        tracker.clear_dirty_state().unwrap();
        assert!(!tracker.is_dirty());
    }

    #[test]
    fn test_changes_since_sequence() {
        let tracker = DirtyTracker::new();
        tracker.track_insert("users", "key1", &[1]).unwrap();
        tracker.track_insert("users", "key2", &[2]).unwrap();
        tracker.track_insert("users", "key3", &[3]).unwrap();
        let changes = tracker.get_changes_since(0);
        assert_eq!(changes.len(), 3);
        let changes = tracker.get_changes_since(1);
        assert_eq!(changes.len(), 2);
        let changes = tracker.get_changes_since(2);
        assert_eq!(changes.len(), 1);
        let changes = tracker.get_changes_since(3);
        assert_eq!(changes.len(), 0);
    }

    #[test]
    fn test_get_dirty_tables() {
        let tracker = DirtyTracker::new();
        tracker.track_insert("users", "key1", &[1]).unwrap();
        tracker.track_insert("orders", "key2", &[2]).unwrap();
        tracker.track_insert("users", "key3", &[3]).unwrap();
        tracker.track_insert("products", "key4", &[4]).unwrap();
        // No re-sorting here: get_dirty_tables itself must return sorted, unique names.
        assert_eq!(tracker.get_dirty_tables(), vec!["orders", "products", "users"]);
    }

    #[test]
    fn test_get_dirty_tables_after_clear() {
        let tracker = DirtyTracker::new();
        tracker.track_insert("users", "key1", &[1]).unwrap();
        tracker.clear_dirty_state().unwrap();
        assert!(tracker.get_dirty_tables().is_empty());
        tracker.track_delete("orders", "key2", &[2]).unwrap();
        assert_eq!(tracker.get_dirty_tables(), vec!["orders"]);
    }

    #[test]
    fn test_buffer_overflow_handling() {
        let tracker = DirtyTracker::with_max_buffer_size(3);
        tracker.track_insert("users", "key1", &[1]).unwrap();
        tracker.track_insert("users", "key2", &[2]).unwrap();
        tracker.track_insert("users", "key3", &[3]).unwrap();
        tracker.track_insert("users", "key4", &[4]).unwrap();
        tracker.track_insert("users", "key5", &[5]).unwrap();
        let changes = tracker.get_changes_since(0);
        assert_eq!(changes.len(), 3);
        assert_eq!(changes[0].row_key, "key3");
        assert_eq!(changes[1].row_key, "key4");
        assert_eq!(changes[2].row_key, "key5");
        // Eviction must not disturb the numbering of the survivors.
        let seqs: Vec<u64> = changes.iter().map(|c| c.sequence_number).collect();
        assert_eq!(seqs, vec![3, 4, 5]);
    }

    #[test]
    fn test_concurrent_tracking() {
        let tracker = Arc::new(DirtyTracker::new());
        let mut handles = vec![];
        for thread_id in 0..10 {
            let tracker_clone = Arc::clone(&tracker);
            let handle = thread::spawn(move || {
                for i in 0..100 {
                    let key = format!("key_{}_{}", thread_id, i);
                    let values = vec![thread_id as u8, i as u8];
                    tracker_clone.track_insert("users", &key, &values).unwrap();
                }
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.join().unwrap();
        }
        assert!(tracker.is_dirty());
        assert_eq!(tracker.get_dirty_count(), 1000);
        // Every change must get a unique, gap-free sequence number in log order.
        let changes = tracker.get_changes_since(0);
        for (expected, change) in (1u64..).zip(changes.iter()) {
            assert_eq!(change.sequence_number, expected);
        }
    }

    #[test]
    fn test_clear_dirty_state_incremental() {
        let tracker = DirtyTracker::new();
        tracker.track_insert("users", "key1", &[1]).unwrap();
        tracker.track_insert("users", "key2", &[2]).unwrap();
        assert_eq!(tracker.get_dirty_count(), 2);
        tracker.clear_dirty_state().unwrap();
        assert!(!tracker.is_dirty());
        assert_eq!(tracker.get_dirty_count(), 0);
        tracker.track_insert("users", "key3", &[3]).unwrap();
        tracker.track_insert("users", "key4", &[4]).unwrap();
        assert_eq!(tracker.get_dirty_count(), 2);
        let all_changes = tracker.get_changes_since(0);
        assert_eq!(all_changes.len(), 4);
        let last_dump = tracker.last_dump_sequence_number();
        let dirty_changes = tracker.get_changes_since(last_dump);
        assert_eq!(dirty_changes.len(), 2);
    }

    #[test]
    fn test_clear_dirty_state_advances_last_dump() {
        let tracker = DirtyTracker::new();
        assert_eq!(tracker.last_dump_sequence_number(), 0);
        tracker.track_insert("users", "key1", &[1]).unwrap();
        tracker.track_insert("users", "key2", &[2]).unwrap();
        tracker.clear_dirty_state().unwrap();
        assert_eq!(tracker.last_dump_sequence_number(), 2);
        tracker.track_insert("users", "key3", &[3]).unwrap();
        tracker.clear_dirty_state().unwrap();
        assert_eq!(tracker.last_dump_sequence_number(), 3);
        // Clearing again with nothing new recorded leaves the marker in place.
        tracker.clear_dirty_state().unwrap();
        assert_eq!(tracker.last_dump_sequence_number(), 3);
        assert!(!tracker.is_dirty());
    }

    #[test]
    fn test_invalid_parameters() {
        let tracker = DirtyTracker::new();
        let result = tracker.track_insert("", "key1", &[1, 2, 3]);
        assert!(matches!(result, Err(DirtyTrackerError::InvalidParameter(_))));
        let result = tracker.track_insert("users", "", &[1, 2, 3]);
        assert!(matches!(result, Err(DirtyTrackerError::InvalidParameter(_))));
        let result = tracker.track_update("", "key1", &[1], &[2]);
        assert!(result.is_err());
        let result = tracker.track_delete("users", "", &[1]);
        assert!(result.is_err());
        // Rejected calls must leave no trace.
        assert!(!tracker.is_dirty());
        assert_eq!(tracker.get_dirty_count(), 0);
        assert_eq!(tracker.current_sequence_number(), 1);
    }

    #[test]
    fn test_sequence_numbers_monotonic() {
        let tracker = DirtyTracker::new();
        assert_eq!(tracker.current_sequence_number(), 1);
        tracker.track_insert("users", "key1", &[1]).unwrap();
        let seq1 = tracker.current_sequence_number();
        tracker.track_insert("users", "key2", &[2]).unwrap();
        let seq2 = tracker.current_sequence_number();
        tracker.track_insert("users", "key3", &[3]).unwrap();
        let seq3 = tracker.current_sequence_number();
        assert!(seq2 > seq1);
        assert!(seq3 > seq2);
        // current_sequence_number reports the NEXT number to be assigned.
        assert_eq!((seq1, seq2, seq3), (2, 3, 4));
    }

    #[test]
    fn test_clone_shares_state() {
        let tracker1 = DirtyTracker::new();
        tracker1.track_insert("users", "key1", &[1]).unwrap();
        let tracker2 = tracker1.clone();
        assert!(tracker2.is_dirty());
        assert_eq!(tracker2.get_dirty_count(), 1);
        tracker2.track_insert("users", "key2", &[2]).unwrap();
        assert_eq!(tracker1.get_dirty_count(), 2);
        tracker1.clear_dirty_state().unwrap();
        assert!(!tracker2.is_dirty());
    }
}