use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

use rocksdb::{Direction, IteratorMode, ReadOptions, WriteBatch, DB};
use serde::{Deserialize, Serialize};
use tracing::{debug, error, info, warn};

use super::VectorClock;
use crate::types::Schema;
use crate::{Error, Result};
/// One logical change captured by the change log.
///
/// NOTE(review): values of this type are bincode-encoded as part of `ChangeEntry`
/// (see `ChangeEntry::serialize`), so variant order and field order presumably define
/// the stored format; reordering or inserting variants would likely make existing logs
/// unreadable. Confirm before changing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ChangeType {
/// A row was inserted into `table`; `data` is the row payload (its encoding is not defined here).
Insert {
table: String,
row_id: u64,
data: Vec<u8>,
},
/// A row of `table` was modified; presumably `old_data` / `new_data` are the row
/// contents before and after the update — confirm with producers.
Update {
table: String,
row_id: u64,
old_data: Vec<u8>,
new_data: Vec<u8>,
},
/// A row was removed from `table`; presumably `data` holds the deleted row's contents.
Delete {
table: String,
row_id: u64,
data: Vec<u8>,
},
/// DDL: the table `table` was created with `schema`.
CreateTable {
table: String,
schema: Schema,
},
/// DDL: the table `table` was dropped.
DropTable {
table: String,
},
}
impl ChangeType {
    /// Returns the name of the table this change applies to.
    ///
    /// Every variant carries a table name, so this never fails.
    pub fn table_name(&self) -> &str {
        let name = match self {
            ChangeType::Insert { table, .. }
            | ChangeType::Update { table, .. }
            | ChangeType::Delete { table, .. }
            | ChangeType::CreateTable { table, .. }
            | ChangeType::DropTable { table } => table,
        };
        name.as_str()
    }

    /// Returns `true` for schema changes (`CreateTable`, `DropTable`).
    pub fn is_ddl(&self) -> bool {
        match self {
            ChangeType::CreateTable { .. } | ChangeType::DropTable { .. } => true,
            ChangeType::Insert { .. } | ChangeType::Update { .. } | ChangeType::Delete { .. } => {
                false
            }
        }
    }

    /// Returns `true` for row-level changes (`Insert`, `Update`, `Delete`).
    ///
    /// Always the negation of [`ChangeType::is_ddl`].
    pub fn is_dml(&self) -> bool {
        matches!(
            self,
            ChangeType::Insert { .. } | ChangeType::Update { .. } | ChangeType::Delete { .. }
        )
    }

    /// Returns the affected row id for row-level changes, or `None` for DDL changes.
    pub fn row_id(&self) -> Option<u64> {
        match *self {
            ChangeType::Insert { row_id, .. }
            | ChangeType::Update { row_id, .. }
            | ChangeType::Delete { row_id, .. } => Some(row_id),
            // Listed explicitly (no `_`) so adding a variant forces a decision here.
            ChangeType::CreateTable { .. } | ChangeType::DropTable { .. } => None,
        }
    }
}
/// A single change-log record; `ChangeLog::append` stores it bincode-encoded under the key
/// `change_log:{lsn:020}`.
///
/// NOTE(review): field order presumably forms part of the bincode encoding — do not reorder
/// without a migration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEntry {
/// Log sequence number, taken from `ChangeLog`'s atomic counter when the entry is appended.
pub lsn: u64,
/// Creation time in microseconds since the Unix epoch (0 if the clock reads before the epoch).
pub timestamp: u64,
/// Caller-supplied transaction id; the change log stores it but does not interpret it.
pub transaction_id: u64,
/// The change itself.
pub change_type: ChangeType,
/// Caller-supplied vector clock recorded with the change.
pub vector_clock: VectorClock,
}
impl ChangeEntry {
pub fn new(
lsn: u64,
transaction_id: u64,
change_type: ChangeType,
vector_clock: VectorClock,
) -> Self {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_micros() as u64)
.unwrap_or(0);
Self {
lsn,
timestamp,
transaction_id,
change_type,
vector_clock,
}
}
pub fn serialize(&self) -> Result<Vec<u8>> {
bincode::serialize(self)
.map_err(|e| Error::storage(format!("Failed to serialize change entry: {}", e)))
}
pub fn deserialize(data: &[u8]) -> Result<Self> {
bincode::deserialize(data)
.map_err(|e| Error::storage(format!("Failed to deserialize change entry: {}", e)))
}
pub fn table_name(&self) -> &str {
self.change_type.table_name()
}
}
/// Filters for `ChangeLog::query`.
///
/// An entry is returned only if it passes every filter that is `Some`; `None` disables that
/// filter. All bounds are inclusive.
#[derive(Debug, Clone)]
pub struct QueryOptions {
/// Lowest LSN to return (inclusive).
pub start_lsn: Option<u64>,
/// Highest LSN to return (inclusive).
pub end_lsn: Option<u64>,
/// Only return entries whose table name equals this exactly.
pub table: Option<String>,
/// Earliest entry timestamp to return (inclusive; microseconds since the Unix epoch).
pub start_timestamp: Option<u64>,
/// Latest entry timestamp to return (inclusive; microseconds since the Unix epoch).
pub end_timestamp: Option<u64>,
/// Requested cap on the number of matching entries returned; `None` means unlimited.
pub limit: Option<usize>,
}
impl Default for QueryOptions {
fn default() -> Self {
Self {
start_lsn: None,
end_lsn: None,
table: None,
start_timestamp: None,
end_timestamp: None,
limit: None,
}
}
}
impl QueryOptions {
    /// Creates options with no filters and no limit (same as `QueryOptions::default()`).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Only return entries whose LSN is `>= lsn`.
    #[must_use]
    pub fn with_start_lsn(mut self, lsn: u64) -> Self {
        self.start_lsn = Some(lsn);
        self
    }

    /// Only return entries whose LSN is `<= lsn`.
    #[must_use]
    pub fn with_end_lsn(mut self, lsn: u64) -> Self {
        self.end_lsn = Some(lsn);
        self
    }

    /// Only return entries for the table named `table` (exact match).
    ///
    /// Accepts anything convertible into a `String` (`&str`, `String`, ...), so callers
    /// holding a borrowed name need not allocate first. Passing a `String` works as before.
    #[must_use]
    pub fn with_table(mut self, table: impl Into<String>) -> Self {
        self.table = Some(table.into());
        self
    }

    /// Only return entries whose timestamp lies in `start..=end`
    /// (microseconds since the Unix epoch, as stamped by `ChangeEntry::new`).
    #[must_use]
    pub fn with_timestamp_range(mut self, start: u64, end: u64) -> Self {
        self.start_timestamp = Some(start);
        self.end_timestamp = Some(end);
        self
    }

    /// Cap the number of matching entries returned.
    #[must_use]
    pub fn with_limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }

    /// Returns `true` when `entry` passes every configured filter.
    ///
    /// `limit` is deliberately not considered here; the caller enforces it while collecting.
    fn matches(&self, entry: &ChangeEntry) -> bool {
        if let Some(start) = self.start_lsn {
            if entry.lsn < start {
                return false;
            }
        }
        if let Some(end) = self.end_lsn {
            if entry.lsn > end {
                return false;
            }
        }
        if let Some(table) = self.table.as_deref() {
            if entry.table_name() != table {
                return false;
            }
        }
        if let Some(start) = self.start_timestamp {
            if entry.timestamp < start {
                return false;
            }
        }
        if let Some(end) = self.end_timestamp {
            if entry.timestamp > end {
                return false;
            }
        }
        true
    }
}
/// Snapshot of change-log state, produced by `ChangeLog::get_stats`.
#[derive(Debug, Clone, Default)]
pub struct ChangeLogStats {
/// Number of stored log entries that could be decoded (undecodable ones are skipped).
pub total_entries: u64,
/// Next LSN the log will assign.
pub current_lsn: u64,
/// Compaction watermark tracked by `ChangeLog`; compaction deletes entries with a lower LSN.
pub compaction_watermark: u64,
/// Lowest LSN among decodable entries, if any.
pub oldest_lsn: Option<u64>,
/// Timestamp of the entry at `oldest_lsn`.
pub oldest_timestamp: Option<u64>,
/// Largest timestamp among decodable entries (not necessarily that of the highest LSN).
pub newest_timestamp: Option<u64>,
/// Sum of the serialized entry sizes; excludes keys, index records and metadata.
pub estimated_size_bytes: u64,
}
/// Change log persisted in a RocksDB instance.
///
/// Keys used in `storage`: `change_log:{lsn:020}` (encoded entries),
/// `change_index:{table}:{timestamp:020}` (value: LSN) and `change_meta:*`
/// (current LSN and compaction watermark).
pub struct ChangeLog {
/// Database handle; presumably shared with other components, hence the prefixed key namespaces.
storage: Arc<DB>,
/// Next LSN to assign; `append` advances it with an atomic `fetch_add`.
current_lsn: Arc<AtomicU64>,
/// Compaction watermark; `compact` deletes entries whose LSN is below the value it is given.
compaction_watermark: Arc<AtomicU64>,
}
impl ChangeLog {
pub fn new(storage: Arc<DB>) -> Result<Self> {
debug!("Initializing change log");
let current_lsn = match storage.get(b"change_meta:current_lsn") {
Ok(Some(bytes)) => {
let lsn = u64::from_le_bytes(
bytes.as_slice().try_into()
.map_err(|e| Error::storage(format!("Invalid LSN format: {:?}", e)))?
);
debug!("Loaded current LSN: {}", lsn);
lsn
}
Ok(None) => {
debug!("No existing LSN, starting at 0");
0
}
Err(e) => {
return Err(Error::storage(format!("Failed to load current LSN: {}", e)));
}
};
let compaction_watermark = match storage.get(b"change_meta:compaction_watermark") {
Ok(Some(bytes)) => {
let watermark = u64::from_le_bytes(
bytes.as_slice().try_into()
.map_err(|e| Error::storage(format!("Invalid watermark format: {:?}", e)))?
);
debug!("Loaded compaction watermark: {}", watermark);
watermark
}
Ok(None) => {
debug!("No existing watermark, starting at 0");
0
}
Err(e) => {
return Err(Error::storage(format!("Failed to load compaction watermark: {}", e)));
}
};
info!("Change log initialized with LSN={}, watermark={}", current_lsn, compaction_watermark);
Ok(Self {
storage,
current_lsn: Arc::new(AtomicU64::new(current_lsn)),
compaction_watermark: Arc::new(AtomicU64::new(compaction_watermark)),
})
}
pub fn append(
&self,
transaction_id: u64,
change_type: ChangeType,
vector_clock: VectorClock,
) -> Result<u64> {
let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
let entry = ChangeEntry::new(lsn, transaction_id, change_type, vector_clock);
debug!("Appending change entry: LSN={}, table={}, tx={}",
lsn, entry.table_name(), transaction_id);
let entry_bytes = entry.serialize()?;
let mut batch = WriteBatch::default();
let entry_key = format!("change_log:{:020}", lsn);
batch.put(entry_key.as_bytes(), &entry_bytes);
let index_key = format!("change_index:{}:{:020}", entry.table_name(), entry.timestamp);
batch.put(index_key.as_bytes(), &lsn.to_le_bytes());
batch.put(b"change_meta:current_lsn", &(lsn + 1).to_le_bytes());
self.storage.write(batch)
.map_err(|e| Error::storage(format!("Failed to write change entry: {}", e)))?;
debug!("Successfully appended change entry LSN={}", lsn);
Ok(lsn)
}
pub fn query_since_lsn(&self, start_lsn: u64, limit: Option<usize>) -> Result<Vec<ChangeEntry>> {
debug!("Querying changes since LSN={} with limit={:?}", start_lsn, limit);
let mut options = QueryOptions::new().with_start_lsn(start_lsn);
if let Some(limit_val) = limit {
options = options.with_limit(limit_val);
}
self.query(&options)
}
pub fn query_by_timestamp(&self, start_timestamp: u64, end_timestamp: u64) -> Result<Vec<ChangeEntry>> {
debug!("Querying changes between timestamps {} and {}", start_timestamp, end_timestamp);
let options = QueryOptions::new().with_timestamp_range(start_timestamp, end_timestamp);
self.query(&options)
}
pub fn query_by_table(&self, table_name: &str) -> Result<Vec<ChangeEntry>> {
debug!("Querying changes for table '{}'", table_name);
let options = QueryOptions::new().with_table(table_name.to_string());
self.query(&options)
}
pub fn query(&self, options: &QueryOptions) -> Result<Vec<ChangeEntry>> {
let mut entries = Vec::new();
let prefix = b"change_log:";
let mut read_opts = ReadOptions::default();
read_opts.set_total_order_seek(true);
let iter = self.storage.iterator_opt(IteratorMode::Start, read_opts);
for item in iter {
let (key, value) = item
.map_err(|e| Error::storage(format!("Iterator error: {}", e)))?;
if !key.starts_with(prefix) {
continue;
}
if !key.is_empty() && key.len() >= prefix.len() && &key[0..prefix.len()] != prefix {
break;
}
let entry = ChangeEntry::deserialize(&value)?;
if !options.matches(&entry) {
continue;
}
entries.push(entry);
if let Some(limit) = options.limit {
if entries.len() >= limit {
break;
}
}
}
debug!("Query returned {} entries", entries.len());
Ok(entries)
}
pub fn compact(&self, watermark_lsn: u64) -> Result<usize> {
info!("Compacting change log up to LSN={}", watermark_lsn);
let mut deleted_count = 0;
let mut batch = WriteBatch::default();
let prefix = b"change_log:";
let mut read_opts = ReadOptions::default();
read_opts.set_total_order_seek(true);
let iter = self.storage.iterator_opt(IteratorMode::Start, read_opts);
for item in iter {
let (key, value) = item
.map_err(|e| Error::storage(format!("Iterator error during compaction: {}", e)))?;
if !key.starts_with(prefix) {
continue;
}
let key_str = std::str::from_utf8(&key)
.map_err(|e| Error::storage(format!("Invalid key UTF-8: {}", e)))?;
if let Some(lsn_str) = key_str.strip_prefix("change_log:") {
if let Ok(lsn) = lsn_str.parse::<u64>() {
if lsn < watermark_lsn {
if let Ok(entry) = ChangeEntry::deserialize(&value) {
batch.delete(&key);
let index_key = format!("change_index:{}:{:020}",
entry.table_name(), entry.timestamp);
batch.delete(index_key.as_bytes());
deleted_count += 1;
if deleted_count % 1000 == 0 {
self.storage.write(batch)
.map_err(|e| Error::storage(format!("Failed to write compaction batch: {}", e)))?;
batch = WriteBatch::default();
debug!("Compaction progress: {} entries deleted", deleted_count);
}
}
} else {
break;
}
}
}
}
if deleted_count % 1000 != 0 {
self.storage.write(batch)
.map_err(|e| Error::storage(format!("Failed to write final compaction batch: {}", e)))?;
}
self.compaction_watermark.store(watermark_lsn, Ordering::SeqCst);
self.storage.put(b"change_meta:compaction_watermark", &watermark_lsn.to_le_bytes())
.map_err(|e| Error::storage(format!("Failed to update compaction watermark: {}", e)))?;
info!("Compaction complete: deleted {} entries", deleted_count);
Ok(deleted_count)
}
pub fn get_latest_lsn(&self) -> u64 {
self.current_lsn.load(Ordering::SeqCst)
}
pub fn get_compaction_watermark(&self) -> u64 {
self.compaction_watermark.load(Ordering::SeqCst)
}
pub fn get_entry(&self, lsn: u64) -> Result<Option<ChangeEntry>> {
let key = format!("change_log:{:020}", lsn);
match self.storage.get(key.as_bytes()) {
Ok(Some(bytes)) => {
let entry = ChangeEntry::deserialize(&bytes)?;
Ok(Some(entry))
}
Ok(None) => Ok(None),
Err(e) => Err(Error::storage(format!("Failed to get entry: {}", e))),
}
}
pub fn get_stats(&self) -> Result<ChangeLogStats> {
let mut stats = ChangeLogStats::default();
stats.current_lsn = self.current_lsn.load(Ordering::SeqCst);
stats.compaction_watermark = self.compaction_watermark.load(Ordering::SeqCst);
let prefix = b"change_log:";
let mut read_opts = ReadOptions::default();
read_opts.set_total_order_seek(true);
let iter = self.storage.iterator_opt(IteratorMode::Start, read_opts);
for item in iter {
let (key, value) = item
.map_err(|e| Error::storage(format!("Iterator error: {}", e)))?;
if !key.starts_with(prefix) {
continue;
}
if let Ok(entry) = ChangeEntry::deserialize(&value) {
stats.total_entries += 1;
stats.estimated_size_bytes += value.len() as u64;
if stats.oldest_lsn.is_none() || Some(entry.lsn) < stats.oldest_lsn {
stats.oldest_lsn = Some(entry.lsn);
stats.oldest_timestamp = Some(entry.timestamp);
}
if stats.newest_timestamp.is_none() || Some(entry.timestamp) > stats.newest_timestamp {
stats.newest_timestamp = Some(entry.timestamp);
}
}
}
Ok(stats)
}
#[cfg(test)]
pub fn reset(&self) -> Result<()> {
warn!("Resetting change log - all data will be deleted");
let mut batch = WriteBatch::default();
let prefixes = [b"change_log:", b"change_index:", b"change_meta:"];
for prefix in &prefixes {
let mut read_opts = ReadOptions::default();
read_opts.set_total_order_seek(true);
let iter = self.storage.iterator_opt(IteratorMode::Start, read_opts);
for item in iter {
let (key, _) = item
.map_err(|e| Error::storage(format!("Iterator error: {}", e)))?;
if key.starts_with(prefix) {
batch.delete(&key);
}
}
}
self.storage.write(batch)
.map_err(|e| Error::storage(format!("Failed to reset change log: {}", e)))?;
self.current_lsn.store(0, Ordering::SeqCst);
self.compaction_watermark.store(0, Ordering::SeqCst);
info!("Change log reset complete");
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use rocksdb::{Options, DB};
    use tempfile::TempDir;

    /// Opens a fresh RocksDB instance inside a new temporary directory.
    ///
    /// The `TempDir` is handed back so the caller keeps the directory alive for as long as the
    /// database is open.
    fn create_test_db() -> (Arc<DB>, TempDir) {
        let dir = TempDir::new().expect("Failed to create temp dir");
        let mut db_opts = Options::default();
        db_opts.create_if_missing(true);
        let db = DB::open(&db_opts, dir.path()).expect("Failed to open DB");
        (Arc::new(db), dir)
    }

    /// Shorthand for an `Insert` change.
    fn insert(table: impl Into<String>, row_id: u64, data: Vec<u8>) -> ChangeType {
        ChangeType::Insert {
            table: table.into(),
            row_id,
            data,
        }
    }

    /// Appends `count` inserts into table `"test"`. Entry `i` uses `i` as both transaction id
    /// and row id, and carries `payload_len` copies of the byte `i as u8`.
    fn append_test_rows(log: &ChangeLog, count: u64, payload_len: usize) {
        for i in 0..count {
            log.append(i, insert("test", i, vec![i as u8; payload_len]), VectorClock::new())
                .expect("Failed to append");
        }
    }

    #[test]
    fn test_change_log_new() {
        let (db, _temp_dir) = create_test_db();
        let log = ChangeLog::new(db).expect("Failed to create change log");
        assert_eq!(log.get_latest_lsn(), 0);
        assert_eq!(log.get_compaction_watermark(), 0);
    }

    #[test]
    fn test_append_and_query() {
        let (db, _temp_dir) = create_test_db();
        let log = ChangeLog::new(db).expect("Failed to create change log");

        let lsn = log
            .append(1, insert("users", 42, vec![1, 2, 3]), VectorClock::new())
            .expect("Failed to append");
        assert_eq!(lsn, 0);
        assert_eq!(log.get_latest_lsn(), 1);

        let entries = log.query_since_lsn(0, None).expect("Failed to query");
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].lsn, 0);
        assert_eq!(entries[0].transaction_id, 1);
    }

    #[test]
    fn test_multiple_appends() {
        let (db, _temp_dir) = create_test_db();
        let log = ChangeLog::new(db).expect("Failed to create change log");

        for i in 0..10u64 {
            let change = insert(format!("table_{}", i % 3), i, vec![i as u8]);
            let lsn = log
                .append(i, change, VectorClock::new())
                .expect("Failed to append");
            assert_eq!(lsn, i);
        }

        assert_eq!(log.get_latest_lsn(), 10);
        let entries = log.query_since_lsn(0, None).expect("Failed to query");
        assert_eq!(entries.len(), 10);
    }

    #[test]
    fn test_query_by_table() {
        let (db, _temp_dir) = create_test_db();
        let log = ChangeLog::new(db).expect("Failed to create change log");

        for i in 0..10u64 {
            let table = if i % 2 == 0 { "even" } else { "odd" };
            log.append(i, insert(table, i, vec![i as u8]), VectorClock::new())
                .expect("Failed to append");
        }

        let even = log.query_by_table("even").expect("Failed to query");
        assert_eq!(even.len(), 5);
        let odd = log.query_by_table("odd").expect("Failed to query");
        assert_eq!(odd.len(), 5);
    }

    #[test]
    fn test_compaction() {
        let (db, _temp_dir) = create_test_db();
        let log = ChangeLog::new(db).expect("Failed to create change log");
        append_test_rows(&log, 100, 1);

        let deleted = log.compact(50).expect("Failed to compact");
        assert_eq!(deleted, 50);
        assert_eq!(log.get_compaction_watermark(), 50);

        let remaining = log.query_since_lsn(0, None).expect("Failed to query");
        assert_eq!(remaining.len(), 50);
        assert_eq!(remaining[0].lsn, 50);
    }

    #[test]
    fn test_query_with_limit() {
        let (db, _temp_dir) = create_test_db();
        let log = ChangeLog::new(db).expect("Failed to create change log");
        append_test_rows(&log, 100, 1);

        let limited = log
            .query(&QueryOptions::new().with_limit(10))
            .expect("Failed to query");
        assert_eq!(limited.len(), 10);
    }

    #[test]
    fn test_get_entry() {
        let (db, _temp_dir) = create_test_db();
        let log = ChangeLog::new(db).expect("Failed to create change log");

        let update = ChangeType::Update {
            table: "users".to_string(),
            row_id: 42,
            old_data: vec![1, 2, 3],
            new_data: vec![4, 5, 6],
        };
        let lsn = log
            .append(1, update, VectorClock::new())
            .expect("Failed to append");

        let fetched = log
            .get_entry(lsn)
            .expect("Failed to get entry")
            .expect("Entry not found");
        assert_eq!(fetched.lsn, lsn);
        assert!(matches!(fetched.change_type, ChangeType::Update { .. }));
    }

    #[test]
    fn test_get_stats() {
        let (db, _temp_dir) = create_test_db();
        let log = ChangeLog::new(db).expect("Failed to create change log");
        append_test_rows(&log, 50, 100);

        let stats = log.get_stats().expect("Failed to get stats");
        assert_eq!(stats.total_entries, 50);
        assert_eq!(stats.current_lsn, 50);
        assert_eq!(stats.oldest_lsn, Some(0));
        assert!(stats.estimated_size_bytes > 0);
    }

    #[test]
    fn test_change_type_methods() {
        let row_insert = insert("users", 1, vec![1, 2, 3]);
        assert_eq!(row_insert.table_name(), "users");
        assert!(row_insert.is_dml());
        assert!(!row_insert.is_ddl());
        assert_eq!(row_insert.row_id(), Some(1));

        let create_table = ChangeType::CreateTable {
            table: "products".to_string(),
            schema: Schema::new(vec![]),
        };
        assert_eq!(create_table.table_name(), "products");
        assert!(create_table.is_ddl());
        assert!(!create_table.is_dml());
        assert_eq!(create_table.row_id(), None);
    }

    #[test]
    fn test_persistence() {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");

        // First session: write ten entries, then drop the log and its DB handle.
        {
            let mut db_opts = Options::default();
            db_opts.create_if_missing(true);
            let db = DB::open(&db_opts, temp_dir.path()).expect("Failed to open DB");
            let log = ChangeLog::new(Arc::new(db)).expect("Failed to create change log");
            append_test_rows(&log, 10, 1);
        }

        // Second session: reopen the same directory and check that the state survived.
        {
            let db = DB::open(&Options::default(), temp_dir.path()).expect("Failed to open DB");
            let log = ChangeLog::new(Arc::new(db)).expect("Failed to create change log");
            assert_eq!(log.get_latest_lsn(), 10);
            let entries = log.query_since_lsn(0, None).expect("Failed to query");
            assert_eq!(entries.len(), 10);
        }
    }

    #[test]
    fn test_concurrent_access() {
        use std::thread;

        let (db, _temp_dir) = create_test_db();
        let log = Arc::new(ChangeLog::new(db).expect("Failed to create change log"));

        let handles: Vec<_> = (0..4u64)
            .map(|thread_id| {
                let log = Arc::clone(&log);
                thread::spawn(move || {
                    for i in 0..25u64 {
                        let change = insert(format!("thread_{}", thread_id), i, vec![i as u8]);
                        log.append(thread_id * 100 + i, change, VectorClock::new())
                            .expect("Failed to append");
                    }
                })
            })
            .collect();
        for handle in handles {
            handle.join().expect("Thread panicked");
        }

        assert_eq!(log.get_latest_lsn(), 100);
        let entries = log.query_since_lsn(0, None).expect("Failed to query");
        assert_eq!(entries.len(), 100);
    }
}