//! Embedded API extensions for columnar storage (`alopex_embedded/columnar_api.rs`).
//!
//! カラムナーストレージの埋め込み API 拡張。
2
3use std::collections::hash_map::DefaultHasher;
4use std::hash::{Hash, Hasher};
5use std::path::{Path, PathBuf};
6
7use alopex_core::columnar::segment_v2::{RecordBatch, SegmentWriterV2};
8use alopex_core::storage::format::AlopexFileWriter;
9use alopex_core::{StorageFactory, StorageMode as CoreStorageMode};
10
11use crate::{Database, Error, Result, SegmentConfigV2, Transaction};
12
13/// カラムナー関連設定。
14#[derive(Debug, Clone)]
15pub struct EmbeddedConfig {
16    /// データパス(Disk モード時に必須)。
17    pub path: Option<PathBuf>,
18    /// カラムナーストレージモード。
19    pub storage_mode: StorageMode,
20    /// InMemory モードのメモリ上限(バイト)。
21    pub memory_limit: Option<usize>,
22    /// セグメント設定。
23    pub segment_config: SegmentConfigV2,
24}
25
26impl EmbeddedConfig {
27    /// ディスクモードで初期化。
28    pub fn disk(path: PathBuf) -> Self {
29        Self {
30            path: Some(path),
31            storage_mode: StorageMode::Disk,
32            memory_limit: None,
33            segment_config: SegmentConfigV2::default(),
34        }
35    }
36
37    /// インメモリモードで初期化(無制限)。
38    pub fn in_memory() -> Self {
39        Self {
40            path: None,
41            storage_mode: StorageMode::InMemory,
42            memory_limit: None,
43            segment_config: SegmentConfigV2::default(),
44        }
45    }
46
47    /// インメモリモードでメモリ上限を設定。
48    pub fn in_memory_with_limit(limit: usize) -> Self {
49        Self {
50            path: None,
51            storage_mode: StorageMode::InMemory,
52            memory_limit: Some(limit),
53            segment_config: SegmentConfigV2::default(),
54        }
55    }
56
57    /// セグメント設定を上書き。
58    pub fn with_segment_config(mut self, cfg: SegmentConfigV2) -> Self {
59        self.segment_config = cfg;
60        self
61    }
62}
63
/// Selects where the columnar subsystem keeps its segments.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StorageMode {
    /// Segments are persisted to disk via the KVS layer.
    Disk,
    /// Segments are held entirely in memory.
    InMemory,
}
72
73impl Database {
74    /// 構成付きでデータベースを開く(カラムナー機能を初期化)。
75    pub fn open_with_config(config: EmbeddedConfig) -> Result<Self> {
76        let store = match config.storage_mode {
77            StorageMode::Disk => {
78                let path = config.path.clone().ok_or_else(|| {
79                    Error::Core(alopex_core::Error::InvalidFormat(
80                        "disk mode requires a path".into(),
81                    ))
82                })?;
83                let path = crate::disk_data_dir_path(&path);
84                StorageFactory::create(CoreStorageMode::Disk { path, config: None })
85                    .map_err(Error::Core)?
86            }
87            StorageMode::InMemory => StorageFactory::create(CoreStorageMode::Memory {
88                max_size: config.memory_limit,
89            })
90            .map_err(Error::Core)?,
91        };
92
93        Ok(Self::init(
94            store,
95            config.storage_mode,
96            config.memory_limit,
97            config.segment_config,
98        ))
99    }
100
101    /// 現在のカラムナーストレージモードを返す。
102    pub fn storage_mode(&self) -> StorageMode {
103        self.columnar_mode
104    }
105
106    /// カラムナーセグメントを書き込む。
107    pub fn write_columnar_segment(&self, table: &str, batch: RecordBatch) -> Result<u64> {
108        let mut writer = SegmentWriterV2::new(self.segment_config.clone());
109        writer
110            .write_batch(batch)
111            .map_err(|e| Error::Core(e.into()))?;
112        let segment = writer.finish().map_err(|e| Error::Core(e.into()))?;
113        let table_id = table_id(table)?;
114
115        match self.columnar_mode {
116            StorageMode::Disk => self
117                .columnar_bridge
118                .write_segment(table_id, &segment)
119                .map_err(|e| Error::Core(e.into())),
120            StorageMode::InMemory => {
121                let store = self.columnar_memory.as_ref().ok_or_else(|| {
122                    Error::Core(alopex_core::Error::InvalidFormat(
123                        "in-memory columnar store is not initialized".into(),
124                    ))
125                })?;
126                store
127                    .write_segment(table_id, segment)
128                    .map_err(|e| Error::Core(e.into()))
129            }
130        }
131    }
132
133    /// カラムナーセグメントを読み取る(カラム名指定オプション付き)。
134    pub fn read_columnar_segment(
135        &self,
136        table: &str,
137        segment_id: u64,
138        columns: Option<&[&str]>,
139    ) -> Result<Vec<RecordBatch>> {
140        let table_id = table_id(table)?;
141        let column_count = match self.columnar_mode {
142            StorageMode::Disk => self
143                .columnar_bridge
144                .column_count(table_id, segment_id)
145                .map_err(|e| Error::Core(e.into()))?,
146            StorageMode::InMemory => self
147                .columnar_memory
148                .as_ref()
149                .ok_or_else(|| {
150                    Error::Core(alopex_core::Error::InvalidFormat(
151                        "in-memory columnar store is not initialized".into(),
152                    ))
153                })?
154                .column_count(table_id, segment_id)
155                .map_err(|e| Error::Core(e.into()))?,
156        };
157        let all_indices: Vec<usize> = (0..column_count).collect();
158
159        let batches_full = match self.columnar_mode {
160            StorageMode::Disk => self
161                .columnar_bridge
162                .read_segment(table_id, segment_id, &all_indices)
163                .map_err(|e| Error::Core(e.into()))?,
164            StorageMode::InMemory => self
165                .columnar_memory
166                .as_ref()
167                .ok_or_else(|| {
168                    Error::Core(alopex_core::Error::InvalidFormat(
169                        "in-memory columnar store is not initialized".into(),
170                    ))
171                })?
172                .read_segment(table_id, segment_id, &all_indices)
173                .map_err(|e| Error::Core(e.into()))?,
174        };
175
176        if let Some(names) = columns {
177            let indices = resolve_indices(&batches_full, names)?;
178            project_batches(batches_full, &indices)
179        } else {
180            Ok(batches_full)
181        }
182    }
183
184    /// InMemory モード時のメモリ使用量を返す。Disk モードでは None。
185    pub fn in_memory_usage(&self) -> Option<u64> {
186        if self.columnar_mode == StorageMode::InMemory {
187            self.columnar_memory.as_ref().map(|m| m.memory_usage())
188        } else {
189            None
190        }
191    }
192
193    /// メモリ上限付きでインメモリ DB を開く。
194    pub fn open_in_memory_with_limit(limit: usize) -> Result<Self> {
195        Self::open_with_config(EmbeddedConfig::in_memory_with_limit(limit))
196    }
197
198    /// テーブル名から内部 ID を解決する。
199    pub fn resolve_table_id(&self, table: &str) -> Result<u32> {
200        table_id(table)
201    }
202
203    /// InMemory モードのセグメントをファイルへフラッシュする。
204    pub fn flush_in_memory_segment_to_file(
205        &self,
206        table: &str,
207        segment_id: u64,
208        path: &Path,
209    ) -> Result<()> {
210        let store = self
211            .columnar_memory
212            .as_ref()
213            .ok_or(Error::NotInMemoryMode)?;
214        let table_id = table_id(table)?;
215        store
216            .flush_to_segment_file(table_id, segment_id, path)
217            .map_err(|e| Error::Core(e.into()))
218    }
219
220    /// InMemory モードのセグメントを KVS へフラッシュする。
221    pub fn flush_in_memory_segment_to_kvs(&self, table: &str, segment_id: u64) -> Result<u64> {
222        let store = self
223            .columnar_memory
224            .as_ref()
225            .ok_or(Error::NotInMemoryMode)?;
226        let table_id = table_id(table)?;
227        store
228            .flush_to_kvs(table_id, segment_id, &self.columnar_bridge)
229            .map_err(|e| Error::Core(e.into()))
230    }
231
232    /// InMemory モードのセグメントを `.alopex` ファイルへフラッシュする。
233    pub fn flush_in_memory_segment_to_alopex(
234        &self,
235        table: &str,
236        segment_id: u64,
237        writer: &mut AlopexFileWriter,
238    ) -> Result<u32> {
239        let store = self
240            .columnar_memory
241            .as_ref()
242            .ok_or(Error::NotInMemoryMode)?;
243        let table_id = table_id(table)?;
244        store
245            .flush_to_alopex(table_id, segment_id, writer)
246            .map_err(|e| Error::Core(e.into()))
247    }
248}
249
250impl<'a> Transaction<'a> {
251    /// 現在のカラムナーストレージモードを返す。
252    pub fn storage_mode(&self) -> StorageMode {
253        self.db.storage_mode()
254    }
255
256    /// カラムナーセグメントを書き込む(トランザクションコンテキスト利用)。
257    pub fn write_columnar_segment(&self, table: &str, batch: RecordBatch) -> Result<u64> {
258        self.db.write_columnar_segment(table, batch)
259    }
260
261    /// カラムナーセグメントを読み取る(トランザクションコンテキスト利用)。
262    pub fn read_columnar_segment(
263        &self,
264        table: &str,
265        segment_id: u64,
266        columns: Option<&[&str]>,
267    ) -> Result<Vec<RecordBatch>> {
268        self.db.read_columnar_segment(table, segment_id, columns)
269    }
270}
271
272fn table_id(table: &str) -> Result<u32> {
273    if table.is_empty() {
274        return Err(Error::TableNotFound("table name is empty".into()));
275    }
276    let mut hasher = DefaultHasher::new();
277    table.hash(&mut hasher);
278    Ok((hasher.finish() & 0xffff_ffff) as u32)
279}
280
281fn resolve_indices(batches: &[RecordBatch], names: &[&str]) -> Result<Vec<usize>> {
282    let Some(first) = batches.first() else {
283        return Err(Error::Core(alopex_core::Error::InvalidFormat(
284            "segment is empty".into(),
285        )));
286    };
287    let mut indices = Vec::with_capacity(names.len());
288    for name in names {
289        let pos = first
290            .schema
291            .columns
292            .iter()
293            .position(|c| c.name == *name)
294            .ok_or_else(|| {
295                Error::Core(alopex_core::Error::InvalidFormat(format!(
296                    "column not found: {name}"
297                )))
298            })?;
299        indices.push(pos);
300    }
301    Ok(indices)
302}
303
304fn project_batches(batches: Vec<RecordBatch>, indices: &[usize]) -> Result<Vec<RecordBatch>> {
305    let mut projected = Vec::with_capacity(batches.len());
306    for batch in batches {
307        let mut cols = Vec::with_capacity(indices.len());
308        let mut bitmaps = Vec::with_capacity(indices.len());
309        for &idx in indices {
310            let col = batch
311                .columns
312                .get(idx)
313                .ok_or_else(|| {
314                    Error::Core(alopex_core::Error::InvalidFormat(
315                        "column index out of bounds".into(),
316                    ))
317                })?
318                .clone();
319            let bitmap = batch.null_bitmaps.get(idx).cloned().unwrap_or(None);
320            cols.push(col);
321            bitmaps.push(bitmap);
322        }
323        let schema = alopex_core::columnar::segment_v2::Schema {
324            columns: indices
325                .iter()
326                .map(|&idx| batch.schema.columns[idx].clone())
327                .collect(),
328        };
329        projected.push(RecordBatch::new(schema, cols, bitmaps));
330    }
331    Ok(projected)
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use alopex_core::columnar::encoding::{Column, LogicalType};
    use alopex_core::columnar::segment_v2::{ColumnSchema, Schema};
    use alopex_core::storage::format::{AlopexFileWriter, FileFlags, FileVersion};
    use tempfile::tempdir;

    /// Two non-nullable Int64 columns (`id`, `val`) with three rows.
    fn make_batch() -> RecordBatch {
        let schema = Schema {
            columns: vec![
                ColumnSchema {
                    name: "id".into(),
                    logical_type: LogicalType::Int64,
                    nullable: false,
                    fixed_len: None,
                },
                ColumnSchema {
                    name: "val".into(),
                    logical_type: LogicalType::Int64,
                    nullable: false,
                    fixed_len: None,
                },
            ],
        };
        RecordBatch::new(
            schema,
            vec![
                Column::Int64(vec![1, 2, 3]),
                Column::Int64(vec![10, 20, 30]),
            ],
            vec![None, None],
        )
    }

    #[test]
    fn write_read_disk_mode() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let cfg = EmbeddedConfig::disk(wal);
        let db = Database::open_with_config(cfg).unwrap();
        let seg_id = db.write_columnar_segment("tbl", make_batch()).unwrap();
        let batches = db.read_columnar_segment("tbl", seg_id, None).unwrap();
        assert_eq!(batches[0].num_rows(), 3);
    }

    #[test]
    fn read_with_column_names() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let cfg = EmbeddedConfig::disk(wal);
        let db = Database::open_with_config(cfg).unwrap();
        let seg_id = db.write_columnar_segment("tbl", make_batch()).unwrap();
        let batches = db
            .read_columnar_segment("tbl", seg_id, Some(&["val"]))
            .unwrap();
        assert_eq!(batches[0].columns.len(), 1);
        if let Column::Int64(vals) = &batches[0].columns[0] {
            assert_eq!(vals, &vec![10, 20, 30]);
        } else {
            panic!("expected int64");
        }
    }

    #[test]
    fn read_unknown_column_errors() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let db = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        let seg_id = db.write_columnar_segment("tbl", make_batch()).unwrap();
        let err = db
            .read_columnar_segment("tbl", seg_id, Some(&["missing"]))
            .expect_err("unknown column should be rejected");
        assert!(matches!(err, Error::Core(_)));
    }

    #[test]
    fn empty_table_name_rejected() {
        let db = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        let err = db
            .write_columnar_segment("", make_batch())
            .expect_err("empty table name should be rejected");
        assert!(matches!(err, Error::TableNotFound(_)));
        assert!(db.resolve_table_id("").is_err());
        assert_eq!(
            db.resolve_table_id("tbl").unwrap(),
            db.resolve_table_id("tbl").unwrap()
        );
    }

    #[test]
    fn in_memory_limit_rejects_large_segment() {
        let cfg = EmbeddedConfig::in_memory_with_limit(1);
        let db = Database::open_with_config(cfg).unwrap();
        let err = db
            .write_columnar_segment("tbl", make_batch())
            .expect_err("should exceed limit");
        assert!(format!("{err}").contains("memory limit exceeded"));
    }

    #[test]
    fn in_memory_write_read_roundtrip() {
        let db = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        let seg_id = db.write_columnar_segment("mem_tbl", make_batch()).unwrap();
        let batches = db
            .read_columnar_segment("mem_tbl", seg_id, Some(&["val"]))
            .unwrap();
        assert_eq!(batches[0].columns.len(), 1);
        assert_eq!(batches[0].num_rows(), 3);
    }

    #[test]
    fn in_memory_usage_only_reported_in_memory_mode() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let disk = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        assert_eq!(disk.in_memory_usage(), None);

        let mem = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        assert!(mem.in_memory_usage().is_some());
    }

    #[test]
    fn storage_mode_flags() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let disk = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        assert!(matches!(disk.storage_mode(), StorageMode::Disk));

        let mem = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        assert!(matches!(mem.storage_mode(), StorageMode::InMemory));
    }

    #[test]
    fn transaction_write_and_read() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let db = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        let txn = db.begin(crate::TxnMode::ReadWrite).unwrap();
        let seg_id = txn.write_columnar_segment("tbl_txn", make_batch()).unwrap();
        txn.commit().unwrap();

        let batches = db
            .read_columnar_segment("tbl_txn", seg_id, Some(&["id"]))
            .unwrap();
        assert_eq!(batches[0].num_rows(), 3);
    }

    #[test]
    fn flush_in_memory_paths() {
        let dir = tempdir().unwrap();
        let db = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        let seg_id = db.write_columnar_segment("mem_tbl", make_batch()).unwrap();

        // flush to file
        let file_path = dir.path().join("seg.bin");
        db.flush_in_memory_segment_to_file("mem_tbl", seg_id, &file_path)
            .unwrap();
        let bytes = std::fs::read(&file_path).unwrap();
        assert!(!bytes.is_empty());

        // flush to kvs
        let kv_id = db
            .flush_in_memory_segment_to_kvs("mem_tbl", seg_id)
            .unwrap();
        assert_eq!(kv_id, 0);

        // flush to .alopex
        let alo_path = dir.path().join("out.alopex");
        let mut writer =
            AlopexFileWriter::new(alo_path.clone(), FileVersion::CURRENT, FileFlags(0)).unwrap();
        db.flush_in_memory_segment_to_alopex("mem_tbl", seg_id, &mut writer)
            .unwrap();
        writer.finalize().unwrap();
        assert!(alo_path.exists());
    }

    #[test]
    fn flush_not_in_memory_mode_errors() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let db = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        let err = db
            .flush_in_memory_segment_to_kvs("tbl", 0)
            .expect_err("should error");
        assert!(matches!(err, Error::NotInMemoryMode));
    }
}