Skip to main content

heliosdb_proxy/shadow_execute/
mod.rs

1//! Shadow-execution module (T3.4 R&D).
2//!
3//! Runs the same SQL on a primary node AND a shadow node, then
4//! compares the two results. Used for:
5//!
6//! - **Major-version upgrade validation** (T2.1 stage 3): run
7//!   production-shape queries on the source-version primary and the
8//!   target-version standby; alert if results diverge.
9//! - **Schema-migration validation**: shadow a candidate schema
10//!   change against the live primary; flag plan-shape regressions
11//!   before promotion.
12//! - **Read-replica drift detection**: catch silent corruption or
13//!   replication-lag-induced staleness in CI.
14//!
//! Results are compared by row count plus an order-independent hash
//! of the rows (matches the T0-IT3 checksum design), so
//! non-deterministic row orderings are tolerated. The application
//! only ever consumes the primary's result: `shadow_execute` returns
//! it together with a `ShadowExecuteReport` describing how the shadow
//! side compared. `shadow_execute` itself does not spawn a background
//! task or publish reports to a channel; it waits for the shadow side
//! before returning.
22//!
23//! ## Status
24//!
25//! Module scaffolding + comparison logic + tests. Wiring this into
26//! the live request path (so every Nth query is shadowed) is a
27//! follow-up gated on an explicit feature flag — production
28//! deployments don't want surprise duplicate query load.
29
30use crate::backend::{BackendClient, BackendConfig, ParamValue, QueryResult, TextValue};
31use crate::{ProxyError, Result};
32use std::time::Instant;
33
/// Outcome of shadowing one statement: whether the primary and shadow
/// sides both succeeded, whether their results agree, and how long each
/// side took.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ShadowExecuteReport {
    /// SQL that was shadowed.
    pub sql: String,
    /// `true` when neither the primary nor the shadow side returned an
    /// error.
    pub both_succeeded: bool,
    /// `true` when both sides succeeded and returned the same number of
    /// rows; `false` otherwise.
    pub row_count_match: bool,
    /// `true` when the order-independent row-set hashes of both sides
    /// match. As populated by `shadow_execute`, this is always `false`
    /// when `row_count_match` is `false`.
    pub row_hash_match: bool,
    /// Wall-clock time on the primary side, in microseconds.
    pub primary_elapsed_us: u64,
    /// Wall-clock time on the shadow side, in microseconds, including
    /// connecting to the shadow backend.
    pub shadow_elapsed_us: u64,
    /// Error from the primary side, if any.
    ///
    /// `shadow_execute` returns `Err` instead of a report when the
    /// primary fails, so reports obtained from it always carry `None`
    /// here.
    pub primary_error: Option<String>,
    /// Error from the shadow side (connect or execute), if any.
    pub shadow_error: Option<String>,
}

impl ShadowExecuteReport {
    /// Returns `true` when both sides succeeded and neither the row
    /// count nor the row-set hash diverged, i.e. no drift was detected.
    #[must_use]
    pub fn is_clean(&self) -> bool {
        self.both_succeeded && self.row_count_match && self.row_hash_match
    }
}
61
62/// Run `sql` on `primary` and `shadow` concurrently. Returns the
63/// primary's result for the application to consume, plus a shadow
64/// report containing the comparison.
65///
66/// `params` are interpolated into the SQL using the same text-format
67/// substitution the failover-replay engine uses (no extended protocol).
68pub async fn shadow_execute(
69    primary: &mut BackendClient,
70    shadow_cfg: &BackendConfig,
71    sql: &str,
72    params: &[ParamValue],
73) -> Result<(QueryResult, ShadowExecuteReport)> {
74    let primary_start = Instant::now();
75    let primary_outcome = if params.is_empty() {
76        primary.simple_query(sql).await
77    } else {
78        primary.query_with_params(sql, params).await
79    };
80    let primary_elapsed_us = primary_start.elapsed().as_micros() as u64;
81
82    let shadow_outcome = run_shadow(shadow_cfg, sql, params).await;
83
84    let primary_qr = primary_outcome.as_ref().ok().cloned();
85    let shadow_qr = shadow_outcome.0.as_ref().ok().cloned();
86
87    let (row_count_match, row_hash_match) = match (&primary_qr, &shadow_qr) {
88        (Some(p), Some(s)) => {
89            let count_match = p.rows.len() == s.rows.len();
90            let hash_match = if count_match {
91                row_set_hash(&p.rows) == row_set_hash(&s.rows)
92            } else {
93                false
94            };
95            (count_match, hash_match)
96        }
97        _ => (false, false),
98    };
99
100    let report = ShadowExecuteReport {
101        sql: sql.to_string(),
102        both_succeeded: primary_qr.is_some() && shadow_qr.is_some(),
103        row_count_match,
104        row_hash_match,
105        primary_elapsed_us,
106        shadow_elapsed_us: shadow_outcome.1,
107        primary_error: primary_outcome.as_ref().err().map(|e| e.to_string()),
108        shadow_error: shadow_outcome.0.err().map(|e| e.to_string()),
109    };
110
111    let qr = primary_outcome
112        .map_err(|e| ProxyError::Internal(format!("primary execute: {}", e)))?;
113    Ok((qr, report))
114}
115
116async fn run_shadow(
117    cfg: &BackendConfig,
118    sql: &str,
119    params: &[ParamValue],
120) -> (Result<QueryResult>, u64) {
121    let start = Instant::now();
122    let result = match BackendClient::connect(cfg).await {
123        Ok(mut client) => {
124            let outcome = if params.is_empty() {
125                client.simple_query(sql).await
126            } else {
127                client.query_with_params(sql, params).await
128            };
129            client.close().await;
130            outcome.map_err(|e| ProxyError::Internal(format!("shadow execute: {}", e)))
131        }
132        Err(e) => Err(ProxyError::Internal(format!("shadow connect: {}", e))),
133    };
134    let us = start.elapsed().as_micros() as u64;
135    (result, us)
136}
137
138/// Order-independent hash of a row set. Each row is hashed
139/// individually (xor-fold of FNV-1a over the row's text-form),
140/// then the row hashes are combined with addition (commutative,
141/// associative — so row order in the result set doesn't change
142/// the final hash).
143///
144/// This intentionally does NOT use a cryptographic hash. The signal
145/// the comparison wants is "did we get the same set of rows," not
146/// "is this committable evidence." Cryptographic commit is the job
147/// of the audit-chain plugin (T2.4-P2).
148pub fn row_set_hash(rows: &[Vec<TextValue>]) -> u128 {
149    let mut acc: u128 = 0;
150    for row in rows {
151        acc = acc.wrapping_add(row_hash(row) as u128);
152    }
153    acc
154}
155
156fn row_hash(row: &[TextValue]) -> u64 {
157    // FNV-1a 64-bit
158    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
159    for v in row {
160        let bytes = match v {
161            TextValue::Null => &[0u8][..],
162            TextValue::Text(s) => s.as_bytes(),
163        };
164        // sentinel between fields so "ab" + "" doesn't collide with "" + "ab"
165        h = h.wrapping_mul(0x100_0000_01b3) ^ 0xff;
166        for b in bytes {
167            h = h.wrapping_mul(0x100_0000_01b3) ^ (*b as u64);
168        }
169    }
170    h
171}
172
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a row from optional strings; `None` becomes SQL NULL.
    fn row(values: &[Option<&str>]) -> Vec<TextValue> {
        values
            .iter()
            .map(|v| match v {
                Some(s) => TextValue::Text((*s).to_string()),
                None => TextValue::Null,
            })
            .collect()
    }

    #[test]
    fn identical_row_sets_hash_equal() {
        let a = vec![row(&[Some("1"), Some("alice")]), row(&[Some("2"), Some("bob")])];
        let b = vec![row(&[Some("1"), Some("alice")]), row(&[Some("2"), Some("bob")])];
        assert_eq!(row_set_hash(&a), row_set_hash(&b));
    }

    #[test]
    fn order_does_not_affect_hash() {
        let a = vec![row(&[Some("1"), Some("a")]), row(&[Some("2"), Some("b")])];
        let b = vec![row(&[Some("2"), Some("b")]), row(&[Some("1"), Some("a")])];
        assert_eq!(row_set_hash(&a), row_set_hash(&b));
    }

    #[test]
    fn changed_value_changes_hash() {
        let a = vec![row(&[Some("1"), Some("alice")])];
        let b = vec![row(&[Some("1"), Some("ALICE")])];
        assert_ne!(row_set_hash(&a), row_set_hash(&b));
    }

    #[test]
    fn null_distinguishes_from_empty_string() {
        let a = vec![row(&[None])];
        let b = vec![row(&[Some("")])];
        assert_ne!(row_set_hash(&a), row_set_hash(&b));
    }

    #[test]
    fn missing_row_changes_hash() {
        let a = vec![row(&[Some("1")])];
        let b = vec![row(&[Some("1")]), row(&[Some("2")])];
        assert_ne!(row_set_hash(&a), row_set_hash(&b));
    }

    #[test]
    fn duplicate_row_changes_hash() {
        // The comparison is multiset-sensitive: a repeated row is not collapsed.
        let a = vec![row(&[Some("1")])];
        let b = vec![row(&[Some("1")]), row(&[Some("1")])];
        assert_ne!(row_set_hash(&a), row_set_hash(&b));
    }

    #[test]
    fn empty_row_set_hashes_to_zero() {
        let empty: Vec<Vec<TextValue>> = Vec::new();
        assert_eq!(row_set_hash(&empty), 0);
    }

    #[test]
    fn report_is_clean_only_when_all_match() {
        let r = ShadowExecuteReport {
            sql: "SELECT 1".into(),
            both_succeeded: true,
            row_count_match: true,
            row_hash_match: true,
            primary_elapsed_us: 1,
            shadow_elapsed_us: 1,
            primary_error: None,
            shadow_error: None,
        };
        assert!(r.is_clean());

        let mut r2 = r.clone();
        r2.row_hash_match = false;
        assert!(!r2.is_clean());

        let mut r3 = r.clone();
        r3.both_succeeded = false;
        assert!(!r3.is_clean());

        let mut r4 = r.clone();
        r4.row_count_match = false;
        assert!(!r4.is_clean());
    }

    #[test]
    fn field_separator_prevents_concat_collision() {
        // Without a separator between fields, ["ab",""] and ["","ab"]
        // would hash the same. Verify our sentinel disambiguates.
        let a = vec![row(&[Some("ab"), Some("")])];
        let b = vec![row(&[Some(""), Some("ab")])];
        assert_ne!(row_set_hash(&a), row_set_hash(&b));
    }
}