Skip to main content

dsfb_database/residual/
mod.rs

//! Residual streams.
//!
//! A *residual* is the difference between an expectation a SQL engine had
//! about something and what actually happened — almost every modern engine
//! computes residuals internally and then logs only shallow summaries of them
//! (`paneldiscussion.txt` in the paperstack lists six families). This module
//! defines the canonical typed residual stream that the DSFB observer and the
//! motif grammar consume.
//!
//! Residual *construction* is engine-specific (see the per-class submodules
//! below); residual *interpretation* is engine-agnostic (see `grammar`).
12
13use serde::{Deserialize, Serialize};
14
15pub mod cache_io;
16pub mod cardinality;
17pub mod contention;
18pub mod plan_regression;
19pub mod workload_phase;
20
21/// The five residual classes emitted by SQL engines that DSFB-Database
22/// structures. Names and definitions match Section 3 (Residual Taxonomy) of
23/// the paper.
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
25pub enum ResidualClass {
26    /// Latency vs rolling baseline; plan-hash transitions.
27    PlanRegression,
28    /// `actual_rows / estimated_rows` divergence per plan node or per query.
29    Cardinality,
30    /// Lock-wait depth, blocked-by chain length, queue depth.
31    Contention,
32    /// Buffer / cache hit-ratio drop with I/O-wait amplification.
33    CacheIo,
34    /// Digest-mix entropy and class-distribution drift across query workload.
35    WorkloadPhase,
36}
37
38impl ResidualClass {
39    pub const ALL: [ResidualClass; 5] = [
40        Self::PlanRegression,
41        Self::Cardinality,
42        Self::Contention,
43        Self::CacheIo,
44        Self::WorkloadPhase,
45    ];
46
47    pub fn name(&self) -> &'static str {
48        match self {
49            Self::PlanRegression => "plan_regression",
50            Self::Cardinality => "cardinality",
51            Self::Contention => "contention",
52            Self::CacheIo => "cache_io",
53            Self::WorkloadPhase => "workload_phase",
54        }
55    }
56}
57
58/// A single residual sample. `t` is logical time (seconds since stream start).
59/// `value` is the residual quantity in the class-specific natural units (the
60/// units are documented per class in the paper's Table 2).
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct ResidualSample {
63    pub t: f64,
64    pub class: ResidualClass,
65    /// The residual itself: `actual − expected` (or `actual / expected` for
66    /// cardinality, log-transformed). Never NaN; missing values are dropped
67    /// at the adapter boundary so downstream code can rely on this.
68    pub value: f64,
69    /// Optional channel discriminator (e.g. plan_hash, table id, wait_event
70    /// name). Used by the motif grammar to scope episodes.
71    pub channel: Option<String>,
72}
73
74impl ResidualSample {
75    pub fn new(t: f64, class: ResidualClass, value: f64) -> Self {
76        debug_assert!(value.is_finite(), "residual value must be finite");
77        Self {
78            t,
79            class,
80            value,
81            channel: None,
82        }
83    }
84
85    pub fn with_channel(mut self, channel: impl Into<String>) -> Self {
86        self.channel = Some(channel.into());
87        self
88    }
89}
90
91/// A typed, time-ordered stream of residuals from a single source (one
92/// dataset, one engine, one observation window). Construction is the
93/// adapter's responsibility; the stream is otherwise immutable.
94#[derive(Debug, Clone, Default, Serialize, Deserialize)]
95pub struct ResidualStream {
96    /// Source label — dataset name, engine, version, subset.
97    pub source: String,
98    /// Samples sorted by `t` ascending. Adapters MUST sort.
99    pub samples: Vec<ResidualSample>,
100}
101
102impl ResidualStream {
103    pub fn new(source: impl Into<String>) -> Self {
104        Self {
105            source: source.into(),
106            samples: Vec::new(),
107        }
108    }
109
110    pub fn push(&mut self, s: ResidualSample) {
111        self.samples.push(s);
112    }
113
114    pub fn sort(&mut self) {
115        self.samples.sort_by(|a, b| {
116            debug_assert!(
117                a.t.is_finite() && b.t.is_finite(),
118                "residual t must be finite"
119            );
120            a.t.partial_cmp(&b.t).unwrap_or(std::cmp::Ordering::Equal)
121        });
122    }
123
124    pub fn len(&self) -> usize {
125        self.samples.len()
126    }
127
128    pub fn is_empty(&self) -> bool {
129        self.samples.is_empty()
130    }
131
132    pub fn duration(&self) -> f64 {
133        match (self.samples.first(), self.samples.last()) {
134            (Some(a), Some(b)) => b.t - a.t,
135            (None, None) => 0.0,
136            (None, Some(_)) | (Some(_), None) => {
137                debug_assert!(false, "first/last disagree on emptiness");
138                0.0
139            }
140        }
141    }
142
143    /// View-only iterator over samples of a single class (used by the
144    /// per-motif state machines).
145    pub fn iter_class(&self, class: ResidualClass) -> impl Iterator<Item = &ResidualSample> + '_ {
146        self.samples.iter().filter(move |s| s.class == class)
147    }
148
149    /// Stable hash of the residual stream — used by the
150    /// replay-determinism test to confirm bytewise identical runs.
151    pub fn fingerprint(&self) -> [u8; 32] {
152        use sha2::{Digest, Sha256};
153        let mut h = Sha256::new();
154        h.update(self.source.as_bytes());
155        for s in &self.samples {
156            h.update(s.t.to_le_bytes());
157            h.update((s.class as u8).to_le_bytes());
158            h.update(s.value.to_le_bytes());
159            if let Some(c) = &s.channel {
160                h.update(c.as_bytes());
161            }
162            h.update(b"|");
163        }
164        h.finalize().into()
165    }
166}