heliosdb_proxy/shadow_execute/
mod.rs1use crate::backend::{BackendClient, BackendConfig, ParamValue, QueryResult, TextValue};
31use crate::{ProxyError, Result};
32use std::time::Instant;
33
34#[derive(Debug, Clone)]
36pub struct ShadowExecuteReport {
37 pub sql: String,
39 pub both_succeeded: bool,
41 pub row_count_match: bool,
43 pub row_hash_match: bool,
46 pub primary_elapsed_us: u64,
48 pub shadow_elapsed_us: u64,
50 pub primary_error: Option<String>,
52 pub shadow_error: Option<String>,
54}
55
56impl ShadowExecuteReport {
57 pub fn is_clean(&self) -> bool {
58 self.both_succeeded && self.row_count_match && self.row_hash_match
59 }
60}
61
62pub async fn shadow_execute(
69 primary: &mut BackendClient,
70 shadow_cfg: &BackendConfig,
71 sql: &str,
72 params: &[ParamValue],
73) -> Result<(QueryResult, ShadowExecuteReport)> {
74 let primary_start = Instant::now();
75 let primary_outcome = if params.is_empty() {
76 primary.simple_query(sql).await
77 } else {
78 primary.query_with_params(sql, params).await
79 };
80 let primary_elapsed_us = primary_start.elapsed().as_micros() as u64;
81
82 let shadow_outcome = run_shadow(shadow_cfg, sql, params).await;
83
84 let primary_qr = primary_outcome.as_ref().ok().cloned();
85 let shadow_qr = shadow_outcome.0.as_ref().ok().cloned();
86
87 let (row_count_match, row_hash_match) = match (&primary_qr, &shadow_qr) {
88 (Some(p), Some(s)) => {
89 let count_match = p.rows.len() == s.rows.len();
90 let hash_match = if count_match {
91 row_set_hash(&p.rows) == row_set_hash(&s.rows)
92 } else {
93 false
94 };
95 (count_match, hash_match)
96 }
97 _ => (false, false),
98 };
99
100 let report = ShadowExecuteReport {
101 sql: sql.to_string(),
102 both_succeeded: primary_qr.is_some() && shadow_qr.is_some(),
103 row_count_match,
104 row_hash_match,
105 primary_elapsed_us,
106 shadow_elapsed_us: shadow_outcome.1,
107 primary_error: primary_outcome.as_ref().err().map(|e| e.to_string()),
108 shadow_error: shadow_outcome.0.err().map(|e| e.to_string()),
109 };
110
111 let qr =
112 primary_outcome.map_err(|e| ProxyError::Internal(format!("primary execute: {}", e)))?;
113 Ok((qr, report))
114}
115
116async fn run_shadow(
117 cfg: &BackendConfig,
118 sql: &str,
119 params: &[ParamValue],
120) -> (Result<QueryResult>, u64) {
121 let start = Instant::now();
122 let result = match BackendClient::connect(cfg).await {
123 Ok(mut client) => {
124 let outcome = if params.is_empty() {
125 client.simple_query(sql).await
126 } else {
127 client.query_with_params(sql, params).await
128 };
129 client.close().await;
130 outcome.map_err(|e| ProxyError::Internal(format!("shadow execute: {}", e)))
131 }
132 Err(e) => Err(ProxyError::Internal(format!("shadow connect: {}", e))),
133 };
134 let us = start.elapsed().as_micros() as u64;
135 (result, us)
136}
137
138pub fn row_set_hash(rows: &[Vec<TextValue>]) -> u128 {
149 let mut acc: u128 = 0;
150 for row in rows {
151 acc = acc.wrapping_add(row_hash(row) as u128);
152 }
153 acc
154}
155
156fn row_hash(row: &[TextValue]) -> u64 {
157 let mut h: u64 = 0xcbf2_9ce4_8422_2325;
159 for v in row {
160 let bytes = match v {
161 TextValue::Null => &[0u8][..],
162 TextValue::Text(s) => s.as_bytes(),
163 };
164 h = h.wrapping_mul(0x100_0000_01b3) ^ 0xff;
166 for b in bytes {
167 h = h.wrapping_mul(0x100_0000_01b3) ^ (*b as u64);
168 }
169 }
170 h
171}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176
177 fn row(values: &[Option<&str>]) -> Vec<TextValue> {
178 values
179 .iter()
180 .map(|v| match v {
181 Some(s) => TextValue::Text((*s).to_string()),
182 None => TextValue::Null,
183 })
184 .collect()
185 }
186
187 #[test]
188 fn identical_row_sets_hash_equal() {
189 let a = vec![
190 row(&[Some("1"), Some("alice")]),
191 row(&[Some("2"), Some("bob")]),
192 ];
193 let b = vec![
194 row(&[Some("1"), Some("alice")]),
195 row(&[Some("2"), Some("bob")]),
196 ];
197 assert_eq!(row_set_hash(&a), row_set_hash(&b));
198 }
199
200 #[test]
201 fn order_does_not_affect_hash() {
202 let a = vec![row(&[Some("1"), Some("a")]), row(&[Some("2"), Some("b")])];
203 let b = vec![row(&[Some("2"), Some("b")]), row(&[Some("1"), Some("a")])];
204 assert_eq!(row_set_hash(&a), row_set_hash(&b));
205 }
206
207 #[test]
208 fn changed_value_changes_hash() {
209 let a = vec![row(&[Some("1"), Some("alice")])];
210 let b = vec![row(&[Some("1"), Some("ALICE")])];
211 assert_ne!(row_set_hash(&a), row_set_hash(&b));
212 }
213
214 #[test]
215 fn null_distinguishes_from_empty_string() {
216 let a = vec![row(&[None])];
217 let b = vec![row(&[Some("")])];
218 assert_ne!(row_set_hash(&a), row_set_hash(&b));
219 }
220
221 #[test]
222 fn missing_row_changes_hash() {
223 let a = vec![row(&[Some("1")])];
224 let b = vec![row(&[Some("1")]), row(&[Some("2")])];
225 assert_ne!(row_set_hash(&a), row_set_hash(&b));
226 }
227
228 #[test]
229 fn report_is_clean_only_when_all_match() {
230 let r = ShadowExecuteReport {
231 sql: "SELECT 1".into(),
232 both_succeeded: true,
233 row_count_match: true,
234 row_hash_match: true,
235 primary_elapsed_us: 1,
236 shadow_elapsed_us: 1,
237 primary_error: None,
238 shadow_error: None,
239 };
240 assert!(r.is_clean());
241
242 let mut r2 = r.clone();
243 r2.row_hash_match = false;
244 assert!(!r2.is_clean());
245
246 let mut r3 = r.clone();
247 r3.both_succeeded = false;
248 assert!(!r3.is_clean());
249 }
250
251 #[test]
252 fn field_separator_prevents_concat_collision() {
253 let a = vec![row(&[Some("ab"), Some("")])];
256 let b = vec![row(&[Some(""), Some("ab")])];
257 assert_ne!(row_set_hash(&a), row_set_hash(&b));
258 }
259}