use std::cmp::min;
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::ops::{Bound, Deref, DerefMut};
use sst::SstMetadata;
use zerror::Z;
use zerror_core::ErrorCore;
use super::*;
fn key_range_overlap(lhs: &SstMetadata, rhs: &SstMetadata) -> bool {
lhs.first_key <= rhs.last_key && rhs.first_key <= lhs.last_key
}
#[derive(Clone, Debug)]
pub struct Vertex {
level: usize,
color: usize,
peers: usize,
bytes_within_color: u64,
}
#[derive(Clone, Debug, Default)]
pub struct AdjacencyList(BTreeSet<(usize, usize)>);
impl Deref for AdjacencyList {
type Target = BTreeSet<(usize, usize)>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for AdjacencyList {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl From<BTreeSet<(usize, usize)>> for AdjacencyList {
fn from(adj_list: BTreeSet<(usize, usize)>) -> Self {
Self(adj_list)
}
}
pub fn recover(options: LsmtkOptions, metadata: Vec<SstMetadata>) -> Result<Version, Error> {
let forward_adj_list = construct_adj_list(&metadata)?;
let mut vertices = Vec::with_capacity(metadata.len());
vertices.resize(
metadata.len(),
Vertex {
color: metadata.len(),
level: metadata.len(),
peers: 0,
bytes_within_color: 0,
},
);
tarjan_scc(&mut vertices, &forward_adj_list);
let mut colors = BTreeSet::new();
for idx in 0..vertices.len() {
let color = vertices[idx].color;
colors.insert(color);
vertices[color].peers += 1;
vertices[color].bytes_within_color += metadata[idx].file_size;
}
let mut color_forward_adj_list = BTreeSet::new();
let mut color_reverse_adj_list = BTreeSet::new();
for (src, dst) in forward_adj_list.iter() {
let src: usize = *src;
let dst: usize = *dst;
if vertices[src].color != vertices[dst].color {
color_forward_adj_list.insert((vertices[src].color, vertices[dst].color));
color_reverse_adj_list.insert((vertices[dst].color, vertices[src].color));
}
}
let mut colors_stack = Vec::new();
for color in colors.iter().copied() {
let lower = Bound::Included((color, 0));
let upper = Bound::Included((color, usize::MAX));
if color_reverse_adj_list.range((lower, upper)).count() == 0 {
vertices[color].level = 0;
colors_stack.push(color);
}
}
while let Some(color) = colors_stack.pop() {
let lower = Bound::Included((color, usize::MIN));
let upper = Bound::Included((color, usize::MAX));
for (u, v) in color_forward_adj_list.range((lower, upper)).copied() {
assert_eq!(color, u);
if vertices[v].level == vertices.len() || vertices[v].level <= vertices[color].level {
vertices[v].level = vertices[color].level + 1;
colors_stack.push(v);
}
}
}
for idx in 0..vertices.len() {
vertices[idx].level = vertices[vertices[idx].color].level;
vertices[idx].peers = vertices[vertices[idx].color].peers;
vertices[idx].bytes_within_color = vertices[vertices[idx].color].bytes_within_color;
}
let max_level = vertices
.iter()
.map(|x| x.level)
.max()
.unwrap_or(NUM_LEVELS - 1);
if max_level >= NUM_LEVELS {
let delta = max_level - NUM_LEVELS + 1;
for v in vertices.iter_mut() {
if v.level < delta {
v.level = 0;
} else {
v.level -= delta;
}
}
}
let mut levels = vec![Level::default(); NUM_LEVELS];
for (v, m) in std::iter::zip(vertices, metadata) {
levels[v.level].ssts.push(Arc::new(m));
}
levels[0].ssts.sort_by_key(|lhs| lhs.smallest_timestamp);
for level in levels[1..].iter_mut() {
level.ssts.sort_by(|lhs, rhs| {
KeyRef::new(&lhs.first_key, lhs.smallest_timestamp)
.cmp(&KeyRef::new(&rhs.first_key, rhs.smallest_timestamp))
});
}
let ongoing = Arc::new(Mutex::default());
Ok(Version {
options,
levels,
ongoing,
})
}
fn construct_adj_list(metadata: &[SstMetadata]) -> Result<AdjacencyList, Error> {
let mut forward_adj_list = AdjacencyList::default();
for i in 0..metadata.len() {
if metadata[i].smallest_timestamp > metadata[i].biggest_timestamp {
let err = Error::Corruption {
core: ErrorCore::default(),
context: "metadata timestamps not in order".to_string(),
}
.with_info("SST", metadata[i].setsum)
.with_info("smallest_timestamp", metadata[i].smallest_timestamp)
.with_info("biggest_timestamp", metadata[i].biggest_timestamp);
return Err(err);
}
for j in i + 1..metadata.len() {
if !key_range_overlap(&metadata[i], &metadata[j]) {
continue;
}
if metadata[i].biggest_timestamp < metadata[j].smallest_timestamp {
forward_adj_list.insert((j, i));
} else if metadata[j].biggest_timestamp < metadata[i].smallest_timestamp {
forward_adj_list.insert((i, j));
} else {
forward_adj_list.insert((i, j));
forward_adj_list.insert((j, i));
}
}
}
Ok(forward_adj_list)
}
#[derive(Clone)]
struct TarjanVertex {
index: usize,
lowlink: usize,
on_stack: bool,
}
fn tarjan_scc(vertices: &mut [Vertex], adj_list: &BTreeSet<(usize, usize)>) {
let mut state = Vec::with_capacity(vertices.len());
state.resize(
vertices.len(),
TarjanVertex {
index: vertices.len(),
lowlink: vertices.len(),
on_stack: false,
},
);
let mut index = 0;
let mut stack = Vec::new();
for idx in 0..vertices.len() {
if state[idx].index != vertices.len() {
continue;
}
let lower = Bound::Included((idx, usize::MIN));
let upper = Bound::Included((idx, usize::MAX));
let iter = adj_list.range((lower, upper));
let mut recursion = Vec::new();
recursion.push((idx, iter));
state[idx].index = index;
state[idx].lowlink = index;
index += 1;
stack.push(idx);
state[idx].on_stack = true;
while !recursion.is_empty() {
let recursion_idx = recursion.len() - 1;
let (v, iter) = &mut recursion[recursion_idx];
if let Some((src, w)) = iter.next() {
assert_eq!(*v, *src);
if state[*w].index == vertices.len() {
let lower = Bound::Included((*w, 0));
let upper = Bound::Included((*w, usize::MAX));
let iter = adj_list.range((lower, upper));
recursion.push((*w, iter));
state[*w].index = index;
state[*w].lowlink = index;
index += 1;
stack.push(*w);
state[*w].on_stack = true;
} else if state[*w].on_stack {
state[*v].lowlink = min(state[*v].lowlink, state[*w].index);
}
} else {
if state[*v].lowlink == state[*v].index {
vertices[*v].color = *v;
while !stack.is_empty() && stack[stack.len() - 1] != *v {
let w = stack.pop().unwrap();
state[w].on_stack = false;
vertices[w].color = *v;
}
assert!(!stack.is_empty() && stack[stack.len() - 1] == *v);
stack.pop();
state[*v].on_stack = false;
}
let prev_w: usize = *v;
recursion.pop();
if !recursion.is_empty() {
let prev_v = recursion[recursion.len() - 1].0;
state[prev_v].lowlink = min(state[prev_v].lowlink, state[prev_w].lowlink);
}
}
}
}
}