use crate::{
Error, NumEntries, Runtime,
circuit::{
max_level0_batch_size_records,
metadata::{
BLOOM_FILTER_BITS_PER_KEY, BLOOM_FILTER_HIT_RATE_PERCENT, BLOOM_FILTER_HITS_COUNT,
BLOOM_FILTER_MISSES_COUNT, BLOOM_FILTER_SIZE_BYTES, COMPACTION_STATE, COMPLETED_MERGES,
LOOSE_BATCHES_COUNT, LOOSE_MEMORY_RECORDS_COUNT, LOOSE_STORAGE_RECORDS_COUNT,
MERGE_BACKPRESSURE_WAIT_TIME_SECONDS, MERGE_REDUCTION_PERCENT, MERGING_BATCHES_COUNT,
MERGING_MEMORY_RECORDS_COUNT, MERGING_SIZE_BYTES, MERGING_STORAGE_RECORDS_COUNT,
MetaItem, MetricId, MetricReading, NEGATIVE_WEIGHT_COUNT, OperatorMeta,
RANGE_FILTER_HIT_RATE_PERCENT, RANGE_FILTER_HITS_COUNT, RANGE_FILTER_MISSES_COUNT,
RANGE_FILTER_SIZE_BYTES, ROARING_FILTER_HIT_RATE_PERCENT, ROARING_FILTER_HITS_COUNT,
ROARING_FILTER_MISSES_COUNT, ROARING_FILTER_SIZE_BYTES, SPINE_BATCHES_COUNT,
SPINE_STORAGE_SIZE_BYTES,
},
metrics::COMPACTION_STALL_TIME_NANOSECONDS,
negative_weight_multiplier,
runtime::{TOKIO_BUFFER_CACHE, TOKIO_WORKER_INDEX},
},
dynamic::{DynVec, Factory},
storage::{
buffer_cache::{BufferCache, CacheStats},
file::{FilterKind, FilterStats},
},
time::Timestamp,
trace::{
Batch, BatchReader, BatchReaderFactories, Builder, Cursor, Filter, GroupFilter, Trace,
cursor::{CursorList, Position},
merge_batches,
ord::fallback::pick_insert_destination,
sample_keys_from_batches,
spine_async::{
list_merger::ArcListMerger, push_merger::ArcPushMerger, snapshot::FetchList,
},
},
};
use crate::storage::file::{Deserializer, to_bytes};
use crate::trace::CommittedSpine;
use enum_map::EnumMap;
use feldera_buffer_cache::ThreadType;
use feldera_samply::Span;
use feldera_storage::{
FileCommitter, StoragePath,
fbuf::slab::{FBufSlabs, TOKIO_FBUF_SLABS},
};
use feldera_types::memory_pressure::MemoryPressure;
use feldera_types::{checkpoint::PSpineBatches, config::dev_tweaks::MergerType};
use ouroboros::self_referencing;
use rand::Rng;
use rkyv::{Archive, Archived, Deserialize, Fallible, Serialize, ser::Serializer};
use size_of::{Context, HumanBytes, SizeOf};
use std::{
borrow::Cow,
future::Future,
sync::{
Arc,
atomic::{AtomicIsize, AtomicU32},
},
};
use std::{
collections::BTreeMap,
time::{Duration, Instant},
};
use std::{collections::VecDeque, sync::atomic::Ordering};
use std::{
fmt::{self, Debug, Display, Formatter},
sync::Condvar,
};
use std::{ops::RangeInclusive, sync::Mutex};
use textwrap::indent;
use tokio::{sync::Notify, task::yield_now};
mod index_set;
mod list_merger;
mod push_merger;
mod snapshot;
pub use snapshot::{BatchReaderWithSnapshot, SpineSnapshot, WithSnapshot};
use super::{BatchLocation, cursor::CursorFactory};
pub use list_merger::ListMerger;
/// Number of merge levels ("slots") maintained by the spine.
pub(crate) const MAX_LEVELS: usize = 9;

/// Span label for each merge level, indexed by level number.
static LEVEL_NAMES: [&str; MAX_LEVELS] = [
    "merge-0",
    "merge-1",
    "merge-2",
    "merge-3",
    "merge-4",
    "merge-5",
    "merge-6",
    "merge-7",
    "merge-8",
];

/// Record-count constant for level-0 batches.
///
/// NOTE(review): not referenced in this chunk; presumably the default for the
/// `max_level0_batch_size_records()` setting — confirm at its use site.
pub(crate) const MAX_LEVEL0_BATCH_SIZE_RECORDS: u16 = 14_999;
/// Wraps `future` so that it runs with the merger's tokio task-locals set.
///
/// The layers, from innermost to outermost, are:
/// 1. `TOKIO_FBUF_SLABS` = `slab_allocator`;
/// 2. `TOKIO_BUFFER_CACHE` = `buffer_cache`;
/// 3. `TOKIO_WORKER_INDEX` = `worker_index`.
fn scope_tokio_merger_locals<F>(
    worker_index: usize,
    buffer_cache: Arc<BufferCache>,
    slab_allocator: Arc<FBufSlabs>,
    future: F,
) -> impl Future<Output = F::Output>
where
    F: Future,
{
    let with_slabs = TOKIO_FBUF_SLABS.scope(slab_allocator, future);
    let with_cache = TOKIO_BUFFER_CACHE.scope(buffer_cache, with_slabs);
    TOKIO_WORKER_INDEX.scope(worker_index, with_cache)
}
/// Builds a [`CommittedSpine`] from a list of batch file names and the
/// spine's `dirty` flag. `merged` starts empty and `effort` starts at zero.
impl<B: Batch + Send + Sync> From<(Vec<String>, &Spine<B>)> for CommittedSpine {
    fn from(source: (Vec<String>, &Spine<B>)) -> Self {
        let (batches, spine) = source;
        Self {
            batches,
            dirty: spine.dirty,
            merged: Vec::new(),
            effort: 0,
        }
    }
}
/// Per-slot compaction progress.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionStatus {
    /// No compaction pending for this slot.
    None,
    /// Compaction has been asked for but has not started merging yet.
    Requested,
    /// A compaction merge is currently running in this slot.
    InProgress,
}

/// Human-readable label, reported through the `COMPACTION_STATE` metric.
impl Display for CompactionStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::None => "none",
            Self::Requested => "requested",
            Self::InProgress => "in progress",
        };
        f.write_str(label)
    }
}
/// State for one merge level of the spine.
#[derive(Clone, SizeOf)]
struct Slot<B>
where
    B: Batch,
{
    /// Batches currently being merged by this level's worker, if any.
    merging_batches: Option<Vec<Arc<B>>>,
    /// Batches waiting at this level that are not part of a running merge.
    loose_batches: VecDeque<Arc<B>>,
    /// Total time spent in merge steps for completed merges.
    elapsed: Duration,
    /// Number of merges completed at this level.
    n_merged: usize,
    /// Number of input batches consumed by completed merges.
    n_merged_batches: usize,
    /// Number of merge steps taken by completed merges.
    n_steps: usize,
    /// Wakes this level's merge worker.
    #[size_of(skip)]
    notify: Arc<Notify>,
    #[size_of(skip)]
    compaction_status: CompactionStatus,
}

impl<B> Default for Slot<B>
where
    B: Batch,
{
    /// An empty slot: nothing loose, nothing merging, zeroed counters and no
    /// pending compaction.
    fn default() -> Self {
        let notify = Arc::new(Notify::new());
        Slot {
            merging_batches: None,
            loose_batches: VecDeque::default(),
            elapsed: Duration::default(),
            n_merged: 0,
            n_merged_batches: 0,
            n_steps: 0,
            notify,
            compaction_status: CompactionStatus::None,
        }
    }
}
impl<B> Slot<B>
where
    B: Batch,
{
    /// Starts a merge at this level if one should start. Returns the batches
    /// to merge, which are also recorded in `merging_batches`.
    ///
    /// A merge starts only when no merge is already running at this level
    /// and at least one of these holds:
    /// - there are at least the per-level minimum number of loose batches;
    /// - memory pressure is high and some loose batch lives in memory;
    /// - compaction was requested and more than one batch is loose.
    ///
    /// Normally at most the per-level maximum number of batches is taken. A
    /// requested compaction instead takes every loose batch and moves the
    /// slot to [`CompactionStatus::InProgress`].
    fn try_start_merge(&mut self, level: usize) -> Option<Vec<Arc<B>>> {
        // Per-level (min..=max) number of loose batches to merge at once.
        const MERGE_COUNTS: [RangeInclusive<usize>; MAX_LEVELS] = [
            8..=64,
            8..=64,
            3..=64,
            3..=64,
            3..=64,
            3..=64,
            2..=64,
            2..=64,
            2..=64,
        ];
        let merge_counts = &MERGE_COUNTS[level];
        if self.merging_batches.is_some() {
            return None;
        }

        let compaction_requested = self.compaction_status == CompactionStatus::Requested;
        let should_merge = self.loose_batches.len() >= *merge_counts.start()
            || self.must_relieve_memory_pressure()
            || (compaction_requested && self.loose_batches.len() > 1);
        if !should_merge {
            return None;
        }

        let max_batches = if compaction_requested {
            self.compaction_status = CompactionStatus::InProgress;
            usize::MAX
        } else {
            *merge_counts.end()
        };
        let n = std::cmp::min(max_batches, self.loose_batches.len());
        let batches = self.loose_batches.drain(..n).collect::<Vec<_>>();
        self.merging_batches = Some(batches.clone());
        Some(batches)
    }

    /// True if the runtime reports memory pressure of at least `High` and
    /// some loose batch here is held in memory. A merge would then move that
    /// data out of memory.
    fn must_relieve_memory_pressure(&self) -> bool {
        if let Some(memory_pressure) = Runtime::memory_pressure() {
            memory_pressure >= MemoryPressure::High
                && self
                    .loose_batches
                    .iter()
                    .any(|batch| batch.location() == BatchLocation::Memory)
        } else {
            false
        }
    }

    /// Number of batches at this level, loose plus merging. Computed from
    /// the collection lengths in O(1) instead of walking every batch.
    fn n_batches(&self) -> usize {
        self.loose_batches.len() + self.merging_batches.as_ref().map_or(0, Vec::len)
    }

    /// Iterates over all batches at this level: loose ones first, then any
    /// currently being merged.
    fn all_batches(&self) -> impl Iterator<Item = &Arc<B>> {
        self.loose_batches
            .iter()
            .chain(self.merging_batches.iter().flatten())
    }
}
/// State shared, behind a mutex, between the spine's foreground
/// `AsyncMerger` and its per-level background merge workers.
#[derive(SizeOf)]
struct SharedState<B>
where
    B: Batch,
{
    /// Batch factories, used to build snapshots in `get_snapshot`.
    #[size_of(skip)]
    factories: B::Factories,
    /// Optional key filter, handed to new merges via `get_filters`.
    #[size_of(skip)]
    key_filter: Option<Filter<B::Key>>,
    /// Optional value filter, handed to new merges via `get_filters`.
    #[size_of(skip)]
    value_filter: Option<GroupFilter<B::Val>>,
    /// Frontier passed to each merge step.
    #[size_of(skip)]
    frontier: B::Time,
    /// One slot per merge level.
    slots: [Slot<B>; MAX_LEVELS],
    /// Set when the merger is dropped; merge worker loops exit once they see it.
    #[size_of(skip)]
    request_exit: bool,
    /// Aggregate merge and backpressure statistics.
    #[size_of(skip)]
    spine_stats: SpineStats,
}
impl<B> SharedState<B>
where
    B: Batch,
{
    /// Creates empty state with no filters, the minimum frontier and empty
    /// slots.
    pub fn new(factories: &B::Factories) -> Self {
        let slots: [Slot<B>; MAX_LEVELS] = std::array::from_fn(|_| Slot::default());
        Self {
            factories: factories.clone(),
            key_filter: None,
            value_filter: None,
            frontier: B::Time::minimum(),
            slots,
            request_exit: false,
            spine_stats: SpineStats::default(),
        }
    }

    /// Adds each non-empty `(batch, level)` pair; empty batches are dropped.
    fn add_batches(&mut self, batches: impl IntoIterator<Item = (Arc<B>, usize)>) {
        batches
            .into_iter()
            .filter(|(batch, _)| !batch.is_empty())
            .for_each(|(batch, level)| self.add_batch(batch, level));
    }

    /// Appends a non-empty batch to `level`'s loose list and wakes that
    /// level's worker.
    fn add_batch(&mut self, batch: Arc<B>, level: usize) {
        debug_assert!(!batch.is_empty());
        let slot = &mut self.slots[level];
        slot.loose_batches.push_back(batch);
        slot.notify.notify_one();
    }

    /// Total number of loose (not currently merging) batches across all slots.
    fn n_loose_batches(&self) -> usize {
        self.slots.iter().map(|slot| slot.loose_batches.len()).sum()
    }

    /// True once the loose-batch backlog reaches the high-water mark.
    fn should_apply_backpressure(&self) -> bool {
        const HIGH_THRESHOLD: usize = 128;
        self.n_loose_batches() >= HIGH_THRESHOLD
    }

    /// True once the loose-batch backlog is at or below the low-water mark.
    fn should_relieve_backpressure(&self) -> bool {
        const LOWER_THRESHOLD: usize = 127;
        self.n_loose_batches() <= LOWER_THRESHOLD
    }

    /// Clones of the current key and value filters.
    fn get_filters(&self) -> (Option<Filter<B::Key>>, Option<GroupFilter<B::Val>>) {
        let key_filter = self.key_filter.clone();
        let value_filter = self.value_filter.clone();
        (key_filter, value_filter)
    }

    /// All batches in slot order, each slot contributing loose then merging
    /// batches.
    fn get_batches(&self) -> Vec<Arc<B>> {
        let capacity = self.slots.iter().map(Slot::n_batches).sum();
        let mut batches = Vec::with_capacity(capacity);
        batches.extend(self.slots.iter().flat_map(|slot| slot.all_batches().cloned()));
        batches
    }

    /// A snapshot over every batch returned by [`Self::get_batches`].
    fn get_snapshot(&self) -> SpineSnapshot<B> {
        let batches = self.get_batches();
        SpineSnapshot::with_batches(&self.factories, batches)
    }

    /// Removes and returns every loose batch, in slot order. Batches being
    /// merged are left alone.
    fn take_loose_batches(&mut self) -> Vec<Arc<B>> {
        let mut taken = Vec::with_capacity(self.n_loose_batches());
        for slot in self.slots.iter_mut() {
            taken.extend(slot.loose_batches.drain(..));
        }
        taken
    }

    /// True while any slot has a merge in progress.
    fn is_merging(&self) -> bool {
        !self.slots.iter().all(|slot| slot.merging_batches.is_none())
    }

    /// Records the end of the merge running at `level`, which produced
    /// `new_batch` (already sized to `new_level`).
    ///
    /// Updates the slot counters and spine stats and records a tracing span.
    /// Then the output is routed:
    /// - if this merge was a compaction and deeper levels are non-empty, the
    ///   output is pushed down and compaction is requested at `level + 1`;
    /// - otherwise a non-empty output is added at `new_level`.
    fn merge_complete(
        &mut self,
        level: usize,
        new_batch: Arc<B>,
        new_level: usize,
        start: Instant,
        elapsed: Duration,
        n_steps: usize,
    ) {
        let merged = {
            let slot = &mut self.slots[level];
            let merged = slot.merging_batches.take().unwrap();
            slot.n_merged += 1;
            slot.n_merged_batches += merged.len();
            slot.elapsed += elapsed;
            slot.n_steps += n_steps;
            merged
        };

        let input_cache_stats = merged
            .iter()
            .fold(CacheStats::default(), |acc, batch| acc + batch.cache_stats());
        let pre_len: usize = merged.iter().map(|b| b.len()).sum();
        let post_len = new_batch.len();
        self.spine_stats
            .report_merge(pre_len, post_len, input_cache_stats);

        Span::new(LEVEL_NAMES[level])
            .with_category("Spine")
            .with_start(start)
            .with_tooltip(|| {
                format!(
                    "Merged {} batches ({pre_len} -> {post_len}) in {n_steps} steps using {:.1} ms CPU",
                    merged.len(),
                    elapsed.as_secs_f64() * 1000.0
                )
            })
            .record();

        let slot = &mut self.slots[level];
        let finished_compaction = slot.compaction_status == CompactionStatus::InProgress;
        if finished_compaction {
            slot.compaction_status = CompactionStatus::None;
        }

        let has_deeper_levels = || {
            self.last_non_empty_slot()
                .is_some_and(|last_level| last_level > level)
        };
        if finished_compaction && has_deeper_levels() {
            let forwarded = if new_batch.is_empty() {
                Vec::new()
            } else {
                vec![new_batch]
            };
            self.initiate_compaction_at_level(level + 1, forwarded);
        } else if !new_batch.is_empty() {
            self.add_batch(new_batch, new_level);
        }
    }

    /// Clones the slots and stats so metrics can be computed without holding
    /// the lock.
    fn metadata_snapshot(&self) -> ([Slot<B>; MAX_LEVELS], SpineStats) {
        let slots = self.slots.clone();
        let stats = self.spine_stats.clone();
        (slots, stats)
    }

    /// True if `slot` holds any batch, loose or merging.
    fn slot_is_occupied(slot: &Slot<B>) -> bool {
        !slot.loose_batches.is_empty() || slot.merging_batches.is_some()
    }

    /// Index of the shallowest slot holding any batch.
    fn first_non_empty_slot(&self) -> Option<usize> {
        self.slots.iter().position(Self::slot_is_occupied)
    }

    /// Index of the deepest slot holding any batch.
    fn last_non_empty_slot(&self) -> Option<usize> {
        self.slots.iter().rposition(Self::slot_is_occupied)
    }

    /// Requests compaction at the shallowest non-empty level; does nothing
    /// if the spine is empty.
    fn initiate_compaction(&mut self) {
        if let Some(level) = self.first_non_empty_slot() {
            self.initiate_compaction_at_level(level, Vec::new());
        }
    }

    /// Adds `batches` to `level`'s loose list, marks compaction requested
    /// there and wakes that level's worker.
    fn initiate_compaction_at_level(&mut self, level: usize, batches: Vec<Arc<B>>) {
        let slot = &mut self.slots[level];
        slot.loose_batches.extend(batches);
        slot.compaction_status = CompactionStatus::Requested;
        slot.notify.notify_one();
    }
}
/// Foreground handle to the spine's background merging machinery.
struct AsyncMerger<B>
where
    B: Batch,
{
    /// State shared with the merge workers.
    state: Arc<Mutex<SharedState<B>>>,
    /// Notified by merge workers when backpressure may be relieved; awaited
    /// by `backpressure_wait`.
    no_backpressure: Arc<Notify>,
    /// Notified by merge workers when they go idle; `pause` waits on it
    /// until no merge is running.
    idle: Arc<Condvar>,
    /// Threshold passed to `Spine::size_to_level` when choosing a batch's level.
    max_level0_batch_size_records: usize,
    /// Per-level merge worker tasks, started lazily.
    merge_workers: Arc<MergeWorkers<B>>,
}
impl<B> AsyncMerger<B>
where
    B: Batch,
{
    /// Creates a merger for `worker_index`, with merge workers that run on
    /// `runtime`'s merger runtime.
    ///
    /// # Panics
    ///
    /// Panics if the configured `max_level0_batch_size_records` is 0 or
    /// greater than 99_999.
    pub fn new(runtime: Runtime, worker_index: usize, factories: &B::Factories) -> Self {
        let idle = Arc::new(Condvar::new());
        let no_backpressure = Arc::new(Notify::new());
        let state = Arc::new(Mutex::new(SharedState::new(factories)));
        let max_level0_batch_size_records = max_level0_batch_size_records() as usize;
        assert!(
            max_level0_batch_size_records > 0,
            "max_level0_batch_size_records must be greater than 0"
        );
        assert!(
            max_level0_batch_size_records <= 99_999,
            "max_level0_batch_size_records must be less than or equal to 99_999"
        );
        Self {
            merge_workers: Arc::new(MergeWorkers::new(
                state.clone(),
                idle.clone(),
                no_backpressure.clone(),
                runtime,
                max_level0_batch_size_records,
                worker_index,
            )),
            state,
            idle,
            no_backpressure,
            max_level0_batch_size_records,
        }
    }

    /// Installs the key filter used by merges started from now on.
    fn set_key_filter(&self, key_filter: &Filter<B::Key>) {
        self.state.lock().unwrap().key_filter = Some(key_filter.clone());
    }

    /// Installs the value filter used by merges started from now on.
    fn set_value_filter(&self, value_filter: &GroupFilter<B::Val>) {
        self.state.lock().unwrap().value_filter = Some(value_filter.clone());
    }

    /// Updates the frontier passed to subsequent merge steps.
    fn set_frontier(&self, frontier: &B::Time) {
        self.state.lock().unwrap().frontier = frontier.clone();
    }

    /// Adds a non-empty batch at the level chosen by its size and makes sure
    /// that level's worker is running. Returns whether the caller should now
    /// apply backpressure.
    fn add_batch(&mut self, batch: Arc<B>, merge: bool) -> bool {
        debug_assert!(!batch.is_empty());
        let level = Spine::<B>::size_to_level(&batch, self.max_level0_batch_size_records, merge);
        self.merge_workers.start(level);
        let mut state = self.state.lock().unwrap();
        state.add_batch(batch, level);
        state.should_apply_backpressure()
    }

    /// Waits until the loose-batch backlog drops to the low-water mark.
    ///
    /// The `Notified` future is created before the state is checked so that a
    /// `notify_waiters` call between the check and the `.await` is not lost.
    /// The std mutex guard is released before awaiting.
    async fn backpressure_wait(&self) {
        let start = Instant::now();
        // Measure the wait once so the spine stat and the global stall
        // counter report the same duration.
        let waited = loop {
            let notify = self.no_backpressure.notified();
            {
                let mut state = self.state.lock().unwrap();
                if state.should_relieve_backpressure() {
                    let waited = start.elapsed();
                    state.spine_stats.backpressure_wait += waited;
                    break waited;
                }
            }
            notify.await;
        };
        COMPACTION_STALL_TIME_NANOSECONDS.fetch_add(waited.as_nanos() as u64, Ordering::Relaxed);
        Span::new("backpressure-wait")
            .with_category("Spine")
            .with_start(start)
            .record();
    }

    /// Adds `(batch, level)` pairs directly; empty batches are skipped.
    fn add_batches(&self, batches: impl IntoIterator<Item = (Arc<B>, usize)>) {
        self.state.lock().unwrap().add_batches(batches);
    }

    /// All batches currently held, loose and merging.
    fn get_batches(&self) -> Vec<Arc<B>> {
        self.state.lock().unwrap().get_batches()
    }

    /// Takes every loose batch, waits for running merges to finish, then takes
    /// the loose batches again, which now include the merge outputs.
    fn pause(&self) -> Vec<Arc<B>> {
        let mut state = self.state.lock().unwrap();
        let mut batches = state.take_loose_batches();
        let mut state = self
            .idle
            .wait_while(state, |state| state.is_merging())
            .unwrap();
        batches.extend(state.take_loose_batches());
        batches
    }

    /// Takes the loose batches so no new merges start. Returns
    /// `(taken_loose_batches, batches_still_merging)`.
    fn pause_new_merges(&self) -> (Vec<Arc<B>>, Vec<Arc<B>>) {
        let mut state = self.state.lock().unwrap();
        let not_merging = state.take_loose_batches();
        let merging = state.get_batches();
        (not_merging, merging)
    }

    /// Re-adds batches, placing each at the level chosen by its size.
    fn resume(&self, batches: impl IntoIterator<Item = Arc<B>>) {
        self.add_batches(batches.into_iter().map(|batch| {
            let level =
                Spine::<B>::size_to_level(&batch, self.max_level0_batch_size_records, false);
            (batch, level)
        }));
    }

    /// Emits per-slot and aggregate merger metrics into `meta`.
    fn metadata(&self, meta: &mut OperatorMeta) {
        fn class_batch_count_label(class: &str) -> MetricId {
            match class {
                "loose" => LOOSE_BATCHES_COUNT,
                "merging" => MERGING_BATCHES_COUNT,
                _ => panic!("invalid class: {class}"),
            }
        }
        fn class_tuple_count_label(class: &str, location: BatchLocation) -> MetricId {
            match (class, location) {
                ("loose", BatchLocation::Memory) => LOOSE_MEMORY_RECORDS_COUNT,
                ("loose", BatchLocation::Storage) => LOOSE_STORAGE_RECORDS_COUNT,
                ("merging", BatchLocation::Memory) => MERGING_MEMORY_RECORDS_COUNT,
                ("merging", BatchLocation::Storage) => MERGING_STORAGE_RECORDS_COUNT,
                _ => panic!("invalid class: {class} and location: {location:?}"),
            }
        }
        /// Average time per step. Dividing by `n_steps as u32` would silently
        /// truncate large step counts, and panic if the truncated value were
        /// 0. Use exact `u32` division when it fits and float division
        /// otherwise.
        fn average_step_time(total: Duration, n_steps: usize) -> Duration {
            match u32::try_from(n_steps) {
                Ok(0) => Duration::ZERO,
                Ok(n) => total / n,
                Err(_) => Duration::from_secs_f64(total.as_secs_f64() / n_steps as f64),
            }
        }
        let (mut slots, spine_stats) = self.state.lock().unwrap().metadata_snapshot();
        for (index, slot) in slots.iter_mut().enumerate() {
            for (class, batches) in [
                ("loose", slot.loose_batches.make_contiguous() as &_),
                (
                    "merging",
                    slot.merging_batches
                        .as_ref()
                        .unwrap_or(&Vec::new())
                        .as_slice(),
                ),
            ] {
                if !batches.is_empty() {
                    let mut tuple_counts = EnumMap::<BatchLocation, usize>::default();
                    for batch in batches {
                        tuple_counts[batch.location()] += batch.len();
                    }
                    let mut facts = Vec::with_capacity(3);
                    facts.push(MetricReading::new(
                        class_batch_count_label(class),
                        vec![(Cow::Borrowed("slot"), index.to_string().into())],
                        MetaItem::Count(batches.len()),
                    ));
                    for (location, count) in tuple_counts {
                        if count > 0 {
                            facts.push(MetricReading::new(
                                class_tuple_count_label(class, location),
                                vec![(Cow::Borrowed("slot"), index.to_string().into())],
                                MetaItem::Count(count),
                            ));
                        }
                    }
                    meta.extend(facts);
                }
            }
            if slot.n_merged > 0 {
                meta.extend([MetricReading::new(
                    COMPLETED_MERGES,
                    vec![(Cow::Borrowed("slot"), index.to_string().into())],
                    MetaItem::Map(BTreeMap::from([
                        (Cow::Borrowed("merges"), MetaItem::Count(slot.n_merged)),
                        (
                            Cow::Borrowed("batches"),
                            MetaItem::Count(slot.n_merged_batches),
                        ),
                        (Cow::Borrowed("steps"), MetaItem::Count(slot.n_steps)),
                        (
                            Cow::Borrowed("avg_step_time"),
                            MetaItem::Duration(average_step_time(slot.elapsed, slot.n_steps)),
                        ),
                    ])),
                )]);
            }
            meta.extend([MetricReading::new(
                COMPACTION_STATE,
                vec![(Cow::Borrowed("slot"), index.to_string().into())],
                MetaItem::String(slot.compaction_status.to_string()),
            )]);
            let mut negative_weight_count = 0;
            let mut has_negative_weight_counts = false;
            for batch in slot.all_batches() {
                if let Some(count) = batch.negative_weight_count() {
                    negative_weight_count += count;
                    has_negative_weight_counts = true;
                }
            }
            if has_negative_weight_counts {
                meta.extend([MetricReading::new(
                    NEGATIVE_WEIGHT_COUNT,
                    vec![(Cow::Borrowed("slot"), index.to_string().into())],
                    MetaItem::Count(negative_weight_count as usize),
                )]);
            }
        }
        let mut batches = Vec::new();
        for slot in slots {
            batches.extend(slot.loose_batches.into_iter().map(|b| (b, false)));
            if let Some(merging_batches) = slot.merging_batches {
                batches.extend(merging_batches.into_iter().map(|b| (b, true)));
            }
        }
        let n_batches = batches.len();
        let n_merging = batches.iter().filter(|(_batch, merging)| *merging).count();
        let mut cache_stats = spine_stats.cache_stats;
        let mut storage_size = 0;
        let mut merging_size = 0;
        let mut membership_filter_stats = EnumMap::<FilterKind, FilterStats>::default();
        let mut range_filter_stats = FilterStats::default();
        let mut bloom_filter_records = 0;
        for (batch, merging) in batches {
            cache_stats += batch.cache_stats();
            let kind = batch.membership_filter_kind();
            if kind != FilterKind::None {
                membership_filter_stats[kind] += batch.membership_filter_stats();
            }
            if kind == FilterKind::Bloom {
                bloom_filter_records += batch.key_count();
            }
            range_filter_stats += batch.range_filter_stats();
            let on_storage = batch.location() == BatchLocation::Storage;
            if on_storage || merging {
                let size = batch.approximate_byte_size();
                if on_storage {
                    storage_size += size;
                }
                if merging {
                    merging_size += size;
                }
            }
        }
        let bloom_filter_stats = membership_filter_stats[FilterKind::Bloom];
        let roaring_filter_stats = membership_filter_stats[FilterKind::Roaring];
        if bloom_filter_records > 0 {
            let bits_per_key =
                bloom_filter_stats.size_byte as f64 * 8.0 / bloom_filter_records as f64;
            let bits_per_key = bits_per_key as usize;
            meta.extend(metadata! {
                BLOOM_FILTER_BITS_PER_KEY => MetaItem::Int(bits_per_key)
            })
        }
        meta.extend([
            MetricReading::new(SPINE_BATCHES_COUNT, Vec::new(), MetaItem::Count(n_batches)),
            MetricReading::new(
                SPINE_STORAGE_SIZE_BYTES,
                Vec::new(),
                MetaItem::bytes(storage_size),
            ),
            MetricReading::new(
                MERGING_BATCHES_COUNT,
                Vec::new(),
                MetaItem::Count(n_merging),
            ),
            MetricReading::new(
                MERGING_SIZE_BYTES,
                Vec::new(),
                MetaItem::bytes(merging_size),
            ),
            MetricReading::new(
                MERGE_REDUCTION_PERCENT,
                Vec::new(),
                spine_stats.merge_reduction(),
            ),
            MetricReading::new(
                MERGE_BACKPRESSURE_WAIT_TIME_SECONDS,
                Vec::new(),
                MetaItem::Duration(spine_stats.backpressure_wait),
            ),
            MetricReading::new(
                BLOOM_FILTER_SIZE_BYTES,
                Vec::new(),
                MetaItem::bytes(bloom_filter_stats.size_byte),
            ),
            MetricReading::new(
                BLOOM_FILTER_HITS_COUNT,
                Vec::new(),
                MetaItem::Count(bloom_filter_stats.hits),
            ),
            MetricReading::new(
                BLOOM_FILTER_MISSES_COUNT,
                Vec::new(),
                MetaItem::Count(bloom_filter_stats.misses),
            ),
            MetricReading::new(
                BLOOM_FILTER_HIT_RATE_PERCENT,
                Vec::new(),
                MetaItem::Percent {
                    numerator: bloom_filter_stats.hits as u64,
                    denominator: bloom_filter_stats.hits as u64 + bloom_filter_stats.misses as u64,
                },
            ),
            MetricReading::new(
                ROARING_FILTER_SIZE_BYTES,
                Vec::new(),
                MetaItem::bytes(roaring_filter_stats.size_byte),
            ),
            MetricReading::new(
                ROARING_FILTER_HITS_COUNT,
                Vec::new(),
                MetaItem::Count(roaring_filter_stats.hits),
            ),
            MetricReading::new(
                ROARING_FILTER_MISSES_COUNT,
                Vec::new(),
                MetaItem::Count(roaring_filter_stats.misses),
            ),
            MetricReading::new(
                ROARING_FILTER_HIT_RATE_PERCENT,
                Vec::new(),
                MetaItem::Percent {
                    numerator: roaring_filter_stats.hits as u64,
                    denominator: roaring_filter_stats.hits as u64
                        + roaring_filter_stats.misses as u64,
                },
            ),
            MetricReading::new(
                RANGE_FILTER_SIZE_BYTES,
                Vec::new(),
                MetaItem::bytes(range_filter_stats.size_byte),
            ),
            MetricReading::new(
                RANGE_FILTER_HITS_COUNT,
                Vec::new(),
                MetaItem::Count(range_filter_stats.hits),
            ),
            MetricReading::new(
                RANGE_FILTER_MISSES_COUNT,
                Vec::new(),
                MetaItem::Count(range_filter_stats.misses),
            ),
            MetricReading::new(
                RANGE_FILTER_HIT_RATE_PERCENT,
                Vec::new(),
                MetaItem::Percent {
                    numerator: range_filter_stats.hits as u64,
                    denominator: range_filter_stats.hits as u64 + range_filter_stats.misses as u64,
                },
            ),
        ]);
        cache_stats.metadata(meta);
    }

    /// Requests compaction starting at the shallowest non-empty level.
    fn initiate_compaction(&self) {
        let mut state = self.state.lock().unwrap();
        state.initiate_compaction();
    }
}
/// Tells every merge worker to exit.
///
/// Sets `request_exit` and wakes each level's worker. A worker finishes its
/// current step, sees the flag and returns. Everything happens under a single
/// lock acquisition. A poisoned mutex (a worker panicked while holding it) is
/// recovered instead of unwrapped: panicking inside `drop` while already
/// unwinding would abort the process.
impl<B> Drop for AsyncMerger<B>
where
    B: Batch,
{
    fn drop(&mut self) {
        let mut state = self
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        state.request_exit = true;
        for slot in state.slots.iter() {
            slot.notify.notify_one();
        }
    }
}
/// Background merge tasks for one spine, at most one per level.
struct MergeWorkers<B>
where
    B: Batch,
{
    /// Bitmask of levels whose worker task has been spawned (bit `n` = level `n`).
    workers_started: AtomicU32,
    /// Running average of the fuel level-0 merges consume; used as the fuel
    /// budget per step at higher levels.
    worker_state: WorkerState,
    /// State shared with the owning `AsyncMerger`.
    state: Arc<Mutex<SharedState<B>>>,
    /// Notified whenever a worker has no merge in progress.
    idle: Arc<Condvar>,
    /// Notified when the backlog falls to the backpressure low-water mark
    /// (and on exit).
    no_backpressure: Arc<Notify>,
    /// Runtime providing the tokio merger runtime, buffer caches and slab allocators.
    runtime: Runtime,
    /// Threshold passed to `Spine::size_to_level` for merge outputs.
    max_level0_batch_size_records: usize,
    /// Global index of the worker owning this spine.
    worker_index: usize,
}
impl<B> MergeWorkers<B>
where
    B: Batch,
{
    /// Creates the worker set; no level task is running until `start` is called.
    fn new(
        state: Arc<Mutex<SharedState<B>>>,
        idle: Arc<Condvar>,
        no_backpressure: Arc<Notify>,
        runtime: Runtime,
        max_level0_batch_size_records: usize,
        worker_index: usize,
    ) -> Self {
        Self {
            workers_started: AtomicU32::new(0),
            worker_state: WorkerState::default(),
            state,
            idle,
            no_backpressure,
            runtime,
            max_level0_batch_size_records,
            worker_index,
        }
    }

    /// Spawns the merge task for `level` unless one was already spawned.
    /// `fetch_or` on the bitmask makes test-and-set atomic, so the task is
    /// spawned at most once per level.
    fn start(self: &Arc<Self>, level: usize) {
        let bit = 1 << level;
        if (self.workers_started.fetch_or(bit, Ordering::Relaxed) & bit) == 0 {
            self.clone().run(level);
        }
    }

    /// Spawns the long-running merge loop for `level` on the tokio merger
    /// runtime, with the background buffer cache and slab allocator scoped in.
    ///
    /// Each iteration performs one `merge_step`. After each step the loop:
    /// - exits if `request_exit` is set;
    /// - wakes backpressure waiters when the backlog is low enough;
    /// - when no merge is in progress, signals `idle` and sleeps until either
    ///   this level's notify or the memory-pressure notify fires; otherwise it
    ///   yields to other tasks.
    fn run(self: Arc<Self>, level: usize) {
        let local_worker_offset = self.worker_index - self.runtime.layout().local_workers().start;
        self.runtime.tokio_merger_runtime().spawn(async move {
            let buffer_cache = self
                .runtime
                .get_buffer_cache(local_worker_offset, ThreadType::Background);
            let slab_allocator = self
                .runtime
                .get_fbuf_slab_allocator(local_worker_offset, ThreadType::Background);
            let memory_pressure_notify = self.runtime.memory_pressure_notify();
            scope_tokio_merger_locals(
                self.worker_index,
                buffer_cache,
                slab_allocator,
                async move {
                    let mut merger = None;
                    let merger_type = Runtime::with_dev_tweaks(|tweaks| tweaks.merger());
                    let notify = self.state.lock().unwrap().slots[level].notify.clone();
                    loop {
                        self.merge_step(&mut merger, merger_type, level);
                        {
                            let state = self.state.lock().unwrap();
                            if state.request_exit {
                                self.no_backpressure.notify_waiters();
                                break;
                            }
                            if state.should_relieve_backpressure() {
                                self.no_backpressure.notify_waiters();
                            }
                        }
                        if merger.is_none() {
                            self.idle.notify_all();
                            tokio::select! {
                                _ = notify.notified() => {}
                                _ = memory_pressure_notify.notified() => {}
                            }
                        } else {
                            yield_now().await;
                        };
                    }
                },
            )
            .await;
        });
    }

    /// Performs one unit of merge work at `level`.
    ///
    /// 1. If a merge is in progress, advance it. Level 0 gets unlimited fuel;
    ///    other levels get the running level-0 average. When the merge
    ///    finishes, publish its output via `merge_complete` and make sure the
    ///    output's level has a worker.
    /// 2. Try to start a new merge. If compaction was requested here but
    ///    nothing can be merged, pass the loose batches on to `level + 1`,
    ///    when deeper levels exist.
    /// 3. If a merge starts, build it with the current filters. A spine
    ///    snapshot is taken only in that case and only if the value filter
    ///    needs one. Previously the snapshot was built on every step and then
    ///    discarded when no merge started.
    fn merge_step(
        self: &Arc<Self>,
        opt_merger: &mut Option<Merge<B>>,
        merger_type: MergerType,
        level: usize,
    ) {
        let ((key_filter, value_filter), frontier) = {
            let shared = self.state.lock().unwrap();
            (shared.get_filters(), shared.frontier.clone())
        };
        if let Some(merger) = opt_merger.as_mut() {
            let fuel = if level == 0 {
                isize::MAX
            } else {
                self.worker_state.avg_slot0_merge_fuel()
            };
            merger.merge(&frontier, fuel);
            if merger.done {
                if level == 0 {
                    self.worker_state.report_slot0_merge(merger.fuel);
                }
                let merger = opt_merger.take().unwrap();
                let new_batch = Arc::new(merger.builder.done());
                let new_level =
                    Spine::<B>::size_to_level(&new_batch, self.max_level0_batch_size_records, true);
                self.state.lock().unwrap().merge_complete(
                    level,
                    new_batch,
                    new_level,
                    merger.start,
                    merger.elapsed,
                    merger.n_steps,
                );
                self.start(new_level);
            }
        }
        let start_merge = {
            let mut state = self.state.lock().unwrap();
            let last_non_empty_slot = state.last_non_empty_slot();
            let slot = &mut state.slots[level];
            let start_merge = slot.try_start_merge(level);
            if slot.compaction_status == CompactionStatus::Requested
                && slot.merging_batches.is_none()
            {
                // Nothing (or only one batch) to compact here: pass the
                // compaction request on to the next level.
                slot.compaction_status = CompactionStatus::None;
                if let Some(last_level) = last_non_empty_slot
                    && last_level > level
                {
                    let batches = slot.loose_batches.drain(..).collect::<Vec<_>>();
                    state.initiate_compaction_at_level(level + 1, batches);
                }
            }
            start_merge
        };
        if let Some(batches) = start_merge {
            let snapshot = if value_filter
                .as_ref()
                .map(|f| f.requires_snapshot())
                .unwrap_or(false)
            {
                Some(Arc::new(self.state.lock().unwrap().get_snapshot()))
            } else {
                None
            };
            *opt_merger = Some(Merge::new(
                merger_type,
                batches,
                &key_filter,
                &value_filter,
                snapshot,
            ));
        }
    }
}
/// Statistics shared by the merge workers of one spine.
#[derive(Debug)]
struct WorkerState {
    /// Exponential moving average of the fuel consumed by level-0 merges.
    avg_slot0_merge_fuel: AtomicIsize,
}

impl Default for WorkerState {
    /// Starts the level-0 fuel average at 10,000.
    fn default() -> Self {
        const INITIAL_FUEL: isize = 10_000;
        WorkerState {
            avg_slot0_merge_fuel: AtomicIsize::new(INITIAL_FUEL),
        }
    }
}

impl WorkerState {
    /// Folds `fuel` into the moving average with weight 1/128, rounding to
    /// nearest (`+ 64`). The result is clamped to at least 1.
    fn report_slot0_merge(&self, fuel: isize) {
        let previous = self.avg_slot0_merge_fuel();
        let blended = (previous * 127 + fuel + 64) / 128;
        self.avg_slot0_merge_fuel
            .store(blended.max(1), Ordering::Relaxed);
    }

    /// Current level-0 fuel average.
    fn avg_slot0_merge_fuel(&self) -> isize {
        self.avg_slot0_merge_fuel.load(Ordering::Relaxed)
    }
}
/// An in-progress merge of a group of batches into one output batch.
struct Merge<B>
where
    B: Batch,
{
    /// Builder that accumulates the merged output; `done()` yields the batch.
    builder: B::Builder,
    /// Total fuel consumed across all `merge` calls so far.
    fuel: isize,
    /// Set once a `merge` call finishes with fuel left over.
    done: bool,
    /// When this merge was created; used as the start of its tracing span.
    start: Instant,
    /// Time spent inside `merge` calls.
    elapsed: Duration,
    /// Number of `merge` calls made.
    n_steps: usize,
    /// The merge algorithm chosen by `MergerType`.
    inner: MergeInner<B>,
}
/// Merge implementation selected by the `merger` dev tweak.
enum MergeInner<B>
where
    B: Batch,
{
    ListMerger(ArcListMerger<B>),
    PushMerger(ArcPushMerger<B>),
}
impl<B> Merge<B>
where
    B: Batch,
{
    /// Sets up a merge of `batches` with the given filters.
    ///
    /// Indexes `batches[0]` for the factories, so `batches` must be
    /// non-empty. The `snapshot` is only used by the list merger.
    fn new(
        merger_type: MergerType,
        batches: Vec<Arc<B>>,
        key_filter: &Option<Filter<B::Key>>,
        value_filter: &Option<GroupFilter<B::Val>>,
        snapshot: Option<Arc<SpineSnapshot<B>>>,
    ) -> Self {
        let factories = batches[0].factories();
        let batch_refs: Vec<&B> = batches.iter().map(|b| b.as_ref()).collect();
        let builder = B::Builder::for_merge(&factories, batch_refs, None);
        Self {
            builder,
            fuel: 0,
            start: Instant::now(),
            elapsed: Duration::ZERO,
            n_steps: 0,
            done: false,
            inner: match merger_type {
                MergerType::ListMerger => MergeInner::ListMerger(ArcListMerger::new(
                    &factories,
                    batches,
                    key_filter,
                    value_filter,
                    snapshot,
                )),
                MergerType::PushMerger => {
                    let mut inner =
                        ArcPushMerger::new(&factories, batches, key_filter, value_filter);
                    // NOTE(review): presumably kicks off the push merger's
                    // background/prefetch work — confirm in push_merger.
                    inner.run();
                    MergeInner::PushMerger(inner)
                }
            },
        }
    }
    /// Runs one step with at most `fuel` units of work (`fuel` must be > 0)
    /// and returns the fuel consumed.
    ///
    /// The merge counts as `done` when the step ends with fuel still left
    /// over, i.e. it ran out of input before exhausting its budget. For the
    /// push merger, `merge` must also return `Ok`. A push merger that is not
    /// done is `run()` again before the next step.
    fn merge(&mut self, frontier: &B::Time, mut fuel: isize) -> isize {
        debug_assert!(fuel > 0);
        let supplied_fuel = fuel;
        let start = Instant::now();
        match &mut self.inner {
            MergeInner::ListMerger(merger) => {
                merger.work(&mut self.builder, frontier, &mut fuel);
                self.done = fuel > 0;
            }
            MergeInner::PushMerger(merger) => {
                self.done =
                    merger.merge(&mut self.builder, frontier, &mut fuel).is_ok() && fuel > 0;
                if !self.done {
                    merger.run();
                }
            }
        };
        self.elapsed += start.elapsed();
        self.n_steps += 1;
        // `fuel` was decremented in place by the merger; the difference is
        // the work actually done in this step.
        let consumed_fuel = supplied_fuel - fuel;
        self.fuel += consumed_fuel;
        consumed_fuel
    }
}
/// Aggregate merge statistics for a spine.
#[derive(Clone, Default)]
struct SpineStats {
    /// Total records across all merge inputs.
    pre_len: u64,
    /// Total records across all merge outputs.
    post_len: u64,
    /// Cache statistics accumulated from merged-away input batches.
    cache_stats: CacheStats,
    /// Total time writers spent waiting in `backpressure_wait`.
    backpressure_wait: Duration,
}

impl SpineStats {
    /// Accumulates one completed merge: input and output record counts plus
    /// the input batches' cache statistics.
    fn report_merge(&mut self, pre_len: usize, post_len: usize, cache_stats: CacheStats) {
        self.pre_len += pre_len as u64;
        self.post_len += post_len as u64;
        self.cache_stats += cache_stats;
    }

    /// Percentage of input records eliminated by merging.
    ///
    /// `saturating_sub` guards the unsigned subtraction. If outputs ever
    /// exceeded inputs, the original expression would panic in debug builds
    /// and wrap in release builds; the metric now reports 0 instead.
    fn merge_reduction(&self) -> MetaItem {
        MetaItem::Percent {
            numerator: self.pre_len.saturating_sub(self.post_len),
            denominator: self.pre_len,
        }
    }
}
/// A trace made of batches that are merged in the background by an
/// [`AsyncMerger`].
pub struct Spine<B>
where
    B: Batch,
{
    /// Factories for the batch type; also handed to cursors.
    factories: B::Factories,
    /// Copied into [`CommittedSpine::dirty`] when the spine is committed.
    dirty: bool,
    // NOTE(review): `key_filter`/`value_filter` are not read in this chunk;
    // presumably mirrored into the merger via `set_*_filter` — confirm.
    key_filter: Option<Filter<B::Key>>,
    value_filter: Option<GroupFilter<B::Val>>,
    /// Owns the batches and the background merge workers.
    merger: AsyncMerger<B>,
}
impl<B> Spine<B>
where
    B: Batch,
{
    /// Returns the batches currently held by the spine (loose and merging),
    /// as `Arc` clones taken under the merger's lock.
    pub fn get_batches(&self) -> Vec<Arc<B>> {
        let merger = &self.merger;
        merger.get_batches()
    }
}
/// Memory accounting: counts the batches currently held by the merger.
impl<B> SizeOf for Spine<B>
where
    B: Batch,
{
    fn size_of_children(&self, context: &mut Context) {
        let batches = self.merger.get_batches();
        batches.size_of_with_context(context);
    }
}
/// Writes each batch on its own indented section, prefixed with `batch:`.
impl<B> Display for Spine<B>
where
    B: Batch + Display,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.merger
            .get_batches()
            .iter()
            .try_for_each(|batch| writeln!(f, "batch:\n{}", indent(&batch.to_string(), " ")))
    }
}
/// Dumps every key, value and `(time -> weight)` pair by walking a cursor
/// over the spine.
///
/// Write errors inside the `map_times` callback are now captured and
/// propagated as `fmt::Error`. Previously they were `unwrap`ped, which
/// panicked whenever the underlying writer failed.
impl<B> Debug for Spine<B>
where
    B: Batch,
{
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        let mut cursor = self.cursor();
        writeln!(f, "spine:")?;
        while cursor.key_valid() {
            writeln!(f, "{:?}:", cursor.key())?;
            while cursor.val_valid() {
                writeln!(f, " {:?}:", cursor.val())?;
                let mut times_result = Ok(());
                cursor.map_times(&mut |t, w| {
                    // After the first failure, skip further writes.
                    if times_result.is_ok() {
                        times_result = writeln!(f, " {t:?} -> {w:?}");
                    }
                });
                times_result?;
                cursor.step_val();
            }
            cursor.step_key();
        }
        writeln!(f)?;
        Ok(())
    }
}
/// Cloning a spine is not supported.
///
/// # Panics
///
/// Always panics via `unimplemented!()`.
impl<B> Clone for Spine<B>
where
    B: Batch,
{
    fn clone(&self) -> Self {
        unimplemented!()
    }
}
/// rkyv archiving is not supported for `Spine`. These impls only satisfy
/// trait bounds; every method panics via `unimplemented!()`.
impl<B> Archive for Spine<B>
where
    B: Batch,
{
    type Archived = ();
    type Resolver = ();
    /// Always panics; never writes to `_out`.
    unsafe fn resolve(&self, _pos: usize, _resolver: Self::Resolver, _out: *mut Self::Archived) {
        unimplemented!();
    }
}
/// Always panics; see the `Archive` impl.
impl<B: Batch, S: Serializer + ?Sized> Serialize<S> for Spine<B> {
    fn serialize(&self, _serializer: &mut S) -> Result<Self::Resolver, S::Error> {
        unimplemented!();
    }
}
/// Always panics; see the `Archive` impl.
impl<B: Batch, D: Fallible> Deserialize<Spine<B>, D> for Archived<Spine<B>> {
    fn deserialize(&self, _deserializer: &mut D) -> Result<Spine<B>, D::Error> {
        unimplemented!();
    }
}
impl<B> NumEntries for Spine<B>
where
B: Batch,
{
const CONST_NUM_ENTRIES: Option<usize> = None;
fn num_entries_shallow(&self) -> usize {
self.merger
.get_batches()
.iter()
.map(|batch| batch.len())
.sum()
}
fn num_entries_deep(&self) -> usize {
self.num_entries_shallow()
}
}
/// Reads a spine as the union of its current batches. Each call takes a fresh
/// batch list from the merger.
impl<B> BatchReader for Spine<B>
where
    B: Batch,
{
    type Key = B::Key;
    type Val = B::Val;
    type Time = B::Time;
    type R = B::R;
    type Factories = B::Factories;
    type Cursor<'s> = SpineCursor<B>;

    fn factories(&self) -> Self::Factories {
        self.factories.clone()
    }

    fn key_count(&self) -> usize {
        let batches = self.merger.get_batches();
        batches.iter().map(|batch| batch.key_count()).sum()
    }

    fn len(&self) -> usize {
        let batches = self.merger.get_batches();
        batches.iter().map(|batch| batch.len()).sum()
    }

    fn approximate_byte_size(&self) -> usize {
        let batches = self.merger.get_batches();
        batches
            .iter()
            .map(|batch| batch.approximate_byte_size())
            .sum()
    }

    fn range_filter_stats(&self) -> FilterStats {
        let batches = self.merger.get_batches();
        batches.iter().map(|batch| batch.range_filter_stats()).sum()
    }

    fn cursor(&self) -> Self::Cursor<'_> {
        let batches = self.merger.get_batches();
        SpineCursor::new_cursor(&self.factories, batches)
    }

    /// Samples keys from each batch in proportion to its share of the total
    /// key count (integer division, so a batch's quota may round down to 0).
    fn sample_keys<RG>(&self, rng: &mut RG, sample_size: usize, sample: &mut DynVec<Self::Key>)
    where
        RG: Rng,
    {
        let batches = self.merger.get_batches();
        let batch_refs: Vec<_> = batches.iter().map(Arc::as_ref).collect();
        let total_keys = batch_refs
            .iter()
            .map(|batch| batch.key_count())
            .sum::<usize>();
        sample_keys_from_batches(
            &self.factories,
            &batch_refs,
            rng,
            |batch| match (sample_size, total_keys) {
                (0, _) | (_, 0) => 0,
                _ => ((batch.key_count() as u128) * (sample_size as u128)
                    / (total_keys as u128)) as usize,
            },
            sample,
        );
    }

    async fn fetch<KR>(
        &self,
        keys: &KR,
    ) -> Option<Box<dyn CursorFactory<Self::Key, Self::Val, Self::Time, Self::R>>>
    where
        KR: BatchReader<Key = Self::Key, Time = ()>,
    {
        let batches = self.merger.get_batches();
        let fetched = FetchList::new(batches, keys, self.factories.weight_factory()).await;
        Some(Box::new(fetched))
    }
}
impl<B> Spine<B>
where
    B: Batch,
{
    /// Path of the spine checkpoint file `pspine-<persistent_id>.dat` under `base`.
    fn checkpoint_file(base: &StoragePath, persistent_id: &str) -> StoragePath {
        let name = format!("pspine-{persistent_id}.dat");
        base.child(name)
    }

    /// Path of the batch-list file `pspine-batches-<persistent_id>.dat` under `base`.
    fn batchlist_file(&self, base: &StoragePath, persistent_id: &str) -> StoragePath {
        let name = format!("pspine-batches-{persistent_id}.dat");
        base.child(name)
    }
}
/// Cursor over a fixed list of batches.
///
/// The struct owns the batch `Arc`s and, through `ouroboros`, a
/// [`CursorList`] whose per-batch cursors borrow from them. The cursor
/// therefore stays valid even if the spine's merger replaces its batches.
#[self_referencing]
pub struct SpineCursor<B: Batch> {
    batches: Vec<Arc<B>>,
    #[borrows(batches)]
    #[not_covariant]
    cursor: CursorList<B::Key, B::Val, B::Time, B::R, B::Cursor<'this>>,
}
/// Clones the batch list and builds fresh per-batch cursors over it, using
/// the same weight factory. The clone starts from new cursors; the current
/// position is not copied.
impl<B: Batch> Clone for SpineCursor<B> {
    fn clone(&self) -> Self {
        let cloned_batches = self.borrow_batches().clone();
        let weight_factory = self.with_cursor(|inner| inner.weight_factory());
        SpineCursorBuilder {
            batches: cloned_batches,
            cursor_builder: |batches| {
                let cursors = batches.iter().map(|batch| batch.cursor()).collect();
                CursorList::new(weight_factory, cursors)
            },
        }
        .build()
    }
}
impl<B: Batch> SpineCursor<B> {
    /// Creates a cursor over `batches`, with one underlying cursor per batch,
    /// combined by a [`CursorList`].
    pub fn new_cursor(factories: &B::Factories, batches: Vec<Arc<B>>) -> Self {
        let builder = SpineCursorBuilder {
            batches,
            cursor_builder: |batches| {
                let cursors = batches.iter().map(|batch| batch.cursor()).collect();
                CursorList::new(factories.weight_factory(), cursors)
            },
        };
        builder.build()
    }
}
impl<B: Batch> Cursor<B::Key, B::Val, B::Time, B::R> for SpineCursor<B> {
// Every method forwards to the inner `CursorList` through the ouroboros
// accessors: `with_cursor` for `&self`, `with_cursor_mut` for `&mut self`.
fn weight_factory(&self) -> &'static dyn Factory<B::R> {
    self.with_cursor(|inner| inner.weight_factory())
}
fn key_valid(&self) -> bool {
    self.with_cursor(|inner| inner.key_valid())
}
fn val_valid(&self) -> bool {
    self.with_cursor(|inner| inner.val_valid())
}
fn key(&self) -> &B::Key {
    self.with_cursor(|inner| inner.key())
}
fn val(&self) -> &B::Val {
    self.with_cursor(|inner| inner.val())
}
fn map_times(&mut self, logic: &mut dyn FnMut(&B::Time, &B::R)) {
    self.with_cursor_mut(|inner| inner.map_times(logic));
}
fn map_times_through(&mut self, upper: &B::Time, logic: &mut dyn FnMut(&B::Time, &B::R)) {
    self.with_cursor_mut(|inner| inner.map_times_through(upper, logic));
}
fn weight(&mut self) -> &B::R
where
    B::Time: PartialEq<()>,
{
    self.with_cursor_mut(|inner| inner.weight())
}
fn weight_checked(&mut self) -> &B::R {
    self.with_cursor_mut(|inner| inner.weight_checked())
}
fn map_values(&mut self, logic: &mut dyn FnMut(&B::Val, &B::R))
where
    B::Time: PartialEq<()>,
{
    self.with_cursor_mut(|inner| inner.map_values(logic))
}
fn step_key(&mut self) {
    self.with_cursor_mut(|inner| inner.step_key());
}
fn step_key_reverse(&mut self) {
    self.with_cursor_mut(|inner| inner.step_key_reverse());
}
fn seek_key(&mut self, key: &B::Key) {
    self.with_cursor_mut(|inner| inner.seek_key(key));
}
fn seek_key_exact(&mut self, key: &B::Key, hash: Option<u64>) -> bool {
    self.with_cursor_mut(|inner| inner.seek_key_exact(key, hash))
}
fn seek_key_with(&mut self, predicate: &dyn Fn(&B::Key) -> bool) {
    self.with_cursor_mut(|inner| inner.seek_key_with(predicate));
}
fn seek_key_with_reverse(&mut self, predicate: &dyn Fn(&B::Key) -> bool) {
    self.with_cursor_mut(|inner| inner.seek_key_with_reverse(predicate));
}
fn seek_key_reverse(&mut self, key: &B::Key) {
    self.with_cursor_mut(|inner| inner.seek_key_reverse(key));
}
fn step_val(&mut self) {
    self.with_cursor_mut(|inner| inner.step_val());
}
fn seek_val(&mut self, val: &B::Val) {
    self.with_cursor_mut(|inner| inner.seek_val(val));
}
fn seek_val_with(&mut self, predicate: &dyn Fn(&B::Val) -> bool) {
    self.with_cursor_mut(|inner| inner.seek_val_with(predicate));
}
fn rewind_keys(&mut self) {
    self.with_cursor_mut(|inner| inner.rewind_keys());
}
fn fast_forward_keys(&mut self) {
    self.with_cursor_mut(|inner| inner.fast_forward_keys());
}
fn rewind_vals(&mut self) {
    self.with_cursor_mut(|inner| inner.rewind_vals());
}
fn step_val_reverse(&mut self) {
    self.with_cursor_mut(|inner| inner.step_val_reverse());
}
fn seek_val_reverse(&mut self, val: &B::Val) {
    self.with_cursor_mut(|inner| inner.seek_val_reverse(val));
}
fn seek_val_with_reverse(&mut self, predicate: &dyn Fn(&B::Val) -> bool) {
    self.with_cursor_mut(|inner| inner.seek_val_with_reverse(predicate));
}
fn fast_forward_vals(&mut self) {
    self.with_cursor_mut(|inner| inner.fast_forward_vals());
}
fn position(&self) -> Option<Position> {
self.with_cursor(|cursor| cursor.position())
}
}
impl<B> Trace for Spine<B>
where
    B: Batch,
{
    type Batch = B;

    /// Creates an empty spine whose merger is attached to the current DBSP runtime and
    /// worker.
    ///
    /// # Panics
    ///
    /// Panics when called outside of a DBSP runtime.
    fn new(factories: &B::Factories) -> Self {
        Self::with_runtime(
            Runtime::runtime()
                .expect("Attempting to create a spine merger outside of a DBSP runtime"),
            Runtime::worker_index(),
            factories,
        )
    }

    fn set_frontier(&mut self, frontier: &B::Time) {
        self.merger.set_frontier(frontier)
    }

    /// No-op: merging is driven by `self.merger`, not by caller-supplied effort.
    fn exert(&mut self, _effort: &mut isize) {}

    /// Merges every batch held by the spine into a single batch, applying the retained
    /// key/value filters. Returns `None` when the merged result is empty.
    fn consolidate(self) -> Option<B> {
        // A paused batch may still be referenced elsewhere. In that case clone it rather
        // than panicking (which `Arc::into_inner(..).unwrap()` would do). This matches
        // `complete_merges`.
        let batches = self.merger.pause().into_iter().map(Arc::unwrap_or_clone);
        let result = merge_batches(
            &self.factories,
            batches,
            &self.key_filter,
            &self.value_filter,
        );
        (!result.is_empty()).then_some(result)
    }

    /// Inserts `batch`, possibly spilling it to storage first (see `maybe_flush_batch`),
    /// and waits on the merger's backpressure if the insertion requested it.
    async fn insert(&mut self, batch: impl Into<Arc<Self::Batch>>) {
        let batch = Self::maybe_flush_batch(batch, &self.factories, || {
            self.merger.state.lock().unwrap().get_filters()
        });
        if self.insert_without_blocking(batch) {
            self.merger.backpressure_wait().await;
        }
    }

    fn clear_dirty_flag(&mut self) {
        self.dirty = false;
    }

    fn dirty(&self) -> bool {
        self.dirty
    }

    /// Installs `filter` in the merger and records it as the spine's key filter.
    fn retain_keys(&mut self, filter: Filter<Self::Key>) {
        self.merger.set_key_filter(&filter);
        self.key_filter = Some(filter);
    }

    /// Installs `filter` in the merger and records it as the spine's value filter.
    fn retain_values(&mut self, filter: GroupFilter<Self::Val>) {
        self.merger.set_value_filter(&filter);
        self.value_filter = Some(filter);
    }

    fn key_filter(&self) -> &Option<Filter<Self::Key>> {
        &self.key_filter
    }

    fn value_filter(&self) -> &Option<GroupFilter<Self::Val>> {
        &self.value_filter
    }

    /// Checkpoints the spine under `base`.
    ///
    /// Steps:
    /// - Pauses new merges.
    /// - Replaces each batch with its persisted form when `persisted()` returns one.
    /// - Resumes the merger with the batches that were not being merged.
    /// - Pushes every batch's file reader onto `files`.
    /// - Writes the serialized `CommittedSpine` to the checkpoint file, whose committer is
    ///   also pushed onto `files`.
    ///
    /// The batch-list JSON file is written and committed immediately.
    ///
    /// # Panics
    ///
    /// Panics if a batch has no file reader after persisting, or if no storage backend is
    /// configured.
    fn save(
        &mut self,
        base: &StoragePath,
        persistent_id: &str,
        files: &mut Vec<Arc<dyn FileCommitter>>,
    ) -> Result<(), Error> {
        // Swaps each batch for its persisted copy when the batch provides one.
        fn persist_batches<B>(batches: Vec<Arc<B>>) -> Vec<Arc<B>>
        where
            B: Batch,
        {
            batches
                .into_iter()
                .map(|batch| {
                    if let Some(persisted) = batch.persisted() {
                        Arc::new(persisted)
                    } else {
                        batch
                    }
                })
                .collect::<Vec<_>>()
        }

        let (not_merging, merging) = self.merger.pause_new_merges();
        let not_merging = persist_batches(not_merging);
        self.merger.resume(not_merging.iter().cloned());
        let merging = persist_batches(merging);
        let ids = not_merging
            .iter()
            .chain(merging.iter())
            .map(|batch| {
                let file = batch
                    .file_reader()
                    .expect("The batch should have been persisted");
                let path = file.path().to_string();
                files.push(file);
                path
            })
            .collect::<Vec<_>>();
        let backend = Runtime::storage_backend().unwrap();
        let committed: CommittedSpine = (ids, self as &Self).into();
        let as_bytes = to_bytes(&committed).expect("Serializing CommittedSpine should work.");
        let pspine_writer = backend.write(&Self::checkpoint_file(base, persistent_id), as_bytes)?;
        files.push(pspine_writer);
        let pspine_batches = PSpineBatches {
            files: committed.batches,
        };
        backend
            .write_json(&self.batchlist_file(base, persistent_id), &pspine_batches)
            .and_then(|reader| reader.commit())?;
        Ok(())
    }

    /// Restores the spine from the checkpoint written by `save` under `base`.
    ///
    /// Validates and deserializes the `CommittedSpine`, clears the key/value filters, and
    /// re-inserts every referenced batch. Finally it restores the checkpointed dirty flag.
    ///
    /// # Errors
    ///
    /// Returns an error if the checkpoint cannot be read, fails validation or
    /// deserialization, or if any referenced batch cannot be opened.
    fn restore(&mut self, base: &StoragePath, persistent_id: &str) -> Result<(), Error> {
        let pspine_path = Self::checkpoint_file(base, persistent_id);
        let content = Runtime::storage_backend().unwrap().read(&pspine_path)?;
        let archived = rkyv::check_archived_root::<CommittedSpine>(&content).map_err(|e| {
            crate::circuit::checkpointer::checkpoint_invalid_data_error(
                "Spine checkpoint validation failed",
                format!("{pspine_path}: {e}"),
            )
        })?;
        let committed: CommittedSpine = archived
            .deserialize(&mut Deserializer::default())
            .map_err(|e| {
                crate::circuit::checkpointer::checkpoint_invalid_data_error(
                    "Spine checkpoint deserialize failed",
                    format!("{pspine_path}: {e:?}"),
                )
            })?;
        let dirty = committed.dirty;
        self.key_filter = None;
        self.value_filter = None;
        for batch in committed.batches {
            let batch =
                B::from_path(&self.factories, &batch.clone().into()).map_err(|error| {
                    crate::circuit::checkpointer::checkpoint_invalid_data_error(
                        "Spine batch read failed",
                        format!("{batch}: {error}"),
                    )
                })?;
            self.insert_without_blocking(batch);
        }
        // `insert_without_blocking` sets `dirty = true` for every non-empty batch. Apply
        // the checkpointed flag only after all batches have been re-inserted so it is not
        // overwritten.
        self.dirty = dirty;
        Ok(())
    }

    fn metadata(&self, meta: &mut OperatorMeta) {
        self.merger.metadata(meta);
    }

    fn initiate_compaction(&self) {
        self.merger.initiate_compaction();
    }
}
impl<B> Spine<B>
where
    B: Batch,
{
    /// Chooses the spine level for `batch` from its (effective) record count.
    ///
    /// When `merge` is set, each negative-weight record counts an extra
    /// `negative_weight_multiplier()` times. Counts up to `max_level0_batch_size_records`
    /// map to level 0. Otherwise:
    ///
    /// | effective length            | level |
    /// |-----------------------------|-------|
    /// | `< 100_000`                 | 1     |
    /// | `< 1_000_000`               | 2     |
    /// | `< 10_000_000`              | 3     |
    /// | `< 100_000_000`             | 4     |
    /// | `< 1_000_000_000`           | 5     |
    /// | `< 10_000_000_000`          | 6     |
    /// | `< 100_000_000_000`         | 7     |
    /// | otherwise                   | 8     |
    fn size_to_level(batch: &B, max_level0_batch_size_records: usize, merge: bool) -> usize {
        debug_assert_eq!(MAX_LEVELS, 9);
        debug_assert!(max_level0_batch_size_records > 0 && max_level0_batch_size_records <= 99_999);

        let mut effective_len = batch.len();
        if merge {
            let negatives = batch.negative_weight_count().unwrap_or(0) as usize;
            effective_len += negatives * (negative_weight_multiplier() as usize);
        }

        if effective_len <= max_level0_batch_size_records {
            return 0;
        }

        // Lower bounds (ascending) of levels 2..=8; level 1 covers everything below the
        // first bound. Each bound reached bumps the level by one.
        const LEVEL_LOWER_BOUNDS: [usize; 7] = [
            100_000,
            1_000_000,
            10_000_000,
            100_000_000,
            1_000_000_000,
            10_000_000_000,
            100_000_000_000,
        ];
        1 + LEVEL_LOWER_BOUNDS
            .iter()
            .take_while(|&&bound| effective_len >= bound)
            .count()
    }

    /// Creates an empty, clean spine whose merger runs on `runtime` for `worker_index`.
    pub fn with_runtime(runtime: Runtime, worker_index: usize, factories: &B::Factories) -> Self {
        Self {
            factories: factories.clone(),
            dirty: false,
            key_filter: None,
            value_filter: None,
            merger: AsyncMerger::new(runtime, worker_index, factories),
        }
    }

    /// Pauses the merger, merges all of its batches into one (applying the retained
    /// filters), and resumes the merger with just that batch.
    pub fn complete_merges(&mut self) {
        let owned = self.merger.pause().into_iter().map(Arc::unwrap_or_clone);
        let merged = merge_batches(&self.factories, owned, &self.key_filter, &self.value_filter);
        self.merger.resume(vec![Arc::new(merged)]);
    }

    /// Rewrites an in-memory batch to storage when `pick_insert_destination` says it
    /// belongs there; otherwise returns the batch unchanged.
    ///
    /// `filters` is called at most once, and only when the batch is rewritten. The key and
    /// value filters it returns are applied during the rewrite. If the `Arc` is uniquely
    /// owned, the batch is consumed; otherwise it is read through a merge cursor.
    pub fn maybe_flush_batch<F>(
        batch: impl Into<Arc<B>>,
        factories: &B::Factories,
        filters: F,
    ) -> Arc<B>
    where
        F: FnOnce() -> (Option<Filter<B::Key>>, Option<GroupFilter<B::Val>>),
    {
        let batch: Arc<B> = batch.into();
        let should_spill = batch.location() == BatchLocation::Memory
            && pick_insert_destination(&batch) == BatchLocation::Storage;
        if !should_spill {
            return batch;
        }

        let _span = Span::new("eager spill")
            .with_category("Spine")
            .with_tooltip(|| {
                format!(
                    "Eagerly spilling {} batch with {} keys and {} values",
                    HumanBytes::from(batch.approximate_byte_size()),
                    batch.key_count(),
                    batch.len()
                )
            });

        match Arc::try_unwrap(batch) {
            // Sole owner: the batch can be consumed while it is rewritten.
            Ok(mut owned) => {
                let builder =
                    B::Builder::for_merge(factories, [&owned], Some(BatchLocation::Storage));
                let (key_filter, value_filter) = filters();
                let cursors = vec![owned.consuming_cursor(key_filter, value_filter)];
                Arc::new(ListMerger::merge(factories, builder, cursors))
            }
            // Shared: read the batch without taking it apart.
            Err(shared) => {
                let shared_ref: &B = &shared;
                let builder =
                    B::Builder::for_merge(factories, [shared_ref], Some(BatchLocation::Storage));
                let (key_filter, value_filter) = filters();
                let cursors = vec![shared.merge_cursor(key_filter, value_filter)];
                Arc::new(ListMerger::merge(factories, builder, cursors))
            }
        }
    }

    /// Hands `batch` to the merger without waiting for backpressure.
    ///
    /// Empty batches are dropped and leave the dirty flag untouched. Non-empty batches mark
    /// the spine dirty. Returns the merger's `add_batch` result, or `false` for an empty
    /// batch; `Trace::insert` uses a `true` result to decide whether to wait on
    /// `backpressure_wait`.
    pub fn insert_without_blocking(&mut self, batch: impl Into<Arc<B>>) -> bool {
        let batch: Arc<B> = batch.into();
        if batch.is_empty() {
            return false;
        }
        self.dirty = true;
        self.merger.add_batch(batch, false)
    }

    /// Waits on the merger's backpressure.
    pub async fn backpressure_wait(&self) {
        self.merger.backpressure_wait().await;
    }
}
impl<B> WithSnapshot for Spine<B>
where
    B: Batch,
{
    type Batch = B;

    /// Returns a read-only snapshot of this spine, produced by converting `&Spine<B>`
    /// into a `SpineSnapshot<B>`.
    fn ro_snapshot(&self) -> SpineSnapshot<B> {
        let snapshot: SpineSnapshot<B> = self.into();
        snapshot
    }
}