dsfb-database 0.1.1

DSFB-Database: deterministic, read-only structural observer for residual trajectories in SQL database telemetry. Empirical prior-art demonstration on Snowset, SQLShare, CEB, JOB, and TPC-DS.
//! Snowset adapter (Vuppalapati et al., NSDI 2020).
//!
//! Real subset: CSV distributed at
//! [github.com/resource-disaggregation/snowset](https://github.com/resource-disaggregation/snowset)
//! (mirror: `http://www.cs.cornell.edu/~midhul/snowset/snowset-main.csv.gz`),
//! verified schema (2026-04) for the columns this adapter touches:
//!   * `queryId`, `warehouseId`
//!   * `createdTime` — ISO-8601 UTC string with microsecond precision,
//!     e.g. `2018-03-02 14:44:02.768000+00:00`
//!   * `execTime` — query execution time in microseconds
//!   * `persistentReadBytesCache` — bytes served from the persistent
//!     cache (the analogue of the earlier-documented
//!     `bytesScannedFromCache`)
//!   * `persistentReadBytesS3` — bytes read from S3 (the analogue of
//!     `bytesScannedFromStorage`)
//!
//! What we extract:
//!   * `PlanRegression` — `execTime − rolling_baseline(execTime)`
//!     per `(warehouseId, queryId)` pair (proxy for query class —
//!     Snowset anonymises SQL text per fact #16).
//!   * `WorkloadPhase` — JS divergence over the per-warehouse query-class
//!     histogram in 5-minute buckets.
//!   * `CacheIo` — `persistentReadBytesCache /
//!     (persistentReadBytesCache + persistentReadBytesS3)` drift
//!     (cache-hit-ratio residual; the adapter tracks hits, not misses).
//!
//! What we cannot extract (paper says so explicitly):
//!   * `Cardinality` — Snowset does not publish `est_rows`/`actual_rows`.
//!   * `Contention` — no lock-wait stream.
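//!
//! Minimal usage sketch (hedged: the `dsfb_database::adapters` import path
//! is an assumption from the crate layout; `load`, `exemplar`, and `name`
//! are the entry points implemented below):
//!
//! ```no_run
//! # fn main() -> anyhow::Result<()> {
//! use dsfb_database::adapters::{DatasetAdapter, Snowset};
//! use std::path::Path;
//!
//! let real = Snowset.load(Path::new("snowset-main.csv"))?; // real subset
//! let synthetic = Snowset.exemplar(42); // deterministic synthetic stream
//! assert_eq!(Snowset.name(), "snowset");
//! # let _ = (real, synthetic);
//! # Ok(())
//! # }
//! ```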

use super::DatasetAdapter;
use crate::residual::{cache_io, plan_regression, workload_phase, ResidualStream};
use anyhow::{Context, Result};
use chrono::DateTime;
use rand::{Rng, SeedableRng};
use std::collections::{HashMap, VecDeque};
use std::path::Path;

/// Upper bound on Snowset rows read from a CSV. The published
/// `snowset-main.csv.gz` unpacks to ~70M rows; 100M is the next
/// order-of-magnitude cap that still accepts the real release plus
/// any resampling.
const MAX_SNOWSET_ROWS: usize = 100_000_000;

/// Rolling-baseline window for per-`(warehouse, query)` plan-regression
/// and per-warehouse cache-hit-ratio residuals.
const SNOWSET_BASELINE_WIN: usize = 64;

/// Workload-phase histogram bucket width in seconds (5 minutes).
const SNOWSET_BUCKET_SECONDS: f64 = 300.0;

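/// Zero-sized adapter handle; see the module docs for the trace schema and
/// the residual channels extracted.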
pub struct Snowset;

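/// One CSV record, deserialized under the Snowset column names. The byte
/// counters default to `0.0` when the column is absent.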
#[derive(Debug, serde::Deserialize)]
struct RawRow {
    #[serde(rename = "queryId")]
    query_id: String,
    #[serde(rename = "warehouseId")]
    warehouse_id: String,
    #[serde(rename = "createdTime")]
    created_time: String,
    #[serde(rename = "execTime")]
    exec_time_us: f64,
    #[serde(default, rename = "persistentReadBytesCache")]
    bytes_cache: f64,
    #[serde(default, rename = "persistentReadBytesS3")]
    bytes_storage: f64,
}

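/// Validated row: `createdTime` parsed to epoch microseconds, `execTime`
/// checked finite.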
struct Row {
    query_id: String,
    warehouse_id: String,
    created_time_us: f64,
    execution_time_us: f64,
    bytes_cache: f64,
    bytes_storage: f64,
}

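/// Parses a Snowset `createdTime` string to epoch microseconds, returning
/// `None` for any format neither parser accepts.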
fn parse_created_time(s: &str) -> Option<f64> {
    // Snowset ships UTC timestamps with microsecond precision, e.g.
    // "2018-03-02 14:44:02.768000+00:00". The explicit format string
    // handles the space-separated form with a numeric offset ("+00:00");
    // the RFC 3339 fallback covers `T`-separated and `Z`-suffixed forms.
    let parsed = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f%:z")
        .or_else(|_| DateTime::parse_from_rfc3339(s))
        .ok()?;
    Some(parsed.timestamp_micros() as f64)
}

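/// Reads at most [`MAX_SNOWSET_ROWS`] records from `path`, silently
/// skipping rows that fail to deserialize or carry an unparsable timestamp
/// or non-finite `execTime`, and returns the survivors sorted by creation
/// time.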
fn load_snowset_rows(path: &Path) -> Result<Vec<Row>> {
    let mut rdr = csv::Reader::from_path(path)
        .with_context(|| format!("opening snowset subset at {}", path.display()))?;
    let mut rows: Vec<Row> = Vec::new();
    for r in rdr.deserialize().take(MAX_SNOWSET_ROWS) {
        debug_assert!(rows.len() < MAX_SNOWSET_ROWS, "row-count bound enforced");
        let raw: RawRow = match r {
            Ok(r) => r,
            Err(_) => continue,
        };
        let Some(created_time_us) = parse_created_time(&raw.created_time) else {
            continue;
        };
        if !raw.exec_time_us.is_finite() {
            continue;
        }
        rows.push(Row {
            query_id: raw.query_id,
            warehouse_id: raw.warehouse_id,
            created_time_us,
            execution_time_us: raw.exec_time_us,
            bytes_cache: raw.bytes_cache,
            bytes_storage: raw.bytes_storage,
        });
    }
    // Timestamps are finite (parse_created_time guarantees it), so f64's
    // total order is safe here.
    rows.sort_by(|a, b| a.created_time_us.total_cmp(&b.created_time_us));
    Ok(rows)
}

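/// Single pass over the sorted rows, emitting one plan-regression and one
/// cache-I/O sample per row, with `t0` (the first row's creation time) as
/// the stream origin.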
fn emit_snowset_plan_and_cache(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
    debug_assert!(t0.is_finite(), "t0 must be finite");
    let mut baselines: HashMap<(String, String), VecDeque<f64>> = HashMap::new();
    let mut cache_baseline: HashMap<String, VecDeque<f64>> = HashMap::new();

    for r in rows.iter() {
        let t = (r.created_time_us - t0) / 1e6;
        emit_snowset_plan_sample(stream, &mut baselines, r, t);
        emit_snowset_cache_sample(stream, &mut cache_baseline, r, t);
    }
}

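/// Emits `execTime` against the rolling mean of the last
/// [`SNOWSET_BASELINE_WIN`] executions of the same `(warehouse, query)`
/// key; a key's first sample is its own baseline, so it scores zero
/// residual. Latencies are pushed in milliseconds.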
fn emit_snowset_plan_sample(
    stream: &mut ResidualStream,
    baselines: &mut HashMap<(String, String), VecDeque<f64>>,
    r: &Row,
    t: f64,
) {
    debug_assert!(t.is_finite(), "t must be finite");
    debug_assert!(
        r.execution_time_us.is_finite(),
        "execution_time_us must be finite"
    );
    let key = (r.warehouse_id.clone(), r.query_id.clone());
    let q = baselines.entry(key).or_default();
    let baseline = if q.is_empty() {
        r.execution_time_us
    } else {
        q.iter().sum::<f64>() / q.len() as f64
    };
    plan_regression::push_latency(
        stream,
        t,
        &format!("{}/{}", r.warehouse_id, r.query_id),
        r.execution_time_us / 1e3,
        baseline / 1e3,
    );
    q.push_back(r.execution_time_us);
    if q.len() > SNOWSET_BASELINE_WIN {
        q.pop_front();
    }
    debug_assert!(
        q.len() <= SNOWSET_BASELINE_WIN,
        "rolling window bound enforced"
    );
}

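/// Emits the per-query cache hit ratio against the rolling per-warehouse
/// mean; rows with no bytes read are skipped so the ratio stays defined.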
fn emit_snowset_cache_sample(
    stream: &mut ResidualStream,
    cache_baseline: &mut HashMap<String, VecDeque<f64>>,
    r: &Row,
    t: f64,
) {
    debug_assert!(t.is_finite(), "t must be finite");
    let total = r.bytes_cache + r.bytes_storage;
    if total <= 0.0 {
        return;
    }
    let cache_ratio = r.bytes_cache / total;
    debug_assert!((0.0..=1.0).contains(&cache_ratio), "cache ratio in [0,1]");
    let cb = cache_baseline.entry(r.warehouse_id.clone()).or_default();
    let expected = if cb.is_empty() {
        cache_ratio
    } else {
        cb.iter().sum::<f64>() / cb.len() as f64
    };
    cache_io::push_hit_ratio(stream, t, &r.warehouse_id, expected, cache_ratio);
    cb.push_back(cache_ratio);
    if cb.len() > SNOWSET_BASELINE_WIN {
        cb.pop_front();
    }
    debug_assert!(
        cb.len() <= SNOWSET_BASELINE_WIN,
        "rolling window bound enforced"
    );
}

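/// Buckets rows into [`SNOWSET_BUCKET_SECONDS`]-wide windows per warehouse
/// and, on each bucket transition, scores the completed bucket against its
/// predecessor. The trailing partial bucket at end of stream is never
/// flushed.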
fn emit_snowset_workload_phase(stream: &mut ResidualStream, rows: &[Row], t0: f64) {
    debug_assert!(t0.is_finite(), "t0 must be finite");
    let mut histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
    let mut prev_histos: HashMap<String, HashMap<String, u64>> = HashMap::new();
    let mut current_bucket = 0_i64;
    for r in rows.iter() {
        let t = (r.created_time_us - t0) / 1e6;
        let bucket = (t / SNOWSET_BUCKET_SECONDS) as i64;
        if bucket != current_bucket {
            flush_snowset_phase_deltas(stream, &histos, &prev_histos, current_bucket);
            prev_histos = std::mem::take(&mut histos);
            current_bucket = bucket;
        }
        *histos
            .entry(r.warehouse_id.clone())
            .or_default()
            .entry(r.query_id.clone())
            .or_insert(0) += 1;
    }
}

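/// Emits one JS-divergence sample per warehouse present in both the
/// current and previous buckets, timestamped at the completed bucket's
/// start.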
fn flush_snowset_phase_deltas(
    stream: &mut ResidualStream,
    histos: &HashMap<String, HashMap<String, u64>>,
    prev_histos: &HashMap<String, HashMap<String, u64>>,
    current_bucket: i64,
) {
    debug_assert!(current_bucket >= 0, "bucket index non-negative");
    for (wh, h) in histos.iter() {
        if let Some(prev) = prev_histos.get(wh) {
            let d = workload_phase::js_divergence(prev, h);
            debug_assert!((0.0..=1.0).contains(&d), "JSD in [0,1]");
            workload_phase::push_jsd(
                stream,
                current_bucket as f64 * SNOWSET_BUCKET_SECONDS,
                wh,
                d,
            );
        }
    }
}

impl DatasetAdapter for Snowset {
    fn name(&self) -> &'static str {
        "snowset"
    }

    fn load(&self, path: &Path) -> Result<ResidualStream> {
        let rows = load_snowset_rows(path)?;
        debug_assert!(rows.len() <= MAX_SNOWSET_ROWS, "row-count bound enforced");
        let mut stream = ResidualStream::new(format!(
            "snowset@{}",
            path.file_name().and_then(|n| n.to_str()).unwrap_or("?")
        ));
        let t0 = rows.first().map(|r| r.created_time_us).unwrap_or(0.0);
        debug_assert!(t0.is_finite(), "t0 must be finite");

        emit_snowset_plan_and_cache(&mut stream, &rows, t0);
        emit_snowset_workload_phase(&mut stream, &rows, t0);

        stream.sort();
        Ok(stream)
    }

    fn exemplar(&self, seed: u64) -> ResidualStream {
        let mut rng = rand_pcg::Pcg64::seed_from_u64(seed);
        let mut stream = ResidualStream::new(format!("snowset-exemplar-seed{seed}"));
        let warehouses = ["wh_a", "wh_b", "wh_c"];
        let queries = ["q1", "q2", "q3", "q4", "q5"];
        // Stable phase: 3000 s of low-residual traffic.
        for i in 0..3000 {
            let t = i as f64;
            let w = warehouses[(i / 200) % warehouses.len()];
            let q = queries[(i / 13) % queries.len()];
            let base = 50.0;
            let jitter: f64 = rng.gen_range(-3.0..3.0);
            plan_regression::push_latency(&mut stream, t, &format!("{w}/{q}"), base + jitter, base);
            cache_io::push_hit_ratio(&mut stream, t, w, 0.92, 0.92 + rng.gen_range(-0.01..0.01));
        }
        // Phase shift: warehouse `wh_b` adopts a heavier query mix; phase JSD
        // crosses threshold around t=3300.
        for i in 3000..6000 {
            let t = i as f64;
            let w = "wh_b";
            let q = if rng.gen_bool(0.7) { "q_heavy" } else { "q5" };
            let base = 80.0;
            let jitter: f64 = rng.gen_range(-5.0..15.0);
            plan_regression::push_latency(&mut stream, t, &format!("{w}/{q}"), base + jitter, base);
            cache_io::push_hit_ratio(&mut stream, t, w, 0.92, 0.55 + rng.gen_range(-0.05..0.05));
        }
        // Synthetic JSD residual rising at the phase boundary.
        for k in 0..30 {
            let t = 3000.0 + 50.0 * k as f64;
            let d = if (10..20).contains(&k) {
                0.4 + rng.gen_range(-0.05..0.05)
            } else {
                0.05 + rng.gen_range(0.0..0.03)
            };
            workload_phase::push_jsd(&mut stream, t, "wh_b", d);
        }
        stream.sort();
        stream
    }
}
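
// A minimal test sketch: the space-separated timestamp is the exact example
// documented in the module header; the `T`-separated variant and the
// expected epoch-microsecond value are assumptions derived from it.
#[cfg(test)]
mod tests {
    use super::parse_created_time;

    #[test]
    fn parses_documented_created_time_formats() {
        // Space-separated form as shipped in the Snowset CSV.
        let a = parse_created_time("2018-03-02 14:44:02.768000+00:00");
        // RFC 3339 (`T`-separated) form, handled by the fallback parser.
        let b = parse_created_time("2018-03-02T14:44:02.768000+00:00");
        assert_eq!(a, b);
        // 2018-03-02 14:44:02.768 UTC = 1_520_001_842.768 s since the epoch.
        assert_eq!(a, Some(1_520_001_842_768_000.0));
    }
}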