1#![expect(
19 dead_code,
20 reason = "experimental zero-copy Arrow path; helpers retained for upcoming gRPC/TCP wiring"
21)]
22
23use std::collections::VecDeque;
24use std::sync::Arc;
25
26use arrow::array::{
27 Array, BinaryArray, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array,
28 Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, StringArray,
29 TimestampMicrosecondArray,
30};
31use arrow::buffer::Buffer;
32use arrow::datatypes::{DataType, Schema, TimeUnit};
33use arrow::ipc::reader::StreamDecoder;
34use arrow::record_batch::RecordBatch;
35use bytes::Bytes;
36
37use crate::error::{Error, Result};
38use hyperdb_api_core::types::SqlType;
39
/// A borrowed view of one row inside a [`RecordBatch`].
///
/// Holds only a reference plus an index; no cell data is copied until a
/// typed getter is invoked.
#[derive(Debug)]
pub struct ArrowRow<'a> {
    // The batch this row belongs to.
    batch: &'a RecordBatch,
    // Zero-based row position within `batch`.
    row_index: usize,
}
46
47impl<'a> ArrowRow<'a> {
48 pub(crate) fn new(batch: &'a RecordBatch, row_index: usize) -> Self {
50 ArrowRow { batch, row_index }
51 }
52
53 #[must_use]
55 pub fn column_count(&self) -> usize {
56 self.batch.num_columns()
57 }
58
59 #[must_use]
61 pub fn get<T: FromArrowValue>(&self, col: usize) -> Option<T> {
62 if col >= self.batch.num_columns() {
63 return None;
64 }
65 T::from_arrow_column(self.batch.column(col), self.row_index)
66 }
67
68 #[must_use]
70 pub fn get_i16(&self, col: usize) -> Option<i16> {
71 self.get::<i16>(col)
72 }
73
74 #[must_use]
76 pub fn get_i32(&self, col: usize) -> Option<i32> {
77 self.get::<i32>(col)
78 }
79
80 #[must_use]
82 pub fn get_i64(&self, col: usize) -> Option<i64> {
83 self.get::<i64>(col)
84 }
85
86 #[must_use]
88 pub fn get_f32(&self, col: usize) -> Option<f32> {
89 self.get::<f32>(col)
90 }
91
92 #[must_use]
94 pub fn get_f64(&self, col: usize) -> Option<f64> {
95 self.get::<f64>(col)
96 }
97
98 #[must_use]
100 pub fn get_bool(&self, col: usize) -> Option<bool> {
101 self.get::<bool>(col)
102 }
103
104 #[must_use]
106 pub fn get_string(&self, col: usize) -> Option<String> {
107 self.get::<String>(col)
108 }
109
110 #[must_use]
112 pub fn get_bytes(&self, col: usize) -> Option<Vec<u8>> {
113 self.get::<Vec<u8>>(col)
114 }
115
116 #[must_use]
118 pub fn is_null(&self, col: usize) -> bool {
119 if col >= self.batch.num_columns() {
120 return true;
121 }
122 self.batch.column(col).is_null(self.row_index)
123 }
124}
125
/// Conversion from a dynamically typed Arrow column cell into a Rust value.
pub trait FromArrowValue: Sized {
    /// Reads row `row` from `array`.
    ///
    /// Returns `None` for NULL cells and for Arrow array types that cannot
    /// be converted (or narrowed without loss) into `Self`.
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self>;
}
131
132impl FromArrowValue for i16 {
133 fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
134 if array.is_null(row) {
135 return None;
136 }
137 if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
138 Some(arr.value(row))
139 } else if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
140 i16::try_from(arr.value(row)).ok()
141 } else {
142 array
143 .as_any()
144 .downcast_ref::<Int64Array>()
145 .and_then(|arr| i16::try_from(arr.value(row)).ok())
146 }
147 }
148}
149
150impl FromArrowValue for i32 {
151 fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
152 if array.is_null(row) {
153 return None;
154 }
155 if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
156 Some(arr.value(row))
157 } else if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
158 Some(i32::from(arr.value(row)))
159 } else {
160 array
161 .as_any()
162 .downcast_ref::<Int64Array>()
163 .and_then(|arr| i32::try_from(arr.value(row)).ok())
164 }
165 }
166}
167
168impl FromArrowValue for i64 {
169 fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
170 if array.is_null(row) {
171 return None;
172 }
173 if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
174 Some(arr.value(row))
175 } else if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
176 Some(i64::from(arr.value(row)))
177 } else if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
178 Some(i64::from(arr.value(row)))
179 } else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
180 Some(i64::from(arr.value(row)))
181 } else {
182 array
183 .as_any()
184 .downcast_ref::<TimestampMicrosecondArray>()
185 .map(|arr| arr.value(row))
186 }
187 }
188}
189
190impl FromArrowValue for f32 {
191 fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
192 if array.is_null(row) {
193 return None;
194 }
195 if let Some(arr) = array.as_any().downcast_ref::<Float32Array>() {
196 Some(arr.value(row))
197 } else {
198 array.as_any().downcast_ref::<Float64Array>().map(|arr| {
199 #[expect(
204 clippy::cast_possible_truncation,
205 reason = "f64 → f32 narrowing is caller-accepted precision loss for this column-coercion path"
206 )]
207 let narrowed = arr.value(row) as f32;
208 narrowed
209 })
210 }
211 }
212}
213
214impl FromArrowValue for f64 {
215 fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
216 if array.is_null(row) {
217 return None;
218 }
219 if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
220 Some(arr.value(row))
221 } else {
222 array
223 .as_any()
224 .downcast_ref::<Float32Array>()
225 .map(|arr| f64::from(arr.value(row)))
226 }
227 }
228}
229
230impl FromArrowValue for bool {
231 fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
232 if array.is_null(row) {
233 return None;
234 }
235 array
236 .as_any()
237 .downcast_ref::<BooleanArray>()
238 .map(|arr| arr.value(row))
239 }
240}
241
242impl FromArrowValue for String {
243 fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
244 if array.is_null(row) {
245 return None;
246 }
247 if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
248 Some(arr.value(row).to_string())
249 } else {
250 array
251 .as_any()
252 .downcast_ref::<LargeStringArray>()
253 .map(|arr| arr.value(row).to_string())
254 }
255 }
256}
257
258impl FromArrowValue for Vec<u8> {
259 fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
260 if array.is_null(row) {
261 return None;
262 }
263 if let Some(arr) = array.as_any().downcast_ref::<BinaryArray>() {
264 Some(arr.value(row).to_vec())
265 } else {
266 array
267 .as_any()
268 .downcast_ref::<LargeBinaryArray>()
269 .map(|arr| arr.value(row).to_vec())
270 }
271 }
272}
273
/// An owned [`RecordBatch`] exposed through row-oriented accessors.
#[derive(Debug)]
pub struct ArrowChunk {
    // The decoded batch backing all row views handed out by this chunk.
    batch: RecordBatch,
}
279
280impl ArrowChunk {
281 pub(crate) fn new(batch: RecordBatch) -> Self {
283 ArrowChunk { batch }
284 }
285
286 #[must_use]
288 pub fn len(&self) -> usize {
289 self.batch.num_rows()
290 }
291
292 #[must_use]
294 pub fn is_empty(&self) -> bool {
295 self.batch.num_rows() == 0
296 }
297
298 #[must_use]
300 pub fn column_count(&self) -> usize {
301 self.batch.num_columns()
302 }
303
304 #[must_use]
306 pub fn row(&self, index: usize) -> Option<ArrowRow<'_>> {
307 if index < self.batch.num_rows() {
308 Some(ArrowRow::new(&self.batch, index))
309 } else {
310 None
311 }
312 }
313
314 #[must_use]
316 pub fn first(&self) -> Option<ArrowRow<'_>> {
317 self.row(0)
318 }
319
320 #[must_use]
322 pub fn iter(&self) -> ArrowChunkIter<'_> {
323 ArrowChunkIter {
324 chunk: self,
325 index: 0,
326 }
327 }
328
329 #[must_use]
331 pub fn into_batch(self) -> RecordBatch {
332 self.batch
333 }
334}
335
336impl<'a> IntoIterator for &'a ArrowChunk {
337 type Item = ArrowRow<'a>;
338 type IntoIter = ArrowChunkIter<'a>;
339
340 fn into_iter(self) -> Self::IntoIter {
341 self.iter()
342 }
343}
344
/// Row iterator over an [`ArrowChunk`], yielding [`ArrowRow`] views.
#[derive(Debug)]
pub struct ArrowChunkIter<'a> {
    // Chunk being traversed.
    chunk: &'a ArrowChunk,
    // Next row to yield; equals `chunk.len()` once exhausted.
    index: usize,
}
351
352impl<'a> Iterator for ArrowChunkIter<'a> {
353 type Item = ArrowRow<'a>;
354
355 fn next(&mut self) -> Option<Self::Item> {
356 if self.index < self.chunk.len() {
357 let row = ArrowRow::new(&self.chunk.batch, self.index);
358 self.index += 1;
359 Some(row)
360 } else {
361 None
362 }
363 }
364
365 fn size_hint(&self) -> (usize, Option<usize>) {
366 let remaining = self.chunk.len() - self.index;
367 (remaining, Some(remaining))
368 }
369}
370
// `size_hint` is exact (lower == upper), so the iterator may advertise len().
impl ExactSizeIterator for ArrowChunkIter<'_> {}
372
/// Pull-based supplier of raw Arrow IPC byte chunks (e.g. network frames).
pub trait ChunkSource: Send {
    /// Returns the next chunk of IPC bytes, or `Ok(None)` once the source
    /// is exhausted.
    ///
    /// # Errors
    /// Propagates transport-level failures from the underlying source.
    fn next_chunk(&mut self) -> Result<Option<Bytes>>;
}
393
/// A query result set backed by Arrow IPC data, either fully buffered or
/// decoded incrementally from a [`ChunkSource`].
pub struct ArrowRowset {
    // Buffered or streaming decode state.
    inner: ArrowRowsetInner,
    // Schema of the result; empty until the first schema message is decoded.
    schema: Arc<Schema>,
}
411
412impl std::fmt::Debug for ArrowRowset {
413 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
414 f.debug_struct("ArrowRowset")
415 .field("schema", &self.schema)
416 .finish_non_exhaustive()
417 }
418}
419
/// Internal decode state for [`ArrowRowset`].
enum ArrowRowsetInner {
    /// All batches decoded up front; `current` is the replay cursor used by
    /// `next_chunk`.
    Buffered {
        batches: Vec<RecordBatch>,
        current: usize,
    },
    /// Incremental decoding from a [`ChunkSource`].
    Streaming {
        // Producer of raw IPC bytes.
        source: Box<dyn ChunkSource>,
        // Arrow stream decoder; replaced with a fresh one at stream boundaries.
        decoder: StreamDecoder,
        // Batches decoded but not yet handed to the caller.
        pending: VecDeque<RecordBatch>,
        // Bytes received but not yet consumed by the decoder.
        leftover: Option<Buffer>,
        // Set once the source reports end-of-input.
        exhausted: bool,
    },
}
446
impl ArrowRowset {
    /// A rowset with an empty schema and no batches.
    fn empty() -> Self {
        ArrowRowset {
            inner: ArrowRowsetInner::Buffered {
                batches: Vec::new(),
                current: 0,
            },
            schema: Arc::new(Schema::empty()),
        }
    }

    /// Decodes a fully buffered Arrow IPC payload.
    ///
    /// Empty input yields an empty rowset rather than an error.
    ///
    /// # Errors
    /// Returns an error when the bytes are not valid Arrow IPC stream data.
    pub fn from_bytes(bytes: Bytes) -> Result<Self> {
        if bytes.is_empty() {
            return Ok(Self::empty());
        }
        Self::from_buffer(Buffer::from(bytes))
    }

    /// Decodes an Arrow [`Buffer`], which may contain several IPC streams
    /// concatenated back to back.
    ///
    /// # Errors
    /// Returns an error when the buffer is not valid Arrow IPC stream data.
    pub fn from_buffer(buf: Buffer) -> Result<Self> {
        if buf.is_empty() {
            return Ok(Self::empty());
        }
        let (schema, batches) = decode_possibly_concatenated_streams(buf)?;
        Ok(ArrowRowset {
            inner: ArrowRowsetInner::Buffered {
                batches,
                current: 0,
            },
            schema,
        })
    }

    /// Decodes a sequence of byte chunks eagerly into a buffered rowset.
    ///
    /// Empty chunks are skipped; the schema is taken from the first chunk
    /// that produces a non-empty one.
    ///
    /// # Errors
    /// Returns an error when any chunk is not valid Arrow IPC stream data.
    pub fn from_chunks<I>(chunks: I) -> Result<Self>
    where
        I: IntoIterator<Item = Bytes>,
    {
        let mut batches = Vec::new();
        let mut schema = Arc::new(Schema::empty());
        for chunk in chunks {
            if chunk.is_empty() {
                continue;
            }
            let (chunk_schema, chunk_batches) = decode_chunk(chunk)?;
            if schema.fields().is_empty() {
                schema = chunk_schema;
            }
            batches.extend(chunk_batches);
        }
        Ok(ArrowRowset {
            inner: ArrowRowsetInner::Buffered {
                batches,
                current: 0,
            },
            schema,
        })
    }

    /// Builds a lazily decoded rowset over `source`.
    ///
    /// Chunks are pulled until the schema message is decoded so that schema
    /// accessors work immediately; row data is decoded on demand by
    /// [`Self::next_chunk`].
    ///
    /// # Errors
    /// Returns an error when the source fails or the initial bytes are not
    /// valid Arrow IPC stream data.
    pub fn from_stream(source: Box<dyn ChunkSource>) -> Result<Self> {
        let mut rowset = ArrowRowset {
            inner: ArrowRowsetInner::Streaming {
                source,
                decoder: StreamDecoder::new(),
                pending: VecDeque::new(),
                leftover: None,
                exhausted: false,
            },
            schema: Arc::new(Schema::empty()),
        };
        rowset.prime_stream()?;
        Ok(rowset)
    }

    /// Pulls chunks from the streaming source until the decoder has seen
    /// the schema message (or the source is exhausted), then copies the
    /// schema onto `self`. No-op for buffered rowsets.
    fn prime_stream(&mut self) -> Result<()> {
        // Inner block so the mutable borrow of `self.inner` ends before
        // `self.schema` is assigned below.
        let new_schema = {
            let ArrowRowsetInner::Streaming {
                source,
                decoder,
                pending,
                leftover,
                exhausted,
            } = &mut self.inner
            else {
                return Ok(());
            };
            while decoder.schema().is_none() && !*exhausted {
                let mut buf = match leftover.take() {
                    Some(b) => b,
                    None => match source.next_chunk()? {
                        Some(bytes) if !bytes.is_empty() => Buffer::from(bytes),
                        // Empty chunk: poll the source again.
                        Some(_) => continue,
                        None => {
                            *exhausted = true;
                            break;
                        }
                    },
                };
                drive_streaming_decoder(decoder, &mut buf, pending)?;
                // Keep any undecoded tail for the next round.
                if !buf.is_empty() {
                    *leftover = Some(buf);
                }
            }
            decoder.schema()
        };
        if let Some(s) = new_schema {
            self.schema = s;
        }
        Ok(())
    }

    /// Decodes a borrowed IPC byte slice (copies it into an owned buffer).
    ///
    /// # Errors
    /// Returns an error when the slice is not valid Arrow IPC stream data.
    pub fn from_ipc_slice(data: &[u8]) -> Result<Self> {
        if data.is_empty() {
            return Ok(Self::empty());
        }
        Self::from_buffer(Buffer::from(data.to_vec()))
    }

    /// The result schema (empty until decoded for streaming rowsets that
    /// produced no schema message).
    #[must_use]
    pub fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Number of columns in the result schema.
    #[must_use]
    pub fn column_count(&self) -> usize {
        self.schema.fields().len()
    }

    /// Owned copies of all column names, in schema order.
    #[must_use]
    pub fn column_names(&self) -> Vec<String> {
        self.schema
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect()
    }

    /// Name of the column at `index`, or `None` when out of range.
    #[must_use]
    pub fn column_name(&self, index: usize) -> Option<&str> {
        self.schema.fields().get(index).map(|f| f.name().as_str())
    }

    /// Returns the next chunk of rows, or `Ok(None)` when exhausted.
    ///
    /// Buffered rowsets replay their decoded batches in order; streaming
    /// rowsets pull and decode source chunks on demand.
    ///
    /// # Errors
    /// Returns an error when the source fails or the incoming bytes are not
    /// valid Arrow IPC stream data.
    pub fn next_chunk(&mut self) -> Result<Option<ArrowChunk>> {
        match &mut self.inner {
            ArrowRowsetInner::Buffered { batches, current } => {
                if *current >= batches.len() {
                    return Ok(None);
                }
                // RecordBatch clone is cheap: columns are Arc-shared.
                let batch = batches[*current].clone();
                *current += 1;
                Ok(Some(ArrowChunk::new(batch)))
            }
            ArrowRowsetInner::Streaming {
                source,
                decoder,
                pending,
                leftover,
                exhausted,
            } => loop {
                // Serve already-decoded batches first.
                if let Some(batch) = pending.pop_front() {
                    return Ok(Some(ArrowChunk::new(batch)));
                }
                if *exhausted {
                    return Ok(None);
                }
                let mut buf = match leftover.take() {
                    Some(b) => b,
                    None => match source.next_chunk()? {
                        Some(bytes) if !bytes.is_empty() => Buffer::from(bytes),
                        // Empty chunk: poll the source again.
                        Some(_) => continue,
                        None => {
                            // Loop once more to drain `pending` before None.
                            *exhausted = true;
                            continue;
                        }
                    },
                };
                drive_streaming_decoder(decoder, &mut buf, pending)?;
                // Keep any undecoded tail for the next round.
                if !buf.is_empty() {
                    *leftover = Some(buf);
                }
            },
        }
    }

    /// Total rows currently decoded: all batches for buffered rowsets, but
    /// only the not-yet-consumed `pending` batches for streaming ones.
    #[must_use]
    pub fn total_rows(&self) -> usize {
        match &self.inner {
            ArrowRowsetInner::Buffered { batches, .. } => batches
                .iter()
                .map(arrow::array::RecordBatch::num_rows)
                .sum(),
            ArrowRowsetInner::Streaming { pending, .. } => pending
                .iter()
                .map(arrow::array::RecordBatch::num_rows)
                .sum(),
        }
    }

    /// True when no rows are (or will become) available: buffered rowsets
    /// with no non-empty batches, or exhausted streams with nothing pending.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        match &self.inner {
            ArrowRowsetInner::Buffered { batches, .. } => {
                batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0)
            }
            ArrowRowsetInner::Streaming {
                pending, exhausted, ..
            } => *exhausted && pending.is_empty(),
        }
    }
}
763
764fn arrow_type_to_sql_name(dt: &DataType) -> String {
766 match dt {
767 DataType::Boolean => "BOOLEAN".to_string(),
768 DataType::Int8 => "SMALLINT".to_string(),
769 DataType::Int16 => "SMALLINT".to_string(),
770 DataType::Int32 => "INTEGER".to_string(),
771 DataType::Int64 => "BIGINT".to_string(),
772 DataType::UInt8 => "SMALLINT".to_string(),
773 DataType::UInt16 => "INTEGER".to_string(),
774 DataType::UInt32 => "BIGINT".to_string(),
775 DataType::UInt64 => "BIGINT".to_string(),
776 DataType::Float16 => "REAL".to_string(),
777 DataType::Float32 => "REAL".to_string(),
778 DataType::Float64 => "DOUBLE PRECISION".to_string(),
779 DataType::Utf8 | DataType::LargeUtf8 => "TEXT".to_string(),
780 DataType::Binary | DataType::LargeBinary => "BYTEA".to_string(),
781 DataType::Date32 | DataType::Date64 => "DATE".to_string(),
782 DataType::Time32(_) | DataType::Time64(_) => "TIME".to_string(),
783 DataType::Timestamp(TimeUnit::Microsecond, None) => "TIMESTAMP".to_string(),
784 DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => "TIMESTAMPTZ".to_string(),
785 DataType::Timestamp(_, None) => "TIMESTAMP".to_string(),
786 DataType::Timestamp(_, Some(_)) => "TIMESTAMPTZ".to_string(),
787 DataType::Decimal128(p, s) => format!("NUMERIC({p}, {s})"),
788 DataType::Decimal256(p, s) => format!("NUMERIC({p}, {s})"),
789 DataType::Interval(_) => "INTERVAL".to_string(),
790 DataType::List(_) => "ARRAY".to_string(),
791 DataType::Struct(_) => "RECORD".to_string(),
792 _ => "UNKNOWN".to_string(),
793 }
794}
795
/// Converts an Arrow decimal scale to `u32`, clamping the negative scales
/// Arrow permits (digits left of the decimal point) to zero.
fn decimal_scale_to_u32(scale: i8) -> u32 {
    scale.try_into().unwrap_or_default()
}
803
804pub(crate) fn arrow_type_to_sql_type(dt: &DataType) -> SqlType {
806 match dt {
807 DataType::Boolean => SqlType::Bool,
808 DataType::Int8 | DataType::Int16 => SqlType::SmallInt,
809 DataType::Int32 => SqlType::Int,
810 DataType::Int64 => SqlType::BigInt,
811 DataType::UInt8 | DataType::UInt16 => SqlType::SmallInt,
812 DataType::UInt32 => SqlType::Int,
813 DataType::UInt64 => SqlType::BigInt,
814 DataType::Float16 | DataType::Float32 => SqlType::Float,
815 DataType::Float64 => SqlType::Double,
816 DataType::Utf8 | DataType::LargeUtf8 => SqlType::Text,
817 DataType::Binary | DataType::LargeBinary => SqlType::ByteA,
818 DataType::Date32 | DataType::Date64 => SqlType::Date,
819 DataType::Time32(_) | DataType::Time64(_) => SqlType::Time,
820 DataType::Timestamp(_, None) => SqlType::Timestamp,
821 DataType::Timestamp(_, Some(_)) => SqlType::TimestampTz,
822 DataType::Decimal128(p, s) => SqlType::Numeric {
823 precision: u32::from(*p),
824 scale: decimal_scale_to_u32(*s),
825 },
826 DataType::Decimal256(p, s) => SqlType::Numeric {
827 precision: u32::from(*p),
828 scale: decimal_scale_to_u32(*s),
829 },
830 DataType::Interval(_) => SqlType::Interval,
831 _ => SqlType::Text, }
833}
834
835pub fn parse_arrow_ipc(bytes: Bytes) -> Result<Vec<RecordBatch>> {
845 if bytes.is_empty() {
846 return Ok(Vec::new());
847 }
848 let (_, batches) = decode_possibly_concatenated_streams(Buffer::from(bytes))?;
849 Ok(batches)
850}
851
/// Decodes one transport chunk, which may itself hold one or more complete
/// Arrow IPC streams laid back to back.
fn decode_chunk(bytes: Bytes) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
    decode_possibly_concatenated_streams(Buffer::from(bytes))
}
860
// Arrow IPC end-of-stream marker: the 0xFFFFFFFF continuation sentinel
// followed by a zero metadata length.
const ARROW_IPC_EOS: [u8; 8] = [0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0];
867
/// Feeds `buf` into `decoder`, pushing every completed batch onto `pending`.
///
/// Tolerates concatenated IPC streams inside one buffer: on an explicit
/// EOS marker, or on decoder errors that indicate a fresh stream follows
/// (a second schema message, or an unexpected EOS), the decoder is replaced
/// with a new one and decoding continues. Any bytes left in `buf` on return
/// belong to a not-yet-complete message.
fn drive_streaming_decoder(
    decoder: &mut StreamDecoder,
    buf: &mut Buffer,
    pending: &mut VecDeque<RecordBatch>,
) -> Result<()> {
    loop {
        if buf.is_empty() {
            return Ok(());
        }
        // Explicit end-of-stream: skip the marker and, if more bytes follow,
        // reset the decoder so the next stream's schema is accepted.
        if buf.len() >= ARROW_IPC_EOS.len() && buf[..ARROW_IPC_EOS.len()] == ARROW_IPC_EOS {
            let new_len = buf.len() - ARROW_IPC_EOS.len();
            *buf = buf.slice_with_length(ARROW_IPC_EOS.len(), new_len);
            if !buf.is_empty() {
                *decoder = StreamDecoder::new();
            }
            continue;
        }
        // A schema message arriving mid-stream marks a new concatenated
        // stream; skip the duplicate schema when it is fully buffered and
        // keep decoding batches with the existing (identical) schema.
        if decoder.schema().is_some() && peek_is_schema_message(buf) {
            match peek_message_total_size(buf) {
                Some(total) if buf.len() >= total => {
                    *buf = buf.slice_with_length(total, buf.len() - total);
                    continue;
                }
                _ => {}
            }
        }
        // Snapshot so the buffer can be restored if the decoder rejects the
        // input for a recoverable stream-boundary reason.
        let buf_before = buf.clone();
        match decoder.decode(buf) {
            Ok(Some(batch)) => pending.push_back(batch),
            // Decoder needs more bytes; caller will supply the next chunk.
            Ok(None) => return Ok(()),
            Err(e) => {
                // StreamDecoder exposes no typed error for these states, so
                // the recoverable cases are matched on the message text.
                let msg = e.to_string();
                if msg.contains("Not expecting a schema when messages are read") {
                    *buf = buf_before;
                    *decoder = StreamDecoder::new();
                    continue;
                }
                if msg.contains("Unexpected EOS") {
                    *buf = buf_before;
                    *decoder = StreamDecoder::new();
                    continue;
                }
                return Err(Error::new(format!("Failed to parse Arrow IPC data: {e}")));
            }
        }
    }
}
943
944fn peek_is_schema_message(buf: &Buffer) -> bool {
948 let Some((_len, body)) = peek_message_body(buf) else {
953 return false;
954 };
955 match arrow::ipc::root_as_message(body) {
961 Ok(msg) => msg.header_type() == arrow::ipc::MessageHeader::Schema,
962 Err(_) => false,
963 }
964}
965
966fn peek_message_body(buf: &Buffer) -> Option<(usize, &[u8])> {
969 let bytes: &[u8] = buf;
970 let (length_offset, remaining) = if bytes.len() >= 4 && bytes[0..4] == [0xFF; 4] {
972 (4, &bytes[4..])
973 } else {
974 (0, bytes)
975 };
976 if remaining.len() < 4 {
977 return None;
978 }
979 let length =
980 u32::from_le_bytes([remaining[0], remaining[1], remaining[2], remaining[3]]) as usize;
981 let body_start = length_offset + 4;
982 if buf.len() < body_start + length {
983 return None;
984 }
985 Some((body_start + length, &bytes[body_start..body_start + length]))
986}
987
988fn peek_message_total_size(buf: &Buffer) -> Option<usize> {
993 let (total, _body) = peek_message_body(buf)?;
994 Some(total)
995}
996
/// Decodes a buffer that may contain one Arrow IPC stream or several
/// concatenated back to back, returning the (first) schema and all batches.
///
/// Relies on `drive_streaming_decoder` to hop stream boundaries; a pass
/// that consumes no bytes while data remains means the input is malformed
/// (guards against an infinite loop on garbage).
fn decode_possibly_concatenated_streams(
    mut buf: Buffer,
) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
    let mut decoder = StreamDecoder::new();
    let mut pending = VecDeque::new();
    while !buf.is_empty() {
        let before_len = buf.len();
        drive_streaming_decoder(&mut decoder, &mut buf, &mut pending)?;
        if buf.len() == before_len {
            // No progress with bytes still unread: malformed input.
            if !buf.is_empty() {
                return Err(Error::new(
                    "Failed to parse Arrow IPC data: decoder made no progress",
                ));
            }
            break;
        }
    }
    // Input with no schema message (e.g. only EOS markers) yields an empty
    // schema rather than an error.
    let schema = decoder
        .schema()
        .unwrap_or_else(|| Arc::new(Schema::empty()));
    Ok((schema, pending.into_iter().collect()))
}
1026
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::Field;

    /// Empty inputs must yield an empty rowset, not an error.
    #[test]
    fn test_arrow_rowset_empty() {
        let rowset = ArrowRowset::from_bytes(Bytes::new()).unwrap();
        assert!(rowset.is_empty());
        assert_eq!(rowset.column_count(), 0);

        let rowset = ArrowRowset::from_ipc_slice(&[]).unwrap();
        assert!(rowset.is_empty());
    }

    /// Row iteration over a chunk yields typed values and NULLs in order.
    #[test]
    fn test_arrow_chunk_iteration() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));

        let id_array = Int32Array::from(vec![1, 2, 3]);
        let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), None]);

        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)]).unwrap();

        let chunk = ArrowChunk::new(batch);
        assert_eq!(chunk.len(), 3);
        assert_eq!(chunk.column_count(), 2);

        let mut iter = chunk.iter();

        let row0 = iter.next().unwrap();
        assert_eq!(row0.get::<i32>(0), Some(1));
        assert_eq!(row0.get::<String>(1), Some("Alice".to_string()));

        let row1 = iter.next().unwrap();
        assert_eq!(row1.get::<i32>(0), Some(2));
        assert_eq!(row1.get::<String>(1), Some("Bob".to_string()));

        let row2 = iter.next().unwrap();
        assert_eq!(row2.get::<i32>(0), Some(3));
        // NULL cell: typed getter returns None and is_null reports true.
        assert_eq!(row2.get::<String>(1), None);
        assert!(row2.is_null(1));

        assert!(iter.next().is_none());
    }

    /// In-memory ChunkSource backed by a queue of pre-serialized chunks.
    struct VecChunkSource {
        chunks: VecDeque<Bytes>,
    }

    impl VecChunkSource {
        fn new(chunks: Vec<Bytes>) -> Self {
            VecChunkSource {
                chunks: chunks.into(),
            }
        }
    }

    impl ChunkSource for VecChunkSource {
        fn next_chunk(&mut self) -> Result<Option<Bytes>> {
            Ok(self.chunks.pop_front())
        }
    }

    /// Serializes `num_streams` complete, independent IPC streams (each with
    /// its own schema message and EOS) sharing the same two-column schema.
    fn serialize_independent_streams(num_streams: usize, rows_per_stream: i32) -> Vec<Bytes> {
        use arrow::ipc::writer::StreamWriter;
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]));
        let mut out = Vec::with_capacity(num_streams);
        for s in 0..num_streams {
            let start = i32::try_from(s).expect("test uses small stream counts") * rows_per_stream;
            let id_array = Int32Array::from((start..start + rows_per_stream).collect::<Vec<_>>());
            let name_array = StringArray::from(
                (start..start + rows_per_stream)
                    .map(|i| Some(format!("n{i}")))
                    .collect::<Vec<_>>(),
            );
            let batch = RecordBatch::try_new(
                Arc::clone(&schema),
                vec![Arc::new(id_array), Arc::new(name_array)],
            )
            .unwrap();

            let mut buf: Vec<u8> = Vec::new();
            {
                let mut writer = StreamWriter::try_new(&mut buf, &schema).unwrap();
                writer.write(&batch).unwrap();
                writer.finish().unwrap();
            }
            out.push(Bytes::from(buf));
        }
        out
    }

    /// A single complete stream decodes to one chunk with the right schema.
    #[test]
    fn test_streaming_rowset_single_chunk() {
        let chunks = serialize_independent_streams(1, 100);
        let source = Box::new(VecChunkSource::new(chunks));
        let mut rowset = ArrowRowset::from_stream(source).unwrap();

        // Schema must be available before any chunk is consumed.
        assert_eq!(rowset.column_count(), 2);
        assert_eq!(rowset.column_name(0), Some("id"));

        let chunk = rowset.next_chunk().unwrap().expect("one chunk");
        assert_eq!(chunk.len(), 100);
        assert!(rowset.next_chunk().unwrap().is_none());
    }

    /// Multiple independent streams (each with its own schema + EOS) are
    /// decoded seamlessly across stream boundaries.
    #[test]
    fn test_streaming_rowset_multiple_streams() {
        let chunks = serialize_independent_streams(4, 500);
        assert_eq!(chunks.len(), 4);
        let source = Box::new(VecChunkSource::new(chunks));
        let mut rowset = ArrowRowset::from_stream(source).unwrap();

        let mut total_rows = 0;
        while let Some(chunk) = rowset.next_chunk().unwrap() {
            total_rows += chunk.len();
        }
        assert_eq!(total_rows, 2000);
    }

    /// A source that immediately reports end-of-input yields an empty rowset.
    #[test]
    fn test_streaming_rowset_empty_source() {
        let source = Box::new(VecChunkSource::new(vec![]));
        let mut rowset = ArrowRowset::from_stream(source).unwrap();
        assert!(rowset.next_chunk().unwrap().is_none());
        assert!(rowset.is_empty());
    }

    /// Streams concatenated into one buffer decode via the buffered path.
    #[test]
    fn test_from_bytes_concatenated_streams() {
        let streams = serialize_independent_streams(2, 300);
        let mut concat = bytes::BytesMut::new();
        for s in &streams {
            concat.extend_from_slice(s);
        }
        let rowset = ArrowRowset::from_bytes(concat.freeze()).unwrap();
        let mut total_rows = 0usize;
        let mut rowset = rowset;
        while let Some(chunk) = rowset.next_chunk().unwrap() {
            total_rows += chunk.len();
        }
        assert_eq!(total_rows, 600);
    }
}
1190}