use crate::common::SignalChannelType;
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tracing::{debug, warn};
/// Settings that control read-only operation.
///
/// Every field has a serde default, so a field missing from the serialized
/// form falls back to the same value that `ReadOnlyConfig::default()` uses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadOnlyConfig {
/// Enables read-only mode. Defaults to `false`; while it is `false` the
/// channel and PostgreSQL version checks on this type always pass.
#[serde(default)]
pub read_only: bool,
/// Signal channels accepted while read-only mode is on. Defaults to
/// `Topic`, `File`, `Jmx` and `Api`. `Source` is rejected in read-only mode
/// even when it is listed here.
#[serde(default = "default_read_only_channels")]
pub allowed_channels: Vec<SignalChannelType>,
/// Lowest PostgreSQL version accepted by `check_postgres_version` while
/// read-only mode is on. Defaults to 13.
#[serde(default = "default_min_postgres_version")]
pub min_postgres_version: u32,
/// Defaults to `true`.
/// NOTE(review): nothing in this file reads this flag; presumably it selects
/// heartbeat-based watermarking — confirm at the use site.
#[serde(default = "default_true")]
pub heartbeat_watermarking: bool,
/// When `true` (the default), `ReadOnlyGuard::warn_if_disabled` logs a
/// warning for features that are unavailable in read-only mode.
#[serde(default = "default_true")]
pub warn_on_disabled_features: bool,
}
/// Serde default for `allowed_channels`: `Topic`, `File`, `Jmx` and `Api`,
/// in that order.
fn default_read_only_channels() -> Vec<SignalChannelType> {
    let channels = [
        SignalChannelType::Topic,
        SignalChannelType::File,
        SignalChannelType::Jmx,
        SignalChannelType::Api,
    ];
    Vec::from(channels)
}
/// Serde default for `min_postgres_version`.
fn default_min_postgres_version() -> u32 {
    const DEFAULT_MIN_POSTGRES_VERSION: u32 = 13;
    DEFAULT_MIN_POSTGRES_VERSION
}
/// Serde default helper for boolean options that are enabled unless the
/// configuration says otherwise.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
impl Default for ReadOnlyConfig {
    /// Builds the configuration used when nothing is specified: read-only mode
    /// off, the default channel list and minimum PostgreSQL version, and both
    /// boolean options switched on.
    fn default() -> Self {
        let allowed_channels = default_read_only_channels();
        let min_postgres_version = default_min_postgres_version();
        Self {
            allowed_channels,
            min_postgres_version,
            read_only: false,
            heartbeat_watermarking: true,
            warn_on_disabled_features: true,
        }
    }
}
impl ReadOnlyConfig {
    /// Returns a builder that starts from the default configuration.
    pub fn builder() -> ReadOnlyConfigBuilder {
        ReadOnlyConfigBuilder::default()
    }

    /// Reports whether signals may be received over `channel`.
    ///
    /// Outside read-only mode every channel is allowed. In read-only mode the
    /// `Source` channel is always refused, and any other channel must be
    /// listed in `allowed_channels`.
    pub fn is_channel_allowed(&self, channel: SignalChannelType) -> bool {
        !self.read_only
            || (channel != SignalChannelType::Source && self.allowed_channels.contains(&channel))
    }

    /// Returns the entries of `channels` that pass [`Self::is_channel_allowed`],
    /// preserving their order and any duplicates.
    pub fn filter_channels(&self, channels: &[SignalChannelType]) -> Vec<SignalChannelType> {
        let mut permitted = Vec::new();
        for &channel in channels {
            if self.is_channel_allowed(channel) {
                permitted.push(channel);
            }
        }
        permitted
    }

    /// Validates the server version against `min_postgres_version`.
    ///
    /// # Errors
    ///
    /// Returns [`ReadOnlyError::UnsupportedPostgresVersion`] when read-only
    /// mode is on and `version` is below the configured minimum. The check is
    /// skipped entirely when read-only mode is off.
    pub fn check_postgres_version(&self, version: u32) -> Result<(), ReadOnlyError> {
        let required = self.min_postgres_version;
        if !self.read_only || version >= required {
            return Ok(());
        }
        Err(ReadOnlyError::UnsupportedPostgresVersion {
            required,
            actual: version,
        })
    }
}
/// Builder for `ReadOnlyConfig`.
///
/// The derived `Default` starts from `ReadOnlyConfig::default()`; each setter
/// on the builder overwrites one field and `build` returns the result.
#[derive(Default)]
pub struct ReadOnlyConfigBuilder {
// Configuration being assembled; returned as-is by `build`.
config: ReadOnlyConfig,
}
impl ReadOnlyConfigBuilder {
pub fn read_only(mut self, enabled: bool) -> Self {
self.config.read_only = enabled;
self
}
pub fn allowed_channels(mut self, channels: Vec<SignalChannelType>) -> Self {
self.config.allowed_channels = channels;
self
}
pub fn min_postgres_version(mut self, version: u32) -> Self {
self.config.min_postgres_version = version;
self
}
pub fn heartbeat_watermarking(mut self, enabled: bool) -> Self {
self.config.heartbeat_watermarking = enabled;
self
}
pub fn warn_on_disabled_features(mut self, enabled: bool) -> Self {
self.config.warn_on_disabled_features = enabled;
self
}
pub fn build(self) -> ReadOnlyConfig {
self.config
}
}
/// Value that identifies a watermark position: a transaction id, an LSN
/// string, or both.
/// NOTE(review): "LSN" is presumably a PostgreSQL log sequence number in its
/// textual form (the tests use "0/16B3748") — confirm with the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatermarkSource {
/// Carries a transaction id only.
TransactionId(u64),
/// Carries an LSN string only.
Lsn(String),
/// Carries both a transaction id and an LSN string.
Heartbeat { xid: u64, lsn: String },
}
impl WatermarkSource {
pub fn xid(&self) -> Option<u64> {
match self {
WatermarkSource::TransactionId(xid) => Some(*xid),
WatermarkSource::Heartbeat { xid, .. } => Some(*xid),
WatermarkSource::Lsn(_) => None,
}
}
pub fn lsn(&self) -> Option<&str> {
match self {
WatermarkSource::Lsn(lsn) => Some(lsn),
WatermarkSource::Heartbeat { lsn, .. } => Some(lsn),
WatermarkSource::TransactionId(_) => None,
}
}
}
/// Tracks the transaction-id watermarks of a read-only snapshot window,
/// together with a chunk counter and window/deduplication statistics.
///
/// All state lives in atomics, so every method takes `&self`.
#[derive(Debug)]
pub struct ReadOnlyWatermarkTracker {
// Lower bound of the window; set by `open_window` (0 initially).
low_watermark: Arc<AtomicU64>,
// Upper bound of the window; set by `close_window`. A value of 0 is treated
// as "no upper bound known" by `should_buffer` and `deduplicate`.
high_watermark: Arc<AtomicU64>,
// `true` between `open_window` and the following `close_window`.
window_open: Arc<AtomicBool>,
// Counter advanced by `next_chunk`; starts at 0.
current_chunk: Arc<AtomicU64>,
// Running counters reported through `stats()`.
stats: Arc<WatermarkStats>,
}
/// Internal atomic counters behind `ReadOnlyWatermarkTracker::stats`.
#[derive(Debug, Default)]
struct WatermarkStats {
// Incremented by `deduplicate` when an in-window event has a higher xid
// than the snapshot xid it is compared against.
events_deduplicated: AtomicU64,
// Incremented once per `open_window` call.
windows_opened: AtomicU64,
// Incremented once per `close_window` call.
windows_closed: AtomicU64,
}
// `Default` delegates to `new()`, so both constructors produce the same
// freshly initialised tracker.
impl Default for ReadOnlyWatermarkTracker {
fn default() -> Self {
Self::new()
}
}
impl ReadOnlyWatermarkTracker {
    /// Creates a tracker with both watermarks at 0, the window closed, the
    /// chunk counter at 0 and all statistics zeroed.
    pub fn new() -> Self {
        Self {
            low_watermark: Arc::new(AtomicU64::new(0)),
            high_watermark: Arc::new(AtomicU64::new(0)),
            window_open: Arc::new(AtomicBool::new(false)),
            current_chunk: Arc::new(AtomicU64::new(0)),
            stats: Arc::new(WatermarkStats::default()),
        }
    }

    /// Opens a snapshot window whose low watermark is `xid`.
    ///
    /// The high watermark is reset to 0 ("not known yet"). Without the reset a
    /// reopened window would inherit the previous window's upper bound, and
    /// `should_buffer` / `deduplicate` would treat every event above that
    /// stale bound as outside the new window.
    pub fn open_window(&self, xid: u64) {
        // Clear the stale bound before the window is flagged as open.
        self.high_watermark.store(0, Ordering::SeqCst);
        self.low_watermark.store(xid, Ordering::SeqCst);
        self.window_open.store(true, Ordering::SeqCst);
        self.stats.windows_opened.fetch_add(1, Ordering::Relaxed);
        debug!(xid = xid, "Opened read-only snapshot window");
    }

    /// Closes the current window, recording `xid` as its high watermark.
    pub fn close_window(&self, xid: u64) {
        self.high_watermark.store(xid, Ordering::SeqCst);
        self.window_open.store(false, Ordering::SeqCst);
        self.stats.windows_closed.fetch_add(1, Ordering::Relaxed);
        debug!(xid = xid, "Closed read-only snapshot window");
    }

    /// Reports whether an event with transaction id `xid` falls inside the
    /// currently open window.
    ///
    /// Always `false` while no window is open. A high watermark of 0 means the
    /// upper bound is not known yet, so only the lower bound is applied.
    pub fn should_buffer(&self, xid: u64) -> bool {
        if !self.window_open.load(Ordering::SeqCst) {
            return false;
        }
        let low = self.low_watermark.load(Ordering::SeqCst);
        let high = self.high_watermark.load(Ordering::SeqCst);
        xid >= low && (high == 0 || xid <= high)
    }

    /// Decides whether a streamed event or the snapshot row should be kept.
    ///
    /// Events outside the `[low, high]` window (a `high` of 0 means no upper
    /// bound) are always kept. Inside the window the event is kept when there
    /// is no snapshot xid to compare against or when the event's xid is
    /// strictly greater than `snapshot_xid`; otherwise the snapshot wins.
    pub fn deduplicate(&self, event_xid: u64, snapshot_xid: Option<u64>) -> DeduplicationResult {
        let low = self.low_watermark.load(Ordering::SeqCst);
        let high = self.high_watermark.load(Ordering::SeqCst);

        let outside_window = event_xid < low || (high > 0 && event_xid > high);
        if outside_window {
            return DeduplicationResult::KeepEvent;
        }

        match snapshot_xid {
            Some(snap_xid) if event_xid > snap_xid => {
                // NOTE(review): the counter is only bumped when the event wins
                // over the snapshot row, not when the snapshot wins — confirm
                // that this is the intended meaning of `events_deduplicated`.
                self.stats
                    .events_deduplicated
                    .fetch_add(1, Ordering::Relaxed);
                DeduplicationResult::KeepEvent
            }
            Some(_) => DeduplicationResult::KeepSnapshot,
            None => DeduplicationResult::KeepEvent,
        }
    }

    /// Current low watermark (0 until a window has been opened).
    pub fn low_watermark(&self) -> u64 {
        self.low_watermark.load(Ordering::SeqCst)
    }

    /// Current high watermark; 0 while the bound of the current window is
    /// not known.
    pub fn high_watermark(&self) -> u64 {
        self.high_watermark.load(Ordering::SeqCst)
    }

    /// Whether a window is currently open.
    pub fn is_window_open(&self) -> bool {
        self.window_open.load(Ordering::SeqCst)
    }

    /// Advances the chunk counter and returns the value it had *before* the
    /// increment.
    pub fn next_chunk(&self) -> u64 {
        self.current_chunk.fetch_add(1, Ordering::SeqCst)
    }

    /// Current value of the chunk counter.
    pub fn current_chunk(&self) -> u64 {
        self.current_chunk.load(Ordering::SeqCst)
    }

    /// Returns a point-in-time copy of the counters.
    pub fn stats(&self) -> ReadOnlyWatermarkStats {
        ReadOnlyWatermarkStats {
            events_deduplicated: self.stats.events_deduplicated.load(Ordering::Relaxed),
            windows_opened: self.stats.windows_opened.load(Ordering::Relaxed),
            windows_closed: self.stats.windows_closed.load(Ordering::Relaxed),
        }
    }
}
/// Outcome of `ReadOnlyWatermarkTracker::deduplicate`: which of the two
/// competing records should be kept.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeduplicationResult {
/// Keep the streamed event.
KeepEvent,
/// Keep the snapshot record.
KeepSnapshot,
}
/// Plain-value copy of the tracker's counters, as returned by
/// `ReadOnlyWatermarkTracker::stats`.
#[derive(Debug, Clone)]
pub struct ReadOnlyWatermarkStats {
/// Number of `deduplicate` calls in which an in-window event had a higher
/// xid than the snapshot xid it was compared against.
pub events_deduplicated: u64,
/// Number of `open_window` calls.
pub windows_opened: u64,
/// Number of `close_window` calls.
pub windows_closed: u64,
}
/// Answers "is this feature usable?" questions for a given `ReadOnlyConfig`.
#[derive(Debug)]
pub struct ReadOnlyGuard {
// Configuration consulted by every check; owned by the guard.
config: ReadOnlyConfig,
}
impl ReadOnlyGuard {
    /// Wraps `config` in a guard.
    pub fn new(config: ReadOnlyConfig) -> Self {
        Self { config }
    }

    /// Checks whether `feature` may be used under the current configuration.
    ///
    /// # Errors
    ///
    /// In read-only mode, returns [`ReadOnlyError::FeatureUnavailable`] for the
    /// signal-table source channel, signal-table watermarking and heartbeat
    /// action queries. Incremental and blocking snapshots are always allowed,
    /// and outside read-only mode every feature is allowed.
    pub fn check_feature(&self, feature: ReadOnlyFeature) -> Result<(), ReadOnlyError> {
        if !self.config.read_only {
            return Ok(());
        }

        // (feature name, reason, suggested alternative) for each blocked feature.
        let (name, reason, alternative) = match feature {
            ReadOnlyFeature::IncrementalSnapshot | ReadOnlyFeature::BlockingSnapshot => {
                return Ok(());
            }
            ReadOnlyFeature::SignalTableSource => (
                "Signal table (source channel)",
                "Source channel requires database writes",
                "Use topic, file, jmx, or api signal channels",
            ),
            ReadOnlyFeature::SignalTableWatermarking => (
                "Signal table watermarking",
                "insert_insert/insert_delete strategies require database writes",
                "Heartbeat-based watermarking is automatically used in read-only mode",
            ),
            ReadOnlyFeature::HeartbeatActionQuery => (
                "Heartbeat action query",
                "Action queries require database writes",
                "Disable heartbeat.action.query in read-only mode",
            ),
        };

        Err(ReadOnlyError::FeatureUnavailable {
            feature: name,
            reason,
            alternative,
        })
    }

    /// Logs a warning when `feature` is unavailable, unless
    /// `warn_on_disabled_features` is switched off.
    pub fn warn_if_disabled(&self, feature: ReadOnlyFeature) {
        if self.config.warn_on_disabled_features {
            if let Err(e) = self.check_feature(feature) {
                warn!("{}", e);
            }
        }
    }

    /// Whether the wrapped configuration has read-only mode enabled.
    pub fn is_read_only(&self) -> bool {
        self.config.read_only
    }
}
/// Features whose availability `ReadOnlyGuard::check_feature` can report on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadOnlyFeature {
/// Signal table used as the source signal channel; rejected in read-only mode.
SignalTableSource,
/// Watermarking through the signal table; rejected in read-only mode.
SignalTableWatermarking,
/// Heartbeat action query; rejected in read-only mode.
HeartbeatActionQuery,
/// Incremental snapshot; allowed in read-only mode.
IncrementalSnapshot,
/// Blocking snapshot; allowed in read-only mode.
BlockingSnapshot,
}
/// Errors reported by the read-only mode checks in this file.
#[derive(Debug, Clone)]
pub enum ReadOnlyError {
/// A feature cannot be used in read-only mode. Carries the feature name,
/// the reason it is unavailable and a suggested alternative.
FeatureUnavailable {
feature: &'static str,
reason: &'static str,
alternative: &'static str,
},
/// The PostgreSQL version (`actual`) is below the configured minimum
/// (`required`); produced by `ReadOnlyConfig::check_postgres_version`.
UnsupportedPostgresVersion { required: u32, actual: u32 },
/// The given signal channel is not allowed in read-only mode.
/// NOTE(review): nothing in this file constructs this variant — confirm
/// where it is raised.
ChannelNotAllowed(SignalChannelType),
}
impl std::fmt::Display for ReadOnlyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ReadOnlyError::FeatureUnavailable {
feature,
reason,
alternative,
} => {
write!(
f,
"Feature '{}' unavailable in read-only mode: {}. {}",
feature, reason, alternative
)
}
ReadOnlyError::UnsupportedPostgresVersion { required, actual } => {
write!(
f,
"PostgreSQL {} required for read-only incremental snapshots (found {})",
required, actual
)
}
ReadOnlyError::ChannelNotAllowed(channel) => {
write!(
f,
"Signal channel '{}' not allowed in read-only mode",
channel.as_str()
)
}
}
}
}
// Marker impl: makes `ReadOnlyError` usable as a `std::error::Error`. No
// methods are overridden, so the trait's default implementations apply.
impl std::error::Error for ReadOnlyError {}
// Unit tests for the configuration, watermark tracker, guard and error types
// defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// The default configuration has read-only mode off, heartbeat watermarking
// on and a minimum PostgreSQL version of 13.
#[test]
fn test_default_config() {
let config = ReadOnlyConfig::default();
assert!(!config.read_only);
assert!(config.heartbeat_watermarking);
assert_eq!(config.min_postgres_version, 13);
}
// Builder setters override the corresponding defaults.
#[test]
fn test_builder() {
let config = ReadOnlyConfig::builder()
.read_only(true)
.min_postgres_version(14)
.heartbeat_watermarking(false)
.build();
assert!(config.read_only);
assert!(!config.heartbeat_watermarking);
assert_eq!(config.min_postgres_version, 14);
}
// In read-only mode with the default channel list, Source is rejected and
// the four default channels are accepted.
#[test]
fn test_channel_filtering() {
let config = ReadOnlyConfig::builder().read_only(true).build();
assert!(!config.is_channel_allowed(SignalChannelType::Source));
assert!(config.is_channel_allowed(SignalChannelType::Topic));
assert!(config.is_channel_allowed(SignalChannelType::File));
assert!(config.is_channel_allowed(SignalChannelType::Jmx));
assert!(config.is_channel_allowed(SignalChannelType::Api));
}
// Outside read-only mode every channel is accepted, including Source.
#[test]
fn test_channel_filtering_not_read_only() {
let config = ReadOnlyConfig::builder().read_only(false).build();
assert!(config.is_channel_allowed(SignalChannelType::Source));
assert!(config.is_channel_allowed(SignalChannelType::Topic));
}
// `filter_channels` drops Source and keeps the allowed channels.
#[test]
fn test_filter_channels() {
let config = ReadOnlyConfig::builder().read_only(true).build();
let channels = vec![
SignalChannelType::Source,
SignalChannelType::Topic,
SignalChannelType::File,
];
let filtered = config.filter_channels(&channels);
assert_eq!(filtered.len(), 2);
assert!(!filtered.contains(&SignalChannelType::Source));
assert!(filtered.contains(&SignalChannelType::Topic));
assert!(filtered.contains(&SignalChannelType::File));
}
// Versions at or above the minimum pass; a lower version is an error.
#[test]
fn test_postgres_version_check() {
let config = ReadOnlyConfig::builder()
.read_only(true)
.min_postgres_version(13)
.build();
assert!(config.check_postgres_version(13).is_ok());
assert!(config.check_postgres_version(14).is_ok());
assert!(config.check_postgres_version(15).is_ok());
assert!(config.check_postgres_version(12).is_err());
}
// A new tracker has no open window and both watermarks at 0.
#[test]
fn test_watermark_tracker_basic() {
let tracker = ReadOnlyWatermarkTracker::new();
assert!(!tracker.is_window_open());
assert_eq!(tracker.low_watermark(), 0);
assert_eq!(tracker.high_watermark(), 0);
}
// Opening sets the low watermark and enables buffering; closing sets the
// high watermark and disables buffering.
#[test]
fn test_watermark_window_lifecycle() {
let tracker = ReadOnlyWatermarkTracker::new();
tracker.open_window(100);
assert!(tracker.is_window_open());
assert_eq!(tracker.low_watermark(), 100);
assert!(tracker.should_buffer(100));
assert!(tracker.should_buffer(150));
tracker.close_window(200);
assert!(!tracker.is_window_open());
assert_eq!(tracker.high_watermark(), 200);
assert!(!tracker.should_buffer(250));
}
// An event below the low watermark is always kept.
#[test]
fn test_deduplication_before_window() {
let tracker = ReadOnlyWatermarkTracker::new();
tracker.open_window(100);
tracker.close_window(200);
let result = tracker.deduplicate(50, Some(100));
assert_eq!(result, DeduplicationResult::KeepEvent);
}
// An event above the high watermark is always kept.
#[test]
fn test_deduplication_after_window() {
let tracker = ReadOnlyWatermarkTracker::new();
tracker.open_window(100);
tracker.close_window(200);
let result = tracker.deduplicate(250, Some(150));
assert_eq!(result, DeduplicationResult::KeepEvent);
}
// Inside the window, an event newer than the snapshot xid is kept.
#[test]
fn test_deduplication_in_window_event_newer() {
let tracker = ReadOnlyWatermarkTracker::new();
tracker.open_window(100);
tracker.close_window(200);
let result = tracker.deduplicate(150, Some(140));
assert_eq!(result, DeduplicationResult::KeepEvent);
}
// Inside the window, an event older than the snapshot xid loses to the snapshot.
#[test]
fn test_deduplication_in_window_snapshot_newer() {
let tracker = ReadOnlyWatermarkTracker::new();
tracker.open_window(100);
tracker.close_window(200);
let result = tracker.deduplicate(140, Some(150));
assert_eq!(result, DeduplicationResult::KeepSnapshot);
}
// Each open/close call is counted in the statistics.
#[test]
fn test_watermark_stats() {
let tracker = ReadOnlyWatermarkTracker::new();
tracker.open_window(100);
tracker.close_window(200);
tracker.open_window(200);
tracker.close_window(300);
let stats = tracker.stats();
assert_eq!(stats.windows_opened, 2);
assert_eq!(stats.windows_closed, 2);
}
// `xid()` and `lsn()` return the carried values for each variant.
#[test]
fn test_watermark_source() {
let xid = WatermarkSource::TransactionId(12345);
assert_eq!(xid.xid(), Some(12345));
assert_eq!(xid.lsn(), None);
let lsn = WatermarkSource::Lsn("0/16B3748".to_string());
assert_eq!(lsn.xid(), None);
assert_eq!(lsn.lsn(), Some("0/16B3748"));
let heartbeat = WatermarkSource::Heartbeat {
xid: 12345,
lsn: "0/16B3748".to_string(),
};
assert_eq!(heartbeat.xid(), Some(12345));
assert_eq!(heartbeat.lsn(), Some("0/16B3748"));
}
// In read-only mode the three write-dependent features are rejected and
// both snapshot kinds are allowed.
#[test]
fn test_read_only_guard() {
let config = ReadOnlyConfig::builder().read_only(true).build();
let guard = ReadOnlyGuard::new(config);
assert!(guard
.check_feature(ReadOnlyFeature::SignalTableSource)
.is_err());
assert!(guard
.check_feature(ReadOnlyFeature::SignalTableWatermarking)
.is_err());
assert!(guard
.check_feature(ReadOnlyFeature::HeartbeatActionQuery)
.is_err());
assert!(guard
.check_feature(ReadOnlyFeature::IncrementalSnapshot)
.is_ok());
assert!(guard
.check_feature(ReadOnlyFeature::BlockingSnapshot)
.is_ok());
}
// Outside read-only mode the write-dependent features are allowed.
#[test]
fn test_read_only_guard_not_read_only() {
let config = ReadOnlyConfig::builder().read_only(false).build();
let guard = ReadOnlyGuard::new(config);
assert!(guard
.check_feature(ReadOnlyFeature::SignalTableSource)
.is_ok());
assert!(guard
.check_feature(ReadOnlyFeature::SignalTableWatermarking)
.is_ok());
assert!(guard
.check_feature(ReadOnlyFeature::HeartbeatActionQuery)
.is_ok());
}
// `next_chunk` returns the pre-increment value and advances the counter.
#[test]
fn test_chunk_tracking() {
let tracker = ReadOnlyWatermarkTracker::new();
assert_eq!(tracker.current_chunk(), 0);
let chunk1 = tracker.next_chunk();
assert_eq!(chunk1, 0);
assert_eq!(tracker.current_chunk(), 1);
let chunk2 = tracker.next_chunk();
assert_eq!(chunk2, 1);
assert_eq!(tracker.current_chunk(), 2);
}
}