use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActorMetrics {
pub id: String,
pub actor_type: String,
pub status: ActorStatus,
pub spawned_at: u64,
pub shut_down_at: Option<u64>,
pub messages_processed: u64,
pub messages_sent: u64,
pub recent_events: Vec<ActorEvent>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ActorStatus {
Running,
Stopped,
Failed,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActorEvent {
pub timestamp: u64,
pub event_type: EventType,
pub details: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventType {
Spawned,
MessageProcessed,
MessageSent { recipient_id: String },
Stopped,
Failed { reason: String },
ShutDown,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SupervisionData {
pub timestamp: u64,
pub actors: HashMap<String, ActorMetrics>,
pub total_messages_processed: u64,
pub total_messages_sent: u64,
}
impl ActorMetrics {
pub fn new(id: String, actor_type: String) -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
Self {
id,
actor_type,
status: ActorStatus::Running,
spawned_at: now,
shut_down_at: None,
messages_processed: 0,
messages_sent: 0,
recent_events: vec![ActorEvent {
timestamp: now,
event_type: EventType::Spawned,
details: None,
}],
}
}
pub fn record_message_processed(&mut self) {
self.messages_processed += 1;
self.add_event(EventType::MessageProcessed, None);
}
pub fn record_message_sent(&mut self, recipient_id: String) {
self.messages_sent += 1;
self.add_event(
EventType::MessageSent {
recipient_id: recipient_id.clone(),
},
None,
);
}
pub fn record_stopped(&mut self) {
self.status = ActorStatus::Stopped;
self.add_event(EventType::Stopped, None);
}
pub fn record_failed(&mut self, reason: String) {
self.status = ActorStatus::Failed;
self.add_event(
EventType::Failed {
reason: reason.clone(),
},
Some(reason),
);
}
pub fn record_shutdown(&mut self) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
self.shut_down_at = Some(now);
self.add_event(EventType::ShutDown, None);
}
pub fn uptime_ms(&self) -> u64 {
let end = self.shut_down_at.unwrap_or_else(|| {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
});
end.saturating_sub(self.spawned_at)
}
fn add_event(&mut self, event_type: EventType, details: Option<String>) {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
self.recent_events.push(ActorEvent {
timestamp: now,
event_type,
details,
});
if self.recent_events.len() > 100 {
self.recent_events.remove(0);
}
}
}
impl SupervisionData {
pub fn new() -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
Self {
timestamp: now,
actors: HashMap::new(),
total_messages_processed: 0,
total_messages_sent: 0,
}
}
pub fn get_or_create_actor(&mut self, id: String, actor_type: String) -> &mut ActorMetrics {
let id_clone = id.clone();
self.actors
.entry(id)
.or_insert_with(|| ActorMetrics::new_with_type(id_clone, actor_type))
}
pub fn record_message_processed(&mut self, actor_id: &str) {
if let Some(actor) = self.actors.get_mut(actor_id) {
actor.record_message_processed();
}
self.total_messages_processed += 1;
}
pub fn record_message_sent(&mut self, from_id: &str, to_id: &str) {
if let Some(actor) = self.actors.get_mut(from_id) {
actor.record_message_sent(to_id.to_string());
}
self.total_messages_sent += 1;
}
pub fn running_actors(&self) -> Vec<&ActorMetrics> {
self.actors
.values()
.filter(|a| a.status == ActorStatus::Running)
.collect()
}
pub fn summary(&self) -> SupervisionSummary {
let actors = self.actors.values().collect::<Vec<_>>();
let active_count = actors
.iter()
.filter(|a| a.status == ActorStatus::Running)
.count();
let failed_count = actors
.iter()
.filter(|a| a.status == ActorStatus::Failed)
.count();
SupervisionSummary {
total_actors: actors.len(),
active_actors: active_count,
stopped_actors: actors.len() - active_count - failed_count,
failed_actors: failed_count,
total_messages_processed: self.total_messages_processed,
total_messages_sent: self.total_messages_sent,
}
}
}
impl Default for SupervisionData {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SupervisionSummary {
pub total_actors: usize,
pub active_actors: usize,
pub stopped_actors: usize,
pub failed_actors: usize,
pub total_messages_processed: u64,
pub total_messages_sent: u64,
}
impl ActorMetrics {
fn new_with_type(id: String, actor_type: String) -> Self {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
Self {
id,
actor_type,
status: ActorStatus::Running,
spawned_at: now,
shut_down_at: None,
messages_processed: 0,
messages_sent: 0,
recent_events: vec![ActorEvent {
timestamp: now,
event_type: EventType::Spawned,
details: None,
}],
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_actor_metrics_creation() {
let metrics = ActorMetrics::new("actor-1".to_string(), "TestActor".to_string());
assert_eq!(metrics.id, "actor-1");
assert_eq!(metrics.actor_type, "TestActor");
assert_eq!(metrics.status, ActorStatus::Running);
assert_eq!(metrics.messages_processed, 0);
assert_eq!(metrics.recent_events.len(), 1);
}
#[test]
fn test_record_message_processed() {
let mut metrics = ActorMetrics::new("actor-1".to_string(), "TestActor".to_string());
metrics.record_message_processed();
assert_eq!(metrics.messages_processed, 1);
assert_eq!(metrics.recent_events.len(), 2); }
#[test]
fn test_record_message_sent() {
let mut metrics = ActorMetrics::new("actor-1".to_string(), "TestActor".to_string());
metrics.record_message_sent("actor-2".to_string());
assert_eq!(metrics.messages_sent, 1);
assert_eq!(metrics.recent_events.len(), 2); }
#[test]
fn test_record_failed() {
let mut metrics = ActorMetrics::new("actor-1".to_string(), "TestActor".to_string());
metrics.record_failed("test error".to_string());
assert_eq!(metrics.status, ActorStatus::Failed);
assert_eq!(metrics.recent_events.len(), 2); }
#[test]
fn test_supervision_data_summary() {
let mut supervision = SupervisionData::new();
let metrics1 = ActorMetrics::new("actor-1".to_string(), "TestActor".to_string());
let mut metrics2 = ActorMetrics::new("actor-2".to_string(), "TestActor".to_string());
metrics2.record_stopped();
let mut metrics3 = ActorMetrics::new("actor-3".to_string(), "TestActor".to_string());
metrics3.record_failed("error".to_string());
supervision.actors.insert("actor-1".to_string(), metrics1);
supervision.actors.insert("actor-2".to_string(), metrics2);
supervision.actors.insert("actor-3".to_string(), metrics3);
let summary = supervision.summary();
assert_eq!(summary.total_actors, 3);
assert_eq!(summary.active_actors, 1);
assert_eq!(summary.stopped_actors, 1);
assert_eq!(summary.failed_actors, 1);
}
#[test]
fn test_event_rolling_window() {
let mut metrics = ActorMetrics::new("actor-1".to_string(), "TestActor".to_string());
for _ in 0..110 {
metrics.record_message_processed();
}
assert!(metrics.recent_events.len() <= 101);
}
}