/// Health classification for a pooled connection, as judged by
/// `ConnectionPool::check_health`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionState {
/// Fresh enough (within `idle_timeout`) to be reused.
Healthy,
/// Older than the configured `idle_timeout`; should be recycled.
Stale,
/// Unusable. NOTE(review): nothing in this file currently produces this
/// variant — `check_health` only returns `Healthy` or `Stale`.
Broken,
}
/// A single pooled connection handle.
///
/// NOTE(review): carries no transport/socket state in this view — it is an
/// id plus a birth timestamp used for staleness checks.
#[derive(Debug)]
pub struct Connection {
// Unique, monotonically increasing id assigned from `ConnectionPool::next_id`.
#[allow(dead_code)]
id: u64,
// Creation time; compared against `idle_timeout` in `check_health`.
created_at: std::time::Instant,
}
/// Thread-safe connection pool: an atomic count of checked-out connections
/// plus a mutex-guarded LIFO stack of idle ones.
pub struct ConnectionPool {
// Pool limits (`max_connections`, `min_connections`, `idle_timeout`);
// `ConnectionConfig` is declared elsewhere in this crate.
config: ConnectionConfig,
// Number of connections currently checked out by callers.
active: std::sync::atomic::AtomicUsize,
// Idle connections available for reuse; popped/pushed LIFO.
idle: std::sync::Mutex<Vec<Connection>>,
// Source of unique `Connection::id` values.
next_id: std::sync::atomic::AtomicU64,
}
impl ConnectionPool {
    /// Creates an empty pool; call [`ConnectionPool::warm`] to pre-populate
    /// the idle stack.
    #[must_use]
    pub fn new(config: ConnectionConfig) -> Self {
        Self {
            config,
            active: std::sync::atomic::AtomicUsize::new(0),
            idle: std::sync::Mutex::new(Vec::new()),
            next_id: std::sync::atomic::AtomicU64::new(0),
        }
    }

    /// Upper bound on simultaneously checked-out connections.
    #[must_use]
    pub fn max_connections(&self) -> usize {
        self.config.max_connections
    }

    /// Number of idle connections `warm` tries to keep available.
    #[must_use]
    pub fn min_connections(&self) -> usize {
        self.config.min_connections
    }

    /// Connections currently checked out by callers.
    #[must_use]
    pub fn active_connections(&self) -> usize {
        self.active.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Connections currently sitting in the idle stack.
    #[must_use]
    pub fn idle_connections(&self) -> usize {
        self.idle.lock().expect("mutex poisoned").len()
    }

    /// Checks out a connection, reusing an idle one when possible and
    /// creating a new one otherwise.
    ///
    /// # Errors
    /// Returns `Err("Pool exhausted")` when `max_connections` connections
    /// are already checked out.
    pub fn acquire(&self) -> std::result::Result<Connection, &'static str> {
        // Fast path: reuse an idle connection.
        {
            let mut idle = self.idle.lock().expect("mutex poisoned");
            if let Some(conn) = idle.pop() {
                self.active
                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                return Ok(conn);
            }
        }
        // Atomically claim a slot below the cap. The original did a separate
        // `load` followed by `fetch_add`, so concurrent callers could all
        // observe `current < max_connections` and together exceed the cap
        // (check-then-act race).
        if self
            .active
            .fetch_update(
                std::sync::atomic::Ordering::SeqCst,
                std::sync::atomic::Ordering::SeqCst,
                |n| (n < self.config.max_connections).then_some(n + 1),
            )
            .is_err()
        {
            return Err("Pool exhausted");
        }
        let id = self
            .next_id
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        Ok(Connection {
            id,
            created_at: std::time::Instant::now(),
        })
    }

    /// Alias for [`ConnectionPool::acquire`]; acquire itself never blocks
    /// beyond the brief idle-stack mutex, so the two are equivalent here.
    ///
    /// # Errors
    /// Same as [`ConnectionPool::acquire`].
    pub fn try_acquire(&self) -> std::result::Result<Connection, &'static str> {
        self.acquire()
    }

    /// Returns a checked-out connection to the idle stack.
    ///
    /// The decrement is guarded with `checked_sub` so a double-release
    /// cannot wrap `active` around to `usize::MAX` (the original
    /// unconditional `fetch_sub` would).
    pub fn release(&self, conn: Connection) {
        let _ = self.active.fetch_update(
            std::sync::atomic::Ordering::SeqCst,
            std::sync::atomic::Ordering::SeqCst,
            |n| n.checked_sub(1),
        );
        self.idle.lock().expect("mutex poisoned").push(conn);
    }

    /// Classifies a connection by age against `idle_timeout`.
    ///
    /// Never returns [`ConnectionState::Broken`]: age alone cannot detect
    /// a broken connection.
    #[must_use]
    pub fn check_health(&self, conn: &Connection) -> ConnectionState {
        if conn.created_at.elapsed() > self.config.idle_timeout {
            ConnectionState::Stale
        } else {
            ConnectionState::Healthy
        }
    }

    /// Tops the idle stack up to `min_connections`.
    ///
    /// The deficit is computed under the same lock acquisition used for
    /// pushing. The original counted idle connections, released the lock,
    /// then re-locked to refill — a concurrent `acquire`/`release` in that
    /// window could make it over- or under-fill.
    pub fn warm(&self) {
        let mut idle = self.idle.lock().expect("mutex poisoned");
        let need = self.config.min_connections.saturating_sub(idle.len());
        idle.reserve(need);
        for _ in 0..need {
            let id = self
                .next_id
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            idle.push(Connection {
                id,
                created_at: std::time::Instant::now(),
            });
        }
    }
}
/// Tunable resource limits consumed by `ResourceLimiter`.
#[derive(Debug, Clone)]
#[allow(clippy::struct_field_names)]
pub struct ResourceConfig {
    /// Hard cap on memory a single request may claim, in bytes.
    max_memory_per_request: u64,
    /// Hard cap on memory accounted across all requests, in bytes.
    max_total_memory: u64,
    /// Longest a single request is allowed to compute.
    max_compute_time: Duration,
    /// Queued requests beyond this trigger backpressure.
    max_queue_depth: usize,
}

impl ResourceConfig {
    /// Builds the baseline configuration: 512 MiB per request, 4 GiB total,
    /// a 30-second compute budget, and a queue depth of 100.
    #[must_use]
    pub fn new() -> Self {
        Self {
            max_memory_per_request: 512 << 20, // 512 MiB
            max_total_memory: 4 << 30,         // 4 GiB
            max_compute_time: Duration::from_secs(30),
            max_queue_depth: 100,
        }
    }

    /// Overrides the per-request memory cap (bytes).
    #[must_use]
    pub fn with_max_memory_per_request(self, bytes: u64) -> Self {
        Self {
            max_memory_per_request: bytes,
            ..self
        }
    }

    /// Overrides the total memory cap (bytes).
    #[must_use]
    pub fn with_max_total_memory(self, bytes: u64) -> Self {
        Self {
            max_total_memory: bytes,
            ..self
        }
    }

    /// Overrides the per-request compute-time budget.
    #[must_use]
    pub fn with_max_compute_time(self, time: Duration) -> Self {
        Self {
            max_compute_time: time,
            ..self
        }
    }

    /// Overrides the maximum queue depth.
    #[must_use]
    pub fn with_max_queue_depth(self, depth: usize) -> Self {
        Self {
            max_queue_depth: depth,
            ..self
        }
    }
}

impl Default for ResourceConfig {
    /// Equivalent to [`ResourceConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Outcome of a resource-limit check.
#[derive(Debug, Clone)]
pub enum LimitResult {
/// The request fits within every configured limit.
Allowed,
/// The request cannot be admitted; `reason` is a human-readable explanation.
Denied {
reason: String,
},
/// The queue is full right now; the caller should retry later.
Backpressure,
}
/// Thread-safe admission controller tracking memory usage and queue depth
/// against the limits in a `ResourceConfig`.
pub struct ResourceLimiter {
// Limits this limiter enforces.
config: ResourceConfig,
// Total bytes currently accounted via `allocate`/`deallocate`.
current_memory: std::sync::atomic::AtomicU64,
// Number of requests currently holding a queue slot.
queue_depth: std::sync::atomic::AtomicUsize,
}
impl ResourceLimiter {
    /// Creates a limiter with zero memory accounted and an empty queue.
    #[must_use]
    pub fn new(config: ResourceConfig) -> Self {
        Self {
            config,
            current_memory: std::sync::atomic::AtomicU64::new(0),
            queue_depth: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    /// Checks whether a request of `bytes` would fit under both the
    /// per-request and total-memory limits.
    ///
    /// Advisory only: the answer can be stale by the time the caller acts
    /// on it — use [`ResourceLimiter::allocate`] to actually reserve.
    #[must_use]
    pub fn check_memory(&self, bytes: u64) -> LimitResult {
        if bytes > self.config.max_memory_per_request {
            return LimitResult::Denied {
                reason: format!(
                    "Request {} bytes exceeds per-request limit {} bytes",
                    bytes, self.config.max_memory_per_request
                ),
            };
        }
        let current = self
            .current_memory
            .load(std::sync::atomic::Ordering::SeqCst);
        // checked_add: the original `current + bytes` could overflow u64 and
        // wrongly pass the comparison (or panic in debug builds).
        match current.checked_add(bytes) {
            Some(total) if total <= self.config.max_total_memory => LimitResult::Allowed,
            _ => LimitResult::Denied {
                reason: format!(
                    "Total memory {} + {} would exceed limit {}",
                    current, bytes, self.config.max_total_memory
                ),
            },
        }
    }

    /// Reserves `bytes` of memory, or returns an error without reserving.
    ///
    /// The check and the add happen in one atomic `fetch_update`. The
    /// original checked first and added second, so concurrent callers could
    /// both pass the check and together exceed `max_total_memory`
    /// (check-then-act race).
    ///
    /// # Errors
    /// Returns `Err` if the request exceeds the per-request limit or would
    /// push the accounted total over `max_total_memory`.
    pub fn allocate(&self, bytes: u64) -> std::result::Result<(), &'static str> {
        if bytes > self.config.max_memory_per_request {
            return Err("Memory limit exceeded");
        }
        self.current_memory
            .fetch_update(
                std::sync::atomic::Ordering::SeqCst,
                std::sync::atomic::Ordering::SeqCst,
                |current| {
                    current
                        .checked_add(bytes)
                        .filter(|&total| total <= self.config.max_total_memory)
                },
            )
            .map(|_| ())
            .map_err(|_| "Memory limit exceeded")
    }

    /// Releases `bytes` previously reserved with `allocate`.
    ///
    /// Saturates at zero so a mismatched release cannot wrap the counter
    /// (the original unconditional `fetch_sub` wrapped to a huge value,
    /// poisoning every later check).
    pub fn deallocate(&self, bytes: u64) {
        let _ = self.current_memory.fetch_update(
            std::sync::atomic::Ordering::SeqCst,
            std::sync::atomic::Ordering::SeqCst,
            |current| Some(current.saturating_sub(bytes)),
        );
    }

    /// Total memory currently accounted, in bytes.
    #[must_use]
    pub fn current_memory(&self) -> u64 {
        self.current_memory
            .load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Reserves a queue slot, rolling back the increment when the queue is
    /// already at `max_queue_depth`.
    pub fn enqueue(&self) -> LimitResult {
        let previous = self
            .queue_depth
            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        if previous >= self.config.max_queue_depth {
            // Over the limit: undo our own increment before reporting.
            self.queue_depth
                .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
            LimitResult::Backpressure
        } else {
            LimitResult::Allowed
        }
    }

    /// Like [`ResourceLimiter::enqueue`], but pre-screens with a plain load
    /// to skip the increment/decrement churn when the queue is obviously
    /// full. The pre-check is racy, but `enqueue` re-validates atomically,
    /// so correctness does not depend on it.
    #[must_use]
    pub fn try_enqueue(&self) -> LimitResult {
        let current = self.queue_depth.load(std::sync::atomic::Ordering::SeqCst);
        if current >= self.config.max_queue_depth {
            LimitResult::Backpressure
        } else {
            self.enqueue()
        }
    }

    /// Releases a queue slot.
    ///
    /// Uses an atomic check-and-decrement: the original's separate load and
    /// `fetch_sub` let two concurrent callers both see `current > 0` and
    /// underflow the counter.
    pub fn dequeue(&self) {
        let _ = self.queue_depth.fetch_update(
            std::sync::atomic::Ordering::SeqCst,
            std::sync::atomic::Ordering::SeqCst,
            |n| n.checked_sub(1),
        );
    }

    /// Marks the start of a compute-time measurement window; the caller is
    /// expected to compare `elapsed()` against `max_compute_time`.
    #[must_use]
    pub fn start_compute(&self) -> std::time::Instant {
        std::time::Instant::now()
    }
}
/// Point-in-time resource usage readings.
#[derive(Debug, Clone)]
pub struct ResourceMetrics {
/// Memory currently in use, in bytes.
pub memory_bytes: u64,
/// GPU utilization; units not shown here — presumably a 0.0–1.0 fraction
/// or a percentage. TODO(review): confirm with the producer.
pub gpu_utilization: f64,
/// Number of requests currently queued.
pub queue_depth: usize,
/// Latency of the most recent request, in milliseconds.
pub last_latency_ms: u64,
}
/// Aggregated latency figures over some window, all in milliseconds.
#[derive(Debug, Clone)]
pub struct LatencyStats {
/// Smallest observed latency.
pub min_ms: u64,
/// Largest observed latency.
pub max_ms: u64,
/// Mean observed latency.
pub avg_ms: u64,
}
/// A timestamped copy of resource readings, suitable for history buffers.
#[derive(Debug, Clone)]
pub struct ResourceSnapshot {
/// Capture time; epoch and unit are not shown in this file — presumably
/// Unix seconds or milliseconds. TODO(review): confirm against producer.
pub timestamp: u64,
/// Memory in use at capture time, in bytes.
pub memory_bytes: u64,
/// GPU utilization at capture time; units unverified (see `ResourceMetrics`).
pub gpu_utilization: f64,
/// Queue depth at capture time.
pub queue_depth: usize,
}
pub struct ResourceMonitor {
memory_bytes: std::sync::atomic::AtomicU64,
gpu_utilization: std::sync::Mutex<f64>,
queue_depth: std::sync::atomic::AtomicUsize,
latencies: std::sync::Mutex<Vec<u64>>,
last_latency_ms: std::sync::atomic::AtomicU64,
}