alopex_embedded/
columnar_api.rs

1//! カラムナーストレージの埋め込み API 拡張。
2
3use std::collections::hash_map::DefaultHasher;
4use std::hash::{Hash, Hasher};
5use std::path::{Path, PathBuf};
6
7use alopex_core::columnar::encoding::Column;
8use alopex_core::columnar::segment_v2::{RecordBatch, SegmentWriterV2};
9use alopex_core::storage::format::AlopexFileWriter;
10use alopex_core::{StorageFactory, StorageMode as CoreStorageMode};
11
12use crate::{Database, Error, Result, SegmentConfigV2, Transaction, TxnMode};
13
/// Summary statistics for a single columnar segment.
#[derive(Debug, Clone)]
pub struct ColumnarSegmentStats {
    /// Total number of rows across all batches of the segment.
    pub row_count: usize,
    /// Number of columns stored in the segment.
    pub column_count: usize,
    /// Segment size in bytes (may be reported as 0 when the size is not tracked).
    pub size_bytes: usize,
}
24
/// Kind of auxiliary index that can be attached to a column of a columnar segment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnarIndexType {
    /// Minimum/maximum value index.
    Minmax,
    /// Bloom filter index.
    Bloom,
}

impl ColumnarIndexType {
    /// Returns the lowercase textual name of this index type (`"minmax"` or `"bloom"`).
    ///
    /// This is the representation written to the key-value store as index metadata.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ColumnarIndexType::Bloom => "bloom",
            ColumnarIndexType::Minmax => "minmax",
        }
    }
}
43
44/// カラムナーインデックス情報。
45#[derive(Debug, Clone)]
46pub struct ColumnarIndexInfo {
47    /// 対象カラム名。
48    pub column: String,
49    /// インデックス種別。
50    pub index_type: ColumnarIndexType,
51}
52
53/// カラムナー関連設定。
54#[derive(Debug, Clone)]
55pub struct EmbeddedConfig {
56    /// データパス(Disk モード時に必須)。
57    pub path: Option<PathBuf>,
58    /// カラムナーストレージモード。
59    pub storage_mode: StorageMode,
60    /// InMemory モードのメモリ上限(バイト)。
61    pub memory_limit: Option<usize>,
62    /// セグメント設定。
63    pub segment_config: SegmentConfigV2,
64}
65
66impl EmbeddedConfig {
67    /// ディスクモードで初期化。
68    pub fn disk(path: PathBuf) -> Self {
69        Self {
70            path: Some(path),
71            storage_mode: StorageMode::Disk,
72            memory_limit: None,
73            segment_config: SegmentConfigV2::default(),
74        }
75    }
76
77    /// インメモリモードで初期化(無制限)。
78    pub fn in_memory() -> Self {
79        Self {
80            path: None,
81            storage_mode: StorageMode::InMemory,
82            memory_limit: None,
83            segment_config: SegmentConfigV2::default(),
84        }
85    }
86
87    /// インメモリモードでメモリ上限を設定。
88    pub fn in_memory_with_limit(limit: usize) -> Self {
89        Self {
90            path: None,
91            storage_mode: StorageMode::InMemory,
92            memory_limit: Some(limit),
93            segment_config: SegmentConfigV2::default(),
94        }
95    }
96
97    /// セグメント設定を上書き。
98    pub fn with_segment_config(mut self, cfg: SegmentConfigV2) -> Self {
99        self.segment_config = cfg;
100        self
101    }
102}
103
/// Backend used to hold columnar segments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageMode {
    /// Segments are persisted to disk through the key-value store.
    Disk,
    /// Segments are kept entirely in memory.
    InMemory,
}
112
113impl Database {
114    /// 構成付きでデータベースを開く(カラムナー機能を初期化)。
115    pub fn open_with_config(config: EmbeddedConfig) -> Result<Self> {
116        let store = match config.storage_mode {
117            StorageMode::Disk => {
118                let path = config.path.clone().ok_or_else(|| {
119                    Error::Core(alopex_core::Error::InvalidFormat(
120                        "disk mode requires a path".into(),
121                    ))
122                })?;
123                let path = crate::disk_data_dir_path(&path);
124                StorageFactory::create(CoreStorageMode::Disk { path, config: None })
125                    .map_err(Error::Core)?
126            }
127            StorageMode::InMemory => StorageFactory::create(CoreStorageMode::Memory {
128                max_size: config.memory_limit,
129            })
130            .map_err(Error::Core)?,
131        };
132
133        Ok(Self::init(
134            store,
135            config.storage_mode,
136            config.memory_limit,
137            config.segment_config,
138        ))
139    }
140
141    /// 現在のカラムナーストレージモードを返す。
142    pub fn storage_mode(&self) -> StorageMode {
143        self.columnar_mode
144    }
145
146    /// カラムナーセグメントを書き込む。
147    pub fn write_columnar_segment(&self, table: &str, batch: RecordBatch) -> Result<u64> {
148        let mut writer = SegmentWriterV2::new(self.segment_config.clone());
149        writer
150            .write_batch(batch)
151            .map_err(|e| Error::Core(e.into()))?;
152        let segment = writer.finish().map_err(|e| Error::Core(e.into()))?;
153        let table_id = table_id(table)?;
154
155        match self.columnar_mode {
156            StorageMode::Disk => self
157                .columnar_bridge
158                .write_segment(table_id, &segment)
159                .map_err(|e| Error::Core(e.into())),
160            StorageMode::InMemory => {
161                let store = self.columnar_memory.as_ref().ok_or_else(|| {
162                    Error::Core(alopex_core::Error::InvalidFormat(
163                        "in-memory columnar store is not initialized".into(),
164                    ))
165                })?;
166                store
167                    .write_segment(table_id, segment)
168                    .map_err(|e| Error::Core(e.into()))
169            }
170        }
171    }
172
173    /// カラムナーセグメントを書き込む(構成上書き)。
174    pub fn write_columnar_segment_with_config(
175        &self,
176        table: &str,
177        batch: RecordBatch,
178        config: SegmentConfigV2,
179    ) -> Result<u64> {
180        let mut writer = SegmentWriterV2::new(config);
181        writer
182            .write_batch(batch)
183            .map_err(|e| Error::Core(e.into()))?;
184        let segment = writer.finish().map_err(|e| Error::Core(e.into()))?;
185        let table_id = table_id(table)?;
186
187        match self.columnar_mode {
188            StorageMode::Disk => self
189                .columnar_bridge
190                .write_segment(table_id, &segment)
191                .map_err(|e| Error::Core(e.into())),
192            StorageMode::InMemory => {
193                let store = self.columnar_memory.as_ref().ok_or_else(|| {
194                    Error::Core(alopex_core::Error::InvalidFormat(
195                        "in-memory columnar store is not initialized".into(),
196                    ))
197                })?;
198                store
199                    .write_segment(table_id, segment)
200                    .map_err(|e| Error::Core(e.into()))
201            }
202        }
203    }
204
205    /// カラムナーセグメントを読み取る(カラム名指定オプション付き)。
206    pub fn read_columnar_segment(
207        &self,
208        table: &str,
209        segment_id: u64,
210        columns: Option<&[&str]>,
211    ) -> Result<Vec<RecordBatch>> {
212        let table_id = table_id(table)?;
213        let column_count = match self.columnar_mode {
214            StorageMode::Disk => self
215                .columnar_bridge
216                .column_count(table_id, segment_id)
217                .map_err(|e| Error::Core(e.into()))?,
218            StorageMode::InMemory => self
219                .columnar_memory
220                .as_ref()
221                .ok_or_else(|| {
222                    Error::Core(alopex_core::Error::InvalidFormat(
223                        "in-memory columnar store is not initialized".into(),
224                    ))
225                })?
226                .column_count(table_id, segment_id)
227                .map_err(|e| Error::Core(e.into()))?,
228        };
229        let all_indices: Vec<usize> = (0..column_count).collect();
230
231        let batches_full = match self.columnar_mode {
232            StorageMode::Disk => self
233                .columnar_bridge
234                .read_segment(table_id, segment_id, &all_indices)
235                .map_err(|e| Error::Core(e.into()))?,
236            StorageMode::InMemory => self
237                .columnar_memory
238                .as_ref()
239                .ok_or_else(|| {
240                    Error::Core(alopex_core::Error::InvalidFormat(
241                        "in-memory columnar store is not initialized".into(),
242                    ))
243                })?
244                .read_segment(table_id, segment_id, &all_indices)
245                .map_err(|e| Error::Core(e.into()))?,
246        };
247
248        if let Some(names) = columns {
249            let indices = resolve_indices(&batches_full, names)?;
250            project_batches(batches_full, &indices)
251        } else {
252            Ok(batches_full)
253        }
254    }
255
256    /// InMemory モード時のメモリ使用量を返す。Disk モードでは None。
257    pub fn in_memory_usage(&self) -> Option<u64> {
258        if self.columnar_mode == StorageMode::InMemory {
259            self.columnar_memory.as_ref().map(|m| m.memory_usage())
260        } else {
261            None
262        }
263    }
264
265    /// メモリ上限付きでインメモリ DB を開く。
266    pub fn open_in_memory_with_limit(limit: usize) -> Result<Self> {
267        Self::open_with_config(EmbeddedConfig::in_memory_with_limit(limit))
268    }
269
270    /// テーブル名から内部 ID を解決する。
271    pub fn resolve_table_id(&self, table: &str) -> Result<u32> {
272        table_id(table)
273    }
274
275    /// Scan a columnar segment by string ID.
276    ///
277    /// The segment ID format is `{table_id}:{segment_id}` (e.g., "12345:1").
278    /// Returns rows as a vector of SqlValue vectors.
279    pub fn scan_columnar_segment(
280        &self,
281        segment_id: &str,
282    ) -> Result<Vec<Vec<alopex_sql::SqlValue>>> {
283        let (table_id, seg_id) = parse_segment_id(segment_id)?;
284        let all_indices: Vec<usize> = match self.columnar_mode {
285            StorageMode::Disk => {
286                let count = self
287                    .columnar_bridge
288                    .column_count(table_id, seg_id)
289                    .map_err(|e| Error::Core(e.into()))?;
290                (0..count).collect()
291            }
292            StorageMode::InMemory => {
293                let store = self.columnar_memory.as_ref().ok_or_else(|| {
294                    Error::Core(alopex_core::Error::InvalidFormat(
295                        "in-memory columnar store is not initialized".into(),
296                    ))
297                })?;
298                let count = store
299                    .column_count(table_id, seg_id)
300                    .map_err(|e| Error::Core(e.into()))?;
301                (0..count).collect()
302            }
303        };
304
305        let batches = match self.columnar_mode {
306            StorageMode::Disk => self
307                .columnar_bridge
308                .read_segment(table_id, seg_id, &all_indices)
309                .map_err(|e| Error::Core(e.into()))?,
310            StorageMode::InMemory => self
311                .columnar_memory
312                .as_ref()
313                .ok_or_else(|| {
314                    Error::Core(alopex_core::Error::InvalidFormat(
315                        "in-memory columnar store is not initialized".into(),
316                    ))
317                })?
318                .read_segment(table_id, seg_id, &all_indices)
319                .map_err(|e| Error::Core(e.into()))?,
320        };
321
322        // Convert RecordBatch to Vec<Vec<SqlValue>>
323        let mut rows = Vec::new();
324        for batch in batches {
325            let num_rows = batch.num_rows();
326            for row_idx in 0..num_rows {
327                let mut row = Vec::with_capacity(batch.columns.len());
328                for col in &batch.columns {
329                    let sql_val = column_value_to_sql_value(col, row_idx);
330                    row.push(sql_val);
331                }
332                rows.push(row);
333            }
334        }
335        Ok(rows)
336    }
337
338    /// Scan a columnar segment by string ID, returning RecordBatches for streaming (FR-7).
339    ///
340    /// This method returns raw `RecordBatch` objects, allowing the caller to iterate
341    /// over rows without materializing all data upfront. Use this for large datasets
342    /// where streaming is required.
343    ///
344    /// The segment ID format is `{table_id}:{segment_id}` (e.g., "12345:1").
345    pub fn scan_columnar_segment_batches(&self, segment_id: &str) -> Result<Vec<RecordBatch>> {
346        let (table_id, seg_id) = parse_segment_id(segment_id)?;
347        let all_indices: Vec<usize> = match self.columnar_mode {
348            StorageMode::Disk => {
349                let count = self
350                    .columnar_bridge
351                    .column_count(table_id, seg_id)
352                    .map_err(|e| Error::Core(e.into()))?;
353                (0..count).collect()
354            }
355            StorageMode::InMemory => {
356                let store = self.columnar_memory.as_ref().ok_or_else(|| {
357                    Error::Core(alopex_core::Error::InvalidFormat(
358                        "in-memory columnar store is not initialized".into(),
359                    ))
360                })?;
361                let count = store
362                    .column_count(table_id, seg_id)
363                    .map_err(|e| Error::Core(e.into()))?;
364                (0..count).collect()
365            }
366        };
367
368        match self.columnar_mode {
369            StorageMode::Disk => self
370                .columnar_bridge
371                .read_segment(table_id, seg_id, &all_indices)
372                .map_err(|e| Error::Core(e.into())),
373            StorageMode::InMemory => self
374                .columnar_memory
375                .as_ref()
376                .ok_or_else(|| {
377                    Error::Core(alopex_core::Error::InvalidFormat(
378                        "in-memory columnar store is not initialized".into(),
379                    ))
380                })?
381                .read_segment(table_id, seg_id, &all_indices)
382                .map_err(|e| Error::Core(e.into())),
383        }
384    }
385
386    /// Create a streaming row iterator over a columnar segment (FR-7).
387    ///
388    /// This returns a `ColumnarRowIterator` that yields rows one at a time from
389    /// the underlying RecordBatches, without materializing all rows upfront.
390    ///
391    /// The segment ID format is `{table_id}:{segment_id}` (e.g., "12345:1").
392    pub fn scan_columnar_segment_streaming(&self, segment_id: &str) -> Result<ColumnarRowIterator> {
393        let batches = self.scan_columnar_segment_batches(segment_id)?;
394        Ok(ColumnarRowIterator::new(batches))
395    }
396
397    /// Get statistics for a columnar segment by string ID.
398    ///
399    /// The segment ID format is `{table_id}:{segment_id}` (e.g., "12345:1").
400    pub fn get_columnar_segment_stats(&self, segment_id: &str) -> Result<ColumnarSegmentStats> {
401        let (table_id, seg_id) = parse_segment_id(segment_id)?;
402
403        match self.columnar_mode {
404            StorageMode::Disk => {
405                let column_count = self
406                    .columnar_bridge
407                    .column_count(table_id, seg_id)
408                    .map_err(|e| Error::Core(e.into()))?;
409                let batches = self
410                    .columnar_bridge
411                    .read_segment(table_id, seg_id, &(0..column_count).collect::<Vec<_>>())
412                    .map_err(|e| Error::Core(e.into()))?;
413                let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
414
415                Ok(ColumnarSegmentStats {
416                    row_count,
417                    column_count,
418                    size_bytes: 0, // Size not available in current implementation
419                })
420            }
421            StorageMode::InMemory => {
422                let store = self.columnar_memory.as_ref().ok_or_else(|| {
423                    Error::Core(alopex_core::Error::InvalidFormat(
424                        "in-memory columnar store is not initialized".into(),
425                    ))
426                })?;
427                let column_count = store
428                    .column_count(table_id, seg_id)
429                    .map_err(|e| Error::Core(e.into()))?;
430                let batches = store
431                    .read_segment(table_id, seg_id, &(0..column_count).collect::<Vec<_>>())
432                    .map_err(|e| Error::Core(e.into()))?;
433                let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
434
435                Ok(ColumnarSegmentStats {
436                    row_count,
437                    column_count,
438                    size_bytes: 0, // Size not available in current implementation
439                })
440            }
441        }
442    }
443
444    /// List all columnar segments.
445    ///
446    /// Returns segment IDs in the format `{table_id}:{segment_id}`.
447    pub fn list_columnar_segments(&self) -> Result<Vec<String>> {
448        match self.columnar_mode {
449            StorageMode::Disk => {
450                let segments = self
451                    .columnar_bridge
452                    .list_segments()
453                    .map_err(|e| Error::Core(e.into()))?;
454                Ok(segments
455                    .into_iter()
456                    .map(|(table_id, seg_id)| format!("{}:{}", table_id, seg_id))
457                    .collect())
458            }
459            StorageMode::InMemory => {
460                let store = self.columnar_memory.as_ref().ok_or_else(|| {
461                    Error::Core(alopex_core::Error::InvalidFormat(
462                        "in-memory columnar store is not initialized".into(),
463                    ))
464                })?;
465                let segments = store.list_segments();
466                Ok(segments
467                    .into_iter()
468                    .map(|(table_id, seg_id)| format!("{}:{}", table_id, seg_id))
469                    .collect())
470            }
471        }
472    }
473
474    /// Create a columnar index for a segment/column.
475    pub fn create_columnar_index(
476        &self,
477        segment_id: &str,
478        column: &str,
479        index_type: ColumnarIndexType,
480    ) -> Result<()> {
481        let _ = self.get_columnar_segment_stats(segment_id)?;
482        let key = columnar_index_key(segment_id, column);
483        let value = index_type.as_str().as_bytes().to_vec();
484        let mut txn = self.begin(TxnMode::ReadWrite)?;
485        txn.put(&key, &value)?;
486        txn.commit()?;
487        Ok(())
488    }
489
490    /// List columnar indexes for a segment.
491    pub fn list_columnar_indexes(&self, segment_id: &str) -> Result<Vec<ColumnarIndexInfo>> {
492        let _ = self.get_columnar_segment_stats(segment_id)?;
493        let prefix = columnar_index_prefix(segment_id);
494        let mut txn = self.begin(TxnMode::ReadOnly)?;
495        let mut entries = Vec::new();
496        for (key, value) in txn.scan_prefix(&prefix)? {
497            let column = parse_index_column(segment_id, &key)?;
498            let index_type = parse_index_type(&value)?;
499            entries.push(ColumnarIndexInfo { column, index_type });
500        }
501        txn.commit()?;
502        Ok(entries)
503    }
504
505    /// Drop a columnar index for a segment/column.
506    pub fn drop_columnar_index(&self, segment_id: &str, column: &str) -> Result<()> {
507        let _ = self.get_columnar_segment_stats(segment_id)?;
508        let key = columnar_index_key(segment_id, column);
509        let mut txn = self.begin(TxnMode::ReadWrite)?;
510        let exists = txn.get(&key)?.is_some();
511        if !exists {
512            txn.rollback()?;
513            return Err(Error::IndexNotFound(format!(
514                "columnar index {}:{}",
515                segment_id, column
516            )));
517        }
518        txn.delete(&key)?;
519        txn.commit()?;
520        Ok(())
521    }
522
523    /// InMemory モードのセグメントをファイルへフラッシュする。
524    pub fn flush_in_memory_segment_to_file(
525        &self,
526        table: &str,
527        segment_id: u64,
528        path: &Path,
529    ) -> Result<()> {
530        let store = self
531            .columnar_memory
532            .as_ref()
533            .ok_or(Error::NotInMemoryMode)?;
534        let table_id = table_id(table)?;
535        store
536            .flush_to_segment_file(table_id, segment_id, path)
537            .map_err(|e| Error::Core(e.into()))
538    }
539
540    /// InMemory モードのセグメントを KVS へフラッシュする。
541    pub fn flush_in_memory_segment_to_kvs(&self, table: &str, segment_id: u64) -> Result<u64> {
542        let store = self
543            .columnar_memory
544            .as_ref()
545            .ok_or(Error::NotInMemoryMode)?;
546        let table_id = table_id(table)?;
547        store
548            .flush_to_kvs(table_id, segment_id, &self.columnar_bridge)
549            .map_err(|e| Error::Core(e.into()))
550    }
551
552    /// InMemory モードのセグメントを `.alopex` ファイルへフラッシュする。
553    pub fn flush_in_memory_segment_to_alopex(
554        &self,
555        table: &str,
556        segment_id: u64,
557        writer: &mut AlopexFileWriter,
558    ) -> Result<u32> {
559        let store = self
560            .columnar_memory
561            .as_ref()
562            .ok_or(Error::NotInMemoryMode)?;
563        let table_id = table_id(table)?;
564        store
565            .flush_to_alopex(table_id, segment_id, writer)
566            .map_err(|e| Error::Core(e.into()))
567    }
568}
569
570impl<'a> Transaction<'a> {
571    /// 現在のカラムナーストレージモードを返す。
572    pub fn storage_mode(&self) -> StorageMode {
573        self.db.storage_mode()
574    }
575
576    /// カラムナーセグメントを書き込む(トランザクションコンテキスト利用)。
577    pub fn write_columnar_segment(&self, table: &str, batch: RecordBatch) -> Result<u64> {
578        self.db.write_columnar_segment(table, batch)
579    }
580
581    /// カラムナーセグメントを読み取る(トランザクションコンテキスト利用)。
582    pub fn read_columnar_segment(
583        &self,
584        table: &str,
585        segment_id: u64,
586        columns: Option<&[&str]>,
587    ) -> Result<Vec<RecordBatch>> {
588        self.db.read_columnar_segment(table, segment_id, columns)
589    }
590}
591
592fn table_id(table: &str) -> Result<u32> {
593    if table.is_empty() {
594        return Err(Error::TableNotFound("table name is empty".into()));
595    }
596    let mut hasher = DefaultHasher::new();
597    table.hash(&mut hasher);
598    Ok((hasher.finish() & 0xffff_ffff) as u32)
599}
600
601fn resolve_indices(batches: &[RecordBatch], names: &[&str]) -> Result<Vec<usize>> {
602    let Some(first) = batches.first() else {
603        return Err(Error::Core(alopex_core::Error::InvalidFormat(
604            "segment is empty".into(),
605        )));
606    };
607    let mut indices = Vec::with_capacity(names.len());
608    for name in names {
609        let pos = first
610            .schema
611            .columns
612            .iter()
613            .position(|c| c.name == *name)
614            .ok_or_else(|| {
615                Error::Core(alopex_core::Error::InvalidFormat(format!(
616                    "column not found: {name}"
617                )))
618            })?;
619        indices.push(pos);
620    }
621    Ok(indices)
622}
623
624fn project_batches(batches: Vec<RecordBatch>, indices: &[usize]) -> Result<Vec<RecordBatch>> {
625    let mut projected = Vec::with_capacity(batches.len());
626    for batch in batches {
627        let mut cols = Vec::with_capacity(indices.len());
628        let mut bitmaps = Vec::with_capacity(indices.len());
629        for &idx in indices {
630            let col = batch
631                .columns
632                .get(idx)
633                .ok_or_else(|| {
634                    Error::Core(alopex_core::Error::InvalidFormat(
635                        "column index out of bounds".into(),
636                    ))
637                })?
638                .clone();
639            let bitmap = batch.null_bitmaps.get(idx).cloned().unwrap_or(None);
640            cols.push(col);
641            bitmaps.push(bitmap);
642        }
643        let schema = alopex_core::columnar::segment_v2::Schema {
644            columns: indices
645                .iter()
646                .map(|&idx| batch.schema.columns[idx].clone())
647                .collect(),
648        };
649        projected.push(RecordBatch::new(schema, cols, bitmaps));
650    }
651    Ok(projected)
652}
653
/// Key prefix under which all columnar index metadata is stored in the key-value store.
const COLUMNAR_INDEX_PREFIX: &str = "__alopex_columnar_index__:";

/// Builds the key for the index on `column` of `segment`.
///
/// Layout: `{COLUMNAR_INDEX_PREFIX}{segment}:{column}`, i.e. the segment prefix followed by
/// the column name.
fn columnar_index_key(segment: &str, column: &str) -> Vec<u8> {
    let mut key = columnar_index_prefix(segment);
    key.extend_from_slice(column.as_bytes());
    key
}

/// Builds the prefix shared by every index key of `segment`.
///
/// Layout: `{COLUMNAR_INDEX_PREFIX}{segment}:`. The trailing `:` keeps segment `1:1` from
/// matching keys of segment `1:10`.
fn columnar_index_prefix(segment: &str) -> Vec<u8> {
    format!("{COLUMNAR_INDEX_PREFIX}{segment}:").into_bytes()
}
673
674fn parse_index_column(segment: &str, key: &[u8]) -> Result<String> {
675    let prefix = columnar_index_prefix(segment);
676    if !key.starts_with(&prefix) {
677        return Err(Error::Core(alopex_core::Error::InvalidFormat(
678            "columnar index key is invalid".into(),
679        )));
680    }
681    let suffix = &key[prefix.len()..];
682    String::from_utf8(suffix.to_vec()).map_err(|_| {
683        Error::Core(alopex_core::Error::InvalidFormat(
684            "columnar index column is not valid UTF-8".into(),
685        ))
686    })
687}
688
689fn parse_index_type(raw: &[u8]) -> Result<ColumnarIndexType> {
690    let value = std::str::from_utf8(raw).map_err(|_| {
691        Error::Core(alopex_core::Error::InvalidFormat(
692            "columnar index type is not valid UTF-8".into(),
693        ))
694    })?;
695    match value {
696        "minmax" => Ok(ColumnarIndexType::Minmax),
697        "bloom" => Ok(ColumnarIndexType::Bloom),
698        other => Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
699            "unknown columnar index type: {other}"
700        )))),
701    }
702}
703
704/// セグメントID文字列をパースする。
705///
706/// フォーマット: `{table_id}:{segment_id}` (例: "12345:1")
707fn parse_segment_id(segment_id: &str) -> Result<(u32, u64)> {
708    let parts: Vec<&str> = segment_id.split(':').collect();
709    if parts.len() != 2 {
710        return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
711            "invalid segment ID format: expected 'table_id:segment_id', got '{}'",
712            segment_id
713        ))));
714    }
715
716    let table_id: u32 = parts[0].parse().map_err(|_| {
717        Error::Core(alopex_core::Error::InvalidFormat(format!(
718            "invalid table_id in segment ID: '{}'",
719            parts[0]
720        )))
721    })?;
722
723    let seg_id: u64 = parts[1].parse().map_err(|_| {
724        Error::Core(alopex_core::Error::InvalidFormat(format!(
725            "invalid segment_id in segment ID: '{}'",
726            parts[1]
727        )))
728    })?;
729
730    Ok((table_id, seg_id))
731}
732
733/// カラム値を SqlValue に変換する。
734fn column_value_to_sql_value(col: &Column, row_idx: usize) -> alopex_sql::SqlValue {
735    match col {
736        Column::Int64(vals) => vals
737            .get(row_idx)
738            .map(|&v| alopex_sql::SqlValue::BigInt(v))
739            .unwrap_or(alopex_sql::SqlValue::Null),
740        Column::Float32(vals) => vals
741            .get(row_idx)
742            .map(|&v| alopex_sql::SqlValue::Float(v))
743            .unwrap_or(alopex_sql::SqlValue::Null),
744        Column::Float64(vals) => vals
745            .get(row_idx)
746            .map(|&v| alopex_sql::SqlValue::Double(v))
747            .unwrap_or(alopex_sql::SqlValue::Null),
748        Column::Bool(vals) => vals
749            .get(row_idx)
750            .map(|&v| alopex_sql::SqlValue::Boolean(v))
751            .unwrap_or(alopex_sql::SqlValue::Null),
752        Column::Binary(vals) => vals
753            .get(row_idx)
754            .map(|v| alopex_sql::SqlValue::Blob(v.clone()))
755            .unwrap_or(alopex_sql::SqlValue::Null),
756        Column::Fixed { values, .. } => values
757            .get(row_idx)
758            .map(|v| alopex_sql::SqlValue::Blob(v.clone()))
759            .unwrap_or(alopex_sql::SqlValue::Null),
760    }
761}
762
763// ============================================================================
764// ColumnarRowIterator - FR-7 Streaming Row Iterator
765// ============================================================================
766
767/// Streaming row iterator for columnar segments (FR-7 compliant).
768///
769/// This iterator yields rows one at a time from pre-loaded RecordBatches,
770/// avoiding the need to materialize all rows into `Vec<Vec<SqlValue>>` upfront.
771pub struct ColumnarRowIterator {
772    /// Pre-loaded RecordBatches.
773    batches: Vec<RecordBatch>,
774    /// Current batch index.
775    batch_idx: usize,
776    /// Current row index within the batch.
777    row_idx: usize,
778}
779
780impl ColumnarRowIterator {
781    /// Create a new row iterator from RecordBatches.
782    pub fn new(batches: Vec<RecordBatch>) -> Self {
783        Self {
784            batches,
785            batch_idx: 0,
786            row_idx: 0,
787        }
788    }
789
790    /// Returns the total number of batches.
791    pub fn batch_count(&self) -> usize {
792        self.batches.len()
793    }
794
795    /// Returns the current batch being iterated, if any.
796    pub fn current_batch(&self) -> Option<&RecordBatch> {
797        self.batches.get(self.batch_idx)
798    }
799}
800
801impl Iterator for ColumnarRowIterator {
802    type Item = Vec<alopex_sql::SqlValue>;
803
804    fn next(&mut self) -> Option<Self::Item> {
805        loop {
806            // Check if we've exhausted all batches
807            if self.batch_idx >= self.batches.len() {
808                return None;
809            }
810
811            let batch = &self.batches[self.batch_idx];
812            let row_count = batch.num_rows();
813
814            // Check if we've exhausted the current batch
815            if self.row_idx >= row_count {
816                self.batch_idx += 1;
817                self.row_idx = 0;
818                continue;
819            }
820
821            // Convert current row
822            let row_idx = self.row_idx;
823            self.row_idx += 1;
824
825            let mut row = Vec::with_capacity(batch.columns.len());
826            for col in &batch.columns {
827                let sql_val = column_value_to_sql_value(col, row_idx);
828                row.push(sql_val);
829            }
830            return Some(row);
831        }
832    }
833}
834
#[cfg(test)]
mod tests {
    use super::*;
    use alopex_core::columnar::encoding::{Column, LogicalType};
    use alopex_core::columnar::segment_v2::{ColumnSchema, Schema};
    use alopex_core::storage::format::{AlopexFileWriter, FileFlags, FileVersion};
    use tempfile::tempdir;

    /// Builds a non-nullable, variable-length `Int64` column schema entry.
    fn int64_column(name: &'static str) -> ColumnSchema {
        ColumnSchema {
            name: name.into(),
            logical_type: LogicalType::Int64,
            nullable: false,
            fixed_len: None,
        }
    }

    /// Returns a three-row batch with columns `id` = [1, 2, 3] and
    /// `val` = [10, 20, 30], both `Int64`.
    fn make_batch() -> RecordBatch {
        let schema = Schema {
            columns: vec![int64_column("id"), int64_column("val")],
        };
        let ids = Column::Int64(vec![1, 2, 3]);
        let vals = Column::Int64(vec![10, 20, 30]);
        RecordBatch::new(schema, vec![ids, vals], vec![None, None])
    }

    /// A segment written in disk mode can be read back with all of its rows.
    #[test]
    fn write_read_disk_mode() {
        let dir = tempdir().unwrap();
        let db = Database::open_with_config(EmbeddedConfig::disk(dir.path().join("wal.log")))
            .unwrap();

        let seg_id = db.write_columnar_segment("tbl", make_batch()).unwrap();
        let batches = db.read_columnar_segment("tbl", seg_id, None).unwrap();

        assert_eq!(batches[0].num_rows(), 3);
    }

    /// Projecting by column name returns only the requested column, with its values intact.
    #[test]
    fn read_with_column_names() {
        let dir = tempdir().unwrap();
        let db = Database::open_with_config(EmbeddedConfig::disk(dir.path().join("wal.log")))
            .unwrap();

        let seg_id = db.write_columnar_segment("tbl", make_batch()).unwrap();
        let batches = db
            .read_columnar_segment("tbl", seg_id, Some(&["val"]))
            .unwrap();

        assert_eq!(batches[0].columns.len(), 1);
        match &batches[0].columns[0] {
            Column::Int64(vals) => assert_eq!(vals, &vec![10, 20, 30]),
            _ => panic!("expected int64"),
        }
    }

    /// A 1-byte in-memory limit is too small for the sample batch, so the write is rejected.
    #[test]
    fn in_memory_limit_rejects_large_segment() {
        let db = Database::open_with_config(EmbeddedConfig::in_memory_with_limit(1)).unwrap();

        let message = db
            .write_columnar_segment("tbl", make_batch())
            .expect_err("should exceed limit")
            .to_string();

        assert!(message.contains("memory limit exceeded"));
    }

    /// `storage_mode()` reports the mode the database was opened with.
    #[test]
    fn storage_mode_flags() {
        let dir = tempdir().unwrap();
        let disk =
            Database::open_with_config(EmbeddedConfig::disk(dir.path().join("wal.log"))).unwrap();
        assert!(matches!(disk.storage_mode(), StorageMode::Disk));

        let mem = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        assert!(matches!(mem.storage_mode(), StorageMode::InMemory));
    }

    /// A segment written inside a committed read-write transaction is visible to later reads.
    #[test]
    fn transaction_write_and_read() {
        let dir = tempdir().unwrap();
        let db = Database::open_with_config(EmbeddedConfig::disk(dir.path().join("wal.log")))
            .unwrap();

        let txn = db.begin(crate::TxnMode::ReadWrite).unwrap();
        let seg_id = txn.write_columnar_segment("tbl_txn", make_batch()).unwrap();
        txn.commit().unwrap();

        let batches = db
            .read_columnar_segment("tbl_txn", seg_id, Some(&["id"]))
            .unwrap();
        assert_eq!(batches[0].num_rows(), 3);
    }

    /// An in-memory segment can be flushed to a plain file, to the KVS, and to a `.alopex` file.
    #[test]
    fn flush_in_memory_paths() {
        let dir = tempdir().unwrap();
        let db = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        let seg_id = db.write_columnar_segment("mem_tbl", make_batch()).unwrap();

        // Target 1: a raw file on disk.
        let file_path = dir.path().join("seg.bin");
        db.flush_in_memory_segment_to_file("mem_tbl", seg_id, &file_path)
            .unwrap();
        let written = std::fs::read(&file_path).unwrap();
        assert!(!written.is_empty());

        // Target 2: the key-value store.
        let kv_id = db
            .flush_in_memory_segment_to_kvs("mem_tbl", seg_id)
            .unwrap();
        assert_eq!(kv_id, 0);

        // Target 3: an `.alopex` container, which must be finalized explicitly.
        let alo_path = dir.path().join("out.alopex");
        let mut writer =
            AlopexFileWriter::new(alo_path.clone(), FileVersion::CURRENT, FileFlags(0)).unwrap();
        db.flush_in_memory_segment_to_alopex("mem_tbl", seg_id, &mut writer)
            .unwrap();
        writer.finalize().unwrap();
        assert!(alo_path.exists());
    }

    /// Flushing "in-memory" segments from a disk-mode database fails with `NotInMemoryMode`.
    #[test]
    fn flush_not_in_memory_mode_errors() {
        let dir = tempdir().unwrap();
        let db = Database::open_with_config(EmbeddedConfig::disk(dir.path().join("wal.log")))
            .unwrap();

        let err = db
            .flush_in_memory_segment_to_kvs("tbl", 0)
            .expect_err("should error");

        assert!(matches!(err, Error::NotInMemoryMode));
    }
}