use std::collections::VecDeque;
use std::sync::{Arc, RwLock};
use std::time::SystemTime;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use super::gating::IntegrityState;
use super::mincut::WitnessEdge;
/// Kinds of events recorded by the integrity event store.
///
/// serde serializes each variant in `snake_case` (for example `StateChanged`
/// becomes `"state_changed"`); the `Display` impl prints the same name without
/// the surrounding quotes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrityEventType {
// Partition events. `event_to_delta` maps created/deleted to node add/remove.
PartitionCreated,
PartitionDeleted,
PartitionHealthChanged,
// Centroid events.
CentroidMoved,
CentroidRebalanced,
// Shard events.
ShardRebalanced,
ShardAdded,
ShardRemoved,
// Dependency events. `event_to_delta` maps these to health updates on an
// `external_dependency` node.
DependencyDown,
DependencyUp,
// Events that do not require a graph update (see `requires_graph_update`).
StateChanged,
LambdaSampled,
GraphRebuilt,
EdgeCapacityChanged,
ErrorRateExceeded,
ManualOverride,
}
impl std::fmt::Display for IntegrityEventType {
    /// Writes the variant's serde name (snake_case) without the JSON quotes.
    ///
    /// Falls back to the text `unknown` if JSON serialization fails.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = match serde_json::to_string(self) {
            Ok(json) => json,
            Err(_) => String::from("unknown"),
        };
        // A unit variant serializes as a quoted JSON string; drop the quotes.
        let bare = rendered.trim_matches('"');
        write!(f, "{}", bare)
    }
}
impl IntegrityEventType {
pub fn requires_graph_update(&self) -> bool {
matches!(
self,
IntegrityEventType::PartitionCreated
| IntegrityEventType::PartitionDeleted
| IntegrityEventType::CentroidMoved
| IntegrityEventType::CentroidRebalanced
| IntegrityEventType::ShardRebalanced
| IntegrityEventType::ShardAdded
| IntegrityEventType::ShardRemoved
| IntegrityEventType::DependencyDown
| IntegrityEventType::DependencyUp
)
}
pub fn requires_mincut_recomputation(&self) -> bool {
matches!(
self,
IntegrityEventType::PartitionCreated
| IntegrityEventType::PartitionDeleted
| IntegrityEventType::PartitionHealthChanged
| IntegrityEventType::ShardRebalanced
| IntegrityEventType::ShardAdded
| IntegrityEventType::ShardRemoved
| IntegrityEventType::DependencyDown
| IntegrityEventType::DependencyUp
| IntegrityEventType::EdgeCapacityChanged
| IntegrityEventType::GraphRebuilt
)
}
pub fn severity(&self) -> u8 {
match self {
IntegrityEventType::LambdaSampled => 0,
IntegrityEventType::GraphRebuilt => 0,
IntegrityEventType::PartitionCreated => 0,
IntegrityEventType::CentroidMoved => 0,
IntegrityEventType::CentroidRebalanced => 1,
IntegrityEventType::PartitionDeleted => 1,
IntegrityEventType::PartitionHealthChanged => 1,
IntegrityEventType::ShardRebalanced => 1,
IntegrityEventType::ShardAdded => 1,
IntegrityEventType::EdgeCapacityChanged => 1,
IntegrityEventType::StateChanged => 2,
IntegrityEventType::ShardRemoved => 2,
IntegrityEventType::DependencyDown => 2,
IntegrityEventType::DependencyUp => 1,
IntegrityEventType::ErrorRateExceeded => 2,
IntegrityEventType::ManualOverride => 2,
}
}
}
/// One integrity event: its type, optional state/lambda details and metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntegrityEventContent {
/// Event id. The constructors set it to 0; `IntegrityEventStore::record`
/// overwrites it with the id it assigns.
pub event_id: u64,
/// Collection the event belongs to; `record_event` uses it to pick the store.
pub collection_id: i32,
/// Kind of event.
pub event_type: IntegrityEventType,
/// State before the event; set by `state_change`, otherwise `None`.
pub previous_state: Option<IntegrityState>,
/// State after the event; set by `state_change` and `lambda_sampled`.
pub new_state: Option<IntegrityState>,
/// Lambda-cut value passed to `state_change` or `lambda_sampled`, if any.
pub lambda_cut: Option<f32>,
/// Witness edges passed to `state_change`, if any.
pub witness_edges: Option<Vec<WitnessEdge>>,
/// Free-form JSON. The constructors start with an object; `event_to_delta`
/// reads the `partition_id` and `dependency_id` keys from it.
pub metadata: serde_json::Value,
/// Creation time; the constructors use `SystemTime::now()`.
pub created_at: SystemTime,
/// Caller-supplied label for whatever produced the event.
pub source: String,
}
impl IntegrityEventContent {
    /// Creates an event of `event_type` for `collection_id` with no state,
    /// lambda or witness information.
    ///
    /// `event_id` starts at 0 (`IntegrityEventStore::record` assigns the real
    /// id), `metadata` starts as an empty JSON object and `created_at` is the
    /// current system time.
    pub fn new(
        collection_id: i32,
        event_type: IntegrityEventType,
        source: impl Into<String>,
    ) -> Self {
        Self {
            event_id: 0,
            collection_id,
            event_type,
            previous_state: None,
            new_state: None,
            lambda_cut: None,
            witness_edges: None,
            metadata: serde_json::json!({}),
            created_at: SystemTime::now(),
            source: source.into(),
        }
    }

    /// Creates a `StateChanged` event describing a transition from `previous`
    /// to `new`.
    ///
    /// The metadata carries a `"direction"` key:
    /// * `"degrading"` when `new > previous`,
    /// * `"unchanged"` when the two states compare equal,
    /// * `"improving"` otherwise.
    pub fn state_change(
        collection_id: i32,
        previous: IntegrityState,
        new: IntegrityState,
        lambda_cut: f32,
        witness_edges: Vec<WitnessEdge>,
        source: impl Into<String>,
    ) -> Self {
        // A transition to the same state is neither an improvement nor a
        // degradation, so it gets its own label instead of "improving".
        let direction = match new.partial_cmp(&previous) {
            Some(std::cmp::Ordering::Greater) => "degrading",
            Some(std::cmp::Ordering::Equal) => "unchanged",
            _ => "improving",
        };
        Self {
            previous_state: Some(previous),
            new_state: Some(new),
            lambda_cut: Some(lambda_cut),
            witness_edges: Some(witness_edges),
            metadata: serde_json::json!({ "direction": direction }),
            ..Self::new(collection_id, IntegrityEventType::StateChanged, source)
        }
    }

    /// Creates a `LambdaSampled` event recording `lambda_cut` and the `state`
    /// observed with it (stored in `new_state`).
    pub fn lambda_sampled(
        collection_id: i32,
        lambda_cut: f32,
        state: IntegrityState,
        source: impl Into<String>,
    ) -> Self {
        Self {
            new_state: Some(state),
            lambda_cut: Some(lambda_cut),
            ..Self::new(collection_id, IntegrityEventType::LambdaSampled, source)
        }
    }

    /// Replaces the whole metadata value with `metadata`.
    pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
        self.metadata = metadata;
        self
    }

    /// Sets `key` to `value` in the metadata object.
    ///
    /// JSON `null` metadata is treated as an empty object so the field is not
    /// lost. Metadata of any other non-object kind (array, string, number,
    /// bool) is left untouched and the field is ignored.
    pub fn with_metadata_field(mut self, key: &str, value: serde_json::Value) -> Self {
        if self.metadata.is_null() {
            self.metadata = serde_json::Value::Object(serde_json::Map::new());
        }
        if let serde_json::Value::Object(fields) = &mut self.metadata {
            fields.insert(key.to_string(), value);
        }
        self
    }
}
/// A batch of node and edge changes for one collection's graph.
///
/// `event_to_delta` builds these from events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphDelta {
/// Collection whose graph the changes apply to.
pub collection_id: i32,
/// Nodes to add.
pub add_nodes: Vec<DeltaNode>,
/// Nodes to remove, as `(node_type, node_id)` pairs.
pub remove_nodes: Vec<(String, i64)>,
/// Nodes whose attributes should be updated.
pub update_nodes: Vec<DeltaNode>,
/// Edges to add.
pub add_edges: Vec<DeltaEdge>,
/// Edges to remove, as a pair of node keys.
// NOTE(review): presumably (source, target), each `(node_type, node_id)` — confirm.
pub remove_edges: Vec<((String, i64), (String, i64))>,
/// Edges whose attributes should be updated.
pub update_edges: Vec<DeltaEdge>,
}
/// A node entry inside a `GraphDelta`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaNode {
/// Node category; `event_to_delta` uses `"partition"` and `"external_dependency"`.
pub node_type: String,
/// Node id within its type.
pub node_id: i64,
/// Optional display name (`event_to_delta` uses `partition_<id>` for new partitions).
pub node_name: Option<String>,
/// Optional health score; `event_to_delta` writes `1.0` for created/up and
/// `0.0` for down.
pub health_score: Option<f32>,
/// Optional extra JSON data.
pub metadata: Option<serde_json::Value>,
}
/// An edge entry inside a `GraphDelta`.
///
/// Nothing in this file constructs a `DeltaEdge`; the field notes below are
/// read off the field names and should be confirmed against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaEdge {
/// Type of the source node.
pub source_type: String,
/// Id of the source node.
pub source_id: i64,
/// Type of the target node.
pub target_type: String,
/// Id of the target node.
pub target_id: i64,
/// Edge category.
pub edge_type: String,
/// Optional capacity. NOTE(review): units not visible here — confirm.
pub capacity: Option<f32>,
/// Optional current flow. NOTE(review): units not visible here — confirm.
pub current_flow: Option<f32>,
/// Optional error rate. NOTE(review): range not visible here — confirm.
pub error_rate: Option<f32>,
}
impl GraphDelta {
    /// Creates a delta for `collection_id` that contains no changes.
    pub fn new(collection_id: i32) -> Self {
        Self {
            collection_id,
            add_nodes: vec![],
            remove_nodes: vec![],
            update_nodes: vec![],
            add_edges: vec![],
            remove_edges: vec![],
            update_edges: vec![],
        }
    }

    /// Returns `true` when none of the six change lists has an entry.
    pub fn is_empty(&self) -> bool {
        let list_is_empty = [
            self.add_nodes.is_empty(),
            self.remove_nodes.is_empty(),
            self.update_nodes.is_empty(),
            self.add_edges.is_empty(),
            self.remove_edges.is_empty(),
            self.update_edges.is_empty(),
        ];
        list_is_empty.iter().all(|&empty| empty)
    }

    /// Total number of entries across all six change lists.
    pub fn change_count(&self) -> usize {
        let lengths = [
            self.add_nodes.len(),
            self.remove_nodes.len(),
            self.update_nodes.len(),
            self.add_edges.len(),
            self.remove_edges.len(),
            self.update_edges.len(),
        ];
        lengths.iter().sum()
    }
}
/// Bounded in-memory log of integrity events with listener callbacks.
pub struct IntegrityEventStore {
// Collection id passed to `new`; the methods in this file do not read it.
collection_id: i32,
// Once the buffer holds this many events, `record` drops the oldest first.
max_events: usize,
// Next id handed out by `record`; starts at 1.
next_event_id: std::sync::atomic::AtomicU64,
// Recorded events, oldest at the front and newest at the back.
events: RwLock<VecDeque<IntegrityEventContent>>,
// Callbacks invoked by `record` for every recorded event.
listeners: RwLock<Vec<Box<dyn Fn(&IntegrityEventContent) + Send + Sync>>>,
}
impl IntegrityEventStore {
    /// Largest number of slots reserved up front by [`IntegrityEventStore::new`].
    ///
    /// The buffer still grows on demand up to `max_events`; this only bounds
    /// the eager allocation so that a very large limit neither reserves memory
    /// before any event exists nor panics with a capacity overflow.
    const MAX_PREALLOCATED_EVENTS: usize = 1024;

    /// Creates an empty store for `collection_id` that retains at most
    /// `max_events` events. Event ids start at 1.
    ///
    /// With `max_events == 0` nothing is retained, but `record` still assigns
    /// ids and notifies listeners.
    pub fn new(collection_id: i32, max_events: usize) -> Self {
        let reserved = max_events.min(Self::MAX_PREALLOCATED_EVENTS);
        Self {
            collection_id,
            max_events,
            next_event_id: std::sync::atomic::AtomicU64::new(1),
            events: RwLock::new(VecDeque::with_capacity(reserved)),
            listeners: RwLock::new(Vec::new()),
        }
    }

    /// Assigns the next id to `event`, appends it to the buffer (evicting the
    /// oldest entries when full), calls every listener with the event and
    /// returns the assigned id.
    ///
    /// Listeners run after the buffer lock has been released, so listener
    /// calls from concurrent `record` calls may interleave.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal locks is poisoned.
    pub fn record(&self, mut event: IntegrityEventContent) -> u64 {
        let event_id = {
            let mut events = self
                .events
                .write()
                .expect("integrity event buffer lock poisoned");
            // Allocate the id while holding the write lock so that ids in the
            // buffer are strictly increasing even with concurrent writers;
            // `get_recent` relies on buffer order being id order.
            let id = self
                .next_event_id
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            event.event_id = id;
            // A limit of zero means "retain nothing"; popping before pushing
            // would otherwise always leave one event behind.
            if self.max_events > 0 {
                while events.len() >= self.max_events {
                    events.pop_front();
                }
                events.push_back(event.clone());
            }
            id
        };
        {
            let listeners = self
                .listeners
                .read()
                .expect("integrity event listener lock poisoned");
            for listener in listeners.iter() {
                listener(&event);
            }
        }
        event_id
    }

    /// Returns up to `count` events, newest first.
    pub fn get_recent(&self, count: usize) -> Vec<IntegrityEventContent> {
        let events = self
            .events
            .read()
            .expect("integrity event buffer lock poisoned");
        events.iter().rev().take(count).cloned().collect()
    }

    /// Returns up to `count` events of `event_type`, newest first.
    pub fn get_by_type(
        &self,
        event_type: IntegrityEventType,
        count: usize,
    ) -> Vec<IntegrityEventContent> {
        let events = self
            .events
            .read()
            .expect("integrity event buffer lock poisoned");
        events
            .iter()
            .rev()
            .filter(|e| e.event_type == event_type)
            .take(count)
            .cloned()
            .collect()
    }

    /// Returns every retained event whose `created_at` is at or after `since`,
    /// oldest first.
    pub fn get_since(&self, since: SystemTime) -> Vec<IntegrityEventContent> {
        let events = self
            .events
            .read()
            .expect("integrity event buffer lock poisoned");
        events
            .iter()
            .filter(|e| e.created_at >= since)
            .cloned()
            .collect()
    }

    /// Returns up to `count` `StateChanged` events, newest first.
    pub fn get_state_changes(&self, count: usize) -> Vec<IntegrityEventContent> {
        self.get_by_type(IntegrityEventType::StateChanged, count)
    }

    /// Registers `listener` to be called for every subsequently recorded event.
    ///
    /// `record` calls listeners while it holds the listener list's read lock,
    /// so a listener must not call `add_listener` on the same store (that
    /// would wait for the write lock) and should avoid re-entering `record`.
    pub fn add_listener<F>(&self, listener: F)
    where
        F: Fn(&IntegrityEventContent) + Send + Sync + 'static,
    {
        let mut listeners = self
            .listeners
            .write()
            .expect("integrity event listener lock poisoned");
        listeners.push(Box::new(listener));
    }

    /// Number of events currently retained.
    pub fn event_count(&self) -> usize {
        self.events
            .read()
            .expect("integrity event buffer lock poisoned")
            .len()
    }

    /// Drops all retained events. Listeners and the id counter are unaffected.
    pub fn clear(&self) {
        self.events
            .write()
            .expect("integrity event buffer lock poisoned")
            .clear();
    }

    /// Summarises the retained events: total, count per event type, and count
    /// per severity level (0 = info, 1 = warning, 2 = critical).
    pub fn stats(&self) -> EventStoreStats {
        let events = self
            .events
            .read()
            .expect("integrity event buffer lock poisoned");
        let mut by_type: std::collections::HashMap<IntegrityEventType, usize> =
            std::collections::HashMap::new();
        let mut by_severity = [0usize; 3];
        for event in events.iter() {
            *by_type.entry(event.event_type).or_insert(0) += 1;
            // Severities outside 0..=2 have no bucket and are not counted.
            let level = usize::from(event.event_type.severity());
            if let Some(bucket) = by_severity.get_mut(level) {
                *bucket += 1;
            }
        }
        EventStoreStats {
            total_events: events.len(),
            by_type,
            info_count: by_severity[0],
            warning_count: by_severity[1],
            critical_count: by_severity[2],
        }
    }
}
/// Summary of the events retained by an `IntegrityEventStore`, as produced by
/// its `stats` method.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventStoreStats {
/// Number of retained events.
pub total_events: usize,
/// Number of retained events per event type.
pub by_type: std::collections::HashMap<IntegrityEventType, usize>,
/// Events whose type has severity 0.
pub info_count: usize,
/// Events whose type has severity 1.
pub warning_count: usize,
/// Events whose type has severity 2.
pub critical_count: usize,
}
// Process-wide map from collection id to that collection's event store.
// The map itself is created lazily on first access; stores are inserted by
// `get_or_create_event_store`.
static EVENT_REGISTRY: once_cell::sync::Lazy<DashMap<i32, Arc<IntegrityEventStore>>> =
once_cell::sync::Lazy::new(DashMap::new);
/// Returns the shared event store for `collection_id`, creating it in the
/// global registry on first use.
///
/// Newly created stores retain at most 10000 events.
pub fn get_or_create_event_store(collection_id: i32) -> Arc<IntegrityEventStore> {
    const NEW_STORE_CAPACITY: usize = 10000;
    let slot = EVENT_REGISTRY.entry(collection_id).or_insert_with(|| {
        let store = IntegrityEventStore::new(collection_id, NEW_STORE_CAPACITY);
        Arc::new(store)
    });
    Arc::clone(&*slot)
}
/// Returns the shared event store for `collection_id` if one has already been
/// created, without creating it.
pub fn get_event_store(collection_id: i32) -> Option<Arc<IntegrityEventStore>> {
    let entry = EVENT_REGISTRY.get(&collection_id)?;
    Some(Arc::clone(&*entry))
}
/// Records `event` in the store of its own `collection_id` (creating that
/// store if needed) and returns the id the store assigned.
pub fn record_event(event: IntegrityEventContent) -> u64 {
    let target_collection = event.collection_id;
    get_or_create_event_store(target_collection).record(event)
}
/// Translates `event` into the graph changes it implies, if any.
///
/// Returns `None` when the event type does not require a graph update, when
/// the type has no translation here, or when the metadata lacks the integer id
/// the translation needs:
///
/// * `PartitionCreated` adds a `partition` node, using `partition_id`.
/// * `PartitionDeleted` removes that `partition` node, using `partition_id`.
/// * `DependencyDown` / `DependencyUp` set an `external_dependency` node's
///   health to `0.0` / `1.0`, using `dependency_id`.
pub fn event_to_delta(event: &IntegrityEventContent) -> Option<GraphDelta> {
    let kind = event.event_type;
    if !kind.requires_graph_update() {
        return None;
    }

    // Integer stored under `key` in the event metadata, if present.
    let metadata_id = |key: &str| event.metadata.get(key).and_then(|raw| raw.as_i64());

    let mut delta = GraphDelta::new(event.collection_id);
    match kind {
        IntegrityEventType::PartitionCreated => {
            if let Some(id) = metadata_id("partition_id") {
                let node = DeltaNode {
                    node_type: String::from("partition"),
                    node_id: id,
                    node_name: Some(format!("partition_{}", id)),
                    health_score: Some(1.0),
                    metadata: None,
                };
                delta.add_nodes.push(node);
            }
        }
        IntegrityEventType::PartitionDeleted => {
            if let Some(id) = metadata_id("partition_id") {
                delta.remove_nodes.push((String::from("partition"), id));
            }
        }
        IntegrityEventType::DependencyDown | IntegrityEventType::DependencyUp => {
            if let Some(id) = metadata_id("dependency_id") {
                let health = if kind == IntegrityEventType::DependencyUp {
                    1.0
                } else {
                    0.0
                };
                let node = DeltaNode {
                    node_type: String::from("external_dependency"),
                    node_id: id,
                    node_name: None,
                    health_score: Some(health),
                    metadata: None,
                };
                delta.update_nodes.push(node);
            }
        }
        // The remaining graph-updating event types have no translation here.
        _ => {}
    }

    if delta.is_empty() {
        return None;
    }
    Some(delta)
}
// Unit tests for the event types, the event store, the graph delta helpers
// and the global registry.
#[cfg(test)]
mod tests {
use super::*;
// Display prints the snake_case serde name without quotes.
#[test]
fn test_event_type_display() {
assert_eq!(
IntegrityEventType::StateChanged.to_string(),
"state_changed"
);
assert_eq!(
IntegrityEventType::LambdaSampled.to_string(),
"lambda_sampled"
);
}
// Spot-checks one positive and one negative case for each predicate.
#[test]
fn test_event_type_properties() {
assert!(IntegrityEventType::PartitionCreated.requires_graph_update());
assert!(!IntegrityEventType::LambdaSampled.requires_graph_update());
assert!(IntegrityEventType::GraphRebuilt.requires_mincut_recomputation());
assert!(!IntegrityEventType::ManualOverride.requires_mincut_recomputation());
}
// `new` stores its arguments unchanged.
#[test]
fn test_event_creation() {
let event = IntegrityEventContent::new(1, IntegrityEventType::GraphRebuilt, "test");
assert_eq!(event.collection_id, 1);
assert_eq!(event.event_type, IntegrityEventType::GraphRebuilt);
assert_eq!(event.source, "test");
}
// `state_change` fills in the type, both states and the lambda value.
#[test]
fn test_state_change_event() {
let event = IntegrityEventContent::state_change(
1,
IntegrityState::Normal,
IntegrityState::Stress,
0.65,
vec![],
"integrity_worker",
);
assert_eq!(event.event_type, IntegrityEventType::StateChanged);
assert_eq!(event.previous_state, Some(IntegrityState::Normal));
assert_eq!(event.new_state, Some(IntegrityState::Stress));
assert_eq!(event.lambda_cut, Some(0.65));
}
// Ids start at 1 and increase; `get_recent` returns newest first.
#[test]
fn test_event_store() {
let store = IntegrityEventStore::new(1, 100);
let id1 = store.record(IntegrityEventContent::new(
1,
IntegrityEventType::GraphRebuilt,
"test",
));
let id2 = store.record(IntegrityEventContent::new(
1,
IntegrityEventType::LambdaSampled,
"test",
));
assert_eq!(id1, 1);
assert_eq!(id2, 2);
assert_eq!(store.event_count(), 2);
let recent = store.get_recent(10);
assert_eq!(recent.len(), 2);
// Newest event comes first.
assert_eq!(recent[0].event_id, 2); }
// Recording more than `max_events` keeps only the newest `max_events`.
#[test]
fn test_event_store_overflow() {
let store = IntegrityEventStore::new(1, 5);
for i in 0..10 {
store.record(IntegrityEventContent::new(
1,
IntegrityEventType::LambdaSampled,
format!("test_{}", i),
));
}
assert_eq!(store.event_count(), 5);
let events = store.get_recent(10);
assert_eq!(events.len(), 5);
// The last recorded event is the first one returned.
assert!(events[0].source.contains("test_9")); }
// `get_by_type` returns only events of the requested type.
#[test]
fn test_get_by_type() {
let store = IntegrityEventStore::new(1, 100);
store.record(IntegrityEventContent::new(
1,
IntegrityEventType::GraphRebuilt,
"test",
));
store.record(IntegrityEventContent::new(
1,
IntegrityEventType::LambdaSampled,
"test",
));
store.record(IntegrityEventContent::new(
1,
IntegrityEventType::LambdaSampled,
"test",
));
let sampled = store.get_by_type(IntegrityEventType::LambdaSampled, 10);
assert_eq!(sampled.len(), 2);
}
// A fresh delta is empty; adding one node makes it non-empty with count 1.
#[test]
fn test_graph_delta() {
let mut delta = GraphDelta::new(1);
assert!(delta.is_empty());
delta.add_nodes.push(DeltaNode {
node_type: "partition".to_string(),
node_id: 1,
node_name: None,
health_score: Some(1.0),
metadata: None,
});
assert!(!delta.is_empty());
assert_eq!(delta.change_count(), 1);
}
// A PartitionCreated event with `partition_id` yields one added node.
#[test]
fn test_event_to_delta() {
let event = IntegrityEventContent::new(1, IntegrityEventType::PartitionCreated, "test")
.with_metadata_field("partition_id", serde_json::json!(42));
let delta = event_to_delta(&event);
assert!(delta.is_some());
let delta = delta.unwrap();
assert_eq!(delta.add_nodes.len(), 1);
assert_eq!(delta.add_nodes[0].node_id, 42);
}
// LambdaSampled has severity 0; StateChanged and DependencyDown have severity 2.
#[test]
fn test_event_store_stats() {
let store = IntegrityEventStore::new(1, 100);
store.record(IntegrityEventContent::new(
1,
IntegrityEventType::LambdaSampled,
"test",
));
store.record(IntegrityEventContent::new(
1,
IntegrityEventType::StateChanged,
"test",
));
store.record(IntegrityEventContent::new(
1,
IntegrityEventType::DependencyDown,
"test",
));
let stats = store.stats();
assert_eq!(stats.total_events, 3);
assert_eq!(stats.info_count, 1);
assert_eq!(stats.critical_count, 2);
}
// Uses the process-wide registry, so the final count of 1 relies on no other
// test recording events for collection 12345.
#[test]
fn test_global_event_registry() {
// The binding is not used afterwards; the call just creates the store.
let store = get_or_create_event_store(12345);
let event_id = record_event(IntegrityEventContent::new(
12345,
IntegrityEventType::GraphRebuilt,
"test",
));
assert!(event_id > 0);
let retrieved = get_event_store(12345);
assert!(retrieved.is_some());
assert_eq!(retrieved.unwrap().event_count(), 1);
}
}