use crate::{
Error, NumEntries, Runtime,
circuit::{
max_level0_batch_size_records,
metadata::{
BLOOM_FILTER_BITS_PER_KEY, BLOOM_FILTER_HIT_RATE_PERCENT, BLOOM_FILTER_HITS_COUNT,
BLOOM_FILTER_MISSES_COUNT, BLOOM_FILTER_SIZE_BYTES, COMPLETED_MERGES,
LOOSE_BATCHES_COUNT, LOOSE_MEMORY_RECORDS_COUNT, LOOSE_STORAGE_RECORDS_COUNT,
MERGE_BACKPRESSURE_WAIT_TIME_SECONDS, MERGE_REDUCTION_PERCENT, MERGING_BATCHES_COUNT,
MERGING_MEMORY_RECORDS_COUNT, MERGING_SIZE_BYTES, MERGING_STORAGE_RECORDS_COUNT,
MetaItem, MetricId, MetricReading, NEGATIVE_WEIGHT_COUNT, OperatorMeta,
RANGE_FILTER_HIT_RATE_PERCENT, RANGE_FILTER_HITS_COUNT, RANGE_FILTER_MISSES_COUNT,
RANGE_FILTER_SIZE_BYTES, SPINE_BATCHES_COUNT, SPINE_STORAGE_SIZE_BYTES,
},
metrics::COMPACTION_STALL_TIME_NANOSECONDS,
negative_weight_multiplier,
runtime::{TOKIO_BUFFER_CACHE, TOKIO_WORKER_INDEX},
},
dynamic::{DynVec, Factory, Weight},
samply::SamplySpan,
storage::{
buffer_cache::{BufferCache, CacheStats},
filter_stats::FilterStats,
},
time::Timestamp,
trace::{
Batch, BatchReader, BatchReaderFactories, Builder, Cursor, Filter, GroupFilter, Trace,
cursor::{CursorList, Position},
merge_batches,
ord::fallback::pick_insert_destination,
spine_async::{
list_merger::ArcListMerger, push_merger::ArcPushMerger, snapshot::FetchList,
},
},
};
use crate::storage::file::{Deserializer, to_bytes};
use crate::trace::CommittedSpine;
use enum_map::EnumMap;
use feldera_buffer_cache::ThreadType;
use feldera_storage::{
FileCommitter, StoragePath,
fbuf::slab::{FBufSlabs, TOKIO_FBUF_SLABS},
};
use feldera_types::memory_pressure::MemoryPressure;
use feldera_types::{checkpoint::PSpineBatches, config::dev_tweaks::MergerType};
use ouroboros::self_referencing;
use rand::Rng;
use rkyv::{Archive, Archived, Deserialize, Fallible, Serialize, ser::Serializer};
use size_of::{Context, HumanBytes, SizeOf};
use std::{
borrow::Cow,
future::Future,
sync::{Arc, MutexGuard, atomic::AtomicIsize},
};
use std::{
collections::BTreeMap,
time::{Duration, Instant},
};
use std::{collections::VecDeque, sync::atomic::Ordering};
use std::{
fmt::{self, Debug, Display, Formatter},
ops::DerefMut,
sync::Condvar,
};
use std::{ops::RangeInclusive, sync::Mutex};
use textwrap::indent;
use tokio::{sync::Notify, task::yield_now};
mod index_set;
mod list_merger;
mod push_merger;
mod snapshot;
pub use snapshot::{BatchReaderWithSnapshot, SpineSnapshot, WithSnapshot};
use super::{BatchLocation, cursor::CursorFactory};
pub use list_merger::ListMerger;
/// Number of merge levels in the spine; each level gets its own slot and
/// its own background merger task.
pub(crate) const MAX_LEVELS: usize = 9;
/// Per-level span names used when recording merge activity in Samply
/// profiles; one entry per level (see `MAX_LEVELS`).
static LEVEL_NAMES: [&str; MAX_LEVELS] = [
    "merge-0", "merge-1", "merge-2", "merge-3", "merge-4", "merge-5", "merge-6", "merge-7",
    "merge-8",
];
/// Cap on the number of records in a level-0 batch.
/// NOTE(review): presumably the default behind `max_level0_batch_size_records()`;
/// the runtime-configured value is asserted to be <= 99_999 in
/// `SharedState::new` — confirm the two stay in sync.
pub(crate) const MAX_LEVEL0_BATCH_SIZE_RECORDS: u16 = 14_999;
/// Runs `future` with the tokio task-local state that merger tasks expect:
/// the originating worker's index, that worker's background buffer cache,
/// and its fbuf slab allocator.
fn scope_tokio_merger_locals<F>(
    worker_index: usize,
    buffer_cache: Arc<BufferCache>,
    slab_allocator: Arc<FBufSlabs>,
    future: F,
) -> impl Future<Output = F::Output>
where
    F: Future,
{
    TOKIO_WORKER_INDEX.scope(
        worker_index,
        TOKIO_BUFFER_CACHE.scope(buffer_cache, TOKIO_FBUF_SLABS.scope(slab_allocator, future)),
    )
}
impl<B: Batch + Send + Sync> From<(Vec<String>, &Spine<B>)> for CommittedSpine {
    /// Builds the checkpoint record for a spine from the storage paths of
    /// its persisted batches. `merged` and `effort` are not used by this
    /// spine implementation and are left empty/zero.
    fn from((batches, spine): (Vec<String>, &Spine<B>)) -> Self {
        CommittedSpine {
            batches,
            merged: Vec::new(),
            effort: 0,
            dirty: spine.dirty,
        }
    }
}
/// Per-level merge state: the batches queued at this level plus
/// bookkeeping about merges already performed here.
#[derive(Clone, SizeOf)]
struct Slot<B>
where
    B: Batch,
{
    // Batches currently being merged by this level's worker, if any.
    merging_batches: Option<Vec<Arc<B>>>,
    // Batches queued at this level and not yet part of a merge.
    loose_batches: VecDeque<Arc<B>>,
    // Total CPU time spent in merge steps at this level.
    elapsed: Duration,
    // Number of merges completed at this level.
    n_merged: usize,
    // Total input batches consumed by completed merges.
    n_merged_batches: usize,
    // Total merge steps executed at this level.
    n_steps: usize,
    // Wakes this level's merger task when new batches arrive.
    #[size_of(skip)]
    notify: Arc<Notify>,
}
impl<B> Default for Slot<B>
where
    B: Batch,
{
    /// An empty slot: no batches, zeroed statistics, fresh notifier.
    fn default() -> Self {
        Self {
            merging_batches: None,
            loose_batches: VecDeque::new(),
            elapsed: Duration::ZERO,
            n_merged: 0,
            n_merged_batches: 0,
            n_steps: 0,
            notify: Arc::new(Notify::new()),
        }
    }
}
impl<B> Slot<B>
where
    B: Batch,
{
    /// If no merge is running at this level and enough loose batches have
    /// accumulated (or memory pressure forces a merge), moves up to the
    /// per-level maximum of loose batches into `merging_batches` and
    /// returns a copy for the merger to work on. Returns `None` otherwise.
    fn try_start_merge(&mut self, level: usize) -> Option<Vec<Arc<B>>> {
        // Minimum and maximum number of batches merged at once, per level:
        // shallow levels wait for more batches before starting.
        const MERGE_COUNTS: [RangeInclusive<usize>; MAX_LEVELS] = [
            8..=64,
            8..=64,
            3..=64,
            3..=64,
            3..=64,
            3..=64,
            2..=64,
            2..=64,
            2..=64,
        ];
        let merge_counts = &MERGE_COUNTS[level];
        if self.merging_batches.is_none()
            && (self.loose_batches.len() >= *merge_counts.start()
                || self.must_relieve_memory_pressure())
        {
            let n = std::cmp::min(*merge_counts.end(), self.loose_batches.len());
            let batches = self.loose_batches.drain(..n).collect::<Vec<_>>();
            self.merging_batches = Some(batches.clone());
            Some(batches)
        } else {
            None
        }
    }
    /// True when memory pressure is at least `High` and this level holds at
    /// least one in-memory loose batch, so a merge should start even below
    /// the usual batch-count threshold.
    fn must_relieve_memory_pressure(&self) -> bool {
        if let Some(memory_pressure) = Runtime::memory_pressure() {
            memory_pressure >= MemoryPressure::High
                && self
                    .loose_batches
                    .iter()
                    .any(|batch| batch.location() == BatchLocation::Memory)
        } else {
            false
        }
    }
    /// Total number of batches (loose plus merging) at this level.
    fn n_batches(&self) -> usize {
        self.all_batches().count()
    }
    /// Iterates over all batches at this level, loose first, then merging.
    fn all_batches(&self) -> impl Iterator<Item = &Arc<B>> {
        self.loose_batches
            .iter()
            .chain(self.merging_batches.iter().flatten())
    }
}
/// State shared (behind a mutex) between the spine and its per-level
/// background merger tasks.
#[derive(SizeOf)]
struct SharedState<B>
where
    B: Batch,
{
    #[size_of(skip)]
    factories: B::Factories,
    // Key retention filter applied during merges, if set.
    #[size_of(skip)]
    key_filter: Option<Filter<B::Key>>,
    // Value retention filter applied during merges, if set.
    #[size_of(skip)]
    value_filter: Option<GroupFilter<B::Val>>,
    // Frontier passed to merges; exact consolidation semantics live in the
    // merger implementations.
    #[size_of(skip)]
    frontier: B::Time,
    // One slot per merge level.
    slots: [Slot<B>; MAX_LEVELS],
    // Set on drop to make the merger tasks exit.
    #[size_of(skip)]
    request_exit: bool,
    // Spine-wide merge/backpressure statistics for metrics reporting.
    #[size_of(skip)]
    spine_stats: SpineStats,
    // Runtime-configured cap on level-0 batch size, used when assigning a
    // batch to a level.
    max_level0_batch_size_records: usize,
}
impl<B> SharedState<B>
where
    B: Batch,
{
    /// Creates an empty shared state using the runtime-configured level-0
    /// batch size cap.
    ///
    /// # Panics
    /// Panics if the configured cap is 0 or exceeds 99_999.
    pub fn new(factories: &B::Factories) -> Self {
        let max_level0_batch_size_records = max_level0_batch_size_records() as usize;
        assert!(
            max_level0_batch_size_records > 0,
            "max_level0_batch_size_records must be greater than 0"
        );
        assert!(
            max_level0_batch_size_records <= 99_999,
            "max_level0_batch_size_records must be less than or equal to 99_999"
        );
        Self {
            factories: factories.clone(),
            key_filter: None,
            value_filter: None,
            frontier: B::Time::minimum(),
            slots: std::array::from_fn(|_| Slot::default()),
            request_exit: false,
            spine_stats: SpineStats::default(),
            max_level0_batch_size_records,
        }
    }
    /// Adds `batches` to the spine, silently dropping empty ones.
    fn add_batches(&mut self, batches: impl IntoIterator<Item = Arc<B>>, merge: bool) {
        for batch in batches {
            if !batch.is_empty() {
                self.add_batch(batch, merge);
            }
        }
    }
    /// Queues a non-empty batch at the level chosen by its size (and
    /// whether it is the output of a merge), then wakes that level's
    /// merger task.
    fn add_batch(&mut self, batch: Arc<B>, merge: bool) {
        debug_assert!(!batch.is_empty());
        let level = Spine::<B>::size_to_level(&batch, self.max_level0_batch_size_records, merge);
        self.slots[level].loose_batches.push_back(batch);
        self.slots[level].notify.notify_one();
    }
    /// Whether producers should block: the total number of loose batches
    /// across all levels has reached the high-water mark.
    fn should_apply_backpressure(&self) -> bool {
        const HIGH_THRESHOLD: usize = 128;
        self.slots
            .iter()
            .map(|s| s.loose_batches.len())
            .sum::<usize>()
            >= HIGH_THRESHOLD
    }
    /// Whether blocked producers may resume: total loose batches are back
    /// below the high-water mark.
    fn should_relieve_backpressure(&self) -> bool {
        const LOWER_THRESHOLD: usize = 127;
        self.slots
            .iter()
            .map(|s| s.loose_batches.len())
            .sum::<usize>()
            <= LOWER_THRESHOLD
    }
    /// Clones the current key and value retention filters.
    fn get_filters(&self) -> (Option<Filter<B::Key>>, Option<GroupFilter<B::Val>>) {
        (self.key_filter.clone(), self.value_filter.clone())
    }
    /// Returns every batch in the spine (loose and merging) across all
    /// levels.
    fn get_batches(&self) -> Vec<Arc<B>> {
        let mut batches = Vec::with_capacity(self.slots.iter().map(Slot::n_batches).sum());
        for slot in &self.slots {
            batches.extend(slot.all_batches().cloned());
        }
        batches
    }
    /// Builds a read-only snapshot over all current batches.
    fn get_snapshot(&self) -> SpineSnapshot<B> {
        SpineSnapshot::with_batches(&self.factories, self.get_batches())
    }
    /// Removes and returns all loose (not currently merging) batches.
    fn take_loose_batches(&mut self) -> Vec<Arc<B>> {
        let mut loose_batches =
            Vec::with_capacity(self.slots.iter().map(|slot| slot.loose_batches.len()).sum());
        for slot in &mut self.slots {
            loose_batches.extend(slot.loose_batches.drain(..));
        }
        loose_batches
    }
    /// Whether any level currently has a merge in progress.
    fn is_merging(&self) -> bool {
        self.slots.iter().any(|slot| slot.merging_batches.is_some())
    }
    /// Records a finished merge at `level`: updates per-slot counters and
    /// spine-wide stats, emits a profiling span, and re-inserts the merged
    /// batch with `merge = true` so it is assigned to a deeper level.
    fn merge_complete(
        &mut self,
        level: usize,
        new_batch: Arc<B>,
        start: Instant,
        elapsed: Duration,
        n_steps: usize,
    ) {
        let slot = &mut self.slots[level];
        let batches = slot.merging_batches.take().unwrap();
        slot.n_merged += 1;
        slot.n_merged_batches += batches.len();
        slot.elapsed += elapsed;
        slot.n_steps += n_steps;
        // Aggregate cache statistics over all merge inputs.
        let cache_stats = batches.iter().fold(CacheStats::default(), |stats, batch| {
            stats + batch.cache_stats()
        });
        let pre_len = batches.iter().map(|b| b.len()).sum();
        let post_len = new_batch.len();
        self.spine_stats
            .report_merge(pre_len, post_len, cache_stats);
        SamplySpan::new(LEVEL_NAMES[level])
            .with_category("Spine")
            .with_start(start)
            .with_tooltip(|| {
                format!(
                    "Merged {} batches ({pre_len} -> {post_len}) in {n_steps} steps using {:.1} ms CPU",
                    batches.len(),
                    elapsed.as_secs_f64() * 1000.0
                )
            })
            .record();
        self.add_batches([new_batch], true);
    }
    /// Clones the slots and spine statistics so metrics can be formatted
    /// without holding the lock.
    fn metadata_snapshot(&self) -> ([Slot<B>; MAX_LEVELS], SpineStats) {
        (self.slots.clone(), self.spine_stats.clone())
    }
}
/// Handle to the background merging machinery: the shared state plus the
/// condvars used for producer backpressure and for waiting until all
/// merges have drained.
struct AsyncMerger<B>
where
    B: Batch,
{
    state: Arc<Mutex<SharedState<B>>>,
    // Signaled when backpressured producers may resume.
    no_backpressure: Arc<Condvar>,
    // Signaled when a merger task has nothing left to do.
    idle: Arc<Condvar>,
}
/// Outcome of one `AsyncMerger::run` iteration, telling the worker loop
/// how to proceed.
enum WorkerStatus {
    // More work is pending; yield to the scheduler and call `run` again.
    Yield,
    // Nothing to do until new batches (or memory pressure) arrive.
    Idle,
    // Exit was requested; terminate the task.
    Done,
}
impl<B> AsyncMerger<B>
where
B: Batch,
{
fn new(factories: &B::Factories) -> Self {
let idle = Arc::new(Condvar::new());
let no_backpressure = Arc::new(Condvar::new());
let state = Arc::new(Mutex::new(SharedState::new(factories)));
let worker_state = Arc::new(WorkerState::default());
for level in 0..MAX_LEVELS {
let worker_state = worker_state.clone();
let state = state.clone();
let idle = idle.clone();
let no_backpressure = no_backpressure.clone();
let worker_index = Runtime::worker_index();
let runtime = Runtime::runtime()
.expect("Attempting to create a spine merger outside of a DBSP runtime");
let buffer_cache =
runtime.get_buffer_cache(Runtime::local_worker_offset(), ThreadType::Background);
let slab_allocator = runtime
.get_fbuf_slab_allocator(Runtime::local_worker_offset(), ThreadType::Background);
let memory_pressure_notify = runtime.memory_pressure_notify();
runtime.tokio_merger_runtime().spawn(async move {
scope_tokio_merger_locals(worker_index, buffer_cache, slab_allocator, async move {
let state = Arc::clone(&state);
let idle = Arc::clone(&idle);
let no_backpressure = Arc::clone(&no_backpressure);
let mut merger = None;
let merger_type = Runtime::with_dev_tweaks(|tweaks| tweaks.merger());
let notify = state.lock().unwrap().slots[level].notify.clone();
loop {
let status = Self::run(
&worker_state,
level,
&mut merger,
merger_type,
&state,
&idle,
&no_backpressure,
);
match status {
WorkerStatus::Yield => yield_now().await,
WorkerStatus::Idle => {
tokio::select! {
_ = notify.notified() => {}
_ = memory_pressure_notify.notified() => {}
}
}
WorkerStatus::Done => {
break;
}
}
}
})
.await;
});
}
Self {
state,
idle,
no_backpressure,
}
}
fn set_key_filter(&self, key_filter: &Filter<B::Key>) {
self.state.lock().unwrap().key_filter = Some(key_filter.clone());
}
fn set_value_filter(&self, value_filter: &GroupFilter<B::Val>) {
self.state.lock().unwrap().value_filter = Some(value_filter.clone());
}
fn set_frontier(&self, frontier: &B::Time) {
self.state.lock().unwrap().frontier = frontier.clone();
}
fn add_batch(&self, batch: Arc<B>, merge: bool) {
debug_assert!(!batch.is_empty());
let mut state = self.state.lock().unwrap();
state.add_batch(batch, merge);
if state.should_apply_backpressure() {
let start = Instant::now();
let mut state = self.no_backpressure.wait(state).unwrap();
state.spine_stats.backpressure_wait += start.elapsed();
COMPACTION_STALL_TIME_NANOSECONDS
.fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
SamplySpan::new("backpressure-wait")
.with_category("Spine")
.with_start(start)
.record();
}
}
fn add_batches(&self, batches: impl IntoIterator<Item = Arc<B>>, merge: bool) {
self.state.lock().unwrap().add_batches(batches, merge);
}
fn get_batches(&self) -> Vec<Arc<B>> {
self.state.lock().unwrap().get_batches()
}
fn pause(&self) -> Vec<Arc<B>> {
let mut state = self.state.lock().unwrap();
let mut batches = state.take_loose_batches();
let mut state = self
.idle
.wait_while(state, |state| state.is_merging())
.unwrap();
batches.extend(state.take_loose_batches());
batches
}
fn pause_new_merges(&self) -> (Vec<Arc<B>>, Vec<Arc<B>>) {
let mut state = self.state.lock().unwrap();
let not_merging = state.take_loose_batches();
let merging = state.get_batches();
(not_merging, merging)
}
fn resume(&self, batches: impl IntoIterator<Item = Arc<B>>) {
self.add_batches(batches, false);
}
fn metadata(&self, meta: &mut OperatorMeta) {
fn class_batch_count_label(class: &str) -> MetricId {
match class {
"loose" => LOOSE_BATCHES_COUNT,
"merging" => MERGING_BATCHES_COUNT,
_ => panic!("invalid class: {class}"),
}
}
fn class_tuple_count_label(class: &str, location: BatchLocation) -> MetricId {
match (class, location) {
("loose", BatchLocation::Memory) => LOOSE_MEMORY_RECORDS_COUNT,
("loose", BatchLocation::Storage) => LOOSE_STORAGE_RECORDS_COUNT,
("merging", BatchLocation::Memory) => MERGING_MEMORY_RECORDS_COUNT,
("merging", BatchLocation::Storage) => MERGING_STORAGE_RECORDS_COUNT,
_ => panic!("invalid class: {class} and location: {location:?}"),
}
}
let (mut slots, spine_stats) = self.state.lock().unwrap().metadata_snapshot();
for (index, slot) in slots.iter_mut().enumerate() {
for (class, batches) in [
("loose", slot.loose_batches.make_contiguous() as &_),
(
"merging",
slot.merging_batches
.as_ref()
.unwrap_or(&Vec::new())
.as_slice(),
),
] {
if !batches.is_empty() {
let mut tuple_counts = EnumMap::<BatchLocation, usize>::default();
for batch in batches {
tuple_counts[batch.location()] += batch.len();
}
let mut facts = Vec::with_capacity(3);
facts.push(MetricReading::new(
class_batch_count_label(class),
vec![(Cow::Borrowed("slot"), index.to_string().into())],
MetaItem::Count(batches.len()),
));
for (location, count) in tuple_counts {
if count > 0 {
facts.push(MetricReading::new(
class_tuple_count_label(class, location),
vec![(Cow::Borrowed("slot"), index.to_string().into())],
MetaItem::Count(count),
));
}
}
meta.extend(facts);
}
}
if slot.n_merged > 0 {
meta.extend([MetricReading::new(
COMPLETED_MERGES,
vec![(Cow::Borrowed("slot"), index.to_string().into())],
MetaItem::Map(BTreeMap::from([
(Cow::Borrowed("merges"), MetaItem::Count(slot.n_merged)),
(
Cow::Borrowed("batches"),
MetaItem::Count(slot.n_merged_batches),
),
(Cow::Borrowed("steps"), MetaItem::Count(slot.n_steps)),
(
Cow::Borrowed("avg_step_time"),
MetaItem::Duration(slot.elapsed / slot.n_steps as u32),
),
])),
)]);
}
let mut negative_weight_count = 0;
let mut has_negative_weight_counts = false;
for batch in slot.all_batches() {
if let Some(count) = batch.negative_weight_count() {
negative_weight_count += count;
has_negative_weight_counts = true;
}
}
if has_negative_weight_counts {
meta.extend([MetricReading::new(
NEGATIVE_WEIGHT_COUNT,
vec![(Cow::Borrowed("slot"), index.to_string().into())],
MetaItem::Count(negative_weight_count as usize),
)]);
}
}
let mut batches = Vec::new();
for slot in slots {
batches.extend(slot.loose_batches.into_iter().map(|b| (b, false)));
if let Some(merging_batches) = slot.merging_batches {
batches.extend(merging_batches.into_iter().map(|b| (b, true)));
}
}
let n_batches = batches.len();
let n_merging = batches.iter().filter(|(_batch, merging)| *merging).count();
let mut cache_stats = spine_stats.cache_stats;
let mut storage_size = 0;
let mut merging_size = 0;
let mut membership_filter_stats = FilterStats::default();
let mut range_filter_stats = FilterStats::default();
let mut storage_records = 0;
for (batch, merging) in batches {
cache_stats += batch.cache_stats();
membership_filter_stats += batch.membership_filter_stats();
range_filter_stats += batch.range_filter_stats();
let on_storage = batch.location() == BatchLocation::Storage;
if on_storage || merging {
let size = batch.approximate_byte_size();
if on_storage {
storage_size += size;
storage_records += batch.key_count();
}
if merging {
merging_size += size;
}
}
}
if storage_records > 0 {
let bits_per_key =
membership_filter_stats.size_byte as f64 * 8.0 / storage_records as f64;
let bits_per_key = bits_per_key as usize;
meta.extend(metadata! {
BLOOM_FILTER_BITS_PER_KEY => MetaItem::Int(bits_per_key)
})
}
meta.extend([
MetricReading::new(SPINE_BATCHES_COUNT, Vec::new(), MetaItem::Count(n_batches)),
MetricReading::new(
SPINE_STORAGE_SIZE_BYTES,
Vec::new(),
MetaItem::bytes(storage_size),
),
MetricReading::new(
MERGING_BATCHES_COUNT,
Vec::new(),
MetaItem::Count(n_merging),
),
MetricReading::new(
MERGING_SIZE_BYTES,
Vec::new(),
MetaItem::bytes(merging_size),
),
MetricReading::new(
MERGE_REDUCTION_PERCENT,
Vec::new(),
spine_stats.merge_reduction(),
),
MetricReading::new(
MERGE_BACKPRESSURE_WAIT_TIME_SECONDS,
Vec::new(),
MetaItem::Duration(spine_stats.backpressure_wait),
),
MetricReading::new(
BLOOM_FILTER_SIZE_BYTES,
Vec::new(),
MetaItem::bytes(membership_filter_stats.size_byte),
),
MetricReading::new(
BLOOM_FILTER_HITS_COUNT,
Vec::new(),
MetaItem::Count(membership_filter_stats.hits),
),
MetricReading::new(
BLOOM_FILTER_MISSES_COUNT,
Vec::new(),
MetaItem::Count(membership_filter_stats.misses),
),
MetricReading::new(
BLOOM_FILTER_HIT_RATE_PERCENT,
Vec::new(),
MetaItem::Percent {
numerator: membership_filter_stats.hits as u64,
denominator: membership_filter_stats.hits as u64
+ membership_filter_stats.misses as u64,
},
),
MetricReading::new(
RANGE_FILTER_SIZE_BYTES,
Vec::new(),
MetaItem::bytes(range_filter_stats.size_byte),
),
MetricReading::new(
RANGE_FILTER_HITS_COUNT,
Vec::new(),
MetaItem::Count(range_filter_stats.hits),
),
MetricReading::new(
RANGE_FILTER_MISSES_COUNT,
Vec::new(),
MetaItem::Count(range_filter_stats.misses),
),
MetricReading::new(
RANGE_FILTER_HIT_RATE_PERCENT,
Vec::new(),
MetaItem::Percent {
numerator: range_filter_stats.hits as u64,
denominator: range_filter_stats.hits as u64 + range_filter_stats.misses as u64,
},
),
]);
cache_stats.metadata(meta);
}
fn maybe_relieve_backpressure(
no_backpressure: &Arc<Condvar>,
state: &MutexGuard<SharedState<B>>,
) {
if state.should_relieve_backpressure() {
Self::relieve_backpressure(no_backpressure);
}
}
fn relieve_backpressure(no_backpressure: &Arc<Condvar>) {
no_backpressure.notify_all();
}
fn run(
worker_state: &Arc<WorkerState>,
level: usize,
opt_merger: &mut Option<Merge<B>>,
merger_type: MergerType,
state: &Arc<Mutex<SharedState<B>>>,
idle: &Arc<Condvar>,
no_backpressure: &Arc<Condvar>,
) -> WorkerStatus {
let ((key_filter, value_filter), frontier) = {
let shared = state.lock().unwrap();
(shared.get_filters(), shared.frontier.clone())
};
if let Some(merger) = opt_merger.as_mut() {
let fuel = if level == 0 {
isize::MAX
} else {
worker_state.avg_slot0_merge_fuel()
};
merger.merge(&frontier, fuel);
if merger.done {
if level == 0 {
worker_state.report_slot0_merge(merger.fuel);
}
let merger = opt_merger.take().unwrap();
let new_batch = Arc::new(merger.builder.done());
state.lock().unwrap().merge_complete(
level,
new_batch,
merger.start,
merger.elapsed,
merger.n_steps,
);
}
}
let start_merge = state.lock().unwrap().slots[level].try_start_merge(level);
let snapshot = if value_filter
.as_ref()
.map(|f| f.requires_snapshot())
.unwrap_or(false)
{
Some(Arc::new(state.lock().unwrap().get_snapshot()))
} else {
None
};
if let Some(batches) = start_merge {
*opt_merger = Some(Merge::new(
merger_type,
batches,
&key_filter,
&value_filter,
snapshot.clone(),
));
}
let state = state.lock().unwrap();
if state.request_exit {
Self::relieve_backpressure(no_backpressure);
WorkerStatus::Done
} else if opt_merger.is_none() {
Self::maybe_relieve_backpressure(no_backpressure, &state);
idle.notify_all(); WorkerStatus::Idle
} else {
Self::maybe_relieve_backpressure(no_backpressure, &state);
WorkerStatus::Yield
}
}
}
impl<B> Drop for AsyncMerger<B>
where
    B: Batch,
{
    /// Signals every level's merger task to exit: sets `request_exit` and
    /// wakes each level's worker so it observes the flag. Takes the state
    /// lock once instead of re-acquiring it per level.
    fn drop(&mut self) {
        let mut state = self.state.lock().unwrap();
        state.request_exit = true;
        for slot in &state.slots {
            slot.notify.notify_one();
        }
    }
}
/// Pacing state shared by all merger tasks of one spine.
#[derive(Debug)]
struct WorkerState {
    /// Exponentially-weighted moving average of the fuel consumed by
    /// completed level-0 merges; deeper levels use it as a work quantum.
    avg_slot0_merge_fuel: AtomicIsize,
}

impl Default for WorkerState {
    /// Initial fuel estimate, used before any level-0 merge has completed.
    fn default() -> Self {
        WorkerState {
            avg_slot0_merge_fuel: AtomicIsize::new(10_000),
        }
    }
}

impl WorkerState {
    /// Folds the fuel consumed by a finished level-0 merge into the moving
    /// average with a 127/128 decay; the `+ 64` term rounds to nearest and
    /// the result is clamped to at least 1.
    fn report_slot0_merge(&self, fuel: isize) {
        let previous = self.avg_slot0_merge_fuel.load(Ordering::Relaxed);
        let updated = ((previous * 127 + fuel + 64) / 128).max(1);
        self.avg_slot0_merge_fuel.store(updated, Ordering::Relaxed);
    }

    /// Current moving-average fuel estimate for level-0 merges.
    fn avg_slot0_merge_fuel(&self) -> isize {
        self.avg_slot0_merge_fuel.load(Ordering::Relaxed)
    }
}
/// An in-progress merge of several batches into one, driven incrementally
/// by fuel-bounded steps.
struct Merge<B>
where
    B: Batch,
{
    // Accumulates the merged output; finished with `builder.done()`.
    builder: B::Builder,
    // Total fuel consumed by this merge so far.
    fuel: isize,
    // Set when the merge has consumed all input.
    done: bool,
    // When the merge was started (used for profiling spans).
    start: Instant,
    // Total CPU time spent in merge steps.
    elapsed: Duration,
    // Number of fuel-bounded steps executed.
    n_steps: usize,
    inner: MergeInner<B>,
}
/// The concrete merger implementation backing a [`Merge`], selected by the
/// `MergerType` dev tweak.
enum MergeInner<B>
where
    B: Batch,
{
    ListMerger(ArcListMerger<B>),
    PushMerger(ArcPushMerger<B>),
}
impl<B> Merge<B>
where
    B: Batch,
{
    /// Starts a merge of `batches` with the configured merger
    /// implementation. A `PushMerger` is primed immediately with `run()`;
    /// a `ListMerger` does all its work lazily in [`Self::merge`].
    ///
    /// NOTE(review): indexes `batches[0]`, so `batches` must be non-empty —
    /// callers obtain it from `Slot::try_start_merge`, which drains at
    /// least one batch.
    fn new(
        merger_type: MergerType,
        batches: Vec<Arc<B>>,
        key_filter: &Option<Filter<B::Key>>,
        value_filter: &Option<GroupFilter<B::Val>>,
        snapshot: Option<Arc<SpineSnapshot<B>>>,
    ) -> Self {
        let factories = batches[0].factories();
        let builder = B::Builder::for_merge(&factories, &batches, None);
        Self {
            builder,
            fuel: 0,
            start: Instant::now(),
            elapsed: Duration::ZERO,
            n_steps: 0,
            done: false,
            inner: match merger_type {
                MergerType::ListMerger => MergeInner::ListMerger(ArcListMerger::new(
                    &factories,
                    batches,
                    key_filter,
                    value_filter,
                    snapshot,
                )),
                MergerType::PushMerger => {
                    let mut inner =
                        ArcPushMerger::new(&factories, batches, key_filter, value_filter);
                    inner.run();
                    MergeInner::PushMerger(inner)
                }
            },
        }
    }
    /// Performs up to `fuel` units of merge work toward `frontier`. Sets
    /// `self.done` when the merge finished (leftover fuel after a step
    /// means the inputs were exhausted) and returns the fuel consumed by
    /// this step.
    fn merge(&mut self, frontier: &B::Time, mut fuel: isize) -> isize {
        debug_assert!(fuel > 0);
        let supplied_fuel = fuel;
        let start = Instant::now();
        match &mut self.inner {
            MergeInner::ListMerger(merger) => {
                merger.work(&mut self.builder, frontier, &mut fuel);
                self.done = fuel > 0;
            }
            MergeInner::PushMerger(merger) => {
                self.done =
                    merger.merge(&mut self.builder, frontier, &mut fuel).is_ok() && fuel > 0;
                if !self.done {
                    // Re-prime the push merger so the next call can make
                    // progress.
                    merger.run();
                }
            }
        };
        self.elapsed += start.elapsed();
        self.n_steps += 1;
        let consumed_fuel = supplied_fuel - fuel;
        self.fuel += consumed_fuel;
        consumed_fuel
    }
}
/// Spine-wide statistics accumulated across all completed merges.
#[derive(Clone, Default)]
struct SpineStats {
    // Total records fed into merges.
    pre_len: u64,
    // Total records produced by merges.
    post_len: u64,
    // Aggregated buffer-cache statistics from merge inputs.
    cache_stats: CacheStats,
    // Total time producers spent blocked on backpressure.
    backpressure_wait: Duration,
}
impl SpineStats {
    /// Accumulates the input/output sizes and cache stats of one merge.
    fn report_merge(&mut self, pre_len: usize, post_len: usize, cache_stats: CacheStats) {
        self.pre_len += pre_len as u64;
        self.post_len += post_len as u64;
        self.cache_stats += cache_stats;
    }
    /// Fraction of records eliminated by merging, as a percent metric.
    /// NOTE(review): yields a 0/0 percent before any merge has completed —
    /// assumed to be rendered sensibly by `MetaItem`; confirm.
    fn merge_reduction(&self) -> MetaItem {
        MetaItem::Percent {
            numerator: self.pre_len - self.post_len,
            denominator: self.pre_len,
        }
    }
}
/// A trace implemented as a collection of batches that are merged in the
/// background by per-level tokio tasks (see [`AsyncMerger`]).
pub struct Spine<B>
where
    B: Batch,
{
    factories: B::Factories,
    // Whether the spine has been modified since the dirty flag was cleared.
    dirty: bool,
    // Local copies of the retention filters, also installed in the merger.
    key_filter: Option<Filter<B::Key>>,
    value_filter: Option<GroupFilter<B::Val>>,
    merger: AsyncMerger<B>,
}
impl<B> Spine<B>
where
    B: Batch,
{
    /// Returns all batches currently in the spine (loose and merging).
    pub fn get_batches(&self) -> Vec<Arc<B>> {
        self.merger.get_batches()
    }
}
impl<B> SizeOf for Spine<B>
where
    B: Batch,
{
    /// Reports the size of the spine as the size of its current batches.
    fn size_of_children(&self, context: &mut Context) {
        self.merger.get_batches().size_of_with_context(context);
    }
}
impl<B> Display for Spine<B>
where
    B: Batch + Display,
{
    /// Prints each batch in the spine, indented under a "batch:" header.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        for batch in self.merger.get_batches() {
            writeln!(f, "batch:\n{}", indent(&batch.to_string(), "    "))?
        }
        Ok(())
    }
}
impl<B> Debug for Spine<B>
where
    B: Batch,
{
    /// Dumps the merged view of the spine: every key, each of its values,
    /// and the (time, weight) pairs for each value.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        let mut cursor = self.cursor();
        writeln!(f, "spine:")?;
        while cursor.key_valid() {
            writeln!(f, "{:?}:", cursor.key())?;
            while cursor.val_valid() {
                writeln!(f, "    {:?}:", cursor.val())?;
                cursor.map_times(&mut |t, w| {
                    writeln!(f, "        {t:?} -> {w:?}").unwrap();
                });
                cursor.step_val();
            }
            cursor.step_key();
        }
        writeln!(f)?;
        Ok(())
    }
}
impl<B> Clone for Spine<B>
where
    B: Batch,
{
    /// Spines cannot be cloned; the impl exists only to satisfy trait
    /// bounds and panics if invoked.
    fn clone(&self) -> Self {
        unimplemented!()
    }
}
// Spines are never serialized via rkyv; the `Archive`/`Serialize`/
// `Deserialize` impls below exist only to satisfy trait bounds and panic
// if actually invoked.
impl<B> Archive for Spine<B>
where
    B: Batch,
{
    type Archived = ();
    type Resolver = ();
    unsafe fn resolve(&self, _pos: usize, _resolver: Self::Resolver, _out: *mut Self::Archived) {
        unimplemented!();
    }
}
impl<B: Batch, S: Serializer + ?Sized> Serialize<S> for Spine<B> {
    fn serialize(&self, _serializer: &mut S) -> Result<Self::Resolver, S::Error> {
        unimplemented!();
    }
}
impl<B: Batch, D: Fallible> Deserialize<Spine<B>, D> for Archived<Spine<B>> {
    fn deserialize(&self, _deserializer: &mut D) -> Result<Spine<B>, D::Error> {
        unimplemented!();
    }
}
impl<B> NumEntries for Spine<B>
where
    B: Batch,
{
    const CONST_NUM_ENTRIES: Option<usize> = None;
    /// Sum of the lengths of all batches (may count a logical record more
    /// than once if it appears in several batches).
    fn num_entries_shallow(&self) -> usize {
        self.merger
            .get_batches()
            .iter()
            .map(|batch| batch.len())
            .sum()
    }
    fn num_entries_deep(&self) -> usize {
        self.num_entries_shallow()
    }
}
/// Samples up to `sample_size` distinct keys from `batches` into `sample`,
/// drawing from each batch in proportion to its key count and keeping only
/// candidates that are still present with a valid value in the merged view
/// of all batches.
pub(crate) fn sample_keys_from_batches<B, RG>(
    factories: &B::Factories,
    batches: &[Arc<B>],
    rng: &mut RG,
    sample_size: usize,
    sample: &mut DynVec<B::Key>,
) where
    B: Batch,
    B::Time: PartialEq<()>,
    RG: Rng,
{
    let total_keys = batches.iter().map(|batch| batch.key_count()).sum::<usize>();
    if sample_size == 0 || total_keys == 0 {
        return;
    }
    // Collect candidate keys; each batch gets a share of the sample
    // proportional to its key count (integer division, so the total may
    // fall slightly short of `sample_size`).
    let mut intermediate = factories.keys_factory().default_box();
    intermediate.reserve(sample_size);
    for batch in batches {
        batch.sample_keys(
            rng,
            ((batch.key_count() as u128) * (sample_size as u128) / (total_keys as u128)) as usize,
            intermediate.as_mut(),
        );
    }
    intermediate.deref_mut().sort_unstable();
    intermediate.dedup();
    // Validate each candidate against the merged view: keep it only if it
    // is still present there.
    let mut cursor = SpineCursor::new_cursor(factories, batches.to_vec());
    for key in intermediate.dyn_iter_mut() {
        cursor.seek_key(key);
        if let Some(current_key) = cursor.get_key()
            && current_key == key
        {
            debug_assert!(cursor.val_valid() && !cursor.weight().is_zero());
            sample.push_ref(key);
        }
    }
}
// A spine reads as the union of its current batches; every method fetches
// a fresh batch list from the merger, so successive calls may observe
// different (merged) batch sets with the same logical contents.
impl<B> BatchReader for Spine<B>
where
    B: Batch,
{
    type Key = B::Key;
    type Val = B::Val;
    type Time = B::Time;
    type R = B::R;
    type Factories = B::Factories;
    type Cursor<'s> = SpineCursor<B>;
    fn factories(&self) -> Self::Factories {
        self.factories.clone()
    }
    // Sum over batches; a key present in several batches is counted once
    // per batch.
    fn key_count(&self) -> usize {
        self.merger
            .get_batches()
            .iter()
            .map(|batch| batch.key_count())
            .sum()
    }
    fn len(&self) -> usize {
        self.merger
            .get_batches()
            .iter()
            .map(|batch| batch.len())
            .sum()
    }
    fn approximate_byte_size(&self) -> usize {
        self.merger
            .get_batches()
            .iter()
            .map(|batch| batch.approximate_byte_size())
            .sum()
    }
    fn membership_filter_stats(&self) -> FilterStats {
        self.merger
            .get_batches()
            .iter()
            .map(|batch| batch.membership_filter_stats())
            .sum()
    }
    fn range_filter_stats(&self) -> FilterStats {
        self.merger
            .get_batches()
            .iter()
            .map(|batch| batch.range_filter_stats())
            .sum()
    }
    // The cursor owns a snapshot of the current batch list, so it remains
    // valid while merges continue in the background.
    fn cursor(&self) -> Self::Cursor<'_> {
        SpineCursor::new_cursor(&self.factories, self.merger.get_batches())
    }
    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
    where
        Self::Time: PartialEq<()>,
        RG: Rng,
    {
        sample_keys_from_batches(
            &self.factories,
            &self.merger.get_batches(),
            rng,
            sample_size,
            sample,
        );
    }
    // Asynchronously fetches the requested keys from all current batches.
    async fn fetch<KR>(
        &self,
        keys: &KR,
    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
    where
        KR: BatchReader<Key = Self::Key, Time = ()>,
    {
        Some(Box::new(
            FetchList::new(
                self.merger.get_batches(),
                keys,
                self.factories.weight_factory(),
            )
            .await,
        ))
    }
}
impl<B> Spine<B>
where
    B: Batch,
{
    /// Path of the checkpoint metadata file for the spine with this
    /// persistent id.
    fn checkpoint_file(base: &StoragePath, persistent_id: &str) -> StoragePath {
        base.child(format!("pspine-{}.dat", persistent_id))
    }
    /// Path of the batch-list file for the spine with this persistent id.
    /// NOTE(review): takes `&self` but never uses it, unlike
    /// `checkpoint_file` — consider making it an associated function.
    fn batchlist_file(&self, base: &StoragePath, persistent_id: &str) -> StoragePath {
        base.child(format!("pspine-batches-{}.dat", persistent_id))
    }
}
/// A cursor over the union of a set of spine batches. Self-referencing
/// (via `ouroboros`): it owns the `batches` vector and a `CursorList`
/// whose per-batch cursors borrow from it, so the cursor stays valid even
/// as the spine merges batches in the background.
#[self_referencing]
pub struct SpineCursor<B: Batch> {
    batches: Vec<Arc<B>>,
    #[borrows(batches)]
    #[not_covariant]
    cursor: CursorList<B::Key, B::Val, B::Time, B::R, B::Cursor<'this>>,
}
impl<B: Batch> Clone for SpineCursor<B> {
    /// Builds a new cursor over the same batches.
    /// NOTE(review): the clone is constructed from fresh per-batch
    /// cursors, so it does not preserve the source cursor's position —
    /// confirm callers expect a rewound cursor.
    fn clone(&self) -> Self {
        let batches = self.borrow_batches().clone();
        let weight_factory = self.with_cursor(|cursor| cursor.weight_factory());
        SpineCursorBuilder {
            batches,
            cursor_builder: |batches| {
                CursorList::new(
                    weight_factory,
                    batches.iter().map(|batch| batch.cursor()).collect(),
                )
            },
        }
        .build()
    }
}
impl<B: Batch> SpineCursor<B> {
    /// Creates a cursor over `batches`, taking ownership of the batch list
    /// and building one per-batch cursor merged through a `CursorList`.
    pub fn new_cursor(factories: &B::Factories, batches: Vec<Arc<B>>) -> Self {
        SpineCursorBuilder {
            batches,
            cursor_builder: |batches| {
                CursorList::new(
                    factories.weight_factory(),
                    batches.iter().map(|batch| batch.cursor()).collect(),
                )
            },
        }
        .build()
    }
}
// Every cursor operation delegates to the inner `CursorList` through the
// `ouroboros` accessors: `with_cursor` for shared access, `with_cursor_mut`
// for mutation.
impl<B: Batch> Cursor<B::Key, B::Val, B::Time, B::R> for SpineCursor<B> {
    fn weight_factory(&self) -> &'static dyn Factory<B::R> {
        self.with_cursor(|cursor| cursor.weight_factory())
    }
    fn key_valid(&self) -> bool {
        self.with_cursor(|cursor| cursor.key_valid())
    }
    fn val_valid(&self) -> bool {
        self.with_cursor(|cursor| cursor.val_valid())
    }
    fn key(&self) -> &B::Key {
        self.with_cursor(|cursor| cursor.key())
    }
    fn val(&self) -> &B::Val {
        self.with_cursor(|cursor| cursor.val())
    }
    fn map_times(&mut self, logic: &mut dyn FnMut(&B::Time, &B::R)) {
        self.with_cursor_mut(|cursor| cursor.map_times(logic));
    }
    fn map_times_through(&mut self, upper: &B::Time, logic: &mut dyn FnMut(&B::Time, &B::R)) {
        self.with_cursor_mut(|cursor| cursor.map_times_through(upper, logic));
    }
    fn weight(&mut self) -> &B::R
    where
        B::Time: PartialEq<()>,
    {
        self.with_cursor_mut(|cursor| cursor.weight())
    }
    fn weight_checked(&mut self) -> &B::R {
        self.with_cursor_mut(|cursor| cursor.weight_checked())
    }
    fn map_values(&mut self, logic: &mut dyn FnMut(&B::Val, &B::R))
    where
        B::Time: PartialEq<()>,
    {
        self.with_cursor_mut(|cursor| cursor.map_values(logic))
    }
    fn step_key(&mut self) {
        self.with_cursor_mut(|cursor| cursor.step_key());
    }
    fn step_key_reverse(&mut self) {
        self.with_cursor_mut(|cursor| cursor.step_key_reverse());
    }
    fn seek_key(&mut self, key: &B::Key) {
        self.with_cursor_mut(|cursor| cursor.seek_key(key));
    }
    fn seek_key_exact(&mut self, key: &B::Key, hash: Option<u64>) -> bool {
        self.with_cursor_mut(|cursor| cursor.seek_key_exact(key, hash))
    }
    fn seek_key_with(&mut self, predicate: &dyn Fn(&B::Key) -> bool) {
        self.with_cursor_mut(|cursor| cursor.seek_key_with(predicate));
    }
    fn seek_key_with_reverse(&mut self, predicate: &dyn Fn(&B::Key) -> bool) {
        self.with_cursor_mut(|cursor| cursor.seek_key_with_reverse(predicate));
    }
    fn seek_key_reverse(&mut self, key: &B::Key) {
        self.with_cursor_mut(|cursor| cursor.seek_key_reverse(key));
    }
    fn step_val(&mut self) {
        self.with_cursor_mut(|cursor| cursor.step_val());
    }
    fn seek_val(&mut self, val: &B::Val) {
        self.with_cursor_mut(|cursor| cursor.seek_val(val));
    }
    fn seek_val_with(&mut self, predicate: &dyn Fn(&B::Val) -> bool) {
        self.with_cursor_mut(|cursor| cursor.seek_val_with(predicate));
    }
    fn rewind_keys(&mut self) {
        self.with_cursor_mut(|cursor| cursor.rewind_keys());
    }
    fn fast_forward_keys(&mut self) {
        self.with_cursor_mut(|cursor| cursor.fast_forward_keys());
    }
    fn rewind_vals(&mut self) {
        self.with_cursor_mut(|cursor| cursor.rewind_vals());
    }
    fn step_val_reverse(&mut self) {
        self.with_cursor_mut(|cursor| cursor.step_val_reverse());
    }
    fn seek_val_reverse(&mut self, val: &B::Val) {
        self.with_cursor_mut(|cursor| cursor.seek_val_reverse(val));
    }
    fn seek_val_with_reverse(&mut self, predicate: &dyn Fn(&B::Val) -> bool) {
        self.with_cursor_mut(|cursor| cursor.seek_val_with_reverse(predicate));
    }
    fn fast_forward_vals(&mut self) {
        self.with_cursor_mut(|cursor| cursor.fast_forward_vals());
    }
    fn position(&self) -> Option<Position> {
        self.with_cursor(|cursor| cursor.position())
    }
}
impl<B> Trace for Spine<B>
where
    B: Batch,
{
    type Batch = B;

    /// Creates a new, empty spine.
    fn new(factories: &B::Factories) -> Self {
        Self::with_effort(factories, 1)
    }

    /// Forwards the new time frontier to the background merger.
    fn set_frontier(&mut self, frontier: &B::Time) {
        self.merger.set_frontier(frontier)
    }

    /// No-op: merging runs asynchronously in this spine, so there is no
    /// synchronous merge effort to exert.
    fn exert(&mut self, _effort: &mut isize) {}

    /// Merges every batch in the spine into a single batch, applying the
    /// current key/value retention filters, and returns it.  Returns `None`
    /// when the merged result is empty.
    fn consolidate(self) -> Option<B> {
        // NOTE(review): `Arc::into_inner(..).unwrap()` assumes the spine
        // holds the only reference to each batch at this point (e.g. no live
        // snapshot) — confirm that invariant at call sites.
        let batches = self
            .merger
            .pause()
            .into_iter()
            .map(|batch| Arc::into_inner(batch).unwrap());
        let result = merge_batches(
            &self.factories,
            batches,
            &self.key_filter,
            &self.value_filter,
        );
        if result.is_empty() {
            None
        } else {
            Some(result)
        }
    }

    /// Inserts `batch` into the spine.  An in-memory batch that is big
    /// enough to belong on storage (per [`pick_insert_destination`]) is
    /// eagerly rewritten to a storage-backed batch first, applying the
    /// spine's current retention filters in the process.
    fn insert(&mut self, mut batch: Self::Batch) {
        if !batch.is_empty() {
            let batch = if batch.location() == BatchLocation::Memory
                && pick_insert_destination(&batch) == BatchLocation::Storage
            {
                // Profiling span around the spill; the tooltip closure is
                // only evaluated when the profiler is recording.
                let _span = SamplySpan::new("eager spill")
                    .with_category("Spine")
                    .with_tooltip(|| {
                        format!(
                            "Eagerly spilling {} batch with {} keys and {} values",
                            HumanBytes::from(batch.approximate_byte_size()),
                            batch.key_count(),
                            batch.len()
                        )
                    });
                let factories = batch.factories();
                let builder =
                    B::Builder::for_merge(&factories, [&batch], Some(BatchLocation::Storage));
                let (key_filter, value_filter) = self.merger.state.lock().unwrap().get_filters();
                // We own `batch`, so we can consume it while copying it to
                // storage.
                ListMerger::merge(
                    &factories,
                    builder,
                    vec![batch.consuming_cursor(key_filter, value_filter)],
                )
            } else {
                batch
            };
            self.dirty = true;
            self.merger.add_batch(Arc::new(batch), false);
        }
    }

    /// Like [`Trace::insert`], but for an already reference-counted batch.
    /// Because the batch may be shared, the eager-spill path reads it through
    /// a non-consuming `merge_cursor` instead of a consuming cursor.
    fn insert_arc(&mut self, batch: Arc<Self::Batch>) {
        if !batch.is_empty() {
            let batch = if batch.location() == BatchLocation::Memory
                && pick_insert_destination(&batch) == BatchLocation::Storage
            {
                let factories = batch.factories();
                let builder =
                    B::Builder::for_merge(&factories, [&batch], Some(BatchLocation::Storage));
                let (key_filter, value_filter) = self.merger.state.lock().unwrap().get_filters();
                Arc::new(ListMerger::merge(
                    &factories,
                    builder,
                    vec![batch.merge_cursor(key_filter, value_filter)],
                ))
            } else {
                batch
            };
            self.dirty = true;
            self.merger.add_batch(batch, false);
        }
    }

    fn clear_dirty_flag(&mut self) {
        self.dirty = false;
    }

    fn dirty(&self) -> bool {
        self.dirty
    }

    /// Installs a key retention filter, both for future merges (via the
    /// merger) and for consolidation (via `self.key_filter`).
    fn retain_keys(&mut self, filter: Filter<Self::Key>) {
        self.merger.set_key_filter(&filter);
        self.key_filter = Some(filter);
    }

    /// Installs a value retention filter, analogously to
    /// [`Trace::retain_keys`].
    fn retain_values(&mut self, filter: GroupFilter<Self::Val>) {
        self.merger.set_value_filter(&filter);
        self.value_filter = Some(filter);
    }

    fn key_filter(&self) -> &Option<Filter<Self::Key>> {
        &self.key_filter
    }

    fn value_filter(&self) -> &Option<GroupFilter<Self::Val>> {
        &self.value_filter
    }

    /// Checkpoints the spine under `base`/`persistent_id`: persists every
    /// batch to storage, registers the resulting files in `files` for
    /// commit, and writes the serialized spine metadata plus the batch list.
    fn save(
        &mut self,
        base: &StoragePath,
        persistent_id: &str,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), Error> {
        /// Replaces each batch that has a persisted counterpart by that
        /// counterpart, leaving the rest untouched.
        fn persist_batches<B>(batches: Vec<Arc<B>>) -> Vec<Arc<B>>
        where
            B: Batch,
        {
            batches
                .into_iter()
                .map(|batch| {
                    if let Some(persisted) = batch.persisted() {
                        Arc::new(persisted)
                    } else {
                        batch
                    }
                })
                .collect::<Vec<_>>()
        }

        // Stop the merger from starting new merges while we snapshot its
        // contents.
        let (not_merging, merging) = self.merger.pause_new_merges();
        let not_merging = persist_batches(not_merging);
        // Hand the (possibly newly persisted) idle batches back to the
        // merger before persisting the in-flight ones.
        self.merger.resume(not_merging.iter().cloned());
        let merging = persist_batches(merging);
        // Collect the storage path of every persisted batch and register its
        // file for commit.
        let ids = not_merging
            .iter()
            .chain(merging.iter())
            .map(|batch| {
                let file = batch
                    .file_reader()
                    .expect("The batch should have been persisted");
                let path = file.path().to_string();
                files.push(file);
                path
            })
            .collect::<Vec<_>>();
        let backend = Runtime::storage_backend().unwrap();
        let committed: CommittedSpine = (ids, self as &Self).into();
        let as_bytes = to_bytes(&committed).expect("Serializing CommittedSpine should work.");
        backend.write(&Self::checkpoint_file(base, persistent_id), as_bytes)?;
        let pspine_batches = PSpineBatches {
            files: committed.batches,
        };
        backend
            .write_json(&self.batchlist_file(base, persistent_id), &pspine_batches)
            .and_then(|reader| reader.commit())?;
        Ok(())
    }

    /// Restores the spine from a checkpoint previously written by
    /// [`Trace::save`].  Retention filters are reset to `None` and must be
    /// re-installed by the caller.
    fn restore(&mut self, base: &StoragePath, persistent_id: &str) -> Result<(), Error> {
        let pspine_path = Self::checkpoint_file(base, persistent_id);
        let content = Runtime::storage_backend().unwrap().read(&pspine_path)?;
        // SAFETY: `content` was produced by `save` serializing a
        // `CommittedSpine` with rkyv, so reinterpreting it as the archived
        // form is sound.
        let archived = unsafe { rkyv::archived_root::<CommittedSpine>(&content) };
        let committed: CommittedSpine = archived.deserialize(&mut Deserializer::default()).unwrap();
        self.dirty = committed.dirty;
        self.key_filter = None;
        self.value_filter = None;
        for batch in committed.batches {
            // Fixed clippy::redundant_clone: was `&self.factories.clone()`,
            // which cloned the factories only to borrow the temporary.
            let batch = B::from_path(&self.factories, &batch.clone().into())
                .unwrap_or_else(|error| {
                    panic!("Failed to read batch {batch} for checkpoint ({error}).")
                });
            self.insert(batch);
        }
        Ok(())
    }

    /// Reports operator metadata; all interesting statistics live in the
    /// background merger.
    fn metadata(&self, meta: &mut OperatorMeta) {
        self.merger.metadata(meta);
    }
}
impl<B> Spine<B>
where
    B: Batch,
{
    /// Maps a batch's size to a spine level in `0..MAX_LEVELS`.
    ///
    /// Batches with at most `max_level0_batch_size_records` effective
    /// records go to level 0; larger batches are bucketed by the decimal
    /// order of magnitude of their effective record count (levels 1..=8).
    /// When `merge` is true, negative-weight records are counted extra
    /// (scaled by `negative_weight_multiplier`), pushing such batches toward
    /// higher levels.
    fn size_to_level(batch: &B, max_level0_batch_size_records: usize, merge: bool) -> usize {
        // The match arms below hard-code levels 1..=8; keep in sync with
        // MAX_LEVELS.
        debug_assert_eq!(MAX_LEVELS, 9);
        // The level-0 cutoff must fall inside the level-1 bucket below.
        debug_assert!(max_level0_batch_size_records > 0 && max_level0_batch_size_records <= 99_999);
        let len = batch.len();
        let effective_len = if merge {
            let negative_weight_count = batch.negative_weight_count().unwrap_or(0) as usize;
            len + negative_weight_count * (negative_weight_multiplier() as usize)
        } else {
            len
        };
        if effective_len <= max_level0_batch_size_records {
            return 0;
        }
        match effective_len {
            0..=99_999 => 1,
            100_000..=999_999 => 2,
            1_000_000..=9_999_999 => 3,
            10_000_000..=99_999_999 => 4,
            100_000_000..=999_999_999 => 5,
            1_000_000_000..=9_999_999_999 => 6,
            10_000_000_000..=99_999_999_999 => 7,
            _ => 8,
        }
    }

    /// Creates a spine.  The `_effort` parameter is accepted for API
    /// compatibility but ignored, since merging happens asynchronously.
    pub fn with_effort(factories: &B::Factories, _effort: usize) -> Self {
        Spine {
            factories: factories.clone(),
            dirty: false,
            key_filter: None,
            value_filter: None,
            merger: AsyncMerger::new(factories),
        }
    }

    /// Synchronously merges all batches into a single batch (applying the
    /// retention filters) and reinstalls it in the merger.
    pub fn complete_merges(&mut self) {
        let batches = self.merger.pause();
        let batch = merge_batches(
            &self.factories,
            // Fixed clippy::redundant_closure: pass the function directly
            // instead of wrapping it in `|b| Arc::unwrap_or_clone(b)`.
            batches.into_iter().map(Arc::unwrap_or_clone),
            &self.key_filter,
            &self.value_filter,
        );
        self.merger.resume(vec![Arc::new(batch)]);
    }
}
/// Read-only snapshot support for [`Spine`].
///
/// Fixed clippy::multiple_bound_locations: the original declared `B: Batch`
/// both inline (`impl<B: Batch>`) and in the `where` clause; only the
/// `where` clause is kept, matching the file's other impls.
impl<B> WithSnapshot for Spine<B>
where
    B: Batch,
{
    type Batch = B;

    /// Returns a read-only snapshot of the spine's current contents.
    fn ro_snapshot(&self) -> SpineSnapshot<B> {
        self.into()
    }
}