use crate::types::{format_lsn, CachePadded, XLogRecPtr};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::{debug, info};
/// Pair of LSN watermarks ("flushed" and "applied") stored in atomics so they can be
/// read and raised through a shared reference without a lock.
///
/// Both values start at `0` and are only ever raised by the update methods; an update
/// carrying LSN `0` is ignored.
#[derive(Debug)]
pub struct SharedLsnFeedback {
/// Highest flushed LSN reported so far.
// NOTE(review): `CachePadded` presumably keeps each counter on its own cache line to
// avoid false sharing — confirm against `crate::types::CachePadded`.
flushed_lsn: CachePadded<AtomicU64>,
/// Highest applied LSN reported so far.
applied_lsn: CachePadded<AtomicU64>,
}
impl SharedLsnFeedback {
    /// Creates a tracker with both the flushed and the applied LSN set to `0`.
    ///
    /// `0` doubles as the "nothing reported yet" value: the update methods ignore a
    /// zero LSN.
    pub fn new() -> Self {
        Self {
            flushed_lsn: CachePadded::new(AtomicU64::new(0)),
            applied_lsn: CachePadded::new(AtomicU64::new(0)),
        }
    }

    /// Creates a zero-initialised tracker already wrapped in an [`Arc`], ready to be
    /// handed to several threads.
    pub fn new_shared() -> Arc<Self> {
        Arc::new(Self::new())
    }

    /// Raises the flushed LSN to `lsn` if `lsn` is greater than the stored value.
    ///
    /// The stored value never decreases: a smaller or equal `lsn` leaves it untouched,
    /// and `lsn == 0` is ignored outright. Safe to call concurrently; when several
    /// callers race, the largest LSN wins.
    #[inline]
    pub fn update_flushed_lsn(&self, lsn: XLogRecPtr) {
        if lsn == 0 {
            return;
        }
        // `fetch_update` is the standard-library compare-and-swap retry loop. Returning
        // `None` from the closure aborts without writing, so a stale LSN never dirties
        // the cache line.
        let outcome = self
            .flushed_lsn
            .fetch_update(Ordering::Release, Ordering::Acquire, |current| {
                if lsn > current {
                    Some(lsn)
                } else {
                    None
                }
            });
        if let Ok(previous) = outcome {
            debug!(
                "SharedLsnFeedback: Updated flushed LSN from {} to {}",
                previous, lsn
            );
        }
    }

    /// Raises the applied LSN to `lsn` if `lsn` is greater than the stored value, and
    /// makes sure the flushed LSN is at least `lsn` as well.
    ///
    /// `lsn == 0` is ignored. Safe to call concurrently; the largest LSN wins.
    #[inline]
    pub fn update_applied_lsn(&self, lsn: XLogRecPtr) {
        if lsn == 0 {
            return;
        }
        // Raise `flushed` BEFORE `applied`. Publishing `applied` first would open a
        // window in which another thread observes `applied > flushed`. Doing this
        // unconditionally also covers the case where a different thread has already
        // advanced `applied` to this LSN but has not raised `flushed` yet. When
        // `flushed` is already high enough this is a read-only no-op.
        self.update_flushed_lsn(lsn);

        let outcome = self
            .applied_lsn
            .fetch_update(Ordering::Release, Ordering::Acquire, |current| {
                if lsn > current {
                    Some(lsn)
                } else {
                    None
                }
            });
        if let Ok(previous) = outcome {
            debug!(
                "SharedLsnFeedback: Updated applied LSN from {} to {}",
                previous, lsn
            );
        }
    }

    /// Returns the current flushed LSN (`0` if none has been recorded).
    #[inline(always)]
    pub fn get_flushed_lsn(&self) -> XLogRecPtr {
        self.flushed_lsn.load(Ordering::Acquire)
    }

    /// Returns the current applied LSN (`0` if none has been recorded).
    #[inline(always)]
    pub fn get_applied_lsn(&self) -> XLogRecPtr {
        self.applied_lsn.load(Ordering::Acquire)
    }

    /// Returns `(flushed, applied)`.
    ///
    /// The two values are read with separate atomic loads, so they are not a single
    /// atomic snapshot. `applied` is loaded first on purpose: `update_applied_lsn`
    /// raises `flushed` before `applied` and neither value ever decreases, so the
    /// `flushed` value read afterwards is at least as large as any `applied` value
    /// that was stored through `update_applied_lsn`.
    #[inline(always)]
    pub fn get_feedback_lsn(&self) -> (XLogRecPtr, XLogRecPtr) {
        let applied = self.applied_lsn.load(Ordering::Acquire);
        let flushed = self.flushed_lsn.load(Ordering::Acquire);
        (flushed, applied)
    }

    /// Emits both LSNs at `info` level, formatted with `format_lsn` and prefixed with
    /// `prefix`.
    pub fn log_state(&self, prefix: &str) {
        let (flushed, applied) = self.get_feedback_lsn();
        info!(
            "{}: flushed_lsn={}, applied_lsn={}",
            prefix,
            format_lsn(flushed),
            format_lsn(applied)
        );
    }
}
impl Default for SharedLsnFeedback {
fn default() -> Self {
Self::new()
}
}
impl Clone for SharedLsnFeedback {
    /// Produces an independent tracker initialised from the current values.
    ///
    /// The copy owns fresh atomics, so later updates to either tracker are not seen by
    /// the other. The two fields are read with separate loads (flushed first, then
    /// applied), so the copy is not an atomic snapshot of the pair.
    fn clone(&self) -> Self {
        let flushed_snapshot = self.flushed_lsn.load(Ordering::Acquire);
        let applied_snapshot = self.applied_lsn.load(Ordering::Acquire);
        Self {
            flushed_lsn: CachePadded::new(AtomicU64::new(flushed_snapshot)),
            applied_lsn: CachePadded::new(AtomicU64::new(applied_snapshot)),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh tracker reports zero for both LSNs.
    #[test]
    fn test_shared_lsn_feedback_new() {
        let feedback = SharedLsnFeedback::new();
        assert_eq!(feedback.get_flushed_lsn(), 0);
        assert_eq!(feedback.get_applied_lsn(), 0);
    }

    // The flushed LSN only moves forward; smaller values and zero are ignored.
    #[test]
    fn test_update_flushed_lsn() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_flushed_lsn(50);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_flushed_lsn(200);
        assert_eq!(feedback.get_flushed_lsn(), 200);
        feedback.update_flushed_lsn(0);
        assert_eq!(feedback.get_flushed_lsn(), 200);
    }

    // Advancing the applied LSN also pulls the flushed LSN up to the same value.
    #[test]
    fn test_update_applied_lsn() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_applied_lsn(100);
        assert_eq!(feedback.get_applied_lsn(), 100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_applied_lsn(50);
        assert_eq!(feedback.get_applied_lsn(), 100);
        feedback.update_applied_lsn(200);
        assert_eq!(feedback.get_applied_lsn(), 200);
        assert_eq!(feedback.get_flushed_lsn(), 200);
    }

    // The combined getter returns the pair in (flushed, applied) order.
    #[test]
    fn test_get_feedback_lsn() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);
        feedback.update_applied_lsn(50);
        let (flushed, applied) = feedback.get_feedback_lsn();
        assert_eq!(flushed, 100);
        assert_eq!(applied, 50);
    }

    // A clone starts from the current values and then evolves independently.
    #[test]
    fn test_clone() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);
        feedback.update_applied_lsn(50);
        let cloned = feedback.clone();
        assert_eq!(cloned.get_flushed_lsn(), 100);
        assert_eq!(cloned.get_applied_lsn(), 50);
        cloned.update_applied_lsn(200);
        assert_eq!(feedback.get_applied_lsn(), 50);
        assert_eq!(cloned.get_applied_lsn(), 200);
    }

    // Handles produced by `Arc::clone` observe the same underlying tracker.
    #[test]
    fn test_new_shared() {
        let feedback = SharedLsnFeedback::new_shared();
        assert_eq!(feedback.get_flushed_lsn(), 0);
        assert_eq!(feedback.get_applied_lsn(), 0);
        let feedback_clone = Arc::clone(&feedback);
        feedback.update_applied_lsn(100);
        assert_eq!(feedback_clone.get_applied_lsn(), 100);
    }

    // Ten threads submit the LSNs 0..=999 between them. Once every thread has been
    // joined, both watermarks must equal the largest submitted LSN exactly; anything
    // lower would mean an update was lost.
    #[test]
    fn test_concurrent_updates() {
        use std::thread;

        let feedback = SharedLsnFeedback::new_shared();
        let mut handles = Vec::with_capacity(10);
        for i in 0..10u64 {
            let fb = Arc::clone(&feedback);
            handles.push(thread::spawn(move || {
                for j in 0..100u64 {
                    let lsn = i * 100 + j;
                    fb.update_flushed_lsn(lsn);
                    fb.update_applied_lsn(lsn);
                }
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }
        let (flushed, applied) = feedback.get_feedback_lsn();
        assert_eq!(flushed, 999);
        assert_eq!(applied, 999);
    }

    // Ascending updates are all accepted; descending ones never lower the value.
    #[test]
    fn test_monotonic_increase() {
        let feedback = SharedLsnFeedback::new();
        for i in 1..=100 {
            feedback.update_flushed_lsn(i);
            assert_eq!(feedback.get_flushed_lsn(), i);
        }
        for i in (1..=50).rev() {
            feedback.update_flushed_lsn(i);
            assert_eq!(feedback.get_flushed_lsn(), 100);
        }
    }

    // An applied LSN above the flushed LSN raises the flushed LSN to match.
    #[test]
    fn test_applied_updates_flushed() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(50);
        feedback.update_applied_lsn(100);
        assert_eq!(feedback.get_applied_lsn(), 100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
    }

    // LSN zero is treated as "no value" by both update methods.
    #[test]
    fn test_zero_lsn_ignored() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);
        feedback.update_flushed_lsn(0);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_applied_lsn(0);
        assert_eq!(feedback.get_applied_lsn(), 0);
    }

    // `Default` yields the same zeroed state as `new`.
    #[test]
    fn test_default_trait() {
        let feedback = SharedLsnFeedback::default();
        assert_eq!(feedback.get_flushed_lsn(), 0);
        assert_eq!(feedback.get_applied_lsn(), 0);
        let (f, a) = feedback.get_feedback_lsn();
        assert_eq!(f, 0);
        assert_eq!(a, 0);
    }

    // Smoke test: logging the state must not panic.
    #[test]
    fn test_log_state() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(1000);
        feedback.update_applied_lsn(500);
        feedback.log_state("test_prefix");
    }

    // Updates on the original never leak into the clone, and vice versa.
    #[test]
    fn test_clone_independence() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(500);
        feedback.update_applied_lsn(300);
        let cloned = feedback.clone();
        assert_eq!(cloned.get_flushed_lsn(), 500);
        assert_eq!(cloned.get_applied_lsn(), 300);
        feedback.update_flushed_lsn(1000);
        feedback.update_applied_lsn(800);
        assert_eq!(cloned.get_flushed_lsn(), 500);
        assert_eq!(cloned.get_applied_lsn(), 300);
        cloned.update_flushed_lsn(2000);
        cloned.update_applied_lsn(1500);
        assert_eq!(feedback.get_flushed_lsn(), 1000);
        assert_eq!(feedback.get_applied_lsn(), 800);
    }

    // Smoke test with LSNs wider than 32 bits: formatting and logging must not panic.
    #[test]
    fn test_log_state_with_nonzero_lsns() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(0x16B374D848);
        feedback.update_applied_lsn(0x16B374D800);
        feedback.log_state("replication");
    }

    // Re-submitting the current LSN is a no-op.
    #[test]
    fn test_equal_lsn_no_update() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_flushed_lsn(100);
        assert_eq!(feedback.get_flushed_lsn(), 100);
        feedback.update_applied_lsn(50);
        feedback.update_applied_lsn(50);
        assert_eq!(feedback.get_applied_lsn(), 50);
    }

    // Sixteen threads released together by a barrier hammer both counters to force
    // compare-and-swap retries. Thread 15 submits the overall maximum on its last
    // iteration, so after joining both watermarks must equal that maximum exactly.
    #[test]
    fn test_concurrent_cas_retry_path() {
        use std::sync::Barrier;
        use std::thread;

        const THREADS: usize = 16;
        const ITERS: u64 = 5_000;

        let feedback = SharedLsnFeedback::new_shared();
        let barrier = Arc::new(Barrier::new(THREADS));
        let mut handles = Vec::with_capacity(THREADS);
        for tid in 0..THREADS as u64 {
            let fb = Arc::clone(&feedback);
            let bar = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                bar.wait();
                for i in 1..=ITERS {
                    let lsn = i * THREADS as u64 + tid;
                    fb.update_flushed_lsn(lsn);
                    fb.update_applied_lsn(lsn);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }

        let max_lsn = ITERS * THREADS as u64 + (THREADS as u64 - 1);
        assert_eq!(
            feedback.get_applied_lsn(),
            max_lsn,
            "applied LSN lost an update under contention"
        );
        assert_eq!(
            feedback.get_flushed_lsn(),
            max_lsn,
            "flushed LSN lost an update under contention"
        );
    }

    // An applied update that does not advance leaves the (higher) flushed LSN alone.
    #[test]
    fn test_applied_no_advance_does_not_modify_flushed() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(1000);
        feedback.update_applied_lsn(500);
        assert_eq!(feedback.get_flushed_lsn(), 1000);
        assert_eq!(feedback.get_applied_lsn(), 500);
        feedback.update_applied_lsn(400);
        assert_eq!(feedback.get_flushed_lsn(), 1000);
        assert_eq!(feedback.get_applied_lsn(), 500);
        feedback.update_applied_lsn(500);
        assert_eq!(feedback.get_flushed_lsn(), 1000);
        assert_eq!(feedback.get_applied_lsn(), 500);
    }

    // An applied update past the flushed LSN drags the flushed LSN forward with it.
    #[test]
    fn test_applied_advance_drags_flushed_forward() {
        let feedback = SharedLsnFeedback::new();
        feedback.update_flushed_lsn(100);
        feedback.update_applied_lsn(500);
        assert_eq!(feedback.get_applied_lsn(), 500);
        assert_eq!(feedback.get_flushed_lsn(), 500);
    }
}