kawa_storage/
lib.rs

1//! # Kawa Storage
2//! 
3//! 高性能な Write-Ahead Log (WAL) ベースのストレージエンジン。
4//! Kafka互換プロトコルでのイベントソーシングを支援する。
5//!
6//! ## 機能
7//! - セグメント化されたWAL
8//! - メモリマップドI/O
9//! - CRCチェック付きデータ整合性保証
10//! - ゼロコピー読み取り
11
12use derive_more::{Deref, DerefMut, From, Into};
13use serde::{Deserialize, Serialize};
14use std::path::PathBuf;
15use tokio::sync::RwLock;
16use std::sync::Arc;
17
18pub mod segment;
19pub mod wal;
20pub mod event;
21pub mod error;
22pub mod memory;
23
24// ★ 追加: Ultra Performance Engine
25pub mod ultra_performance;
26
27// ★ 追加: GPU Acceleration Engine (Phase 2)
28pub mod gpu_acceleration;
29
30// ★ 追加: Security Module (DoS Protection)
31pub mod security;
32
33pub use error::{StorageError, StorageResult};
34pub use event::{Event, EventId, EventData};
35pub use segment::{Segment, SegmentId, SegmentManager};
36pub use wal::{WriteAheadLog, WalConfig};
37pub use memory::{MemoryPool, PooledBuffer};
38
39// ★ 追加: Ultra Performance Engine のエクスポート
40pub use ultra_performance::{
41    UltraPerformanceEngine, 
42    LockFreeRingBuffer, 
43    SIMDBatchProcessor,
44    PerformanceMetrics,
45    NUMATopology,
46};
47
48// ★ 追加: GPU Acceleration Engine のエクスポート (Phase 2)
49pub use gpu_acceleration::{
50    GPUAcceleratedEngine,
51    GPUAccelerationType,
52    GPUContext,
53    GPUMetrics,
54    HybridConfig,
55    DistributionStrategy,
56};
57
58// ★ 追加: Security Module のエクスポート (DoS Protection)
59pub use security::{
60    SecurityManager,
61    SecurityConfig,
62    SecurityMetrics,
63    SecurityError,
64    RateLimiter,
65    ResourceMonitor,
66    GPUTimeoutManager,
67};
68
69/// ストレージエンジンのメインエントリポイント
70/// 
71/// # Example
72/// ```rust
73/// use kawa_storage::{StorageEngine, StorageConfig};
74/// 
75/// #[tokio::main]
76/// async fn main() -> anyhow::Result<()> {
77///     let config = StorageConfig::default();
78///     let storage = StorageEngine::new(config).await?;
79///     
80///     // @todo イベント書き込み・読み取りのサンプル実装
81///     Ok(())
82/// }
83/// ```
84#[derive(Debug)]
85pub struct StorageEngine {
86    /// WALインスタンス
87    wal: RwLock<WriteAheadLog>,
88    /// セグメント管理
89    segment_manager: RwLock<SegmentManager>,
90    /// 高性能メモリプール
91    memory_pool: Arc<MemoryPool>,
92    /// 設定
93    config: StorageConfig,
94}
95
96/// ストレージエンジンの設定
97/// 
98/// # Fields
99/// - `data_dir`: データディレクトリパス
100/// - `segment_size`: セグメントサイズ(バイト)
101/// - `sync_interval`: 同期間隔(ミリ秒)
102/// - `memory_pool_size`: メモリプール初期サイズ
103/// - `batch_size`: バッチ処理サイズ
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct StorageConfig {
106    /// データディレクトリ
107    pub data_dir: PathBuf,
108    /// セグメントサイズ(デフォルト: 1GB)
109    pub segment_size: u64,
110    /// 同期間隔(デフォルト: 1000ms)
111    pub sync_interval_ms: u64,
112    /// 圧縮有効フラグ
113    pub enable_compression: bool,
114    /// メモリプールサイズ(デフォルト: 128MB)
115    pub memory_pool_size: usize,
116    /// バッチサイズ(デフォルト: 1000)
117    pub batch_size: usize,
118    /// 並行ワーカー数(デフォルト: CPU数×2)
119    pub worker_count: Option<usize>,
120}
121
122impl Default for StorageConfig {
123    fn default() -> Self {
124        Self {
125            data_dir: PathBuf::from("./data"),
126            segment_size: 1024 * 1024 * 1024, // 1GB
127            sync_interval_ms: 1000,
128            enable_compression: false,
129            memory_pool_size: 128 * 1024 * 1024, // 128MB
130            batch_size: 1000,
131            worker_count: None, // 自動設定(CPU数×2)
132        }
133    }
134}
135
136/// オフセット型(Newtypeパターン)
137#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
138pub struct Offset(pub u64);
139
140impl Offset {
141    /// 新しいオフセットを作成
142    pub fn new(value: u64) -> Self {
143        Self(value)
144    }
145    
146    /// 次のオフセットを取得
147    pub fn next(self) -> Self {
148        Self(self.0 + 1)
149    }
150}
151
152/// パーティション型(Newtypeパターン)
153#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
154pub struct Partition(pub u32);
155
156impl Partition {
157    /// 新しいパーティションを作成
158    pub fn new(value: u32) -> Self {
159        Self(value)
160    }
161}
162
163/// トピック型(Newtypeパターン)
164#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
165pub struct Topic(pub String);
166
167impl Topic {
168    /// 新しいトピックを作成
169    pub fn new(name: impl Into<String>) -> Self {
170        Self(name.into())
171    }
172}
173
174impl StorageEngine {
175    /// 新しいストレージエンジンを作成
176    /// 
177    /// # Arguments
178    /// * `config` - ストレージ設定
179    /// 
180    /// # Returns
181    /// * `Result<Self>` - ストレージエンジンインスタンス
182    /// 
183    /// # @todo
184    /// - [ ] データディレクトリの自動作成
185    /// - [ ] 既存データの復旧処理
186    /// - [ ] メトリクス収集の初期化
187    pub async fn new(config: StorageConfig) -> StorageResult<Self> {
188        // データディレクトリを作成
189        tokio::fs::create_dir_all(&config.data_dir).await?;
190        
191        // 高性能メモリプールを初期化
192        let memory_pool = Arc::new(MemoryPool::new(
193            config.memory_pool_size,
194            config.batch_size,
195        )?);
196        
197        let wal_config = WalConfig {
198            data_dir: config.data_dir.clone(),
199            segment_size: config.segment_size,
200            sync_interval_ms: config.sync_interval_ms,
201        };
202        
203        let wal = WriteAheadLog::new(wal_config).await?;
204        let segment_manager = SegmentManager::new(config.data_dir.clone()).await?;
205        
206        Ok(Self {
207            wal: RwLock::new(wal),
208            segment_manager: RwLock::new(segment_manager),
209            memory_pool,
210            config,
211        })
212    }
213    
214    /// イベントを永続化
215    /// 
216    /// # Arguments
217    /// * `topic` - トピック名
218    /// * `partition` - パーティション番号
219    /// * `event_data` - イベントデータ
220    /// 
221    /// # Returns
222    /// * `Result<(EventId, Offset)>` - イベントIDとオフセット
223    /// 
224    /// # @todo
225    /// - [ ] バッチ書き込みのサポート
226    /// - [ ] 圧縮機能の実装
227    /// - [ ] レプリケーション対応
228    pub async fn append_event(
229        &self,
230        topic: &Topic,
231        partition: Partition,
232        event_data: EventData,
233    ) -> StorageResult<(EventId, Offset)> {
234        let event_id = EventId::new();
235        let event = Event::new(event_id, topic.clone(), partition, event_data);
236        
237        let mut wal = self.wal.write().await;
238        let offset = wal.append(&event).await?;
239        
240        Ok((event_id, offset))
241    }
242    
243    /// 指定したオフセット範囲のイベントを読み取り
244    /// 
245    /// # Arguments
246    /// * `topic` - トピック名
247    /// * `partition` - パーティション番号
248    /// * `start_offset` - 開始オフセット
249    /// * `max_events` - 最大イベント数
250    /// 
251    /// # Returns
252    /// * `Result<Vec<Event>>` - イベントリスト
253    /// 
254    /// # @todo
255    /// - [ ] ストリーミング読み取り
256    /// - [ ] インデックスベース高速検索
257    /// - [ ] 非同期イテレータの実装
258    pub async fn read_events(
259        &self,
260        topic: &Topic,
261        partition: Partition,
262        start_offset: Offset,
263        max_events: usize,
264    ) -> StorageResult<Vec<Event>> {
265        let wal = self.wal.read().await;
266        wal.read_events(topic, partition, start_offset, max_events).await
267    }
268    
269    /// 最新のオフセットを取得
270    /// 
271    /// # Arguments
272    /// * `topic` - トピック名
273    /// * `partition` - パーティション番号
274    /// 
275    /// # Returns
276    /// * `Result<Option<Offset>>` - 最新オフセット(データがない場合はNone)
277    pub async fn get_latest_offset(
278        &self,
279        topic: &Topic,
280        partition: Partition,
281    ) -> StorageResult<Option<Offset>> {
282        let wal = self.wal.read().await;
283        wal.get_latest_offset(topic, partition).await
284    }
285    
286    /// バッチでイベントを永続化(高性能版)
287    /// 
288    /// # Arguments
289    /// * `events` - イベントのバッチ
290    /// 
291    /// # Returns
292    /// * `Result<Vec<(EventId, Offset)>>` - イベントIDとオフセットのリスト
293    /// 
294    /// # Performance
295    /// 1M+ events/secを達成するための最適化バッチ処理
296    pub async fn append_events_batch(
297        &self,
298        events: Vec<(Topic, Partition, EventData)>,
299    ) -> StorageResult<Vec<(EventId, Offset)>> {
300        if events.is_empty() {
301            return Ok(Vec::new());
302        }
303        
304        // メモリプールからバッファを取得
305        let mut pooled_buffer = self.memory_pool.get_buffer().await?;
306        
307        // バッチ用のイベントリストを準備
308        let mut batch_events = Vec::with_capacity(events.len());
309        let mut result_ids = Vec::with_capacity(events.len());
310        
311        for (topic, partition, event_data) in events {
312            let event_id = EventId::new();
313            let event = Event::new(event_id, topic, partition, event_data);
314            
315            batch_events.push(event);
316            result_ids.push(event_id);
317        }
318        
319        // WALにバッチ書き込み
320        let mut wal = self.wal.write().await;
321        let offsets = wal.append_batch(&batch_events, &mut pooled_buffer).await?;
322        
323        // 結果を構築
324        let results: Vec<(EventId, Offset)> = result_ids.into_iter()
325            .zip(offsets.into_iter())
326            .collect();
327        
328        Ok(results)
329    }
330    
331    /// メモリプール統計を取得
332    pub fn memory_pool_stats(&self) -> memory::PoolStats {
333        self.memory_pool.stats()
334    }
335    
336    /// ストレージエンジンを優雅にシャットダウン
337    /// 
338    /// # @todo
339    /// - [ ] 未書き込みデータのフラッシュ
340    /// - [ ] メトリクスの最終出力
341    /// - [ ] リソースクリーンアップ
342    pub async fn shutdown(&self) -> StorageResult<()> {
343        let wal = self.wal.write().await;
344        wal.sync().await?;
345        
346        tracing::info!("Storage engine shutdown completed");
347        Ok(())
348    }
349}