kawa_storage/
security.rs

//! # Security Module for DoS Protection
//!
//! Rate limiting, resource limits, and GPU timeout mechanisms.
//! Critical security hardening for production environments.
5
6use crate::{StorageError, StorageResult};
7use std::{
8    sync::{Arc, atomic::{AtomicU64, AtomicUsize, Ordering}, RwLock},
9    collections::HashMap,
10    time::{Duration, Instant, SystemTime},
11};
12use tokio::time::timeout;
13
/// Security configuration: the limits enforced by this module.
#[derive(Debug, Clone)]
pub struct SecurityConfig {
    /// Maximum event size, in bytes.
    pub max_event_size: usize,
    /// Maximum batch size, in number of events.
    pub max_batch_size: usize,
    /// Maximum number of concurrent batches.
    pub max_concurrent_batches: usize,
    /// GPU timeout, in milliseconds.
    pub gpu_timeout_ms: u64,
    /// Rate limit, in events per second.
    pub rate_limit_per_second: u64,
    /// Memory usage limit, in bytes.
    pub max_memory_usage: usize,
    /// CPU usage limit, in percent.
    pub max_cpu_usage: f32,
    /// GPU usage limit, in percent.
    pub max_gpu_usage: f32,
}
34
35impl Default for SecurityConfig {
36    fn default() -> Self {
37        Self {
38            max_event_size: 1024 * 1024,        // 1MB
39            max_batch_size: 10_000,             // 10K events
40            max_concurrent_batches: 100,        // 100 batches
41            gpu_timeout_ms: 5_000,              // 5 seconds
42            rate_limit_per_second: 100_000,     // 100K events/sec
43            max_memory_usage: 2 * 1024 * 1024 * 1024, // 2GB
44            max_cpu_usage: 80.0,                // 80%
45            max_gpu_usage: 90.0,                // 90%
46        }
47    }
48}
49
50/// セキュリティマネージャー
51pub struct SecurityManager {
52    config: SecurityConfig,
53    rate_limiter: Arc<RateLimiter>,
54    resource_monitor: Arc<ResourceMonitor>,
55    gpu_timeout_manager: Arc<GPUTimeoutManager>,
56    security_metrics: SecurityMetrics,
57}
58
/// Rate limiter state.
pub struct RateLimiter {
    /// Number of events already processed in the current second.
    current_second_count: AtomicU64,
    /// Timestamp of the current second.
    current_second: AtomicU64,
    /// The limit, in events per second.
    limit_per_second: u64,
    /// Number of violations observed.
    violations: AtomicU64,
}
70
/// Resource monitor state.
pub struct ResourceMonitor {
    /// Number of currently active batches.
    active_batches: AtomicUsize,
    /// Current memory usage.
    current_memory_usage: AtomicUsize,
    /// Current CPU usage (f32 value * 1000, so it can be held in an atomic).
    current_cpu_usage: AtomicU64,
    /// Current GPU usage (f32 value * 1000, so it can be held in an atomic).
    current_gpu_usage: AtomicU64,
    /// Peak memory value recorded.
    peak_memory: AtomicUsize,
    /// Peak CPU value recorded.
    peak_cpu: AtomicU64,
    /// Peak GPU value recorded.
    peak_gpu: AtomicU64,
}
86
87/// GPU タイムアウト管理
88pub struct GPUTimeoutManager {
89    /// アクティブなGPUタスク
90    active_gpu_tasks: RwLock<HashMap<u64, GPUTaskInfo>>,
91    /// タスクIDカウンター
92    task_id_counter: AtomicU64,
93    /// タイムアウト時間
94    timeout_duration: Duration,
95    /// タイムアウト発生回数
96    timeout_count: AtomicU64,
97}
98
/// Information recorded about one GPU task.
#[derive(Debug, Clone)]
pub struct GPUTaskInfo {
    /// Identifier of the task.
    pub task_id: u64,
    /// Instant at which the task was started.
    pub start_time: Instant,
    /// Number of events handled by the task.
    pub event_count: usize,
    /// GPU device string supplied when the task was registered.
    pub gpu_device: String,
}
107
/// Security metric counters.
#[derive(Debug, Default)]
pub struct SecurityMetrics {
    /// Number of rate-limit violations.
    pub rate_limit_violations: AtomicU64,
    /// Number of resource-limit violations.
    pub resource_limit_violations: AtomicU64,
    /// Number of GPU timeouts.
    pub gpu_timeouts: AtomicU64,
    /// Number of rejected requests.
    pub rejected_requests: AtomicU64,
    /// Number of security events.
    pub security_events: AtomicU64,
}
122
/// Errors produced by the security checks.
#[derive(Debug, Clone)]
pub enum SecurityError {
    /// The events-per-second rate limit was exceeded.
    RateLimitExceeded { current: u64, limit: u64 },
    /// A batch holds more events than the configured maximum.
    BatchTooLarge { size: usize, limit: usize },
    /// The number of active batches has reached the configured maximum.
    TooManyActiveBatches { active: usize, limit: usize },
    /// An event is larger than the configured maximum.
    EventTooLarge { size: usize, limit: usize },
    /// Memory usage is above the configured limit.
    MemoryExhausted { current: usize, limit: usize },
    /// CPU usage is above the configured limit.
    CPUExhausted { current: f32, limit: f32 },
    /// GPU usage is above the configured limit.
    GPUExhausted { current: f32, limit: f32 },
    /// A GPU task did not finish within the timeout.
    GPUTimeout { task_id: u64, duration: Duration },
    /// Resource monitoring failed.
    ResourceMonitoringFailed,
}
136
137impl SecurityManager {
138    /// 新しいセキュリティマネージャーを作成
139    pub fn new(config: SecurityConfig) -> Self {
140        let rate_limiter = Arc::new(RateLimiter::new(config.rate_limit_per_second));
141        let resource_monitor = Arc::new(ResourceMonitor::new());
142        let gpu_timeout_manager = Arc::new(GPUTimeoutManager::new(
143            Duration::from_millis(config.gpu_timeout_ms)
144        ));
145        
146        tracing::info!(
147            "SecurityManager initialized: rate_limit={}/sec, max_batch={}, gpu_timeout={}ms",
148            config.rate_limit_per_second,
149            config.max_batch_size,
150            config.gpu_timeout_ms
151        );
152        
153        Self {
154            config,
155            rate_limiter,
156            resource_monitor,
157            gpu_timeout_manager,
158            security_metrics: SecurityMetrics::default(),
159        }
160    }
161    
162    /// バッチ処理前のセキュリティ検証
163    pub async fn validate_batch_request(
164        &self,
165        event_count: usize,
166        total_size: usize,
167    ) -> Result<(), SecurityError> {
168        // 1. レート制限チェック
169        self.rate_limiter.check_rate(event_count as u64)
170            .map_err(|_| {
171                self.security_metrics.rate_limit_violations.fetch_add(1, Ordering::Relaxed);
172                SecurityError::RateLimitExceeded {
173                    current: event_count as u64,
174                    limit: self.config.rate_limit_per_second,
175                }
176            })?;
177        
178        // 2. バッチサイズ検証
179        if event_count > self.config.max_batch_size {
180            self.security_metrics.rejected_requests.fetch_add(1, Ordering::Relaxed);
181            return Err(SecurityError::BatchTooLarge {
182                size: event_count,
183                limit: self.config.max_batch_size,
184            });
185        }
186        
187        // 3. リソース制限チェック
188        self.resource_monitor.check_resources(&self.config)
189            .map_err(|e| {
190                self.security_metrics.resource_limit_violations.fetch_add(1, Ordering::Relaxed);
191                e
192            })?;
193        
194        tracing::debug!(
195            "Security validation passed: {} events, {} bytes",
196            event_count,
197            total_size
198        );
199        
200        Ok(())
201    }
202    
203    /// GPU処理のセキュアラッパー
204    pub async fn secure_gpu_process<F, T>(
205        &self,
206        task_name: &str,
207        event_count: usize,
208        gpu_future: F,
209    ) -> Result<T, SecurityError>
210    where
211        F: std::future::Future<Output = Result<T, crate::StorageError>>,
212    {
213        // GPUタスク登録
214        let task_id = self.gpu_timeout_manager.register_task(
215            task_name.to_string(),
216            event_count,
217        ).await;
218        
219        tracing::debug!(
220            "Starting secure GPU process: task_id={}, events={}, timeout={}ms",
221            task_id,
222            event_count,
223            self.config.gpu_timeout_ms
224        );
225        
226        // タイムアウト付きGPU処理実行
227        let result = timeout(
228            Duration::from_millis(self.config.gpu_timeout_ms),
229            gpu_future
230        ).await;
231        
232        // タスク登録解除
233        self.gpu_timeout_manager.unregister_task(task_id).await;
234        
235        match result {
236            Ok(Ok(value)) => {
237                tracing::debug!("GPU task completed successfully: task_id={}", task_id);
238                Ok(value)
239            }
240            Ok(Err(_storage_err)) => {
241                Err(SecurityError::ResourceMonitoringFailed)
242            }
243            Err(_timeout_err) => {
244                self.security_metrics.gpu_timeouts.fetch_add(1, Ordering::Relaxed);
245                tracing::error!(
246                    "GPU task timeout: task_id={}, duration={}ms",
247                    task_id,
248                    self.config.gpu_timeout_ms
249                );
250                Err(SecurityError::GPUTimeout {
251                    task_id,
252                    duration: Duration::from_millis(self.config.gpu_timeout_ms),
253                })
254            }
255        }
256    }
257    
258    /// セキュリティメトリクス取得
259    pub fn get_security_metrics(&self) -> SecurityMetrics {
260        SecurityMetrics {
261            rate_limit_violations: AtomicU64::new(
262                self.security_metrics.rate_limit_violations.load(Ordering::Relaxed)
263            ),
264            resource_limit_violations: AtomicU64::new(
265                self.security_metrics.resource_limit_violations.load(Ordering::Relaxed)
266            ),
267            gpu_timeouts: AtomicU64::new(
268                self.security_metrics.gpu_timeouts.load(Ordering::Relaxed)
269            ),
270            rejected_requests: AtomicU64::new(
271                self.security_metrics.rejected_requests.load(Ordering::Relaxed)
272            ),
273            security_events: AtomicU64::new(
274                self.security_metrics.security_events.load(Ordering::Relaxed)
275            ),
276        }
277    }
278}
279
280impl RateLimiter {
281    /// 新しいレート制限機能を作成
282    pub fn new(limit_per_second: u64) -> Self {
283        Self {
284            current_second_count: AtomicU64::new(0),
285            current_second: AtomicU64::new(Self::current_timestamp()),
286            limit_per_second,
287            violations: AtomicU64::new(0),
288        }
289    }
290    
291    /// レート制限チェック
292    pub fn check_rate(&self, event_count: u64) -> Result<(), ()> {
293        let now = Self::current_timestamp();
294        let current_sec = self.current_second.load(Ordering::Relaxed);
295        
296        // 秒が変わった場合はカウンターリセット
297        if now != current_sec {
298            if self.current_second.compare_exchange(current_sec, now, Ordering::Relaxed, Ordering::Relaxed).is_ok() {
299                self.current_second_count.store(0, Ordering::Relaxed);
300            }
301        }
302        
303        // 現在の秒でのカウント増加
304        let new_count = self.current_second_count.fetch_add(event_count, Ordering::Relaxed) + event_count;
305        
306        if new_count > self.limit_per_second {
307            // カウントを元に戻す
308            self.current_second_count.fetch_sub(event_count, Ordering::Relaxed);
309            self.violations.fetch_add(1, Ordering::Relaxed);
310            
311            tracing::warn!(
312                "Rate limit exceeded: current={}, limit={}, violations={}",
313                new_count,
314                self.limit_per_second,
315                self.violations.load(Ordering::Relaxed)
316            );
317            
318            return Err(());
319        }
320        
321        Ok(())
322    }
323    
324    /// 現在のタイムスタンプ(秒)を取得
325    fn current_timestamp() -> u64 {
326        SystemTime::now()
327            .duration_since(SystemTime::UNIX_EPOCH)
328            .unwrap_or_default()
329            .as_secs()
330    }
331}
332
333impl ResourceMonitor {
334    /// 新しいリソース監視機能を作成
335    pub fn new() -> Self {
336        Self {
337            active_batches: AtomicUsize::new(0),
338            current_memory_usage: AtomicUsize::new(0),
339            current_cpu_usage: AtomicU64::new(0),
340            current_gpu_usage: AtomicU64::new(0),
341            peak_memory: AtomicUsize::new(0),
342            peak_cpu: AtomicU64::new(0),
343            peak_gpu: AtomicU64::new(0),
344        }
345    }
346    
347    /// リソース制限チェック
348    pub fn check_resources(&self, config: &SecurityConfig) -> Result<(), SecurityError> {
349        // アクティブバッチ数チェック
350        let active = self.active_batches.load(Ordering::Relaxed);
351        if active >= config.max_concurrent_batches {
352            return Err(SecurityError::TooManyActiveBatches {
353                active,
354                limit: config.max_concurrent_batches,
355            });
356        }
357        
358        // メモリ使用量チェック
359        let memory = self.current_memory_usage.load(Ordering::Relaxed);
360        if memory > config.max_memory_usage {
361            return Err(SecurityError::MemoryExhausted {
362                current: memory,
363                limit: config.max_memory_usage,
364            });
365        }
366        
367        Ok(())
368    }
369    
370    /// バッチ開始通知
371    pub fn start_batch(&self) {
372        self.active_batches.fetch_add(1, Ordering::Relaxed);
373    }
374    
375    /// バッチ終了通知
376    pub fn end_batch(&self) {
377        self.active_batches.fetch_sub(1, Ordering::Relaxed);
378    }
379}
380
381impl GPUTimeoutManager {
382    /// 新しいGPUタイムアウト管理機能を作成
383    pub fn new(timeout_duration: Duration) -> Self {
384        Self {
385            active_gpu_tasks: RwLock::new(HashMap::new()),
386            task_id_counter: AtomicU64::new(0),
387            timeout_duration,
388            timeout_count: AtomicU64::new(0),
389        }
390    }
391    
392    /// GPUタスク登録
393    pub async fn register_task(&self, gpu_device: String, event_count: usize) -> u64 {
394        let task_id = self.task_id_counter.fetch_add(1, Ordering::Relaxed);
395        let task_info = GPUTaskInfo {
396            task_id,
397            start_time: Instant::now(),
398            event_count,
399            gpu_device,
400        };
401        
402        if let Ok(mut tasks) = self.active_gpu_tasks.write() {
403            tasks.insert(task_id, task_info);
404        }
405        
406        tracing::trace!("GPU task registered: task_id={}, events={}", task_id, event_count);
407        task_id
408    }
409    
410    /// GPUタスク登録解除
411    pub async fn unregister_task(&self, task_id: u64) {
412        if let Ok(mut tasks) = self.active_gpu_tasks.write() {
413            if let Some(task_info) = tasks.remove(&task_id) {
414                let duration = task_info.start_time.elapsed();
415                tracing::trace!(
416                    "GPU task unregistered: task_id={}, duration={:?}",
417                    task_id,
418                    duration
419                );
420            }
421        }
422    }
423}