use crate::types::{format_lsn, CachePadded, XLogRecPtr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::{debug, info};
/// Pair of atomically updated LSN positions (flushed and applied) that can be
/// shared between threads, typically behind an [`Arc`].
///
/// Both positions start at `0` and only ever move forward through the
/// `update_*` methods; an LSN of `0` is ignored by those methods.
///
/// NOTE(review): each counter is wrapped in `CachePadded`, presumably so the two
/// atomics do not share a cache line — confirm against `crate::types`.
#[derive(Debug)]
pub struct SharedLsnFeedback {
    /// Highest LSN recorded through `update_flushed_lsn` or `update_applied_lsn`.
    flushed_lsn: CachePadded<AtomicU64>,
    /// Highest LSN recorded through `update_applied_lsn`.
    applied_lsn: CachePadded<AtomicU64>,
}
impl SharedLsnFeedback {
    /// Creates a tracker with both the flushed and the applied position at `0`.
    pub fn new() -> Self {
        Self {
            flushed_lsn: CachePadded::new(AtomicU64::new(0)),
            applied_lsn: CachePadded::new(AtomicU64::new(0)),
        }
    }

    /// Creates a tracker (see [`Self::new`]) already wrapped in an [`Arc`] for sharing.
    pub fn new_shared() -> Arc<Self> {
        Arc::new(Self::new())
    }

    /// Raises `slot` to `lsn` if `lsn` is strictly greater than the stored value.
    ///
    /// Returns `Some(previous)` when this call advanced the slot and `None` when
    /// the stored value was already `>= lsn`.
    fn advance(slot: &AtomicU64, lsn: XLogRecPtr) -> Option<XLogRecPtr> {
        // Read-only fast path: stale or duplicate reports never issue a write,
        // so they do not contend for the cache line.
        if lsn <= slot.load(Ordering::Acquire) {
            return None;
        }
        // `fetch_max` is the atomic "store the maximum" primitive; it replaces a
        // hand-written compare-exchange retry loop. AcqRel keeps the Acquire read
        // and Release publish semantics of the load + CAS pair it replaces.
        let previous = slot.fetch_max(lsn, Ordering::AcqRel);
        if previous < lsn {
            Some(previous)
        } else {
            // Another thread advanced past `lsn` between the check and the RMW.
            None
        }
    }

    /// Records `lsn` as flushed if it is greater than the current flushed LSN.
    ///
    /// An `lsn` of `0` and any `lsn` not greater than the stored value are ignored,
    /// so the flushed position never moves backwards.
    pub fn update_flushed_lsn(&self, lsn: XLogRecPtr) {
        if lsn == 0 {
            return;
        }
        if let Some(previous) = Self::advance(&self.flushed_lsn, lsn) {
            debug!(
                "SharedLsnFeedback: Updated flushed LSN from {} to {}",
                previous, lsn
            );
        }
    }

    /// Records `lsn` as applied and, because an applied LSN is treated as flushed
    /// too, also raises the flushed LSN to at least `lsn`.
    ///
    /// An `lsn` of `0` is ignored. Neither position ever moves backwards.
    pub fn update_applied_lsn(&self, lsn: XLogRecPtr) {
        if lsn == 0 {
            return;
        }
        // Raise the flushed position *before* publishing the applied one. Paired
        // with the applied-then-flushed read order in `get_feedback_lsn`, this
        // guarantees a reader never observes `applied > flushed`.
        self.update_flushed_lsn(lsn);
        if let Some(previous) = Self::advance(&self.applied_lsn, lsn) {
            debug!(
                "SharedLsnFeedback: Updated applied LSN from {} to {}",
                previous, lsn
            );
        }
    }

    /// Returns the current flushed LSN (`0` if none has been recorded).
    #[inline]
    pub fn get_flushed_lsn(&self) -> XLogRecPtr {
        self.flushed_lsn.load(Ordering::Acquire)
    }

    /// Returns the current applied LSN (`0` if none has been recorded).
    #[inline]
    pub fn get_applied_lsn(&self) -> XLogRecPtr {
        self.applied_lsn.load(Ordering::Acquire)
    }

    /// Returns `(flushed, applied)` as one snapshot in which `flushed >= applied`.
    #[inline]
    pub fn get_feedback_lsn(&self) -> (XLogRecPtr, XLogRecPtr) {
        // Read applied first: the Acquire load synchronizes with the writer that
        // published this applied value, and that writer had already raised the
        // flushed position to at least the same LSN.
        let applied = self.applied_lsn.load(Ordering::Acquire);
        let flushed = self.flushed_lsn.load(Ordering::Acquire);
        (flushed, applied)
    }

    /// Emits an `info!` record with both positions rendered by `format_lsn`,
    /// prefixed with `prefix`.
    pub fn log_state(&self, prefix: &str) {
        let (flushed, applied) = self.get_feedback_lsn();
        info!(
            "{}: flushed_lsn={}, applied_lsn={}",
            prefix,
            format_lsn(flushed),
            format_lsn(applied)
        );
    }
}
impl Default for SharedLsnFeedback {
    /// Equivalent to [`SharedLsnFeedback::new`]: both positions start at `0`.
    fn default() -> Self {
        Self::new()
    }
}
impl Clone for SharedLsnFeedback {
fn clone(&self) -> Self {
Self {
flushed_lsn: CachePadded::new(AtomicU64::new(self.flushed_lsn.load(Ordering::Acquire))),
applied_lsn: CachePadded::new(AtomicU64::new(self.applied_lsn.load(Ordering::Acquire))),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh tracker reports zero for both positions.
    #[test]
    fn test_shared_lsn_feedback_new() {
        let feedback = SharedLsnFeedback::new();
        assert_eq!(feedback.get_flushed_lsn(), 0);
        assert_eq!(feedback.get_applied_lsn(), 0);
    }

    /// The flushed position advances on larger values and ignores smaller or zero ones.
    #[test]
    fn test_update_flushed_lsn() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_flushed_lsn(50);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_flushed_lsn(200);
        assert_eq!(feedback.get_flushed_lsn(), 200);
        feedback.update_flushed_lsn(0);
        assert_eq!(feedback.get_flushed_lsn(), 200);
    }

    /// Advancing the applied position drags the flushed position along with it.
    #[test]
    fn test_update_applied_lsn() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_applied_lsn(100);
        assert_eq!(feedback.get_applied_lsn(), 100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_applied_lsn(50);
        assert_eq!(feedback.get_applied_lsn(), 100);
        feedback.update_applied_lsn(200);
        assert_eq!(feedback.get_applied_lsn(), 200);
        assert_eq!(feedback.get_flushed_lsn(), 200);
    }

    /// The combined getter returns `(flushed, applied)` in that order.
    #[test]
    fn test_get_feedback_lsn() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);
        feedback.update_applied_lsn(50);
        let (flushed, applied) = feedback.get_feedback_lsn();
        assert_eq!(flushed, 100);
        assert_eq!(applied, 50);
    }

    /// A clone starts from the source's values but evolves independently.
    #[test]
    fn test_clone() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);
        feedback.update_applied_lsn(50);
        let cloned = feedback.clone();
        assert_eq!(cloned.get_flushed_lsn(), 100);
        assert_eq!(cloned.get_applied_lsn(), 50);
        cloned.update_applied_lsn(200);
        assert_eq!(feedback.get_applied_lsn(), 50);
        assert_eq!(cloned.get_applied_lsn(), 200);
    }

    /// Handles produced by `Arc::clone` observe the same underlying state.
    #[test]
    fn test_new_shared() {
        let feedback = SharedLsnFeedback::new_shared();
        assert_eq!(feedback.get_flushed_lsn(), 0);
        assert_eq!(feedback.get_applied_lsn(), 0);
        let feedback_clone = Arc::clone(&feedback);
        feedback.update_applied_lsn(100);
        assert_eq!(feedback_clone.get_applied_lsn(), 100);
    }

    /// Ten threads race to report LSNs 0..=999; the maximum must win.
    #[test]
    fn test_concurrent_updates() {
        use std::thread;

        let feedback = SharedLsnFeedback::new_shared();
        let mut handles = vec![];
        for i in 0..10 {
            let feedback_clone = Arc::clone(&feedback);
            let handle = thread::spawn(move || {
                for j in 0..100 {
                    let lsn = (i * 100 + j) as u64;
                    feedback_clone.update_flushed_lsn(lsn);
                    feedback_clone.update_applied_lsn(lsn);
                }
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.join().unwrap();
        }
        // Every thread has been joined and updates are monotonic, so the final
        // value is exactly the largest LSN reported (999). A looser range check
        // would hide a lost update.
        let (flushed, applied) = feedback.get_feedback_lsn();
        assert_eq!(flushed, 999);
        assert_eq!(applied, 999);
    }

    /// Ascending updates are all accepted; descending ones afterwards are all ignored.
    #[test]
    fn test_monotonic_increase() {
        let feedback = SharedLsnFeedback::new();
        for i in 1..=100 {
            feedback.update_flushed_lsn(i);
            assert_eq!(feedback.get_flushed_lsn(), i);
        }
        for i in (1..=50).rev() {
            feedback.update_flushed_lsn(i);
            assert_eq!(feedback.get_flushed_lsn(), 100);
        }
    }

    /// An applied LSN ahead of the flushed LSN raises the flushed LSN to match.
    #[test]
    fn test_applied_updates_flushed() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(50);
        feedback.update_applied_lsn(100);
        assert_eq!(feedback.get_applied_lsn(), 100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
    }

    /// Zero is treated as "no LSN" by both update methods.
    #[test]
    fn test_zero_lsn_ignored() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);

        feedback.update_flushed_lsn(0);
        assert_eq!(feedback.get_flushed_lsn(), 100);

        feedback.update_applied_lsn(0);
        assert_eq!(feedback.get_applied_lsn(), 0);
    }

    /// `Default` yields the same zeroed state as `new`.
    #[test]
    fn test_default_trait() {
        let feedback = SharedLsnFeedback::default();
        assert_eq!(feedback.get_flushed_lsn(), 0);
        assert_eq!(feedback.get_applied_lsn(), 0);
        let (f, a) = feedback.get_feedback_lsn();
        assert_eq!(f, 0);
        assert_eq!(a, 0);
    }

    /// Logging must not panic and must leave the tracked positions untouched.
    #[test]
    fn test_log_state() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(1000);
        feedback.update_applied_lsn(500);
        feedback.log_state("test_prefix");
        assert_eq!(feedback.get_flushed_lsn(), 1000);
        assert_eq!(feedback.get_applied_lsn(), 500);
    }

    /// Updates on either side of a clone never leak to the other side.
    #[test]
    fn test_clone_independence() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(500);
        feedback.update_applied_lsn(300);
        let cloned = feedback.clone();
        assert_eq!(cloned.get_flushed_lsn(), 500);
        assert_eq!(cloned.get_applied_lsn(), 300);
        feedback.update_flushed_lsn(1000);
        feedback.update_applied_lsn(800);
        assert_eq!(cloned.get_flushed_lsn(), 500);
        assert_eq!(cloned.get_applied_lsn(), 300);
        cloned.update_flushed_lsn(2000);
        cloned.update_applied_lsn(1500);
        assert_eq!(feedback.get_flushed_lsn(), 1000);
        assert_eq!(feedback.get_applied_lsn(), 800);
    }

    /// Logging with large, realistic-looking LSN values must not panic or alter state.
    #[test]
    fn test_log_state_with_nonzero_lsns() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(0x16B374D848);
        feedback.update_applied_lsn(0x16B374D800);
        feedback.log_state("replication");
        assert_eq!(feedback.get_flushed_lsn(), 0x16B374D848);
        assert_eq!(feedback.get_applied_lsn(), 0x16B374D800);
    }

    /// Re-reporting the current value is a no-op for both positions.
    #[test]
    fn test_equal_lsn_no_update() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_flushed_lsn(100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_applied_lsn(50);
        feedback.update_applied_lsn(50);
        assert_eq!(feedback.get_applied_lsn(), 50);
    }
}