1use std::collections::hash_map::DefaultHasher;
4use std::hash::{Hash, Hasher};
5use std::path::{Path, PathBuf};
6
7use alopex_core::columnar::encoding::Column;
8use alopex_core::columnar::segment_v2::{RecordBatch, SegmentWriterV2};
9use alopex_core::storage::format::AlopexFileWriter;
10use alopex_core::{StorageFactory, StorageMode as CoreStorageMode};
11
12use crate::{Database, Error, Result, SegmentConfigV2, Transaction, TxnMode};
13
/// Summary statistics describing one columnar segment.
#[derive(Debug, Clone)]
pub struct ColumnarSegmentStats {
    /// Total number of rows, summed over every batch in the segment.
    pub row_count: usize,
    /// Number of columns stored in the segment.
    pub column_count: usize,
    /// Size of the segment in bytes.
    ///
    /// NOTE(review): the only producer visible in this file
    /// (`Database::get_columnar_segment_stats`) always reports `0` here.
    pub size_bytes: usize,
}
24
/// Kind of index that can be registered for a column of a columnar segment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnarIndexType {
    /// Index kind stored under the name `"minmax"`.
    Minmax,
    /// Index kind stored under the name `"bloom"`.
    Bloom,
}

impl ColumnarIndexType {
    /// Returns the lowercase name used when this index type is persisted.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ColumnarIndexType::Minmax => "minmax",
            ColumnarIndexType::Bloom => "bloom",
        }
    }
}
43
/// Description of one index registered on a columnar segment.
#[derive(Debug, Clone)]
pub struct ColumnarIndexInfo {
    /// Name of the indexed column.
    pub column: String,
    /// Kind of index registered for that column.
    pub index_type: ColumnarIndexType,
}
52
/// Options used by `Database::open_with_config` to open an embedded database.
#[derive(Debug, Clone)]
pub struct EmbeddedConfig {
    /// Filesystem location of the database; must be `Some` for
    /// `StorageMode::Disk`, otherwise opening fails.
    pub path: Option<PathBuf>,
    /// Whether data is kept on disk or in memory.
    pub storage_mode: StorageMode,
    /// Optional size cap forwarded as `max_size` to the in-memory backend.
    pub memory_limit: Option<usize>,
    /// Segment writer configuration handed to the database on open.
    pub segment_config: SegmentConfigV2,
}
65
66impl EmbeddedConfig {
67 pub fn disk(path: PathBuf) -> Self {
69 Self {
70 path: Some(path),
71 storage_mode: StorageMode::Disk,
72 memory_limit: None,
73 segment_config: SegmentConfigV2::default(),
74 }
75 }
76
77 pub fn in_memory() -> Self {
79 Self {
80 path: None,
81 storage_mode: StorageMode::InMemory,
82 memory_limit: None,
83 segment_config: SegmentConfigV2::default(),
84 }
85 }
86
87 pub fn in_memory_with_limit(limit: usize) -> Self {
89 Self {
90 path: None,
91 storage_mode: StorageMode::InMemory,
92 memory_limit: Some(limit),
93 segment_config: SegmentConfigV2::default(),
94 }
95 }
96
97 pub fn with_segment_config(mut self, cfg: SegmentConfigV2) -> Self {
99 self.segment_config = cfg;
100 self
101 }
102}
103
/// Where an embedded database keeps its data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageMode {
    /// Data is stored on disk; requires a path in the configuration.
    Disk,
    /// Data is stored in memory.
    InMemory,
}
112
113impl Database {
114 pub fn open_with_config(config: EmbeddedConfig) -> Result<Self> {
116 let store = match config.storage_mode {
117 StorageMode::Disk => {
118 let path = config.path.clone().ok_or_else(|| {
119 Error::Core(alopex_core::Error::InvalidFormat(
120 "disk mode requires a path".into(),
121 ))
122 })?;
123 let path = crate::disk_data_dir_path(&path);
124 StorageFactory::create(CoreStorageMode::Disk { path, config: None })
125 .map_err(Error::Core)?
126 }
127 StorageMode::InMemory => StorageFactory::create(CoreStorageMode::Memory {
128 max_size: config.memory_limit,
129 })
130 .map_err(Error::Core)?,
131 };
132
133 Ok(Self::init(
134 store,
135 config.storage_mode,
136 config.memory_limit,
137 config.segment_config,
138 ))
139 }
140
/// Returns the storage mode used for columnar data in this database.
pub fn storage_mode(&self) -> StorageMode {
    self.columnar_mode
}
145
146 pub fn write_columnar_segment(&self, table: &str, batch: RecordBatch) -> Result<u64> {
148 let mut writer = SegmentWriterV2::new(self.segment_config.clone());
149 writer
150 .write_batch(batch)
151 .map_err(|e| Error::Core(e.into()))?;
152 let segment = writer.finish().map_err(|e| Error::Core(e.into()))?;
153 let table_id = table_id(table)?;
154
155 match self.columnar_mode {
156 StorageMode::Disk => self
157 .columnar_bridge
158 .write_segment(table_id, &segment)
159 .map_err(|e| Error::Core(e.into())),
160 StorageMode::InMemory => {
161 let store = self.columnar_memory.as_ref().ok_or_else(|| {
162 Error::Core(alopex_core::Error::InvalidFormat(
163 "in-memory columnar store is not initialized".into(),
164 ))
165 })?;
166 store
167 .write_segment(table_id, segment)
168 .map_err(|e| Error::Core(e.into()))
169 }
170 }
171 }
172
173 pub fn write_columnar_segment_with_config(
175 &self,
176 table: &str,
177 batch: RecordBatch,
178 config: SegmentConfigV2,
179 ) -> Result<u64> {
180 let mut writer = SegmentWriterV2::new(config);
181 writer
182 .write_batch(batch)
183 .map_err(|e| Error::Core(e.into()))?;
184 let segment = writer.finish().map_err(|e| Error::Core(e.into()))?;
185 let table_id = table_id(table)?;
186
187 match self.columnar_mode {
188 StorageMode::Disk => self
189 .columnar_bridge
190 .write_segment(table_id, &segment)
191 .map_err(|e| Error::Core(e.into())),
192 StorageMode::InMemory => {
193 let store = self.columnar_memory.as_ref().ok_or_else(|| {
194 Error::Core(alopex_core::Error::InvalidFormat(
195 "in-memory columnar store is not initialized".into(),
196 ))
197 })?;
198 store
199 .write_segment(table_id, segment)
200 .map_err(|e| Error::Core(e.into()))
201 }
202 }
203 }
204
205 pub fn read_columnar_segment(
207 &self,
208 table: &str,
209 segment_id: u64,
210 columns: Option<&[&str]>,
211 ) -> Result<Vec<RecordBatch>> {
212 let table_id = table_id(table)?;
213 let column_count = match self.columnar_mode {
214 StorageMode::Disk => self
215 .columnar_bridge
216 .column_count(table_id, segment_id)
217 .map_err(|e| Error::Core(e.into()))?,
218 StorageMode::InMemory => self
219 .columnar_memory
220 .as_ref()
221 .ok_or_else(|| {
222 Error::Core(alopex_core::Error::InvalidFormat(
223 "in-memory columnar store is not initialized".into(),
224 ))
225 })?
226 .column_count(table_id, segment_id)
227 .map_err(|e| Error::Core(e.into()))?,
228 };
229 let all_indices: Vec<usize> = (0..column_count).collect();
230
231 let batches_full = match self.columnar_mode {
232 StorageMode::Disk => self
233 .columnar_bridge
234 .read_segment(table_id, segment_id, &all_indices)
235 .map_err(|e| Error::Core(e.into()))?,
236 StorageMode::InMemory => self
237 .columnar_memory
238 .as_ref()
239 .ok_or_else(|| {
240 Error::Core(alopex_core::Error::InvalidFormat(
241 "in-memory columnar store is not initialized".into(),
242 ))
243 })?
244 .read_segment(table_id, segment_id, &all_indices)
245 .map_err(|e| Error::Core(e.into()))?,
246 };
247
248 if let Some(names) = columns {
249 let indices = resolve_indices(&batches_full, names)?;
250 project_batches(batches_full, &indices)
251 } else {
252 Ok(batches_full)
253 }
254 }
255
256 pub fn in_memory_usage(&self) -> Option<u64> {
258 if self.columnar_mode == StorageMode::InMemory {
259 self.columnar_memory.as_ref().map(|m| m.memory_usage())
260 } else {
261 None
262 }
263 }
264
265 pub fn open_in_memory_with_limit(limit: usize) -> Result<Self> {
267 Self::open_with_config(EmbeddedConfig::in_memory_with_limit(limit))
268 }
269
/// Returns the numeric table id that this module derives from `table`.
///
/// # Errors
///
/// Returns `Error::TableNotFound` when `table` is empty.
pub fn resolve_table_id(&self, table: &str) -> Result<u32> {
    table_id(table)
}
274
275 pub fn scan_columnar_segment(
280 &self,
281 segment_id: &str,
282 ) -> Result<Vec<Vec<alopex_sql::SqlValue>>> {
283 let (table_id, seg_id) = parse_segment_id(segment_id)?;
284 let all_indices: Vec<usize> = match self.columnar_mode {
285 StorageMode::Disk => {
286 let count = self
287 .columnar_bridge
288 .column_count(table_id, seg_id)
289 .map_err(|e| Error::Core(e.into()))?;
290 (0..count).collect()
291 }
292 StorageMode::InMemory => {
293 let store = self.columnar_memory.as_ref().ok_or_else(|| {
294 Error::Core(alopex_core::Error::InvalidFormat(
295 "in-memory columnar store is not initialized".into(),
296 ))
297 })?;
298 let count = store
299 .column_count(table_id, seg_id)
300 .map_err(|e| Error::Core(e.into()))?;
301 (0..count).collect()
302 }
303 };
304
305 let batches = match self.columnar_mode {
306 StorageMode::Disk => self
307 .columnar_bridge
308 .read_segment(table_id, seg_id, &all_indices)
309 .map_err(|e| Error::Core(e.into()))?,
310 StorageMode::InMemory => self
311 .columnar_memory
312 .as_ref()
313 .ok_or_else(|| {
314 Error::Core(alopex_core::Error::InvalidFormat(
315 "in-memory columnar store is not initialized".into(),
316 ))
317 })?
318 .read_segment(table_id, seg_id, &all_indices)
319 .map_err(|e| Error::Core(e.into()))?,
320 };
321
322 let mut rows = Vec::new();
324 for batch in batches {
325 let num_rows = batch.num_rows();
326 for row_idx in 0..num_rows {
327 let mut row = Vec::with_capacity(batch.columns.len());
328 for col in &batch.columns {
329 let sql_val = column_value_to_sql_value(col, row_idx);
330 row.push(sql_val);
331 }
332 rows.push(row);
333 }
334 }
335 Ok(rows)
336 }
337
338 pub fn scan_columnar_segment_batches(&self, segment_id: &str) -> Result<Vec<RecordBatch>> {
346 let (table_id, seg_id) = parse_segment_id(segment_id)?;
347 let all_indices: Vec<usize> = match self.columnar_mode {
348 StorageMode::Disk => {
349 let count = self
350 .columnar_bridge
351 .column_count(table_id, seg_id)
352 .map_err(|e| Error::Core(e.into()))?;
353 (0..count).collect()
354 }
355 StorageMode::InMemory => {
356 let store = self.columnar_memory.as_ref().ok_or_else(|| {
357 Error::Core(alopex_core::Error::InvalidFormat(
358 "in-memory columnar store is not initialized".into(),
359 ))
360 })?;
361 let count = store
362 .column_count(table_id, seg_id)
363 .map_err(|e| Error::Core(e.into()))?;
364 (0..count).collect()
365 }
366 };
367
368 match self.columnar_mode {
369 StorageMode::Disk => self
370 .columnar_bridge
371 .read_segment(table_id, seg_id, &all_indices)
372 .map_err(|e| Error::Core(e.into())),
373 StorageMode::InMemory => self
374 .columnar_memory
375 .as_ref()
376 .ok_or_else(|| {
377 Error::Core(alopex_core::Error::InvalidFormat(
378 "in-memory columnar store is not initialized".into(),
379 ))
380 })?
381 .read_segment(table_id, seg_id, &all_indices)
382 .map_err(|e| Error::Core(e.into())),
383 }
384 }
385
386 pub fn scan_columnar_segment_streaming(&self, segment_id: &str) -> Result<ColumnarRowIterator> {
393 let batches = self.scan_columnar_segment_batches(segment_id)?;
394 Ok(ColumnarRowIterator::new(batches))
395 }
396
397 pub fn get_columnar_segment_stats(&self, segment_id: &str) -> Result<ColumnarSegmentStats> {
401 let (table_id, seg_id) = parse_segment_id(segment_id)?;
402
403 match self.columnar_mode {
404 StorageMode::Disk => {
405 let column_count = self
406 .columnar_bridge
407 .column_count(table_id, seg_id)
408 .map_err(|e| Error::Core(e.into()))?;
409 let batches = self
410 .columnar_bridge
411 .read_segment(table_id, seg_id, &(0..column_count).collect::<Vec<_>>())
412 .map_err(|e| Error::Core(e.into()))?;
413 let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
414
415 Ok(ColumnarSegmentStats {
416 row_count,
417 column_count,
418 size_bytes: 0, })
420 }
421 StorageMode::InMemory => {
422 let store = self.columnar_memory.as_ref().ok_or_else(|| {
423 Error::Core(alopex_core::Error::InvalidFormat(
424 "in-memory columnar store is not initialized".into(),
425 ))
426 })?;
427 let column_count = store
428 .column_count(table_id, seg_id)
429 .map_err(|e| Error::Core(e.into()))?;
430 let batches = store
431 .read_segment(table_id, seg_id, &(0..column_count).collect::<Vec<_>>())
432 .map_err(|e| Error::Core(e.into()))?;
433 let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
434
435 Ok(ColumnarSegmentStats {
436 row_count,
437 column_count,
438 size_bytes: 0, })
440 }
441 }
442 }
443
444 pub fn list_columnar_segments(&self) -> Result<Vec<String>> {
448 match self.columnar_mode {
449 StorageMode::Disk => {
450 let segments = self
451 .columnar_bridge
452 .list_segments()
453 .map_err(|e| Error::Core(e.into()))?;
454 Ok(segments
455 .into_iter()
456 .map(|(table_id, seg_id)| format!("{}:{}", table_id, seg_id))
457 .collect())
458 }
459 StorageMode::InMemory => {
460 let store = self.columnar_memory.as_ref().ok_or_else(|| {
461 Error::Core(alopex_core::Error::InvalidFormat(
462 "in-memory columnar store is not initialized".into(),
463 ))
464 })?;
465 let segments = store.list_segments();
466 Ok(segments
467 .into_iter()
468 .map(|(table_id, seg_id)| format!("{}:{}", table_id, seg_id))
469 .collect())
470 }
471 }
472 }
473
474 pub fn create_columnar_index(
476 &self,
477 segment_id: &str,
478 column: &str,
479 index_type: ColumnarIndexType,
480 ) -> Result<()> {
481 let _ = self.get_columnar_segment_stats(segment_id)?;
482 let key = columnar_index_key(segment_id, column);
483 let value = index_type.as_str().as_bytes().to_vec();
484 let mut txn = self.begin(TxnMode::ReadWrite)?;
485 txn.put(&key, &value)?;
486 txn.commit()?;
487 Ok(())
488 }
489
490 pub fn list_columnar_indexes(&self, segment_id: &str) -> Result<Vec<ColumnarIndexInfo>> {
492 let _ = self.get_columnar_segment_stats(segment_id)?;
493 let prefix = columnar_index_prefix(segment_id);
494 let mut txn = self.begin(TxnMode::ReadOnly)?;
495 let mut entries = Vec::new();
496 for (key, value) in txn.scan_prefix(&prefix)? {
497 let column = parse_index_column(segment_id, &key)?;
498 let index_type = parse_index_type(&value)?;
499 entries.push(ColumnarIndexInfo { column, index_type });
500 }
501 txn.commit()?;
502 Ok(entries)
503 }
504
505 pub fn drop_columnar_index(&self, segment_id: &str, column: &str) -> Result<()> {
507 let _ = self.get_columnar_segment_stats(segment_id)?;
508 let key = columnar_index_key(segment_id, column);
509 let mut txn = self.begin(TxnMode::ReadWrite)?;
510 let exists = txn.get(&key)?.is_some();
511 if !exists {
512 txn.rollback()?;
513 return Err(Error::IndexNotFound(format!(
514 "columnar index {}:{}",
515 segment_id, column
516 )));
517 }
518 txn.delete(&key)?;
519 txn.commit()?;
520 Ok(())
521 }
522
523 pub fn flush_in_memory_segment_to_file(
525 &self,
526 table: &str,
527 segment_id: u64,
528 path: &Path,
529 ) -> Result<()> {
530 let store = self
531 .columnar_memory
532 .as_ref()
533 .ok_or(Error::NotInMemoryMode)?;
534 let table_id = table_id(table)?;
535 store
536 .flush_to_segment_file(table_id, segment_id, path)
537 .map_err(|e| Error::Core(e.into()))
538 }
539
540 pub fn flush_in_memory_segment_to_kvs(&self, table: &str, segment_id: u64) -> Result<u64> {
542 let store = self
543 .columnar_memory
544 .as_ref()
545 .ok_or(Error::NotInMemoryMode)?;
546 let table_id = table_id(table)?;
547 store
548 .flush_to_kvs(table_id, segment_id, &self.columnar_bridge)
549 .map_err(|e| Error::Core(e.into()))
550 }
551
552 pub fn flush_in_memory_segment_to_alopex(
554 &self,
555 table: &str,
556 segment_id: u64,
557 writer: &mut AlopexFileWriter,
558 ) -> Result<u32> {
559 let store = self
560 .columnar_memory
561 .as_ref()
562 .ok_or(Error::NotInMemoryMode)?;
563 let table_id = table_id(table)?;
564 store
565 .flush_to_alopex(table_id, segment_id, writer)
566 .map_err(|e| Error::Core(e.into()))
567 }
568}
569
/// Columnar convenience methods exposed on a transaction.
///
/// NOTE(review): every method forwards straight to the owning `Database`;
/// nothing here ties the columnar write to this transaction's commit or
/// rollback — confirm that this is intended.
impl<'a> Transaction<'a> {
    /// Returns the storage mode of the database this transaction belongs to.
    pub fn storage_mode(&self) -> StorageMode {
        self.db.storage_mode()
    }

    /// Forwards to `Database::write_columnar_segment`.
    pub fn write_columnar_segment(&self, table: &str, batch: RecordBatch) -> Result<u64> {
        self.db.write_columnar_segment(table, batch)
    }

    /// Forwards to `Database::read_columnar_segment`.
    pub fn read_columnar_segment(
        &self,
        table: &str,
        segment_id: u64,
        columns: Option<&[&str]>,
    ) -> Result<Vec<RecordBatch>> {
        self.db.read_columnar_segment(table, segment_id, columns)
    }
}
591
592fn table_id(table: &str) -> Result<u32> {
593 if table.is_empty() {
594 return Err(Error::TableNotFound("table name is empty".into()));
595 }
596 let mut hasher = DefaultHasher::new();
597 table.hash(&mut hasher);
598 Ok((hasher.finish() & 0xffff_ffff) as u32)
599}
600
601fn resolve_indices(batches: &[RecordBatch], names: &[&str]) -> Result<Vec<usize>> {
602 let Some(first) = batches.first() else {
603 return Err(Error::Core(alopex_core::Error::InvalidFormat(
604 "segment is empty".into(),
605 )));
606 };
607 let mut indices = Vec::with_capacity(names.len());
608 for name in names {
609 let pos = first
610 .schema
611 .columns
612 .iter()
613 .position(|c| c.name == *name)
614 .ok_or_else(|| {
615 Error::Core(alopex_core::Error::InvalidFormat(format!(
616 "column not found: {name}"
617 )))
618 })?;
619 indices.push(pos);
620 }
621 Ok(indices)
622}
623
624fn project_batches(batches: Vec<RecordBatch>, indices: &[usize]) -> Result<Vec<RecordBatch>> {
625 let mut projected = Vec::with_capacity(batches.len());
626 for batch in batches {
627 let mut cols = Vec::with_capacity(indices.len());
628 let mut bitmaps = Vec::with_capacity(indices.len());
629 for &idx in indices {
630 let col = batch
631 .columns
632 .get(idx)
633 .ok_or_else(|| {
634 Error::Core(alopex_core::Error::InvalidFormat(
635 "column index out of bounds".into(),
636 ))
637 })?
638 .clone();
639 let bitmap = batch.null_bitmaps.get(idx).cloned().unwrap_or(None);
640 cols.push(col);
641 bitmaps.push(bitmap);
642 }
643 let schema = alopex_core::columnar::segment_v2::Schema {
644 columns: indices
645 .iter()
646 .map(|&idx| batch.schema.columns[idx].clone())
647 .collect(),
648 };
649 projected.push(RecordBatch::new(schema, cols, bitmaps));
650 }
651 Ok(projected)
652}
653
/// Key-space prefix under which columnar index registrations are stored.
const COLUMNAR_INDEX_PREFIX: &str = "__alopex_columnar_index__:";

/// Builds the key of the index entry for `column` of `segment`:
/// the per-segment prefix followed by the column name.
fn columnar_index_key(segment: &str, column: &str) -> Vec<u8> {
    let mut key = columnar_index_prefix(segment);
    key.extend_from_slice(column.as_bytes());
    key
}

/// Builds the prefix shared by all index entries of `segment`:
/// the global prefix, the segment id and a trailing `:`.
fn columnar_index_prefix(segment: &str) -> Vec<u8> {
    [COLUMNAR_INDEX_PREFIX, segment, ":"].concat().into_bytes()
}
673
674fn parse_index_column(segment: &str, key: &[u8]) -> Result<String> {
675 let prefix = columnar_index_prefix(segment);
676 if !key.starts_with(&prefix) {
677 return Err(Error::Core(alopex_core::Error::InvalidFormat(
678 "columnar index key is invalid".into(),
679 )));
680 }
681 let suffix = &key[prefix.len()..];
682 String::from_utf8(suffix.to_vec()).map_err(|_| {
683 Error::Core(alopex_core::Error::InvalidFormat(
684 "columnar index column is not valid UTF-8".into(),
685 ))
686 })
687}
688
689fn parse_index_type(raw: &[u8]) -> Result<ColumnarIndexType> {
690 let value = std::str::from_utf8(raw).map_err(|_| {
691 Error::Core(alopex_core::Error::InvalidFormat(
692 "columnar index type is not valid UTF-8".into(),
693 ))
694 })?;
695 match value {
696 "minmax" => Ok(ColumnarIndexType::Minmax),
697 "bloom" => Ok(ColumnarIndexType::Bloom),
698 other => Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
699 "unknown columnar index type: {other}"
700 )))),
701 }
702}
703
704fn parse_segment_id(segment_id: &str) -> Result<(u32, u64)> {
708 let parts: Vec<&str> = segment_id.split(':').collect();
709 if parts.len() != 2 {
710 return Err(Error::Core(alopex_core::Error::InvalidFormat(format!(
711 "invalid segment ID format: expected 'table_id:segment_id', got '{}'",
712 segment_id
713 ))));
714 }
715
716 let table_id: u32 = parts[0].parse().map_err(|_| {
717 Error::Core(alopex_core::Error::InvalidFormat(format!(
718 "invalid table_id in segment ID: '{}'",
719 parts[0]
720 )))
721 })?;
722
723 let seg_id: u64 = parts[1].parse().map_err(|_| {
724 Error::Core(alopex_core::Error::InvalidFormat(format!(
725 "invalid segment_id in segment ID: '{}'",
726 parts[1]
727 )))
728 })?;
729
730 Ok((table_id, seg_id))
731}
732
733fn column_value_to_sql_value(col: &Column, row_idx: usize) -> alopex_sql::SqlValue {
735 match col {
736 Column::Int64(vals) => vals
737 .get(row_idx)
738 .map(|&v| alopex_sql::SqlValue::BigInt(v))
739 .unwrap_or(alopex_sql::SqlValue::Null),
740 Column::Float32(vals) => vals
741 .get(row_idx)
742 .map(|&v| alopex_sql::SqlValue::Float(v))
743 .unwrap_or(alopex_sql::SqlValue::Null),
744 Column::Float64(vals) => vals
745 .get(row_idx)
746 .map(|&v| alopex_sql::SqlValue::Double(v))
747 .unwrap_or(alopex_sql::SqlValue::Null),
748 Column::Bool(vals) => vals
749 .get(row_idx)
750 .map(|&v| alopex_sql::SqlValue::Boolean(v))
751 .unwrap_or(alopex_sql::SqlValue::Null),
752 Column::Binary(vals) => vals
753 .get(row_idx)
754 .map(|v| alopex_sql::SqlValue::Blob(v.clone()))
755 .unwrap_or(alopex_sql::SqlValue::Null),
756 Column::Fixed { values, .. } => values
757 .get(row_idx)
758 .map(|v| alopex_sql::SqlValue::Blob(v.clone()))
759 .unwrap_or(alopex_sql::SqlValue::Null),
760 }
761}
762
/// Iterator that yields the rows of a list of record batches as SQL values,
/// batch by batch.
pub struct ColumnarRowIterator {
    /// Batches to iterate over, in order.
    batches: Vec<RecordBatch>,
    /// Index of the batch currently being read.
    batch_idx: usize,
    /// Index of the next row to yield within the current batch.
    row_idx: usize,
}
779
780impl ColumnarRowIterator {
781 pub fn new(batches: Vec<RecordBatch>) -> Self {
783 Self {
784 batches,
785 batch_idx: 0,
786 row_idx: 0,
787 }
788 }
789
790 pub fn batch_count(&self) -> usize {
792 self.batches.len()
793 }
794
795 pub fn current_batch(&self) -> Option<&RecordBatch> {
797 self.batches.get(self.batch_idx)
798 }
799}
800
801impl Iterator for ColumnarRowIterator {
802 type Item = Vec<alopex_sql::SqlValue>;
803
804 fn next(&mut self) -> Option<Self::Item> {
805 loop {
806 if self.batch_idx >= self.batches.len() {
808 return None;
809 }
810
811 let batch = &self.batches[self.batch_idx];
812 let row_count = batch.num_rows();
813
814 if self.row_idx >= row_count {
816 self.batch_idx += 1;
817 self.row_idx = 0;
818 continue;
819 }
820
821 let row_idx = self.row_idx;
823 self.row_idx += 1;
824
825 let mut row = Vec::with_capacity(batch.columns.len());
826 for col in &batch.columns {
827 let sql_val = column_value_to_sql_value(col, row_idx);
828 row.push(sql_val);
829 }
830 return Some(row);
831 }
832 }
833}
834
#[cfg(test)]
mod tests {
    use super::*;
    use alopex_core::columnar::encoding::{Column, LogicalType};
    use alopex_core::columnar::segment_v2::{ColumnSchema, Schema};
    use alopex_core::storage::format::{AlopexFileWriter, FileFlags, FileVersion};
    use tempfile::tempdir;

    // Builds a three-row batch with two non-nullable Int64 columns, `id` and `val`.
    fn make_batch() -> RecordBatch {
        let schema = Schema {
            columns: vec![
                ColumnSchema {
                    name: "id".into(),
                    logical_type: LogicalType::Int64,
                    nullable: false,
                    fixed_len: None,
                },
                ColumnSchema {
                    name: "val".into(),
                    logical_type: LogicalType::Int64,
                    nullable: false,
                    fixed_len: None,
                },
            ],
        };
        RecordBatch::new(
            schema,
            vec![
                Column::Int64(vec![1, 2, 3]),
                Column::Int64(vec![10, 20, 30]),
            ],
            vec![None, None],
        )
    }

    // A segment written in disk mode can be read back with all its rows.
    #[test]
    fn write_read_disk_mode() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let cfg = EmbeddedConfig::disk(wal);
        let db = Database::open_with_config(cfg).unwrap();
        let seg_id = db.write_columnar_segment("tbl", make_batch()).unwrap();
        let batches = db.read_columnar_segment("tbl", seg_id, None).unwrap();
        assert_eq!(batches[0].num_rows(), 3);
    }

    // Projecting by column name returns only the requested column's data.
    #[test]
    fn read_with_column_names() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let cfg = EmbeddedConfig::disk(wal);
        let db = Database::open_with_config(cfg).unwrap();
        let seg_id = db.write_columnar_segment("tbl", make_batch()).unwrap();
        let batches = db
            .read_columnar_segment("tbl", seg_id, Some(&["val"]))
            .unwrap();
        assert_eq!(batches[0].columns.len(), 1);
        if let Column::Int64(vals) = &batches[0].columns[0] {
            assert_eq!(vals, &vec![10, 20, 30]);
        } else {
            panic!("expected int64");
        }
    }

    // A one-byte memory limit makes the in-memory write fail.
    #[test]
    fn in_memory_limit_rejects_large_segment() {
        let cfg = EmbeddedConfig::in_memory_with_limit(1);
        let db = Database::open_with_config(cfg).unwrap();
        let err = db
            .write_columnar_segment("tbl", make_batch())
            .expect_err("should exceed limit");
        assert!(format!("{err}").contains("memory limit exceeded"));
    }

    // `storage_mode` reflects the mode the database was opened with.
    #[test]
    fn storage_mode_flags() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let disk = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        assert!(matches!(disk.storage_mode(), StorageMode::Disk));

        let mem = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        assert!(matches!(mem.storage_mode(), StorageMode::InMemory));
    }

    // A segment written through a transaction is readable from the database
    // after the transaction commits.
    #[test]
    fn transaction_write_and_read() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let db = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        let txn = db.begin(crate::TxnMode::ReadWrite).unwrap();
        let seg_id = txn.write_columnar_segment("tbl_txn", make_batch()).unwrap();
        txn.commit().unwrap();

        let batches = db
            .read_columnar_segment("tbl_txn", seg_id, Some(&["id"]))
            .unwrap();
        assert_eq!(batches[0].num_rows(), 3);
    }

    // Exercises all three flush targets of an in-memory segment.
    #[test]
    fn flush_in_memory_paths() {
        let dir = tempdir().unwrap();
        let db = Database::open_with_config(EmbeddedConfig::in_memory()).unwrap();
        let seg_id = db.write_columnar_segment("mem_tbl", make_batch()).unwrap();

        // Flush to a standalone segment file.
        let file_path = dir.path().join("seg.bin");
        db.flush_in_memory_segment_to_file("mem_tbl", seg_id, &file_path)
            .unwrap();
        let bytes = std::fs::read(&file_path).unwrap();
        assert!(!bytes.is_empty());

        // Flush to the key-value store.
        let kv_id = db
            .flush_in_memory_segment_to_kvs("mem_tbl", seg_id)
            .unwrap();
        assert_eq!(kv_id, 0);

        // Flush to an Alopex file.
        let alo_path = dir.path().join("out.alopex");
        let mut writer =
            AlopexFileWriter::new(alo_path.clone(), FileVersion::CURRENT, FileFlags(0)).unwrap();
        db.flush_in_memory_segment_to_alopex("mem_tbl", seg_id, &mut writer)
            .unwrap();
        writer.finalize().unwrap();
        assert!(alo_path.exists());
    }

    // Flushing from a disk-mode database reports `NotInMemoryMode`.
    #[test]
    fn flush_not_in_memory_mode_errors() {
        let dir = tempdir().unwrap();
        let wal = dir.path().join("wal.log");
        let db = Database::open_with_config(EmbeddedConfig::disk(wal)).unwrap();
        let err = db
            .flush_in_memory_segment_to_kvs("tbl", 0)
            .expect_err("should error");
        assert!(matches!(err, Error::NotInMemoryMode));
    }
}