use std::collections::BTreeMap;
use opensqlany::{ApModel, Page, PageStore, PageType, SlottedPage};
use crate::bv_recovery::{deobfuscate_with_bv, recover_bv_qb_data};
use crate::syscolumn::{SysColumn, iter_syscolumns};
use crate::sysobject::bridge_owners_to_tables;
use crate::systable::iter_systable_entries;
/// Floor applied to the upper-bound contribution of every column whose domain
/// is not fixed-width: in `SchemaAttribution::build` such a column adds
/// `max(width, VARIABLE_COLUMN_UPPER_ALLOWANCE)` bytes to a band's `high`.
pub const VARIABLE_COLUMN_UPPER_ALLOWANCE: u32 = 64;
/// Starting value of both `low` and `high` in `SchemaAttribution::build`,
/// before any column width is added.
// NOTE(review): presumably the fixed per-row overhead in bytes — confirm.
pub const MIN_ROW_BODY_BYTES: u32 = 4;
/// Expected row-body size range for one table, as computed by
/// `SchemaAttribution::build` from the table's column list.
#[derive(Debug, Clone, Copy)]
pub struct WidthBand {
/// `owner_object_id` shared by the columns this band was computed from.
pub owner_object_id: u32,
/// Smallest accepted row-body size (inclusive, see `validate_observed`).
pub low: u32,
/// Largest accepted row-body size (inclusive, see `validate_observed`).
pub high: u32,
/// Number of columns that contributed to `low` / `high`.
pub column_count: u32,
}
/// Collection of per-table [`WidthBand`]s used to check whether an observed
/// row-body size is plausible for a given table.
#[derive(Debug, Clone)]
pub struct SchemaAttribution {
// Bands keyed by table name; at most one band per name.
bands: BTreeMap<String, WidthBand>,
}
/// Tally of outcomes produced by `SchemaAttribution::validate_corpus`.
///
/// Each page handed to `validate_corpus` increments exactly one counter.
#[derive(Debug, Default, Clone, Copy)]
pub struct ValidationStats {
    /// Pages whose measured row body fell inside the table's band.
    pub pass: u64,
    /// Pages whose measured row body fell outside the table's band.
    pub fail: u64,
    /// Pages attributed to a table name that has no band.
    pub no_band: u64,
    /// Pages for which no row-body measurement could be obtained.
    pub unmeasured: u64,
}

impl ValidationStats {
    /// Returns the number of pages counted, i.e. the sum of all four counters.
    pub fn total(&self) -> u64 {
        [self.pass, self.fail, self.no_band, self.unmeasured]
            .iter()
            .sum()
    }
}
/// Reports whether the domain code `d` is one of the codes treated as
/// fixed-width (`N`, `F`, `I`, `D`, `T`, `B`); every other byte is treated as
/// variable-width.
fn is_fixed_width_domain(d: u8) -> bool {
    const FIXED_WIDTH_CODES: &[u8] = b"NFIDTB";
    FIXED_WIDTH_CODES.contains(&d)
}
impl SchemaAttribution {
    /// Derives one [`WidthBand`] per table from the catalog data reachable
    /// through `store` and `model`.
    ///
    /// Columns yielded by `iter_syscolumns` are grouped by their
    /// `owner_object_id`. Each owner is resolved to a table name through the
    /// mapping returned by `bridge_owners_to_tables`, and that name to an
    /// entry yielded by `iter_systable_entries` (first entry wins when a name
    /// repeats). An owner is skipped when either lookup fails, or when the
    /// entry declares a `col_count` of at least 2 and fewer than half that
    /// many columns were found for the owner.
    ///
    /// Both bounds start at [`MIN_ROW_BODY_BYTES`]. A fixed-width column adds
    /// its width to both bounds; any other column adds 1 to `low` and
    /// `max(width, VARIABLE_COLUMN_UPPER_ALLOWANCE)` to `high`. All additions
    /// saturate.
    ///
    /// Owners are visited in ascending id order and later bands overwrite
    /// earlier ones, so if several owners resolve to the same table name the
    /// band of the highest owner id is kept.
    pub fn build(store: &PageStore, model: &ApModel) -> Self {
        let entries: Vec<_> = iter_systable_entries(store, model).collect();
        let columns: Vec<SysColumn> = iter_syscolumns(store, model).collect();
        let owner_to_table = bridge_owners_to_tables(store, model, &columns, &entries);

        // Group the column rows under the object id that owns them.
        let mut cols_by_owner: BTreeMap<u32, Vec<SysColumn>> = BTreeMap::new();
        for column in columns {
            let owner_id = column.owner_object_id;
            cols_by_owner.entry(owner_id).or_default().push(column);
        }

        // Index catalog entries by name, keeping the first entry per name.
        let mut tables_by_name: BTreeMap<String, &crate::SysTableEntry> = BTreeMap::new();
        for entry in entries.iter() {
            tables_by_name.entry(entry.name.clone()).or_insert(entry);
        }

        let mut bands = BTreeMap::new();
        for (owner, cols) in cols_by_owner.iter() {
            let table_name = match owner_to_table.get(owner) {
                Some(name) => name,
                None => continue,
            };
            let table = match tables_by_name.get(table_name) {
                Some(found) => found,
                None => continue,
            };
            if cols.is_empty() {
                continue;
            }

            // Skip tables for which fewer than half of the declared columns
            // were found: the band would be too incomplete to be meaningful.
            let column_count = cols.len() as u32;
            let too_sparse = match table.col_count {
                Some(declared) => declared >= 2 && column_count * 2 < declared as u32,
                None => false,
            };
            if too_sparse {
                continue;
            }

            let (low, high) = cols.iter().fold(
                (MIN_ROW_BODY_BYTES, MIN_ROW_BODY_BYTES),
                |(low, high), col| {
                    let width = col.width as u32;
                    if is_fixed_width_domain(col.domain_char) {
                        (low.saturating_add(width), high.saturating_add(width))
                    } else {
                        (
                            low.saturating_add(1),
                            high.saturating_add(width.max(VARIABLE_COLUMN_UPPER_ALLOWANCE)),
                        )
                    }
                },
            );

            let band = WidthBand {
                owner_object_id: *owner,
                low,
                high,
                column_count,
            };
            bands.insert(table_name.clone(), band);
        }

        Self { bands }
    }

    /// Returns the band recorded for `table_name`, if there is one.
    pub fn band(&self, table_name: &str) -> Option<&WidthBand> {
        self.bands.get(table_name)
    }

    /// Returns the number of tables that have a band.
    pub fn len(&self) -> usize {
        self.bands.len()
    }

    /// Returns `true` when no table has a band.
    pub fn is_empty(&self) -> bool {
        self.bands.is_empty()
    }

    /// Returns `true` when `table_name` has a band and `observed_row_body`
    /// lies within `low..=high` of that band; `false` otherwise, including
    /// when the table has no band.
    pub fn validate_observed(&self, table_name: &str, observed_row_body: u32) -> bool {
        match self.bands.get(table_name) {
            Some(band) => (band.low..=band.high).contains(&observed_row_body),
            None => false,
        }
    }

    /// Measures the median row-body length of page `page_number`.
    ///
    /// Thin wrapper around `median_row_body`; returns `None` whenever that
    /// function cannot produce a measurement.
    pub fn measure_page(
        &self,
        store: &PageStore,
        model: &ApModel,
        page_number: u64,
    ) -> Option<u32> {
        median_row_body(store, model, page_number)
    }

    /// Measures page `page_number` and checks the result against the band of
    /// `table_name`.
    ///
    /// Returns `None` when the page cannot be measured; otherwise
    /// `Some(result)` where `result` is what [`Self::validate_observed`]
    /// returns (so `Some(false)` for a table without a band).
    pub fn validate(
        &self,
        store: &PageStore,
        model: &ApModel,
        page_number: u64,
        table_name: &str,
    ) -> Option<bool> {
        median_row_body(store, model, page_number)
            .map(|observed| self.validate_observed(table_name, observed))
    }

    /// Validates every `(page_number, table_name)` pair in `pages` and
    /// tallies the outcomes.
    ///
    /// A pair whose table has no band counts as `no_band` without the page
    /// being measured. Otherwise the page is measured and counted as
    /// `unmeasured`, `pass` or `fail`.
    pub fn validate_corpus<I, S>(
        &self,
        store: &PageStore,
        model: &ApModel,
        pages: I,
    ) -> ValidationStats
    where
        I: IntoIterator<Item = (u64, S)>,
        S: AsRef<str>,
    {
        let mut stats = ValidationStats::default();
        for (page_number, name) in pages {
            let table_name = name.as_ref();
            if self.band(table_name).is_none() {
                stats.no_band += 1;
                continue;
            }
            match median_row_body(store, model, page_number) {
                Some(observed) if self.validate_observed(table_name, observed) => {
                    stats.pass += 1;
                }
                Some(_) => stats.fail += 1,
                None => stats.unmeasured += 1,
            }
        }
        stats
    }
}
/// Returns the median row length, in bytes, of the rows on page
/// `page_number`.
///
/// The page bytes are deobfuscated first: with the value returned by
/// `recover_bv_qb_data` when it yields one, otherwise through
/// `model.deobfuscate_with_store`. The result is parsed as a slotted page.
///
/// Returns `None` when the page cannot be loaded from `store`, when its
/// trailer type is not `PageType::Extent`, when the parsed page has no
/// directory, or when it holds no rows. For an even number of rows the upper
/// of the two middle lengths is returned.
fn median_row_body(store: &PageStore, model: &ApModel, page_number: u64) -> Option<u32> {
    let page = store.page(page_number).ok()?;
    if page.trailer().page_type() != PageType::Extent {
        return None;
    }

    let raw = page.bytes();
    let plain = match recover_bv_qb_data(page_number, raw) {
        Some(bv) => deobfuscate_with_bv(raw, page_number, bv),
        None => model.deobfuscate_with_store(raw, page_number, store),
    };

    let slotted = SlottedPage::parse(Page::from_bytes(page_number, &plain));
    // A page without a slot directory cannot be measured.
    slotted.directory.as_ref()?;

    let rows = slotted.row_bytes();
    if rows.is_empty() {
        return None;
    }

    let mut lengths: Vec<u32> = rows.iter().map(|(_, body)| body.len() as u32).collect();
    // Selecting the element that would sit at `len / 2` after sorting gives
    // the same value as sorting and indexing, without a full sort.
    let middle = lengths.len() / 2;
    let (_, median, _) = lengths.select_nth_unstable(middle);
    Some(*median)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Attribution that knows no tables at all.
    fn empty_attribution() -> SchemaAttribution {
        SchemaAttribution {
            bands: BTreeMap::new(),
        }
    }

    /// Attribution holding exactly one band, registered under `name`.
    fn attribution_with(name: &str, band: WidthBand) -> SchemaAttribution {
        SchemaAttribution {
            bands: BTreeMap::from([(name.to_string(), band)]),
        }
    }

    #[test]
    fn fixed_width_domains_recognized() {
        assert!(b"NFIDTB".iter().all(|&d| is_fixed_width_domain(d)));
        assert!(!b"YVCAX".iter().any(|&d| is_fixed_width_domain(d)));
    }

    #[test]
    fn empty_attribution_validates_nothing() {
        let sa = empty_attribution();
        assert!(sa.is_empty());
        assert!(!sa.validate_observed("anything", 32));
    }

    #[test]
    fn observed_inside_band_passes() {
        let sa = attribution_with(
            "t",
            WidthBand {
                owner_object_id: 1,
                low: 16,
                high: 64,
                column_count: 3,
            },
        );
        // (observed row body, expected verdict); both band edges are inclusive.
        let cases = [
            (32, true),
            (8, false),
            (200, false),
            (16, true),
            (64, true),
        ];
        for (observed, expected) in cases {
            assert_eq!(
                sa.validate_observed("t", observed),
                expected,
                "observed = {observed}"
            );
        }
    }

    #[test]
    fn validation_stats_total_sums_components() {
        let stats = ValidationStats {
            pass: 4,
            fail: 1,
            no_band: 2,
            unmeasured: 3,
        };
        assert_eq!(stats.total(), 10);
    }

    #[test]
    fn unknown_table_does_not_validate() {
        let sa = empty_attribution();
        assert!(!sa.validate_observed("missing", 10));
    }
}