1use std::collections::hash_map::DefaultHasher;
4use std::hash::{Hash, Hasher};
5use std::path::{Path, PathBuf};
6
7use alopex_core::columnar::segment_v2::{RecordBatch, SegmentWriterV2};
8use alopex_core::storage::format::AlopexFileWriter;
9use alopex_core::{StorageFactory, StorageMode as CoreStorageMode};
10
11use crate::{Database, Error, Result, SegmentConfigV2, Transaction};
12
13#[derive(Debug, Clone)]
15pub struct EmbeddedConfig {
16 pub path: Option<PathBuf>,
18 pub storage_mode: StorageMode,
20 pub memory_limit: Option<usize>,
22 pub segment_config: SegmentConfigV2,
24}
25
26impl EmbeddedConfig {
27 pub fn disk(path: PathBuf) -> Self {
29 Self {
30 path: Some(path),
31 storage_mode: StorageMode::Disk,
32 memory_limit: None,
33 segment_config: SegmentConfigV2::default(),
34 }
35 }
36
37 pub fn in_memory() -> Self {
39 Self {
40 path: None,
41 storage_mode: StorageMode::InMemory,
42 memory_limit: None,
43 segment_config: SegmentConfigV2::default(),
44 }
45 }
46
47 pub fn in_memory_with_limit(limit: usize) -> Self {
49 Self {
50 path: None,
51 storage_mode: StorageMode::InMemory,
52 memory_limit: Some(limit),
53 segment_config: SegmentConfigV2::default(),
54 }
55 }
56
57 pub fn with_segment_config(mut self, cfg: SegmentConfigV2) -> Self {
59 self.segment_config = cfg;
60 self
61 }
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum StorageMode {
67 Disk,
69 InMemory,
71}
72
73impl Database {
74 pub fn open_with_config(config: EmbeddedConfig) -> Result<Self> {
76 let store = match config.storage_mode {
77 StorageMode::Disk => {
78 let path = config.path.clone().ok_or_else(|| {
79 Error::Core(alopex_core::Error::InvalidFormat(
80 "disk mode requires a path".into(),
81 ))
82 })?;
83 let path = crate::disk_data_dir_path(&path);
84 StorageFactory::create(CoreStorageMode::Disk { path, config: None })
85 .map_err(Error::Core)?
86 }
87 StorageMode::InMemory => StorageFactory::create(CoreStorageMode::Memory {
88 max_size: config.memory_limit,
89 })
90 .map_err(Error::Core)?,
91 };
92
93 Ok(Self::init(
94 store,
95 config.storage_mode,
96 config.memory_limit,
97 config.segment_config,
98 ))
99 }
100
101 pub fn storage_mode(&self) -> StorageMode {
103 self.columnar_mode
104 }
105
106 pub fn write_columnar_segment(&self, table: &str, batch: RecordBatch) -> Result<u64> {
108 let mut writer = SegmentWriterV2::new(self.segment_config.clone());
109 writer
110 .write_batch(batch)
111 .map_err(|e| Error::Core(e.into()))?;
112 let segment = writer.finish().map_err(|e| Error::Core(e.into()))?;
113 let table_id = table_id(table)?;
114
115 match self.columnar_mode {
116 StorageMode::Disk => self
117 .columnar_bridge
118 .write_segment(table_id, &segment)
119 .map_err(|e| Error::Core(e.into())),
120 StorageMode::InMemory => {
121 let store = self.columnar_memory.as_ref().ok_or_else(|| {
122 Error::Core(alopex_core::Error::InvalidFormat(
123 "in-memory columnar store is not initialized".into(),
124 ))
125 })?;
126 store
127 .write_segment(table_id, segment)
128 .map_err(|e| Error::Core(e.into()))
129 }
130 }
131 }
132
133 pub fn read_columnar_segment(
135 &self,
136 table: &str,
137 segment_id: u64,
138 columns: Option<&[&str]>,
139 ) -> Result<Vec<RecordBatch>> {
140 let table_id = table_id(table)?;
141 let column_count = match self.columnar_mode {
142 StorageMode::Disk => self
143 .columnar_bridge
144 .column_count(table_id, segment_id)
145 .map_err(|e| Error::Core(e.into()))?,
146 StorageMode::InMemory => self
147 .columnar_memory
148 .as_ref()
149 .ok_or_else(|| {
150 Error::Core(alopex_core::Error::InvalidFormat(
151 "in-memory columnar store is not initialized".into(),
152 ))
153 })?
154 .column_count(table_id, segment_id)
155 .map_err(|e| Error::Core(e.into()))?,
156 };
157 let all_indices: Vec<usize> = (0..column_count).collect();
158
159 let batches_full = match self.columnar_mode {
160 StorageMode::Disk => self
161 .columnar_bridge
162 .read_segment(table_id, segment_id, &all_indices)
163 .map_err(|e| Error::Core(e.into()))?,
164 StorageMode::InMemory => self
165 .columnar_memory
166 .as_ref()
167 .ok_or_else(|| {
168 Error::Core(alopex_core::Error::InvalidFormat(
169 "in-memory columnar store is not initialized".into(),
170 ))
171 })?
172 .read_segment(table_id, segment_id, &all_indices)
173 .map_err(|e| Error::Core(e.into()))?,
174 };
175
176 if let Some(names) = columns {
177 let indices = resolve_indices(&batches_full, names)?;
178 project_batches(batches_full, &indices)
179 } else {
180 Ok(batches_full)
181 }
182 }
183
184 pub fn in_memory_usage(&self) -> Option<u64> {
186 if self.columnar_mode == StorageMode::InMemory {
187 self.columnar_memory.as_ref().map(|m| m.memory_usage())
188 } else {
189 None
190 }
191 }
192
193 pub fn open_in_memory_with_limit(limit: usize) -> Result<Self> {
195 Self::open_with_config(EmbeddedConfig::in_memory_with_limit(limit))
196 }
197
198 pub fn resolve_table_id(&self, table: &str) -> Result<u32> {
200 table_id(table)
201 }
202
203 pub fn flush_in_memory_segment_to_file(
205 &self,
206 table: &str,
207 segment_id: u64,
208 path: &Path,
209 ) -> Result<()> {
210 let store = self
211 .columnar_memory
212 .as_ref()
213 .ok_or(Error::NotInMemoryMode)?;
214 let table_id = table_id(table)?;
215 store
216 .flush_to_segment_file(table_id, segment_id, path)
217 .map_err(|e| Error::Core(e.into()))
218 }
219
220 pub fn flush_in_memory_segment_to_kvs(&self, table: &str, segment_id: u64) -> Result<u64> {
222 let store = self
223 .columnar_memory
224 .as_ref()
225 .ok_or(Error::NotInMemoryMode)?;
226 let table_id = table_id(table)?;
227 store
228 .flush_to_kvs(table_id, segment_id, &self.columnar_bridge)
229 .map_err(|e| Error::Core(e.into()))
230 }
231
232 pub fn flush_in_memory_segment_to_alopex(
234 &self,
235 table: &str,
236 segment_id: u64,
237 writer: &mut AlopexFileWriter,
238 ) -> Result<u32> {
239 let store = self
240 .columnar_memory
241 .as_ref()
242 .ok_or(Error::NotInMemoryMode)?;
243 let table_id = table_id(table)?;
244 store
245 .flush_to_alopex(table_id, segment_id, writer)
246 .map_err(|e| Error::Core(e.into()))
247 }
248}
249
250impl<'a> Transaction<'a> {
251 pub fn storage_mode(&self) -> StorageMode {
253 self.db.storage_mode()
254 }
255
256 pub fn write_columnar_segment(&self, table: &str, batch: RecordBatch) -> Result<u64> {
258 self.db.write_columnar_segment(table, batch)
259 }
260
261 pub fn read_columnar_segment(
263 &self,
264 table: &str,
265 segment_id: u64,
266 columns: Option<&[&str]>,
267 ) -> Result<Vec<RecordBatch>> {
268 self.db.read_columnar_segment(table, segment_id, columns)
269 }
270}
271
272fn table_id(table: &str) -> Result<u32> {
273 if table.is_empty() {
274 return Err(Error::TableNotFound("table name is empty".into()));
275 }
276 let mut hasher = DefaultHasher::new();
277 table.hash(&mut hasher);
278 Ok((hasher.finish() & 0xffff_ffff) as u32)
279}
280
281fn resolve_indices(batches: &[RecordBatch], names: &[&str]) -> Result<Vec<usize>> {
282 let Some(first) = batches.first() else {
283 return Err(Error::Core(alopex_core::Error::InvalidFormat(
284 "segment is empty".into(),
285 )));
286 };
287 let mut indices = Vec::with_capacity(names.len());
288 for name in names {
289 let pos = first
290 .schema
291 .columns
292 .iter()
293 .position(|c| c.name == *name)
294 .ok_or_else(|| {
295 Error::Core(alopex_core::Error::InvalidFormat(format!(
296 "column not found: {name}"
297 )))
298 })?;
299 indices.push(pos);
300 }
301 Ok(indices)
302}
303
304fn project_batches(batches: Vec<RecordBatch>, indices: &[usize]) -> Result<Vec<RecordBatch>> {
305 let mut projected = Vec::with_capacity(batches.len());
306 for batch in batches {
307 let mut cols = Vec::with_capacity(indices.len());
308 let mut bitmaps = Vec::with_capacity(indices.len());
309 for &idx in indices {
310 let col = batch
311 .columns
312 .get(idx)
313 .ok_or_else(|| {
314 Error::Core(alopex_core::Error::InvalidFormat(
315 "column index out of bounds".into(),
316 ))
317 })?
318 .clone();
319 let bitmap = batch.null_bitmaps.get(idx).cloned().unwrap_or(None);
320 cols.push(col);
321 bitmaps.push(bitmap);
322 }
323 let schema = alopex_core::columnar::segment_v2::Schema {
324 columns: indices
325 .iter()
326 .map(|&idx| batch.schema.columns[idx].clone())
327 .collect(),
328 };
329 projected.push(RecordBatch::new(schema, cols, bitmaps));
330 }
331 Ok(projected)
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use alopex_core::columnar::encoding::{Column, LogicalType};
    use alopex_core::columnar::segment_v2::{ColumnSchema, Schema};
    use alopex_core::storage::format::{AlopexFileWriter, FileFlags, FileVersion};
    use tempfile::tempdir;

    /// Three-row batch with two non-nullable Int64 columns, `id` and `val`.
    fn make_batch() -> RecordBatch {
        let schema = Schema {
            columns: vec![
                ColumnSchema {
                    name: "id".into(),
                    logical_type: LogicalType::Int64,
                    nullable: false,
                    fixed_len: None,
                },
                ColumnSchema {
                    name: "val".into(),
                    logical_type: LogicalType::Int64,
                    nullable: false,
                    fixed_len: None,
                },
            ],
        };
        RecordBatch::new(
            schema,
            vec![
                Column::Int64(vec![1, 2, 3]),
                Column::Int64(vec![10, 20, 30]),
            ],
            vec![None, None],
        )
    }

    #[test]
    fn write_read_disk_mode() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let cfg = EmbeddedConfig::disk(wal);
        let db = Database::open_with_config(cfg).unwrap();
        let seg_id = db.write_columnar_segment("tbl", make_batch()).unwrap();
        let batches = db.read_columnar_segment("tbl", seg_id, None).unwrap();
        assert_eq!(batches[0].num_rows(), 3);
    }

    #[test]
    fn read_with_column_names() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let cfg = EmbeddedConfig::disk(wal);
        let db = Database::open_with_config(cfg).unwrap();
        let seg_id = db.write_columnar_segment("tbl", make_batch()).unwrap();
        let batches = db
            .read_columnar_segment("tbl", seg_id, Some(&["val"]))
            .unwrap();
        assert_eq!(batches[0].columns.len(), 1);
        if let Column::Int64(vals) = &batches[0].columns[0] {
            assert_eq!(vals, &vec![10, 20, 30]);
        } else {
            panic!("expected int64");
        }
    }

    #[test]
    fn in_memory_limit_rejects_large_segment() {
        let cfg = EmbeddedConfig::in_memory_with_limit(1);
        let db = Database::open_with_config(cfg).unwrap();
        let err = db
            .write_columnar_segment("tbl", make_batch())
            .expect_err("should exceed limit");
        assert!(format!("{err}").contains("memory limit exceeded"));
    }

    #[test]
    fn storage_mode_flags() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let disk = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        assert!(matches!(disk.storage_mode(), StorageMode::Disk));

        let mem = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        assert!(matches!(mem.storage_mode(), StorageMode::InMemory));
    }

    #[test]
    fn transaction_write_and_read() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let db = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        let txn = db.begin(crate::TxnMode::ReadWrite).unwrap();
        let seg_id = txn.write_columnar_segment("tbl_txn", make_batch()).unwrap();
        txn.commit().unwrap();

        let batches = db
            .read_columnar_segment("tbl_txn", seg_id, Some(&["id"]))
            .unwrap();
        assert_eq!(batches[0].num_rows(), 3);
    }

    #[test]
    fn flush_in_memory_paths() {
        let dir = tempdir().unwrap();
        let db = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        let seg_id = db.write_columnar_segment("mem_tbl", make_batch()).unwrap();

        let file_path = dir.path().join("seg.bin");
        db.flush_in_memory_segment_to_file("mem_tbl", seg_id, &file_path)
            .unwrap();
        let bytes = std::fs::read(&file_path).unwrap();
        assert!(!bytes.is_empty());

        let kv_id = db
            .flush_in_memory_segment_to_kvs("mem_tbl", seg_id)
            .unwrap();
        assert_eq!(kv_id, 0);

        let alo_path = dir.path().join("out.alopex");
        let mut writer =
            AlopexFileWriter::new(alo_path.clone(), FileVersion::CURRENT, FileFlags(0)).unwrap();
        db.flush_in_memory_segment_to_alopex("mem_tbl", seg_id, &mut writer)
            .unwrap();
        writer.finalize().unwrap();
        assert!(alo_path.exists());
    }

    #[test]
    fn flush_not_in_memory_mode_errors() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let db = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        let err = db
            .flush_in_memory_segment_to_kvs("tbl", 0)
            .expect_err("should error");
        assert!(matches!(err, Error::NotInMemoryMode));
    }

    #[test]
    fn in_memory_write_read_round_trip() {
        let db = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        let seg_id = db.write_columnar_segment("mem_rt", make_batch()).unwrap();
        let batches = db.read_columnar_segment("mem_rt", seg_id, None).unwrap();
        assert_eq!(batches[0].num_rows(), 3);
        assert_eq!(batches[0].columns.len(), 2);
        assert!(db.in_memory_usage().is_some());
    }

    #[test]
    fn disk_mode_has_no_in_memory_usage() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let db = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        assert!(db.in_memory_usage().is_none());
    }

    #[test]
    fn empty_table_name_is_rejected() {
        let db = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        let err = db
            .write_columnar_segment("", make_batch())
            .expect_err("empty table name should be rejected");
        assert!(matches!(err, Error::TableNotFound(_)));
        assert!(db.resolve_table_id("").is_err());
    }

    #[test]
    fn unknown_column_is_rejected() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let db = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        let seg_id = db.write_columnar_segment("tbl", make_batch()).unwrap();
        assert!(db
            .read_columnar_segment("tbl", seg_id, Some(&["missing"]))
            .is_err());
    }

    #[test]
    fn disk_mode_requires_path() {
        let cfg = EmbeddedConfig {
            path: None,
            storage_mode: StorageMode::Disk,
            ..EmbeddedConfig::in_memory()
        };
        assert!(Database::open_with_config(cfg).is_err());
    }

    #[test]
    fn resolve_table_id_is_deterministic() {
        let db = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        assert_eq!(
            db.resolve_table_id("tbl").unwrap(),
            db.resolve_table_id("tbl").unwrap()
        );
    }
}