// trueno_db/storage/mod.rs
//! Storage backend (Arrow/Parquet)
//!
//! **OLAP-Only Design** (Append-Only Write Pattern):
//! - Trueno-DB is OLAP-optimized (columnar storage, bulk analytics)
//! - Write pattern: Append-only batches (no random updates)
//! - Use case: Full codebase re-analysis, bulk data loads
//! - NOT suitable for: OLTP workloads, incremental row updates
//!
//! See: ../paiml-mcp-agent-toolkit/docs/specifications/trueno-db-integration-review-response.md Issue #4
//!
//! Toyota Way Principles:
//! - Poka-Yoke: Morsel-based paging prevents VRAM OOM (Funke et al. 2018)
//! - Muda elimination: Late materialization (Abadi et al. 2008)
use crate::{Error, Result};
use arrow::record_batch::RecordBatch;
#[cfg(feature = "parquet-io")]
use std::path::Path;

/// Morsel size for out-of-core execution (128MB chunks)
///
/// Based on: Leis et al. (2014) morsel-driven parallelism.
/// Used by [`MorselIterator`] to bound how many rows each slice carries.
pub const MORSEL_SIZE_BYTES: usize = 128 * 1024 * 1024; // 128MB

/// Maximum number of in-flight GPU transfers
///
/// Bounded to prevent memory explosion while keeping the `PCIe` bus busy.
/// This is the capacity of the bounded channel inside `GpuTransferQueue`.
#[cfg(feature = "tokio")]
const MAX_IN_FLIGHT_TRANSFERS: usize = 2;
28
/// Storage engine for Arrow/Parquet data
pub struct StorageEngine {
    // Append-only list of record batches. All batches are expected to share
    // one schema; `append_batch` enforces this for writes made through it.
    batches: Vec<RecordBatch>,
}
33
34impl StorageEngine {
35    /// Create a new storage engine from existing batches
36    ///
37    /// Useful for testing and benchmarking
38    #[must_use]
39    pub const fn new(batches: Vec<RecordBatch>) -> Self {
40        Self { batches }
41    }
42
43    /// Load table from Parquet file
44    ///
45    /// # Errors
46    /// Returns error if file cannot be read or parsed
47    #[cfg(feature = "parquet-io")]
48    pub fn load_parquet<P: AsRef<Path>>(path: P) -> Result<Self> {
49        use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
50        use std::fs::File;
51
52        let file = File::open(path.as_ref())
53            .map_err(|e| Error::StorageError(format!("Failed to open Parquet file: {e}")))?;
54
55        let builder = ParquetRecordBatchReaderBuilder::try_new(file)
56            .map_err(|e| Error::StorageError(format!("Failed to parse Parquet file: {e}")))?;
57
58        let reader = builder
59            .build()
60            .map_err(|e| Error::StorageError(format!("Failed to create Parquet reader: {e}")))?;
61
62        // Read all batches into memory
63        let mut batches = Vec::new();
64        for batch in reader {
65            let batch = batch
66                .map_err(|e| Error::StorageError(format!("Failed to read record batch: {e}")))?;
67            batches.push(batch);
68        }
69
70        Ok(Self { batches })
71    }
72
73    /// Get all record batches
74    #[must_use]
75    pub fn batches(&self) -> &[RecordBatch] {
76        &self.batches
77    }
78
79    /// Create iterator over morsels (128MB chunks)
80    #[must_use]
81    pub fn morsels(&self) -> MorselIterator<'_> {
82        MorselIterator::new(&self.batches)
83    }
84
85    /// Append batches to storage (OLAP-optimized)
86    ///
87    /// **WARNING**: This is the ONLY supported write operation.
88    /// Trueno-DB does NOT support incremental row updates (OLTP).
89    ///
90    /// # Design Rationale
91    ///
92    /// Columnar storage optimizes for bulk reads, not random writes:
93    /// - Single-row update cost: O(N) (rewrite entire column)
94    /// - Batch append cost: O(1) (append to new partition)
95    ///
96    /// # Example
97    ///
98    /// ```rust
99    /// # use trueno_db::storage::StorageEngine;
100    /// # use arrow::array::{Int32Array, RecordBatch};
101    /// # use arrow::datatypes::{DataType, Field, Schema};
102    /// # use std::sync::Arc;
103    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
104    /// // Good: Bulk append (OLAP-compatible)
105    /// let schema = Arc::new(Schema::new(vec![
106    ///     Field::new("id", DataType::Int32, false),
107    /// ]));
108    /// let batch = RecordBatch::try_new(
109    ///     schema,
110    ///     vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
111    /// )?;
112    ///
113    /// let mut storage = StorageEngine::new(vec![]);
114    /// storage.append_batch(batch)?;
115    /// # Ok(())
116    /// # }
117    /// ```
118    ///
119    /// # Errors
120    ///
121    /// Returns error if batch schema doesn't match existing batches
122    pub fn append_batch(&mut self, batch: RecordBatch) -> Result<()> {
123        // Validate schema compatibility
124        if !self.batches.is_empty() {
125            let existing_schema = self.batches[0].schema();
126            if batch.schema() != existing_schema {
127                return Err(Error::StorageError(format!(
128                    "Schema mismatch: expected {:?}, got {:?}",
129                    existing_schema,
130                    batch.schema()
131                )));
132            }
133        }
134
135        self.batches.push(batch);
136        Ok(())
137    }
138
139    /// **DEPRECATED**: Single-row update not supported
140    ///
141    /// Trueno-DB is OLAP-only (columnar storage). Use [`append_batch`](Self::append_batch) instead.
142    ///
143    /// # Why This Fails
144    ///
145    /// Column stores are optimized for bulk reads, not random writes:
146    /// - `SQLite` (row-store): `O(1)` update with `B-tree` index
147    /// - `Trueno-DB` (column-store): `O(N)` update (rewrite entire column)
148    ///
149    /// # Migration Guide
150    ///
151    /// ```rust,no_run
152    /// # use trueno_db::storage::StorageEngine;
153    /// # use arrow::array::{Int32Array, RecordBatch};
154    /// # use arrow::datatypes::{DataType, Field, Schema};
155    /// # use std::sync::Arc;
156    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
157    /// // Bad: Incremental update (OLTP pattern)
158    /// // storage.update_row(row_id, new_values)?;  // NOT SUPPORTED
159    ///
160    /// // Good: Batch re-analysis (OLAP pattern)
161    /// let schema = Arc::new(Schema::new(vec![
162    ///     Field::new("id", DataType::Int32, false),
163    /// ]));
164    /// let new_batch = RecordBatch::try_new(
165    ///     schema,
166    ///     vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
167    /// )?;
168    /// let mut storage = StorageEngine::new(vec![]);
169    /// storage.append_batch(new_batch)?;
170    /// # Ok(())
171    /// # }
172    /// ```
173    ///
174    /// # Errors
175    ///
176    /// Always returns error (not implemented)
177    #[deprecated(
178        since = "0.1.0",
179        note = "Trueno-DB is OLAP-only. Use append_batch() for bulk data loads."
180    )]
181    #[allow(clippy::unused_self)]
182    pub fn update_row(&mut self, _row_id: usize, _values: RecordBatch) -> Result<()> {
183        Err(Error::StorageError(
184            "Single-row updates not supported in columnar storage. \
185             Use append_batch() for bulk re-analysis instead."
186                .to_string(),
187        ))
188    }
189}
190
/// Iterator over 128MB morsels of data
pub struct MorselIterator<'a> {
    // Source batches being sliced into morsels (borrowed, not copied).
    batches: &'a [RecordBatch],
    // Index of the batch currently being sliced.
    current_batch_idx: usize,
    // Row offset within the current batch.
    current_offset: usize,
    // Estimated rows per 128MB morsel, derived from batch memory size.
    morsel_rows: usize,
}
198
199impl<'a> MorselIterator<'a> {
200    /// Create new morsel iterator
201    fn new(batches: &'a [RecordBatch]) -> Self {
202        // Calculate morsel size based on first batch
203        let morsel_rows = batches.first().map_or(0, Self::calculate_morsel_rows);
204
205        Self { batches, current_batch_idx: 0, current_offset: 0, morsel_rows }
206    }
207
208    /// Calculate how many rows fit in a 128MB morsel
209    fn calculate_morsel_rows(batch: &RecordBatch) -> usize {
210        let num_rows = batch.num_rows();
211        if num_rows == 0 {
212            return 0;
213        }
214
215        let total_bytes = batch.get_array_memory_size();
216        let bytes_per_row = total_bytes / num_rows;
217
218        if bytes_per_row == 0 {
219            return num_rows; // Avoid division by zero
220        }
221
222        MORSEL_SIZE_BYTES / bytes_per_row
223    }
224}
225
226impl Iterator for MorselIterator<'_> {
227    type Item = RecordBatch;
228
229    fn next(&mut self) -> Option<Self::Item> {
230        // Check if we've exhausted all batches
231        if self.current_batch_idx >= self.batches.len() {
232            return None;
233        }
234
235        let current_batch = &self.batches[self.current_batch_idx];
236
237        // Check if we've exhausted current batch
238        if self.current_offset >= current_batch.num_rows() {
239            self.current_batch_idx += 1;
240            self.current_offset = 0;
241            return self.next(); // Recurse to next batch
242        }
243
244        // Calculate slice length
245        let remaining_rows = current_batch.num_rows() - self.current_offset;
246        let slice_length = remaining_rows.min(self.morsel_rows);
247
248        // Create morsel slice
249        let morsel = current_batch.slice(self.current_offset, slice_length);
250        self.current_offset += slice_length;
251
252        Some(morsel)
253    }
254}
255
/// GPU Transfer Queue for async bounded transfers
///
/// Toyota Way: Heijunka (Load Balancing)
/// - Bounded queue prevents memory explosion (Poka-Yoke)
/// - Max 2 in-flight keeps `PCIe` bus busy without overwhelming GPU
/// - Async design prevents blocking Tokio reactor
///
/// References:
/// - Leis et al. (2014): Morsel-driven parallelism
#[cfg(feature = "tokio")]
pub struct GpuTransferQueue {
    // Producer half; cloned out via `sender()` for concurrent enqueueing.
    sender: tokio::sync::mpsc::Sender<RecordBatch>,
    // Consumer half; `dequeue` needs `&mut self` because `recv` is exclusive.
    receiver: tokio::sync::mpsc::Receiver<RecordBatch>,
}
270
#[cfg(feature = "tokio")]
impl GpuTransferQueue {
    /// Create new GPU transfer queue with bounded capacity.
    ///
    /// # Returns
    /// Queue bounded at `MAX_IN_FLIGHT_TRANSFERS` (2) in-flight batches.
    #[must_use]
    pub fn new() -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel(MAX_IN_FLIGHT_TRANSFERS);
        Self {
            sender: tx,
            receiver: rx,
        }
    }

    /// Enqueue a record batch for GPU transfer.
    ///
    /// Awaits (back-pressure) while the queue already holds 2 batches.
    ///
    /// # Errors
    /// Returns error if the receiving side of the queue has been dropped.
    pub async fn enqueue(&self, batch: RecordBatch) -> Result<()> {
        match self.sender.send(batch).await {
            Ok(()) => Ok(()),
            Err(_) => Err(Error::QueueClosed),
        }
    }

    /// Dequeue a record batch from the GPU transfer queue.
    ///
    /// # Returns
    /// Next batch if available; `None` once the queue is empty and closed.
    pub async fn dequeue(&mut self) -> Option<RecordBatch> {
        self.receiver.recv().await
    }

    /// Get a cloned sender for concurrent enqueueing from other tasks.
    #[must_use]
    pub fn sender(&self) -> tokio::sync::mpsc::Sender<RecordBatch> {
        self.sender.clone()
    }
}
307
#[cfg(feature = "tokio")]
impl Default for GpuTransferQueue {
    /// Equivalent to [`GpuTransferQueue::new`]: bounded at 2 in-flight batches.
    fn default() -> Self {
        Self::new()
    }
}
314
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Float32Array, Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    // Build a (id: Int32, value: Float32, name: Utf8) batch with `num_rows` rows.
    // Casts are lossless for the test-sized row counts used below.
    #[allow(clippy::cast_possible_truncation)]
    #[allow(clippy::cast_possible_wrap)]
    #[allow(clippy::cast_precision_loss)]
    fn create_test_batch(num_rows: usize) -> RecordBatch {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float32, false),
            Field::new("name", DataType::Utf8, false),
        ]);

        let id_array = Int32Array::from_iter_values(0..num_rows as i32);
        let value_array = Float32Array::from_iter_values((0..num_rows).map(|i| i as f32));
        let name_array = StringArray::from_iter_values((0..num_rows).map(|i| format!("name_{i}")));

        RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(id_array), Arc::new(value_array), Arc::new(name_array)],
        )
        .unwrap()
    }

    // Morsels must partition the batch: no rows lost, none over the 128MB cap.
    #[test]
    fn test_morsel_iterator_splits_correctly() {
        let batch = create_test_batch(1000);
        let batches = vec![batch];

        let iter = MorselIterator::new(&batches);
        let morsels: Vec<_> = iter.collect();

        // Verify all rows accounted for
        let total_rows: usize = morsels.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 1000);

        // Verify each morsel is within size limit
        for morsel in &morsels {
            assert!(morsel.get_array_memory_size() <= MORSEL_SIZE_BYTES);
        }
    }

    // An empty batch yields no morsels at all.
    #[test]
    fn test_morsel_iterator_empty_batch() {
        let batch = create_test_batch(0);
        let batches = vec![batch];

        let iter = MorselIterator::new(&batches);
        assert_eq!(iter.count(), 0);
    }

    // Bulk append is the supported write path; batches are stored in order.
    #[test]
    fn test_append_batch_olap_pattern() {
        // OLAP-compatible: Bulk append
        let mut storage = StorageEngine::new(vec![]);
        let batch1 = create_test_batch(100);
        let batch2 = create_test_batch(200);

        storage.append_batch(batch1).unwrap();
        storage.append_batch(batch2).unwrap();

        assert_eq!(storage.batches().len(), 2);
        assert_eq!(storage.batches()[0].num_rows(), 100);
        assert_eq!(storage.batches()[1].num_rows(), 200);
    }

    // Appending a batch with a different schema must be rejected (Poka-Yoke).
    #[test]
    fn test_append_batch_schema_validation() {
        // Schema mismatch should fail
        let mut storage = StorageEngine::new(vec![]);
        let batch1 = create_test_batch(100);
        storage.append_batch(batch1).unwrap();

        // Create incompatible schema
        let incompatible_schema =
            Schema::new(vec![Field::new("different_field", DataType::Int32, false)]);
        let incompatible_batch = RecordBatch::try_new(
            Arc::new(incompatible_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let result = storage.append_batch(incompatible_batch);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Schema mismatch"));
    }

    // The deprecated OLTP entry point must always error, never mutate storage.
    #[test]
    #[allow(deprecated)]
    fn test_update_row_fails_oltp_pattern() {
        // OLTP pattern (single-row update) must fail
        let mut storage = StorageEngine::new(vec![]);
        let batch = create_test_batch(100);

        let result = storage.update_row(0, batch);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Single-row updates not supported"));
    }

    // Morsel iteration spans batch boundaries without dropping rows.
    #[test]
    fn test_morsel_iterator_multiple_batches() {
        let batch1 = create_test_batch(500);
        let batch2 = create_test_batch(500);
        let batches = vec![batch1, batch2];

        let iter = MorselIterator::new(&batches);
        let morsels: Vec<_> = iter.collect();

        // Verify all rows accounted for across both batches
        let total_rows: usize = morsels.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 1000);
    }

    // Round-trip a single batch through the async transfer queue.
    #[tokio::test]
    async fn test_gpu_transfer_queue_basic() {
        let mut queue = GpuTransferQueue::new();
        let batch = create_test_batch(100);

        // Enqueue
        queue.enqueue(batch.clone()).await.unwrap();

        // Dequeue
        let received = queue.dequeue().await.unwrap();
        assert_eq!(received.num_rows(), 100);
    }

    // With capacity 2 and no consumer, a third enqueue must block (back-pressure).
    #[tokio::test]
    async fn test_gpu_transfer_queue_bounded() {
        use tokio::time::{timeout, Duration};

        let queue = GpuTransferQueue::new();
        let batch = create_test_batch(100);

        // Fill queue (capacity = 2)
        queue.enqueue(batch.clone()).await.unwrap();
        queue.enqueue(batch.clone()).await.unwrap();

        // Third enqueue should timeout (queue full)
        let result = timeout(Duration::from_millis(100), queue.enqueue(batch)).await;

        assert!(result.is_err(), "Queue should be full and block");
    }

    // Producer task + consumer loop: FIFO order must be preserved under concurrency.
    #[tokio::test]
    async fn test_gpu_transfer_queue_concurrent_enqueue_dequeue() {
        use tokio::task;

        let mut queue = GpuTransferQueue::new();
        let sender = queue.sender();

        // Spawn task to enqueue multiple batches
        let enqueue_handle = task::spawn(async move {
            for i in 0..5 {
                let batch = create_test_batch(100 * (i + 1));
                sender.send(batch).await.unwrap();
            }
        });

        // Dequeue and verify order
        for i in 0..5 {
            let batch = queue.dequeue().await.unwrap();
            assert_eq!(batch.num_rows(), 100 * (i + 1));
        }

        // Wait for enqueue task to complete
        enqueue_handle.await.unwrap();
    }

    // Property-based tests (EXTREME TDD - Toyota Way: Jidoka)
    mod property_tests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            /// Property: MorselIterator preserves all rows (no data loss)
            #[test]
            fn prop_morsel_iterator_preserves_all_rows(
                num_rows in 1usize..100_000
            ) {
                let batch = create_test_batch(num_rows);
                let original_rows = batch.num_rows();
                let batches = vec![batch];

                let iter = MorselIterator::new(&batches);
                let total_morsel_rows: usize = iter.map(|m| m.num_rows()).sum();

                prop_assert_eq!(original_rows, total_morsel_rows);
            }

            /// Property: Each morsel respects 128MB size limit (Poka-Yoke)
            #[test]
            fn prop_morsel_size_within_limit(
                num_rows in 1usize..100_000
            ) {
                let batch = create_test_batch(num_rows);
                let batches = vec![batch];

                let iter = MorselIterator::new(&batches);

                for morsel in iter {
                    let size = morsel.get_array_memory_size();
                    prop_assert!(
                        size <= MORSEL_SIZE_BYTES,
                        "Morsel size {} exceeds limit {}",
                        size,
                        MORSEL_SIZE_BYTES
                    );
                }
            }

            /// Property: Multiple batches preserve total row count
            #[test]
            fn prop_multiple_batches_preserve_rows(
                batch_sizes in prop::collection::vec(1usize..10_000, 1..10)
            ) {
                let total_expected: usize = batch_sizes.iter().sum();
                let batches: Vec<_> = batch_sizes.iter()
                    .map(|&size| create_test_batch(size))
                    .collect();

                let iter = MorselIterator::new(&batches);
                let total_actual: usize = iter.map(|m| m.num_rows()).sum();

                prop_assert_eq!(total_expected, total_actual);
            }

            /// Property: Empty batches are handled correctly
            #[test]
            fn prop_empty_batches_handled(
                num_empty in 0usize..10
            ) {
                let batches: Vec<_> = (0..num_empty)
                    .map(|_| create_test_batch(0))
                    .collect();

                let iter = MorselIterator::new(&batches);
                let total_rows: usize = iter.map(|m| m.num_rows()).sum();

                prop_assert_eq!(0, total_rows);
            }
        }
    }
}