use std::collections::btree_map::{IntoValues, Values};
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::ops::Bound::{Included, Unbounded};
use std::ops::RangeInclusive;
use crate::model::oplog::OplogIndex;
use bincode::{Decode, Encode};
use range_set_blaze::RangeSetBlaze;
use serde::{Deserialize, Serialize};
// A contiguous span of oplog entries, identified by its first and last index.
// Both bounds belong to the region: `contains` accepts `start` and `end` themselves.
// Serialized field names are camelCase (serde, and OpenAPI when the `poem` feature is on).
#[derive(Clone, Debug, Eq, PartialEq, Encode, Decode, Serialize, Deserialize)]
#[cfg_attr(feature = "poem", derive(poem_openapi::Object))]
#[cfg_attr(feature = "poem", oai(rename_all = "camelCase"))]
#[serde(rename_all = "camelCase")]
pub struct OplogRegion {
// First oplog index covered by the region (inclusive).
pub start: OplogIndex,
// Last oplog index covered by the region (inclusive).
pub end: OplogIndex,
}
impl OplogRegion {
    /// Returns `true` when `target` lies inside the region, both bounds included.
    pub fn contains(&self, target: OplogIndex) -> bool {
        (self.start..=self.end).contains(&target)
    }

    /// Combines two overlapping regions into the smallest region covering both.
    ///
    /// Returns `None` when neither region contains the other's start index, i.e. when the
    /// regions do not overlap.
    pub fn union(&self, other: &OplogRegion) -> Option<OplogRegion> {
        let overlapping = self.contains(other.start) || other.contains(self.start);
        overlapping.then(|| OplogRegion {
            start: self.start.min(other.start),
            end: self.end.max(other.end),
        })
    }

    /// Builds a region from an inclusive range of oplog indices.
    pub fn from_index_range(range: RangeInclusive<OplogIndex>) -> OplogRegion {
        let (start, end) = range.into_inner();
        OplogRegion { start, end }
    }

    /// Builds a region from an inclusive range of raw `u64` indices.
    pub fn from_range(range: RangeInclusive<u64>) -> OplogRegion {
        let (first, last) = range.into_inner();
        OplogRegion {
            start: OplogIndex::from_u64(first),
            end: OplogIndex::from_u64(last),
        }
    }

    /// Converts the region back into an inclusive range of raw `u64` indices.
    pub fn to_range(&self) -> RangeInclusive<u64> {
        let first: u64 = self.start.into();
        let last: u64 = self.end.into();
        first..=last
    }
}
impl Display for OplogRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "<{} => {}>", self.start, self.end)
}
}
/// Accumulates oplog regions and turns them into a [`DeletedRegions`] value.
///
/// Regions are kept as a set of raw `u64` ranges, so overlapping regions added to the
/// builder come out of `build` as a single merged region.
pub struct DeletedRegionsBuilder {
// Set of inclusive `u64` ranges collected so far.
regions: RangeSetBlaze<u64>,
}
impl Default for DeletedRegionsBuilder {
fn default() -> Self {
Self::new()
}
}
impl DeletedRegionsBuilder {
    /// Creates a builder that holds no regions yet.
    pub fn new() -> Self {
        let regions = RangeSetBlaze::new();
        Self { regions }
    }

    /// Creates a builder pre-populated with the given regions.
    pub fn from_regions(regions: impl IntoIterator<Item = OplogRegion>) -> Self {
        let ranges = regions.into_iter().map(|region| region.to_range());
        Self {
            regions: ranges.collect(),
        }
    }

    /// Adds one more region to the set being built.
    pub fn add(&mut self, region: OplogRegion) {
        let range = region.to_range();
        self.regions.ranges_insert(range);
    }

    /// Consumes the builder and produces the resulting [`DeletedRegions`].
    pub fn build(self) -> DeletedRegions {
        let merged = self.regions.into_ranges().map(OplogRegion::from_range);
        DeletedRegions::from_regions(merged)
    }
}
/// Set of deleted oplog regions, with an optional temporary override on top.
///
/// Internally a stack of region maps: queries always read the last entry, and the value
/// counts as overridden while the stack holds more than one entry.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct DeletedRegions {
// Stack of layers; each layer maps a region's start index to the region itself.
regions: Vec<BTreeMap<OplogIndex, OplogRegion>>,
}
impl Default for DeletedRegions {
fn default() -> Self {
Self::new()
}
}
impl DeletedRegions {
    /// Creates an empty set of deleted regions with no override in place.
    pub fn new() -> Self {
        Self {
            regions: vec![BTreeMap::new()],
        }
    }

    /// Creates a set holding the given regions, keyed by their start index.
    ///
    /// The regions are stored as provided. Use [`DeletedRegionsBuilder`] when the input may
    /// contain overlapping regions that should be merged first.
    pub fn from_regions(regions: impl IntoIterator<Item = OplogRegion>) -> Self {
        let layer: BTreeMap<OplogIndex, OplogRegion> = regions
            .into_iter()
            .map(|region| (region.start, region))
            .collect();
        Self {
            regions: vec![layer],
        }
    }

    /// Adds a region to the active layer, merging it with the regions it overlaps.
    pub fn add(&mut self, region: OplogRegion) {
        let active = self.active_mut();
        let existing = std::mem::take(active).into_values();
        *active = Self::merged(existing, std::iter::once(region));
    }

    /// Installs `other` as a temporary override on top of the base regions.
    ///
    /// Any previous override is dropped first. The new active layer is the union of the base
    /// regions and the regions of `other`. An empty `other` installs nothing.
    pub fn set_override(&mut self, other: DeletedRegions) {
        self.drop_override();
        if !other.is_empty() {
            let layer = Self::merged(self.active().values().cloned(), other.into_regions());
            self.regions.push(layer);
        }
    }

    /// Removes the override, if any, making the base regions active again.
    ///
    /// Does nothing when no override is in place, so the base layer is never removed.
    pub fn drop_override(&mut self) {
        if self.is_overridden() {
            self.regions.pop();
        }
    }

    /// Returns a copy of the override layer, or `None` when no override is in place.
    pub fn get_override(&self) -> Option<DeletedRegions> {
        self.is_overridden()
            .then(|| DeletedRegionsBuilder::from_regions(self.active().values().cloned()).build())
    }

    /// Folds the override into the layer below it, leaving a single merged layer in its place.
    ///
    /// Does nothing when no override is in place.
    pub fn merge_override(&mut self) {
        if !self.is_overridden() {
            return;
        }
        // `is_overridden` guarantees at least two layers, so both pops succeed.
        let overlay = self.regions.pop().expect("override layer is present");
        let base = self.regions.pop().expect("base layer is present");
        self.regions
            .push(Self::merged(base.into_values(), overlay.into_values()));
    }

    /// Returns `true` when the active layer holds no regions.
    pub fn is_empty(&self) -> bool {
        self.active().is_empty()
    }

    /// Returns `true` while an override layer sits on top of the base regions.
    pub fn is_overridden(&self) -> bool {
        self.regions.len() > 1
    }

    /// Iterates over the regions of the active layer, ordered by start index.
    pub fn regions(&self) -> Values<'_, OplogIndex, OplogRegion> {
        self.active().values()
    }

    /// Consumes the value and yields the regions of the active layer, ordered by start index.
    pub fn into_regions(mut self) -> IntoValues<OplogIndex, OplogRegion> {
        self.regions
            .pop()
            .expect("DeletedRegions always holds at least one layer")
            .into_values()
    }

    /// Returns the first region of the active layer whose start index is `from` or later.
    ///
    /// A region that starts before `from` is not returned, even if it still contains `from`.
    pub fn find_next_deleted_region(&self, from: OplogIndex) -> Option<OplogRegion> {
        self.active()
            .range((Included(from), Unbounded))
            .next()
            .map(|(_, region)| region.clone())
    }

    /// Returns `true` when `oplog_index` falls inside any region of the active layer.
    pub fn is_in_deleted_region(&self, oplog_index: OplogIndex) -> bool {
        self.active()
            .values()
            .any(|region| region.contains(oplog_index))
    }

    /// The layer all queries read from: the override when present, the base layer otherwise.
    fn active(&self) -> &BTreeMap<OplogIndex, OplogRegion> {
        self.regions
            .last()
            .expect("DeletedRegions always holds at least one layer")
    }

    /// Mutable access to the active layer.
    fn active_mut(&mut self) -> &mut BTreeMap<OplogIndex, OplogRegion> {
        self.regions
            .last_mut()
            .expect("DeletedRegions always holds at least one layer")
    }

    /// Runs `base` followed by `extra` through a [`DeletedRegionsBuilder`] and returns the
    /// resulting single layer.
    fn merged(
        base: impl IntoIterator<Item = OplogRegion>,
        extra: impl IntoIterator<Item = OplogRegion>,
    ) -> BTreeMap<OplogIndex, OplogRegion> {
        let mut builder = DeletedRegionsBuilder::from_regions(base);
        for region in extra {
            builder.add(region);
        }
        // `build` goes through `from_regions`, which always produces exactly one layer.
        builder
            .build()
            .regions
            .pop()
            .expect("builder output holds exactly one layer")
    }
}
impl Display for DeletedRegions {
    /// Writes the regions of the active layer as a bracketed, comma-separated list.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let active = self.regions.last().unwrap();
        let mut rendered = String::new();
        for (position, region) in active.values().enumerate() {
            if position > 0 {
                rendered.push_str(", ");
            }
            rendered.push_str(&region.to_string());
        }
        write!(f, "[{}]", rendered)
    }
}
// Conversions between the model `OplogRegion` and its gRPC counterpart.
#[cfg(feature = "protobuf")]
pub mod protobuf {
    use crate::model::regions::OplogRegion;
    use crate::model::OplogIndex;

    // gRPC message -> model type: wraps the raw `u64` bounds into `OplogIndex` values.
    impl From<golem_api_grpc::proto::golem::worker::OplogRegion> for OplogRegion {
        fn from(value: golem_api_grpc::proto::golem::worker::OplogRegion) -> Self {
            let start = OplogIndex::from_u64(value.start);
            let end = OplogIndex::from_u64(value.end);
            Self { start, end }
        }
    }

    // Model type -> gRPC message: unwraps both bounds through their `Into` conversions.
    impl From<OplogRegion> for golem_api_grpc::proto::golem::worker::OplogRegion {
        fn from(value: OplogRegion) -> Self {
            let OplogRegion { start, end } = value;
            Self {
                start: start.into(),
                end: end.into(),
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use test_r::test;

    use crate::model::oplog::OplogIndex;
    use crate::model::regions::{DeletedRegionsBuilder, OplogRegion};

    /// Shorthand for a region spanning the raw indices `start..=end`.
    fn oplog_region(start: u64, end: u64) -> OplogRegion {
        let start = OplogIndex::from_u64(start);
        let end = OplogIndex::from_u64(end);
        OplogRegion { start, end }
    }

    /// Overlapping regions added one by one come out as a single merged region.
    #[test]
    pub fn builder_from_overlapping_ranges() {
        let mut builder = DeletedRegionsBuilder::new();
        for region in [
            oplog_region(2, 8),
            oplog_region(2, 14),
            oplog_region(20, 22),
        ] {
            builder.add(region);
        }

        let actual = builder.build().into_regions().collect::<Vec<_>>();
        let expected = vec![oplog_region(2, 14), oplog_region(20, 22)];
        assert_eq!(actual, expected);
    }

    /// Regions added later extend the ones the builder was seeded with.
    #[test]
    pub fn builder_from_initial_state() {
        let seed = vec![oplog_region(2, 8), oplog_region(20, 22)];
        let mut builder = DeletedRegionsBuilder::from_regions(seed);
        builder.add(oplog_region(20, 24));
        builder.add(oplog_region(30, 40));

        let actual = builder.build().into_regions().collect::<Vec<_>>();
        let expected = vec![
            oplog_region(2, 8),
            oplog_region(20, 24),
            oplog_region(30, 40),
        ];
        assert_eq!(actual, expected);
    }

    /// The lookup returns the first region that starts at or after the given index.
    #[test]
    pub fn find_next_deleted_region() {
        let seed = vec![oplog_region(2, 8), oplog_region(20, 22)];
        let deleted_regions = DeletedRegionsBuilder::from_regions(seed).build();

        let cases = [
            (0, Some(oplog_region(2, 8))),
            (2, Some(oplog_region(2, 8))),
            (8, Some(oplog_region(20, 22))),
            (22, None),
        ];
        for (from, expected) in cases {
            assert_eq!(
                deleted_regions.find_next_deleted_region(OplogIndex::from_u64(from)),
                expected
            );
        }
    }
}