use std::collections::BTreeMap;
use opensqlany::{ApModel, PageStore};
use crate::systable::{SysTableEntry, collect_unique};
/// Lookup table that attributes a page number to the catalog entry owning it.
///
/// Instances are created through `from_catalog` (or `build`, which gathers the
/// catalog first) and queried with `attribute` / `group_pages`.
#[derive(Debug, Clone)]
pub struct PageAttribution {
/// Catalog entries that have a positive `data_root_page`, kept sorted in
/// ascending order of that root page so lookups can binary-search.
by_root: Vec<SysTableEntry>,
}
impl PageAttribution {
    /// Builds the lookup from an already-collected catalog.
    ///
    /// Entries whose `data_root_page` is `None` or `0` are dropped. The
    /// remaining entries are ordered by ascending root page; the sort is
    /// stable, so entries sharing a root page keep their catalog order.
    pub fn from_catalog(catalog: Vec<SysTableEntry>) -> Self {
        let mut by_root: Vec<SysTableEntry> = catalog
            .into_iter()
            .filter(|e| matches!(e.data_root_page, Some(p) if p > 0))
            .collect();
        by_root.sort_by_key(|e| e.data_root_page.unwrap_or(0));
        Self { by_root }
    }

    /// Builds the lookup from the entries returned by
    /// `collect_unique(store, model)`.
    pub fn build(store: &PageStore, model: &ApModel) -> Self {
        Self::from_catalog(collect_unique(store, model))
    }

    /// Returns the catalog entry that owns `page_number`, if any.
    ///
    /// The owner is the entry with the greatest root page that is
    /// `<= page_number` (when several entries share that root, the last one
    /// in sorted order wins). `None` is returned when no root page precedes
    /// `page_number`, or when the candidate has a usable `last_page` and
    /// `page_number` lies beyond it.
    ///
    /// A `last_page` is only honoured when it is `>= data_root_page`; an
    /// entry with a missing or inconsistent `last_page` claims every page
    /// from its root up to the next entry's root.
    pub fn attribute(&self, page_number: u64) -> Option<&SysTableEntry> {
        // Compare in u64: narrowing `page_number` to u32 would wrap page
        // numbers above `u32::MAX` onto unrelated low pages.
        let idx = self
            .by_root
            .partition_point(|e| u64::from(e.data_root_page.unwrap_or(0)) <= page_number);

        // `idx == 0` means every root lies above the page (or the table is
        // empty), so there is no candidate owner.
        let entry = self.by_root.get(idx.checked_sub(1)?)?;

        if let (Some(last), Some(root)) = (entry.last_page, entry.data_root_page) {
            if last >= root && page_number > u64::from(last) {
                return None;
            }
        }
        Some(entry)
    }

    /// Number of catalog entries retained for attribution.
    pub fn len(&self) -> usize {
        self.by_root.len()
    }

    /// Returns `true` when no catalog entry was retained.
    pub fn is_empty(&self) -> bool {
        self.by_root.is_empty()
    }

    /// Iterates over the retained entries in ascending root-page order.
    pub fn entries(&self) -> impl Iterator<Item = &SysTableEntry> {
        self.by_root.iter()
    }

    /// Groups `pages` by the name of the entry each page is attributed to.
    ///
    /// Pages that `attribute` cannot place are collected under the empty
    /// string key. Within each bucket the pages keep their input order.
    pub fn group_pages<I>(&self, pages: I) -> BTreeMap<String, Vec<u64>>
    where
        I: IntoIterator<Item = u64>,
    {
        let mut out: BTreeMap<String, Vec<u64>> = BTreeMap::new();
        for pn in pages {
            let key = self
                .attribute(pn)
                .map(|e| e.name.clone())
                .unwrap_or_default();
            out.entry(key).or_default().push(pn);
        }
        out
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a catalog row with the given id, name and page range; every
    /// other field is filled with a neutral placeholder.
    fn entry(tid: u32, name: &str, root: Option<u32>, last: Option<u32>) -> SysTableEntry {
        SysTableEntry {
            table_id: tid,
            name: name.to_owned(),
            magic: [0; 4],
            col_count: None,
            data_root_page: root,
            last_page: last,
            page_number: 0,
            row_offset: 0,
        }
    }

    /// Name of the table that `page` is attributed to, if any.
    fn owner(attr: &PageAttribution, page: u64) -> Option<&str> {
        attr.attribute(page).map(|e| e.name.as_str())
    }

    #[test]
    fn empty_catalog_returns_none() {
        let attr = PageAttribution::from_catalog(Vec::new());
        assert!(attr.is_empty());
        assert_eq!(owner(&attr, 123), None);
    }

    #[test]
    fn skips_entries_without_root() {
        let catalog = vec![
            entry(1, "a", None, None),
            entry(2, "b", Some(0), None),
            entry(3, "c", Some(100), Some(200)),
        ];
        let attr = PageAttribution::from_catalog(catalog);
        assert_eq!(attr.len(), 1);
        assert_eq!(owner(&attr, 150), Some("c"));
    }

    #[test]
    fn picks_nearest_lower_root() {
        let catalog = vec![
            entry(1, "alpha", Some(100), Some(200)),
            entry(2, "beta", Some(300), Some(400)),
            entry(3, "gamma", Some(500), Some(600)),
        ];
        let attr = PageAttribution::from_catalog(catalog);

        // (page, expected owner) pairs, checked in the same order as before.
        let expectations: [(u64, Option<&str>); 7] = [
            (50, None),
            (100, Some("alpha")),
            (150, Some("alpha")),
            (250, None),
            (300, Some("beta")),
            (550, Some("gamma")),
            (700, None),
        ];
        for &(page, expected) in &expectations {
            assert_eq!(owner(&attr, page), expected, "page {}", page);
        }
    }

    #[test]
    fn missing_last_page_extends_indefinitely() {
        let catalog = vec![
            entry(1, "alpha", Some(100), None),
            entry(2, "beta", Some(500), Some(600)),
        ];
        let attr = PageAttribution::from_catalog(catalog);
        assert_eq!(owner(&attr, 400), Some("alpha"));
        assert_eq!(owner(&attr, 700), None);
    }

    #[test]
    fn group_pages_buckets_correctly() {
        let catalog = vec![
            entry(1, "alpha", Some(100), Some(200)),
            entry(2, "beta", Some(300), Some(400)),
        ];
        let attr = PageAttribution::from_catalog(catalog);
        let groups = attr.group_pages(vec![50u64, 150, 250, 350, 500]);

        let bucket = |name: &str| groups.get(name).map(Vec::as_slice);
        assert_eq!(bucket("alpha"), Some(&[150u64][..]));
        assert_eq!(bucket("beta"), Some(&[350u64][..]));
        assert_eq!(bucket(""), Some(&[50u64, 250, 500][..]));
    }
}