use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
/// Integrity level of the system, listed from least to most degraded.
///
/// The explicit discriminants are the values exchanged by `as_u32` and
/// `from_u32`. The derived `Ord` follows declaration order, so
/// `Normal < Stress < Critical < Emergency`. Because of
/// `rename_all = "snake_case"`, serde represents the variants as `"normal"`,
/// `"stress"`, `"critical"` and `"emergency"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum IntegrityState {
/// Least restrictive level; `StatePermissions::normal` is its built-in preset.
Normal = 0,
/// First degraded level; `StatePermissions::stress` is its built-in preset.
Stress = 1,
/// Second degraded level; `StatePermissions::critical` is its built-in preset.
Critical = 2,
/// Most restrictive level; also what `from_u32` returns for any unknown value.
Emergency = 3,
}
/// Renders the state as its lowercase name: `normal`, `stress`, `critical`
/// or `emergency`.
impl std::fmt::Display for IntegrityState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first so the formatter is written to in one place.
        let label = match *self {
            IntegrityState::Normal => "normal",
            IntegrityState::Stress => "stress",
            IntegrityState::Critical => "critical",
            IntegrityState::Emergency => "emergency",
        };
        f.write_str(label)
    }
}
impl IntegrityState {
    /// Parses a state name, ignoring letter case.
    ///
    /// Returns `None` for anything other than `normal`, `stress`, `critical`
    /// or `emergency`. The input is not trimmed, so surrounding whitespace
    /// makes the lookup fail.
    pub fn from_str(s: &str) -> Option<Self> {
        const NAMED: [(&str, IntegrityState); 4] = [
            ("normal", IntegrityState::Normal),
            ("stress", IntegrityState::Stress),
            ("critical", IntegrityState::Critical),
            ("emergency", IntegrityState::Emergency),
        ];
        let lowered = s.to_lowercase();
        NAMED
            .iter()
            .find(|(name, _)| *name == lowered)
            .map(|&(_, state)| state)
    }

    /// Classifies a `lambda_cut` reading against three descending floors.
    ///
    /// The first floor the value reaches (`>=`) decides the state:
    /// `threshold_high` gives `Normal`, `threshold_low` gives `Stress`,
    /// `threshold_critical` gives `Critical`. A value below all three,
    /// including NaN (which fails every comparison), gives `Emergency`.
    pub fn from_lambda(
        lambda_cut: f64,
        threshold_high: f64,
        threshold_low: f64,
        threshold_critical: f64,
    ) -> Self {
        if lambda_cut >= threshold_high {
            return IntegrityState::Normal;
        }
        if lambda_cut >= threshold_low {
            return IntegrityState::Stress;
        }
        if lambda_cut >= threshold_critical {
            return IntegrityState::Critical;
        }
        IntegrityState::Emergency
    }

    /// Returns the numeric discriminant (`Normal` = 0 through `Emergency` = 3).
    pub fn as_u32(&self) -> u32 {
        let state: IntegrityState = *self;
        state as u32
    }

    /// Rebuilds a state from its discriminant.
    ///
    /// Any value that is not a known discriminant maps to `Emergency`, the
    /// most restrictive state.
    pub fn from_u32(v: u32) -> Self {
        const ALL: [IntegrityState; 4] = [
            IntegrityState::Normal,
            IntegrityState::Stress,
            IntegrityState::Critical,
            IntegrityState::Emergency,
        ];
        ALL.iter()
            .copied()
            .find(|state| state.as_u32() == v)
            .unwrap_or(IntegrityState::Emergency)
    }
}
/// `lambda_cut` thresholds used by `compute_next_state`.
///
/// Each boundary between two adjacent states has a `*_rising` value that must
/// be reached (`>=`) to move to the healthier state and a `*_falling` value
/// that must be undercut (`<`) to move to the more degraded one.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HysteresisThresholds {
/// `Stress` -> `Normal` when `lambda_cut >= normal_rising`.
pub normal_rising: f64,
/// `Normal` -> `Stress` when `lambda_cut < normal_falling`.
pub normal_falling: f64,
/// `Critical` -> `Stress` when `lambda_cut >= stress_rising`.
pub stress_rising: f64,
/// `Stress` -> `Critical` when `lambda_cut < stress_falling`.
pub stress_falling: f64,
/// `Emergency` -> `Critical` when `lambda_cut >= critical_rising`.
pub critical_rising: f64,
/// `Critical` -> `Emergency` when `lambda_cut < critical_falling`.
pub critical_falling: f64,
}
impl Default for HysteresisThresholds {
    /// Built-in thresholds: each boundary has a 0.1-wide gap between its
    /// rising and falling value (0.8/0.7, 0.5/0.4, 0.2/0.1).
    fn default() -> Self {
        // One (rising, falling) pair per state boundary.
        let (normal_rising, normal_falling) = (0.8, 0.7);
        let (stress_rising, stress_falling) = (0.5, 0.4);
        let (critical_rising, critical_falling) = (0.2, 0.1);
        Self {
            normal_rising,
            normal_falling,
            stress_rising,
            stress_falling,
            critical_rising,
            critical_falling,
        }
    }
}
impl HysteresisThresholds {
    /// Computes the state that follows `current` for a new `lambda_cut` reading.
    ///
    /// A single call moves at most one level. Recovery needs
    /// `lambda_cut >= *_rising` for the boundary above; degradation needs
    /// `lambda_cut < *_falling` for the boundary below. For states with two
    /// neighbours the recovery test is evaluated first. A NaN reading fails
    /// every comparison and therefore leaves the state unchanged.
    pub fn compute_next_state(&self, current: IntegrityState, lambda_cut: f64) -> IntegrityState {
        match current {
            IntegrityState::Normal if lambda_cut < self.normal_falling => IntegrityState::Stress,
            IntegrityState::Stress if lambda_cut >= self.normal_rising => IntegrityState::Normal,
            IntegrityState::Stress if lambda_cut < self.stress_falling => IntegrityState::Critical,
            IntegrityState::Critical if lambda_cut >= self.stress_rising => IntegrityState::Stress,
            IntegrityState::Critical if lambda_cut < self.critical_falling => {
                IntegrityState::Emergency
            }
            IntegrityState::Emergency if lambda_cut >= self.critical_rising => {
                IntegrityState::Critical
            }
            // No boundary crossed: stay where we are.
            unchanged => unchanged,
        }
    }
}
/// Set of operations an integrity state permits, plus throttling limits.
///
/// `IntegrityGate::check_operation` maps operation names onto these fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatePermissions {
/// Gates the `search`, `read` and `query` operations.
pub allow_reads: bool,
/// Gates the `insert` operation.
pub allow_single_insert: bool,
/// Gates the `bulk_insert` operation.
pub allow_bulk_insert: bool,
/// Gates the `delete` operation.
pub allow_delete: bool,
/// Gates the `update` operation.
pub allow_update: bool,
/// Gates the `index_build` and `index_rewire` operations.
pub allow_index_rewire: bool,
/// Gates the `compression` and `compact` operations.
pub allow_compression: bool,
/// Gates the `replication` and `replicate` operations.
pub allow_replication: bool,
/// Gates the `backup` operation.
pub allow_backup: bool,
/// Throttle percentage reported for `insert` and `bulk_insert`.
pub throttle_inserts_pct: u8,
/// Throttle percentage reported for `search`, `read` and `query`.
pub throttle_searches_pct: u8,
/// Cap on simultaneously running searches; `None` means no cap.
pub max_concurrent_searches: Option<u32>,
/// When `true`, the `gnn_train` and `gnn_training` operations are blocked.
pub pause_gnn_training: bool,
/// When `true`, the `tier_manage` and `tier_management` operations are blocked.
pub pause_tier_management: bool,
}
impl Default for StatePermissions {
fn default() -> Self {
Self::normal()
}
}
impl StatePermissions {
    /// Preset for `Normal`: everything allowed, no throttling, no search cap,
    /// nothing paused.
    pub fn normal() -> Self {
        Self {
            // Data-path operations.
            allow_reads: true,
            allow_single_insert: true,
            allow_bulk_insert: true,
            allow_delete: true,
            allow_update: true,
            // Maintenance operations.
            allow_index_rewire: true,
            allow_compression: true,
            allow_replication: true,
            allow_backup: true,
            // Load shedding.
            throttle_inserts_pct: 0,
            throttle_searches_pct: 0,
            max_concurrent_searches: None,
            // Background work.
            pause_gnn_training: false,
            pause_tier_management: false,
        }
    }

    /// Preset for `Stress`: the `Normal` preset minus bulk inserts, index
    /// rewiring and compression, with inserts throttled by 50 %, at most 100
    /// concurrent searches and GNN training paused.
    pub fn stress() -> Self {
        Self {
            allow_bulk_insert: false,
            allow_index_rewire: false,
            allow_compression: false,
            throttle_inserts_pct: 50,
            max_concurrent_searches: Some(100),
            pause_gnn_training: true,
            ..Self::normal()
        }
    }

    /// Preset for `Critical`: the `Stress` preset minus deletes and updates,
    /// with inserts throttled by 90 %, searches by 25 %, at most 50 concurrent
    /// searches and tier management paused as well.
    pub fn critical() -> Self {
        Self {
            allow_delete: false,
            allow_update: false,
            throttle_inserts_pct: 90,
            throttle_searches_pct: 25,
            max_concurrent_searches: Some(50),
            pause_tier_management: true,
            ..Self::stress()
        }
    }

    /// Preset for `Emergency`: the `Critical` preset minus single inserts and
    /// replication, leaving only reads and backups allowed, with searches
    /// throttled by 50 % and at most 20 concurrent searches.
    pub fn emergency() -> Self {
        Self {
            allow_single_insert: false,
            allow_replication: false,
            throttle_inserts_pct: 100,
            throttle_searches_pct: 50,
            max_concurrent_searches: Some(20),
            ..Self::critical()
        }
    }

    /// Returns the built-in preset that belongs to `state`.
    pub fn for_state(state: IntegrityState) -> Self {
        let preset: fn() -> Self = match state {
            IntegrityState::Normal => Self::normal,
            IntegrityState::Stress => Self::stress,
            IntegrityState::Critical => Self::critical,
            IntegrityState::Emergency => Self::emergency,
        };
        preset()
    }
}
/// Outcome of asking the gate whether an operation may run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GateResult {
/// `true` when the operation may run (possibly throttled).
pub allowed: bool,
/// Throttle percentage attached to the decision; 0 when unthrottled and
/// 100 when the operation is blocked.
pub throttle_pct: u8,
/// Integrity state the decision was made in.
pub state: IntegrityState,
/// Explanation for a blocked operation; `None` when the operation is allowed.
pub reason: Option<String>,
/// Suggested wait in milliseconds before retrying; only set when blocked.
pub retry_delay_ms: Option<u64>,
}
impl GateResult {
    /// Result for an operation that may run without throttling.
    pub fn allow(state: IntegrityState) -> Self {
        // An unthrottled pass is a throttled pass with 0 %.
        Self::throttle(state, 0)
    }

    /// Result for an operation that may run, subject to `throttle_pct`.
    pub fn throttle(state: IntegrityState, throttle_pct: u8) -> Self {
        Self {
            state,
            throttle_pct,
            allowed: true,
            reason: None,
            retry_delay_ms: None,
        }
    }

    /// Result for a rejected operation.
    ///
    /// Carries `reason`, reports a 100 % throttle and suggests retrying after
    /// 5000 ms.
    pub fn block(state: IntegrityState, reason: impl Into<String>) -> Self {
        const RETRY_DELAY_MS: u64 = 5000;
        let reason = Some(reason.into());
        Self {
            state,
            reason,
            allowed: false,
            throttle_pct: 100,
            retry_delay_ms: Some(RETRY_DELAY_MS),
        }
    }

    /// `true` when a non-zero throttle percentage is attached to the result.
    pub fn should_throttle(&self) -> bool {
        self.throttle_pct != 0
    }
}
/// Per-collection gate that tracks the current integrity state and decides
/// which operations may run.
///
/// The mutable parts are atomics, so the gate is updated through `&self`.
pub struct IntegrityGate {
/// Identifier of the collection this gate belongs to.
collection_id: i32,
/// Current `IntegrityState`, stored as its `as_u32` discriminant.
state: AtomicU32,
/// Last reported `lambda_cut`, stored as `round(lambda_cut * 1000)`.
lambda_cut_scaled: AtomicU32,
/// Thresholds used to derive state transitions from `lambda_cut` readings.
thresholds: HysteresisThresholds,
/// Optional per-state overrides; states without an entry use the built-in
/// `StatePermissions::for_state` preset.
custom_permissions: Option<HashMap<IntegrityState, StatePermissions>>,
/// Number of searches currently registered via `begin_search`.
concurrent_searches: AtomicU32,
/// Unix time in milliseconds of construction or of the last state change.
last_state_change_ms: AtomicU64,
}
impl IntegrityGate {
    /// Creates a gate for `collection_id` in the `Normal` state with default
    /// thresholds, no custom permissions and a recorded `lambda_cut` of 1.0.
    pub fn new(collection_id: i32) -> Self {
        Self {
            collection_id,
            state: AtomicU32::new(IntegrityState::Normal.as_u32()),
            // 1000 encodes lambda_cut = 1.0 in the x1000 fixed-point form.
            lambda_cut_scaled: AtomicU32::new(1000),
            thresholds: HysteresisThresholds::default(),
            custom_permissions: None,
            concurrent_searches: AtomicU32::new(0),
            last_state_change_ms: AtomicU64::new(Self::unix_millis_now()),
        }
    }

    /// Current wall-clock time as milliseconds since the Unix epoch.
    ///
    /// Falls back to 0 when the system clock reports a time before the epoch.
    fn unix_millis_now() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64
    }

    /// Builder-style setter replacing the hysteresis thresholds.
    pub fn with_thresholds(mut self, thresholds: HysteresisThresholds) -> Self {
        self.thresholds = thresholds;
        self
    }

    /// Builder-style setter installing per-state permission overrides.
    ///
    /// States missing from `permissions` keep their built-in preset.
    pub fn with_permissions(
        mut self,
        permissions: HashMap<IntegrityState, StatePermissions>,
    ) -> Self {
        self.custom_permissions = Some(permissions);
        self
    }

    /// Returns the current integrity state.
    pub fn current_state(&self) -> IntegrityState {
        IntegrityState::from_u32(self.state.load(Ordering::Relaxed))
    }

    /// Returns the last recorded `lambda_cut`, at a resolution of 0.001.
    pub fn current_lambda_cut(&self) -> f64 {
        self.lambda_cut_scaled.load(Ordering::Relaxed) as f64 / 1000.0
    }

    /// Records a new `lambda_cut` reading and applies any state transition.
    ///
    /// The reading is always stored. Returns `Some(new_state)` when the
    /// state changed and `None` when it stayed the same.
    pub fn update_lambda(&self, lambda_cut: f64) -> Option<IntegrityState> {
        let current = self.current_state();
        let new_state = self.thresholds.compute_next_state(current, lambda_cut);

        // Float-to-int `as` saturates: negative and NaN readings are stored as 0.
        let scaled = (lambda_cut * 1000.0).round() as u32;
        self.lambda_cut_scaled.store(scaled, Ordering::Relaxed);

        if new_state == current {
            return None;
        }
        self.state.store(new_state.as_u32(), Ordering::Release);
        self.last_state_change_ms
            .store(Self::unix_millis_now(), Ordering::Relaxed);
        Some(new_state)
    }

    /// Sets the state unconditionally, bypassing the thresholds, and
    /// refreshes the state-change timestamp.
    pub fn force_state(&self, state: IntegrityState) {
        self.state.store(state.as_u32(), Ordering::Release);
        self.last_state_change_ms
            .store(Self::unix_millis_now(), Ordering::Relaxed);
    }

    /// Returns the permissions in force for the current state: the custom
    /// override when one exists for that state, otherwise the built-in preset.
    pub fn current_permissions(&self) -> StatePermissions {
        let state = self.current_state();
        self.custom_permissions
            .as_ref()
            .and_then(|p| p.get(&state).cloned())
            .unwrap_or_else(|| StatePermissions::for_state(state))
    }

    /// Decides whether `operation` may run in the current state.
    ///
    /// Operation names are matched case-insensitively. Search-type operations
    /// are additionally rejected while the concurrent-search cap is reached.
    /// Names this gate does not know are allowed without throttling.
    pub fn check_operation(&self, operation: &str) -> GateResult {
        let state = self.current_state();
        let permissions = self.current_permissions();
        let (allowed, throttle_pct) = match operation.to_lowercase().as_str() {
            "search" | "read" | "query" => {
                let within_limit = permissions.max_concurrent_searches.map_or(true, |max| {
                    self.concurrent_searches.load(Ordering::Relaxed) < max
                });
                (
                    permissions.allow_reads && within_limit,
                    permissions.throttle_searches_pct,
                )
            }
            "insert" => (
                permissions.allow_single_insert,
                permissions.throttle_inserts_pct,
            ),
            "bulk_insert" => (
                permissions.allow_bulk_insert,
                permissions.throttle_inserts_pct,
            ),
            "delete" => (permissions.allow_delete, 0),
            "update" => (permissions.allow_update, 0),
            "index_build" | "index_rewire" => (permissions.allow_index_rewire, 0),
            "compression" | "compact" => (permissions.allow_compression, 0),
            "replication" | "replicate" => (permissions.allow_replication, 0),
            "backup" => (permissions.allow_backup, 0),
            "gnn_train" | "gnn_training" => (!permissions.pause_gnn_training, 0),
            "tier_manage" | "tier_management" => (!permissions.pause_tier_management, 0),
            // Unknown operation names are let through unthrottled.
            _ => (true, 0),
        };

        if !allowed {
            GateResult::block(
                state,
                format!(
                    "Operation '{}' blocked: system in {} state",
                    operation, state
                ),
            )
        } else if throttle_pct > 0 {
            GateResult::throttle(state, throttle_pct)
        } else {
            GateResult::allow(state)
        }
    }

    /// Registers the start of a search.
    ///
    /// Returns `false`, leaving the counter unchanged, when the current
    /// state's concurrent-search cap is already reached. Every `true` result
    /// must be paired with a call to `end_search`.
    pub fn begin_search(&self) -> bool {
        let permissions = self.current_permissions();
        if let Some(max) = permissions.max_concurrent_searches {
            // Reserve a slot first, then roll back if that exceeded the cap.
            let current = self.concurrent_searches.fetch_add(1, Ordering::AcqRel);
            if current >= max {
                self.concurrent_searches.fetch_sub(1, Ordering::AcqRel);
                return false;
            }
        } else {
            self.concurrent_searches.fetch_add(1, Ordering::AcqRel);
        }
        true
    }

    /// Registers the end of a search started with `begin_search`.
    ///
    /// The decrement saturates at zero in one atomic step, so an unmatched
    /// call can neither expose a wrapped-around count to other threads nor
    /// wipe out increments made concurrently.
    pub fn end_search(&self) {
        // `Err` only means the counter was already 0; nothing to undo then.
        let _ = self.concurrent_searches.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |active| active.checked_sub(1),
        );
    }

    /// Returns a JSON snapshot of the gate: collection id, state name,
    /// `lambda_cut`, running search count and a subset of the permissions.
    pub fn status(&self) -> serde_json::Value {
        let state = self.current_state();
        let permissions = self.current_permissions();
        serde_json::json!({
            "collection_id": self.collection_id,
            "state": state.to_string(),
            "lambda_cut": self.current_lambda_cut(),
            "concurrent_searches": self.concurrent_searches.load(Ordering::Relaxed),
            "permissions": {
                "allow_reads": permissions.allow_reads,
                "allow_single_insert": permissions.allow_single_insert,
                "allow_bulk_insert": permissions.allow_bulk_insert,
                "allow_delete": permissions.allow_delete,
                "allow_update": permissions.allow_update,
                "allow_index_rewire": permissions.allow_index_rewire,
                "throttle_inserts_pct": permissions.throttle_inserts_pct,
                "throttle_searches_pct": permissions.throttle_searches_pct,
            }
        })
    }
}
/// Decides probabilistically whether a throttled operation may proceed.
///
/// Returns `true` when the caller should go ahead and `false` when the
/// operation should be shed. `0` always passes, `100` or more never passes,
/// and a value in between rejects roughly that percentage of calls.
pub fn apply_throttle(throttle_pct: u8) -> bool {
    if throttle_pct == 0 {
        return true;
    }
    if throttle_pct >= 100 {
        return false;
    }
    // Sub-second nanoseconds are a poor random source: on clocks that tick in
    // 100 ns or 1 us steps the last two decimal digits are always zero, which
    // would reject every call. Each `RandomState::new()` is keyed differently,
    // so finishing a fresh hasher yields a cheap, std-only pseudo-random u64.
    let keys = std::collections::hash_map::RandomState::new();
    let hasher = std::hash::BuildHasher::build_hasher(&keys);
    let roll = (std::hash::Hasher::finish(&hasher) % 100) as u8;
    roll >= throttle_pct
}
/// Process-wide registry of gates keyed by collection id, built lazily on
/// first access.
static GATE_REGISTRY: once_cell::sync::Lazy<DashMap<i32, IntegrityGate>> =
once_cell::sync::Lazy::new(DashMap::new);
/// Returns the gate registered for `collection_id`, creating it on first use.
///
/// The returned guard keeps a shared lock on part of the registry, so it
/// should be dropped promptly.
pub fn get_or_create_gate(
    collection_id: i32,
) -> dashmap::mapref::one::Ref<'static, i32, IntegrityGate> {
    // Fast path: the gate already exists, so a shared lock is enough.
    if let Some(gate) = GATE_REGISTRY.get(&collection_id) {
        return gate;
    }
    // Slow path: insert-if-absent happens under one exclusive lock, so a
    // concurrent caller can never replace a gate (and its state) that another
    // thread has just created. The guard is then downgraded to a shared one.
    GATE_REGISTRY
        .entry(collection_id)
        .or_insert_with(|| IntegrityGate::new(collection_id))
        .downgrade()
}
pub fn get_gate(
collection_id: i32,
) -> Option<dashmap::mapref::one::Ref<'static, i32, IntegrityGate>> {
GATE_REGISTRY.get(&collection_id)
}
/// Checks `operation` against the gate of `collection_id`, creating the gate
/// if it does not exist yet.
pub fn check_integrity_gate(collection_id: i32, operation: &str) -> GateResult {
    get_or_create_gate(collection_id).check_operation(operation)
}
/// Feeds a `lambda_cut` reading to the gate of `collection_id`, creating the
/// gate if it does not exist yet.
///
/// Returns the new state when the reading caused a transition, else `None`.
pub fn update_lambda_cut(collection_id: i32, lambda_cut: f64) -> Option<IntegrityState> {
    get_or_create_gate(collection_id).update_lambda(lambda_cut)
}
// Unit tests for state parsing, hysteresis transitions, gate decisions,
// search accounting, throttling and the global registry.
#[cfg(test)]
mod tests {
use super::*;
// Display renders each state as its lowercase name.
#[test]
fn test_integrity_state_display() {
assert_eq!(IntegrityState::Normal.to_string(), "normal");
assert_eq!(IntegrityState::Stress.to_string(), "stress");
assert_eq!(IntegrityState::Critical.to_string(), "critical");
assert_eq!(IntegrityState::Emergency.to_string(), "emergency");
}
// Parsing is case-insensitive and rejects unknown names.
#[test]
fn test_state_parsing() {
assert_eq!(
IntegrityState::from_str("normal"),
Some(IntegrityState::Normal)
);
assert_eq!(
IntegrityState::from_str("STRESS"),
Some(IntegrityState::Stress)
);
assert_eq!(IntegrityState::from_str("invalid"), None);
}
// Default thresholds: 0.6 is below normal_falling (0.7), 0.85 reaches
// normal_rising (0.8), and 0.6 from Stress sits between the two boundaries.
#[test]
fn test_hysteresis_transitions() {
let thresholds = HysteresisThresholds::default();
let state = thresholds.compute_next_state(IntegrityState::Normal, 0.6);
assert_eq!(state, IntegrityState::Stress);
let state = thresholds.compute_next_state(IntegrityState::Stress, 0.85);
assert_eq!(state, IntegrityState::Normal);
let state = thresholds.compute_next_state(IntegrityState::Stress, 0.6);
assert_eq!(state, IntegrityState::Stress);
}
// Normal allows inserts unthrottled; Stress blocks bulk inserts and
// throttles single inserts by 50 %.
#[test]
fn test_gate_operations() {
let gate = IntegrityGate::new(1);
let result = gate.check_operation("insert");
assert!(result.allowed);
assert_eq!(result.throttle_pct, 0);
gate.force_state(IntegrityState::Stress);
let result = gate.check_operation("bulk_insert");
assert!(!result.allowed);
let result = gate.check_operation("insert");
assert!(result.allowed);
assert_eq!(result.throttle_pct, 50);
}
// Emergency keeps searches and backups available but blocks writes.
#[test]
fn test_emergency_permissions() {
let gate = IntegrityGate::new(1);
gate.force_state(IntegrityState::Emergency);
let result = gate.check_operation("search");
assert!(result.allowed);
let result = gate.check_operation("insert");
assert!(!result.allowed);
let result = gate.check_operation("delete");
assert!(!result.allowed);
let result = gate.check_operation("backup");
assert!(result.allowed);
}
// A reading of 0.5 is below normal_falling, so the gate moves one level
// down to Stress and remembers the reading.
#[test]
fn test_lambda_update() {
let gate = IntegrityGate::new(1);
assert_eq!(gate.current_state(), IntegrityState::Normal);
let new_state = gate.update_lambda(0.5);
assert_eq!(new_state, Some(IntegrityState::Stress));
assert_eq!(gate.current_state(), IntegrityState::Stress);
assert!((gate.current_lambda_cut() - 0.5).abs() < 0.01);
}
// Critical caps concurrent searches at 50; ending one frees a slot.
#[test]
fn test_concurrent_search_limit() {
let gate = IntegrityGate::new(1);
gate.force_state(IntegrityState::Critical);
for _ in 0..50 {
assert!(gate.begin_search());
}
assert!(!gate.begin_search());
gate.end_search();
assert!(gate.begin_search());
}
// Only the deterministic extremes (0 % and 100 %) are checked here.
#[test]
fn test_throttle_function() {
for _ in 0..100 {
assert!(apply_throttle(0));
}
for _ in 0..100 {
assert!(!apply_throttle(100));
}
}
// Uses the process-wide registry; assumes ids 9001 and 9999 are not
// touched by any other test.
#[test]
fn test_gate_registry() {
let gate1 = get_or_create_gate(9001);
assert_eq!(gate1.collection_id, 9001);
let gate2 = get_gate(9001);
assert!(gate2.is_some());
let gate3 = get_gate(9999);
assert!(gate3.is_none());
}
// The status snapshot reports the id, the state name and the permissions.
#[test]
fn test_gate_status() {
let gate = IntegrityGate::new(42);
let status = gate.status();
assert_eq!(status["collection_id"], 42);
assert_eq!(status["state"], "normal");
assert!(status["permissions"]["allow_reads"].as_bool().unwrap());
}
}