use std::{
fmt::Debug,
sync::atomic::{AtomicUsize, Ordering},
};
use crossbeam_epoch::{self as epoch, Atomic};
use crossbeam_utils::Backoff;
use log::{debug, info, log_enabled, trace};
use epoch::{CompareExchangeError, Guard, Owned, Shared};
use std::marker::PhantomData;
use crate::local_array::tree::*;
use crate::local_array::{
bit_span::BitSpan, store::errors::PrefixStoreError,
};
use crate::prefix_record::InternalPrefixRecord;
use crate::{
impl_search_level, retrieve_node_mut_with_guard_closure,
store_node_closure,
};
use super::atomic_types::*;
use crate::AddressFamily;
/// Custom-allocated storage backend for the tree bitmap.
///
/// Bundles the node buckets (`NB`) and prefix buckets (`PB`) that hold the
/// concurrent store's data, plus a serial number for the default route
/// prefix.
#[derive(Debug)]
pub struct CustomAllocStorage<
    AF: AddressFamily,
    Meta: routecore::record::Meta + routecore::record::MergeUpdate,
    NB: NodeBuckets<AF>,
    PB: PrefixBuckets<AF, Meta>,
> {
    // Bucket storage for all tree nodes, organised per stride size.
    pub(crate) buckets: NB,
    // Bucket storage for all prefixes, organised per prefix length.
    pub prefixes: PB,
    // Serial number for the default route prefix record.
    pub default_route_prefix_serial: AtomicUsize,
    // Zero-sized markers that tie the unused generic parameters to
    // this struct.
    _m: PhantomData<Meta>,
    _af: PhantomData<AF>,
}
impl<
'a,
AF: AddressFamily,
Meta: routecore::record::Meta,
NB: NodeBuckets<AF>,
PB: PrefixBuckets<AF, Meta>,
> CustomAllocStorage<AF, Meta, NB, PB>
{
/// Creates a new store: initialises the node and prefix buckets and
/// inserts `root_node` at node id 0.
///
/// Returns the initialised store, or the error produced while storing
/// the root node.
pub(crate) fn init(
    root_node: SizedStrideNode<AF>,
    guard: &'a Guard,
) -> Result<Self, Box<dyn std::error::Error>> {
    info!("store: initialize store");

    let new_store = CustomAllocStorage {
        buckets: NodeBuckets::<AF>::init(),
        prefixes: PrefixBuckets::<AF, Meta>::init(),
        default_route_prefix_serial: AtomicUsize::new(0),
        _m: PhantomData,
        _af: PhantomData,
    };

    // The root node always lives at id 0 on the zero address.
    let root_node_id =
        StrideNodeId::dangerously_new_with_id_as_is(AF::zero(), 0);
    new_store.store_node(root_node_id, root_node, guard)?;

    Ok(new_store)
}
/// Derives the node id for the node covering the given network address
/// and (sub-)prefix length.
pub(crate) fn acquire_new_node_id(
    &self,
    (net, len): (AF, u8),
) -> StrideNodeId<AF> {
    StrideNodeId::new_with_cleaned_id(net, len)
}
#[allow(clippy::type_complexity)]
pub(crate) fn store_node(
&self,
id: StrideNodeId<AF>,
next_node: SizedStrideNode<AF>,
guard: &Guard,
) -> Result<(StrideNodeId<AF>, u32), PrefixStoreError> {
struct SearchLevel<'s, AF: AddressFamily, S: Stride> {
f: &'s dyn Fn(
&SearchLevel<AF, S>,
&NodeSet<AF, S>,
TreeBitMapNode<AF, S>,
u8, // the store level
u32, // retry_count
) -> Result<
(StrideNodeId<AF>, u32),
PrefixStoreError,
>,
}
let back_off = crossbeam_utils::Backoff::new();
let search_level_3 =
store_node_closure![Stride3; id; guard; back_off;];
let search_level_4 =
store_node_closure![Stride4; id; guard; back_off;];
let search_level_5 =
store_node_closure![Stride5; id; guard; back_off;];
if log_enabled!(log::Level::Trace) {
debug!(
"{} store: Store node {}: {:?}",
std::thread::current().name().unwrap(),
id,
next_node
);
}
match next_node {
SizedStrideNode::Stride3(new_node) => (search_level_3.f)(
&search_level_3,
self.buckets.get_store3(id),
new_node,
0,
0,
),
SizedStrideNode::Stride4(new_node) => (search_level_4.f)(
&search_level_4,
self.buckets.get_store4(id),
new_node,
0,
0,
),
SizedStrideNode::Stride5(new_node) => (search_level_5.f)(
&search_level_5,
self.buckets.get_store5(id),
new_node,
0,
0,
),
}
}
/// Retrieves a shared reference to the node with the given `id`, or
/// `None` if it is not present in the store.
#[allow(clippy::type_complexity)]
pub(crate) fn retrieve_node_with_guard(
    &'a self,
    id: StrideNodeId<AF>,
    guard: &'a Guard,
) -> Option<SizedStrideRef<'a, AF>> {
    // Recursive search helper, specialised per stride size by the
    // `impl_search_level!` macro below.
    struct SearchLevel<'s, AF: AddressFamily, S: Stride> {
        f: &'s dyn for<'a> Fn(
            &SearchLevel<AF, S>,
            &NodeSet<AF, S>,
            u8,
            &'a Guard,
        )
            -> Option<SizedStrideRef<'a, AF>>,
    }

    let search_level_3 = impl_search_level![Stride3; id;];
    let search_level_4 = impl_search_level![Stride4; id;];
    let search_level_5 = impl_search_level![Stride5; id;];

    if log_enabled!(log::Level::Trace) {
        trace!(
            "{} store: Retrieve node {} from l{}",
            // NOTE(review): `.name().unwrap()` panics on unnamed
            // threads.
            std::thread::current().name().unwrap(),
            id,
            id.get_id().1
        );
    }

    // Dispatch on the stride size configured for this node id,
    // starting the search at level 0 of the matching store.
    match self.get_stride_for_id(id) {
        3 => (search_level_3.f)(
            &search_level_3,
            self.buckets.get_store3(id),
            0,
            guard,
        ),
        4 => (search_level_4.f)(
            &search_level_4,
            self.buckets.get_store4(id),
            0,
            guard,
        ),
        _ => (search_level_5.f)(
            &search_level_5,
            self.buckets.get_store5(id),
            0,
            guard,
        ),
    }
}
/// Retrieves a mutable reference to the node with the given `id`, or
/// `None` if it is not present in the store.
#[allow(clippy::type_complexity)]
pub(crate) fn retrieve_node_mut_with_guard(
    &'a self,
    id: StrideNodeId<AF>,
    guard: &'a Guard,
) -> Option<SizedStrideRefMut<'a, AF>> {
    // Recursive search helper, specialised per stride size by the
    // `retrieve_node_mut_with_guard_closure!` macro below.
    struct SearchLevel<'s, AF: AddressFamily, S: Stride> {
        f: &'s dyn for<'a> Fn(
            &SearchLevel<AF, S>,
            &NodeSet<AF, S>,
            u8,
            &'a Guard,
        )
            -> Option<SizedStrideRefMut<'a, AF>>,
    }

    let search_level_3 =
        retrieve_node_mut_with_guard_closure![Stride3; id;];
    let search_level_4 =
        retrieve_node_mut_with_guard_closure![Stride4; id;];
    let search_level_5 =
        retrieve_node_mut_with_guard_closure![Stride5; id;];

    if log_enabled!(log::Level::Trace) {
        trace!(
            "{} store: Retrieve node {} from l{}",
            // NOTE(review): `.name().unwrap()` panics on unnamed
            // threads.
            std::thread::current().name().unwrap(),
            id,
            id.get_id().1
        );
    }

    // Dispatch on the stride size configured for this node id. Uses
    // the `get_stride_for_id` wrapper for consistency with
    // `retrieve_node_with_guard` (it delegates to the same bucket
    // method, so behavior is unchanged).
    match self.get_stride_for_id(id) {
        3 => (search_level_3.f)(
            &search_level_3,
            self.buckets.get_store3(id),
            0,
            guard,
        ),
        4 => (search_level_4.f)(
            &search_level_4,
            self.buckets.get_store4(id),
            0,
            guard,
        ),
        _ => (search_level_5.f)(
            &search_level_5,
            self.buckets.get_store5(id),
            0,
            guard,
        ),
    }
}
/// Returns the id of the tree's root node: node 0 at the zero
/// address.
pub(crate) fn get_root_node_id(&self) -> StrideNodeId<AF> {
    let root_addr = AF::zero();
    StrideNodeId::dangerously_new_with_id_as_is(root_addr, 0)
}
/// Returns the number of nodes in the store.
///
/// NOTE(review): currently a stub that always returns 0 — node
/// counting is not implemented for this storage backend.
pub fn get_nodes_len(&self) -> usize {
    0
}
/// Returns the current serial number of the default route prefix.
pub(crate) fn load_default_route_prefix_serial(&self) -> usize {
    self.default_route_prefix_serial.load(Ordering::SeqCst)
}
/// Atomically increments the default route prefix serial, returning
/// the value it had before the increment.
#[allow(dead_code)]
fn increment_default_route_prefix_serial(&self) -> usize {
    self.default_route_prefix_serial
        .fetch_add(1, Ordering::SeqCst)
}
/// Inserts `record` as a new prefix, or — when a record for the same
/// prefix already exists — merges the two via
/// `clone_merge_update` and swaps the merged record in.
///
/// Returns the number of retries that contention forced, or a
/// `PrefixStoreError` when the slot for this prefix could not be
/// reached.
pub(crate) fn upsert_prefix(
    &self,
    mut record: InternalPrefixRecord<AF, Meta>,
    guard: &Guard,
) -> Result<u32, PrefixStoreError> {
    let backoff = Backoff::new();
    let mut retry_count = 0;

    // Find (or create the slot for) this prefix in the bucket tree.
    let (atomic_stored_prefix, level) = self
        .non_recursive_retrieve_prefix_mut_with_guard(
            PrefixId::new(record.net, record.len),
            guard,
        )?;
    let inner_stored_prefix =
        atomic_stored_prefix.0.load(Ordering::SeqCst, guard);

    // NOTE(review): `inner_stored_prefix` is loaded once, outside the
    // loop. After a failed compare_exchange in the null branch below,
    // the loop restarts with the stale null pointer and retries the
    // exchange against `Shared::null()` again — confirm this cannot
    // spin without progress under contention.
    loop {
        match inner_stored_prefix.is_null() {
            // No record exists for this prefix yet: try to install a
            // fresh super-aggregated record.
            true => {
                if log_enabled!(log::Level::Debug) {
                    debug!(
                        "{} store: Create new super-aggregated prefix record",
                        std::thread::current().name().unwrap()
                    );
                }
                let new_stored_prefix =
                    StoredPrefix::new::<PB>(record, level);

                // NOTE(review): tag 1 presumably marks the pointer as
                // initialised — verify against the readers of this
                // pointer.
                match atomic_stored_prefix.0.compare_exchange(
                    Shared::null(),
                    Owned::new(new_stored_prefix).with_tag(1),
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                    guard,
                ) {
                    Ok(spfx) => {
                        if log_enabled!(log::Level::Info) {
                            let StoredPrefix {
                                prefix,
                                super_agg_record,
                                ..
                            } = unsafe { spfx.deref() };
                            info!(
                                "{} store: Inserted new prefix record {}/{} with {:?}",
                                std::thread::current().name().unwrap(),
                                prefix.get_net().into_ipaddr(), prefix.get_len(),
                                super_agg_record.get_record(guard).unwrap().meta
                            );
                        }
                        return Ok(retry_count);
                    }
                    // Another thread installed a record first: take
                    // our record back out of the rejected `new` value
                    // and retry.
                    Err(CompareExchangeError { current, new }) => {
                        if log_enabled!(log::Level::Debug) {
                            debug!(
                                "{} store: Prefix can't be inserted as new {:?}",
                                std::thread::current().name().unwrap(),
                                current
                            );
                        }
                        retry_count += 1;
                        record = *unsafe {
                            (*new.into_box())
                                .super_agg_record
                                .0
                                .load(Ordering::Relaxed, guard)
                                .into_owned()
                                .into_box()
                        };
                        continue;
                    }
                }
            }
            // A record exists: merge our record into it with a CAS
            // loop on the super-aggregated record pointer.
            false => {
                if log_enabled!(log::Level::Debug) {
                    debug!(
                        "{} store: Found existing super-aggregated prefix record for {}/{}",
                        std::thread::current().name().unwrap(),
                        record.net,
                        record.len
                    );
                }
                let super_agg_record =
                    &unsafe { inner_stored_prefix.deref() }
                        .super_agg_record
                        .0;
                let mut inner_agg_record =
                    super_agg_record.load(Ordering::Acquire, guard);
                loop {
                    let prefix_record =
                        unsafe { inner_agg_record.as_ref() }.unwrap();
                    // Build the merged record before the exchange so
                    // the CAS itself stays small.
                    let new_record = Owned::new(InternalPrefixRecord::<
                        AF,
                        Meta,
                    >::new_with_meta(
                        record.net,
                        record.len,
                        prefix_record
                            .meta
                            .clone_merge_update(&record.meta)
                            .unwrap(),
                    ))
                    .into_shared(guard);
                    match super_agg_record.compare_exchange(
                        inner_agg_record,
                        new_record,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                        guard,
                    ) {
                        Ok(_rec) => {
                            if log_enabled!(log::Level::Info) {
                                let record = unsafe { _rec.deref() };
                                info!(
                                    "{} store: Updated existing prefix record {}/{} with {:?}",
                                    std::thread::current().name().unwrap(),
                                    record.net.into_ipaddr(), record.len,
                                    unsafe { super_agg_record.load(Ordering::Relaxed, guard).deref() }.meta
                                );
                            }
                            // Defer freeing the replaced record until
                            // all current readers have left their
                            // epoch.
                            if !inner_agg_record.is_null() {
                                unsafe {
                                    guard.defer_unchecked(move || {
                                        std::sync::atomic::fence(
                                            Ordering::Acquire,
                                        );
                                        std::mem::drop(
                                            inner_agg_record.into_owned(),
                                        )
                                    });
                                }
                            };
                            return Ok(retry_count);
                        }
                        // Contention: another writer swapped the
                        // record first; retry against its value.
                        Err(next_agg) => {
                            // NOTE(review): guarded on Warn but
                            // emitted with `debug!` — level mismatch.
                            if log_enabled!(log::Level::Warn) {
                                debug!("{} store: Contention. Retrying prefix {:?}. Attempt {}",
                                    std::thread::current().name().unwrap(), next_agg.current, retry_count);
                            }
                            retry_count += 1;
                            inner_agg_record = next_agg.current;
                            backoff.spin();
                            continue;
                        }
                    }
                }
            }
        }
    }
}
/// Finds the `AtomicStoredPrefix` slot for `search_prefix_id`,
/// descending through the levelled prefix buckets iteratively.
///
/// Returns the slot together with the level it was found at (either
/// the slot holding the prefix, or the empty/colliding slot where it
/// would be stored), or `StoreNotReadyError` when no bucket was
/// allocated along the way.
#[allow(clippy::type_complexity)]
fn non_recursive_retrieve_prefix_mut_with_guard(
    &'a self,
    search_prefix_id: PrefixId<AF>,
    guard: &'a Guard,
) -> Result<(&'a AtomicStoredPrefix<AF, Meta>, u8), PrefixStoreError>
{
    let mut prefix_set = self
        .prefixes
        .get_root_prefix_set(search_prefix_id.get_len());
    let mut level: u8 = 0;
    let mut stored_prefix = None;

    loop {
        // Index of this prefix in the bucket at the current level.
        let index = Self::hash_prefix_id(search_prefix_id, level);
        let prefixes = prefix_set.0.load(Ordering::SeqCst, guard);
        trace!("prefixes at level {}? {:?}", level, !prefixes.is_null());

        let prefix_probe = if !prefixes.is_null() {
            trace!("prefix set found.");
            unsafe { &prefixes.deref()[index] }
        } else {
            // No bucket allocated at this level: return the last slot
            // seen (if any), otherwise the store is not ready.
            trace!("no prefix set.");
            return stored_prefix
                .map(|sp| (sp, level))
                .ok_or(PrefixStoreError::StoreNotReadyError);
        };

        stored_prefix = Some(unsafe { prefix_probe.assume_init_ref() });

        if let Some(StoredPrefix {
            prefix,
            next_bucket,
            ..
        }) = stored_prefix.unwrap().get_stored_prefix_mut(guard)
        {
            if search_prefix_id == *prefix {
                trace!("found requested prefix {:?}", search_prefix_id);
                return stored_prefix
                    .map(|sp| (sp, level))
                    .ok_or(PrefixStoreError::StoreNotReadyError);
            } else {
                // A different prefix hashed to this slot: descend
                // into its next-level bucket.
                level += 1;
                prefix_set = next_bucket;
                continue;
            }
        }
        // Empty slot: this is where the prefix would be stored.
        return stored_prefix
            .map(|sp| (sp, level))
            .ok_or(PrefixStoreError::StoreNotReadyError);
    }
}
/// Looks up `id` in the prefix buckets, additionally returning the
/// context of the search for callers that need to manipulate the
/// surrounding structure.
///
/// The second tuple element carries: the searched id, the level the
/// search ended at, the bucket at that level, the per-level parent
/// bucket/index pairs, and the index in the final bucket.
#[allow(clippy::type_complexity)]
pub(crate) fn non_recursive_retrieve_prefix_with_guard(
    &'a self,
    id: PrefixId<AF>,
    guard: &'a Guard,
) -> (
    Option<&StoredPrefix<AF, Meta>>,
    Option<(
        PrefixId<AF>,
        u8,
        &'a PrefixSet<AF, Meta>,
        // NOTE(review): 26 presumably bounds the number of levels —
        // confirm against the bucket layout.
        [Option<(&'a PrefixSet<AF, Meta>, usize)>; 26],
        usize,
    )>,
) {
    let mut prefix_set = self.prefixes.get_root_prefix_set(id.get_len());
    let mut parents = [None; 26];
    let mut level: u8 = 0;
    let backoff = Backoff::new();

    loop {
        // Index of this prefix in the bucket at the current level.
        let index = Self::hash_prefix_id(id, level);
        let mut prefixes = prefix_set.0.load(Ordering::Acquire, guard);

        if !prefixes.is_null() {
            let prefix_ref = unsafe { &mut prefixes.deref_mut()[index] };
            if let Some(stored_prefix) =
                unsafe { prefix_ref.assume_init_ref() }
                    .get_stored_prefix(guard)
            {
                if let Some(pfx_rec) = stored_prefix.get_record(guard) {
                    if id == pfx_rec.get_prefix_id() {
                        trace!("found requested prefix {:?}", id);
                        parents[level as usize] =
                            Some((prefix_set, index));
                        return (
                            Some(stored_prefix),
                            Some((id, level, prefix_set, parents, index)),
                        );
                    };
                    // Different prefix in this slot: descend into its
                    // next-level bucket.
                    // NOTE(review): only the final level is recorded
                    // in `parents`; intermediate levels along the
                    // descent are left as `None` — verify intended.
                    prefix_set = &stored_prefix.next_bucket;
                    level += 1;
                    backoff.spin();
                    continue;
                }
            }
        }
        // Either no bucket is allocated or the slot is empty: the
        // prefix is absent.
        trace!("no prefix found for {:?}", id);
        parents[level as usize] = Some((prefix_set, index));
        return (None, Some((id, level, prefix_set, parents, index)));
    }
}
/// Retrieves the stored prefix matching `prefix_id` together with a
/// reference to its serial number, or `None` if it is not in the
/// store.
#[allow(clippy::type_complexity)]
pub(crate) fn retrieve_prefix_with_guard(
    &'a self,
    prefix_id: PrefixId<AF>,
    guard: &'a Guard,
) -> Option<(&StoredPrefix<AF, Meta>, &'a usize)> {
    // Recursive search helper over the levelled prefix buckets.
    struct SearchLevel<'s, AF: AddressFamily, M: routecore::record::Meta> {
        f: &'s dyn for<'a> Fn(
            &SearchLevel<AF, M>,
            &PrefixSet<AF, M>,
            u8,
            &'a Guard,
        ) -> Option<(
            &'a StoredPrefix<AF, M>,
            &'a usize,
        )>,
    }

    let search_level = SearchLevel {
        f: &|search_level: &SearchLevel<AF, Meta>,
             prefix_set: &PrefixSet<AF, Meta>,
             mut level: u8,
             guard: &Guard| {
            let index = Self::hash_prefix_id(prefix_id, level);
            let prefixes = prefix_set.0.load(Ordering::SeqCst, guard);
            let prefix_ref = unsafe { &prefixes.deref()[index] };
            if let Some(stored_prefix) =
                unsafe { prefix_ref.assume_init_ref() }
                    .get_stored_prefix(guard)
            {
                if let Some(pfx_rec) =
                    stored_prefix.super_agg_record.get_record(guard)
                {
                    if prefix_id
                        == PrefixId::new(pfx_rec.net, pfx_rec.len)
                    {
                        trace!("found requested prefix {:?}", prefix_id);
                        return Some((
                            stored_prefix,
                            &stored_prefix.serial,
                        ));
                    };
                    level += 1;
                    // BUG FIX: propagate the result of the recursive
                    // descent. Previously the recursive call's result
                    // was discarded and the closure fell through to
                    // `None`, so prefixes stored at deeper levels
                    // were never found.
                    return (search_level.f)(
                        search_level,
                        &stored_prefix.next_bucket,
                        level,
                        guard,
                    );
                };
            }
            None
        },
    };

    (search_level.f)(
        &search_level,
        self.prefixes.get_root_prefix_set(prefix_id.get_len()),
        0,
        guard,
    )
}
/// Removes the prefix at `index` from the store and returns its
/// meta-data if it was present. Empty prefix ids are ignored.
#[allow(dead_code)]
fn remove_prefix(&mut self, index: PrefixId<AF>) -> Option<Meta> {
    if index.is_empty() {
        return None;
    }
    self.prefixes.remove(index)
}
/// Counts all prefixes in the store by summing the recursive lengths
/// of the root prefix sets for every possible prefix length.
pub fn get_prefixes_len(&self) -> usize {
    let mut total = 0;
    for pfx_len in 0..=AF::BITS {
        total += self
            .prefixes
            .get_root_prefix_set(pfx_len)
            .get_len_recursive();
    }
    total
}
/// Returns the stride size configured for the bucket that holds `id`.
pub(crate) fn get_stride_for_id(&self, id: StrideNodeId<AF>) -> u8 {
    self.buckets.get_stride_for_id(id)
}
/// Returns the stride sizes that make up this tree, in order.
pub fn get_stride_sizes(&self) -> &[u8] {
    self.buckets.get_stride_sizes()
}
/// Returns the number of strides configured for the node bucket type.
pub(crate) fn get_strides_len() -> u8 {
    NB::get_strides_len()
}
/// Returns the size of the first (root-level) stride.
pub(crate) fn get_first_stride_size() -> u8 {
    NB::get_first_stride_size()
}
/// Maps a prefix to the node that holds it: the id of the node at the
/// last stride boundary before the prefix's length, plus the bit span
/// of the remaining prefix bits inside that node.
///
/// Panics if the prefix length exceeds the sum of all stride sizes.
pub(crate) fn get_node_id_for_prefix(
    &self,
    prefix: &PrefixId<AF>,
) -> (StrideNodeId<AF>, BitSpan) {
    // Walk the strides, accumulating bits until the prefix length is
    // covered.
    let mut acc = 0;
    for i in self.get_stride_sizes() {
        acc += *i;
        if acc >= prefix.get_len() {
            // The node starts at the stride boundary just before the
            // prefix's final bits.
            let node_len = acc - i;
            return (
                StrideNodeId::new_with_cleaned_id(
                    prefix.get_net(),
                    node_len,
                ),
                // Isolate the prefix bits inside this node: shift out
                // the node-id bits on the left, then right-align the
                // remaining (len - node_len) bits.
                BitSpan::new(
                    ((prefix.get_net() << node_len)
                        >> (AF::BITS - (prefix.get_len() - node_len)))
                        .dangerously_truncate_to_u32(),
                    prefix.get_len() - node_len,
                ),
            );
        }
    }
    panic!("prefix length for {:?} is too long", prefix);
}
/// Computes the index of `id` in the node bucket at `level`.
///
/// The index is the slice of node-id bits belonging to this level:
/// bits consumed by earlier levels are shifted out on the left, and
/// the remainder is shifted right so only this level's bits remain.
/// The `% <AF>::BITS` keeps the right-shift in range when a level
/// adds no bits.
pub(crate) fn hash_node_id(id: StrideNodeId<AF>, level: u8) -> usize {
    // Number of bits consumed by all levels before this one.
    let last_level = if level > 0 {
        <NB>::len_to_store_bits(id.get_id().1, level - 1)
    } else {
        0
    };
    // Cumulative number of bits consumed up to and including this
    // level.
    let this_level = <NB>::len_to_store_bits(id.get_id().1, level);
    trace!("bits division {}", this_level);
    trace!(
        "calculated index ({} << {}) >> {}",
        id.get_id().0,
        last_level,
        ((<AF>::BITS - (this_level - last_level)) % <AF>::BITS) as usize
    );
    // Shift out the earlier levels' bits, then right-align this
    // level's bits to form the bucket index.
    ((id.get_id().0 << last_level)
        >> ((<AF>::BITS - (this_level - last_level)) % <AF>::BITS))
        .dangerously_truncate_to_u32() as usize
}
/// Computes the index of `id` in the prefix bucket at `level`.
///
/// Mirrors `hash_node_id`, but sizes the levels by prefix length via
/// `PB::get_bits_for_len` instead of the node bucket layout.
pub(crate) fn hash_prefix_id(id: PrefixId<AF>, level: u8) -> usize {
    // Number of bits consumed by all levels before this one.
    let last_level = if level > 0 {
        <PB>::get_bits_for_len(id.get_len(), level - 1)
    } else {
        0
    };
    // Cumulative number of bits consumed up to and including this
    // level.
    let this_level = <PB>::get_bits_for_len(id.get_len(), level);
    trace!("bits division {}", this_level);
    trace!(
        "calculated index ({} << {}) >> {}",
        id.get_net(),
        last_level,
        ((<AF>::BITS - (this_level - last_level)) % <AF>::BITS) as usize
    );
    // Shift out the earlier levels' bits, then right-align this
    // level's bits to form the bucket index.
    ((id.get_net() << last_level)
        >> ((<AF>::BITS - (this_level - last_level)) % <AF>::BITS))
        .dangerously_truncate_to_u32() as usize
}
}