dsfb_database/adapters/
sqlshare_text.rs1use super::DatasetAdapter;
34use crate::residual::{workload_phase, ResidualStream};
35use anyhow::{Context, Result};
36use rand::{Rng, SeedableRng};
37use std::collections::HashMap;
38use std::path::Path;
39
/// Dataset adapter for a plain-text SQLShare query dump.
///
/// The adapter is stateless; all behaviour lives in its `DatasetAdapter`
/// implementation. The common traits are derived so the unit value can be
/// logged, copied, and default-constructed.
#[derive(Debug, Clone, Copy, Default)]
pub struct SqlShareText;
41
/// Number of consecutive queries whose skeletons are counted into one
/// histogram bucket by `load`; `exemplar` uses the same stride.
const BUCKET_SIZE: usize = 200;

/// Run of underscores on which `split_queries` splits the input text into
/// individual queries.
const QUERY_DIVIDER: &str = "________________________________________";
50
/// Reduces a SQL query to a literal-free, case-folded "skeleton".
///
/// Rules, applied character by character:
/// * A `'` or `"` opens a quoted span and emits `?`; the span's content is
///   dropped, and only the *same* quote character closes it (emitting a second
///   `?`). A different quote character inside the span is ordinary content, so
///   `"it's"` is a single span.
/// * Every ASCII digit becomes its own `?`. Digit runs are not collapsed, so
///   numeric literals of different lengths produce different skeletons.
/// * Each run of whitespace collapses to a single space; leading and trailing
///   whitespace is removed.
/// * Every other character is lowercased.
///
/// An unterminated quote swallows the remainder of the query.
fn skeleton(q: &str) -> String {
    let mut out = String::with_capacity(q.len());
    // Quote character of the span we are currently inside, if any.
    let mut open_quote: Option<char> = None;
    // Starts `true` so that leading whitespace emits nothing.
    let mut prev_ws = true;

    for c in q.chars() {
        if let Some(quote) = open_quote {
            if c == quote {
                out.push('?');
                open_quote = None;
            }
            continue;
        }
        match c {
            '\'' | '"' => {
                open_quote = Some(c);
                out.push('?');
                // A literal is a token: whitespace following it must survive,
                // exactly as it does after a digit.
                prev_ws = false;
            }
            _ if c.is_ascii_digit() => {
                out.push('?');
                prev_ws = false;
            }
            _ if c.is_whitespace() => {
                if !prev_ws {
                    out.push(' ');
                    prev_ws = true;
                }
            }
            _ => {
                prev_ws = false;
                out.extend(c.to_lowercase());
            }
        }
    }

    // At most one trailing space can remain; drop it without reallocating.
    out.truncate(out.trim_end().len());
    out
}
83
84fn split_queries(content: &str) -> Vec<&str> {
88 content
89 .split(QUERY_DIVIDER)
90 .map(|q| q.trim())
91 .filter(|q| !q.is_empty())
92 .collect()
93}
94
95impl DatasetAdapter for SqlShareText {
96 fn name(&self) -> &'static str {
97 "sqlshare-text"
98 }
99
100 fn load(&self, path: &Path) -> Result<ResidualStream> {
101 let content = std::fs::read_to_string(path)
102 .with_context(|| format!("reading sqlshare queries.txt at {}", path.display()))?;
103 let queries = split_queries(&content);
104
105 let mut stream = ResidualStream::new(format!(
106 "sqlshare-text@{}",
107 path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
108 ));
109
110 let mut prev_histo: Option<HashMap<String, u64>> = None;
114 let mut current_histo: HashMap<String, u64> = HashMap::new();
115 let mut in_bucket: usize = 0;
116 let mut bucket_index: usize = 0;
117
118 for q in &queries {
119 let sk = skeleton(q);
120 *current_histo.entry(sk).or_insert(0) += 1;
121 in_bucket += 1;
122 if in_bucket == BUCKET_SIZE {
123 let start = bucket_index * BUCKET_SIZE;
124 let end = start + in_bucket - 1;
125 if let Some(prev) = &prev_histo {
126 let d = workload_phase::js_divergence(prev, ¤t_histo);
127 workload_phase::push_jsd(
128 &mut stream,
129 (bucket_index * BUCKET_SIZE) as f64,
130 &format!("ord[{start}-{end}]"),
131 d,
132 );
133 }
134 prev_histo = Some(std::mem::take(&mut current_histo));
135 in_bucket = 0;
136 bucket_index += 1;
137 }
138 }
139 stream.sort();
146 Ok(stream)
147 }
148
149 fn exemplar(&self, seed: u64) -> ResidualStream {
150 let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
156 let mut stream = ResidualStream::new(format!("sqlshare-text-exemplar-seed{seed}"));
157 for b in 1..30 {
158 let start = b * BUCKET_SIZE;
159 let end = start + BUCKET_SIZE - 1;
160 let jsd = if (15..20).contains(&b) {
161 0.38 + rng.gen_range(-0.04..0.04)
162 } else {
163 0.05 + rng.gen_range(0.0..0.03)
164 };
165 workload_phase::push_jsd(
166 &mut stream,
167 (b * BUCKET_SIZE) as f64,
168 &format!("ord[{start}-{end}]"),
169 jsd,
170 );
171 }
172 stream.sort();
173 stream
174 }
175}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::residual::ResidualClass;

    /// Writes `total` queries to a fresh temporary file, where query `i` is
    /// `pick(i)` and every query is followed by a divider line.
    fn write_corpus(
        total: usize,
        pick: impl Fn(usize) -> &'static str,
    ) -> tempfile::NamedTempFile {
        let mut text = String::new();
        for i in 0..total {
            text.push_str(pick(i));
            text.push_str("\n________________________________________\n");
        }
        let tmp = tempfile::NamedTempFile::new().unwrap();
        std::fs::write(tmp.path(), &text).unwrap();
        tmp
    }

    /// Divider-separated text yields the trimmed queries in order.
    #[test]
    fn splits_underscore_separated_queries() {
        let text = "SELECT 1\n\n\n________________________________________\nSELECT 2\n\n________________________________________\nSELECT 3";
        let qs = split_queries(text);
        assert_eq!(qs, vec!["SELECT 1", "SELECT 2", "SELECT 3"]);
    }

    /// Whitespace-only pieces between consecutive dividers are discarded.
    #[test]
    fn drops_empty_between_divider_runs() {
        let text =
            "________________________________________\n\n________________________________________\nSELECT 1";
        assert_eq!(split_queries(text), vec!["SELECT 1"]);
    }

    /// Queries differing only in literal values and letter case share a skeleton.
    #[test]
    fn skeleton_strips_literals_and_digits() {
        let upper = skeleton("SELECT * FROM t WHERE id = 123 AND name = 'alice'");
        let lower = skeleton("select * from t where id = 999 and name = 'bob'");
        assert_eq!(upper, lower, "skeletons should match after literal/digit stripping");
        assert!(upper.contains("select"));
        assert!(upper.contains('?'));
        assert!(!upper.contains("alice"));
        assert!(!upper.contains("bob"));
    }

    /// Three full buckets produce two samples, all of the WorkloadPhase class.
    #[test]
    fn emits_only_workload_phase_class() {
        let tmp = write_corpus(BUCKET_SIZE * 3, |i| match i / BUCKET_SIZE {
            0 => "select count(*) from t",
            1 => "select count(*) from u",
            _ => "select avg(x) from v group by y",
        });
        let stream = SqlShareText.load(tmp.path()).unwrap();
        assert!(stream.source.starts_with("sqlshare-text@"));
        for sample in stream.samples.iter() {
            assert_eq!(
                sample.class,
                ResidualClass::WorkloadPhase,
                "text-only mode must emit only WorkloadPhase residuals"
            );
        }
        assert_eq!(stream.samples.len(), 2);
    }

    /// Loading the same file twice gives streams with equal fingerprints.
    #[test]
    fn fingerprint_is_deterministic_across_runs() {
        let tmp = write_corpus(BUCKET_SIZE * 4, |i| {
            if i % 2 == 0 {
                "select * from t where x = 1"
            } else {
                "select a, b from u join v on u.id = v.id"
            }
        });
        let first = SqlShareText.load(tmp.path()).unwrap();
        let second = SqlShareText.load(tmp.path()).unwrap();
        assert_eq!(
            first.fingerprint(),
            second.fingerprint(),
            "text-only SQLShare stream must be bytewise-deterministic across reads"
        );
    }
}