use super::wal_replicator::Lsn;
use super::{ReplicationError, Result};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
/// Names one kind of LSN position.
///
/// NOTE(review): the variants appear to mirror the position fields of `LsnEntry`
/// (`write_lsn`, `flush_lsn`, `replay_lsn`, `checkpoint_lsn`), but nothing in this
/// file consumes the enum — confirm the intended meaning against its callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LsnWatermark {
    /// Write position.
    Write,
    /// Flush position.
    Flush,
    /// Replay position.
    Replay,
    /// Checkpoint position.
    Checkpoint,
}
/// Snapshot of the LSN positions known for a single node.
///
/// `LsnManager` keeps one entry for the local node and one per remote node.
#[derive(Debug, Clone)]
pub struct LsnEntry {
    /// Identifier of the node this entry describes.
    pub node_id: Uuid,
    /// Write position; for the local node it is moved by `LsnManager::advance_write`.
    pub write_lsn: Lsn,
    /// Flush position; for the local node it is moved by `LsnManager::advance_flush`,
    /// which refuses values above `write_lsn`.
    pub flush_lsn: Lsn,
    /// Replay position, if one has been reported. `LsnManager::replication_lag`
    /// prefers this over `flush_lsn` when it is present.
    pub replay_lsn: Option<Lsn>,
    /// Checkpoint position; for the local node it is set by `LsnManager::set_checkpoint`.
    pub checkpoint_lsn: Lsn,
    /// UTC timestamp of the last modification of this entry.
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
impl Default for LsnEntry {
fn default() -> Self {
Self {
node_id: Uuid::nil(),
write_lsn: 0,
flush_lsn: 0,
replay_lsn: None,
checkpoint_lsn: 0,
updated_at: chrono::Utc::now(),
}
}
}
/// A named replication slot associated with one standby.
///
/// Slots are created, mutated and removed through `LsnManager`.
#[derive(Debug, Clone)]
pub struct ReplicationSlot {
    /// Slot name; also the key under which `LsnManager` stores the slot.
    pub name: String,
    /// Identifier of the standby the slot was created for.
    pub standby_id: Uuid,
    /// Flush position confirmed for this slot; starts at 0 and is overwritten by
    /// `LsnManager::update_slot_flush`.
    pub confirmed_flush_lsn: Lsn,
    /// The local flush LSN at the moment the slot was created.
    pub restart_lsn: Lsn,
    /// Whether the slot is marked active; new slots start inactive.
    pub active: bool,
    /// UTC timestamp taken when the slot was created.
    pub created_at: chrono::DateTime<chrono::Utc>,
}
/// Tracks LSN positions for the local node and remote nodes, plus replication slots.
///
/// Each piece of state sits behind its own `Arc<RwLock<..>>` (tokio's async lock).
pub struct LsnManager {
    /// Identifier of the local node, as passed to `LsnManager::new`.
    node_id: Uuid,
    /// LSN positions of the local node.
    local_lsn: Arc<RwLock<LsnEntry>>,
    /// Most recently stored entry for each remote node, keyed by node id.
    remote_lsns: Arc<RwLock<HashMap<Uuid, LsnEntry>>>,
    /// Replication slots, keyed by slot name.
    slots: Arc<RwLock<HashMap<String, ReplicationSlot>>>,
}
impl LsnManager {
pub fn new(node_id: Uuid) -> Self {
let mut local = LsnEntry::default();
local.node_id = node_id;
Self {
node_id,
local_lsn: Arc::new(RwLock::new(local)),
remote_lsns: Arc::new(RwLock::new(HashMap::new())),
slots: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn write_lsn(&self) -> Lsn {
self.local_lsn.read().await.write_lsn
}
pub async fn flush_lsn(&self) -> Lsn {
self.local_lsn.read().await.flush_lsn
}
pub async fn checkpoint_lsn(&self) -> Lsn {
self.local_lsn.read().await.checkpoint_lsn
}
pub async fn advance_write(&self, new_lsn: Lsn) -> Result<()> {
let mut entry = self.local_lsn.write().await;
if new_lsn <= entry.write_lsn {
return Err(ReplicationError::LsnTracking(format!(
"New LSN {} is not greater than current {}",
new_lsn, entry.write_lsn
)));
}
entry.write_lsn = new_lsn;
entry.updated_at = chrono::Utc::now();
Ok(())
}
pub async fn advance_flush(&self, new_lsn: Lsn) -> Result<()> {
let mut entry = self.local_lsn.write().await;
if new_lsn > entry.write_lsn {
return Err(ReplicationError::LsnTracking(format!(
"Flush LSN {} cannot exceed write LSN {}",
new_lsn, entry.write_lsn
)));
}
entry.flush_lsn = new_lsn;
entry.updated_at = chrono::Utc::now();
Ok(())
}
pub async fn set_checkpoint(&self, lsn: Lsn) -> Result<()> {
let mut entry = self.local_lsn.write().await;
entry.checkpoint_lsn = lsn;
entry.updated_at = chrono::Utc::now();
Ok(())
}
pub async fn update_remote(&self, node_id: Uuid, lsn_entry: LsnEntry) {
self.remote_lsns.write().await.insert(node_id, lsn_entry);
}
pub async fn get_remote(&self, node_id: &Uuid) -> Option<LsnEntry> {
self.remote_lsns.read().await.get(node_id).cloned()
}
pub async fn replication_lag(&self, standby_id: &Uuid) -> Option<u64> {
let local = self.local_lsn.read().await;
let remotes = self.remote_lsns.read().await;
remotes.get(standby_id).map(|remote| {
let standby_lsn = remote.replay_lsn.unwrap_or(remote.flush_lsn);
local.write_lsn.saturating_sub(standby_lsn)
})
}
pub async fn min_confirmed_flush(&self) -> Lsn {
let remotes = self.remote_lsns.read().await;
remotes
.values()
.map(|e| e.flush_lsn)
.min()
.unwrap_or(0)
}
pub async fn create_slot(&self, name: String, standby_id: Uuid) -> Result<ReplicationSlot> {
let mut slots = self.slots.write().await;
if slots.contains_key(&name) {
return Err(ReplicationError::LsnTracking(format!(
"Slot '{}' already exists",
name
)));
}
let local = self.local_lsn.read().await;
let slot = ReplicationSlot {
name: name.clone(),
standby_id,
confirmed_flush_lsn: 0,
restart_lsn: local.flush_lsn,
active: false,
created_at: chrono::Utc::now(),
};
slots.insert(name, slot.clone());
Ok(slot)
}
pub async fn drop_slot(&self, name: &str) -> Result<()> {
let mut slots = self.slots.write().await;
slots.remove(name).ok_or_else(|| {
ReplicationError::LsnTracking(format!("Slot '{}' not found", name))
})?;
Ok(())
}
pub async fn activate_slot(&self, name: &str) -> Result<()> {
let mut slots = self.slots.write().await;
let slot = slots.get_mut(name).ok_or_else(|| {
ReplicationError::LsnTracking(format!("Slot '{}' not found", name))
})?;
slot.active = true;
Ok(())
}
pub async fn deactivate_slot(&self, name: &str) -> Result<()> {
let mut slots = self.slots.write().await;
let slot = slots.get_mut(name).ok_or_else(|| {
ReplicationError::LsnTracking(format!("Slot '{}' not found", name))
})?;
slot.active = false;
Ok(())
}
pub async fn update_slot_flush(&self, name: &str, lsn: Lsn) -> Result<()> {
let mut slots = self.slots.write().await;
let slot = slots.get_mut(name).ok_or_else(|| {
ReplicationError::LsnTracking(format!("Slot '{}' not found", name))
})?;
slot.confirmed_flush_lsn = lsn;
Ok(())
}
pub async fn get_slot(&self, name: &str) -> Option<ReplicationSlot> {
self.slots.read().await.get(name).cloned()
}
pub async fn list_slots(&self) -> Vec<ReplicationSlot> {
self.slots.read().await.values().cloned().collect()
}
pub async fn local_entry(&self) -> LsnEntry {
self.local_lsn.read().await.clone()
}
pub async fn remote_entries(&self) -> HashMap<Uuid, LsnEntry> {
self.remote_lsns.read().await.clone()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created manager reports zero for both write and flush LSN.
    #[tokio::test]
    async fn test_lsn_manager_creation() {
        let manager = LsnManager::new(Uuid::new_v4());

        assert_eq!(manager.write_lsn().await, 0);
        assert_eq!(manager.flush_lsn().await, 0);
    }

    /// The write LSN accepts strictly increasing values and rejects everything else.
    #[tokio::test]
    async fn test_advance_write_lsn() {
        let manager = LsnManager::new(Uuid::new_v4());

        // Strictly increasing targets are accepted and become visible immediately.
        for target in [100, 200] {
            manager.advance_write(target).await.expect("advance failed");
            assert_eq!(manager.write_lsn().await, target);
        }

        // An equal value and a lower value are both refused.
        for rejected in [200, 150] {
            assert!(manager.advance_write(rejected).await.is_err());
        }
    }

    /// The flush LSN may reach, but never exceed, the write LSN.
    #[tokio::test]
    async fn test_flush_lsn_constraint() {
        let manager = LsnManager::new(Uuid::new_v4());
        manager.advance_write(100).await.expect("advance failed");

        for accepted in [50, 100] {
            manager.advance_flush(accepted).await.expect("flush failed");
        }

        let beyond_write = manager.advance_flush(150).await;
        assert!(beyond_write.is_err());
    }

    /// Lag is the local write LSN minus the standby's replay LSN.
    #[tokio::test]
    async fn test_replication_lag() {
        let manager = LsnManager::new(Uuid::new_v4());
        manager.advance_write(1000).await.expect("advance failed");

        let standby_id = Uuid::new_v4();
        // Remaining fields (write and checkpoint LSN at 0, current timestamp) come
        // from the default entry.
        let reported = LsnEntry {
            node_id: standby_id,
            flush_lsn: 500,
            replay_lsn: Some(500),
            ..LsnEntry::default()
        };
        manager.update_remote(standby_id, reported).await;

        assert_eq!(manager.replication_lag(&standby_id).await, Some(500));
    }

    /// Walks a slot through create, activate, flush update, list and drop.
    #[tokio::test]
    async fn test_replication_slots() {
        let manager = LsnManager::new(Uuid::new_v4());
        let standby_id = Uuid::new_v4();

        let created = manager
            .create_slot("standby_slot".to_string(), standby_id)
            .await
            .expect("create failed");
        assert!(!created.active);

        manager
            .activate_slot("standby_slot")
            .await
            .expect("activate failed");
        let activated = manager.get_slot("standby_slot").await.expect("get failed");
        assert!(activated.active);

        manager
            .update_slot_flush("standby_slot", 100)
            .await
            .expect("update failed");
        let flushed = manager.get_slot("standby_slot").await.expect("get failed");
        assert_eq!(flushed.confirmed_flush_lsn, 100);

        assert_eq!(manager.list_slots().await.len(), 1);

        manager
            .drop_slot("standby_slot")
            .await
            .expect("drop failed");
        assert!(manager.get_slot("standby_slot").await.is_none());
    }
}