kawa_storage/
segment.rs

1//! # Segment Management
2//!
3//! セグメント化されたファイル管理機能。
4//! 固定長セグメント + メモリマップドI/O + CRCチェックによる高性能ストレージ。
5
6use crate::{Event, Offset, StorageError, StorageResult};
7use derive_more::{Deref, DerefMut, From, Into};
8use memmap2::{MmapMut, MmapOptions};
9use serde::{Deserialize, Serialize};
10use std::{
11    collections::HashMap,
12    fs::{File, OpenOptions},
13    path::{Path, PathBuf},
14    sync::Arc,
15};
16use tokio::sync::RwLock;
17use uuid::Uuid;
18
19/// セグメントID(UUIDベース)
20/// 
21/// 各セグメントファイルの一意識別子。
22#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Deref, DerefMut, From, Into)]
23pub struct SegmentId(pub Uuid);
24
25impl SegmentId {
26    /// 新しいセグメントIDを生成
27    pub fn new() -> Self {
28        Self(Uuid::new_v4())
29    }
30    
31    /// UUIDからセグメントIDを作成
32    pub fn from_uuid(uuid: Uuid) -> Self {
33        Self(uuid)
34    }
35}
36
37impl Default for SegmentId {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl std::fmt::Display for SegmentId {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        write!(f, "{}", self.0)
46    }
47}
48
49/// セグメントヘッダー情報
50/// 
51/// セグメントファイルの先頭に格納されるメタデータ。
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct SegmentHeader {
54    /// セグメントID
55    pub segment_id: SegmentId,
56    /// 作成日時(UNIXタイムスタンプ)
57    pub created_at: u64,
58    /// セグメントサイズ
59    pub size: u64,
60    /// 書き込み位置オフセット
61    pub write_offset: u64,
62    /// エントリ数
63    pub entry_count: u64,
64    /// CRCチェックサム
65    pub header_crc: u32,
66}
67
68impl SegmentHeader {
69    /// 新しいセグメントヘッダーを作成
70    pub fn new(segment_id: SegmentId, size: u64) -> Self {
71        let created_at = std::time::SystemTime::now()
72            .duration_since(std::time::UNIX_EPOCH)
73            .unwrap_or_default()
74            .as_secs();
75            
76        let mut header = Self {
77            segment_id,
78            created_at,
79            size,
80            write_offset: 256, // 固定ヘッダーサイズ
81            entry_count: 0,
82            header_crc: 0,
83        };
84        
85        header.header_crc = header.calculate_crc();
86        header
87    }
88    
89    /// ヘッダーのCRCを計算
90    fn calculate_crc(&self) -> u32 {
91        let mut temp_header = self.clone();
92        temp_header.header_crc = 0;
93        
94        let serialized = bincode::serialize(&temp_header).unwrap_or_default();
95        crc32fast::hash(&serialized)
96    }
97    
98    /// ヘッダーの整合性を検証
99    pub fn verify_integrity(&self) -> StorageResult<()> {
100        let expected_crc = self.calculate_crc();
101        if self.header_crc != expected_crc {
102            return Err(StorageError::CrcMismatch {
103                offset: 0,
104                expected: expected_crc,
105                actual: self.header_crc,
106            });
107        }
108        Ok(())
109    }
110}
111
112/// セグメントファイル
113/// 
114/// 固定長のメモリマップドファイルでイベントを格納。
115/// 高速なシーケンシャル書き込みとランダムアクセス読み取りを提供。
116#[derive(Debug)]
117pub struct Segment {
118    /// セグメントID
119    id: SegmentId,
120    /// ファイルパス
121    file_path: PathBuf,
122    /// メモリマップドファイル
123    mmap: MmapMut,
124    /// セグメントヘッダー
125    header: SegmentHeader,
126    /// 読み取り専用フラグ
127    read_only: bool,
128}
129
130impl Segment {
131    /// 新しいセグメントを作成
132    pub fn create<P: AsRef<Path>>(file_path: P, size: u64) -> StorageResult<Self> {
133        let file_path = file_path.as_ref().to_path_buf();
134        let segment_id = SegmentId::new();
135        
136        // ファイルを作成してサイズを設定
137        let file = OpenOptions::new()
138            .read(true)
139            .write(true)
140            .create(true)
141            .open(&file_path)?;
142        
143        file.set_len(size)?;
144        
145        // メモリマップを作成
146        let mut mmap = unsafe { MmapOptions::new().map_mut(&file)? };
147        
148        // ヘッダーを作成・書き込み
149        let header = SegmentHeader::new(segment_id, size);
150        let header_bytes = bincode::serialize(&header)?;
151        
152        if header_bytes.len() > mmap.len() {
153            return Err(StorageError::configuration("Header size exceeds segment size"));
154        }
155        
156        // ヘッダーサイズを確保(固定サイズ256バイトを想定)
157        let header_size = 256;
158        if header_bytes.len() > header_size {
159            return Err(StorageError::configuration("Header size exceeds reserved space"));
160        }
161        
162        // ヘッダーをメモリマップに書き込み
163        mmap[..header_bytes.len()].copy_from_slice(&header_bytes);
164        // 残りを0で埋める
165        for i in header_bytes.len()..header_size {
166            mmap[i] = 0;
167        }
168        
169        Ok(Self {
170            id: segment_id,
171            file_path,
172            mmap,
173            header,
174            read_only: false,
175        })
176    }
177    
178    /// 既存のセグメントファイルを開く
179    pub fn open<P: AsRef<Path>>(file_path: P, read_only: bool) -> StorageResult<Self> {
180        let file_path = file_path.as_ref().to_path_buf();
181        
182        let file = if read_only {
183            File::open(&file_path)?
184        } else {
185            OpenOptions::new()
186                .read(true)
187                .write(true)
188                .open(&file_path)?
189        };
190        
191        let mmap = if read_only {
192            unsafe { MmapOptions::new().map(&file)?.make_mut()? }
193        } else {
194            unsafe { MmapOptions::new().map_mut(&file)? }
195        };
196        
197        // ヘッダーを読み取り(固定サイズ256バイト)
198        let header_size = 256;
199        if mmap.len() < header_size {
200            return Err(StorageError::invalid_format("Segment file too small for header"));
201        }
202        
203        // ヘッダーデータを探す(最初の非ゼロバイトまで)
204        let mut actual_header_end = 0;
205        for i in 0..header_size {
206            if mmap[i] != 0 {
207                actual_header_end = i + 1;
208            }
209        }
210        if actual_header_end == 0 {
211            return Err(StorageError::invalid_format("No header data found"));
212        }
213        
214        // 実際のヘッダーをより正確に見つける
215        for end in (actual_header_end..header_size).rev() {
216            if let Ok(header) = bincode::deserialize::<SegmentHeader>(&mmap[..end]) {
217                if header.verify_integrity().is_ok() {
218                    return Ok(Self {
219                        id: header.segment_id,
220                        file_path,
221                        mmap,
222                        header,
223                        read_only,
224                    });
225                }
226            }
227        }
228        
229                 return Err(StorageError::invalid_format("Failed to deserialize segment header"));
230    }
231    
232    /// イベントをセグメントに書き込み
233    pub fn append_event(&mut self, event: &Event) -> StorageResult<Offset> {
234        if self.read_only {
235            return Err(StorageError::internal("Cannot write to read-only segment"));
236        }
237        
238        let serialized_event = bincode::serialize(event)?;
239        let data_size = serialized_event.len() as u32;
240        let entry_size = std::mem::size_of::<u32>() + serialized_event.len() + std::mem::size_of::<u32>(); // サイズ + データ + CRC
241        
242        // 容量チェック
243        if self.header.write_offset + entry_size as u64 > self.header.size {
244            return Err(StorageError::InsufficientSpace {
245                required: entry_size as u64,
246                available: self.header.size - self.header.write_offset,
247            });
248        }
249        
250        let write_offset = self.header.write_offset as usize;
251        let offset = Offset::new(self.header.entry_count);
252        
253        // CRCを計算
254        let crc = crc32fast::hash(&serialized_event);
255        
256        // データサイズを書き込み(4バイト)
257        let size_bytes = data_size.to_le_bytes();
258        let size_end = write_offset + size_bytes.len();
259        self.mmap[write_offset..size_end].copy_from_slice(&size_bytes);
260        
261        // データを書き込み
262        let data_end = size_end + serialized_event.len();
263        self.mmap[size_end..data_end].copy_from_slice(&serialized_event);
264        
265        // CRCを書き込み
266        let crc_bytes = crc.to_le_bytes();
267        let crc_end = data_end + crc_bytes.len();
268        self.mmap[data_end..crc_end].copy_from_slice(&crc_bytes);
269        
270        // ヘッダーを更新
271        self.header.write_offset += entry_size as u64;
272        self.header.entry_count += 1;
273        
274        // ヘッダーを再計算・書き込み
275        self.header.header_crc = self.header.calculate_crc();
276        let header_bytes = bincode::serialize(&self.header)?;
277        let header_size = 256;
278        if header_bytes.len() > header_size {
279            return Err(StorageError::configuration("Header size exceeds reserved space"));
280        }
281        self.mmap[..header_bytes.len()].copy_from_slice(&header_bytes);
282        // 残りを0で埋める(更新時)
283        for i in header_bytes.len()..header_size {
284            self.mmap[i] = 0;
285        }
286        
287        Ok(offset)
288    }
289    
290    /// 指定したオフセットのイベントを読み取り
291    pub fn read_event(&self, offset: Offset) -> StorageResult<Option<Event>> {
292        if offset.0 >= self.header.entry_count {
293            return Ok(None);
294        }
295        
296        // エントリインデックスからファイル内のポジションを探索
297        let mut current_position = 256; // 固定ヘッダーサイズ
298        let mut current_entry_index = 0u64;
299        
300        // 指定したオフセットまで線形探索
301        while current_entry_index <= offset.0 && current_position < self.header.write_offset as usize {
302            // データサイズを読み取り(4バイト)
303            if current_position + std::mem::size_of::<u32>() > self.mmap.len() {
304                break;
305            }
306            
307            let size_bytes = &self.mmap[current_position..current_position + std::mem::size_of::<u32>()];
308            let data_size = u32::from_le_bytes([size_bytes[0], size_bytes[1], size_bytes[2], size_bytes[3]]) as usize;
309            
310            let data_start = current_position + std::mem::size_of::<u32>();
311            let data_end = data_start + data_size;
312            let crc_end = data_end + std::mem::size_of::<u32>();
313            
314            // 境界チェック
315            if crc_end > self.mmap.len() {
316                return Err(StorageError::invalid_format("Entry extends beyond segment"));
317            }
318            
319            if current_entry_index == offset.0 {
320                // 指定されたオフセットのイベントを読み取り
321                let event_data = &self.mmap[data_start..data_end];
322                let stored_crc_bytes = &self.mmap[data_end..crc_end];
323                let stored_crc = u32::from_le_bytes([
324                    stored_crc_bytes[0], stored_crc_bytes[1], 
325                    stored_crc_bytes[2], stored_crc_bytes[3]
326                ]);
327                
328                // CRC検証
329                let calculated_crc = crc32fast::hash(event_data);
330                if stored_crc != calculated_crc {
331                    return Err(StorageError::CrcMismatch {
332                        offset: current_position as u64,
333                        expected: calculated_crc,
334                        actual: stored_crc,
335                    });
336                }
337                
338                // イベントをデシリアライズ
339                match bincode::deserialize::<Event>(event_data) {
340                    Ok(event) => return Ok(Some(event)),
341                    Err(_) => {
342                        return Err(StorageError::invalid_format("Failed to deserialize event data"));
343                    }
344                }
345            }
346            
347            // 次のエントリへ移動
348            current_position = crc_end;
349            current_entry_index += 1;
350        }
351        
352        Ok(None)
353    }
354    
355    /// 指定した範囲のイベントを読み取り
356    pub fn read_events_range(&self, start_offset: Offset, max_events: usize) -> StorageResult<Vec<Event>> {
357        let mut events = Vec::with_capacity(max_events.min(self.header.entry_count as usize));
358        
359        let mut current_position = 256; // 固定ヘッダーサイズ
360        let mut current_entry_index = 0u64;
361        let end_offset = (start_offset.0 + max_events as u64).min(self.header.entry_count);
362        
363        // 開始オフセットまでスキップ
364        while current_entry_index < start_offset.0 && current_position < self.header.write_offset as usize {
365            // データサイズを読み取り(4バイト)
366            if current_position + std::mem::size_of::<u32>() > self.mmap.len() {
367                break;
368            }
369            
370            let size_bytes = &self.mmap[current_position..current_position + std::mem::size_of::<u32>()];
371            let data_size = u32::from_le_bytes([size_bytes[0], size_bytes[1], size_bytes[2], size_bytes[3]]) as usize;
372            
373            let entry_size = std::mem::size_of::<u32>() + data_size + std::mem::size_of::<u32>(); // サイズ + データ + CRC
374            current_position += entry_size;
375            current_entry_index += 1;
376        }
377        
378        // 指定した範囲のイベントを読み取り
379        while current_entry_index < end_offset && current_position < self.header.write_offset as usize && events.len() < max_events {
380            // データサイズを読み取り(4バイト)
381            if current_position + std::mem::size_of::<u32>() > self.mmap.len() {
382                break;
383            }
384            
385            let size_bytes = &self.mmap[current_position..current_position + std::mem::size_of::<u32>()];
386            let data_size = u32::from_le_bytes([size_bytes[0], size_bytes[1], size_bytes[2], size_bytes[3]]) as usize;
387            
388            let data_start = current_position + std::mem::size_of::<u32>();
389            let data_end = data_start + data_size;
390            let crc_end = data_end + std::mem::size_of::<u32>();
391            
392            // 境界チェック
393            if crc_end > self.mmap.len() {
394                return Err(StorageError::invalid_format("Entry extends beyond segment"));
395            }
396            
397            // イベントを読み取り
398            let event_data = &self.mmap[data_start..data_end];
399            let stored_crc_bytes = &self.mmap[data_end..crc_end];
400            let stored_crc = u32::from_le_bytes([
401                stored_crc_bytes[0], stored_crc_bytes[1], 
402                stored_crc_bytes[2], stored_crc_bytes[3]
403            ]);
404            
405            // CRC検証
406            let calculated_crc = crc32fast::hash(event_data);
407            if stored_crc != calculated_crc {
408                return Err(StorageError::CrcMismatch {
409                    offset: current_position as u64,
410                    expected: calculated_crc,
411                    actual: stored_crc,
412                });
413            }
414            
415            // イベントをデシリアライズ
416            match bincode::deserialize::<Event>(event_data) {
417                Ok(event) => {
418                    events.push(event);
419                    current_position = crc_end;
420                    current_entry_index += 1;
421                }
422                Err(_) => {
423                    return Err(StorageError::invalid_format("Failed to deserialize event data during read"));
424                }
425            }
426        }
427        
428        Ok(events)
429    }
430    
431    /// セグメントをフラッシュ
432    pub fn flush(&mut self) -> StorageResult<()> {
433        if !self.read_only {
434            self.mmap.flush()?;
435        }
436        Ok(())
437    }
438    
439    /// セグメントの統計情報を取得
440    pub fn stats(&self) -> SegmentStats {
441        SegmentStats {
442            id: self.id,
443            file_path: self.file_path.clone(),
444            total_size: self.header.size,
445            used_size: self.header.write_offset,
446            entry_count: self.header.entry_count,
447            read_only: self.read_only,
448        }
449    }
450    
451    /// バッチデータを一括書き込み(高性能版)
452    /// 
453    /// # Arguments
454    /// * `batch_data` - シリアライズされたバッチデータ
455    /// * `event_sizes` - 各イベントのサイズリスト
456    /// 
457    /// # Returns
458    /// * `StorageResult<Offset>` - 開始オフセット
459    /// 
460    /// # Performance
461    /// 複数イベントを一回のI/Oで書き込み、1M+ events/secを実現
462    pub fn append_batch_data(&mut self, batch_data: &[u8], event_sizes: &[usize]) -> StorageResult<Offset> {
463        if self.read_only {
464            return Err(StorageError::invalid_format("Cannot write to read-only segment"));
465        }
466        
467        let start_entry_index = self.header.entry_count;
468        let current_position = self.header.write_offset as usize;
469        
470        // 容量チェック
471        let required_space = batch_data.len() + (event_sizes.len() * 8); // 8 bytes per event for metadata
472        if current_position + required_space > self.mmap.len() {
473            return Err(StorageError::InsufficientSpace {
474                required: required_space as u64,
475                available: (self.mmap.len() - current_position) as u64,
476            });
477        }
478        
479        let mut write_position = current_position;
480        let mut data_offset = 0;
481        
482        // 各イベントを書き込み
483        for &event_size in event_sizes {
484            // イベントサイズ書き込み(4バイト)
485            let size_bytes = (event_size as u32).to_le_bytes();
486            self.mmap[write_position..write_position + 4].copy_from_slice(&size_bytes);
487            write_position += 4;
488            
489            // イベントデータ書き込み
490            let event_data = &batch_data[data_offset..data_offset + event_size];
491            self.mmap[write_position..write_position + event_size].copy_from_slice(event_data);
492            write_position += event_size;
493            
494            // CRC書き込み(4バイト)
495            let crc = crc32fast::hash(event_data);
496            let crc_bytes = crc.to_le_bytes();
497            self.mmap[write_position..write_position + 4].copy_from_slice(&crc_bytes);
498            write_position += 4;
499            
500            data_offset += event_size;
501        }
502        
503        // ヘッダー更新
504        self.header.write_offset = write_position as u64;
505        self.header.entry_count += event_sizes.len() as u64;
506        
507        // ヘッダーをファイルに書き込み(既存のロジックを使用)
508        let header_bytes = bincode::serialize(&self.header)?;
509        let header_size = 256;
510        if header_bytes.len() > header_size {
511            return Err(StorageError::configuration("Header size exceeds reserved space"));
512        }
513        self.mmap[..header_bytes.len()].copy_from_slice(&header_bytes);
514        for i in header_bytes.len()..header_size {
515            self.mmap[i] = 0;
516        }
517        
518        tracing::debug!(
519            "Batch written: {} events, {} bytes, start_offset={}",
520            event_sizes.len(),
521            batch_data.len(),
522            start_entry_index
523        );
524        
525        Ok(Offset::new(start_entry_index))
526    }
527    
528
529}
530
531/// セグメント統計情報
532#[derive(Debug, Clone)]
533pub struct SegmentStats {
534    /// セグメントID
535    pub id: SegmentId,
536    /// ファイルパス
537    pub file_path: PathBuf,
538    /// 総サイズ
539    pub total_size: u64,
540    /// 使用サイズ
541    pub used_size: u64,
542    /// エントリ数
543    pub entry_count: u64,
544    /// 読み取り専用フラグ
545    pub read_only: bool,
546}
547
548/// セグメント管理マネージャー
549/// 
550/// 複数のセグメントファイルを管理し、効率的なデータアクセスを提供。
551#[derive(Debug)]
552pub struct SegmentManager {
553    /// データディレクトリ
554    data_dir: PathBuf,
555    /// アクティブなセグメント
556    segments: RwLock<HashMap<SegmentId, Arc<RwLock<Segment>>>>,
557    /// 現在の書き込み用セグメント
558    current_segment: RwLock<Option<SegmentId>>,
559    /// セグメントサイズ
560    segment_size: u64,
561}
562
563impl SegmentManager {
564    /// 新しいセグメント管理マネージャーを作成
565    pub async fn new<P: AsRef<Path>>(data_dir: P) -> StorageResult<Self> {
566        let data_dir = data_dir.as_ref().to_path_buf();
567        let segment_size = 1024 * 1024 * 1024; // 1GB デフォルト
568        
569        let manager = Self {
570            data_dir,
571            segments: RwLock::new(HashMap::new()),
572            current_segment: RwLock::new(None),
573            segment_size,
574        };
575        
576        // 既存セグメントをロード
577        manager.load_existing_segments().await?;
578        
579        Ok(manager)
580    }
581    
582    /// 既存のセグメントファイルをロード
583    async fn load_existing_segments(&self) -> StorageResult<()> {
584        if !self.data_dir.exists() {
585            tokio::fs::create_dir_all(&self.data_dir).await?;
586            return Ok(());
587        }
588        
589        let mut segments = self.segments.write().await;
590        let mut entries = tokio::fs::read_dir(&self.data_dir).await?;
591        
592        while let Some(entry) = entries.next_entry().await? {
593            let path = entry.path();
594            if path.extension().and_then(|s| s.to_str()) == Some("seg") {
595                match Segment::open(&path, false) {
596                    Ok(segment) => {
597                        let segment_id = segment.id;
598                        segments.insert(segment_id, Arc::new(RwLock::new(segment)));
599                    }
600                    Err(e) => {
601                        tracing::warn!("Failed to load segment {:?}: {}", path, e);
602                    }
603                }
604            }
605        }
606        
607        Ok(())
608    }
609    
610    /// 新しいセグメントを作成
611    pub async fn create_segment(&self) -> StorageResult<SegmentId> {
612        let segment_id = SegmentId::new();
613        let file_path = self.data_dir.join(format!("{}.seg", segment_id));
614        
615        let segment = Segment::create(file_path, self.segment_size)?;
616        let segment_id = segment.id;
617        
618        let mut segments = self.segments.write().await;
619        segments.insert(segment_id, Arc::new(RwLock::new(segment)));
620        
621        Ok(segment_id)
622    }
623    
624    /// 指定したセグメントを取得
625    pub async fn get_segment(&self, segment_id: SegmentId) -> Option<Arc<RwLock<Segment>>> {
626        let segments = self.segments.read().await;
627        segments.get(&segment_id).cloned()
628    }
629    
630    /// すべてのセグメント統計を取得
631    pub async fn get_all_stats(&self) -> Vec<SegmentStats> {
632        let segments = self.segments.read().await;
633        let mut stats = Vec::new();
634        
635        for segment_arc in segments.values() {
636            if let Ok(segment) = segment_arc.try_read() {
637                stats.push(segment.stats());
638            }
639        }
640        
641        stats
642    }
643    
644    /// セグメント一覧を取得
645    /// 
646    /// # Returns
647    /// * `Vec<SegmentId>` - セグメントIDリスト
648    pub async fn list_segments(&self) -> Vec<SegmentId> {
649        let segments = self.segments.read().await;
650        segments.keys().copied().collect()
651    }
652    
653    /// アクティブセグメント統計を取得
654    /// 
655    /// # Returns
656    /// * `Vec<SegmentStats>` - セグメント統計リスト
657    pub async fn get_segments_stats(&self) -> Vec<SegmentStats> {
658        let segments = self.segments.read().await;
659        let mut stats = Vec::new();
660        
661        for segment_arc in segments.values() {
662            let segment = segment_arc.read().await;
663            stats.push(segment.stats());
664        }
665        
666        stats
667    }
668    
669    /// セグメントを並列で処理
670    /// 
671    /// # Arguments
672    /// * `processor` - セグメント処理関数
673    /// 
674    /// # Returns
675    /// * `StorageResult<Vec<T>>` - 処理結果リスト
676    pub async fn process_segments_parallel<T, F, Fut>(
677        &self,
678        processor: F,
679    ) -> StorageResult<Vec<T>>
680    where
681        T: Send + 'static,
682        F: Fn(SegmentId, Arc<RwLock<Segment>>) -> Fut + Send + Sync + 'static,
683        Fut: std::future::Future<Output = StorageResult<T>> + Send,
684    {
685        let segments = self.segments.read().await;
686        let processor = Arc::new(processor);
687        
688        let mut handles = Vec::new();
689        
690        for (&segment_id, segment_arc) in segments.iter() {
691            let processor = Arc::clone(&processor);
692            let segment_arc = Arc::clone(segment_arc);
693            
694            let handle = tokio::spawn(async move {
695                processor(segment_id, segment_arc).await
696            });
697            
698            handles.push(handle);
699        }
700        
701        let mut results = Vec::new();
702        for handle in handles {
703            match handle.await {
704                Ok(result) => results.push(result?),
705                Err(e) => return Err(StorageError::internal(format!("Segment processing failed: {}", e))),
706            }
707        }
708        
709        Ok(results)
710    }
711}