use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
/// A single point-in-time sample of resource usage across four dimensions.
///
/// `ResourceUtilization::new` clamps each usage value into `0.0..=1.0`; the
/// fields are public, so values assigned directly are not clamped.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUtilization {
    /// CPU usage.
    pub cpu: f64,
    /// Memory usage.
    pub memory: f64,
    /// Disk I/O usage.
    pub disk_io: f64,
    /// Network I/O usage.
    pub network_io: f64,
    /// When the sample was taken, in whole seconds since the Unix epoch
    /// (as filled in by `ResourceUtilization::new`).
    pub timestamp: u64,
}
impl ResourceUtilization {
    /// Creates a sample stamped with the current wall-clock time.
    ///
    /// Each of the four inputs is clamped into `0.0..=1.0` before being stored
    /// (a NaN input stays NaN, as documented for `f64::clamp`).
    ///
    /// `timestamp` is the number of whole seconds since the Unix epoch. If the
    /// system clock reports a time earlier than the epoch, `0` is stored
    /// instead of panicking.
    pub fn new(cpu: f64, memory: f64, disk_io: f64, network_io: f64) -> Self {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            // `duration_since` only fails when the clock is set before the epoch;
            // a misconfigured clock should not crash the caller.
            .unwrap_or_default()
            .as_secs();

        Self {
            cpu: cpu.clamp(0.0, 1.0),
            memory: memory.clamp(0.0, 1.0),
            disk_io: disk_io.clamp(0.0, 1.0),
            network_io: network_io.clamp(0.0, 1.0),
            timestamp,
        }
    }

    /// Returns the arithmetic mean of the four usage values.
    pub fn overall(&self) -> f64 {
        (self.cpu + self.memory + self.disk_io + self.network_io) / 4.0
    }

    /// Returns `true` when at least one usage value is strictly greater than
    /// `threshold`.
    pub fn is_overloaded(&self, threshold: f64) -> bool {
        [self.cpu, self.memory, self.disk_io, self.network_io]
            .iter()
            .any(|&value| value > threshold)
    }

    /// Returns the name of the dimension with the highest usage.
    ///
    /// The result is one of `"cpu"`, `"memory"`, `"disk_io"` or
    /// `"network_io"`. Values within `f64::EPSILON` of the maximum count as
    /// ties, and ties are resolved in that order. `"network_io"` is also the
    /// fallback when no other dimension matches.
    pub fn bottleneck(&self) -> &'static str {
        let peak = self
            .cpu
            .max(self.memory)
            .max(self.disk_io)
            .max(self.network_io);
        let is_peak = |value: f64| (peak - value).abs() < f64::EPSILON;

        if is_peak(self.cpu) {
            "cpu"
        } else if is_peak(self.memory) {
            "memory"
        } else if is_peak(self.disk_io) {
            "disk_io"
        } else {
            "network_io"
        }
    }
}
/// Renders the four usage values and their mean, each with two decimals.
impl std::fmt::Display for ResourceUtilization {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            cpu,
            memory,
            disk_io,
            network_io,
            ..
        } = self;
        let overall = self.overall();

        write!(
            f,
            "ResourceUtilization[cpu={:.2}, mem={:.2}, disk={:.2}, net={:.2}, overall={:.2}]",
            cpu, memory, disk_io, network_io, overall
        )
    }
}
/// Keeps a bounded history of resource samples for one workflow.
#[derive(Debug, Clone)]
pub struct WorkflowResourceMonitor {
    /// Identifier of the workflow being monitored.
    pub workflow_id: Uuid,
    /// Recorded samples, oldest first (new samples are pushed at the end).
    pub history: Vec<ResourceUtilization>,
    /// Upper bound on `history.len()`, enforced each time a sample is recorded.
    pub max_history: usize,
    /// Sampling interval in seconds. Nothing visible in this file reads it.
    pub sampling_interval: u64,
}
impl WorkflowResourceMonitor {
    /// Creates a monitor for `workflow_id` with an empty history, a history
    /// cap of 1000 samples and a sampling interval of 5 seconds.
    pub fn new(workflow_id: Uuid) -> Self {
        Self {
            workflow_id,
            history: Vec::new(),
            max_history: 1000,
            sampling_interval: 5,
        }
    }

    /// Builder-style setter for the history cap.
    ///
    /// Existing samples are not trimmed here; the new cap is applied the next
    /// time [`record`](Self::record) is called.
    pub fn with_max_history(mut self, max: usize) -> Self {
        self.max_history = max;
        self
    }

    /// Builder-style setter for the sampling interval, in seconds.
    pub fn with_sampling_interval(mut self, seconds: u64) -> Self {
        self.sampling_interval = seconds;
        self
    }

    /// Appends `utilization` to the history, then drops the oldest samples so
    /// that at most `max_history` remain.
    pub fn record(&mut self, utilization: ResourceUtilization) {
        self.history.push(utilization);
        let excess = self.history.len().saturating_sub(self.max_history);
        if excess > 0 {
            self.history.drain(..excess);
        }
    }

    /// Averages the samples whose timestamp falls within the last
    /// `window_seconds` seconds.
    ///
    /// Returns `None` when no sample falls inside the window (including when
    /// the history is empty). The returned value is built with
    /// `ResourceUtilization::new`, so it carries the current time as its
    /// timestamp and its fields are clamped to `0.0..=1.0`.
    ///
    /// If the system clock reports a time before the Unix epoch, "now" is
    /// treated as `0` instead of panicking, so every sample is in the window.
    pub fn average_utilization(&self, window_seconds: u64) -> Option<ResourceUtilization> {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let cutoff = now.saturating_sub(window_seconds);

        // Single pass, no intermediate allocation: count the matching samples
        // and accumulate the per-dimension sums together.
        let (count, cpu, memory, disk, network) = self
            .history
            .iter()
            .filter(|sample| sample.timestamp >= cutoff)
            .fold(
                (0usize, 0.0f64, 0.0f64, 0.0f64, 0.0f64),
                |(n, cpu, memory, disk, network), sample| {
                    (
                        n + 1,
                        cpu + sample.cpu,
                        memory + sample.memory,
                        disk + sample.disk_io,
                        network + sample.network_io,
                    )
                },
            );

        if count == 0 {
            return None;
        }

        let count = count as f64;
        Some(ResourceUtilization::new(
            cpu / count,
            memory / count,
            disk / count,
            network / count,
        ))
    }

    /// Returns the recorded sample with the highest `overall()` value, or
    /// `None` when the history is empty.
    ///
    /// Values that cannot be ordered (NaN) are treated as equal.
    pub fn peak_utilization(&self) -> Option<&ResourceUtilization> {
        self.history.iter().max_by(|a, b| {
            a.overall()
                .partial_cmp(&b.overall())
                .unwrap_or(std::cmp::Ordering::Equal)
        })
    }

    /// Removes every recorded sample.
    pub fn clear(&mut self) {
        self.history.clear();
    }
}
/// Renders the workflow id together with the number of stored samples.
impl std::fmt::Display for WorkflowResourceMonitor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let workflow = &self.workflow_id;
        let samples = self.history.len();
        write!(
            f,
            "WorkflowResourceMonitor[workflow={}, samples={}]",
            workflow, samples
        )
    }
}
/// Canned outcome for one named task, consumed by `MockTaskExecutor`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MockTaskResult {
    /// Name of the task; `MockTaskExecutor::register` uses it as the lookup key.
    pub task_name: String,
    /// Value handed back when the task is executed and `should_fail` is `false`.
    pub result: serde_json::Value,
    /// Delay in milliseconds applied by the executor before it returns;
    /// `0` means no delay.
    pub delay_ms: u64,
    /// When `true`, executing the task yields an error instead of `result`.
    pub should_fail: bool,
    /// Error text used when `should_fail` is `true`; the executor substitutes
    /// a generic message when this is `None`.
    pub failure_message: Option<String>,
}
impl MockTaskResult {
    /// Builds a succeeding mock for `task_name` that yields `result`, with no
    /// delay and no failure message.
    pub fn success(task_name: impl Into<String>, result: serde_json::Value) -> Self {
        let task_name = task_name.into();
        Self {
            task_name,
            result,
            delay_ms: 0,
            should_fail: false,
            failure_message: None,
        }
    }

    /// Builds a failing mock for `task_name` carrying `message` as its error
    /// text; the stored result is JSON `null` and there is no delay.
    pub fn failure(task_name: impl Into<String>, message: impl Into<String>) -> Self {
        let task_name = task_name.into();
        let failure_message = Some(message.into());
        Self {
            task_name,
            result: serde_json::Value::Null,
            delay_ms: 0,
            should_fail: true,
            failure_message,
        }
    }

    /// Builder-style setter for the delay, in milliseconds.
    pub fn with_delay(self, milliseconds: u64) -> Self {
        Self {
            delay_ms: milliseconds,
            ..self
        }
    }
}
/// Test double that answers task executions from pre-registered results and
/// keeps a log of every execution request.
#[derive(Debug, Clone)]
pub struct MockTaskExecutor {
    /// Registered mock results, keyed by task name.
    pub mock_results: HashMap<String, MockTaskResult>,
    /// One `(task name, Unix timestamp in seconds)` entry per `execute` call,
    /// in call order.
    pub execution_history: Vec<(String, u64)>,
}
impl MockTaskExecutor {
    /// Creates an executor with no registered mocks and an empty history.
    pub fn new() -> Self {
        Self {
            mock_results: HashMap::new(),
            execution_history: Vec::new(),
        }
    }

    /// Registers `result` under its own `task_name`, replacing any mock
    /// previously registered for that name.
    pub fn register(&mut self, result: MockTaskResult) {
        self.mock_results.insert(result.task_name.clone(), result);
    }

    /// Simulates running `task_name`.
    ///
    /// The call is always appended to `execution_history`, even when no mock
    /// is registered for the task. If the registered mock has a non-zero
    /// `delay_ms`, the current thread sleeps for that long before returning.
    ///
    /// # Errors
    ///
    /// Returns `Err` with the mock's failure message (or `"Mock task failed"`
    /// when none was set) if the mock is marked `should_fail`, and `Err` with
    /// a "No mock result registered" message if the task name is unknown.
    pub fn execute(&mut self, task_name: &str) -> Result<serde_json::Value, String> {
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            // A clock set before the epoch records `0` rather than panicking.
            .unwrap_or_default()
            .as_secs();
        self.execution_history
            .push((task_name.to_string(), timestamp));

        let mock = match self.mock_results.get(task_name) {
            Some(mock) => mock,
            None => {
                return Err(format!("No mock result registered for task: {}", task_name));
            }
        };

        if mock.delay_ms > 0 {
            std::thread::sleep(std::time::Duration::from_millis(mock.delay_ms));
        }

        if mock.should_fail {
            return Err(mock
                .failure_message
                .clone()
                .unwrap_or_else(|| "Mock task failed".to_string()));
        }

        Ok(mock.result.clone())
    }

    /// Returns how many recorded executions were for `task_name`.
    pub fn execution_count(&self, task_name: &str) -> usize {
        self.execution_history
            .iter()
            .filter(|(name, _)| name.as_str() == task_name)
            .count()
    }

    /// Empties the execution log; registered mocks are left untouched.
    pub fn clear_history(&mut self) {
        self.execution_history.clear();
    }
}
impl Default for MockTaskExecutor {
fn default() -> Self {
Self::new()
}
}
/// String-keyed store of JSON values.
#[derive(Debug, Clone)]
pub struct TestDataInjector {
    /// Injected values, keyed by name.
    pub data: HashMap<String, serde_json::Value>,
}
impl TestDataInjector {
    /// Creates an injector holding no data.
    pub fn new() -> Self {
        let data = HashMap::new();
        Self { data }
    }

    /// Stores `value` under `key`, replacing any value already stored there.
    pub fn inject(&mut self, key: impl Into<String>, value: serde_json::Value) {
        let key: String = key.into();
        self.data.insert(key, value);
    }

    /// Looks up the value stored under `key`, if any.
    pub fn get(&self, key: &str) -> Option<&serde_json::Value> {
        let stored = &self.data;
        stored.get(key)
    }

    /// Removes every stored value.
    pub fn clear(&mut self) {
        self.data.clear();
    }
}
impl Default for TestDataInjector {
fn default() -> Self {
Self::new()
}
}