use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectiveColumnConfig {
#[serde(default = "default_selective_enabled")]
pub enabled: bool,
#[serde(skip)]
pub pk_columns: Vec<usize>,
#[serde(default = "default_min_changed_columns")]
pub min_changed_columns: usize,
#[serde(default = "default_max_changed_columns_ratio")]
pub max_changed_columns_ratio: f64,
}
fn default_selective_enabled() -> bool {
true
}
fn default_min_changed_columns() -> usize {
1
}
fn default_max_changed_columns_ratio() -> f64 {
0.5
}
impl Default for SelectiveColumnConfig {
fn default() -> Self {
Self {
enabled: default_selective_enabled(),
pk_columns: vec![0], min_changed_columns: default_min_changed_columns(),
max_changed_columns_ratio: default_max_changed_columns_ratio(),
}
}
}
impl SelectiveColumnConfig {
pub fn with_pk_columns(&self, pk_columns: Vec<usize>) -> Self {
Self {
enabled: self.enabled,
pk_columns,
min_changed_columns: self.min_changed_columns,
max_changed_columns_ratio: self.max_changed_columns_ratio,
}
}
}
#[derive(Debug, Clone)]
pub struct ColumnDiff {
pub changed_columns: Vec<usize>,
pub included_columns: Vec<usize>,
}
pub fn compute_column_diff(
old_row: &crate::Row,
new_row: &crate::Row,
pk_columns: &[usize],
) -> Option<ColumnDiff> {
if old_row.values.len() != new_row.values.len() {
return None;
}
let mut changed_columns = Vec::new();
for (idx, (old_val, new_val)) in old_row.values.iter().zip(new_row.values.iter()).enumerate() {
if old_val != new_val {
changed_columns.push(idx);
}
}
if changed_columns.is_empty() {
return None;
}
let mut included_columns: Vec<usize> = pk_columns.to_vec();
for &idx in &changed_columns {
if !included_columns.contains(&idx) {
included_columns.push(idx);
}
}
included_columns.sort_unstable();
Some(ColumnDiff { changed_columns, included_columns })
}
pub fn should_use_selective_update(
diff: &ColumnDiff,
total_columns: usize,
config: &SelectiveColumnConfig,
) -> bool {
if !config.enabled {
return false;
}
if diff.changed_columns.len() < config.min_changed_columns {
return false;
}
let changed_ratio = diff.changed_columns.len() as f64 / total_columns as f64;
if changed_ratio > config.max_changed_columns_ratio {
return false;
}
true
}
pub fn should_use_selective_update_with_metrics(
diff: &ColumnDiff,
total_columns: usize,
config: &SelectiveColumnConfig,
metrics: Option<&crate::observability::metrics::ServerMetrics>,
) -> bool {
if !config.enabled {
if let Some(m) = metrics {
m.record_partial_update_fallback("disabled");
}
return false;
}
if diff.changed_columns.len() < config.min_changed_columns {
return false;
}
let changed_ratio = diff.changed_columns.len() as f64 / total_columns as f64;
if changed_ratio > config.max_changed_columns_ratio {
if let Some(m) = metrics {
m.record_partial_update_fallback("threshold_exceeded");
}
return false;
}
true
}
pub fn create_partial_row_update(
old_row: &[Option<Vec<u8>>],
new_row: &[Option<Vec<u8>>],
pk_columns: &[usize],
config: &SelectiveColumnConfig,
) -> Option<crate::protocol::messages::PartialRowUpdate> {
if old_row.len() != new_row.len() {
return None;
}
let total_columns = new_row.len();
let mut changed_columns = Vec::new();
for (idx, (old_val, new_val)) in old_row.iter().zip(new_row.iter()).enumerate() {
if old_val != new_val {
changed_columns.push(idx);
}
}
if changed_columns.is_empty() {
return None;
}
let changed_ratio = changed_columns.len() as f64 / total_columns as f64;
if !config.enabled || changed_ratio > config.max_changed_columns_ratio {
return None;
}
let mut included_columns: Vec<usize> = pk_columns.to_vec();
for &idx in &changed_columns {
if !included_columns.contains(&idx) {
included_columns.push(idx);
}
}
included_columns.sort_unstable();
let values: Vec<Option<Vec<u8>>> =
included_columns.iter().map(|&idx| new_row[idx].clone()).collect();
let present_columns: Vec<u16> = included_columns.iter().map(|&idx| idx as u16).collect();
Some(crate::protocol::messages::PartialRowUpdate::new(
total_columns as u16,
&present_columns,
values,
))
}
pub fn create_partial_row_update_with_metrics(
old_row: &[Option<Vec<u8>>],
new_row: &[Option<Vec<u8>>],
pk_columns: &[usize],
config: &SelectiveColumnConfig,
metrics: Option<&crate::observability::metrics::ServerMetrics>,
) -> Option<crate::protocol::messages::PartialRowUpdate> {
if old_row.len() != new_row.len() {
if let Some(m) = metrics {
m.record_partial_update_fallback("row_count_mismatch");
}
return None;
}
let total_columns = new_row.len();
let mut changed_columns = Vec::new();
for (idx, (old_val, new_val)) in old_row.iter().zip(new_row.iter()).enumerate() {
if old_val != new_val {
changed_columns.push(idx);
}
}
if changed_columns.is_empty() {
if let Some(m) = metrics {
m.record_partial_update_fallback("no_changes");
}
return None;
}
let changed_ratio = changed_columns.len() as f64 / total_columns as f64;
if !config.enabled || changed_ratio > config.max_changed_columns_ratio {
if let Some(m) = metrics {
if !config.enabled {
m.record_partial_update_fallback("disabled");
} else {
m.record_partial_update_fallback("threshold_exceeded");
}
}
return None;
}
let mut included_columns: Vec<usize> = pk_columns.to_vec();
for &idx in &changed_columns {
if !included_columns.contains(&idx) {
included_columns.push(idx);
}
}
included_columns.sort_unstable();
let values: Vec<Option<Vec<u8>>> =
included_columns.iter().map(|&idx| new_row[idx].clone()).collect();
let present_columns: Vec<u16> = included_columns.iter().map(|&idx| idx as u16).collect();
Some(crate::protocol::messages::PartialRowUpdate::new(
total_columns as u16,
&present_columns,
values,
))
}