heliosdb_proxy/shadow_execute/
mod.rs1use crate::backend::{BackendClient, BackendConfig, ParamValue, QueryResult, TextValue};
31use crate::{ProxyError, Result};
32use std::time::Instant;
33
/// Outcome of running one statement against both the primary and the shadow
/// backend, as produced by `shadow_execute`.
///
/// Derives `PartialEq`/`Eq` so two reports can be compared directly, for
/// example in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShadowExecuteReport {
    /// The SQL text that was executed on both backends.
    pub sql: String,
    /// `true` when both the primary and the shadow execution returned `Ok`.
    pub both_succeeded: bool,
    /// `true` when both executions succeeded and returned the same number of
    /// rows; `false` whenever either side failed.
    pub row_count_match: bool,
    /// `true` when the row counts match and the order-insensitive
    /// `row_set_hash` of both row sets is equal; `false` otherwise.
    pub row_hash_match: bool,
    /// Wall-clock time spent executing the statement on the primary, in
    /// microseconds.
    pub primary_elapsed_us: u64,
    /// Wall-clock time of the shadow run in microseconds. This covers
    /// connecting, executing and closing the shadow connection.
    pub shadow_elapsed_us: u64,
    /// Display text of the primary error, or `None` when the primary succeeded.
    pub primary_error: Option<String>,
    /// Display text of the shadow error (connect or execute), or `None` when
    /// the shadow run succeeded.
    pub shadow_error: Option<String>,
}
55
56impl ShadowExecuteReport {
57 pub fn is_clean(&self) -> bool {
58 self.both_succeeded && self.row_count_match && self.row_hash_match
59 }
60}
61
62pub async fn shadow_execute(
69 primary: &mut BackendClient,
70 shadow_cfg: &BackendConfig,
71 sql: &str,
72 params: &[ParamValue],
73) -> Result<(QueryResult, ShadowExecuteReport)> {
74 let primary_start = Instant::now();
75 let primary_outcome = if params.is_empty() {
76 primary.simple_query(sql).await
77 } else {
78 primary.query_with_params(sql, params).await
79 };
80 let primary_elapsed_us = primary_start.elapsed().as_micros() as u64;
81
82 let shadow_outcome = run_shadow(shadow_cfg, sql, params).await;
83
84 let primary_qr = primary_outcome.as_ref().ok().cloned();
85 let shadow_qr = shadow_outcome.0.as_ref().ok().cloned();
86
87 let (row_count_match, row_hash_match) = match (&primary_qr, &shadow_qr) {
88 (Some(p), Some(s)) => {
89 let count_match = p.rows.len() == s.rows.len();
90 let hash_match = if count_match {
91 row_set_hash(&p.rows) == row_set_hash(&s.rows)
92 } else {
93 false
94 };
95 (count_match, hash_match)
96 }
97 _ => (false, false),
98 };
99
100 let report = ShadowExecuteReport {
101 sql: sql.to_string(),
102 both_succeeded: primary_qr.is_some() && shadow_qr.is_some(),
103 row_count_match,
104 row_hash_match,
105 primary_elapsed_us,
106 shadow_elapsed_us: shadow_outcome.1,
107 primary_error: primary_outcome.as_ref().err().map(|e| e.to_string()),
108 shadow_error: shadow_outcome.0.err().map(|e| e.to_string()),
109 };
110
111 let qr = primary_outcome
112 .map_err(|e| ProxyError::Internal(format!("primary execute: {}", e)))?;
113 Ok((qr, report))
114}
115
116async fn run_shadow(
117 cfg: &BackendConfig,
118 sql: &str,
119 params: &[ParamValue],
120) -> (Result<QueryResult>, u64) {
121 let start = Instant::now();
122 let result = match BackendClient::connect(cfg).await {
123 Ok(mut client) => {
124 let outcome = if params.is_empty() {
125 client.simple_query(sql).await
126 } else {
127 client.query_with_params(sql, params).await
128 };
129 client.close().await;
130 outcome.map_err(|e| ProxyError::Internal(format!("shadow execute: {}", e)))
131 }
132 Err(e) => Err(ProxyError::Internal(format!("shadow connect: {}", e))),
133 };
134 let us = start.elapsed().as_micros() as u64;
135 (result, us)
136}
137
138pub fn row_set_hash(rows: &[Vec<TextValue>]) -> u128 {
149 let mut acc: u128 = 0;
150 for row in rows {
151 acc = acc.wrapping_add(row_hash(row) as u128);
152 }
153 acc
154}
155
156fn row_hash(row: &[TextValue]) -> u64 {
157 let mut h: u64 = 0xcbf2_9ce4_8422_2325;
159 for v in row {
160 let bytes = match v {
161 TextValue::Null => &[0u8][..],
162 TextValue::Text(s) => s.as_bytes(),
163 };
164 h = h.wrapping_mul(0x100_0000_01b3) ^ 0xff;
166 for b in bytes {
167 h = h.wrapping_mul(0x100_0000_01b3) ^ (*b as u64);
168 }
169 }
170 h
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds one result row: `Some(text)` becomes a text cell, `None` a NULL.
    fn row(values: &[Option<&str>]) -> Vec<TextValue> {
        let mut cells = Vec::with_capacity(values.len());
        for value in values {
            cells.push(match *value {
                Some(text) => TextValue::Text(text.to_string()),
                None => TextValue::Null,
            });
        }
        cells
    }

    /// Builds a report with the given comparison flags and fixed filler data.
    fn report(both_succeeded: bool, row_count_match: bool, row_hash_match: bool) -> ShadowExecuteReport {
        ShadowExecuteReport {
            sql: String::from("SELECT 1"),
            both_succeeded,
            row_count_match,
            row_hash_match,
            primary_elapsed_us: 1,
            shadow_elapsed_us: 1,
            primary_error: None,
            shadow_error: None,
        }
    }

    #[test]
    fn identical_row_sets_hash_equal() {
        let build = || {
            vec![
                row(&[Some("1"), Some("alice")]),
                row(&[Some("2"), Some("bob")]),
            ]
        };
        let (left, right) = (build(), build());
        assert_eq!(row_set_hash(&left), row_set_hash(&right));
    }

    #[test]
    fn order_does_not_affect_hash() {
        let forward = vec![row(&[Some("1"), Some("a")]), row(&[Some("2"), Some("b")])];
        let mut backward = vec![row(&[Some("1"), Some("a")]), row(&[Some("2"), Some("b")])];
        backward.reverse();
        assert_eq!(row_set_hash(&forward), row_set_hash(&backward));
    }

    #[test]
    fn changed_value_changes_hash() {
        let lower = vec![row(&[Some("1"), Some("alice")])];
        let upper = vec![row(&[Some("1"), Some("ALICE")])];
        assert_ne!(row_set_hash(&lower), row_set_hash(&upper));
    }

    #[test]
    fn null_distinguishes_from_empty_string() {
        let with_null = vec![row(&[None])];
        let with_empty = vec![row(&[Some("")])];
        assert_ne!(row_set_hash(&with_null), row_set_hash(&with_empty));
    }

    #[test]
    fn missing_row_changes_hash() {
        let one_row = vec![row(&[Some("1")])];
        let two_rows = vec![row(&[Some("1")]), row(&[Some("2")])];
        assert_ne!(row_set_hash(&one_row), row_set_hash(&two_rows));
    }

    #[test]
    fn report_is_clean_only_when_all_match() {
        // All flags set: clean.
        assert!(report(true, true, true).is_clean());
        // Hash mismatch alone makes the report dirty.
        assert!(!report(true, true, false).is_clean());
        // A failed execution alone makes the report dirty.
        assert!(!report(false, true, true).is_clean());
    }

    #[test]
    fn field_separator_prevents_concat_collision() {
        // Same concatenated bytes, different field boundaries.
        let text_then_empty = vec![row(&[Some("ab"), Some("")])];
        let empty_then_text = vec![row(&[Some(""), Some("ab")])];
        assert_ne!(row_set_hash(&text_then_empty), row_set_hash(&empty_then_text));
    }
}