use crate::BackendError;
use std::sync::Arc;
use std::time::{Duration, Instant};
use uuid::Uuid;
/// Category of backend operation, used to label telemetry.
///
/// Each variant maps to a fixed lowercase snake_case label via
/// [`OperationType::as_str`]; the metrics hook uses that label as its map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationType {
    Store,
    Get,
    Delete,
    BatchStore,
    BatchGet,
    BatchDelete,
    ChordIncrement,
    HealthCheck,
    Query,
    Transaction,
}
impl OperationType {
    /// Returns the static snake_case label for this operation,
    /// e.g. `"batch_store"` for [`OperationType::BatchStore`].
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Store => "store",
            Self::Get => "get",
            Self::Delete => "delete",
            Self::BatchStore => "batch_store",
            Self::BatchGet => "batch_get",
            Self::BatchDelete => "batch_delete",
            Self::ChordIncrement => "chord_increment",
            Self::HealthCheck => "health_check",
            Self::Query => "query",
            Self::Transaction => "transaction",
        }
    }
}
/// Describes one operation: what it is, optional identifiers, free-form
/// metadata, and the instant it was created.
#[derive(Debug, Clone)]
pub struct OperationContext {
    /// Which operation this context describes.
    pub operation: OperationType,
    /// Associated task id, set via [`OperationContext::with_task_id`].
    pub task_id: Option<Uuid>,
    /// Batch size, set via [`OperationContext::with_batch_size`].
    pub batch_size: Option<usize>,
    /// Captured by [`OperationContext::new`]; the reference point for
    /// [`OperationContext::elapsed`].
    pub start_time: Instant,
    /// Key/value pairs in the order they were added.
    pub metadata: Vec<(String, String)>,
}
impl OperationContext {
    /// Creates a context for `operation`, stamping the current instant as
    /// its start time. All optional fields start empty.
    pub fn new(operation: OperationType) -> Self {
        let start_time = Instant::now();
        Self {
            operation,
            start_time,
            task_id: None,
            batch_size: None,
            metadata: Vec::new(),
        }
    }
    /// Builder: attaches a task id (replacing any previous one).
    pub fn with_task_id(self, task_id: Uuid) -> Self {
        Self {
            task_id: Some(task_id),
            ..self
        }
    }
    /// Builder: records a batch size (replacing any previous one).
    pub fn with_batch_size(self, size: usize) -> Self {
        Self {
            batch_size: Some(size),
            ..self
        }
    }
    /// Builder: appends one `(key, value)` metadata pair; duplicates are kept.
    pub fn with_metadata(self, key: String, value: String) -> Self {
        let mut ctx = self;
        ctx.metadata.push((key, value));
        ctx
    }
    /// Time elapsed since [`OperationContext::new`] created this context.
    pub fn elapsed(&self) -> Duration {
        self.start_time.elapsed()
    }
}
/// Outcome of an operation, built from its [`OperationContext`].
#[derive(Debug, Clone)]
pub struct OperationResult {
    /// The context the operation was started with.
    pub context: OperationContext,
    /// Time elapsed since the context was created, measured when this result
    /// was constructed.
    pub duration: Duration,
    /// `true` for [`OperationResult::success`], `false` for
    /// [`OperationResult::failure`].
    pub success: bool,
    /// `Display` rendering of the error for failures; `None` on success.
    pub error: Option<String>,
    /// Optional size set via [`OperationResult::with_data_size`]
    /// (units are not specified in this module).
    pub data_size: Option<usize>,
}
impl OperationResult {
    /// Builds a successful result, measuring the duration from `context`.
    pub fn success(context: OperationContext) -> Self {
        // Struct-literal fields are evaluated in source order, so the elapsed
        // time is read before `context` is moved into the result.
        Self {
            duration: context.elapsed(),
            context,
            success: true,
            error: None,
            data_size: None,
        }
    }
    /// Builds a failed result, measuring the duration from `context` and
    /// storing `error`'s `Display` text.
    pub fn failure(context: OperationContext, error: &BackendError) -> Self {
        Self {
            duration: context.elapsed(),
            context,
            success: false,
            error: Some(error.to_string()),
            data_size: None,
        }
    }
    /// Builder: records a data size (replacing any previous one).
    pub fn with_data_size(self, size: usize) -> Self {
        Self {
            data_size: Some(size),
            ..self
        }
    }
}
/// Observer interface for backend operations.
///
/// Every method has an empty default body, so implementors override only the
/// callbacks they need. The `Send + Sync` supertraits let a hook be shared
/// across threads, e.g. behind an `Arc<dyn TelemetryHook>`.
///
/// NOTE(review): the call sites are outside this file. Whether a failed
/// operation triggers both `on_error` and an `after_operation` with a failure
/// result is up to the caller. Confirm this before relying on either for exact
/// counts.
pub trait TelemetryHook: Send + Sync {
    /// Hook for the start of an operation; receives its context.
    fn before_operation(&self, _context: &OperationContext) {}
    /// Hook for a finished operation; receives its success or failure result.
    fn after_operation(&self, _result: &OperationResult) {}
    /// Hook for a backend error associated with `_operation`.
    fn on_error(&self, _operation: OperationType, _error: &BackendError) {}
    /// Hook for a named, free-form event carrying key/value pairs.
    fn on_event(&self, _event_name: &str, _data: &[(String, String)]) {}
}
/// Hook that ignores every callback.
///
/// It implements [`TelemetryHook`] with no overrides, so every callback uses
/// the trait's empty default body.
#[derive(Debug, Clone, Default)]
pub struct NoOpHook;
impl TelemetryHook for NoOpHook {}
/// Telemetry hook that reports operations through the `tracing` macros.
///
/// Configure it with [`LoggingHook::new`] (or `Default`, which is identical)
/// and the builder methods [`LoggingHook::with_min_duration`] and
/// [`LoggingHook::errors_only`].
#[derive(Debug, Clone)]
pub struct LoggingHook {
    /// Whether successful completions are logged. `true` unless
    /// [`LoggingHook::errors_only`] was applied.
    pub log_success: bool,
    /// When set, completions (successful or failed) that took less than this
    /// many milliseconds are not logged. `None` disables the threshold.
    pub min_duration_ms: Option<f64>,
}
// Hand-written rather than derived: a derived `Default` would set
// `log_success` to `false`. `LoggingHook::default()` would then silently
// drop every success log, unlike `LoggingHook::new()`.
impl Default for LoggingHook {
    /// Same configuration as [`LoggingHook::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl LoggingHook {
    /// Creates a hook that logs successes and failures with no duration
    /// threshold.
    pub fn new() -> Self {
        Self {
            log_success: true,
            min_duration_ms: None,
        }
    }
    /// Builder: skip completions faster than `min_ms` milliseconds.
    pub fn with_min_duration(mut self, min_ms: f64) -> Self {
        self.min_duration_ms = Some(min_ms);
        self
    }
    /// Builder: stop logging successful completions.
    pub fn errors_only(mut self) -> Self {
        self.log_success = false;
        self
    }
}
impl TelemetryHook for LoggingHook {
fn before_operation(&self, context: &OperationContext) {
if let Some(task_id) = context.task_id {
tracing::debug!(
operation = context.operation.as_str(),
task_id = %task_id,
"Starting operation"
);
} else if let Some(batch_size) = context.batch_size {
tracing::debug!(
operation = context.operation.as_str(),
batch_size = batch_size,
"Starting batch operation"
);
} else {
tracing::debug!(operation = context.operation.as_str(), "Starting operation");
}
}
fn after_operation(&self, result: &OperationResult) {
let duration_ms = result.duration.as_secs_f64() * 1000.0;
if let Some(min_ms) = self.min_duration_ms {
if duration_ms < min_ms {
return;
}
}
if result.success && self.log_success {
tracing::info!(
operation = result.context.operation.as_str(),
duration_ms = %format!("{:.2}", duration_ms),
success = true,
"Operation completed"
);
} else if !result.success {
tracing::error!(
operation = result.context.operation.as_str(),
duration_ms = %format!("{:.2}", duration_ms),
error = result.error.as_deref().unwrap_or("unknown"),
"Operation failed"
);
}
}
fn on_error(&self, operation: OperationType, error: &BackendError) {
tracing::error!(
operation = operation.as_str(),
error = %error,
"Backend error occurred"
);
}
fn on_event(&self, event_name: &str, data: &[(String, String)]) {
tracing::info!(
event = event_name,
data = ?data,
"Custom event"
);
}
}
/// Telemetry hook that aggregates per-operation counters in memory.
///
/// Operation counts, error counts and cumulative durations are keyed by
/// [`OperationType::as_str`]. Each map sits behind its own mutex and is updated
/// separately. A reader racing with an update may therefore see an operation's
/// count bumped before its duration total.
#[derive(Debug, Default)]
pub struct MetricsHook {
    operation_counts: std::sync::Mutex<std::collections::HashMap<&'static str, u64>>,
    error_counts: std::sync::Mutex<std::collections::HashMap<&'static str, u64>>,
    duration_totals: std::sync::Mutex<std::collections::HashMap<&'static str, Duration>>,
}
impl MetricsHook {
    /// Creates a hook with all counters empty.
    pub fn new() -> Self {
        Self::default()
    }
    /// Number of completed operations recorded for `operation` (0 if none).
    ///
    /// # Panics
    /// Panics if the counter mutex is poisoned.
    pub fn operation_count(&self, operation: OperationType) -> u64 {
        self.operation_counts
            .lock()
            .expect("lock should not be poisoned")
            .get(operation.as_str())
            .copied()
            .unwrap_or(0)
    }
    /// Number of errors recorded for `operation` (0 if none).
    ///
    /// # Panics
    /// Panics if the counter mutex is poisoned.
    pub fn error_count(&self, operation: OperationType) -> u64 {
        self.error_counts
            .lock()
            .expect("lock should not be poisoned")
            .get(operation.as_str())
            .copied()
            .unwrap_or(0)
    }
    /// Sum of the durations recorded for `operation` (`Duration::ZERO` if none).
    ///
    /// # Panics
    /// Panics if the duration mutex is poisoned.
    pub fn total_duration(&self, operation: OperationType) -> Duration {
        self.duration_totals
            .lock()
            .expect("lock should not be poisoned")
            .get(operation.as_str())
            .copied()
            .unwrap_or(Duration::ZERO)
    }
    /// Mean duration of the recorded operations of type `operation`, or
    /// `None` if none have been recorded. Rounds down to whole nanoseconds.
    pub fn average_duration(&self, operation: OperationType) -> Option<Duration> {
        let count = self.operation_count(operation);
        if count == 0 {
            return None;
        }
        let total = self.total_duration(operation);
        // Divide in u128 nanoseconds. The previous `total / count as u32`
        // truncated counts above u32::MAX. A count that truncated to 0 made the
        // `Duration / 0` division panic.
        let avg_nanos = total.as_nanos() / u128::from(count);
        // The average never exceeds `total`, so the seconds always fit in u64;
        // saturate defensively anyway.
        let secs = u64::try_from(avg_nanos / 1_000_000_000).unwrap_or(u64::MAX);
        // A remainder modulo 1e9 is always < 1e9, which fits in u32.
        let nanos = (avg_nanos % 1_000_000_000) as u32;
        Some(Duration::new(secs, nanos))
    }
    /// Clears all counters and duration totals.
    ///
    /// Each map is cleared under its own lock in turn, so the reset is not
    /// atomic with respect to concurrent updates.
    pub fn reset(&self) {
        self.operation_counts
            .lock()
            .expect("lock should not be poisoned")
            .clear();
        self.error_counts
            .lock()
            .expect("lock should not be poisoned")
            .clear();
        self.duration_totals
            .lock()
            .expect("lock should not be poisoned")
            .clear();
    }
}
impl TelemetryHook for MetricsHook {
    /// Records one completed operation. It bumps the operation's count, adds
    /// its duration to the running total, and bumps its error count for
    /// failures.
    ///
    /// Each map is locked and released in turn; two locks are never held at
    /// once.
    fn after_operation(&self, result: &OperationResult) {
        let key = result.context.operation.as_str();
        {
            let mut counts = self
                .operation_counts
                .lock()
                .expect("lock should not be poisoned");
            *counts.entry(key).or_insert(0) += 1;
        }
        {
            let mut totals = self
                .duration_totals
                .lock()
                .expect("lock should not be poisoned");
            *totals.entry(key).or_insert(Duration::ZERO) += result.duration;
        }
        if result.success {
            return;
        }
        // NOTE(review): `on_error` also bumps this counter. If callers invoke
        // both for the same failure, it is counted twice. Confirm the calling
        // convention.
        let mut errors = self
            .error_counts
            .lock()
            .expect("lock should not be poisoned");
        *errors.entry(key).or_insert(0) += 1;
    }
    /// Bumps the error count for `operation`; the error value itself is ignored.
    fn on_error(&self, operation: OperationType, _error: &BackendError) {
        *self
            .error_counts
            .lock()
            .expect("lock should not be poisoned")
            .entry(operation.as_str())
            .or_insert(0) += 1;
    }
}
/// Fan-out hook holding an ordered list of shared hooks.
///
/// Its [`TelemetryHook`] implementation forwards every callback to each
/// registered hook, in registration order.
#[derive(Default)]
pub struct MultiHook {
    hooks: Vec<Arc<dyn TelemetryHook>>,
}
impl MultiHook {
    /// Creates a fan-out hook with no registered hooks.
    pub fn new() -> Self {
        Self::default()
    }
    /// Builder: appends `hook` after any previously registered hooks.
    pub fn add_hook(self, hook: Arc<dyn TelemetryHook>) -> Self {
        let mut hooks = self.hooks;
        hooks.push(hook);
        Self { hooks }
    }
    /// Number of registered hooks.
    pub fn len(&self) -> usize {
        self.hooks.len()
    }
    /// `true` when no hooks are registered.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl TelemetryHook for MultiHook {
    /// Forwards to every registered hook, in registration order.
    fn before_operation(&self, context: &OperationContext) {
        self.hooks
            .iter()
            .for_each(|hook| hook.before_operation(context));
    }
    /// Forwards to every registered hook, in registration order.
    fn after_operation(&self, result: &OperationResult) {
        self.hooks
            .iter()
            .for_each(|hook| hook.after_operation(result));
    }
    /// Forwards to every registered hook, in registration order.
    fn on_error(&self, operation: OperationType, error: &BackendError) {
        self.hooks
            .iter()
            .for_each(|hook| hook.on_error(operation, error));
    }
    /// Forwards to every registered hook, in registration order.
    fn on_event(&self, event_name: &str, data: &[(String, String)]) {
        self.hooks
            .iter()
            .for_each(|hook| hook.on_event(event_name, data));
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a result for `operation` with a fixed `millis` duration so
    /// totals and averages are deterministic.
    fn result_with_duration(operation: OperationType, millis: u64, success: bool) -> OperationResult {
        let ctx = OperationContext::new(operation);
        let mut result = if success {
            OperationResult::success(ctx)
        } else {
            OperationResult::failure(ctx, &BackendError::Connection("boom".to_string()))
        };
        result.duration = Duration::from_millis(millis);
        result
    }

    #[test]
    fn test_operation_type_as_str() {
        assert_eq!(OperationType::Store.as_str(), "store");
        assert_eq!(OperationType::Get.as_str(), "get");
        assert_eq!(OperationType::BatchStore.as_str(), "batch_store");
    }
    #[test]
    fn test_operation_context() {
        let ctx = OperationContext::new(OperationType::Store)
            .with_task_id(Uuid::new_v4())
            .with_batch_size(10)
            .with_metadata("key".to_string(), "value".to_string());
        assert_eq!(ctx.operation, OperationType::Store);
        assert!(ctx.task_id.is_some());
        assert_eq!(ctx.batch_size, Some(10));
        assert_eq!(ctx.metadata.len(), 1);
    }
    #[test]
    fn test_operation_result_success() {
        let ctx = OperationContext::new(OperationType::Get);
        let result = OperationResult::success(ctx).with_data_size(1024);
        assert!(result.success);
        assert!(result.error.is_none());
        assert_eq!(result.data_size, Some(1024));
    }
    #[test]
    fn test_operation_result_failure() {
        let ctx = OperationContext::new(OperationType::Store);
        let error = BackendError::Connection("test error".to_string());
        let result = OperationResult::failure(ctx, &error);
        assert!(!result.success);
        assert!(result.error.is_some());
        assert!(result.error.unwrap().contains("test error"));
    }
    #[test]
    fn test_logging_hook_creation() {
        let hook = LoggingHook::new().with_min_duration(10.0).errors_only();
        assert!(!hook.log_success);
        assert_eq!(hook.min_duration_ms, Some(10.0));
    }
    #[test]
    fn test_logging_hook_callbacks_do_not_panic() {
        // No tracing subscriber is installed, so this only checks that every
        // callback path runs cleanly.
        let hook = LoggingHook::new().with_min_duration(0.0);
        let ctx = OperationContext::new(OperationType::BatchDelete).with_batch_size(3);
        hook.before_operation(&ctx);
        hook.after_operation(&result_with_duration(OperationType::BatchDelete, 5, true));
        hook.after_operation(&result_with_duration(OperationType::BatchDelete, 5, false));
        hook.on_error(
            OperationType::BatchDelete,
            &BackendError::Connection("down".to_string()),
        );
        hook.on_event("custom", &[("k".to_string(), "v".to_string())]);
    }
    #[test]
    fn test_metrics_hook() {
        let hook = MetricsHook::new();
        let ctx = OperationContext::new(OperationType::Store);
        let result = OperationResult::success(ctx);
        hook.after_operation(&result);
        assert_eq!(hook.operation_count(OperationType::Store), 1);
        assert_eq!(hook.error_count(OperationType::Store), 0);
        assert!(hook.average_duration(OperationType::Store).is_some());
    }
    #[test]
    fn test_metrics_hook_counts_failures() {
        let hook = MetricsHook::new();
        hook.after_operation(&result_with_duration(OperationType::Store, 1, false));
        assert_eq!(hook.operation_count(OperationType::Store), 1);
        assert_eq!(hook.error_count(OperationType::Store), 1);
    }
    #[test]
    fn test_metrics_hook_on_error_counts_without_operation() {
        let hook = MetricsHook::new();
        let error = BackendError::Connection("down".to_string());
        hook.on_error(OperationType::Query, &error);
        assert_eq!(hook.error_count(OperationType::Query), 1);
        assert_eq!(hook.operation_count(OperationType::Query), 0);
        assert!(hook.average_duration(OperationType::Query).is_none());
    }
    #[test]
    fn test_metrics_hook_average_duration() {
        let hook = MetricsHook::new();
        hook.after_operation(&result_with_duration(OperationType::BatchGet, 10, true));
        hook.after_operation(&result_with_duration(OperationType::BatchGet, 30, true));
        assert_eq!(
            hook.total_duration(OperationType::BatchGet),
            Duration::from_millis(40)
        );
        assert_eq!(
            hook.average_duration(OperationType::BatchGet),
            Some(Duration::from_millis(20))
        );
        // Other operations are tracked independently.
        assert!(hook.average_duration(OperationType::Get).is_none());
    }
    #[test]
    fn test_metrics_hook_reset() {
        let hook = MetricsHook::new();
        let ctx = OperationContext::new(OperationType::Get);
        let result = OperationResult::success(ctx);
        hook.after_operation(&result);
        assert_eq!(hook.operation_count(OperationType::Get), 1);
        hook.reset();
        assert_eq!(hook.operation_count(OperationType::Get), 0);
    }
    #[test]
    fn test_multi_hook() {
        let hook1 = Arc::new(MetricsHook::new());
        let hook2 = Arc::new(LoggingHook::new());
        let multi = MultiHook::new().add_hook(hook1.clone()).add_hook(hook2);
        assert_eq!(multi.len(), 2);
        assert!(!multi.is_empty());
        let ctx = OperationContext::new(OperationType::Delete);
        let result = OperationResult::success(ctx);
        multi.after_operation(&result);
        assert_eq!(hook1.operation_count(OperationType::Delete), 1);
    }
    #[test]
    fn test_multi_hook_forwards_on_error() {
        let metrics = Arc::new(MetricsHook::new());
        let multi = MultiHook::new()
            .add_hook(metrics.clone())
            .add_hook(Arc::new(NoOpHook));
        let error = BackendError::Connection("down".to_string());
        multi.on_error(OperationType::Transaction, &error);
        assert_eq!(metrics.error_count(OperationType::Transaction), 1);
    }
    #[test]
    fn test_multi_hook_empty() {
        let multi = MultiHook::new();
        assert!(multi.is_empty());
        assert_eq!(multi.len(), 0);
    }
}