use super::IndexEntry;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// A contiguous span of the entity-ordered entry list whose entries all share one entity.
///
/// Field order and names are part of the serialized form (serde derive).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct EntityRun {
/// Entity identifier shared by the entries in this run.
pub(crate) entity: String,
/// Index of the run's first entry within the entity-ordered list.
pub(crate) start: u64,
/// Number of entries in the run.
pub(crate) len: u64,
/// `global_sequence` of the run's first entry (first in entity order).
pub(crate) first_sequence: u64,
/// `global_sequence` of the run's last entry (last in entity order).
pub(crate) last_sequence: u64,
}
/// A contiguous span of the sequence-ordered entry list, describing one restore chunk.
///
/// Field order and names are part of the serialized form (serde derive).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct RestoreChunkSummary {
/// Index of the chunk's first entry within the sequence-ordered list.
pub(crate) start: u64,
/// Number of entries in the chunk.
pub(crate) len: u64,
/// `global_sequence` of the chunk's first entry.
pub(crate) first_sequence: u64,
/// `global_sequence` of the chunk's last entry.
pub(crate) last_sequence: u64,
}
/// Summary of how a set of entries splits into restore chunks (in sequence order) and
/// into per-entity runs (in entity order).
///
/// A summary can be built from entries with `from_entries` / `from_sorted_entries`, or
/// deserialized and then checked against the entries with `validate` / `validate_detailed`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct RoutingSummary {
/// Number of entries the summary describes.
pub(crate) entry_count: u64,
/// Number of elements in `chunks`; validation rejects a summary where the two disagree.
pub(crate) chunk_count: u64,
/// Spans over the sequence-ordered entry list.
pub(crate) chunks: Vec<RestoreChunkSummary>,
/// Spans over the entity-ordered entry list, one entity per span.
pub(crate) entity_runs: Vec<EntityRun>,
}
/// The same entries held in two orderings, together with the routing summary for them.
///
/// Both vectors hold `Arc`s to the same `IndexEntry` values, so the entries themselves are
/// not duplicated.
pub(super) struct RestoreBase {
/// Entries in the order they were passed to `RestoreBase::from_sorted_entries`
/// (presumably ascending `global_sequence`; the constructor does not re-sort them).
pub(super) entries_by_sequence: Vec<Arc<IndexEntry>>,
/// The same entries reordered by `sort_entries_by_entity`.
pub(super) entries_by_entity: Vec<Arc<IndexEntry>>,
/// Routing summary: either an accepted caller-provided hint or one rebuilt from the entries.
pub(super) routing: RoutingSummary,
}
/// Outcome of checking a `RoutingSummary` against a concrete set of entries.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum RoutingValidation {
/// The summary is consistent with the entries.
Valid,
/// The summary is inconsistent; carries the first rule that failed.
Invalid(RoutingValidationError),
}
impl RoutingValidation {
    /// Collapses the outcome to a boolean: `true` for `Valid`, `false` for any `Invalid`.
    pub(crate) fn is_valid(self) -> bool {
        match self {
            Self::Valid => true,
            Self::Invalid(_) => false,
        }
    }
}
/// The specific rule a `RoutingSummary` violated during validation.
///
/// `Chunk*` variants concern spans over the sequence-ordered entries; `EntityRun*` variants
/// concern spans over the entity-ordered entries.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum RoutingValidationError {
/// `entry_count` differs from the length of the sequence-ordered or entity-ordered list.
EntryCountMismatch,
/// `chunk_count` differs from the number of elements in `chunks`.
ChunkCountMismatch,
/// A chunk's `start` does not fit in `usize`.
ChunkStartOverflow,
/// A chunk's `len` does not fit in `usize`.
ChunkLenOverflow,
/// A chunk has a length of zero.
ChunkLenZero,
/// A chunk's `start + len` overflows `usize`.
ChunkEndOverflow,
/// A chunk extends past the end of the sequence-ordered list.
ChunkEndOutOfBounds,
/// A chunk's `first_sequence` differs from the `global_sequence` of its first entry.
ChunkFirstSequenceMismatch,
/// A chunk's `last_sequence` differs from the `global_sequence` of its last entry.
ChunkLastSequenceMismatch,
/// The chunks do not add up to the number of sequence-ordered entries.
ChunkTotalMismatch,
/// An entity run's `start` does not fit in `usize`.
EntityRunStartOverflow,
/// An entity run's `len` does not fit in `usize`.
EntityRunLenOverflow,
/// An entity run has a length of zero.
EntityRunLenZero,
/// An entity run's `start + len` overflows `usize`.
EntityRunEndOverflow,
/// An entity run extends past the end of the entity-ordered list.
EntityRunEndOutOfBounds,
/// The first or last entry of a run belongs to a different entity than the run names.
EntityRunEntityMismatch,
/// A run's `first_sequence` differs from the `global_sequence` of its first entry.
EntityRunFirstSequenceMismatch,
/// A run's `last_sequence` differs from the `global_sequence` of its last entry.
EntityRunLastSequenceMismatch,
/// An entry inside a run (endpoints already matched) belongs to a different entity.
EntityRunInternalEntityMismatch,
/// The entity runs do not add up to the number of entity-ordered entries.
EntityRunTotalMismatch,
}
impl RestoreBase {
    /// Builds a `RestoreBase` from entries that are already in sequence order.
    ///
    /// The entries are wrapped in `Arc`s once; the entity-ordered view shares those `Arc`s
    /// and is produced with `sort_entries_by_entity`.
    ///
    /// If `routing_hint` is given and validates against both orderings it is cloned and
    /// reused. Otherwise (no hint, or a hint that fails validation) the routing summary is
    /// rebuilt from the entries using `chunk_count`. A rejected hint is logged at debug level.
    pub(super) fn from_sorted_entries(
        entries: Vec<IndexEntry>,
        chunk_count: usize,
        routing_hint: Option<&RoutingSummary>,
    ) -> Self {
        let entries_by_sequence: Vec<Arc<IndexEntry>> =
            entries.into_iter().map(Arc::new).collect();
        let mut entries_by_entity = entries_by_sequence.clone();
        sort_entries_by_entity(&mut entries_by_entity);

        let accepted_hint = match routing_hint {
            None => None,
            Some(hint) => {
                let outcome = hint.validate_detailed(&entries_by_sequence, &entries_by_entity);
                // Debug-only guard that the boolean API never drifts from the detailed one.
                debug_assert_eq!(
                    hint.validate(&entries_by_sequence, &entries_by_entity),
                    outcome.is_valid(),
                    "restore routing bool validation must stay a projection of detailed validation"
                );
                match outcome {
                    RoutingValidation::Invalid(error) => {
                        tracing::debug!(
                            ?error,
                            "ignored stale restore routing hint and rebuilt routing summary"
                        );
                        None
                    }
                    RoutingValidation::Valid => Some(hint.clone()),
                }
            }
        };

        let routing = match accepted_hint {
            Some(summary) => summary,
            None => RoutingSummary::from_entries(
                &entries_by_sequence,
                &entries_by_entity,
                chunk_count,
            ),
        };

        Self {
            entries_by_sequence,
            entries_by_entity,
            routing,
        }
    }
}
impl RoutingSummary {
/// Builds a routing summary from entries that are already in sequence order.
///
/// Each entry is cloned into an `Arc`; an entity-ordered copy of that list is derived with
/// `sort_entries_by_entity`, and both orderings are handed to [`Self::from_entries`].
pub(crate) fn from_sorted_entries(entries: &[IndexEntry], chunk_count: usize) -> Self {
    let by_sequence: Vec<Arc<IndexEntry>> = entries
        .iter()
        .map(|entry| Arc::new(entry.clone()))
        .collect();
    let by_entity = {
        let mut reordered = by_sequence.clone();
        sort_entries_by_entity(&mut reordered);
        reordered
    };
    Self::from_entries(&by_sequence, &by_entity, chunk_count)
}
/// Builds a routing summary from the two orderings of the same entries.
///
/// * `entries_by_sequence` — entries in sequence order; split into chunks.
/// * `entries_by_entity` — entries grouped by entity; each group of adjacent entries with
///   the same entity becomes one [`EntityRun`].
/// * `chunk_count` — requested number of chunks. `0` is treated as `1`, and a request for
///   more chunks than there are entries is clamped to one entry per chunk.
///
/// Chunks are as even as possible: the first `len % chunk_count` chunks hold one extra
/// entry. With no entries, no chunks are produced. `entry_count` is taken from
/// `entries_by_entity`, and `chunk_count` in the result is the number of chunks produced.
pub(super) fn from_entries(
    entries_by_sequence: &[Arc<IndexEntry>],
    entries_by_entity: &[Arc<IndexEntry>],
    chunk_count: usize,
) -> Self {
    let mut entity_runs = Vec::new();
    let mut run_start = 0usize;
    while run_start < entries_by_entity.len() {
        let head = &entries_by_entity[run_start];
        let entity = head.coord.entity();
        // The head always belongs to its own run; extend over the following entries that
        // carry the same entity.
        let run_len = 1 + entries_by_entity[run_start + 1..]
            .iter()
            .take_while(|entry| entry.coord.entity() == entity)
            .count();
        let run_end = run_start + run_len;
        entity_runs.push(EntityRun {
            entity: entity.to_owned(),
            start: run_start as u64,
            len: run_len as u64,
            first_sequence: head.global_sequence,
            last_sequence: entries_by_entity[run_end - 1].global_sequence,
        });
        run_start = run_end;
    }

    let total = entries_by_sequence.len();
    let chunks = if total == 0 {
        Vec::new()
    } else {
        // Asking for more chunks than entries would only produce empty chunks that get
        // dropped; clamping yields the same result without iterating `chunk_count` times.
        let chunk_count = chunk_count.clamp(1, total);
        let base = total / chunk_count;
        let remainder = total % chunk_count;
        let mut chunks = Vec::with_capacity(chunk_count);
        let mut start = 0usize;
        for chunk_index in 0..chunk_count {
            // `base >= 1` after clamping, so every chunk is non-empty.
            let len = base + usize::from(chunk_index < remainder);
            let end = start + len;
            chunks.push(RestoreChunkSummary {
                start: start as u64,
                len: len as u64,
                first_sequence: entries_by_sequence[start].global_sequence,
                last_sequence: entries_by_sequence[end - 1].global_sequence,
            });
            start = end;
        }
        chunks
    };

    Self {
        entry_count: entries_by_entity.len() as u64,
        chunk_count: chunks.len() as u64,
        chunks,
        entity_runs,
    }
}
/// Boolean form of [`Self::validate_detailed`]: returns `true` only when the detailed
/// check reports the summary as valid for the given entry orderings.
pub(crate) fn validate(
    &self,
    entries_by_sequence: &[Arc<IndexEntry>],
    entries_by_entity: &[Arc<IndexEntry>],
) -> bool {
    let outcome = self.validate_detailed(entries_by_sequence, entries_by_entity);
    outcome.is_valid()
}
pub(crate) fn validate_detailed(
&self,
entries_by_sequence: &[Arc<IndexEntry>],
entries_by_entity: &[Arc<IndexEntry>],
) -> RoutingValidation {
if self.entry_count != entries_by_sequence.len() as u64
|| self.entry_count != entries_by_entity.len() as u64
{
return RoutingValidation::Invalid(RoutingValidationError::EntryCountMismatch);
}
if self.chunk_count != self.chunks.len() as u64 {
return RoutingValidation::Invalid(RoutingValidationError::ChunkCountMismatch);
}
let mut chunk_total = 0usize;
for chunk in &self.chunks {
let start = match usize::try_from(chunk.start) {
Ok(start) => start,
Err(_) => {
return RoutingValidation::Invalid(RoutingValidationError::ChunkStartOverflow);
}
};
let len = match usize::try_from(chunk.len) {
Ok(len) => len,
Err(_) => {
return RoutingValidation::Invalid(RoutingValidationError::ChunkLenOverflow);
}
};
let end = match start.checked_add(len) {
Some(end) => end,
None => {
return RoutingValidation::Invalid(RoutingValidationError::ChunkEndOverflow);
}
};
if len == 0 {
return RoutingValidation::Invalid(RoutingValidationError::ChunkLenZero);
}
if end > entries_by_sequence.len() {
return RoutingValidation::Invalid(RoutingValidationError::ChunkEndOutOfBounds);
}
if entries_by_sequence[start].global_sequence != chunk.first_sequence {
return RoutingValidation::Invalid(
RoutingValidationError::ChunkFirstSequenceMismatch,
);
}
if entries_by_sequence[end - 1].global_sequence != chunk.last_sequence {
return RoutingValidation::Invalid(
RoutingValidationError::ChunkLastSequenceMismatch,
);
}
chunk_total += len;
}
if chunk_total != entries_by_sequence.len() {
return RoutingValidation::Invalid(RoutingValidationError::ChunkTotalMismatch);
}
let mut run_total = 0usize;
for run in &self.entity_runs {
let start = match usize::try_from(run.start) {
Ok(start) => start,
Err(_) => {
return RoutingValidation::Invalid(
RoutingValidationError::EntityRunStartOverflow,
);
}
};
let len = match usize::try_from(run.len) {
Ok(len) => len,
Err(_) => {
return RoutingValidation::Invalid(
RoutingValidationError::EntityRunLenOverflow,
);
}
};
let end = match start.checked_add(len) {
Some(end) => end,
None => {
return RoutingValidation::Invalid(
RoutingValidationError::EntityRunEndOverflow,
);
}
};
if len == 0 {
return RoutingValidation::Invalid(RoutingValidationError::EntityRunLenZero);
}
if end > entries_by_entity.len() {
return RoutingValidation::Invalid(RoutingValidationError::EntityRunEndOutOfBounds);
}
let slice = &entries_by_entity[start..end];
if slice[0].coord.entity() != run.entity
|| slice[end - start - 1].coord.entity() != run.entity
{
return RoutingValidation::Invalid(RoutingValidationError::EntityRunEntityMismatch);
}
if slice[0].global_sequence != run.first_sequence {
return RoutingValidation::Invalid(
RoutingValidationError::EntityRunFirstSequenceMismatch,
);
}
if slice[end - start - 1].global_sequence != run.last_sequence {
return RoutingValidation::Invalid(
RoutingValidationError::EntityRunLastSequenceMismatch,
);
}
if slice.iter().any(|entry| entry.coord.entity() != run.entity) {
return RoutingValidation::Invalid(
RoutingValidationError::EntityRunInternalEntityMismatch,
);
}
run_total += len;
}
if run_total == entries_by_entity.len() {
RoutingValidation::Valid
} else {
RoutingValidation::Invalid(RoutingValidationError::EntityRunTotalMismatch)
}
}
}
/// Reorders `entries` in place by entity, breaking ties by `wall_ms`, then `clock`, then
/// `event_id`.
///
/// The sort is stable, so entries that compare equal on all four keys keep their relative
/// order.
fn sort_entries_by_entity(entries: &mut [Arc<IndexEntry>]) {
    entries.sort_by(|left, right| {
        let by_entity = left.coord.entity().cmp(right.coord.entity());
        by_entity
            .then_with(|| left.wall_ms.cmp(&right.wall_ms))
            .then_with(|| left.clock.cmp(&right.clock))
            .then_with(|| left.event_id.cmp(&right.event_id))
    });
}
/// Suggests how many restore chunks to use for `entry_count` entries.
///
/// One chunk per 65,536 entries (rounded up), but never fewer than 1 and never more
/// than 32.
pub(crate) fn recommended_restore_chunk_count(entry_count: usize) -> usize {
    const ENTRIES_PER_CHUNK: usize = 65_536;
    const MIN_CHUNKS: usize = 1;
    const MAX_CHUNKS: usize = 32;

    entry_count
        .div_ceil(ENTRIES_PER_CHUNK)
        .max(MIN_CHUNKS)
        .min(MAX_CHUNKS)
}
/// Returns the `(start, len)` ranges to restore `entry_count` entries in.
///
/// The chunks stored in `routing` are used when they tile `0..entry_count` exactly: every
/// chunk is non-empty, starts where the previous one ended (the first at 0), stays within
/// `entry_count`, and the last one ends at `entry_count`. In every other case — including
/// an empty chunk list or values that do not fit in `usize` — the result is an even split
/// into `recommended_restore_chunk_count(entry_count)` parts, with empty parts omitted.
pub(crate) fn restore_chunk_ranges(
    entry_count: usize,
    routing: &RoutingSummary,
) -> Vec<(usize, usize)> {
    /// Even split of `0..entry_count`; the leading `remainder` parts get one extra entry.
    fn even_ranges(entry_count: usize) -> Vec<(usize, usize)> {
        let chunk_count = recommended_restore_chunk_count(entry_count);
        let (base, remainder) = (entry_count / chunk_count, entry_count % chunk_count);
        let mut next_start = 0usize;
        (0..chunk_count)
            .map(|chunk_index| base + usize::from(chunk_index < remainder))
            .filter(|&len| len != 0)
            .map(|len| {
                let range = (next_start, len);
                next_start += len;
                range
            })
            .collect()
    }

    if routing.chunks.is_empty() {
        return even_ranges(entry_count);
    }

    // Ranges taken from the stored chunks, or `None` as soon as one of them breaks the
    // tiling of `0..entry_count`.
    let stored_ranges = || -> Option<Vec<(usize, usize)>> {
        let mut ranges = Vec::with_capacity(routing.chunks.len());
        let mut expected_start = 0usize;
        for chunk in &routing.chunks {
            let start = usize::try_from(chunk.start).ok()?;
            let len = usize::try_from(chunk.len).ok()?;
            let end = start.checked_add(len)?;
            if len == 0 || start != expected_start || end > entry_count {
                return None;
            }
            ranges.push((start, len));
            expected_start = end;
        }
        (expected_start == entry_count).then_some(ranges)
    };

    stored_ranges().unwrap_or_else(|| even_ranges(entry_count))
}
// Unit tests for this module live in the `tests` submodule, compiled only for test builds.
#[cfg(test)]
mod tests;