use crate::backend::{BackendClient, BackendConfig, ParamValue, QueryResult, TextValue};
use crate::{ProxyError, Result};
use std::time::Instant;
#[derive(Debug, Clone)]
pub struct ShadowExecuteReport {
pub sql: String,
pub both_succeeded: bool,
pub row_count_match: bool,
pub row_hash_match: bool,
pub primary_elapsed_us: u64,
pub shadow_elapsed_us: u64,
pub primary_error: Option<String>,
pub shadow_error: Option<String>,
}
impl ShadowExecuteReport {
pub fn is_clean(&self) -> bool {
self.both_succeeded && self.row_count_match && self.row_hash_match
}
}
pub async fn shadow_execute(
primary: &mut BackendClient,
shadow_cfg: &BackendConfig,
sql: &str,
params: &[ParamValue],
) -> Result<(QueryResult, ShadowExecuteReport)> {
let primary_start = Instant::now();
let primary_outcome = if params.is_empty() {
primary.simple_query(sql).await
} else {
primary.query_with_params(sql, params).await
};
let primary_elapsed_us = primary_start.elapsed().as_micros() as u64;
let shadow_outcome = run_shadow(shadow_cfg, sql, params).await;
let primary_qr = primary_outcome.as_ref().ok().cloned();
let shadow_qr = shadow_outcome.0.as_ref().ok().cloned();
let (row_count_match, row_hash_match) = match (&primary_qr, &shadow_qr) {
(Some(p), Some(s)) => {
let count_match = p.rows.len() == s.rows.len();
let hash_match = if count_match {
row_set_hash(&p.rows) == row_set_hash(&s.rows)
} else {
false
};
(count_match, hash_match)
}
_ => (false, false),
};
let report = ShadowExecuteReport {
sql: sql.to_string(),
both_succeeded: primary_qr.is_some() && shadow_qr.is_some(),
row_count_match,
row_hash_match,
primary_elapsed_us,
shadow_elapsed_us: shadow_outcome.1,
primary_error: primary_outcome.as_ref().err().map(|e| e.to_string()),
shadow_error: shadow_outcome.0.err().map(|e| e.to_string()),
};
let qr = primary_outcome
.map_err(|e| ProxyError::Internal(format!("primary execute: {}", e)))?;
Ok((qr, report))
}
async fn run_shadow(
cfg: &BackendConfig,
sql: &str,
params: &[ParamValue],
) -> (Result<QueryResult>, u64) {
let start = Instant::now();
let result = match BackendClient::connect(cfg).await {
Ok(mut client) => {
let outcome = if params.is_empty() {
client.simple_query(sql).await
} else {
client.query_with_params(sql, params).await
};
client.close().await;
outcome.map_err(|e| ProxyError::Internal(format!("shadow execute: {}", e)))
}
Err(e) => Err(ProxyError::Internal(format!("shadow connect: {}", e))),
};
let us = start.elapsed().as_micros() as u64;
(result, us)
}
pub fn row_set_hash(rows: &[Vec<TextValue>]) -> u128 {
let mut acc: u128 = 0;
for row in rows {
acc = acc.wrapping_add(row_hash(row) as u128);
}
acc
}
fn row_hash(row: &[TextValue]) -> u64 {
let mut h: u64 = 0xcbf2_9ce4_8422_2325;
for v in row {
let bytes = match v {
TextValue::Null => &[0u8][..],
TextValue::Text(s) => s.as_bytes(),
};
h = h.wrapping_mul(0x100_0000_01b3) ^ 0xff;
for b in bytes {
h = h.wrapping_mul(0x100_0000_01b3) ^ (*b as u64);
}
}
h
}
#[cfg(test)]
mod tests {
use super::*;
fn row(values: &[Option<&str>]) -> Vec<TextValue> {
values
.iter()
.map(|v| match v {
Some(s) => TextValue::Text((*s).to_string()),
None => TextValue::Null,
})
.collect()
}
#[test]
fn identical_row_sets_hash_equal() {
let a = vec![row(&[Some("1"), Some("alice")]), row(&[Some("2"), Some("bob")])];
let b = vec![row(&[Some("1"), Some("alice")]), row(&[Some("2"), Some("bob")])];
assert_eq!(row_set_hash(&a), row_set_hash(&b));
}
#[test]
fn order_does_not_affect_hash() {
let a = vec![row(&[Some("1"), Some("a")]), row(&[Some("2"), Some("b")])];
let b = vec![row(&[Some("2"), Some("b")]), row(&[Some("1"), Some("a")])];
assert_eq!(row_set_hash(&a), row_set_hash(&b));
}
#[test]
fn changed_value_changes_hash() {
let a = vec![row(&[Some("1"), Some("alice")])];
let b = vec![row(&[Some("1"), Some("ALICE")])];
assert_ne!(row_set_hash(&a), row_set_hash(&b));
}
#[test]
fn null_distinguishes_from_empty_string() {
let a = vec![row(&[None])];
let b = vec![row(&[Some("")])];
assert_ne!(row_set_hash(&a), row_set_hash(&b));
}
#[test]
fn missing_row_changes_hash() {
let a = vec![row(&[Some("1")])];
let b = vec![row(&[Some("1")]), row(&[Some("2")])];
assert_ne!(row_set_hash(&a), row_set_hash(&b));
}
#[test]
fn report_is_clean_only_when_all_match() {
let r = ShadowExecuteReport {
sql: "SELECT 1".into(),
both_succeeded: true,
row_count_match: true,
row_hash_match: true,
primary_elapsed_us: 1,
shadow_elapsed_us: 1,
primary_error: None,
shadow_error: None,
};
assert!(r.is_clean());
let mut r2 = r.clone();
r2.row_hash_match = false;
assert!(!r2.is_clean());
let mut r3 = r.clone();
r3.both_succeeded = false;
assert!(!r3.is_clean());
}
#[test]
fn field_separator_prevents_concat_collision() {
let a = vec![row(&[Some("ab"), Some("")])];
let b = vec![row(&[Some(""), Some("ab")])];
assert_ne!(row_set_hash(&a), row_set_hash(&b));
}
}