use std::fmt;
use std::sync::{Mutex, PoisonError};
use futures::future;
use rand::{RngExt, rngs::SmallRng};
use crate::options::{ConflictRangeType, MutationType, TransactionOption};
use crate::tuple::{PackError, Subspace};
use crate::*;
const ONE_BYTES: &[u8] = &[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
pub enum HcaError {
FdbError(FdbError),
PackError(PackError),
InvalidDirectoryLayerMetadata,
PoisonError,
}
impl fmt::Debug for HcaError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
HcaError::FdbError(err) => err.fmt(f),
HcaError::PackError(err) => err.fmt(f),
HcaError::InvalidDirectoryLayerMetadata => {
write!(f, "invalid directory layer metadata")
}
HcaError::PoisonError => write!(f, "mutex poisoned"),
}
}
}
impl From<FdbError> for HcaError {
fn from(err: FdbError) -> Self {
Self::FdbError(err)
}
}
impl From<PackError> for HcaError {
fn from(err: PackError) -> Self {
Self::PackError(err)
}
}
impl<T> From<PoisonError<T>> for HcaError {
fn from(_err: PoisonError<T>) -> Self {
Self::PoisonError
}
}
impl TransactError for HcaError {
fn try_into_fdb_error(self) -> Result<FdbError, Self> {
match self {
HcaError::FdbError(err) => Ok(err),
_ => Err(self),
}
}
}
#[derive(Debug)]
pub struct HighContentionAllocator {
counters: Subspace,
recent: Subspace,
allocation_mutex: Mutex<()>,
}
impl HighContentionAllocator {
pub fn new(subspace: Subspace) -> HighContentionAllocator {
HighContentionAllocator {
counters: subspace.subspace(&0i64),
recent: subspace.subspace(&1i64),
allocation_mutex: Mutex::new(()),
}
}
pub async fn allocate(&self, trx: &Transaction) -> Result<i64, HcaError> {
let (begin, end) = self.counters.range();
let begin = KeySelector::first_greater_or_equal(begin);
let end = KeySelector::first_greater_than(end);
let counters_range = RangeOption {
begin,
end,
limit: Some(1),
reverse: true,
..RangeOption::default()
};
let mut rng: SmallRng = rand::make_rng();
loop {
let kvs = trx.get_range(&counters_range, 1, true).await?;
let mut start: i64 = if let Some(first) = kvs.first() {
self.counters.unpack(first.key())?
} else {
0
};
let mut window_advanced = false;
let window = loop {
let counters_start = self.counters.subspace(&start);
let count_future = {
let _mutex_guard = self.allocation_mutex.lock()?;
if window_advanced {
trx.clear_range(self.counters.bytes(), counters_start.bytes());
trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
trx.clear_range(self.recent.bytes(), self.recent.subspace(&start).bytes());
};
trx.atomic_op(counters_start.bytes(), ONE_BYTES, MutationType::Add);
trx.get(counters_start.bytes(), true)
};
let count_value = count_future.await?;
let count = if let Some(count_value) = count_value {
if count_value.len() != 8 {
return Err(HcaError::InvalidDirectoryLayerMetadata);
}
let mut bytes = [0u8; 8];
bytes.copy_from_slice(&count_value);
i64::from_le_bytes(bytes)
} else {
0
};
let window = Self::window_size(start);
if count * 2 < window {
break window;
}
start += window;
window_advanced = true;
};
loop {
let candidate: i64 = rng.random_range(start..start + window);
let recent_candidate = self.recent.subspace(&candidate);
let (latest_counter, candidate_value) = {
let _mutex_guard = self.allocation_mutex.lock()?;
let latest_counter = trx.get_range(&counters_range, 1, true);
let candidate_value = trx.get(recent_candidate.bytes(), false);
trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
trx.set(recent_candidate.bytes(), &[]);
(latest_counter, candidate_value)
};
let (latest_counter, candidate_value) =
future::try_join(latest_counter, candidate_value).await?;
let current_window_start: i64 = if let Some(first) = latest_counter.first() {
self.counters.unpack(first.key())?
} else {
0
};
if current_window_start > start {
break;
}
if candidate_value.is_none() {
let mut after = recent_candidate.bytes().to_vec();
after.push(0x00);
trx.add_conflict_range(
recent_candidate.bytes(),
&after,
ConflictRangeType::Write,
)?;
return Ok(candidate);
}
}
}
}
fn window_size(start: i64) -> i64 {
match start {
_ if start < 255 => 64,
_ if start < 65535 => 1024,
_ => 8192,
}
}
}