use std::{fmt::Debug, mem::MaybeUninit, sync::atomic::Ordering};
use crossbeam_epoch::{self as epoch, Atomic};
use log::{debug, log_enabled, trace};
use epoch::{Guard, Owned};
use crate::local_array::tree::*;
use crate::prefix_record::InternalPrefixRecord;
use crate::AddressFamily;
/// A set of child-node slots.
///
/// Newtype around an epoch-managed atomic pointer to a slice whose elements
/// are `MaybeUninit` wrappers around atomic pointers to [`StoredNode`]s.
/// `NodeSet::init` writes a null `Atomic` into every slot of a freshly
/// allocated slice.
// The nested generic type is intentional, hence the clippy allowance.
#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub struct NodeSet<AF: AddressFamily, S: Stride>(
pub Atomic<[MaybeUninit<Atomic<StoredNode<AF, S>>>]>,
);
/// A tree node as it is kept in a [`NodeSet`] slot: the node itself, its
/// identifier and a further `NodeSet` of the same stride type.
#[derive(Debug)]
pub struct StoredNode<AF, S>
where
Self: Sized,
S: Stride,
AF: AddressFamily,
{
/// Identifier of this node.
pub(crate) node_id: StrideNodeId<AF>,
/// The tree-bitmap node payload.
pub(crate) node: TreeBitMapNode<AF, S>,
/// Nested node set hanging off this node.
// NOTE(review): presumably the bucket for the next store level; confirm
// against the insertion code.
pub(crate) node_set: NodeSet<AF, S>,
}
impl<AF: AddressFamily, S: Stride> NodeSet<AF, S> {
    /// Allocates a node set with `size` slots.
    ///
    /// Every slot is initialised with a null `Atomic`, so all elements of the
    /// returned slice are initialised (and empty) once this function returns.
    pub fn init(size: usize) -> Self {
        if log_enabled!(log::Level::Debug) {
            debug!(
                "{} store: creating space for {} nodes",
                // Threads carry a name only when one was given explicitly;
                // logging must not panic on an unnamed worker thread.
                std::thread::current().name().unwrap_or("unnamed-thread"),
                &size
            );
        }
        let mut l =
            Owned::<[MaybeUninit<Atomic<StoredNode<AF, S>>>]>::init(size);
        for slot in l.iter_mut() {
            *slot = MaybeUninit::new(Atomic::null());
        }
        NodeSet(l.into())
    }
}
/// A prefix as it is kept in a [`PrefixSet`] slot.
#[derive(Debug)]
pub struct StoredPrefix<AF: AddressFamily, M: routecore::record::Meta> {
/// Serial number of this entry; `StoredPrefix::new` sets it to `1`.
pub serial: usize,
/// Identifier of the stored prefix.
pub prefix: PrefixId<AF>,
/// Atomically replaceable record (prefix plus metadata) for this prefix.
pub(crate) super_agg_record: AtomicSuperAggRecord<AF, M>,
/// Prefix set for the next level; a null set (`PrefixSet::empty()`) when
/// `StoredPrefix::new` determines there is no next level.
pub next_bucket: PrefixSet<AF, M>,
}
impl<AF: AddressFamily, M: routecore::record::Meta> StoredPrefix<AF, M> {
    /// Builds the stored form of `record` for insertion at bucket `level`.
    ///
    /// The size of the next-level bucket is derived from
    /// `PB::get_bits_for_len` for `level` and `level + 1`: when the latter is
    /// non-zero a bucket with `2^(next - this)` slots is allocated, otherwise
    /// this is treated as the last level and the next bucket is left empty
    /// (null).
    ///
    /// The serial of a freshly created stored prefix is `1`.
    pub fn new<PB: PrefixBuckets<AF, M>>(
        record: InternalPrefixRecord<AF, M>,
        level: u8,
    ) -> Self {
        let pfx_id = PrefixId::new(record.net, record.len);
        let this_level = PB::get_bits_for_len(pfx_id.get_len(), level);
        let next_level = PB::get_bits_for_len(pfx_id.get_len(), level + 1);
        trace!("this level {} next level {}", this_level, next_level);

        let next_bucket: PrefixSet<AF, M> = if next_level > 0 {
            // Shift a `usize` so the size is computed once, in the type that
            // is actually used for the allocation (the untyped literal used
            // before defaulted to `i32`).
            let bucket_size = 1_usize << (next_level - this_level);
            debug!(
                "{} store: INSERT with new bucket of size {} at prefix len {}",
                // Unnamed threads must not make logging panic.
                std::thread::current().name().unwrap_or("unnamed-thread"),
                bucket_size,
                pfx_id.get_len()
            );
            PrefixSet::init(bucket_size)
        } else {
            debug!(
                "{} store: INSERT at LAST LEVEL with empty bucket at prefix len {}",
                std::thread::current().name().unwrap_or("unnamed-thread"),
                pfx_id.get_len()
            );
            PrefixSet::empty()
        };

        let prefix = record.get_prefix_id();
        StoredPrefix {
            serial: 1,
            prefix,
            super_agg_record: AtomicSuperAggRecord::<AF, M>::new(
                prefix,
                record.meta,
            ),
            next_bucket,
        }
    }

    /// Returns the record currently stored for this prefix, or `None` when
    /// the record pointer is null. The reference is valid for as long as
    /// `guard` is held.
    pub(crate) fn get_record<'a>(
        &'a self,
        guard: &'a Guard,
    ) -> Option<&'a InternalPrefixRecord<AF, M>> {
        self.super_agg_record.get_record(guard)
    }
}
/// Newtype around an epoch-managed atomic pointer to the
/// [`InternalPrefixRecord`] (prefix plus metadata) of a stored prefix.
#[derive(Debug)]
pub(crate) struct AtomicSuperAggRecord<
AF: AddressFamily,
M: routecore::record::Meta,
>(pub Atomic<InternalPrefixRecord<AF, M>>);
impl<AF: AddressFamily, M: routecore::record::Meta>
    AtomicSuperAggRecord<AF, M>
{
    /// Allocates a new record for `prefix` carrying the metadata `record`
    /// and wraps it in an atomic pointer.
    pub fn new(prefix: PrefixId<AF>, record: M) -> Self {
        debug!(
            "{} store: create new stored prefix record {}/{} with {}",
            // Unnamed threads must not make logging panic.
            std::thread::current().name().unwrap_or("unnamed-thread"),
            prefix.get_net().into_ipaddr(),
            prefix.get_len(),
            record
        );
        AtomicSuperAggRecord(Atomic::new(InternalPrefixRecord {
            net: prefix.get_net(),
            len: prefix.get_len(),
            meta: record,
        }))
    }

    /// Loads the current record, returning `None` when the pointer is null.
    ///
    /// The returned reference is tied to the lifetime of `guard`.
    pub fn get_record<'a>(
        &self,
        guard: &'a Guard,
    ) -> Option<&'a InternalPrefixRecord<AF, M>> {
        let rec = self.0.load(Ordering::SeqCst, guard);
        // SAFETY: the pointer was loaded under `guard`; `as_ref` maps a null
        // pointer to `None`.
        // NOTE(review): soundness additionally relies on writers retiring
        // replaced records through the epoch mechanism only - confirm.
        unsafe { rec.as_ref() }
    }
}
/// One slot of a [`PrefixSet`]: a newtype around an epoch-managed atomic
/// pointer to a [`StoredPrefix`]. A null pointer denotes an empty slot.
#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub struct AtomicStoredPrefix<AF: AddressFamily, M: routecore::record::Meta>(
pub Atomic<StoredPrefix<AF, M>>,
);
impl<AF: AddressFamily, Meta: routecore::record::Meta>
    AtomicStoredPrefix<AF, Meta>
{
    /// Creates an empty slot (null pointer).
    pub(crate) fn empty() -> Self {
        AtomicStoredPrefix(Atomic::null())
    }

    /// Returns `true` when the slot holds no stored prefix, or when the
    /// stored prefix it holds has a null record pointer.
    pub(crate) fn is_empty(&self, guard: &Guard) -> bool {
        let pfx = self.0.load(Ordering::SeqCst, guard);
        // SAFETY: short-circuiting guarantees `pfx` is non-null when it is
        // dereferenced; it was loaded under `guard`.
        pfx.is_null()
            || unsafe { pfx.deref() }
                .super_agg_record
                .0
                .load(Ordering::SeqCst, guard)
                .is_null()
    }

    /// Returns the stored prefix in this slot, or `None` when the slot is
    /// empty. The reference is valid for as long as `guard` is held.
    pub(crate) fn get_stored_prefix<'a>(
        &'a self,
        guard: &'a Guard,
    ) -> Option<&'a StoredPrefix<AF, Meta>> {
        // SAFETY: loaded under `guard`; `as_ref` maps null to `None`.
        unsafe { self.0.load(Ordering::Acquire, guard).as_ref() }
    }

    /// Like [`Self::get_stored_prefix`], but loads with `SeqCst` ordering.
    ///
    /// Despite its name this returns a *shared* reference. It therefore must
    /// not materialise a `&mut` to the shared allocation on the way, which
    /// would assert uniqueness that does not hold while other threads may be
    /// reading the same stored prefix.
    pub(crate) fn get_stored_prefix_mut<'a>(
        &'a self,
        guard: &'a Guard,
    ) -> Option<&'a StoredPrefix<AF, Meta>> {
        // SAFETY: loaded under `guard`; `as_ref` maps null to `None`.
        unsafe { self.0.load(Ordering::SeqCst, guard).as_ref() }
    }

    /// Returns the serial of the stored prefix in this slot.
    ///
    /// # Panics
    ///
    /// Panics when the slot is empty.
    #[allow(dead_code)]
    pub(crate) fn get_serial(&self) -> usize {
        let guard = &epoch::pin();
        // Only borrow the stored prefix. Taking ownership of the pointee here
        // would free it when the temporary is dropped, while this slot still
        // points at it.
        // SAFETY: loaded under `guard`; `as_ref` maps null to `None`.
        match unsafe { self.0.load(Ordering::SeqCst, guard).as_ref() } {
            None => {
                panic!("AtomicStoredPrefix::get_serial: empty prefix");
            }
            Some(pfx) => pfx.serial,
        }
    }

    /// Returns the prefix id of the stored prefix in this slot.
    ///
    /// # Panics
    ///
    /// Panics when the slot is empty.
    pub(crate) fn get_prefix_id(&self) -> PrefixId<AF> {
        let guard = &epoch::pin();
        match self.get_stored_prefix(guard) {
            None => {
                panic!("AtomicStoredPrefix::get_prefix_id: empty prefix");
            }
            Some(pfx) => pfx.prefix,
        }
    }

    /// Returns the record of the stored prefix in this slot.
    ///
    /// Yields `None` when the slot is empty or when the stored prefix has a
    /// null record pointer (the same two conditions `is_empty` checks).
    pub fn get_agg_record<'a>(
        &'a self,
        guard: &'a Guard,
    ) -> Option<&InternalPrefixRecord<AF, Meta>> {
        self.get_stored_prefix(guard).and_then(|stored_prefix| {
            // SAFETY: loaded under `guard`; `as_ref` maps null to `None`
            // instead of dereferencing a null pointer.
            unsafe {
                stored_prefix
                    .super_agg_record
                    .0
                    .load(Ordering::SeqCst, guard)
                    .as_ref()
            }
        })
    }

    /// Returns the next-level bucket of the stored prefix in this slot, or
    /// `None` when the slot is empty or the bucket pointer is null.
    pub(crate) fn get_next_bucket<'a>(
        &'a self,
        guard: &'a Guard,
    ) -> Option<&PrefixSet<AF, Meta>> {
        self.get_stored_prefix(guard)
            .map(|stored_prefix| &stored_prefix.next_bucket)
            .filter(|bucket| {
                !bucket.0.load(Ordering::SeqCst, guard).is_null()
            })
    }
}
/// Storage backend for tree nodes, split by stride type.
///
/// Implementors expose one root [`NodeSet`] accessor per supported stride
/// size (`Stride3`, `Stride4`, `Stride5`).
// NOTE(review): the exact semantics of the methods below are defined by the
// implementors, which are not visible from this file; the comments describe
// only what the signatures establish.
pub trait NodeBuckets<AF: AddressFamily> {
/// Creates the storage.
fn init() -> Self;
/// Maps a prefix length and a store level to a number of bits.
fn len_to_store_bits(len: u8, level: u8) -> u8;
/// Returns the configured stride sizes.
fn get_stride_sizes(&self) -> &[u8];
/// Returns the stride size that applies to the node with the given id.
fn get_stride_for_id(&self, id: StrideNodeId<AF>) -> u8;
/// Returns the node set of `Stride3` nodes for the given id.
fn get_store3(&self, id: StrideNodeId<AF>) -> &NodeSet<AF, Stride3>;
/// Returns the node set of `Stride4` nodes for the given id.
fn get_store4(&self, id: StrideNodeId<AF>) -> &NodeSet<AF, Stride4>;
/// Returns the node set of `Stride5` nodes for the given id.
fn get_store5(&self, id: StrideNodeId<AF>) -> &NodeSet<AF, Stride5>;
/// Returns the number of strides.
fn get_strides_len() -> u8;
/// Returns the size of the first stride.
fn get_first_stride_size() -> u8;
}
/// Storage backend for prefixes.
pub trait PrefixBuckets<AF: AddressFamily, M: routecore::record::Meta>
where
Self: Sized,
{
/// Creates the storage.
fn init() -> Self;
/// Removes the prefix with the given id, returning a metadata value if the
/// implementor has one to hand back.
fn remove(&mut self, id: PrefixId<AF>) -> Option<M>;
/// Returns the root prefix set for prefixes of length `len`.
fn get_root_prefix_set(&self, len: u8) -> &'_ PrefixSet<AF, M>;
/// Returns the number of bits for prefix length `len` at bucket `level`.
///
/// `StoredPrefix::new` treats a result of `0` for the next level as "this
/// is the last level", and otherwise uses the difference between two
/// consecutive levels as the log2 of the next bucket's size.
fn get_bits_for_len(len: u8, level: u8) -> u8;
}
/// A bucket of prefix slots.
///
/// Newtype around an epoch-managed atomic pointer to a slice of
/// `MaybeUninit` [`AtomicStoredPrefix`] slots. The pointer is null for a set
/// created with `PrefixSet::empty()`; sets created with `PrefixSet::init`
/// have every slot initialised to an empty `AtomicStoredPrefix`.
#[derive(Debug)]
#[repr(align(8))]
pub struct PrefixSet<AF: AddressFamily, M: routecore::record::Meta>(
pub Atomic<[MaybeUninit<AtomicStoredPrefix<AF, M>>]>,
);
impl<AF: AddressFamily, M: routecore::record::Meta> PrefixSet<AF, M> {
    /// Allocates a prefix set with `size` slots, each initialised to an
    /// empty [`AtomicStoredPrefix`].
    pub fn init(size: usize) -> Self {
        let mut l =
            Owned::<[MaybeUninit<AtomicStoredPrefix<AF, M>>]>::init(size);
        trace!("creating space for {} prefixes in prefix_set", &size);
        for slot in l.iter_mut() {
            *slot = MaybeUninit::new(AtomicStoredPrefix::empty());
        }
        PrefixSet(l.into())
    }

    /// Counts the non-empty prefixes in this set and, recursively, in the
    /// next-level buckets hanging off them.
    ///
    /// A set whose pointer is null (see [`PrefixSet::empty`]) counts as `0`.
    pub fn get_len_recursive(&self) -> usize {
        fn recurse_len<AF: AddressFamily, M: routecore::record::Meta>(
            start_set: &PrefixSet<AF, M>,
            guard: &Guard,
        ) -> usize {
            let start_set = start_set.0.load(Ordering::SeqCst, guard);
            // An empty set has no backing slice; dereferencing it would be
            // a null-pointer dereference.
            if start_set.is_null() {
                return 0;
            }

            let mut size: usize = 0;
            // SAFETY: non-null (checked above) and loaded under `guard`.
            for p in unsafe { start_set.deref() } {
                // SAFETY: relies on the set having been built by
                // `PrefixSet::init`, which initialises every slot.
                let pfx = unsafe { p.assume_init_ref() };
                if !pfx.is_empty(guard) {
                    size += 1;
                    trace!(
                        "recurse found pfx {:?} cur size {}",
                        pfx.get_prefix_id(),
                        size
                    );
                    if let Some(next_bucket) = pfx.get_next_bucket(guard) {
                        trace!("found next bucket");
                        size += recurse_len(next_bucket, guard);
                    }
                }
            }
            size
        }

        // Pin once for the whole traversal instead of once per level.
        let guard = &epoch::pin();
        recurse_len(self, guard)
    }

    /// Returns the slot at `index`.
    ///
    /// # Panics
    ///
    /// Panics when the set is empty (null pointer) or when `index` is out of
    /// bounds.
    pub(crate) fn get_by_index<'a>(
        &'a self,
        index: usize,
        guard: &'a Guard,
    ) -> &'a AtomicStoredPrefix<AF, M> {
        // Load once so the null check and the dereference are guaranteed to
        // look at the same pointer.
        let set = self.0.load(Ordering::SeqCst, guard);
        assert!(!set.is_null());
        // SAFETY: non-null (asserted above) and loaded under `guard`; slots
        // are initialised by `PrefixSet::init`.
        unsafe { set.deref()[index].assume_init_ref() }
    }

    /// Creates a set without any backing storage (null pointer).
    pub(crate) fn empty() -> Self {
        PrefixSet(Atomic::null())
    }
}