use std::{
sync::atomic::{AtomicU64, Ordering},
time::Duration,
};
use crate::{
manifest::SstEntry,
observability::{log_debug, log_info, log_warn},
ondisk::sstable::SsTableDescriptor,
};
#[derive(Debug, Clone, Copy)]
pub struct CompactionIoStats {
pub bytes: u64,
pub rows: u64,
pub tombstones: u64,
pub complete: bool,
}
impl Default for CompactionIoStats {
fn default() -> Self {
Self {
bytes: 0,
rows: 0,
tombstones: 0,
complete: true,
}
}
}
impl CompactionIoStats {
pub(crate) fn from_descriptors(descriptors: &[SsTableDescriptor]) -> Self {
let mut stats = Self {
bytes: 0,
rows: 0,
tombstones: 0,
complete: true,
};
for desc in descriptors {
let Some(sst_stats) = desc.stats() else {
stats.complete = false;
continue;
};
stats.bytes = stats.bytes.saturating_add(sst_stats.bytes as u64);
stats.rows = stats.rows.saturating_add(sst_stats.rows as u64);
stats.tombstones = stats.tombstones.saturating_add(sst_stats.tombstones as u64);
}
stats
}
pub(crate) fn from_entries(entries: &[SstEntry]) -> Self {
let mut stats = Self {
bytes: 0,
rows: 0,
tombstones: 0,
complete: true,
};
for entry in entries {
let Some(sst_stats) = entry.stats() else {
stats.complete = false;
continue;
};
stats.bytes = stats.bytes.saturating_add(sst_stats.bytes as u64);
stats.rows = stats.rows.saturating_add(sst_stats.rows as u64);
stats.tombstones = stats.tombstones.saturating_add(sst_stats.tombstones as u64);
}
stats
}
}
#[derive(Debug, Clone, Copy)]
pub struct CompactionJobSnapshot {
pub source_level: usize,
pub target_level: usize,
pub input_sst_count: usize,
pub output_sst_count: usize,
pub input: CompactionIoStats,
pub output: CompactionIoStats,
pub duration_ms: u64,
pub cas_retries: u64,
pub cas_aborted: bool,
}
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub struct CompactionMetricsSnapshot {
pub job_count: u64,
pub job_failures: u64,
pub cas_retries: u64,
pub cas_aborts: u64,
pub queue_drops_planner_full: u64,
pub queue_drops_planner_closed: u64,
pub queue_drops_cascade_full: u64,
pub queue_drops_cascade_closed: u64,
pub cascades_scheduled: u64,
pub cascades_blocked_cooldown: u64,
pub cascades_blocked_budget: u64,
pub backpressure_slowdown: u64,
pub backpressure_stall: u64,
pub trigger_kick: u64,
pub trigger_periodic: u64,
pub bytes_in: u64,
pub bytes_out: u64,
pub rows_in: u64,
pub rows_out: u64,
pub tombstones_in: u64,
pub tombstones_out: u64,
pub duration_ms_total: u64,
pub sst_sweep_runs: u64,
pub sst_deleted_objects: u64,
pub sst_deleted_bytes: u64,
pub sst_sweep_duration_ms_total: u64,
pub sst_delete_failures: u64,
pub gc_plan_write_runs: u64,
pub gc_plan_overwrite_non_empty: u64,
pub gc_plan_previous_sst_candidates: u64,
pub gc_plan_previous_wal_candidates: u64,
pub gc_plan_written_sst_candidates: u64,
pub gc_plan_written_wal_candidates: u64,
pub gc_plan_take_runs: u64,
pub gc_plan_taken_sst_candidates: u64,
pub gc_plan_authorized_sst_candidates: u64,
pub gc_plan_blocked_sst_candidates: u64,
pub gc_plan_requeued_sst_candidates: u64,
pub last_job: Option<CompactionJobSnapshot>,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct SstSweepSummary {
pub deleted_objects: u64,
pub deleted_bytes: u64,
pub delete_failures: u64,
pub duration_ms: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SstGcStatus {
pub staged_sst_candidates: u64,
pub authorized_sst_candidates: u64,
pub blocked_sst_candidates: u64,
pub obsolete_wal_segments: u64,
pub protected_versions: u64,
pub active_snapshot_versions: u64,
pub protected_sst_objects: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SstGcCandidateInspection {
pub sst_id: u64,
pub level: u32,
pub data_path: String,
pub delete_path: Option<String>,
pub authorized: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SstGcInspection {
pub staged_sst_candidates: u64,
pub authorized_sst_candidates: u64,
pub blocked_sst_candidates: u64,
pub obsolete_wal_segments: u64,
pub protected_versions: u64,
pub active_snapshot_versions: u64,
pub protected_sst_objects: u64,
pub candidates: Vec<SstGcCandidateInspection>,
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum CompactionTriggerReason {
Kick,
Periodic,
}
impl CompactionTriggerReason {
pub(crate) const fn as_str(self) -> &'static str {
match self {
CompactionTriggerReason::Kick => "kick",
CompactionTriggerReason::Periodic => "periodic",
}
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum CompactionQueueDropReason {
Full,
Closed,
}
impl CompactionQueueDropReason {
pub(crate) const fn as_str(self) -> &'static str {
match self {
CompactionQueueDropReason::Full => "full",
CompactionQueueDropReason::Closed => "closed",
}
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum CompactionQueueDropContext {
Planner,
Cascade,
}
impl CompactionQueueDropContext {
pub(crate) const fn as_str(self) -> &'static str {
match self {
CompactionQueueDropContext::Planner => "planner",
CompactionQueueDropContext::Cascade => "cascade",
}
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum CompactionCascadeDecision {
Scheduled,
BlockedCooldown,
BlockedBudget,
}
impl CompactionCascadeDecision {
pub(crate) const fn as_str(self) -> &'static str {
match self {
CompactionCascadeDecision::Scheduled => "scheduled",
CompactionCascadeDecision::BlockedCooldown => "blocked_cooldown",
CompactionCascadeDecision::BlockedBudget => "blocked_budget",
}
}
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum CompactionBackpressureSignal {
Slowdown,
Stall,
}
impl CompactionBackpressureSignal {
pub(crate) const fn as_str(self) -> &'static str {
match self {
CompactionBackpressureSignal::Slowdown => "slowdown",
CompactionBackpressureSignal::Stall => "stall",
}
}
}
#[derive(Debug)]
pub struct CompactionMetrics {
emit_logs: bool,
job_count: AtomicU64,
job_failures: AtomicU64,
cas_retries: AtomicU64,
cas_aborts: AtomicU64,
queue_drops_planner_full: AtomicU64,
queue_drops_planner_closed: AtomicU64,
queue_drops_cascade_full: AtomicU64,
queue_drops_cascade_closed: AtomicU64,
cascades_scheduled: AtomicU64,
cascades_blocked_cooldown: AtomicU64,
cascades_blocked_budget: AtomicU64,
backpressure_slowdown: AtomicU64,
backpressure_stall: AtomicU64,
trigger_kick: AtomicU64,
trigger_periodic: AtomicU64,
bytes_in: AtomicU64,
bytes_out: AtomicU64,
rows_in: AtomicU64,
rows_out: AtomicU64,
tombstones_in: AtomicU64,
tombstones_out: AtomicU64,
duration_ms_total: AtomicU64,
sst_sweep_runs: AtomicU64,
sst_deleted_objects: AtomicU64,
sst_deleted_bytes: AtomicU64,
sst_sweep_duration_ms_total: AtomicU64,
sst_delete_failures: AtomicU64,
gc_plan_write_runs: AtomicU64,
gc_plan_overwrite_non_empty: AtomicU64,
gc_plan_previous_sst_candidates: AtomicU64,
gc_plan_previous_wal_candidates: AtomicU64,
gc_plan_written_sst_candidates: AtomicU64,
gc_plan_written_wal_candidates: AtomicU64,
gc_plan_take_runs: AtomicU64,
gc_plan_taken_sst_candidates: AtomicU64,
gc_plan_authorized_sst_candidates: AtomicU64,
gc_plan_blocked_sst_candidates: AtomicU64,
gc_plan_requeued_sst_candidates: AtomicU64,
last_job_present: AtomicU64,
last_job_source_level: AtomicU64,
last_job_target_level: AtomicU64,
last_job_input_sst_count: AtomicU64,
last_job_output_sst_count: AtomicU64,
last_job_input_bytes: AtomicU64,
last_job_output_bytes: AtomicU64,
last_job_input_rows: AtomicU64,
last_job_output_rows: AtomicU64,
last_job_input_tombstones: AtomicU64,
last_job_output_tombstones: AtomicU64,
last_job_input_complete: AtomicU64,
last_job_output_complete: AtomicU64,
last_job_duration_ms: AtomicU64,
last_job_cas_retries: AtomicU64,
last_job_cas_aborted: AtomicU64,
}
impl Default for CompactionMetrics {
fn default() -> Self {
Self::new()
}
}
impl CompactionMetrics {
pub fn new() -> Self {
Self::with_logs(false)
}
#[allow(dead_code)]
pub fn with_job_logging() -> Self {
Self::with_logs(true)
}
fn with_logs(emit_logs: bool) -> Self {
Self {
emit_logs,
job_count: AtomicU64::new(0),
job_failures: AtomicU64::new(0),
cas_retries: AtomicU64::new(0),
cas_aborts: AtomicU64::new(0),
queue_drops_planner_full: AtomicU64::new(0),
queue_drops_planner_closed: AtomicU64::new(0),
queue_drops_cascade_full: AtomicU64::new(0),
queue_drops_cascade_closed: AtomicU64::new(0),
cascades_scheduled: AtomicU64::new(0),
cascades_blocked_cooldown: AtomicU64::new(0),
cascades_blocked_budget: AtomicU64::new(0),
backpressure_slowdown: AtomicU64::new(0),
backpressure_stall: AtomicU64::new(0),
trigger_kick: AtomicU64::new(0),
trigger_periodic: AtomicU64::new(0),
bytes_in: AtomicU64::new(0),
bytes_out: AtomicU64::new(0),
rows_in: AtomicU64::new(0),
rows_out: AtomicU64::new(0),
tombstones_in: AtomicU64::new(0),
tombstones_out: AtomicU64::new(0),
duration_ms_total: AtomicU64::new(0),
sst_sweep_runs: AtomicU64::new(0),
sst_deleted_objects: AtomicU64::new(0),
sst_deleted_bytes: AtomicU64::new(0),
sst_sweep_duration_ms_total: AtomicU64::new(0),
sst_delete_failures: AtomicU64::new(0),
gc_plan_write_runs: AtomicU64::new(0),
gc_plan_overwrite_non_empty: AtomicU64::new(0),
gc_plan_previous_sst_candidates: AtomicU64::new(0),
gc_plan_previous_wal_candidates: AtomicU64::new(0),
gc_plan_written_sst_candidates: AtomicU64::new(0),
gc_plan_written_wal_candidates: AtomicU64::new(0),
gc_plan_take_runs: AtomicU64::new(0),
gc_plan_taken_sst_candidates: AtomicU64::new(0),
gc_plan_authorized_sst_candidates: AtomicU64::new(0),
gc_plan_blocked_sst_candidates: AtomicU64::new(0),
gc_plan_requeued_sst_candidates: AtomicU64::new(0),
last_job_present: AtomicU64::new(0),
last_job_source_level: AtomicU64::new(0),
last_job_target_level: AtomicU64::new(0),
last_job_input_sst_count: AtomicU64::new(0),
last_job_output_sst_count: AtomicU64::new(0),
last_job_input_bytes: AtomicU64::new(0),
last_job_output_bytes: AtomicU64::new(0),
last_job_input_rows: AtomicU64::new(0),
last_job_output_rows: AtomicU64::new(0),
last_job_input_tombstones: AtomicU64::new(0),
last_job_output_tombstones: AtomicU64::new(0),
last_job_input_complete: AtomicU64::new(0),
last_job_output_complete: AtomicU64::new(0),
last_job_duration_ms: AtomicU64::new(0),
last_job_cas_retries: AtomicU64::new(0),
last_job_cas_aborted: AtomicU64::new(0),
}
}
#[allow(dead_code)]
pub fn snapshot(&self) -> CompactionMetricsSnapshot {
let present = self.last_job_present.load(Ordering::Relaxed) != 0;
let last_job = if present {
Some(CompactionJobSnapshot {
source_level: self.last_job_source_level.load(Ordering::Relaxed) as usize,
target_level: self.last_job_target_level.load(Ordering::Relaxed) as usize,
input_sst_count: self.last_job_input_sst_count.load(Ordering::Relaxed) as usize,
output_sst_count: self.last_job_output_sst_count.load(Ordering::Relaxed) as usize,
input: CompactionIoStats {
bytes: self.last_job_input_bytes.load(Ordering::Relaxed),
rows: self.last_job_input_rows.load(Ordering::Relaxed),
tombstones: self.last_job_input_tombstones.load(Ordering::Relaxed),
complete: self.last_job_input_complete.load(Ordering::Relaxed) != 0,
},
output: CompactionIoStats {
bytes: self.last_job_output_bytes.load(Ordering::Relaxed),
rows: self.last_job_output_rows.load(Ordering::Relaxed),
tombstones: self.last_job_output_tombstones.load(Ordering::Relaxed),
complete: self.last_job_output_complete.load(Ordering::Relaxed) != 0,
},
duration_ms: self.last_job_duration_ms.load(Ordering::Relaxed),
cas_retries: self.last_job_cas_retries.load(Ordering::Relaxed),
cas_aborted: self.last_job_cas_aborted.load(Ordering::Relaxed) != 0,
})
} else {
None
};
CompactionMetricsSnapshot {
job_count: self.job_count.load(Ordering::Relaxed),
job_failures: self.job_failures.load(Ordering::Relaxed),
cas_retries: self.cas_retries.load(Ordering::Relaxed),
cas_aborts: self.cas_aborts.load(Ordering::Relaxed),
queue_drops_planner_full: self.queue_drops_planner_full.load(Ordering::Relaxed),
queue_drops_planner_closed: self.queue_drops_planner_closed.load(Ordering::Relaxed),
queue_drops_cascade_full: self.queue_drops_cascade_full.load(Ordering::Relaxed),
queue_drops_cascade_closed: self.queue_drops_cascade_closed.load(Ordering::Relaxed),
cascades_scheduled: self.cascades_scheduled.load(Ordering::Relaxed),
cascades_blocked_cooldown: self.cascades_blocked_cooldown.load(Ordering::Relaxed),
cascades_blocked_budget: self.cascades_blocked_budget.load(Ordering::Relaxed),
backpressure_slowdown: self.backpressure_slowdown.load(Ordering::Relaxed),
backpressure_stall: self.backpressure_stall.load(Ordering::Relaxed),
trigger_kick: self.trigger_kick.load(Ordering::Relaxed),
trigger_periodic: self.trigger_periodic.load(Ordering::Relaxed),
bytes_in: self.bytes_in.load(Ordering::Relaxed),
bytes_out: self.bytes_out.load(Ordering::Relaxed),
rows_in: self.rows_in.load(Ordering::Relaxed),
rows_out: self.rows_out.load(Ordering::Relaxed),
tombstones_in: self.tombstones_in.load(Ordering::Relaxed),
tombstones_out: self.tombstones_out.load(Ordering::Relaxed),
duration_ms_total: self.duration_ms_total.load(Ordering::Relaxed),
sst_sweep_runs: self.sst_sweep_runs.load(Ordering::Relaxed),
sst_deleted_objects: self.sst_deleted_objects.load(Ordering::Relaxed),
sst_deleted_bytes: self.sst_deleted_bytes.load(Ordering::Relaxed),
sst_sweep_duration_ms_total: self.sst_sweep_duration_ms_total.load(Ordering::Relaxed),
sst_delete_failures: self.sst_delete_failures.load(Ordering::Relaxed),
gc_plan_write_runs: self.gc_plan_write_runs.load(Ordering::Relaxed),
gc_plan_overwrite_non_empty: self.gc_plan_overwrite_non_empty.load(Ordering::Relaxed),
gc_plan_previous_sst_candidates: self
.gc_plan_previous_sst_candidates
.load(Ordering::Relaxed),
gc_plan_previous_wal_candidates: self
.gc_plan_previous_wal_candidates
.load(Ordering::Relaxed),
gc_plan_written_sst_candidates: self
.gc_plan_written_sst_candidates
.load(Ordering::Relaxed),
gc_plan_written_wal_candidates: self
.gc_plan_written_wal_candidates
.load(Ordering::Relaxed),
gc_plan_take_runs: self.gc_plan_take_runs.load(Ordering::Relaxed),
gc_plan_taken_sst_candidates: self.gc_plan_taken_sst_candidates.load(Ordering::Relaxed),
gc_plan_authorized_sst_candidates: self
.gc_plan_authorized_sst_candidates
.load(Ordering::Relaxed),
gc_plan_blocked_sst_candidates: self
.gc_plan_blocked_sst_candidates
.load(Ordering::Relaxed),
gc_plan_requeued_sst_candidates: self
.gc_plan_requeued_sst_candidates
.load(Ordering::Relaxed),
last_job,
}
}
pub(crate) fn record_trigger(&self, trigger: CompactionTriggerReason) {
match trigger {
CompactionTriggerReason::Kick => {
add_saturating(&self.trigger_kick, 1);
}
CompactionTriggerReason::Periodic => {
add_saturating(&self.trigger_periodic, 1);
}
}
if self.emit_logs {
log_debug!(
component = "compaction",
event = "compaction_trigger",
reason = trigger.as_str(),
);
}
}
pub(crate) fn record_queue_drop(
&self,
context: CompactionQueueDropContext,
reason: CompactionQueueDropReason,
) {
match (context, reason) {
(CompactionQueueDropContext::Planner, CompactionQueueDropReason::Full) => {
add_saturating(&self.queue_drops_planner_full, 1);
}
(CompactionQueueDropContext::Planner, CompactionQueueDropReason::Closed) => {
add_saturating(&self.queue_drops_planner_closed, 1);
}
(CompactionQueueDropContext::Cascade, CompactionQueueDropReason::Full) => {
add_saturating(&self.queue_drops_cascade_full, 1);
}
(CompactionQueueDropContext::Cascade, CompactionQueueDropReason::Closed) => {
add_saturating(&self.queue_drops_cascade_closed, 1);
}
}
if self.emit_logs {
log_warn!(
component = "compaction",
event = "compaction_queue_drop",
context = context.as_str(),
reason = reason.as_str(),
);
}
}
pub(crate) fn record_cascade(&self, decision: CompactionCascadeDecision) {
match decision {
CompactionCascadeDecision::Scheduled => {
add_saturating(&self.cascades_scheduled, 1);
}
CompactionCascadeDecision::BlockedCooldown => {
add_saturating(&self.cascades_blocked_cooldown, 1);
}
CompactionCascadeDecision::BlockedBudget => {
add_saturating(&self.cascades_blocked_budget, 1);
}
}
if self.emit_logs {
log_info!(
component = "compaction",
event = "compaction_cascade_decision",
decision = decision.as_str(),
);
}
}
pub(crate) fn record_backpressure(
&self,
signal: CompactionBackpressureSignal,
delay: Duration,
) {
match signal {
CompactionBackpressureSignal::Slowdown => {
add_saturating(&self.backpressure_slowdown, 1);
}
CompactionBackpressureSignal::Stall => {
add_saturating(&self.backpressure_stall, 1);
}
}
if self.emit_logs {
let delay_ms = duration_ms(delay);
match signal {
CompactionBackpressureSignal::Slowdown => {
log_info!(
component = "compaction",
event = "compaction_backpressure",
signal = signal.as_str(),
delay_ms,
);
}
CompactionBackpressureSignal::Stall => {
log_warn!(
component = "compaction",
event = "compaction_backpressure",
signal = signal.as_str(),
delay_ms,
);
}
}
}
}
pub(crate) fn record_job_success(&self, job: CompactionJobSnapshot) {
add_saturating(&self.job_count, 1);
add_saturating(&self.cas_retries, job.cas_retries);
add_saturating(&self.duration_ms_total, job.duration_ms);
if job.input.complete {
add_saturating(&self.bytes_in, job.input.bytes);
add_saturating(&self.rows_in, job.input.rows);
add_saturating(&self.tombstones_in, job.input.tombstones);
}
if job.output.complete {
add_saturating(&self.bytes_out, job.output.bytes);
add_saturating(&self.rows_out, job.output.rows);
add_saturating(&self.tombstones_out, job.output.tombstones);
}
self.set_last_job(job);
if self.emit_logs {
self.log_job("completed", job);
}
}
pub(crate) fn record_job_abort(&self, job: CompactionJobSnapshot) {
add_saturating(&self.job_failures, 1);
add_saturating(&self.cas_retries, job.cas_retries);
if job.cas_aborted {
add_saturating(&self.cas_aborts, 1);
}
self.set_last_job(job);
if self.emit_logs {
self.log_job("aborted", job);
}
}
pub(crate) fn record_sst_sweep(&self, sweep: SstSweepSummary) {
add_saturating(&self.sst_sweep_runs, 1);
add_saturating(&self.sst_deleted_objects, sweep.deleted_objects);
add_saturating(&self.sst_deleted_bytes, sweep.deleted_bytes);
add_saturating(&self.sst_sweep_duration_ms_total, sweep.duration_ms);
add_saturating(&self.sst_delete_failures, sweep.delete_failures);
if self.emit_logs {
log_info!(
component = "compaction",
event = "sst_sweep_summary",
deleted_objects = sweep.deleted_objects,
deleted_bytes = sweep.deleted_bytes,
delete_failures = sweep.delete_failures,
duration_ms = sweep.duration_ms,
);
}
}
pub(crate) fn record_gc_plan_write(
&self,
previous_sst_candidates: u64,
previous_wal_candidates: u64,
written_sst_candidates: u64,
written_wal_candidates: u64,
) {
add_saturating(&self.gc_plan_write_runs, 1);
if previous_sst_candidates > 0 || previous_wal_candidates > 0 {
add_saturating(&self.gc_plan_overwrite_non_empty, 1);
}
add_saturating(
&self.gc_plan_previous_sst_candidates,
previous_sst_candidates,
);
add_saturating(
&self.gc_plan_previous_wal_candidates,
previous_wal_candidates,
);
add_saturating(&self.gc_plan_written_sst_candidates, written_sst_candidates);
add_saturating(&self.gc_plan_written_wal_candidates, written_wal_candidates);
}
pub(crate) fn record_gc_plan_take(
&self,
staged_sst_candidates: u64,
authorized_sst_candidates: u64,
blocked_sst_candidates: u64,
requeued_sst_candidates: u64,
) {
add_saturating(&self.gc_plan_take_runs, 1);
add_saturating(&self.gc_plan_taken_sst_candidates, staged_sst_candidates);
add_saturating(
&self.gc_plan_authorized_sst_candidates,
authorized_sst_candidates,
);
add_saturating(&self.gc_plan_blocked_sst_candidates, blocked_sst_candidates);
add_saturating(
&self.gc_plan_requeued_sst_candidates,
requeued_sst_candidates,
);
}
fn set_last_job(&self, job: CompactionJobSnapshot) {
self.last_job_present.store(1, Ordering::Relaxed);
self.last_job_source_level
.store(job.source_level as u64, Ordering::Relaxed);
self.last_job_target_level
.store(job.target_level as u64, Ordering::Relaxed);
self.last_job_input_sst_count
.store(job.input_sst_count as u64, Ordering::Relaxed);
self.last_job_output_sst_count
.store(job.output_sst_count as u64, Ordering::Relaxed);
self.last_job_input_bytes
.store(job.input.bytes, Ordering::Relaxed);
self.last_job_output_bytes
.store(job.output.bytes, Ordering::Relaxed);
self.last_job_input_rows
.store(job.input.rows, Ordering::Relaxed);
self.last_job_output_rows
.store(job.output.rows, Ordering::Relaxed);
self.last_job_input_tombstones
.store(job.input.tombstones, Ordering::Relaxed);
self.last_job_output_tombstones
.store(job.output.tombstones, Ordering::Relaxed);
self.last_job_input_complete
.store(u64::from(job.input.complete), Ordering::Relaxed);
self.last_job_output_complete
.store(u64::from(job.output.complete), Ordering::Relaxed);
self.last_job_duration_ms
.store(job.duration_ms, Ordering::Relaxed);
self.last_job_cas_retries
.store(job.cas_retries, Ordering::Relaxed);
self.last_job_cas_aborted
.store(u64::from(job.cas_aborted), Ordering::Relaxed);
}
fn log_job(&self, status: &str, job: CompactionJobSnapshot) {
log_info!(
component = "compaction",
event = "compaction_job_summary",
status,
source_level = job.source_level,
target_level = job.target_level,
input_count = job.input_sst_count,
output_count = job.output_sst_count,
input_bytes = job.input.bytes,
output_bytes = job.output.bytes,
input_rows = job.input.rows,
output_rows = job.output.rows,
input_tombstones = job.input.tombstones,
output_tombstones = job.output.tombstones,
duration_ms = job.duration_ms,
cas_retries = job.cas_retries,
cas_aborted = job.cas_aborted,
input_stats_complete = job.input.complete,
output_stats_complete = job.output.complete,
);
}
}
fn add_saturating(counter: &AtomicU64, delta: u64) {
let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
Some(current.saturating_add(delta))
});
}
fn duration_ms(duration: Duration) -> u64 {
duration.as_millis().try_into().unwrap_or(u64::MAX)
}