use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::sync::mpsc::{Receiver, TryRecvError};
use crate::committer::{
CommitterHandles, Work, committer_dead, drain_buffer_with_error, run_protected,
unwrap_protected,
};
use crate::error::GraphResult;
use crate::write_txn::{
AppendedCommit, CommitOutcome, SealedCommit, append_sealed, flush_durables, publish_appended,
};
/// Controls whether the committer groups several sealed commits into a single
/// durable flush.
///
/// `Off` (the default) resolves to batches of exactly one commit. `On` lets a
/// batch grow to `max_commits` commits, closing it early once the summed size
/// estimate of the commits would exceed `max_bytes`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CommitBatching {
    /// Every commit is flushed and published on its own.
    #[default]
    Off,
    /// Group consecutive commits, bounded by count and by estimated size.
    On {
        /// Upper bound on the number of commits in one batch.
        max_commits: std::num::NonZeroUsize,
        /// Soft cap on the summed size estimate of one batch. The first commit
        /// of a batch is always admitted, even if it alone exceeds this cap.
        max_bytes: u64,
    },
}

impl CommitBatching {
    /// Batching enabled with the stock limits: 64 commits or 8 MiB of
    /// estimated encoded size per batch.
    pub const DEFAULT_ON: Self = Self::On {
        // `NonZeroUsize::new` yields an `Option`; 64 is non-zero, so the
        // `else` arm can never be taken.
        max_commits: {
            let Some(limit) = std::num::NonZeroUsize::new(64) else {
                unreachable!()
            };
            limit
        },
        // 8 MiB.
        max_bytes: 8 << 20,
    };
}
/// Concrete per-batch limits derived from a [`CommitBatching`] setting.
#[derive(Debug, Clone, Copy)]
pub(crate) struct BatchLimits {
    /// Most commits one batch may hold; `resolve` always yields at least 1.
    pub(crate) max_commits: usize,
    /// Soft cap on the summed size estimate of one batch.
    pub(crate) max_bytes: u64,
}

impl BatchLimits {
    /// Translates the user-facing batching mode into numeric limits.
    ///
    /// `Off` becomes a one-commit batch with no byte cap (`u64::MAX`); `On`
    /// passes its limits through unchanged.
    pub(crate) fn resolve(batching: CommitBatching) -> Self {
        let (max_commits, max_bytes) = match batching {
            CommitBatching::On {
                max_commits,
                max_bytes,
            } => (max_commits.get(), max_bytes),
            CommitBatching::Off => (1, u64::MAX),
        };
        Self {
            max_commits,
            max_bytes,
        }
    }
}
/// Flat per-change allowance used by [`encoded_estimate`] for batch sizing.
const PER_CHANGE_ESTIMATE_BYTES: u64 = 256;

/// Rough estimate of how many bytes `sealed` will occupy once encoded.
///
/// The estimate is a fixed per-commit header plus a flat allowance per change.
/// It only counts the changes and does not inspect their contents. The
/// arithmetic saturates at `u64::MAX` rather than overflowing. The result is
/// meant for comparing against `BatchLimits::max_bytes`, not as an exact size.
pub(crate) fn encoded_estimate(sealed: &SealedCommit) -> u64 {
    const PER_COMMIT_HEADER_BYTES: u64 = 64;
    let change_count = u64::try_from(sealed.changes.len()).unwrap_or(u64::MAX);
    change_count
        .saturating_mul(PER_CHANGE_ESTIMATE_BYTES)
        .saturating_add(PER_COMMIT_HEADER_BYTES)
}
/// Outcome of [`drain_contiguous_batch`].
///
/// The commits in either variant have been appended but not yet flushed or
/// published. Each one carries its submitter's reply sender.
pub(crate) enum BatchDrain {
    /// Collection ended normally: the next sequence number had not arrived,
    /// non-commit work was next, or a batch limit was reached. `batch` may be
    /// empty.
    Run { batch: Vec<AppendedCommit> },
    /// Appending the next commit failed. That commit's submitter has already
    /// been sent the error. `appended` holds the commits appended before it in
    /// this batch.
    AppendFailed { appended: Vec<AppendedCommit> },
}
/// Collects consecutive commits starting at `*next_publish_seq` into one batch,
/// appending each via `append_sealed` as it is taken.
///
/// Work is consumed strictly in sequence order. Items that arrive early from
/// `receiver` are parked in `reorder` until their turn.
///
/// Collection stops when any of these holds:
/// - the next sequence number has not been received yet;
/// - the next item is `Work::Compact` or `Work::VectorIndexRebuild`, which is
///   left in `reorder` for the caller;
/// - adding the next commit would reach `limits.max_commits` or push the
///   summed `encoded_estimate` past `limits.max_bytes`.
///
/// The limits are only checked once the batch is non-empty, so the first
/// commit is always admitted even if its estimate alone exceeds
/// `limits.max_bytes`.
///
/// `*next_publish_seq` is advanced past every commit removed from `reorder`,
/// including one whose append fails.
///
/// # Returns
/// - `BatchDrain::Run { batch }`: the appended commits, each with its reply
///   sender attached. The batch may be empty. Nothing is flushed or published
///   here.
/// - `BatchDrain::AppendFailed { appended }`: `run_protected(append_sealed)`
///   did not yield `Ok(Ok(_))` for the next commit. That commit's submitter has
///   already been sent the error. `appended` holds the commits appended earlier
///   in this batch, for the caller to handle. On that path `poisoned` is handed
///   to `unwrap_protected`.
pub(crate) fn drain_contiguous_batch(
    receiver: &Receiver<Work>,
    reorder: &mut BTreeMap<u64, Work>,
    next_publish_seq: &mut u64,
    limits: BatchLimits,
    handles: &CommitterHandles,
    poisoned: &Arc<std::sync::atomic::AtomicBool>,
) -> BatchDrain {
    let mut batch: Vec<AppendedCommit> = Vec::new();
    // Running sum of `encoded_estimate` over the commits already in `batch`.
    let mut batch_bytes: u64 = 0;
    loop {
        // Only touch the channel when the next sequence number is not buffered.
        if !reorder.contains_key(next_publish_seq) {
            refill_until_contiguous(receiver, reorder, *next_publish_seq);
        }
        match reorder.get(next_publish_seq) {
            // Next sequence number has not arrived yet: ship what we have.
            None => return BatchDrain::Run { batch },
            // Non-commit work ends the batch; it stays in `reorder` for the caller.
            Some(Work::Compact { .. } | Work::VectorIndexRebuild { .. }) => {
                return BatchDrain::Run { batch };
            }
            Some(Work::Commit { sealed, .. }) => {
                let estimate = encoded_estimate(sealed);
                // Limits apply only to a non-empty batch, so one oversized
                // commit still makes progress on its own.
                if !batch.is_empty()
                    && (batch.len() >= limits.max_commits
                        || batch_bytes.saturating_add(estimate) > limits.max_bytes)
                {
                    return BatchDrain::Run { batch };
                }
                // Take ownership now that the borrow from `get` is no longer used.
                let Some(Work::Commit { sealed, reply }) = reorder.remove(next_publish_seq) else {
                    unreachable!("checked Work::Commit at next_publish_seq above");
                };
                // Advance before appending. The commit has left `reorder`, so it
                // is consumed whether or not the append succeeds.
                *next_publish_seq += 1;
                match run_protected(|| append_sealed(sealed, &handles.durable_providers)) {
                    Ok(Ok(mut appended)) => {
                        // Keep the submitter's reply with the commit so it can
                        // be answered after flush/publish.
                        appended.reply = Some(reply);
                        batch_bytes = batch_bytes.saturating_add(estimate);
                        batch.push(appended);
                    }
                    failed => {
                        let error = match unwrap_protected(failed, poisoned) {
                            Ok(_appended) => unreachable!("append-failure arm is never Ok"),
                            Err(error) => error,
                        };
                        // Answer this submitter directly; a closed receiver is ignored.
                        let _: Result<(), std::sync::mpsc::SendError<GraphResult<CommitOutcome>>> =
                            reply.send(Err(error));
                        // Commits appended earlier in this batch go back to the caller.
                        return BatchDrain::AppendFailed { appended: batch };
                    }
                }
            }
        }
    }
}
/// Moves already-queued work from `receiver` into `reorder` without blocking.
///
/// Reading stops as soon as the item with sequence number `next` has been
/// buffered. Every item read, whatever its sequence number, is stored keyed by
/// `Work::seal_seq`. If the channel is empty or disconnected the function
/// returns even though `next` never showed up, leaving the caller to re-check
/// `reorder`.
fn refill_until_contiguous(
    receiver: &Receiver<Work>,
    reorder: &mut BTreeMap<u64, Work>,
    next: u64,
) {
    loop {
        let work = match receiver.try_recv() {
            Ok(work) => work,
            // Nothing more to read right now (or ever); hand control back.
            Err(TryRecvError::Empty | TryRecvError::Disconnected) => break,
        };
        let seq = work.seal_seq();
        reorder.insert(seq, work);
        if seq == next {
            break;
        }
    }
}
/// Makes a batch of appended commits durable with one flush, then publishes
/// them in order and answers each submitter with its publish result.
///
/// Returns `false` in two cases: the batch was empty (no flush is attempted),
/// or every commit was published without `poisoned` becoming set.
///
/// Returns `true` when processing was aborted. That happens either because the
/// durable flush did not succeed, or because `poisoned` is observed as set
/// after publishing a commit. On abort, every still-unanswered commit in the
/// batch is sent `committer_dead()`, and `drain_buffer_with_error` is applied
/// to `reorder`.
pub(crate) fn flush_and_publish_batch(
    batch: Vec<AppendedCommit>,
    reorder: &mut BTreeMap<u64, Work>,
    handles: &CommitterHandles,
    poisoned: &Arc<std::sync::atomic::AtomicBool>,
) -> bool {
    if batch.is_empty() {
        return false;
    }

    // A single flush covers every commit in the batch.
    let flushed = run_protected(|| flush_durables(&handles.durable_providers));
    if !matches!(flushed, Ok(Ok(()))) {
        // The flush error itself is discarded; submitters get `committer_dead()`.
        // `unwrap_protected` still sees the failure (presumably so a panic is
        // recorded in `poisoned`).
        let _ = unwrap_protected(flushed, poisoned);
        ack_appended_with_error(batch);
        drain_buffer_with_error(reorder);
        return true;
    }

    let mut remaining = batch.into_iter();
    while let Some(mut appended) = remaining.next() {
        // Detach the reply before `appended` is moved into the publish closure.
        let reply = appended.reply.take();
        let published = run_protected(|| {
            Ok(publish_appended(
                appended,
                &handles.snapshot,
                &handles.schema_version,
                &handles.providers,
            ))
        });
        let outcome = unwrap_protected(published, poisoned);
        if let Some(reply) = reply {
            let _ = reply.send(outcome);
        }
        if poisoned.load(Ordering::Acquire) {
            // Stop publishing: fail the rest of this batch and all buffered work.
            ack_appended_with_error(remaining.collect());
            drain_buffer_with_error(reorder);
            return true;
        }
    }
    false
}
/// Answers every commit in `batch` that still holds a reply sender with
/// `Err(committer_dead())`.
///
/// Commits whose reply was already taken (`reply == None`) are skipped. A
/// submitter that has dropped its receiver is silently ignored.
pub(crate) fn ack_appended_with_error(batch: Vec<AppendedCommit>) {
    for appended in batch {
        let Some(reply) = appended.reply else {
            continue;
        };
        // `send` fails only when the receiver is gone; nobody is left to tell.
        let _: Result<(), std::sync::mpsc::SendError<GraphResult<CommitOutcome>>> =
            reply.send(Err(committer_dead()));
    }
}
#[cfg(test)]
#[path = "committer_batch_tests.rs"]
mod tests;