1use crate::{StorageError, StorageResult};
7use std::{
8 sync::{Arc, atomic::{AtomicU64, AtomicUsize, Ordering}, RwLock},
9 collections::HashMap,
10 time::{Duration, Instant, SystemTime},
11};
12use tokio::time::timeout;
13
/// Tunable limits enforced by the security layer (rate limiting, batch/event
/// sizes, concurrency, memory, and GPU timeouts).
#[derive(Debug, Clone)]
pub struct SecurityConfig {
    /// Maximum size of a single event, in bytes.
    pub max_event_size: usize,
    /// Maximum number of events accepted in one batch.
    pub max_batch_size: usize,
    /// Maximum number of batches allowed in flight at once.
    pub max_concurrent_batches: usize,
    /// Timeout applied to each GPU task, in milliseconds.
    pub gpu_timeout_ms: u64,
    /// Maximum number of events accepted per wall-clock second.
    pub rate_limit_per_second: u64,
    /// Maximum tracked memory usage, in bytes.
    pub max_memory_usage: usize,
    /// CPU usage ceiling — presumably a percentage (default 80.0).
    /// NOTE(review): not enforced by `ResourceMonitor::check_resources` — confirm intent.
    pub max_cpu_usage: f32,
    /// GPU usage ceiling — presumably a percentage (default 90.0).
    /// NOTE(review): not enforced by `ResourceMonitor::check_resources` — confirm intent.
    pub max_gpu_usage: f32,
}
34
35impl Default for SecurityConfig {
36 fn default() -> Self {
37 Self {
38 max_event_size: 1024 * 1024, max_batch_size: 10_000, max_concurrent_batches: 100, gpu_timeout_ms: 5_000, rate_limit_per_second: 100_000, max_memory_usage: 2 * 1024 * 1024 * 1024, max_cpu_usage: 80.0, max_gpu_usage: 90.0, }
47 }
48}
49
/// Central enforcement point for ingest security policy: rate limiting,
/// resource ceilings, GPU task timeouts, and violation counters.
pub struct SecurityManager {
    /// Limits applied by the validation methods.
    config: SecurityConfig,
    /// Per-second event-count limiter.
    rate_limiter: Arc<RateLimiter>,
    /// Tracks in-flight batches and resource gauges.
    resource_monitor: Arc<ResourceMonitor>,
    /// Tracks active GPU tasks so timeouts can be observed.
    gpu_timeout_manager: Arc<GPUTimeoutManager>,
    /// Monotonic violation/rejection counters.
    security_metrics: SecurityMetrics,
}
58
/// Lock-free fixed-window rate limiter: counts events within the current
/// wall-clock second and rejects once the per-second budget is exceeded.
pub struct RateLimiter {
    /// Events counted in the window identified by `current_second`.
    current_second_count: AtomicU64,
    /// Unix timestamp (whole seconds) of the window currently being counted.
    current_second: AtomicU64,
    /// Maximum events allowed per second.
    limit_per_second: u64,
    /// Total number of rejections issued by `check_rate`.
    violations: AtomicU64,
}
70
/// Gauges for in-flight work and resource usage.
pub struct ResourceMonitor {
    /// Number of batches currently being processed (start_batch/end_batch).
    active_batches: AtomicUsize,
    /// Tracked memory usage, in bytes.
    current_memory_usage: AtomicUsize,
    /// CPU usage gauge — NOTE(review): encoding (percent? basis points?) is
    /// not established in this file, and it is never read by `check_resources`.
    current_cpu_usage: AtomicU64,
    /// GPU usage gauge — same caveat as `current_cpu_usage`.
    current_gpu_usage: AtomicU64,
    /// High-water marks — NOTE(review): not updated anywhere in this file;
    /// presumably written by code outside this view.
    peak_memory: AtomicUsize,
    peak_cpu: AtomicU64,
    peak_gpu: AtomicU64,
}
86
/// Bookkeeping for GPU tasks so they can be bounded by a timeout.
pub struct GPUTimeoutManager {
    /// Currently running tasks, keyed by task id.
    active_gpu_tasks: RwLock<HashMap<u64, GPUTaskInfo>>,
    /// Source of unique, monotonically increasing task ids.
    task_id_counter: AtomicU64,
    /// Timeout applied to each task.
    /// NOTE(review): stored but not read within this file — the actual timeout
    /// is driven by `SecurityManager::secure_gpu_process` via `tokio::time::timeout`.
    timeout_duration: Duration,
    /// Count of timed-out tasks.
    /// NOTE(review): never incremented in this file — confirm whether it is dead.
    timeout_count: AtomicU64,
}
98
/// Metadata recorded for one in-flight GPU task.
#[derive(Debug, Clone)]
pub struct GPUTaskInfo {
    /// Unique id assigned at registration.
    pub task_id: u64,
    /// When the task was registered; used to report elapsed time on unregister.
    pub start_time: Instant,
    /// Number of events the task is processing.
    pub event_count: usize,
    /// Device label — NOTE(review): `secure_gpu_process` actually passes the
    /// task *name* here, not a device id; confirm which is intended.
    pub gpu_device: String,
}
107
/// Monotonic counters for security-relevant events. Atomic so they can be
/// updated through a shared reference without locking.
#[derive(Debug, Default)]
pub struct SecurityMetrics {
    /// Requests rejected by the rate limiter.
    pub rate_limit_violations: AtomicU64,
    /// Requests rejected by resource checks (active batches / memory).
    pub resource_limit_violations: AtomicU64,
    /// GPU tasks that exceeded the configured timeout.
    pub gpu_timeouts: AtomicU64,
    /// Requests rejected for structural reasons (e.g. batch too large).
    pub rejected_requests: AtomicU64,
    /// General security event counter.
    /// NOTE(review): never incremented anywhere in this file — confirm use.
    pub security_events: AtomicU64,
}
122
/// Reasons a request or GPU task can be refused by the security layer.
#[derive(Debug, Clone)]
pub enum SecurityError {
    /// The per-second event budget was exceeded.
    RateLimitExceeded { current: u64, limit: u64 },
    /// The batch contains more events than `max_batch_size`.
    BatchTooLarge { size: usize, limit: usize },
    /// Too many batches are already in flight.
    TooManyActiveBatches { active: usize, limit: usize },
    /// An event exceeds `max_event_size`.
    EventTooLarge { size: usize, limit: usize },
    /// Tracked memory usage exceeds `max_memory_usage`.
    MemoryExhausted { current: usize, limit: usize },
    /// CPU usage exceeds `max_cpu_usage`.
    CPUExhausted { current: f32, limit: f32 },
    /// GPU usage exceeds `max_gpu_usage`.
    GPUExhausted { current: f32, limit: f32 },
    /// A GPU task did not finish within the configured timeout.
    GPUTimeout { task_id: u64, duration: Duration },
    /// Catch-all for failures inside monitored work (see `secure_gpu_process`,
    /// which maps the underlying `StorageError` onto this variant).
    ResourceMonitoringFailed,
}
136
137impl SecurityManager {
138 pub fn new(config: SecurityConfig) -> Self {
140 let rate_limiter = Arc::new(RateLimiter::new(config.rate_limit_per_second));
141 let resource_monitor = Arc::new(ResourceMonitor::new());
142 let gpu_timeout_manager = Arc::new(GPUTimeoutManager::new(
143 Duration::from_millis(config.gpu_timeout_ms)
144 ));
145
146 tracing::info!(
147 "SecurityManager initialized: rate_limit={}/sec, max_batch={}, gpu_timeout={}ms",
148 config.rate_limit_per_second,
149 config.max_batch_size,
150 config.gpu_timeout_ms
151 );
152
153 Self {
154 config,
155 rate_limiter,
156 resource_monitor,
157 gpu_timeout_manager,
158 security_metrics: SecurityMetrics::default(),
159 }
160 }
161
162 pub async fn validate_batch_request(
164 &self,
165 event_count: usize,
166 total_size: usize,
167 ) -> Result<(), SecurityError> {
168 self.rate_limiter.check_rate(event_count as u64)
170 .map_err(|_| {
171 self.security_metrics.rate_limit_violations.fetch_add(1, Ordering::Relaxed);
172 SecurityError::RateLimitExceeded {
173 current: event_count as u64,
174 limit: self.config.rate_limit_per_second,
175 }
176 })?;
177
178 if event_count > self.config.max_batch_size {
180 self.security_metrics.rejected_requests.fetch_add(1, Ordering::Relaxed);
181 return Err(SecurityError::BatchTooLarge {
182 size: event_count,
183 limit: self.config.max_batch_size,
184 });
185 }
186
187 self.resource_monitor.check_resources(&self.config)
189 .map_err(|e| {
190 self.security_metrics.resource_limit_violations.fetch_add(1, Ordering::Relaxed);
191 e
192 })?;
193
194 tracing::debug!(
195 "Security validation passed: {} events, {} bytes",
196 event_count,
197 total_size
198 );
199
200 Ok(())
201 }
202
203 pub async fn secure_gpu_process<F, T>(
205 &self,
206 task_name: &str,
207 event_count: usize,
208 gpu_future: F,
209 ) -> Result<T, SecurityError>
210 where
211 F: std::future::Future<Output = Result<T, crate::StorageError>>,
212 {
213 let task_id = self.gpu_timeout_manager.register_task(
215 task_name.to_string(),
216 event_count,
217 ).await;
218
219 tracing::debug!(
220 "Starting secure GPU process: task_id={}, events={}, timeout={}ms",
221 task_id,
222 event_count,
223 self.config.gpu_timeout_ms
224 );
225
226 let result = timeout(
228 Duration::from_millis(self.config.gpu_timeout_ms),
229 gpu_future
230 ).await;
231
232 self.gpu_timeout_manager.unregister_task(task_id).await;
234
235 match result {
236 Ok(Ok(value)) => {
237 tracing::debug!("GPU task completed successfully: task_id={}", task_id);
238 Ok(value)
239 }
240 Ok(Err(_storage_err)) => {
241 Err(SecurityError::ResourceMonitoringFailed)
242 }
243 Err(_timeout_err) => {
244 self.security_metrics.gpu_timeouts.fetch_add(1, Ordering::Relaxed);
245 tracing::error!(
246 "GPU task timeout: task_id={}, duration={}ms",
247 task_id,
248 self.config.gpu_timeout_ms
249 );
250 Err(SecurityError::GPUTimeout {
251 task_id,
252 duration: Duration::from_millis(self.config.gpu_timeout_ms),
253 })
254 }
255 }
256 }
257
258 pub fn get_security_metrics(&self) -> SecurityMetrics {
260 SecurityMetrics {
261 rate_limit_violations: AtomicU64::new(
262 self.security_metrics.rate_limit_violations.load(Ordering::Relaxed)
263 ),
264 resource_limit_violations: AtomicU64::new(
265 self.security_metrics.resource_limit_violations.load(Ordering::Relaxed)
266 ),
267 gpu_timeouts: AtomicU64::new(
268 self.security_metrics.gpu_timeouts.load(Ordering::Relaxed)
269 ),
270 rejected_requests: AtomicU64::new(
271 self.security_metrics.rejected_requests.load(Ordering::Relaxed)
272 ),
273 security_events: AtomicU64::new(
274 self.security_metrics.security_events.load(Ordering::Relaxed)
275 ),
276 }
277 }
278}
279
impl RateLimiter {
    /// Creates a limiter whose first window starts at the current wall-clock
    /// second.
    pub fn new(limit_per_second: u64) -> Self {
        Self {
            current_second_count: AtomicU64::new(0),
            current_second: AtomicU64::new(Self::current_timestamp()),
            limit_per_second,
            violations: AtomicU64::new(0),
        }
    }

    /// Tries to consume `event_count` tokens from the current one-second
    /// window; returns `Err(())` (with the tokens rolled back) when the
    /// window's budget would be exceeded.
    ///
    /// Concurrency notes (lock-free fixed window, best-effort):
    /// - Only the thread that wins the `compare_exchange` resets the counter,
    ///   so the reset runs at most once per window rollover.
    /// - There is a small race window between the CAS and the `store(0)` in
    ///   which tokens added by other threads are lost, slightly
    ///   under-counting the new window.
    pub fn check_rate(&self, event_count: u64) -> Result<(), ()> {
        let now = Self::current_timestamp();
        let current_sec = self.current_second.load(Ordering::Relaxed);

        // Window rollover: the first caller in a new second resets the count.
        if now != current_sec {
            if self.current_second.compare_exchange(current_sec, now, Ordering::Relaxed, Ordering::Relaxed).is_ok() {
                self.current_second_count.store(0, Ordering::Relaxed);
            }
        }

        // Optimistically take the tokens, then check the post-add total.
        let new_count = self.current_second_count.fetch_add(event_count, Ordering::Relaxed) + event_count;

        if new_count > self.limit_per_second {
            // Roll back the optimistic add so a rejected request does not
            // consume budget.
            self.current_second_count.fetch_sub(event_count, Ordering::Relaxed);
            self.violations.fetch_add(1, Ordering::Relaxed);

            tracing::warn!(
                "Rate limit exceeded: current={}, limit={}, violations={}",
                new_count,
                self.limit_per_second,
                self.violations.load(Ordering::Relaxed)
            );

            return Err(());
        }

        Ok(())
    }

    /// Whole seconds since the Unix epoch; 0 if the system clock reads before
    /// the epoch (rather than panicking).
    fn current_timestamp() -> u64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }
}
332
333impl ResourceMonitor {
334 pub fn new() -> Self {
336 Self {
337 active_batches: AtomicUsize::new(0),
338 current_memory_usage: AtomicUsize::new(0),
339 current_cpu_usage: AtomicU64::new(0),
340 current_gpu_usage: AtomicU64::new(0),
341 peak_memory: AtomicUsize::new(0),
342 peak_cpu: AtomicU64::new(0),
343 peak_gpu: AtomicU64::new(0),
344 }
345 }
346
347 pub fn check_resources(&self, config: &SecurityConfig) -> Result<(), SecurityError> {
349 let active = self.active_batches.load(Ordering::Relaxed);
351 if active >= config.max_concurrent_batches {
352 return Err(SecurityError::TooManyActiveBatches {
353 active,
354 limit: config.max_concurrent_batches,
355 });
356 }
357
358 let memory = self.current_memory_usage.load(Ordering::Relaxed);
360 if memory > config.max_memory_usage {
361 return Err(SecurityError::MemoryExhausted {
362 current: memory,
363 limit: config.max_memory_usage,
364 });
365 }
366
367 Ok(())
368 }
369
370 pub fn start_batch(&self) {
372 self.active_batches.fetch_add(1, Ordering::Relaxed);
373 }
374
375 pub fn end_batch(&self) {
377 self.active_batches.fetch_sub(1, Ordering::Relaxed);
378 }
379}
380
381impl GPUTimeoutManager {
382 pub fn new(timeout_duration: Duration) -> Self {
384 Self {
385 active_gpu_tasks: RwLock::new(HashMap::new()),
386 task_id_counter: AtomicU64::new(0),
387 timeout_duration,
388 timeout_count: AtomicU64::new(0),
389 }
390 }
391
392 pub async fn register_task(&self, gpu_device: String, event_count: usize) -> u64 {
394 let task_id = self.task_id_counter.fetch_add(1, Ordering::Relaxed);
395 let task_info = GPUTaskInfo {
396 task_id,
397 start_time: Instant::now(),
398 event_count,
399 gpu_device,
400 };
401
402 if let Ok(mut tasks) = self.active_gpu_tasks.write() {
403 tasks.insert(task_id, task_info);
404 }
405
406 tracing::trace!("GPU task registered: task_id={}, events={}", task_id, event_count);
407 task_id
408 }
409
410 pub async fn unregister_task(&self, task_id: u64) {
412 if let Ok(mut tasks) = self.active_gpu_tasks.write() {
413 if let Some(task_info) = tasks.remove(&task_id) {
414 let duration = task_info.start_time.elapsed();
415 tracing::trace!(
416 "GPU task unregistered: task_id={}, duration={:?}",
417 task_id,
418 duration
419 );
420 }
421 }
422 }
423}