1use crate::{Error, Result};
16use arrow::record_batch::RecordBatch;
17#[cfg(feature = "parquet-io")]
18use std::path::Path;
19
/// Target size of one morsel (slice of a `RecordBatch`) handed to an
/// operator: 128 MiB. Morsel row counts are derived from this budget.
pub const MORSEL_SIZE_BYTES: usize = 128 * 1024 * 1024;

/// Capacity of the GPU transfer queue; enqueuers block (backpressure)
/// once this many batches are in flight.
#[cfg(feature = "tokio")]
const MAX_IN_FLIGHT_TRANSFERS: usize = 2;

/// In-memory columnar store: an append-only collection of Arrow
/// [`RecordBatch`]es. `append_batch` enforces that all stored batches
/// share one schema; `morsels` slices them for cache-friendly scans.
pub struct StorageEngine {
    // Stored batches. Kept schema-homogeneous by append_batch, which
    // validates each new batch against batches[0].
    batches: Vec<RecordBatch>,
}
33
34impl StorageEngine {
35 #[must_use]
39 pub const fn new(batches: Vec<RecordBatch>) -> Self {
40 Self { batches }
41 }
42
43 #[cfg(feature = "parquet-io")]
48 pub fn load_parquet<P: AsRef<Path>>(path: P) -> Result<Self> {
49 use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
50 use std::fs::File;
51
52 let file = File::open(path.as_ref())
53 .map_err(|e| Error::StorageError(format!("Failed to open Parquet file: {e}")))?;
54
55 let builder = ParquetRecordBatchReaderBuilder::try_new(file)
56 .map_err(|e| Error::StorageError(format!("Failed to parse Parquet file: {e}")))?;
57
58 let reader = builder
59 .build()
60 .map_err(|e| Error::StorageError(format!("Failed to create Parquet reader: {e}")))?;
61
62 let mut batches = Vec::new();
64 for batch in reader {
65 let batch = batch
66 .map_err(|e| Error::StorageError(format!("Failed to read record batch: {e}")))?;
67 batches.push(batch);
68 }
69
70 Ok(Self { batches })
71 }
72
73 #[must_use]
75 pub fn batches(&self) -> &[RecordBatch] {
76 &self.batches
77 }
78
79 #[must_use]
81 pub fn morsels(&self) -> MorselIterator<'_> {
82 MorselIterator::new(&self.batches)
83 }
84
85 pub fn append_batch(&mut self, batch: RecordBatch) -> Result<()> {
123 if !self.batches.is_empty() {
125 let existing_schema = self.batches[0].schema();
126 if batch.schema() != existing_schema {
127 return Err(Error::StorageError(format!(
128 "Schema mismatch: expected {:?}, got {:?}",
129 existing_schema,
130 batch.schema()
131 )));
132 }
133 }
134
135 self.batches.push(batch);
136 Ok(())
137 }
138
139 #[deprecated(
178 since = "0.1.0",
179 note = "Trueno-DB is OLAP-only. Use append_batch() for bulk data loads."
180 )]
181 #[allow(clippy::unused_self)]
182 pub fn update_row(&mut self, _row_id: usize, _values: RecordBatch) -> Result<()> {
183 Err(Error::StorageError(
184 "Single-row updates not supported in columnar storage. \
185 Use append_batch() for bulk re-analysis instead."
186 .to_string(),
187 ))
188 }
189}
190
/// Iterator that slices a sequence of [`RecordBatch`]es into morsels of
/// roughly [`MORSEL_SIZE_BYTES`], measured in rows.
pub struct MorselIterator<'a> {
    // Source batches, borrowed from the storage engine.
    batches: &'a [RecordBatch],
    // Index of the batch currently being sliced.
    current_batch_idx: usize,
    // Row offset inside the current batch.
    current_offset: usize,
    // Rows per morsel, estimated once in `new` from the batch row width.
    morsel_rows: usize,
}
198
199impl<'a> MorselIterator<'a> {
200 fn new(batches: &'a [RecordBatch]) -> Self {
202 let morsel_rows = batches.first().map_or(0, Self::calculate_morsel_rows);
204
205 Self { batches, current_batch_idx: 0, current_offset: 0, morsel_rows }
206 }
207
208 fn calculate_morsel_rows(batch: &RecordBatch) -> usize {
210 let num_rows = batch.num_rows();
211 if num_rows == 0 {
212 return 0;
213 }
214
215 let total_bytes = batch.get_array_memory_size();
216 let bytes_per_row = total_bytes / num_rows;
217
218 if bytes_per_row == 0 {
219 return num_rows; }
221
222 MORSEL_SIZE_BYTES / bytes_per_row
223 }
224}
225
226impl Iterator for MorselIterator<'_> {
227 type Item = RecordBatch;
228
229 fn next(&mut self) -> Option<Self::Item> {
230 if self.current_batch_idx >= self.batches.len() {
232 return None;
233 }
234
235 let current_batch = &self.batches[self.current_batch_idx];
236
237 if self.current_offset >= current_batch.num_rows() {
239 self.current_batch_idx += 1;
240 self.current_offset = 0;
241 return self.next(); }
243
244 let remaining_rows = current_batch.num_rows() - self.current_offset;
246 let slice_length = remaining_rows.min(self.morsel_rows);
247
248 let morsel = current_batch.slice(self.current_offset, slice_length);
250 self.current_offset += slice_length;
251
252 Some(morsel)
253 }
254}
255
/// Bounded async queue of record batches awaiting GPU transfer. The
/// channel capacity (`MAX_IN_FLIGHT_TRANSFERS`) applies backpressure to
/// producers once that many batches are in flight.
#[cfg(feature = "tokio")]
pub struct GpuTransferQueue {
    // Producer half; cloneable via `sender()` for multi-task enqueue.
    sender: tokio::sync::mpsc::Sender<RecordBatch>,
    // Single consumer half, drained via `dequeue`.
    receiver: tokio::sync::mpsc::Receiver<RecordBatch>,
}
270
#[cfg(feature = "tokio")]
impl GpuTransferQueue {
    /// Creates a queue bounded at `MAX_IN_FLIGHT_TRANSFERS` batches.
    #[must_use]
    pub fn new() -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel(MAX_IN_FLIGHT_TRANSFERS);
        Self { sender: tx, receiver: rx }
    }

    /// Sends `batch` into the queue, waiting while the queue is full.
    ///
    /// # Errors
    /// Returns [`Error::QueueClosed`] if the receiving half has been dropped.
    pub async fn enqueue(&self, batch: RecordBatch) -> Result<()> {
        match self.sender.send(batch).await {
            Ok(()) => Ok(()),
            Err(_) => Err(Error::QueueClosed),
        }
    }

    /// Receives the next batch, or `None` once every sender has dropped.
    pub async fn dequeue(&mut self) -> Option<RecordBatch> {
        self.receiver.recv().await
    }

    /// Clones a producer handle for enqueueing from other tasks.
    #[must_use]
    pub fn sender(&self) -> tokio::sync::mpsc::Sender<RecordBatch> {
        self.sender.clone()
    }
}
307
#[cfg(feature = "tokio")]
impl Default for GpuTransferQueue {
    /// Equivalent to [`GpuTransferQueue::new`].
    fn default() -> Self {
        GpuTransferQueue::new()
    }
}
314
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{Float32Array, Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds an (id: Int32, value: Float32, name: Utf8) batch holding
    /// `num_rows` sequential rows.
    #[allow(clippy::cast_possible_truncation)]
    #[allow(clippy::cast_possible_wrap)]
    #[allow(clippy::cast_precision_loss)]
    fn create_test_batch(num_rows: usize) -> RecordBatch {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float32, false),
            Field::new("name", DataType::Utf8, false),
        ]);

        let id_array = Int32Array::from_iter_values(0..num_rows as i32);
        let value_array = Float32Array::from_iter_values((0..num_rows).map(|i| i as f32));
        let name_array = StringArray::from_iter_values((0..num_rows).map(|i| format!("name_{i}")));

        RecordBatch::try_new(
            Arc::new(schema),
            vec![Arc::new(id_array), Arc::new(value_array), Arc::new(name_array)],
        )
        .unwrap()
    }

    #[test]
    fn test_morsel_iterator_splits_correctly() {
        let batch = create_test_batch(1000);
        let batches = vec![batch];

        let iter = MorselIterator::new(&batches);
        let morsels: Vec<_> = iter.collect();

        // Slicing must neither drop nor duplicate rows.
        let total_rows: usize = morsels.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 1000);

        // Every slice must respect the morsel byte budget.
        for morsel in &morsels {
            assert!(morsel.get_array_memory_size() <= MORSEL_SIZE_BYTES);
        }
    }

    #[test]
    fn test_morsel_iterator_empty_batch() {
        let batch = create_test_batch(0);
        let batches = vec![batch];

        let iter = MorselIterator::new(&batches);
        assert_eq!(iter.count(), 0);
    }

    #[test]
    fn test_append_batch_olap_pattern() {
        let mut storage = StorageEngine::new(vec![]);
        let batch1 = create_test_batch(100);
        let batch2 = create_test_batch(200);

        storage.append_batch(batch1).unwrap();
        storage.append_batch(batch2).unwrap();

        assert_eq!(storage.batches().len(), 2);
        assert_eq!(storage.batches()[0].num_rows(), 100);
        assert_eq!(storage.batches()[1].num_rows(), 200);
    }

    #[test]
    fn test_append_batch_schema_validation() {
        let mut storage = StorageEngine::new(vec![]);
        let batch1 = create_test_batch(100);
        storage.append_batch(batch1).unwrap();

        let incompatible_schema =
            Schema::new(vec![Field::new("different_field", DataType::Int32, false)]);
        let incompatible_batch = RecordBatch::try_new(
            Arc::new(incompatible_schema),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let result = storage.append_batch(incompatible_batch);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Schema mismatch"));
    }

    #[test]
    #[allow(deprecated)]
    fn test_update_row_fails_oltp_pattern() {
        let mut storage = StorageEngine::new(vec![]);
        let batch = create_test_batch(100);

        let result = storage.update_row(0, batch);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Single-row updates not supported"));
    }

    #[test]
    fn test_morsel_iterator_multiple_batches() {
        let batch1 = create_test_batch(500);
        let batch2 = create_test_batch(500);
        let batches = vec![batch1, batch2];

        let iter = MorselIterator::new(&batches);
        let morsels: Vec<_> = iter.collect();

        let total_rows: usize = morsels.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(total_rows, 1000);
    }

    // The queue tests must be feature-gated: `GpuTransferQueue` only
    // exists under the "tokio" feature, so without these gates
    // `cargo test` fails to compile when the feature is disabled.
    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_gpu_transfer_queue_basic() {
        let mut queue = GpuTransferQueue::new();
        let batch = create_test_batch(100);

        queue.enqueue(batch.clone()).await.unwrap();

        let received = queue.dequeue().await.unwrap();
        assert_eq!(received.num_rows(), 100);
    }

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_gpu_transfer_queue_bounded() {
        use tokio::time::{timeout, Duration};

        let queue = GpuTransferQueue::new();
        let batch = create_test_batch(100);

        // Fill the queue to its capacity of MAX_IN_FLIGHT_TRANSFERS (2).
        queue.enqueue(batch.clone()).await.unwrap();
        queue.enqueue(batch.clone()).await.unwrap();

        // A third enqueue should block until the timeout fires.
        let result = timeout(Duration::from_millis(100), queue.enqueue(batch)).await;

        assert!(result.is_err(), "Queue should be full and block");
    }

    #[cfg(feature = "tokio")]
    #[tokio::test]
    async fn test_gpu_transfer_queue_concurrent_enqueue_dequeue() {
        use tokio::task;

        let mut queue = GpuTransferQueue::new();
        let sender = queue.sender();

        let enqueue_handle = task::spawn(async move {
            for i in 0..5 {
                let batch = create_test_batch(100 * (i + 1));
                sender.send(batch).await.unwrap();
            }
        });

        // FIFO order: batches arrive with monotonically growing row counts.
        for i in 0..5 {
            let batch = queue.dequeue().await.unwrap();
            assert_eq!(batch.num_rows(), 100 * (i + 1));
        }

        enqueue_handle.await.unwrap();
    }

    mod property_tests {
        use super::*;
        use proptest::prelude::*;

        proptest! {
            #[test]
            fn prop_morsel_iterator_preserves_all_rows(
                num_rows in 1usize..100_000
            ) {
                let batch = create_test_batch(num_rows);
                let original_rows = batch.num_rows();
                let batches = vec![batch];

                let iter = MorselIterator::new(&batches);
                let total_morsel_rows: usize = iter.map(|m| m.num_rows()).sum();

                prop_assert_eq!(original_rows, total_morsel_rows);
            }

            #[test]
            fn prop_morsel_size_within_limit(
                num_rows in 1usize..100_000
            ) {
                let batch = create_test_batch(num_rows);
                let batches = vec![batch];

                let iter = MorselIterator::new(&batches);

                for morsel in iter {
                    let size = morsel.get_array_memory_size();
                    prop_assert!(
                        size <= MORSEL_SIZE_BYTES,
                        "Morsel size {} exceeds limit {}",
                        size,
                        MORSEL_SIZE_BYTES
                    );
                }
            }

            #[test]
            fn prop_multiple_batches_preserve_rows(
                batch_sizes in prop::collection::vec(1usize..10_000, 1..10)
            ) {
                let total_expected: usize = batch_sizes.iter().sum();
                let batches: Vec<_> = batch_sizes.iter()
                    .map(|&size| create_test_batch(size))
                    .collect();

                let iter = MorselIterator::new(&batches);
                let total_actual: usize = iter.map(|m| m.num_rows()).sum();

                prop_assert_eq!(total_expected, total_actual);
            }

            #[test]
            fn prop_empty_batches_handled(
                num_empty in 0usize..10
            ) {
                let batches: Vec<_> = (0..num_empty)
                    .map(|_| create_test_batch(0))
                    .collect();

                let iter = MorselIterator::new(&batches);
                let total_rows: usize = iter.map(|m| m.num_rows()).sum();

                prop_assert_eq!(0, total_rows);
            }
        }
    }
}