use crate::prelude::*;
use std::{
ops::AddAssign,
sync::atomic::{AtomicI64, Ordering::Relaxed},
};
/// Signed 64-bit counter backed by an [`AtomicI64`], so it can be updated through `&self`.
///
/// Serializes through the derived `Serialize` impl; `Default` yields a zeroed counter.
#[derive(Default, Serialize)]
pub struct Counter(pub AtomicI64);
impl Counter {
pub fn inc(&self, by: i64) {
self.0.fetch_add(by, Relaxed);
}
pub fn get(&self) -> i64 {
self.0.load(Relaxed)
}
#[cfg(feature = "persistence")]
pub fn delta(&self, base: &Self) -> Counter {
Counter(AtomicI64::new(self.get() - base.get()))
}
pub fn into_inner(self) -> i64 {
self.0.into_inner()
}
#[cfg(feature = "persistence")]
pub fn merge(&self, delta: &Self) {
self.0.fetch_add(delta.get(), Relaxed);
}
}
impl AddAssign for Counter {
fn add_assign(&mut self, rhs: Self) {
self.0.fetch_add(rhs.into_inner(), Relaxed);
}
}
impl Clone for Counter {
fn clone(&self) -> Self {
Self(AtomicI64::new(self.get()))
}
}
impl std::fmt::Display for Counter {
    /// Writes the current value as a plain decimal integer.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let value = self.get();
        write!(f, "{value}")
    }
}
impl std::fmt::Debug for Counter {
    /// Writes the current value as a bare integer, without any wrapper type name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let value = self.get();
        write!(f, "{value}")
    }
}
/// Pair of cumulative counters recording how many items have started and how many
/// have finished processing; the difference is the number currently in process.
#[derive(Debug, Serialize, Default, Clone)]
pub struct ProcessingCounters {
/// Running total of items reported through `start`.
pub num_starts: Counter,
/// Running total of items reported through `end`.
pub num_ends: Counter,
}
impl ProcessingCounters {
    /// Records that `count` additional items have begun processing.
    pub fn start(&self, count: i64) {
        let Self { num_starts, .. } = self;
        num_starts.inc(count);
    }

    /// Records that `count` items have finished processing.
    pub fn end(&self, count: i64) {
        let Self { num_ends, .. } = self;
        num_ends.inc(count);
    }

    /// Returns how many items have been started but not yet ended.
    pub fn get_in_process(&self) -> i64 {
        let Self {
            num_starts,
            num_ends,
        } = self;
        // `num_ends` is read before `num_starts`.
        // NOTE(review): presumably so that work reported between the two reads can
        // only inflate the result, never make it negative — confirm before reordering.
        let finished = num_ends.get();
        let started = num_starts.get();
        started - finished
    }

    /// Returns the field-wise difference `self - base` as a new value.
    #[cfg(feature = "persistence")]
    pub fn delta(&self, base: &Self) -> Self {
        let num_starts = self.num_starts.delta(&base.num_starts);
        let num_ends = self.num_ends.delta(&base.num_ends);
        Self {
            num_starts,
            num_ends,
        }
    }

    /// Adds both counters of `delta` onto the matching counters of `self`.
    #[cfg(feature = "persistence")]
    pub fn merge(&self, delta: &Self) {
        let Self {
            num_starts,
            num_ends,
        } = self;
        num_starts.merge(&delta.num_starts);
        num_ends.merge(&delta.num_ends);
    }
}
/// Counters of source rows grouped by outcome, plus tracking of rows still in process.
///
/// Only compiled when the `persistence` feature is enabled.
#[derive(Debug, Serialize, Default, Clone)]
#[cfg(feature = "persistence")]
pub struct UpdateStats {
/// Rows reported as "no change" by the `Display` impl; ignored by `has_any_change`.
pub num_no_change: Counter,
/// Rows reported as "added" by the `Display` impl.
pub num_insertions: Counter,
/// Rows reported as "deleted" by the `Display` impl.
pub num_deletions: Counter,
/// Rows reported as "updated" by the `Display` impl.
pub num_updates: Counter,
/// Rows reported as "reprocessed" by the `Display` impl.
pub num_reprocesses: Counter,
/// Rows reported as "errors" by the `Display` impl.
pub num_errors: Counter,
/// Start/end counters whose difference is the number of rows currently in process.
pub processing: ProcessingCounters,
}
#[cfg(feature = "persistence")]
impl UpdateStats {
    /// Returns a new `UpdateStats` whose every counter is `self - base`.
    pub fn delta(&self, base: &Self) -> Self {
        let diff = |current: &Counter, baseline: &Counter| current.delta(baseline);
        Self {
            num_no_change: diff(&self.num_no_change, &base.num_no_change),
            num_insertions: diff(&self.num_insertions, &base.num_insertions),
            num_deletions: diff(&self.num_deletions, &base.num_deletions),
            num_updates: diff(&self.num_updates, &base.num_updates),
            num_reprocesses: diff(&self.num_reprocesses, &base.num_reprocesses),
            num_errors: diff(&self.num_errors, &base.num_errors),
            processing: self.processing.delta(&base.processing),
        }
    }

    /// Adds every counter of `delta` onto the matching counter of `self`.
    pub fn merge(&self, delta: &Self) {
        let pairs = [
            (&self.num_no_change, &delta.num_no_change),
            (&self.num_insertions, &delta.num_insertions),
            (&self.num_deletions, &delta.num_deletions),
            (&self.num_updates, &delta.num_updates),
            (&self.num_reprocesses, &delta.num_reprocesses),
            (&self.num_errors, &delta.num_errors),
        ];
        for (target, addend) in pairs.iter() {
            target.merge(addend);
        }
        self.processing.merge(&delta.processing);
    }

    /// Returns `true` when any of the insertion, deletion, update, reprocess or
    /// error counters is positive. `num_no_change` and `processing` are not consulted.
    pub fn has_any_change(&self) -> bool {
        let change_counters = [
            &self.num_insertions,
            &self.num_deletions,
            &self.num_updates,
            &self.num_reprocesses,
            &self.num_errors,
        ];
        change_counters.iter().any(|counter| counter.get() > 0)
    }
}
/// Registry of in-process counters keyed by operation name.
///
/// The map sits behind an `RwLock` so the registry can be used through `&self`.
#[derive(Debug, Default)]
pub struct OperationInProcessStats {
// Operation name -> start/end counters for that operation.
operation_counters: std::sync::RwLock<std::collections::HashMap<String, ProcessingCounters>>,
}
impl OperationInProcessStats {
pub fn start_processing(&self, operation_name: &str, count: i64) {
let mut counters = self.operation_counters.write().unwrap();
let counter = counters.entry(operation_name.to_string()).or_default();
counter.start(count);
}
pub fn finish_processing(&self, operation_name: &str, count: i64) {
let counters = self.operation_counters.read().unwrap();
if let Some(counter) = counters.get(operation_name) {
counter.end(count);
}
}
pub fn get_operation_in_process_count(&self, operation_name: &str) -> i64 {
let counters = self.operation_counters.read().unwrap();
counters
.get(operation_name)
.map_or(0, |counter| counter.get_in_process())
}
pub fn get_all_operations_in_process(&self) -> std::collections::HashMap<String, i64> {
let counters = self.operation_counters.read().unwrap();
counters
.iter()
.map(|(name, counter)| (name.clone(), counter.get_in_process()))
.collect()
}
pub fn get_total_in_process_count(&self) -> i64 {
let counters = self.operation_counters.read().unwrap();
counters
.values()
.map(|counter| counter.get_in_process())
.sum()
}
}
/// One labelled count used when rendering `UpdateStats` as text.
#[cfg(feature = "persistence")]
struct UpdateStatsSegment {
// Number of rows in this category.
count: i64,
// Human-readable category name printed after the count.
label: &'static str,
}
#[cfg(feature = "persistence")]
impl UpdateStatsSegment {
    /// Pairs a row count with the label it is displayed under.
    pub fn new(count: i64, label: &'static str) -> Self {
        UpdateStatsSegment { count, label }
    }
}
/// Width, in characters, of the progress bar drawn by the `Display` impl of `UpdateStats`.
#[cfg(feature = "persistence")]
const BAR_WIDTH: u64 = 40;
#[cfg(feature = "persistence")]
impl std::fmt::Display for UpdateStats {
    /// Renders a fixed-width progress bar followed by a per-category summary.
    ///
    /// Output shape: `▕<bar>▏{processed}/{total} source rows: N added, M updated, …`.
    /// `processed` is the sum of all outcome counters and `total` additionally
    /// includes the rows still in process. Only categories with a positive count
    /// are listed. When `total` is not positive, `No input data` is written instead.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let segments = [
            UpdateStatsSegment::new(self.num_insertions.get(), "added"),
            UpdateStatsSegment::new(self.num_updates.get(), "updated"),
            UpdateStatsSegment::new(self.num_reprocesses.get(), "reprocessed"),
            UpdateStatsSegment::new(self.num_deletions.get(), "deleted"),
            UpdateStatsSegment::new(self.num_no_change.get(), "no change"),
            UpdateStatsSegment::new(self.num_errors.get(), "errors"),
        ];
        let num_in_process = self.processing.get_in_process();
        let processed_count = segments.iter().map(|seg| seg.count).sum::<i64>();
        let total = num_in_process + processed_count;
        if total <= 0 {
            return f.write_str("No input data");
        }

        // Counters are signed and can be negative (e.g. a delta against a larger
        // base, or a momentarily negative in-process count). Clamp the filled
        // share to `0..=total` so the bar never wraps around through an unsigned
        // cast and never grows past `BAR_WIDTH`.
        let filled_rows = processed_count.clamp(0, total);
        // Widened to u128 so the multiplication cannot overflow; the quotient is
        // at most `BAR_WIDTH`, so narrowing back to u64 cannot truncate.
        let processed_bar_width = (u128::from(filled_rows.unsigned_abs())
            * u128::from(BAR_WIDTH)
            / u128::from(total.unsigned_abs())) as u64;

        f.write_str("▕")?;
        for _ in 0..processed_bar_width {
            f.write_str("█")?; // finished portion
        }
        for _ in processed_bar_width..BAR_WIDTH {
            f.write_str(" ")?; // unfinished portion
        }
        write!(f, "▏{processed_count}/{total} source rows")?;

        if processed_count > 0 {
            // The first listed category follows a colon, later ones a comma.
            let mut delimiter = ':';
            for seg in segments.iter().filter(|seg| seg.count > 0) {
                write!(
                    f,
                    "{delimiter} {count} {label}",
                    count = seg.count,
                    label = seg.label,
                )?;
                delimiter = ',';
            }
        }
        Ok(())
    }
}
/// Update statistics for a single named source.
#[derive(Debug, Serialize)]
#[cfg(feature = "persistence")]
pub struct SourceUpdateInfo {
/// Name printed before the statistics by the `Display` impl.
pub source_name: String,
/// Counters collected for this source.
pub stats: UpdateStats,
}
#[cfg(feature = "persistence")]
impl std::fmt::Display for SourceUpdateInfo {
    /// Writes `<source_name>: <stats>` using the `Display` form of the statistics.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { source_name, stats } = self;
        write!(f, "{source_name}: {stats}")
    }
}
/// Collection of per-source update statistics.
#[derive(Debug, Serialize)]
#[cfg(feature = "persistence")]
pub struct IndexUpdateInfo {
/// One entry per source; the `Display` impl prints them in this order, one per line.
pub sources: Vec<SourceUpdateInfo>,
}
#[cfg(feature = "persistence")]
impl std::fmt::Display for IndexUpdateInfo {
    /// Writes every source on its own newline-terminated line, in stored order.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.sources
            .iter()
            .try_for_each(|source| writeln!(f, "{source}"))
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the counters in the parent module.
    //!
    //! Tests that touch `UpdateStats` or the `delta`/`merge` helpers are gated on
    //! the `persistence` feature, because those items only exist when it is on.

    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// `start`/`end` accumulate and `get_in_process` reports their difference.
    #[test]
    fn test_processing_counters() {
        let counters = ProcessingCounters::default();
        assert_eq!(counters.get_in_process(), 0);
        assert_eq!(counters.num_starts.get(), 0);
        assert_eq!(counters.num_ends.get(), 0);

        counters.start(5);
        assert_eq!(counters.get_in_process(), 5);
        assert_eq!(counters.num_starts.get(), 5);
        assert_eq!(counters.num_ends.get(), 0);

        counters.start(3);
        assert_eq!(counters.get_in_process(), 8);
        assert_eq!(counters.num_starts.get(), 8);
        assert_eq!(counters.num_ends.get(), 0);

        counters.end(2);
        assert_eq!(counters.get_in_process(), 6);
        assert_eq!(counters.num_starts.get(), 8);
        assert_eq!(counters.num_ends.get(), 2);

        counters.end(6);
        assert_eq!(counters.get_in_process(), 0);
        assert_eq!(counters.num_starts.get(), 8);
        assert_eq!(counters.num_ends.get(), 8);
    }

    /// `delta` subtracts field-wise and `merge` adds field-wise.
    #[test]
    #[cfg(feature = "persistence")]
    fn test_processing_counters_delta_and_merge() {
        let base = ProcessingCounters::default();
        let current = ProcessingCounters::default();
        base.start(5);
        base.end(2);
        current.start(12);
        current.end(4);

        let delta = current.delta(&base);
        assert_eq!(delta.num_starts.get(), 7);
        assert_eq!(delta.num_ends.get(), 2);
        assert_eq!(delta.get_in_process(), 5);

        let merged = ProcessingCounters::default();
        merged.start(10);
        merged.end(3);
        merged.merge(&delta);
        assert_eq!(merged.num_starts.get(), 17);
        assert_eq!(merged.num_ends.get(), 5);
        assert_eq!(merged.get_in_process(), 12);
    }

    /// The `processing` field of `UpdateStats` tracks in-process rows.
    #[test]
    #[cfg(feature = "persistence")]
    fn test_update_stats_in_process_tracking() {
        let stats = UpdateStats::default();
        assert_eq!(stats.processing.get_in_process(), 0);
        stats.processing.start(5);
        assert_eq!(stats.processing.get_in_process(), 5);
        stats.processing.start(3);
        assert_eq!(stats.processing.get_in_process(), 8);
        stats.processing.end(2);
        assert_eq!(stats.processing.get_in_process(), 6);
        stats.processing.end(6);
        assert_eq!(stats.processing.get_in_process(), 0);
    }

    /// Concurrent start/end pairs from several threads balance out to zero.
    #[test]
    #[cfg(feature = "persistence")]
    fn test_update_stats_thread_safety() {
        let stats = Arc::new(UpdateStats::default());
        let mut handles = Vec::new();
        for i in 0..10 {
            let stats_clone = Arc::clone(&stats);
            let handle = thread::spawn(move || {
                stats_clone.processing.start(100);
                thread::sleep(std::time::Duration::from_millis(i * 10));
                stats_clone.processing.end(100);
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(stats.processing.get_in_process(), 0);
    }

    /// Per-operation and total counts follow start/finish calls.
    #[test]
    fn test_operation_in_process_stats() {
        let op_stats = OperationInProcessStats::default();
        assert_eq!(op_stats.get_operation_in_process_count("op1"), 0);
        assert_eq!(op_stats.get_total_in_process_count(), 0);

        op_stats.start_processing("op1", 5);
        op_stats.start_processing("op2", 3);
        assert_eq!(op_stats.get_operation_in_process_count("op1"), 5);
        assert_eq!(op_stats.get_operation_in_process_count("op2"), 3);
        assert_eq!(op_stats.get_total_in_process_count(), 8);

        let all_ops = op_stats.get_all_operations_in_process();
        assert_eq!(all_ops.len(), 2);
        assert_eq!(all_ops.get("op1"), Some(&5));
        assert_eq!(all_ops.get("op2"), Some(&3));

        op_stats.finish_processing("op1", 2);
        assert_eq!(op_stats.get_operation_in_process_count("op1"), 3);
        assert_eq!(op_stats.get_total_in_process_count(), 6);

        op_stats.finish_processing("op1", 3);
        op_stats.finish_processing("op2", 3);
        assert_eq!(op_stats.get_total_in_process_count(), 0);
    }

    /// Distinct operations updated from several threads balance out to zero.
    #[test]
    fn test_operation_in_process_stats_thread_safety() {
        let op_stats = Arc::new(OperationInProcessStats::default());
        let mut handles = Vec::new();
        for i in 0..5 {
            let op_stats_clone = Arc::clone(&op_stats);
            let op_name = format!("operation_{}", i);
            let handle = thread::spawn(move || {
                op_stats_clone.start_processing(&op_name, 50);
                thread::sleep(std::time::Duration::from_millis(i * 20));
                op_stats_clone.finish_processing(&op_name, 50);
            });
            handles.push(handle);
        }
        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(op_stats.get_total_in_process_count(), 0);
    }

    /// `merge` adds in-process counts as well as outcome counters.
    #[test]
    #[cfg(feature = "persistence")]
    fn test_update_stats_merge_with_in_process() {
        let stats1 = UpdateStats::default();
        let stats2 = UpdateStats::default();
        stats1.processing.start(10);
        stats1.num_insertions.inc(5);
        stats2.processing.start(15);
        stats2.num_updates.inc(3);

        stats1.merge(&stats2);
        assert_eq!(stats1.processing.get_in_process(), 25);
        assert_eq!(stats1.num_insertions.get(), 5);
        assert_eq!(stats1.num_updates.get(), 3);
    }

    /// `delta` subtracts in-process counts as well as outcome counters.
    #[test]
    #[cfg(feature = "persistence")]
    fn test_update_stats_delta_with_in_process() {
        let base = UpdateStats::default();
        let current = UpdateStats::default();
        base.processing.start(5);
        base.num_insertions.inc(2);
        current.processing.start(12);
        current.num_insertions.inc(7);
        current.num_updates.inc(3);

        let delta = current.delta(&base);
        assert_eq!(delta.processing.get_in_process(), 7);
        assert_eq!(delta.num_insertions.get(), 5);
        assert_eq!(delta.num_updates.get(), 3);
    }

    /// The rendered bar is always 40 columns wide between the two edge glyphs.
    #[test]
    #[cfg(feature = "persistence")]
    fn test_update_stats_display_with_in_process() {
        let stats = UpdateStats::default();
        assert_eq!(format!("{}", stats), "No input data");

        stats.processing.start(5);
        let display = format!("{}", stats);
        // Nothing processed yet: the whole 40-column bar is blank.
        assert_eq!(display, format!("▕{}▏0/5 source rows", " ".repeat(40)));

        stats.num_insertions.inc(3);
        stats.num_errors.inc(1);
        let display = format!("{}", stats);
        // 4 of 9 rows processed: floor(4 * 40 / 9) = 17 filled columns, 23 blank.
        assert_eq!(
            display,
            format!(
                "▕{}{}▏4/9 source rows: 3 added, 1 errors",
                "█".repeat(17),
                " ".repeat(23)
            )
        );
    }

    /// Several independent operations are tracked side by side.
    #[test]
    fn test_granular_operation_tracking_integration() {
        let op_stats = OperationInProcessStats::default();
        op_stats.start_processing("import_users", 5);
        op_stats.start_processing("import_orders", 3);
        op_stats.start_processing("transform_user_data", 4);
        op_stats.start_processing("transform_order_data", 2);
        op_stats.start_processing("export_to_postgres", 3);
        op_stats.start_processing("export_to_elasticsearch", 2);

        assert_eq!(op_stats.get_operation_in_process_count("import_users"), 5);
        assert_eq!(
            op_stats.get_operation_in_process_count("transform_user_data"),
            4
        );
        assert_eq!(
            op_stats.get_operation_in_process_count("export_to_postgres"),
            3
        );
        assert_eq!(op_stats.get_total_in_process_count(), 19);

        let all_ops = op_stats.get_all_operations_in_process();
        assert_eq!(all_ops.len(), 6);
        assert_eq!(all_ops.get("import_users"), Some(&5));
        assert_eq!(all_ops.get("transform_user_data"), Some(&4));
        assert_eq!(all_ops.get("export_to_postgres"), Some(&3));

        op_stats.finish_processing("import_users", 2);
        op_stats.finish_processing("transform_user_data", 4);
        op_stats.finish_processing("export_to_postgres", 1);

        assert_eq!(op_stats.get_operation_in_process_count("import_users"), 3);
        assert_eq!(
            op_stats.get_operation_in_process_count("transform_user_data"),
            0
        );
        assert_eq!(
            op_stats.get_operation_in_process_count("export_to_postgres"),
            2
        );
        assert_eq!(op_stats.get_total_in_process_count(), 12);
    }

    /// Items flowing through import -> transform -> export stages are counted per stage.
    #[test]
    fn test_operation_tracking_with_realistic_pipeline() {
        let op_stats = OperationInProcessStats::default();
        op_stats.start_processing("users_import", 100);
        assert_eq!(op_stats.get_total_in_process_count(), 100);

        for i in 0..100 {
            if i % 10 == 0 {
                op_stats.finish_processing("users_import", 10);
            }
            op_stats.start_processing("user_transform", 1);
            if i % 5 == 0 {
                op_stats.finish_processing("user_transform", 1);
            }
        }
        assert_eq!(op_stats.get_operation_in_process_count("users_import"), 0);
        // 100 started, 20 finished inside the loop.
        assert_eq!(
            op_stats.get_operation_in_process_count("user_transform"),
            80
        );

        for i in 0..80 {
            op_stats.finish_processing("user_transform", 1);
            op_stats.start_processing("user_export", 1);
            if i % 3 == 0 {
                op_stats.finish_processing("user_export", 1);
            }
        }
        assert_eq!(op_stats.get_operation_in_process_count("users_import"), 0);
        assert_eq!(op_stats.get_operation_in_process_count("user_transform"), 0);
        // 80 started, 27 finished (i = 0, 3, ..., 78).
        assert_eq!(op_stats.get_operation_in_process_count("user_export"), 53);
        assert_eq!(op_stats.get_total_in_process_count(), 53);
    }

    /// Counts accumulate across repeated start/finish calls on the same operation.
    #[test]
    fn test_operation_tracking_cumulative_behavior() {
        let op_stats = OperationInProcessStats::default();
        let snapshot1 = OperationInProcessStats::default();

        op_stats.start_processing("test_op", 10);
        op_stats.finish_processing("test_op", 3);
        snapshot1.start_processing("test_op", 10);
        snapshot1.finish_processing("test_op", 3);

        op_stats.start_processing("test_op", 5);
        op_stats.finish_processing("test_op", 2);

        // The separately built snapshot holds the state after the first round only.
        assert_eq!(snapshot1.get_operation_in_process_count("test_op"), 7);
        assert_eq!(op_stats.get_operation_in_process_count("test_op"), 10);
    }
}