use crate::types::Lsn;
use chrono::{DateTime, Utc};
use pg_walstream::format_lsn;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use tracing::{debug, info, warn};
pub use pg_walstream::SharedLsnFeedback;
/// Format version stamped into newly created [`CdcMetadata`] values (see its `Default` impl).
///
/// Loading does not currently compare a file's `version` against this value.
const METADATA_VERSION: &str = "1.0";
/// Snapshot of CDC progress that [`LsnTracker`] keeps in memory and persists as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CdcMetadata {
    /// Format version; newly created metadata uses `METADATA_VERSION`.
    /// Not validated when the file is loaded.
    pub version: String,
    /// Creation time, or the time of the last change made through the update methods.
    pub last_updated: DateTime<Utc>,
    /// LSN bookkeeping.
    pub lsn_tracking: LsnTracking,
    /// Progress most recently reported through `update_consumer_state`.
    pub consumer_state: ConsumerState,
}
/// LSN bookkeeping stored inside [`CdcMetadata`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LsnTracking {
    /// Highest committed LSN as a raw 64-bit value; `0` means "nothing committed yet".
    /// `CdcMetadata::update_flush_lsn` only ever raises it.
    pub flush_lsn: u64,
}
/// Consumer progress as last reported through `update_consumer_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsumerState {
    /// Transaction id from the most recent update, or `None` if none was reported.
    pub last_processed_tx_id: Option<u32>,
    /// Timestamp from the most recent update, or `None` if none was reported.
    pub last_processed_timestamp: Option<DateTime<Utc>>,
    /// Caller-supplied pending-file count at the most recent update.
    /// NOTE(review): the meaning of "pending" is defined by the caller — confirm.
    pub pending_file_count: usize,
}
impl Default for CdcMetadata {
    /// Builds empty metadata: current format version, no committed LSN, and no
    /// consumer progress, timestamped with the current time.
    fn default() -> Self {
        let lsn_tracking = LsnTracking { flush_lsn: 0 };
        let consumer_state = ConsumerState {
            last_processed_tx_id: None,
            last_processed_timestamp: None,
            pending_file_count: 0,
        };
        Self {
            version: String::from(METADATA_VERSION),
            last_updated: Utc::now(),
            lsn_tracking,
            consumer_state,
        }
    }
}
impl CdcMetadata {
    /// Returns fresh metadata; identical to [`CdcMetadata::default`].
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Moves `flush_lsn` forward to `lsn` when `lsn` is strictly larger.
    ///
    /// Equal or smaller values are ignored, so this never moves the LSN backwards.
    /// `last_updated` is refreshed only when the LSN actually changes.
    fn update_flush_lsn(&mut self, lsn: u64) {
        if lsn <= self.lsn_tracking.flush_lsn {
            return;
        }
        self.lsn_tracking.flush_lsn = lsn;
        self.last_updated = Utc::now();
    }

    /// Overwrites the consumer progress with the given values and refreshes `last_updated`.
    pub fn update_consumer_state(
        &mut self,
        tx_id: u32,
        timestamp: DateTime<Utc>,
        pending_count: usize,
    ) {
        let state = &mut self.consumer_state;
        state.last_processed_tx_id = Some(tx_id);
        state.last_processed_timestamp = Some(timestamp);
        state.pending_file_count = pending_count;
        self.last_updated = Utc::now();
    }

    /// Returns the raw flush LSN (`0` when nothing has been committed).
    pub fn get_lsn(&self) -> u64 {
        let tracking = &self.lsn_tracking;
        tracking.flush_lsn
    }
}
/// Tracks the highest committed LSN plus consumer progress, and persists them to a
/// JSON metadata file.
///
/// Constructors return it wrapped in an `Arc`. All mutating methods take `&self`.
#[derive(Debug)]
pub struct LsnTracker {
    /// In-memory metadata, behind a mutex so `&self` methods can update it.
    metadata: Arc<Mutex<CdcMetadata>>,
    /// Flush LSN from the last successful persist, or from the file loaded at startup.
    /// NOTE(review): written but never read in the visible code — confirm it is still needed.
    last_persisted_lsn: AtomicU64,
    /// Set when the metadata changes; cleared by the persist path.
    dirty: AtomicBool,
    /// Path of the metadata file; `LsnTracker::new` guarantees it ends with `.metadata`.
    lsn_file_path: String,
}
impl LsnTracker {
    /// Creates a tracker with default (zeroed) in-memory metadata; nothing is read from disk.
    ///
    /// The metadata file path is taken from, in order of preference:
    /// 1. `lsn_file_path`;
    /// 2. the `CDC_LAST_LSN_FILE` environment variable;
    /// 3. `./pg2any_last_lsn.metadata`.
    ///
    /// A `.metadata` suffix is appended when missing. A missing parent directory is
    /// created. If that fails, the error is only logged and will surface later as a
    /// persist error.
    pub async fn new(lsn_file_path: Option<&str>) -> Arc<Self> {
        let mut path: String = lsn_file_path
            .map(String::from)
            .or_else(|| std::env::var("CDC_LAST_LSN_FILE").ok())
            .unwrap_or_else(|| "./pg2any_last_lsn.metadata".to_string());
        if !path.ends_with(".metadata") {
            path.push_str(".metadata");
        }
        if let Some(parent) = std::path::Path::new(&path).parent() {
            // An empty parent means a bare file name in the working directory.
            if !parent.as_os_str().is_empty() && tokio::fs::metadata(parent).await.is_err() {
                if let Err(e) = tokio::fs::create_dir_all(parent).await {
                    warn!("Failed to create directory for LSN file {}: {}", path, e);
                } else {
                    info!("Created directory for LSN metadata: {:?}", parent);
                }
            }
        }
        Arc::new(Self {
            metadata: Arc::new(Mutex::new(CdcMetadata::default())),
            last_persisted_lsn: AtomicU64::new(0),
            dirty: AtomicBool::new(false),
            lsn_file_path: path,
        })
    }

    /// Creates a tracker (see [`LsnTracker::new`]) and seeds it from the metadata file.
    ///
    /// Returns the tracker plus the loaded flush LSN. The LSN is `None` when no usable
    /// file exists or the stored LSN is `0`.
    pub async fn new_with_load(lsn_file_path: Option<&str>) -> (Arc<Self>, Option<Lsn>) {
        let tracker = Self::new(lsn_file_path).await;
        let Some(metadata) = tracker.load_from_file().await else {
            return (tracker, None);
        };
        let flush_lsn = metadata.get_lsn();
        *tracker.lock_metadata() = metadata;
        tracker
            .last_persisted_lsn
            .store(flush_lsn, Ordering::Release);
        let loaded_lsn = (flush_lsn > 0).then_some(Lsn(flush_lsn));
        (tracker, loaded_lsn)
    }

    /// Persists the current state one final time; errors are logged, not returned.
    pub async fn shutdown_async(&self) {
        info!("Shutting down LSN tracker");
        if let Err(e) = self.persist_async().await {
            warn!("Failed to persist on shutdown: {}", e);
        } else {
            info!("Final state persisted on shutdown");
        }
        info!("LSN tracker stopped gracefully");
    }

    /// Blocking counterpart of [`LsnTracker::shutdown_async`] for contexts that cannot
    /// `.await`. Errors are logged, not returned.
    pub fn shutdown_sync(&self) {
        info!("Initiating sync shutdown of LSN tracker");
        if let Err(e) = self.persist_sync() {
            warn!("Failed to persist LSN on sync shutdown: {}", e);
        }
    }

    /// Reads and parses the metadata file without modifying the tracker.
    ///
    /// Returns `None` when the file is missing, unreadable, empty, or not valid JSON;
    /// every case is logged. A missing file is expected on first start and is logged
    /// at info level. Other read errors are warnings, because they may hide a real
    /// problem (e.g. permissions) rather than a fresh start.
    pub async fn load_from_file(&self) -> Option<CdcMetadata> {
        let contents = match tokio::fs::read_to_string(&self.lsn_file_path).await {
            Ok(contents) => contents,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                info!(
                    "No persisted metadata file found at {}, starting from latest",
                    self.lsn_file_path
                );
                return None;
            }
            Err(e) => {
                warn!(
                    "Failed to read metadata file {}: {}. Starting from latest.",
                    self.lsn_file_path, e
                );
                return None;
            }
        };
        let s = contents.trim();
        if s.is_empty() {
            info!(
                "LSN metadata file {} is empty, starting from latest",
                self.lsn_file_path
            );
            return None;
        }
        match serde_json::from_str::<CdcMetadata>(s) {
            Ok(metadata) => {
                info!(
                    "Loaded CDC metadata from {} (flush_lsn: {})",
                    self.lsn_file_path,
                    format_lsn(metadata.lsn_tracking.flush_lsn)
                );
                Some(metadata)
            }
            Err(e) => {
                warn!(
                    "Failed to parse metadata from {}: {}. File must contain valid JSON.",
                    self.lsn_file_path, e
                );
                None
            }
        }
    }

    /// Records `lsn` as committed if it is non-zero and above the current flush LSN.
    ///
    /// Lower or equal values are ignored, so the tracked LSN never moves backwards.
    #[inline]
    pub fn commit_lsn(&self, lsn: u64) {
        // 0 is the "nothing committed" sentinel and is never recorded.
        if lsn == 0 {
            return;
        }
        let mut metadata = self.lock_metadata();
        if lsn > metadata.lsn_tracking.flush_lsn {
            metadata.update_flush_lsn(lsn);
            // Set while still holding the lock. A concurrent persist then either
            // snapshots this LSN (and clears the flag with it) or leaves the flag set.
            self.dirty.store(true, Ordering::Release);
        }
    }

    /// Stores the consumer's latest progress and marks the tracker dirty.
    pub fn update_consumer_state(
        &self,
        tx_id: u32,
        timestamp: DateTime<Utc>,
        pending_count: usize,
    ) {
        {
            let mut metadata = self.lock_metadata();
            metadata.update_consumer_state(tx_id, timestamp, pending_count);
            // Under the lock for the same reason as in `commit_lsn`.
            self.dirty.store(true, Ordering::Release);
        }
        debug!(
            "Updated consumer state: tx_id={}, pending_count={}",
            tx_id, pending_count
        );
    }

    /// Returns the raw in-memory flush LSN (`0` when nothing has been committed).
    #[inline]
    pub fn get(&self) -> u64 {
        self.lock_metadata().get_lsn()
    }

    /// Returns the in-memory flush LSN, or `None` when nothing has been committed.
    pub fn get_lsn(&self) -> Option<Lsn> {
        match self.get() {
            0 => None,
            v => Some(Lsn(v)),
        }
    }

    /// Returns a clone of the current in-memory metadata.
    pub fn get_metadata(&self) -> CdcMetadata {
        self.lock_metadata().clone()
    }

    /// Writes the current metadata to disk atomically (temp file, fsync, rename).
    ///
    /// # Errors
    /// Returns any serialization or I/O error. On failure the dirty flag is restored
    /// if it was set before the attempt.
    pub async fn persist_async(&self) -> std::io::Result<()> {
        self.persist_internal().await
    }

    async fn persist_internal(&self) -> std::io::Result<()> {
        let (metadata, was_dirty) = self.take_snapshot();
        let flush_lsn = metadata.get_lsn();
        let temp_path = self.temp_path();
        let result = Self::write_durably_async(&metadata, &temp_path, &self.lsn_file_path).await;
        self.finish_persist(result, flush_lsn, was_dirty)?;
        debug!(
            "Persisted CDC metadata to {} (flush_lsn: {})",
            self.lsn_file_path,
            format_lsn(flush_lsn)
        );
        Ok(())
    }

    /// Blocking variant of the persist path, used by [`LsnTracker::shutdown_sync`].
    fn persist_sync(&self) -> std::io::Result<()> {
        let (metadata, was_dirty) = self.take_snapshot();
        let flush_lsn = metadata.get_lsn();
        let temp_path = self.temp_path();
        let result = Self::write_durably_sync(&metadata, &temp_path, &self.lsn_file_path);
        self.finish_persist(result, flush_lsn, was_dirty)?;
        debug!(
            "Persisted CDC metadata to {} (sync) - flush_lsn: {}",
            self.lsn_file_path,
            format_lsn(flush_lsn)
        );
        Ok(())
    }

    /// Returns the path of the metadata file (always ends with `.metadata`).
    pub fn file_path(&self) -> &str {
        &self.lsn_file_path
    }

    /// Returns whether the metadata changed since the last persist began.
    pub fn is_dirty(&self) -> bool {
        self.dirty.load(Ordering::Acquire)
    }

    /// Locks the metadata, recovering the guard if a previous holder panicked.
    ///
    /// Every mutation under this lock is a plain field assignment, so a poisoned guard
    /// still holds consistent data. Recovering avoids panicking on the shutdown path.
    fn lock_metadata(&self) -> std::sync::MutexGuard<'_, CdcMetadata> {
        self.metadata
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Clones the metadata and clears the dirty flag atomically with that clone.
    ///
    /// Returns the snapshot and whether the tracker was dirty beforehand. Clearing the
    /// flag here, rather than after the write, stops a commit that arrives mid-write
    /// from having its dirty mark erased.
    fn take_snapshot(&self) -> (CdcMetadata, bool) {
        let metadata = self.lock_metadata();
        let was_dirty = self.dirty.swap(false, Ordering::AcqRel);
        (metadata.clone(), was_dirty)
    }

    /// Records the outcome of a persist attempt.
    ///
    /// On success, updates `last_persisted_lsn`. On failure, re-marks the tracker dirty
    /// if it was dirty before the attempt. Returns `result` unchanged.
    fn finish_persist(
        &self,
        result: std::io::Result<()>,
        flush_lsn: u64,
        was_dirty: bool,
    ) -> std::io::Result<()> {
        if result.is_ok() {
            self.last_persisted_lsn.store(flush_lsn, Ordering::Release);
        } else if was_dirty {
            self.dirty.store(true, Ordering::Release);
        }
        result
    }

    /// Path of the temporary file used for atomic replacement.
    fn temp_path(&self) -> String {
        format!("{}.tmp", self.lsn_file_path)
    }

    /// Serializes `metadata` and replaces `final_path` via `temp_path` (async I/O).
    ///
    /// The temp file is fsynced before the rename. Without that, a crash soon after the
    /// rename can leave an empty or partial file, which `load_from_file` reports as
    /// "starting from latest" instead of resuming from the saved LSN.
    async fn write_durably_async(
        metadata: &CdcMetadata,
        temp_path: &str,
        final_path: &str,
    ) -> std::io::Result<()> {
        let json_content =
            serde_json::to_string_pretty(metadata).map_err(std::io::Error::other)?;
        tokio::fs::write(temp_path, &json_content).await?;
        // Opened with write access so sync_all also works on platforms that need it.
        tokio::fs::OpenOptions::new()
            .write(true)
            .open(temp_path)
            .await?
            .sync_all()
            .await?;
        tokio::fs::rename(temp_path, final_path).await
    }

    /// Blocking counterpart of [`LsnTracker::write_durably_async`].
    fn write_durably_sync(
        metadata: &CdcMetadata,
        temp_path: &str,
        final_path: &str,
    ) -> std::io::Result<()> {
        let json_content =
            serde_json::to_string_pretty(metadata).map_err(std::io::Error::other)?;
        debug!("Writing LSN metadata to temp file (sync): {}", temp_path);
        std::fs::write(temp_path, &json_content)?;
        std::fs::OpenOptions::new()
            .write(true)
            .open(temp_path)?
            .sync_all()?;
        debug!("Renaming (sync) {} -> {}", temp_path, final_path);
        std::fs::rename(temp_path, final_path)
    }
}
/// Convenience entry point that delegates to [`LsnTracker::new_with_load`].
///
/// Yields the shared tracker together with the LSN restored from disk, if any.
pub async fn create_lsn_tracker_with_load(
    lsn_file_path: Option<&str>,
) -> (Arc<LsnTracker>, Option<Lsn>) {
    let (tracker, restored_lsn) = LsnTracker::new_with_load(lsn_file_path).await;
    (tracker, restored_lsn)
}
// Unit tests for `LsnTracker` and the re-exported `SharedLsnFeedback`.
//
// NOTE(review): these tests use hard-coded paths under `/tmp`, which assumes a
// Unix-like host. Parallel runs of the suite (e.g. several checkouts) could collide on
// them. Consider `std::env::temp_dir()` with unique file names.
#[cfg(test)]
mod lsn_tracker_tests {
    use super::*;

    // A fresh tracker starts at LSN 0, and `new` appends the `.metadata` suffix.
    #[tokio::test]
    async fn test_lsn_tracker_new() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn")).await;
        assert_eq!(tracker.get(), 0);
        assert_eq!(tracker.file_path(), "/tmp/test_lsn.metadata");
    }

    // A committed non-zero LSN is exposed as `Some(Lsn(..))`.
    #[tokio::test]
    async fn test_lsn_tracker_get_lsn() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_get_lsn")).await;
        tracker.commit_lsn(100);
        let lsn = tracker.get_lsn();
        assert_eq!(lsn, Some(Lsn(100)));
    }

    // commit_lsn is monotonic: a lower LSN (50) does not move the value backwards.
    #[tokio::test]
    async fn test_lsn_tracker_commit_lsn() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_commit_lsn")).await;
        tracker.commit_lsn(100);
        assert_eq!(tracker.get(), 100);
        tracker.commit_lsn(50);
        assert_eq!(tracker.get(), 100);
        tracker.commit_lsn(200);
        assert_eq!(tracker.get(), 200);
    }

    // Committing 0 is a no-op.
    #[tokio::test]
    async fn test_zero_lsn_ignored() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_zero")).await;
        tracker.commit_lsn(100);
        tracker.commit_lsn(0);
        assert_eq!(tracker.get(), 100);
    }

    // persist_async writes a JSON file that round-trips through `CdcMetadata`.
    #[tokio::test]
    async fn test_persist_async() {
        let path = "/tmp/test_lsn_persist_async";
        let _ = std::fs::remove_file(format!("{}.metadata", path));
        let tracker = LsnTracker::new(Some(path)).await;
        tracker.commit_lsn(12345);
        tracker.persist_async().await.unwrap();
        let content = tokio::fs::read_to_string(format!("{}.metadata", path))
            .await
            .unwrap();
        let metadata: CdcMetadata = serde_json::from_str(&content).unwrap();
        assert_eq!(metadata.lsn_tracking.flush_lsn, 12345);
        let _ = std::fs::remove_file(format!("{}.metadata", path));
    }

    // Persisting twice in a row succeeds.
    // NOTE(review): despite the name, this does not check that the second persist is
    // skipped; it only asserts `Ok`. The persist path in this file does not consult the
    // dirty flag before writing.
    #[tokio::test]
    async fn test_persist_skips_when_not_dirty() {
        let path = "/tmp/test_lsn_persist_skip";
        let _ = std::fs::remove_file(format!("{}.metadata", path));
        let tracker = LsnTracker::new(Some(path)).await;
        tracker.commit_lsn(12345);
        tracker.persist_async().await.unwrap();
        let result = tracker.persist_async().await;
        assert!(result.is_ok());
        let _ = std::fs::remove_file(format!("{}.metadata", path));
    }

    // A new tracker is clean; committing a higher LSN marks it dirty.
    #[tokio::test]
    async fn test_commit_lsn_marks_dirty() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_dirty")).await;
        assert!(!tracker.is_dirty());
        tracker.commit_lsn(100);
        assert!(tracker.is_dirty());
    }

    // load_from_file parses a hand-written metadata file, including consumer state.
    #[tokio::test]
    async fn test_load_from_file() {
        let path = "/tmp/test_lsn_load";
        let metadata_path = format!("{}.metadata", path);
        let metadata = CdcMetadata {
            version: "1.0".to_string(),
            last_updated: Utc::now(),
            lsn_tracking: LsnTracking { flush_lsn: 54321 },
            consumer_state: ConsumerState {
                last_processed_tx_id: Some(999),
                last_processed_timestamp: Some(Utc::now()),
                pending_file_count: 5,
            },
        };
        let json = serde_json::to_string_pretty(&metadata).unwrap();
        tokio::fs::write(&metadata_path, json).await.unwrap();
        let tracker = LsnTracker::new(Some(path)).await;
        let loaded = tracker.load_from_file().await;
        assert!(loaded.is_some());
        let loaded_metadata = loaded.unwrap();
        assert_eq!(loaded_metadata.lsn_tracking.flush_lsn, 54321);
        assert_eq!(
            loaded_metadata.consumer_state.last_processed_tx_id,
            Some(999)
        );
        let _ = std::fs::remove_file(metadata_path);
    }

    // new_with_load seeds the in-memory LSN from disk and returns it.
    #[tokio::test]
    async fn test_new_with_load() {
        let path = "/tmp/test_lsn_new_with_load";
        let metadata_path = format!("{}.metadata", path);
        let metadata = CdcMetadata {
            version: "1.0".to_string(),
            last_updated: Utc::now(),
            lsn_tracking: LsnTracking { flush_lsn: 67890 },
            consumer_state: ConsumerState {
                last_processed_tx_id: None,
                last_processed_timestamp: None,
                pending_file_count: 0,
            },
        };
        let json = serde_json::to_string_pretty(&metadata).unwrap();
        tokio::fs::write(&metadata_path, json).await.unwrap();
        let (tracker, loaded_lsn) = LsnTracker::new_with_load(Some(path)).await;
        assert_eq!(loaded_lsn, Some(Lsn(67890)));
        assert_eq!(tracker.get(), 67890);
        let _ = std::fs::remove_file(metadata_path);
    }

    // The `.metadata` suffix is added when absent...
    #[tokio::test]
    async fn test_metadata_extension_not_present() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_no_ext")).await;
        assert!(tracker.file_path().ends_with(".metadata"));
    }

    // ...and not duplicated when already present.
    #[tokio::test]
    async fn test_metadata_extension_already_present() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn.metadata")).await;
        assert_eq!(tracker.file_path().matches(".metadata").count(), 1);
    }

    // A commit made from a spawned task is visible through the original `Arc`.
    // NOTE(review): `#[tokio::test]` defaults to a current-thread runtime, so this
    // probably exercises cross-task rather than cross-thread sharing — confirm intent.
    #[tokio::test]
    async fn test_shared_across_threads() {
        let tracker = LsnTracker::new(Some("/tmp/test_lsn_threads")).await;
        let tracker_clone = tracker.clone();
        let handle = tokio::spawn(async move {
            tracker_clone.commit_lsn(12345);
        });
        handle.await.unwrap();
        assert_eq!(tracker.get(), 12345);
    }

    // shutdown_async performs a final persist that lands on disk.
    #[tokio::test]
    async fn test_shutdown_without_background_task() {
        let path = "/tmp/test_lsn_shutdown_simple";
        let _ = std::fs::remove_file(format!("{}.metadata", path));
        let tracker = LsnTracker::new(Some(path)).await;
        tracker.commit_lsn(99999);
        tracker.shutdown_async().await;
        let content = tokio::fs::read_to_string(format!("{}.metadata", path))
            .await
            .unwrap();
        let metadata: CdcMetadata = serde_json::from_str(&content).unwrap();
        assert_eq!(metadata.lsn_tracking.flush_lsn, 99999);
        let _ = std::fs::remove_file(format!("{}.metadata", path));
    }

    // Calling shutdown twice must not panic; shutdown_async only logs persist errors.
    #[tokio::test]
    async fn test_double_shutdown_is_safe() {
        let path = "/tmp/test_lsn_double_shutdown";
        let tracker = LsnTracker::new(Some(path)).await;
        tracker.shutdown_async().await;
        tracker.shutdown_async().await;
        let _ = std::fs::remove_file(format!("{}.metadata", path));
    }

    // The tests below exercise `pg_walstream::SharedLsnFeedback`, whose implementation
    // is not in this file.
    #[test]
    fn test_shared_lsn_feedback_new() {
        let feedback = SharedLsnFeedback::new_shared();
        assert_eq!(feedback.get_flushed_lsn(), 0);
        assert_eq!(feedback.get_applied_lsn(), 0);
    }

    #[test]
    fn test_update_flushed_lsn() {
        let feedback = SharedLsnFeedback::new_shared();
        feedback.update_flushed_lsn(100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
    }

    #[test]
    fn test_update_applied_lsn() {
        let feedback = SharedLsnFeedback::new_shared();
        feedback.update_applied_lsn(200);
        assert_eq!(feedback.get_applied_lsn(), 200);
    }

    // NOTE(review): expects the flushed component to be 200 after
    // `update_flushed_lsn(150)` and `update_applied_lsn(200)`. Presumably
    // `get_feedback_lsn` reports flushed as at least applied — confirm against
    // pg_walstream.
    #[test]
    fn test_get_feedback_lsn() {
        let feedback = SharedLsnFeedback::new_shared();
        feedback.update_flushed_lsn(150);
        feedback.update_applied_lsn(200);
        let (flushed, applied) = feedback.get_feedback_lsn();
        assert_eq!(flushed, 200);
        assert_eq!(applied, 200);
    }
}