use crate::error::SmithyResult;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
pub fn calculate_backoff(empty_count: u32, worker_index: u64) -> u64 {
let base: u64 = 100; let max_backoff: u64 = 5000; let stagger = worker_index * 50;
let exponential = base * (2u64.pow(empty_count.min(6)));
(exponential + stagger).min(max_backoff)
}
pub fn calculate_custom_backoff(
empty_count: u32,
worker_index: u64,
base_ms: u64,
max_backoff_ms: u64,
stagger_ms: u64,
max_exponent: u32,
) -> u64 {
let stagger = worker_index * stagger_ms;
let exponential = base_ms * (2u64.pow(empty_count.min(max_exponent)));
(exponential + stagger).min(max_backoff_ms)
}
pub fn calculate_linear_backoff(
empty_count: u32,
worker_index: u64,
base_ms: u64,
increment_ms: u64,
max_backoff_ms: u64,
) -> u64 {
let stagger = worker_index * 50;
let linear = base_ms + (empty_count as u64 * increment_ms);
(linear + stagger).min(max_backoff_ms)
}
pub fn generate_task_id() -> String {
uuid::Uuid::new_v4().to_string()
}
pub fn generate_task_id_with_prefix(prefix: &str) -> String {
format!("{}_{}", prefix, uuid::Uuid::new_v4())
}
pub fn current_timestamp_ms() -> SmithyResult<u64> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_millis() as u64)
.map_err(|e| crate::error::SmithyError::config(format!("Time error: {}", e)))
}
pub fn current_timestamp_secs() -> SmithyResult<u64> {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs())
.map_err(|e| crate::error::SmithyError::config(format!("Time error: {}", e)))
}
pub fn duration_between(start: SystemTime, end: SystemTime) -> Option<Duration> {
end.duration_since(start).ok()
}
pub fn format_duration(duration: Duration) -> String {
let total_secs = duration.as_secs();
let millis = duration.subsec_millis();
if total_secs >= 3600 {
let hours = total_secs / 3600;
let minutes = (total_secs % 3600) / 60;
let seconds = total_secs % 60;
format!("{}h {}m {}s", hours, minutes, seconds)
} else if total_secs >= 60 {
let minutes = total_secs / 60;
let seconds = total_secs % 60;
format!("{}m {}s", minutes, seconds)
} else if total_secs > 0 {
if millis > 0 {
format!("{}.{}s", total_secs, millis / 100)
} else {
format!("{}s", total_secs)
}
} else {
format!("{}ms", millis)
}
}
pub fn calculate_rate(task_count: u64, duration: Duration) -> f64 {
if duration.is_zero() {
0.0
} else {
task_count as f64 / duration.as_secs_f64()
}
}
#[derive(Debug, Clone)]
pub struct RetryConfig {
pub max_attempts: u32,
pub base_delay: Duration,
pub max_delay: Duration,
pub multiplier: f64,
pub jitter: bool,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_attempts: 3,
base_delay: Duration::from_millis(100),
max_delay: Duration::from_secs(60),
multiplier: 2.0,
jitter: true,
}
}
}
impl RetryConfig {
pub fn new(max_attempts: u32) -> Self {
Self {
max_attempts,
..Default::default()
}
}
pub fn with_base_delay(mut self, delay: Duration) -> Self {
self.base_delay = delay;
self
}
pub fn with_max_delay(mut self, delay: Duration) -> Self {
self.max_delay = delay;
self
}
pub fn with_multiplier(mut self, multiplier: f64) -> Self {
self.multiplier = multiplier;
self
}
pub fn with_jitter(mut self, enabled: bool) -> Self {
self.jitter = enabled;
self
}
pub fn delay_for_attempt(&self, attempt: u32) -> Duration {
if attempt == 0 {
return Duration::ZERO;
}
let delay_ms =
(self.base_delay.as_millis() as f64) * self.multiplier.powi((attempt - 1) as i32);
let mut delay = Duration::from_millis(delay_ms as u64);
delay = delay.min(self.max_delay);
if self.jitter {
let jitter_ms = (delay.as_millis() as f64 * 0.25 * rand::random::<f64>()) as u64;
delay += Duration::from_millis(jitter_ms);
}
delay
}
}
pub async fn retry_with_config<F, Fut, T, E>(config: RetryConfig, mut operation: F) -> Result<T, E>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
let mut last_error = None;
for attempt in 0..=config.max_attempts {
match operation().await {
Ok(result) => return Ok(result),
Err(error) => {
last_error = Some(error);
if attempt < config.max_attempts {
let delay = config.delay_for_attempt(attempt + 1);
tracing::debug!(
"Retry attempt {} failed, retrying in {:?}",
attempt + 1,
delay
);
tokio::time::sleep(delay).await;
}
}
}
}
Err(last_error.unwrap())
}
pub async fn retry<F, Fut, T, E>(max_attempts: u32, operation: F) -> Result<T, E>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<T, E>>,
E: std::fmt::Display,
{
retry_with_config(RetryConfig::new(max_attempts), operation).await
}
pub mod memory {
#[cfg(target_os = "linux")]
pub fn current_memory_usage() -> Option<usize> {
use std::fs;
let status = fs::read_to_string("/proc/self/status").ok()?;
for line in status.lines() {
if line.starts_with("VmRSS:") {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 2 {
if let Ok(kb) = parts[1].parse::<usize>() {
return Some(kb * 1024); }
}
}
}
None
}
#[cfg(not(target_os = "linux"))]
pub fn current_memory_usage() -> Option<usize> {
None
}
pub fn format_memory_size(bytes: usize) -> String {
const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
const THRESHOLD: f64 = 1024.0;
let mut size = bytes as f64;
let mut unit_index = 0;
while size >= THRESHOLD && unit_index < UNITS.len() - 1 {
size /= THRESHOLD;
unit_index += 1;
}
if unit_index == 0 {
format!("{} {}", bytes, UNITS[unit_index])
} else {
format!("{:.1} {}", size, UNITS[unit_index])
}
}
}
pub mod naming {
pub fn humanize_task_type(task_type: &str) -> String {
let with_spaces = task_type.replace('_', " ");
let mut result = String::new();
let mut prev_char: Option<char> = None;
for c in with_spaces.chars() {
if c.is_uppercase()
&& !result.is_empty()
&& !result.ends_with(' ')
&& prev_char.map_or(false, |prev| prev.is_lowercase())
{
result.push(' ');
}
result.push(c);
prev_char = Some(c);
}
result
.split_whitespace()
.map(|word| {
let mut chars = word.chars();
match chars.next() {
None => String::new(),
Some(first) => {
first.to_uppercase().collect::<String>() + &chars.as_str().to_lowercase()
}
}
})
.collect::<Vec<_>>()
.join(" ")
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[test]
fn test_calculate_backoff() {
assert_eq!(calculate_backoff(1, 0), 200);
assert_eq!(calculate_backoff(2, 1), 450);
assert_eq!(calculate_backoff(3, 2), 900);
assert_eq!(calculate_backoff(10, 5), 5000); }
#[test]
fn test_custom_backoff() {
let backoff = calculate_custom_backoff(2, 1, 50, 1000, 25, 4);
assert_eq!(backoff, 225); }
#[test]
fn test_linear_backoff() {
let backoff = calculate_linear_backoff(3, 2, 100, 50, 1000);
assert_eq!(backoff, 350); }
#[test]
fn test_format_duration() {
assert_eq!(format_duration(Duration::from_millis(500)), "500ms");
assert_eq!(format_duration(Duration::from_secs(5)), "5s");
assert_eq!(format_duration(Duration::from_secs(65)), "1m 5s");
assert_eq!(format_duration(Duration::from_secs(3665)), "1h 1m 5s");
}
#[test]
fn test_calculate_rate() {
let rate = calculate_rate(100, Duration::from_secs(10));
assert_eq!(rate, 10.0);
let rate = calculate_rate(0, Duration::from_secs(10));
assert_eq!(rate, 0.0);
let rate = calculate_rate(100, Duration::ZERO);
assert_eq!(rate, 0.0);
}
#[test]
fn test_retry_config() {
let config = RetryConfig::new(5)
.with_base_delay(Duration::from_millis(100))
.with_multiplier(2.0)
.with_jitter(false);
assert_eq!(config.delay_for_attempt(0), Duration::ZERO);
assert_eq!(config.delay_for_attempt(1), Duration::from_millis(100));
assert_eq!(config.delay_for_attempt(2), Duration::from_millis(200));
assert_eq!(config.delay_for_attempt(3), Duration::from_millis(400));
}
#[test]
fn test_memory_format() {
use memory::format_memory_size;
assert_eq!(format_memory_size(512), "512 B");
assert_eq!(format_memory_size(1536), "1.5 KB"); assert_eq!(format_memory_size(2_097_152), "2.0 MB"); }
#[test]
fn test_humanize_task_type() {
use naming::humanize_task_type;
assert_eq!(humanize_task_type("send_email"), "Send Email");
assert_eq!(humanize_task_type("ProcessImageTask"), "Process Image Task");
assert_eq!(humanize_task_type("simple"), "Simple");
assert_eq!(humanize_task_type("UPPER_CASE"), "Upper Case");
}
#[test]
fn test_task_id_generation() {
let id1 = generate_task_id();
let id2 = generate_task_id();
assert_ne!(id1, id2);
assert!(id1.len() > 0);
let prefixed_id = generate_task_id_with_prefix("test");
assert!(prefixed_id.starts_with("test_"));
}
#[tokio::test]
async fn test_retry_success() {
use std::cell::RefCell;
use std::rc::Rc;
let attempts = Rc::new(RefCell::new(0));
let attempts_clone = attempts.clone();
let result = retry(3, move || {
let attempts = attempts_clone.clone();
async move {
*attempts.borrow_mut() += 1;
if *attempts.borrow() >= 2 {
Ok("success")
} else {
Err("failure")
}
}
})
.await;
assert_eq!(result.unwrap(), "success");
assert_eq!(*attempts.borrow(), 2);
}
#[tokio::test]
async fn test_retry_failure() {
use std::cell::RefCell;
use std::rc::Rc;
let attempts = Rc::new(RefCell::new(0));
let attempts_clone = attempts.clone();
let result: Result<(), &str> = retry(2, move || {
let attempts = attempts_clone.clone();
async move {
*attempts.borrow_mut() += 1;
Err("always fails")
}
})
.await;
assert!(result.is_err());
assert_eq!(*attempts.borrow(), 3); }
}
#[cfg(feature = "jitter")]
mod rand {
use std::sync::atomic::{AtomicU64, Ordering};
static SEED: AtomicU64 = AtomicU64::new(1);
pub fn random<T: From<f64>>() -> T {
let prev = SEED.load(Ordering::Relaxed);
let next = prev.wrapping_mul(1103515245).wrapping_add(12345);
SEED.store(next, Ordering::Relaxed);
let normalized = (next as f64) / (u64::MAX as f64);
T::from(normalized)
}
}
#[cfg(not(feature = "jitter"))]
mod rand {
pub fn random<T: From<f64>>() -> T {
T::from(0.5) }
}