1use crate::client::{LogWriteRecord, Record, WriteRecord};
19use crate::compression::ArrowCompressionInfo;
20use crate::error::{Error, Result};
21use crate::metadata::{DataType, RowType};
22use crate::record::{ChangeType, ScanRecord};
23use crate::row::column_writer::ColumnWriter;
24use crate::row::{ColumnarRow, InternalRow};
25use arrow::array::{ArrayBuilder, ArrayRef};
26use arrow::{
27 array::RecordBatch,
28 buffer::Buffer,
29 ipc::{
30 reader::{StreamReader, read_record_batch},
31 root_as_message,
32 writer::StreamWriter,
33 },
34};
35use arrow_schema::ArrowError::ParseError;
36use arrow_schema::SchemaRef;
37use arrow_schema::{DataType as ArrowDataType, Field};
38use byteorder::WriteBytesExt;
39use byteorder::{ByteOrder, LittleEndian};
40use bytes::Bytes;
41use crc32c::crc32c;
42use std::{
43 collections::HashMap,
44 fs::File,
45 io::{Cursor, Read, Seek, SeekFrom, Write},
46 path::PathBuf,
47 sync::Arc,
48};
49
50use crate::error::Error::IllegalArgument;
51use arrow::ipc::writer::IpcWriteOptions;
// Widths (in bytes) of the fields of a log record batch header.
pub const BASE_OFFSET_LENGTH: usize = 8;
pub const LENGTH_LENGTH: usize = 4;
pub const MAGIC_LENGTH: usize = 1;
pub const COMMIT_TIMESTAMP_LENGTH: usize = 8;
pub const CRC_LENGTH: usize = 4;
pub const SCHEMA_ID_LENGTH: usize = 2;
pub const ATTRIBUTE_LENGTH: usize = 1;
pub const LAST_OFFSET_DELTA_LENGTH: usize = 4;
pub const WRITE_CLIENT_ID_LENGTH: usize = 8;
pub const BATCH_SEQUENCE_LENGTH: usize = 4;
pub const RECORDS_COUNT_LENGTH: usize = 4;

// Byte offsets of each header field relative to the start of the batch.
// The fields are laid out back to back in this order; the builder writes every
// multi-byte field little-endian.
pub const BASE_OFFSET_OFFSET: usize = 0;
pub const LENGTH_OFFSET: usize = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH;
pub const MAGIC_OFFSET: usize = LENGTH_OFFSET + LENGTH_LENGTH;
pub const COMMIT_TIMESTAMP_OFFSET: usize = MAGIC_OFFSET + MAGIC_LENGTH;
pub const CRC_OFFSET: usize = COMMIT_TIMESTAMP_OFFSET + COMMIT_TIMESTAMP_LENGTH;
pub const SCHEMA_ID_OFFSET: usize = CRC_OFFSET + CRC_LENGTH;
pub const ATTRIBUTES_OFFSET: usize = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
pub const LAST_OFFSET_DELTA_OFFSET: usize = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
pub const WRITE_CLIENT_ID_OFFSET: usize = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
pub const BATCH_SEQUENCE_OFFSET: usize = WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
pub const RECORDS_COUNT_OFFSET: usize = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
// The Arrow IPC payload begins right after the last header field.
pub const RECORDS_OFFSET: usize = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;

/// Total size of the fixed batch header.
pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET;
// NOTE(review): equal to the header size; its role is not visible in this section.
pub const ARROW_CHANGETYPE_OFFSET: usize = RECORD_BATCH_HEADER_SIZE;
/// Bytes that precede the batch body and are not counted by the length field
/// (base offset + length). Total batch size = length field + `LOG_OVERHEAD`.
pub const LOG_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH;

/// Upper bound on the total size (including `LOG_OVERHEAD`) of one batch read.
pub const MAX_BATCH_SIZE: usize = i32::MAX as usize;

/// Version of the batch format, stored in the header's magic byte.
#[derive(Debug, Clone, Copy)]
pub enum LogMagicValue {
    V0 = 0,
}
93
94fn validate_batch_size(batch_size_bytes: i32) -> Result<usize> {
101 if batch_size_bytes < 0 {
103 return Err(Error::UnexpectedError {
104 message: format!("Invalid negative batch size: {batch_size_bytes}"),
105 source: None,
106 });
107 }
108
109 let batch_size_u = batch_size_bytes as usize;
110
111 let total_size =
113 batch_size_u
114 .checked_add(LOG_OVERHEAD)
115 .ok_or_else(|| Error::UnexpectedError {
116 message: format!(
117 "Batch size {batch_size_u} + LOG_OVERHEAD {LOG_OVERHEAD} would overflow"
118 ),
119 source: None,
120 })?;
121
122 if total_size > MAX_BATCH_SIZE {
124 return Err(Error::UnexpectedError {
125 message: format!(
126 "Batch size {total_size} exceeds maximum allowed size {MAX_BATCH_SIZE}"
127 ),
128 source: None,
129 });
130 }
131
132 Ok(total_size)
133}
134
/// Magic byte written by the builder (`LogMagicValue::V0`).
pub const CURRENT_LOG_MAGIC_VALUE: u8 = LogMagicValue::V0 as u8;

/// Writer id written to the header until `set_writer_state` assigns one.
pub const NO_WRITER_ID: i64 = -1;

/// Batch sequence written to the header until `set_writer_state` assigns one.
pub const NO_BATCH_SEQUENCE: i32 = -1;

/// Base log offset written by a freshly created builder.
pub const BUILDER_DEFAULT_OFFSET: i64 = 0;

/// Row count at which a builder reports itself full. Row-append builders also
/// use it as the initial capacity of their column writers.
pub const DEFAULT_MAX_RECORD: i32 = 256;
/// Builds one serialized log record batch (fixed header followed by an Arrow
/// IPC record-batch message) from either appended rows or a single pre-built
/// Arrow batch.
pub struct MemoryLogRecordsArrowBuilder {
    // Base log offset written into the header.
    base_log_offset: i64,
    // Schema id; stored in the header's 16-bit schema-id field.
    schema_id: i32,
    // Magic (format version) byte written into the header.
    magic: u8,
    // Writer id written into the header (`NO_WRITER_ID` until set).
    writer_id: i64,
    // Batch sequence written into the header (`NO_BATCH_SEQUENCE` until set).
    batch_sequence: i32,
    // Accumulates the rows / batch that become the Arrow payload.
    arrow_record_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder>,
    // Set by `close`; only reported via `is_closed` in this section.
    is_closed: bool,
    // Compression settings used when encoding the Arrow IPC payload.
    arrow_compression_info: ArrowCompressionInfo,
}
160
/// Accumulates the data of one Arrow record batch for
/// `MemoryLogRecordsArrowBuilder`.
pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
    /// Produces the Arrow record batch from the accumulated data.
    fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>>;

    /// Appends one row and returns whether it was accepted
    /// (implementations in this file return `Ok(false)` when they do not take rows).
    fn append(&mut self, row: &dyn InternalRow) -> Result<bool>;

    /// Appends a whole pre-built batch and returns whether it was accepted.
    fn append_batch(&mut self, record_batch: Arc<RecordBatch>) -> Result<bool>;

    /// Arrow schema of the batch being built.
    fn schema(&self) -> SchemaRef;

    /// Number of rows accumulated so far.
    fn records_count(&self) -> i32;

    /// Whether the builder should not receive more data.
    fn is_full(&self) -> bool;

    /// Rough in-memory size of the accumulated data, in bytes.
    fn estimated_size_in_bytes(&self) -> usize;
}
177
178#[derive(Default)]
179pub struct PrebuiltRecordBatchBuilder {
180 arrow_record_batch: Option<Arc<RecordBatch>>,
181 records_count: i32,
182}
183
184impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
185 fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
186 Ok(self.arrow_record_batch.as_ref().unwrap().clone())
187 }
188
189 fn append(&mut self, _row: &dyn InternalRow) -> Result<bool> {
190 Ok(false)
192 }
193
194 fn append_batch(&mut self, record_batch: Arc<RecordBatch>) -> Result<bool> {
195 if self.arrow_record_batch.is_some() {
196 return Ok(false);
197 }
198 self.records_count = record_batch.num_rows() as i32;
199 self.arrow_record_batch = Some(record_batch);
200 Ok(true)
201 }
202
203 fn schema(&self) -> SchemaRef {
204 self.arrow_record_batch.as_ref().unwrap().schema()
205 }
206
207 fn records_count(&self) -> i32 {
208 self.records_count
209 }
210
211 fn is_full(&self) -> bool {
212 self.arrow_record_batch.is_some()
214 }
215
216 fn estimated_size_in_bytes(&self) -> usize {
217 self.arrow_record_batch
218 .as_ref()
219 .map(|batch| batch.get_array_memory_size())
220 .unwrap_or(0)
221 }
222}
223
224pub struct RowAppendRecordBatchBuilder {
225 table_schema: SchemaRef,
226 column_writers: Vec<ColumnWriter>,
227 records_count: i32,
228}
229
230impl RowAppendRecordBatchBuilder {
231 pub fn new(row_type: &RowType) -> Result<Self> {
232 let capacity = DEFAULT_MAX_RECORD as usize;
233 let schema_ref = to_arrow_schema(row_type)?;
234 let writers: Result<Vec<_>> = row_type
235 .fields()
236 .iter()
237 .enumerate()
238 .map(|(pos, field)| {
239 let arrow_type = schema_ref.field(pos).data_type();
240 ColumnWriter::create(field.data_type(), arrow_type, pos, capacity)
241 })
242 .collect();
243 Ok(Self {
244 table_schema: schema_ref.clone(),
245 column_writers: writers?,
246 records_count: 0,
247 })
248 }
249 pub fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
251 ArrowRecordBatchInnerBuilder::append(self, row)
252 }
253
254 pub fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
256 ArrowRecordBatchInnerBuilder::build_arrow_record_batch(self)
257 }
258}
259
260impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
261 fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
262 let arrays: Result<Vec<ArrayRef>> = self
263 .column_writers
264 .iter_mut()
265 .enumerate()
266 .map(|(idx, writer)| {
267 let array = writer.finish();
268 let expected_type = self.table_schema.field(idx).data_type();
269
270 if array.data_type() != expected_type {
272 return Err(Error::IllegalArgument {
273 message: format!(
274 "Builder type mismatch at column {}: expected {:?}, got {:?}",
275 idx,
276 expected_type,
277 array.data_type()
278 ),
279 });
280 }
281
282 Ok(array)
283 })
284 .collect();
285
286 Ok(Arc::new(RecordBatch::try_new(
287 self.table_schema.clone(),
288 arrays?,
289 )?))
290 }
291
292 fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
293 for writer in &mut self.column_writers {
294 writer.write_field(row)?;
295 }
296 self.records_count += 1;
297 Ok(true)
298 }
299
300 fn append_batch(&mut self, _record_batch: Arc<RecordBatch>) -> Result<bool> {
301 Ok(false)
302 }
303
304 fn schema(&self) -> SchemaRef {
305 self.table_schema.clone()
306 }
307
308 fn records_count(&self) -> i32 {
309 self.records_count
310 }
311
312 fn is_full(&self) -> bool {
313 self.records_count() >= DEFAULT_MAX_RECORD
314 }
315
316 fn estimated_size_in_bytes(&self) -> usize {
317 self.column_writers
321 .iter()
322 .map(|writer| writer.finish_cloned().get_array_memory_size())
323 .sum()
324 }
325}
326
327impl MemoryLogRecordsArrowBuilder {
328 pub fn new(
329 schema_id: i32,
330 row_type: &RowType,
331 to_append_record_batch: bool,
332 arrow_compression_info: ArrowCompressionInfo,
333 ) -> Result<Self> {
334 let arrow_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder> = {
335 if to_append_record_batch {
336 Box::new(PrebuiltRecordBatchBuilder::default())
337 } else {
338 Box::new(RowAppendRecordBatchBuilder::new(row_type)?)
339 }
340 };
341 Ok(MemoryLogRecordsArrowBuilder {
342 base_log_offset: BUILDER_DEFAULT_OFFSET,
343 schema_id,
344 magic: CURRENT_LOG_MAGIC_VALUE,
345 writer_id: NO_WRITER_ID,
346 batch_sequence: NO_BATCH_SEQUENCE,
347 is_closed: false,
348 arrow_record_batch_builder: arrow_batch_builder,
349 arrow_compression_info,
350 })
351 }
352
353 pub fn append(&mut self, record: &WriteRecord) -> Result<bool> {
354 match &record.record() {
355 Record::Log(log_write_record) => match log_write_record {
356 LogWriteRecord::InternalRow(row) => {
357 Ok(self.arrow_record_batch_builder.append(*row)?)
358 }
359 LogWriteRecord::RecordBatch(record_batch) => Ok(self
360 .arrow_record_batch_builder
361 .append_batch(record_batch.clone())?),
362 },
363 Record::Kv(_) => Err(Error::UnsupportedOperation {
364 message: "Only LogRecord is supported to append".to_string(),
365 }),
366 }
367 }
369
370 pub fn is_full(&self) -> bool {
371 self.arrow_record_batch_builder.records_count() >= DEFAULT_MAX_RECORD
372 }
373
374 pub fn is_closed(&self) -> bool {
375 self.is_closed
376 }
377
378 pub fn close(&mut self) {
379 self.is_closed = true;
380 }
381
382 pub fn build(&mut self) -> Result<Vec<u8>> {
383 let mut arrow_batch_bytes = vec![];
385 let table_schema = self.arrow_record_batch_builder.schema();
386 let compression_type = self.arrow_compression_info.get_compression_type();
387 let write_option =
388 IpcWriteOptions::try_with_compression(IpcWriteOptions::default(), compression_type);
389 let mut writer = StreamWriter::try_new_with_options(
390 &mut arrow_batch_bytes,
391 &table_schema,
392 write_option?,
393 )?;
394
395 let header = writer.get_ref().len();
397 let record_batch = self.arrow_record_batch_builder.build_arrow_record_batch()?;
398 writer.write(record_batch.as_ref())?;
399 let real_arrow_batch_bytes = &arrow_batch_bytes[header..];
401
402 let mut batch_bytes = vec![0u8; RECORD_BATCH_HEADER_SIZE + real_arrow_batch_bytes.len()];
404 self.write_batch_header(&mut batch_bytes[..])?;
406
407 let mut cursor = Cursor::new(&mut batch_bytes[..]);
409 cursor.set_position(RECORD_BATCH_HEADER_SIZE as u64);
410 cursor.write_all(real_arrow_batch_bytes)?;
411
412 let calcute_crc_bytes = &cursor.get_ref()[SCHEMA_ID_OFFSET..];
413 let crc = crc32c(calcute_crc_bytes);
415 cursor.set_position(CRC_OFFSET as u64);
416 cursor.write_u32::<LittleEndian>(crc)?;
417
418 Ok(batch_bytes.to_vec())
419 }
420
421 fn write_batch_header(&self, buffer: &mut [u8]) -> Result<()> {
422 let total_len = buffer.len();
423 let mut cursor = Cursor::new(buffer);
424 cursor.write_i64::<LittleEndian>(self.base_log_offset)?;
425 cursor
426 .write_i32::<LittleEndian>((total_len - BASE_OFFSET_LENGTH - LENGTH_LENGTH) as i32)?;
427 cursor.write_u8(self.magic)?;
428 cursor.write_i64::<LittleEndian>(0)?; cursor.write_u32::<LittleEndian>(0)?; cursor.write_i16::<LittleEndian>(self.schema_id as i16)?;
431
432 let record_count = self.arrow_record_batch_builder.records_count();
433 let append_only = true;
435 cursor.write_u8(if append_only { 1 } else { 0 })?;
436 cursor.write_i32::<LittleEndian>(if record_count > 0 {
437 record_count - 1
438 } else {
439 0
440 })?;
441
442 cursor.write_i64::<LittleEndian>(self.writer_id)?;
443 cursor.write_i32::<LittleEndian>(self.batch_sequence)?;
444 cursor.write_i32::<LittleEndian>(record_count)?;
445 Ok(())
446 }
447
448 pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence: i32) {
449 self.writer_id = writer_id;
450 self.batch_sequence = batch_base_sequence;
451 }
452
453 pub fn estimated_size_in_bytes(&self) -> usize {
456 RECORD_BATCH_HEADER_SIZE + self.arrow_record_batch_builder.estimated_size_in_bytes()
457 }
458}
459
/// Values that can append themselves to a type-erased Arrow `ArrayBuilder`.
// NOTE(review): no implementations or callers are visible in this section.
pub trait ToArrow {
    fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
}
463
464struct MemorySource {
467 data: Bytes,
468}
469
470impl MemorySource {
471 fn new(data: Vec<u8>) -> Self {
472 Self {
473 data: Bytes::from(data),
474 }
475 }
476
477 fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
478 if pos + LOG_OVERHEAD > self.data.len() {
479 return Err(Error::UnexpectedError {
480 message: format!(
481 "Position {} + LOG_OVERHEAD {} exceeds data size {}",
482 pos,
483 LOG_OVERHEAD,
484 self.data.len()
485 ),
486 source: None,
487 });
488 }
489
490 let base_offset = LittleEndian::read_i64(&self.data[pos + BASE_OFFSET_OFFSET..]);
491 let batch_size_bytes = LittleEndian::read_i32(&self.data[pos + LENGTH_OFFSET..]);
492
493 let batch_size = validate_batch_size(batch_size_bytes)?;
495
496 Ok((base_offset, batch_size))
497 }
498
499 fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
500 if pos + size > self.data.len() {
501 return Err(Error::UnexpectedError {
502 message: format!(
503 "Read beyond data size: {} + {} > {}",
504 pos,
505 size,
506 self.data.len()
507 ),
508 source: None,
509 });
510 }
511 Ok(self.data.slice(pos..pos + size))
513 }
514
515 fn total_size(&self) -> usize {
516 self.data.len()
517 }
518}
519
520struct FileCleanupGuard {
523 file_path: PathBuf,
524}
525
526impl Drop for FileCleanupGuard {
527 fn drop(&mut self) {
528 if let Err(e) = std::fs::remove_file(&self.file_path) {
530 log::warn!(
531 "Failed to delete remote log file {}: {}",
532 self.file_path.display(),
533 e
534 );
535 } else {
536 log::debug!("Deleted remote log file: {}", self.file_path.display());
537 }
538 }
539}
540
541struct FileSource {
548 file: File,
549 file_size: usize,
550 base_offset: usize,
551 _cleanup: Option<FileCleanupGuard>, }
553
554impl FileSource {
555 fn new(file: File, base_offset: usize, file_path: PathBuf) -> Result<Self> {
559 let file_size = file.metadata()?.len() as usize;
560
561 if base_offset > file_size {
563 return Err(Error::UnexpectedError {
564 message: format!("base_offset ({base_offset}) exceeds file_size ({file_size})"),
565 source: None,
566 });
567 }
568
569 Ok(Self {
570 file,
571 file_size,
572 base_offset,
573 _cleanup: Some(FileCleanupGuard { file_path }),
574 })
575 }
576
577 fn read_at(&mut self, pos: u64, buf: &mut [u8]) -> Result<()> {
580 self.file.seek(SeekFrom::Start(pos))?;
581 self.file.read_exact(buf)?;
582 Ok(())
583 }
584
585 fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
586 let actual_pos = self.base_offset + pos;
587 if actual_pos + LOG_OVERHEAD > self.file_size {
588 return Err(Error::UnexpectedError {
589 message: format!(
590 "Position {} exceeds file size {}",
591 actual_pos, self.file_size
592 ),
593 source: None,
594 });
595 }
596
597 let mut header_buf = vec![0u8; LOG_OVERHEAD];
599 self.read_at(actual_pos as u64, &mut header_buf)?;
600
601 let base_offset = LittleEndian::read_i64(&header_buf[BASE_OFFSET_OFFSET..]);
602 let batch_size_bytes = LittleEndian::read_i32(&header_buf[LENGTH_OFFSET..]);
603
604 let batch_size = validate_batch_size(batch_size_bytes)?;
606
607 Ok((base_offset, batch_size))
608 }
609
610 fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
611 let actual_pos = self.base_offset + pos;
612 if actual_pos + size > self.file_size {
613 return Err(Error::UnexpectedError {
614 message: format!(
615 "Read beyond file size: {} + {} > {}",
616 actual_pos, size, self.file_size
617 ),
618 source: None,
619 });
620 }
621
622 let mut batch_buf = vec![0u8; size];
624 self.read_at(actual_pos as u64, &mut batch_buf)?;
625
626 Ok(Bytes::from(batch_buf))
627 }
628
629 fn total_size(&self) -> usize {
630 self.file_size - self.base_offset
631 }
632}
633
634enum LogRecordsSource {
636 Memory(MemorySource),
637 File(FileSource),
638}
639
640impl LogRecordsSource {
641 fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
642 match self {
643 Self::Memory(s) => s.read_batch_header(pos),
644 Self::File(s) => s.read_batch_header(pos),
645 }
646 }
647
648 fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
649 match self {
650 Self::Memory(s) => s.read_batch_data(pos, size),
651 Self::File(s) => s.read_batch_data(pos, size),
652 }
653 }
654
655 fn total_size(&self) -> usize {
656 match self {
657 Self::Memory(s) => s.total_size(),
658 Self::File(s) => s.total_size(),
659 }
660 }
661}
662
663pub struct LogRecordsBatches {
664 source: LogRecordsSource,
665 current_pos: usize,
666 remaining_bytes: usize,
667}
668
669impl LogRecordsBatches {
670 pub fn new(data: Vec<u8>) -> Self {
672 let source = LogRecordsSource::Memory(MemorySource::new(data));
673 let remaining_bytes = source.total_size();
674 Self {
675 source,
676 current_pos: 0,
677 remaining_bytes,
678 }
679 }
680
681 pub fn from_file(file: File, base_offset: usize, file_path: PathBuf) -> Result<Self> {
687 let source = FileSource::new(file, base_offset, file_path)?;
688 let remaining_bytes = source.total_size();
689 Ok(Self {
690 source: LogRecordsSource::File(source),
691 current_pos: 0,
692 remaining_bytes,
693 })
694 }
695
696 fn next_batch_size(&mut self) -> Result<Option<usize>> {
698 if self.remaining_bytes < LOG_OVERHEAD {
699 return Ok(None);
700 }
701
702 match self.source.read_batch_header(self.current_pos) {
704 Ok((_base_offset, batch_size)) => {
705 if batch_size > self.remaining_bytes {
706 Ok(None)
707 } else {
708 Ok(Some(batch_size))
709 }
710 }
711 Err(e) => Err(e),
712 }
713 }
714}
715
716impl Iterator for LogRecordsBatches {
717 type Item = Result<LogRecordBatch>;
718
719 fn next(&mut self) -> Option<Self::Item> {
720 match self.next_batch_size() {
721 Ok(Some(batch_size)) => {
722 match self.source.read_batch_data(self.current_pos, batch_size) {
724 Ok(data) => {
725 let record_batch = LogRecordBatch::new(data);
726 self.current_pos += batch_size;
727 self.remaining_bytes -= batch_size;
728 Some(Ok(record_batch))
729 }
730 Err(e) => Some(Err(e)),
731 }
732 }
733 Ok(None) => None,
734 Err(e) => Some(Err(e)),
735 }
736 }
737}
738
739pub struct LogRecordBatch {
740 data: Bytes,
741}
742
743#[allow(dead_code)]
744impl LogRecordBatch {
745 pub fn new(data: Bytes) -> Self {
746 LogRecordBatch { data }
747 }
748
749 pub fn magic(&self) -> u8 {
750 self.data[MAGIC_OFFSET]
751 }
752
753 pub fn commit_timestamp(&self) -> i64 {
754 let offset = COMMIT_TIMESTAMP_OFFSET;
755 LittleEndian::read_i64(&self.data[offset..offset + COMMIT_TIMESTAMP_LENGTH])
756 }
757
758 pub fn writer_id(&self) -> i64 {
759 let offset = WRITE_CLIENT_ID_OFFSET;
760 LittleEndian::read_i64(&self.data[offset..offset + WRITE_CLIENT_ID_LENGTH])
761 }
762
763 pub fn batch_sequence(&self) -> i32 {
764 let offset = BATCH_SEQUENCE_OFFSET;
765 LittleEndian::read_i32(&self.data[offset..offset + BATCH_SEQUENCE_LENGTH])
766 }
767
768 pub fn ensure_valid(&self) -> Result<()> {
769 Ok(())
771 }
772
773 pub fn is_valid(&self) -> bool {
774 self.size_in_bytes() >= RECORD_BATCH_HEADER_SIZE
775 && self.checksum() == self.compute_checksum()
776 }
777
778 fn compute_checksum(&self) -> u32 {
779 let start = SCHEMA_ID_OFFSET;
780 crc32c(&self.data[start..])
781 }
782
783 fn attributes(&self) -> u8 {
784 self.data[ATTRIBUTES_OFFSET]
785 }
786
787 pub fn next_log_offset(&self) -> i64 {
788 self.last_log_offset() + 1
789 }
790
791 pub fn checksum(&self) -> u32 {
792 let offset = CRC_OFFSET;
793 LittleEndian::read_u32(&self.data[offset..offset + CRC_LENGTH])
794 }
795
796 pub fn schema_id(&self) -> i16 {
797 let offset = SCHEMA_ID_OFFSET;
798 LittleEndian::read_i16(&self.data[offset..offset + SCHEMA_ID_LENGTH])
799 }
800
801 pub fn base_log_offset(&self) -> i64 {
802 let offset = BASE_OFFSET_OFFSET;
803 LittleEndian::read_i64(&self.data[offset..offset + BASE_OFFSET_LENGTH])
804 }
805
806 pub fn last_log_offset(&self) -> i64 {
807 self.base_log_offset() + self.last_offset_delta() as i64
808 }
809
810 fn last_offset_delta(&self) -> i32 {
811 let offset = LAST_OFFSET_DELTA_OFFSET;
812 LittleEndian::read_i32(&self.data[offset..offset + LAST_OFFSET_DELTA_LENGTH])
813 }
814
815 pub fn size_in_bytes(&self) -> usize {
816 let offset = LENGTH_OFFSET;
817 LittleEndian::read_i32(&self.data[offset..offset + LENGTH_LENGTH]) as usize + LOG_OVERHEAD
818 }
819
820 pub fn record_count(&self) -> i32 {
821 let offset = RECORDS_COUNT_OFFSET;
822 LittleEndian::read_i32(&self.data[offset..offset + RECORDS_COUNT_LENGTH])
823 }
824
825 pub fn records(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
826 if self.record_count() == 0 {
827 return Ok(LogRecordIterator::empty());
828 }
829
830 let data = &self.data[RECORDS_OFFSET..];
831
832 let record_batch = read_context.record_batch(data)?;
833 let arrow_reader = ArrowReader::new(Arc::new(record_batch));
834 let log_record_iterator = LogRecordIterator::Arrow(ArrowLogRecordIterator {
835 reader: arrow_reader,
836 base_offset: self.base_log_offset(),
837 timestamp: self.commit_timestamp(),
838 row_id: 0,
839 change_type: ChangeType::AppendOnly,
840 });
841
842 Ok(log_record_iterator)
843 }
844
845 pub fn records_for_remote_log(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
846 if self.record_count() == 0 {
847 return Ok(LogRecordIterator::empty());
848 }
849
850 let data = &self.data[RECORDS_OFFSET..];
851
852 let record_batch = read_context.record_batch_for_remote_log(data)?;
853 let log_record_iterator = match record_batch {
854 None => LogRecordIterator::empty(),
855 Some(record_batch) => {
856 let arrow_reader = ArrowReader::new(Arc::new(record_batch));
857 LogRecordIterator::Arrow(ArrowLogRecordIterator {
858 reader: arrow_reader,
859 base_offset: self.base_log_offset(),
860 timestamp: self.commit_timestamp(),
861 row_id: 0,
862 change_type: ChangeType::AppendOnly,
863 })
864 }
865 };
866 Ok(log_record_iterator)
867 }
868
869 pub fn record_batch(&self, read_context: &ReadContext) -> Result<RecordBatch> {
873 if self.record_count() == 0 {
874 return Ok(RecordBatch::new_empty(read_context.target_schema.clone()));
876 }
877
878 let data = self
879 .data
880 .get(RECORDS_OFFSET..)
881 .ok_or_else(|| Error::UnexpectedError {
882 message: format!(
883 "Corrupt log record batch: data length {} is less than RECORDS_OFFSET {}",
884 self.data.len(),
885 RECORDS_OFFSET
886 ),
887 source: None,
888 })?;
889 read_context.record_batch(data)
890 }
891}
892
893fn parse_ipc_message(
913 data: &[u8],
914) -> Result<(
915 arrow::ipc::RecordBatch<'_>,
916 Buffer,
917 arrow::ipc::MetadataVersion,
918)> {
919 const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
920
921 if data.len() < 8 {
922 Err(ParseError(format!("Invalid data length: {}", data.len())))?
923 }
924
925 let continuation = LittleEndian::read_u32(&data[0..4]);
926 let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize;
927
928 if continuation != CONTINUATION_MARKER {
929 Err(ParseError(format!(
930 "Invalid continuation marker: {continuation}"
931 )))?
932 }
933
934 if data.len() < 8 + metadata_size {
935 Err(ParseError(format!(
936 "Invalid data length. Remaining data length {} is shorter than specified size {}",
937 data.len() - 8,
938 metadata_size
939 )))?
940 }
941
942 let metadata_bytes = &data[8..8 + metadata_size];
943 let message = root_as_message(metadata_bytes).map_err(|err| ParseError(err.to_string()))?;
944 let batch_metadata = message
945 .header_as_record_batch()
946 .ok_or(ParseError(String::from("Not a record batch")))?;
947
948 let metadata_padded_size = (metadata_size + 7) & !7;
949 let body_start = 8 + metadata_padded_size;
950 let body_data = &data[body_start..];
951 let body_buffer = Buffer::from(body_data);
952
953 Ok((batch_metadata, body_buffer, message.version()))
954}
955
956pub fn to_arrow_schema(fluss_schema: &RowType) -> Result<SchemaRef> {
957 let fields: Result<Vec<Field>> = fluss_schema
958 .fields()
959 .iter()
960 .map(|f| {
961 Ok(Field::new(
962 f.name(),
963 to_arrow_type(f.data_type())?,
964 f.data_type().is_nullable(),
965 ))
966 })
967 .collect();
968
969 Ok(SchemaRef::new(arrow_schema::Schema::new(fields?)))
970}
971
/// Maps a Fluss data type to the Arrow data type used for its column.
///
/// # Errors
///
/// Returns `Error::IllegalArgument` when:
/// - a decimal's precision/scale does not fit Arrow's `u8`/`i8`;
/// - a time/timestamp precision is outside 0-9;
/// - a fixed binary length does not fit an `i32`.
///
/// Nested types are mapped recursively and propagate errors.
pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
    Ok(match fluss_type {
        DataType::Boolean(_) => ArrowDataType::Boolean,
        DataType::TinyInt(_) => ArrowDataType::Int8,
        DataType::SmallInt(_) => ArrowDataType::Int16,
        DataType::BigInt(_) => ArrowDataType::Int64,
        DataType::Int(_) => ArrowDataType::Int32,
        DataType::Float(_) => ArrowDataType::Float32,
        DataType::Double(_) => ArrowDataType::Float64,
        // Fixed-length chars are stored as ordinary UTF-8 strings.
        DataType::Char(_) => ArrowDataType::Utf8,
        DataType::String(_) => ArrowDataType::Utf8,
        DataType::Decimal(decimal_type) => {
            // Target integer types are inferred from `Decimal128(u8, i8)` below.
            let precision =
                decimal_type
                    .precision()
                    .try_into()
                    .map_err(|_| Error::IllegalArgument {
                        message: format!(
                            "Decimal precision {} exceeds Arrow's maximum (u8::MAX)",
                            decimal_type.precision()
                        ),
                    })?;
            let scale = decimal_type
                .scale()
                .try_into()
                .map_err(|_| Error::IllegalArgument {
                    message: format!(
                        "Decimal scale {} exceeds Arrow's maximum (i8::MAX)",
                        decimal_type.scale()
                    ),
                })?;
            ArrowDataType::Decimal128(precision, scale)
        }
        DataType::Date(_) => ArrowDataType::Date32,
        // Fractional-second precision selects the coarsest Arrow unit that can hold it.
        DataType::Time(time_type) => match time_type.precision() {
            0 => ArrowDataType::Time32(arrow_schema::TimeUnit::Second),
            1..=3 => ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond),
            4..=6 => ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond),
            7..=9 => ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond),
            invalid => {
                return Err(Error::IllegalArgument {
                    message: format!("Invalid precision {invalid} for TimeType (must be 0-9)"),
                });
            }
        },
        DataType::Timestamp(timestamp_type) => match timestamp_type.precision() {
            0 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None),
            1..=3 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
            4..=6 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
            7..=9 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
            invalid => {
                return Err(Error::IllegalArgument {
                    message: format!("Invalid precision {invalid} for TimestampType (must be 0-9)"),
                });
            }
        },
        // Mapped to the same timezone-less Arrow timestamp as `Timestamp`.
        // NOTE(review): confirm this matches the server's encoding of LTZ columns.
        DataType::TimestampLTz(timestamp_ltz_type) => match timestamp_ltz_type.precision() {
            0 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None),
            1..=3 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
            4..=6 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
            7..=9 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
            invalid => {
                return Err(Error::IllegalArgument {
                    message: format!(
                        "Invalid precision {invalid} for TimestampLTzType (must be 0-9)"
                    ),
                });
            }
        },
        DataType::Bytes(_) => ArrowDataType::Binary,
        DataType::Binary(binary_type) => {
            let length = binary_type
                .length()
                .try_into()
                .map_err(|_| Error::IllegalArgument {
                    message: format!(
                        "Binary length {} exceeds Arrow's maximum (i32::MAX)",
                        binary_type.length()
                    ),
                })?;
            ArrowDataType::FixedSizeBinary(length)
        }
        // NOTE(review): the element field's nullability comes from the array type
        // itself (`fluss_type.is_nullable()`), not from the element type; confirm
        // this matches what the column writers produce and what readers expect.
        DataType::Array(array_type) => ArrowDataType::List(
            Field::new_list_field(
                to_arrow_type(array_type.get_element_type())?,
                fluss_type.is_nullable(),
            )
            .into(),
        ),
        DataType::Map(map_type) => {
            let key_type = to_arrow_type(map_type.key_type())?;
            let value_type = to_arrow_type(map_type.value_type())?;
            let entry_fields = vec![
                Field::new("key", key_type, map_type.key_type().is_nullable()),
                Field::new("value", value_type, map_type.value_type().is_nullable()),
            ];
            // NOTE(review): the "entries" struct's nullability follows the map type;
            // Arrow's Map layout normally expects non-nullable entries - confirm.
            // The trailing `false` means keys are not declared sorted.
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "entries",
                    ArrowDataType::Struct(arrow_schema::Fields::from(entry_fields)),
                    fluss_type.is_nullable(),
                )),
                false,
            )
        }
        DataType::Row(row_type) => {
            let fields: Result<Vec<Field>> = row_type
                .fields()
                .iter()
                .map(|f| {
                    Ok(Field::new(
                        f.name(),
                        to_arrow_type(f.data_type())?,
                        f.data_type().is_nullable(),
                    ))
                })
                .collect();
            ArrowDataType::Struct(arrow_schema::Fields::from(fields?))
        }
    })
}
1093
/// Describes how to decode the Arrow payload of log record batches.
#[derive(Clone)]
pub struct ReadContext {
    // Schema of the batches handed back to callers (projected when a projection is set).
    target_schema: SchemaRef,
    // Unprojected schema passed to the constructor.
    full_schema: SchemaRef,
    // Column projection, if any.
    projection: Option<Projection>,
    // Remote data is decoded with `full_schema` and projected client-side.
    is_from_remote: bool,
}
1101
/// Projection details precomputed by `ReadContext::with_projection_pushdown`.
#[derive(Clone)]
struct Projection {
    // Schema of `ordered_fields`; used to decode local (non-remote) data.
    ordered_schema: SchemaRef,
    // Field indexes in the order requested by the caller.
    projected_fields: Vec<usize>,
    // `projected_fields` sorted ascending when reordering is needed, otherwise a copy of it.
    ordered_fields: Vec<usize>,

    // For each requested field, its position within `ordered_fields`.
    reordering_indexes: Vec<usize>,
    // Whether decoded columns must be permuted into the requested order.
    reordering_needed: bool,
}
1111
1112impl ReadContext {
1113 pub fn new(arrow_schema: SchemaRef, is_from_remote: bool) -> ReadContext {
1114 ReadContext {
1115 target_schema: arrow_schema.clone(),
1116 full_schema: arrow_schema,
1117 projection: None,
1118 is_from_remote,
1119 }
1120 }
1121
1122 pub fn with_projection_pushdown(
1123 arrow_schema: SchemaRef,
1124 projected_fields: Vec<usize>,
1125 is_from_remote: bool,
1126 ) -> Result<ReadContext> {
1127 Self::validate_projection(&arrow_schema, projected_fields.as_slice())?;
1128 let target_schema =
1129 Self::project_schema(arrow_schema.clone(), projected_fields.as_slice())?;
1130 let (need_do_reorder, sorted_fields) = {
1133 if !is_from_remote {
1136 let mut sorted_fields = projected_fields.clone();
1137 sorted_fields.sort_unstable();
1138 (!sorted_fields.eq(&projected_fields), sorted_fields)
1139 } else {
1140 (false, vec![])
1143 }
1144 };
1145
1146 let project = {
1147 if need_do_reorder {
1148 let mut reordering_indexes = Vec::with_capacity(projected_fields.len());
1151 for &original_idx in &projected_fields {
1152 let pos = sorted_fields.binary_search(&original_idx).map_err(|_| {
1153 IllegalArgument {
1154 message: format!(
1155 "Projection index {original_idx} is invalid for the current schema."
1156 ),
1157 }
1158 })?;
1159 reordering_indexes.push(pos);
1160 }
1161 Projection {
1162 ordered_schema: Self::project_schema(
1163 arrow_schema.clone(),
1164 sorted_fields.as_slice(),
1165 )?,
1166 projected_fields,
1167 ordered_fields: sorted_fields,
1168 reordering_indexes,
1169 reordering_needed: true,
1170 }
1171 } else {
1172 Projection {
1173 ordered_schema: Self::project_schema(
1174 arrow_schema.clone(),
1175 projected_fields.as_slice(),
1176 )?,
1177 ordered_fields: projected_fields.clone(),
1178 projected_fields,
1179 reordering_indexes: vec![],
1180 reordering_needed: false,
1181 }
1182 }
1183 };
1184
1185 Ok(ReadContext {
1186 target_schema,
1187 full_schema: arrow_schema,
1188 projection: Some(project),
1189 is_from_remote,
1190 })
1191 }
1192
1193 fn validate_projection(schema: &SchemaRef, projected_fields: &[usize]) -> Result<()> {
1194 let field_count = schema.fields().len();
1195 for &index in projected_fields {
1196 if index >= field_count {
1197 return Err(IllegalArgument {
1198 message: format!(
1199 "Projection index {index} is out of bounds for schema with {field_count} fields."
1200 ),
1201 });
1202 }
1203 }
1204 Ok(())
1205 }
1206
1207 pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> Result<SchemaRef> {
1208 Ok(SchemaRef::new(schema.project(projected_fields).map_err(
1209 |e| IllegalArgument {
1210 message: format!("Invalid projection: {e}"),
1211 },
1212 )?))
1213 }
1214
1215 pub fn project_fields(&self) -> Option<&[usize]> {
1216 self.projection
1217 .as_ref()
1218 .map(|p| p.projected_fields.as_slice())
1219 }
1220
1221 pub fn project_fields_in_order(&self) -> Option<&[usize]> {
1222 self.projection
1223 .as_ref()
1224 .map(|p| p.ordered_fields.as_slice())
1225 }
1226
1227 pub fn record_batch(&self, data: &[u8]) -> Result<RecordBatch> {
1228 let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?;
1229
1230 let resolve_schema = {
1231 if self.is_from_remote {
1233 self.full_schema.clone()
1234 } else {
1235 match self.projection {
1239 Some(ref projection) => {
1240 projection.ordered_schema.clone()
1242 }
1243 None => {
1244 self.target_schema.clone()
1246 }
1247 }
1248 }
1249 };
1250
1251 let record_batch = read_record_batch(
1252 &body_buffer,
1253 batch_metadata,
1254 resolve_schema,
1255 &HashMap::new(),
1256 None,
1257 &version,
1258 )?;
1259
1260 let record_batch = match &self.projection {
1261 Some(projection) => {
1262 let reordered_columns = {
1263 if self.is_from_remote {
1265 Some(&projection.projected_fields)
1266 } else if projection.reordering_needed {
1267 Some(&projection.reordering_indexes)
1268 } else {
1269 None
1270 }
1271 };
1272 match reordered_columns {
1273 Some(reordered_columns) => {
1274 let arrow_columns = reordered_columns
1275 .iter()
1276 .map(|&idx| record_batch.column(idx).clone())
1277 .collect();
1278 RecordBatch::try_new(self.target_schema.clone(), arrow_columns)?
1279 }
1280 _ => record_batch,
1281 }
1282 }
1283 _ => record_batch,
1284 };
1285 Ok(record_batch)
1286 }
1287
1288 pub fn record_batch_for_remote_log(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
1289 let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?;
1290
1291 let record_batch = read_record_batch(
1292 &body_buffer,
1293 batch_metadata,
1294 self.full_schema.clone(),
1295 &HashMap::new(),
1296 None,
1297 &version,
1298 )?;
1299
1300 let record_batch = match &self.projection {
1301 Some(projection) => {
1302 let projected_columns: Vec<_> = projection
1303 .projected_fields
1304 .iter()
1305 .map(|&idx| record_batch.column(idx).clone())
1306 .collect();
1307 RecordBatch::try_new(self.target_schema.clone(), projected_columns)?
1308 }
1309 None => record_batch,
1310 };
1311 Ok(Some(record_batch))
1312 }
1313}
1314
1315pub enum LogRecordIterator {
1316 Empty,
1317 Arrow(ArrowLogRecordIterator),
1318}
1319
1320impl LogRecordIterator {
1321 pub fn empty() -> Self {
1322 LogRecordIterator::Empty
1323 }
1324}
1325
1326impl Iterator for LogRecordIterator {
1327 type Item = ScanRecord;
1328
1329 fn next(&mut self) -> Option<Self::Item> {
1330 match self {
1331 LogRecordIterator::Empty => None,
1332 LogRecordIterator::Arrow(iter) => iter.next(),
1333 }
1334 }
1335}
1336
1337pub struct ArrowLogRecordIterator {
1338 reader: ArrowReader,
1339 base_offset: i64,
1340 timestamp: i64,
1341 row_id: usize,
1342 change_type: ChangeType,
1343}
1344
1345#[allow(dead_code)]
1346impl ArrowLogRecordIterator {
1347 fn new(reader: ArrowReader, base_offset: i64, timestamp: i64, change_type: ChangeType) -> Self {
1348 Self {
1349 reader,
1350 base_offset,
1351 timestamp,
1352 row_id: 0,
1353 change_type,
1354 }
1355 }
1356}
1357
1358impl Iterator for ArrowLogRecordIterator {
1359 type Item = ScanRecord;
1360
1361 fn next(&mut self) -> Option<Self::Item> {
1362 if self.row_id >= self.reader.row_count() {
1363 return None;
1364 }
1365
1366 let columnar_row = self.reader.read(self.row_id);
1367 let scan_record = ScanRecord::new(
1368 columnar_row,
1369 self.base_offset + self.row_id as i64,
1370 self.timestamp,
1371 self.change_type,
1372 );
1373 self.row_id += 1;
1374 Some(scan_record)
1375 }
1376}
1377
1378pub struct ArrowReader {
1379 record_batch: Arc<RecordBatch>,
1380}
1381
1382impl ArrowReader {
1383 pub fn new(record_batch: Arc<RecordBatch>) -> Self {
1384 ArrowReader { record_batch }
1385 }
1386
1387 pub fn row_count(&self) -> usize {
1388 self.record_batch.num_rows()
1389 }
1390
1391 pub fn read(&self, row_id: usize) -> ColumnarRow {
1392 ColumnarRow::new_with_row_id(self.record_batch.clone(), row_id)
1393 }
1394}
// NOTE(review): no use of `MyVec` is visible in this part of the file, and the
// name does not describe its contents. Confirm whether it is still needed.
/// Newtype wrapper around an Arrow IPC [`StreamReader`].
pub struct MyVec<T>(pub StreamReader<T>);
1396
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metadata::{DataField, DataTypes, RowType};
    use crate::test_utils::build_table_info;

    /// Checks the Fluss -> Arrow type mapping for scalar, temporal and nested types.
    #[test]
    fn test_to_arrow_type() {
        assert_eq!(
            to_arrow_type(&DataTypes::boolean()).unwrap(),
            ArrowDataType::Boolean
        );
        assert_eq!(
            to_arrow_type(&DataTypes::tinyint()).unwrap(),
            ArrowDataType::Int8
        );
        assert_eq!(
            to_arrow_type(&DataTypes::smallint()).unwrap(),
            ArrowDataType::Int16
        );
        assert_eq!(
            to_arrow_type(&DataTypes::bigint()).unwrap(),
            ArrowDataType::Int64
        );
        assert_eq!(
            to_arrow_type(&DataTypes::int()).unwrap(),
            ArrowDataType::Int32
        );
        assert_eq!(
            to_arrow_type(&DataTypes::float()).unwrap(),
            ArrowDataType::Float32
        );
        assert_eq!(
            to_arrow_type(&DataTypes::double()).unwrap(),
            ArrowDataType::Float64
        );
        assert_eq!(
            to_arrow_type(&DataTypes::char(16)).unwrap(),
            ArrowDataType::Utf8
        );
        assert_eq!(
            to_arrow_type(&DataTypes::string()).unwrap(),
            ArrowDataType::Utf8
        );
        assert_eq!(
            to_arrow_type(&DataTypes::decimal(10, 2)).unwrap(),
            ArrowDataType::Decimal128(10, 2)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::date()).unwrap(),
            ArrowDataType::Date32
        );
        // TIME precision selects the Arrow unit: 0 -> s, 3 -> ms (Time32); 6 -> us, 9 -> ns (Time64).
        assert_eq!(
            to_arrow_type(&DataTypes::time()).unwrap(),
            ArrowDataType::Time32(arrow_schema::TimeUnit::Second)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::time_with_precision(3)).unwrap(),
            ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::time_with_precision(6)).unwrap(),
            ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::time_with_precision(9)).unwrap(),
            ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::timestamp_with_precision(0)).unwrap(),
            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::timestamp_with_precision(3)).unwrap(),
            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::timestamp_with_precision(6)).unwrap(),
            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::timestamp_with_precision(9)).unwrap(),
            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
        );
        // TIMESTAMP_LTZ maps to the same timezone-less Arrow timestamps as TIMESTAMP.
        assert_eq!(
            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(0)).unwrap(),
            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(3)).unwrap(),
            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(6)).unwrap(),
            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(9)).unwrap(),
            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
        );
        assert_eq!(
            to_arrow_type(&DataTypes::bytes()).unwrap(),
            ArrowDataType::Binary
        );
        assert_eq!(
            to_arrow_type(&DataTypes::binary(16)).unwrap(),
            ArrowDataType::FixedSizeBinary(16)
        );

        assert_eq!(
            to_arrow_type(&DataTypes::array(DataTypes::int())).unwrap(),
            ArrowDataType::List(Field::new_list_field(ArrowDataType::Int32, true).into())
        );

        assert_eq!(
            to_arrow_type(&DataTypes::map(DataTypes::string(), DataTypes::int())).unwrap(),
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "entries",
                    ArrowDataType::Struct(arrow_schema::Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, true),
                        Field::new("value", ArrowDataType::Int32, true),
                    ])),
                    true,
                )),
                false,
            )
        );

        assert_eq!(
            to_arrow_type(&DataTypes::row(vec![
                DataTypes::field("f1", DataTypes::int()),
                DataTypes::field("f2", DataTypes::string()),
            ]))
            .unwrap(),
            ArrowDataType::Struct(arrow_schema::Fields::from(vec![
                Field::new("f1", ArrowDataType::Int32, true),
                Field::new("f2", ArrowDataType::Utf8, true),
            ]))
        );
    }

    /// Malformed IPC framing must surface as a descriptive Arrow parse error.
    #[test]
    fn test_parse_ipc_message() {
        // 0xFFFFFFFF marker followed by a zero metadata length.
        let empty_body: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000000]);
        let result = parse_ipc_message(empty_body);
        assert_eq!(
            result.unwrap_err().to_string(),
            String::from(
                "Fluss hitting Arrow error Parser error: Range [0, 4) is out of bounds.\n\n: ParseError(\"Range [0, 4) is out of bounds.\\n\\n\")."
            )
        );

        // No bytes at all.
        let invalid_data = &[];
        assert_eq!(
            parse_ipc_message(invalid_data).unwrap_err().to_string(),
            String::from(
                "Fluss hitting Arrow error Parser error: Invalid data length: 0: ParseError(\"Invalid data length: 0\")."
            )
        );

        // First word is not the 0xFFFFFFFF continuation marker.
        let data_with_invalid_continuation: &[u8] = &le_bytes(&[0x00000001, 0x00000000]);
        assert_eq!(
            parse_ipc_message(data_with_invalid_continuation)
                .unwrap_err()
                .to_string(),
            String::from(
                "Fluss hitting Arrow error Parser error: Invalid continuation marker: 1: ParseError(\"Invalid continuation marker: 1\")."
            )
        );

        // Declared metadata length (1) exceeds the bytes that follow (0).
        let data_with_invalid_length: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000001]);
        assert_eq!(
            parse_ipc_message(data_with_invalid_length)
                .unwrap_err()
                .to_string(),
            String::from(
                "Fluss hitting Arrow error Parser error: Invalid data length. Remaining data length 0 is shorter than specified size 1: ParseError(\"Invalid data length. Remaining data length 0 is shorter than specified size 1\")."
            )
        );

        // Correctly framed 4-byte metadata that does not describe a record batch.
        let non_record_batch_message = &le_bytes(&[0xFFFFFFFF, 0x00000004, 0x00000000]);
        assert_eq!(
            parse_ipc_message(non_record_batch_message)
                .unwrap_err()
                .to_string(),
            String::from(
                "Fluss hitting Arrow error Parser error: Not a record batch: ParseError(\"Not a record batch\")."
            )
        );
    }

    #[test]
    fn projection_rejects_out_of_bounds_index() {
        let row_type = RowType::new(vec![
            DataField::new("id", DataTypes::int(), None),
            DataField::new("name", DataTypes::string(), None),
        ]);
        let schema = to_arrow_schema(&row_type).unwrap();
        // Index 2 does not exist in a two-field schema.
        let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], false);

        assert!(matches!(result, Err(IllegalArgument { .. })));
    }

    #[test]
    fn checksum_and_schema_id_read_minimum_header() {
        // Smallest buffer that still contains the CRC and schema-id fields.
        let mut data = vec![0u8; SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH];
        let crc = 0xA1B2C3D4u32;
        let schema_id = 42i16;
        LittleEndian::write_u32(&mut data[CRC_OFFSET..CRC_OFFSET + CRC_LENGTH], crc);
        LittleEndian::write_i16(
            &mut data[SCHEMA_ID_OFFSET..SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH],
            schema_id,
        );

        let batch = LogRecordBatch::new(Bytes::from(data));
        assert_eq!(batch.checksum(), crc);
        assert_eq!(batch.schema_id(), schema_id);

        // The computed checksum covers everything from the schema id to the end.
        let expected = crc32c(&batch.data[SCHEMA_ID_OFFSET..]);
        assert_eq!(batch.compute_checksum(), expected);
    }

    /// Serializes `vals` as consecutive little-endian u32 words.
    fn le_bytes(vals: &[u32]) -> Vec<u8> {
        let mut out = Vec::with_capacity(vals.len() * 4);
        for &v in vals {
            out.extend_from_slice(&v.to_le_bytes());
        }
        out
    }

    #[test]
    fn test_temporal_and_decimal_builder_validation() {
        use crate::row::column_writer::ColumnWriter;
        use arrow::array::Array;

        // A matching decimal type builds an array of that type.
        let mut writer = ColumnWriter::create(
            &DataTypes::decimal(10, 2),
            &ArrowDataType::Decimal128(10, 2),
            0,
            256,
        )
        .unwrap();
        let array = writer.finish();
        assert_eq!(array.data_type(), &ArrowDataType::Decimal128(10, 2));

        // An invalid/mismatched Arrow decimal type (precision 100, scale 50) is rejected.
        let result = ColumnWriter::create(
            &DataTypes::decimal(10, 2),
            &ArrowDataType::Decimal128(100, 50),
            0,
            256,
        );
        assert!(result.is_err());
    }

    #[test]
    fn test_decimal_rescaling_and_validation() -> Result<()> {
        use crate::row::{Datum, Decimal, GenericRow};
        use arrow::array::Decimal128Array;
        use bigdecimal::BigDecimal;
        use std::str::FromStr;

        // A scale-3 value written into a DECIMAL(10, 2) column is rescaled: 123.456 -> 123.46.
        let row_type = RowType::new(vec![DataField::new(
            "amount",
            DataTypes::decimal(10, 2),
            None,
        )]);
        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
        let decimal = Decimal::from_big_decimal(BigDecimal::from_str("123.456").unwrap(), 10, 3)?;
        let row = GenericRow {
            values: vec![Datum::Decimal(decimal)],
        };
        builder.append(&row)?;
        let batch = builder.build_arrow_record_batch()?;
        let array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Decimal128Array>()
            .unwrap();
        assert_eq!(array.value(0), 12346);
        assert_eq!(array.scale(), 2);

        // A value with more integer digits than DECIMAL(5, 2) allows must be rejected.
        let row_type = RowType::new(vec![DataField::new(
            "amount",
            DataTypes::decimal(5, 2),
            None,
        )]);
        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
        let decimal = Decimal::from_big_decimal(BigDecimal::from_str("123456.78").unwrap(), 10, 2)?;
        let row = GenericRow {
            values: vec![Datum::Decimal(decimal)],
        };
        let result = builder.append(&row);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("precision overflow")
        );

        Ok(())
    }

    #[test]
    fn test_file_source_streaming() -> Result<()> {
        use tempfile::NamedTempFile;

        let test_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut tmp_file = NamedTempFile::new()?;
        tmp_file.write_all(&test_data)?;
        tmp_file.flush()?;

        let file_path = tmp_file.path().to_path_buf();
        let file = File::open(&file_path)?;
        let mut source = FileSource::new(file, 0, file_path)?;

        // Full read.
        let data = source.read_batch_data(0, 10)?;
        assert_eq!(data.to_vec(), test_data);

        // Partial read starting at position 2.
        let partial = source.read_batch_data(2, 5)?;
        assert_eq!(partial.to_vec(), vec![3, 4, 5, 6, 7]);

        // A source created with base offset 100 skips the 100-byte prefix, so only
        // the trailing 5 bytes are visible and positions are relative to them.
        let prefix = vec![0xFF; 100];
        let actual_data = vec![1, 2, 3, 4, 5];
        let mut tmp_file2 = NamedTempFile::new()?;
        tmp_file2.write_all(&prefix)?;
        tmp_file2.write_all(&actual_data)?;
        tmp_file2.flush()?;

        let file_path2 = tmp_file2.path().to_path_buf();
        let file2 = File::open(&file_path2)?;
        let mut source2 = FileSource::new(file2, 100, file_path2)?;
        assert_eq!(source2.total_size(), 5);
        let data2 = source2.read_batch_data(0, 5)?;
        assert_eq!(data2.to_vec(), actual_data);

        Ok(())
    }

    #[test]
    fn test_all_types_end_to_end() -> Result<()> {
        use crate::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, TimestampNtz};
        use arrow::array::{
            Date32Array, Decimal128Array, Int32Array, Time32MillisecondArray,
            Time64NanosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
        };
        use bigdecimal::BigDecimal;
        use std::str::FromStr;

        let row_type = RowType::new(vec![
            DataField::new("id".to_string(), DataTypes::int(), None),
            DataField::new("amount".to_string(), DataTypes::decimal(10, 2), None),
            DataField::new("date".to_string(), DataTypes::date(), None),
            DataField::new(
                "time_ms".to_string(),
                DataTypes::time_with_precision(3),
                None,
            ),
            DataField::new(
                "time_ns".to_string(),
                DataTypes::time_with_precision(9),
                None,
            ),
            DataField::new(
                "ts_us".to_string(),
                DataTypes::timestamp_with_precision(6),
                None,
            ),
            DataField::new(
                "ts_ltz_ns".to_string(),
                DataTypes::timestamp_ltz_with_precision(9),
                None,
            ),
        ]);

        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;

        let row = GenericRow {
            values: vec![
                Datum::Int32(1),
                Datum::Decimal(Decimal::from_big_decimal(
                    BigDecimal::from_str("123.456").unwrap(),
                    10,
                    3,
                )?),
                Datum::Date(Date::new(18000)),
                Datum::Time(Time::new(43200000)),
                Datum::Time(Time::new(12345)),
                Datum::TimestampNtz(TimestampNtz::from_millis_nanos(1609459200000, 123456)?),
                Datum::TimestampLtz(TimestampLtz::from_millis_nanos(1609459200000, 987654)?),
            ],
        };
        builder.append(&row)?;

        let batch = builder.build_arrow_record_batch()?;

        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<Int32Array>()
                .unwrap()
                .value(0),
            1
        );

        // 123.456 rescaled to the column's scale of 2 -> 123.46.
        let dec = batch
            .column(1)
            .as_any()
            .downcast_ref::<Decimal128Array>()
            .unwrap();
        assert_eq!(dec.value(0), 12346);

        assert_eq!(
            batch
                .column(2)
                .as_any()
                .downcast_ref::<Date32Array>()
                .unwrap()
                .value(0),
            18000
        );

        assert_eq!(
            batch
                .column(3)
                .as_any()
                .downcast_ref::<Time32MillisecondArray>()
                .unwrap()
                .value(0),
            43200000
        );

        // 12345 from `Time::new` becomes 12345 * 1_000_000 in the nanosecond column.
        assert_eq!(
            batch
                .column(4)
                .as_any()
                .downcast_ref::<Time64NanosecondArray>()
                .unwrap()
                .value(0),
            12345000000
        );

        // Microsecond precision keeps 123 of the 123456 extra nanoseconds.
        assert_eq!(
            batch
                .column(5)
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap()
                .value(0),
            1609459200000123
        );

        // Nanosecond precision keeps all 987654 extra nanoseconds.
        assert_eq!(
            batch
                .column(6)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap()
                .value(0),
            1609459200000987654
        );

        Ok(())
    }

    /// Builds a two-row in-memory batch, writes it to a temp file, and reads it
    /// back through `LogRecordsBatches::from_file`.
    #[test]
    fn test_log_records_batches_from_file() -> Result<()> {
        use crate::client::WriteRecord;
        use crate::compression::{
            ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
        };
        use crate::metadata::{PhysicalTablePath, TablePath};
        use crate::row::GenericRow;
        use tempfile::NamedTempFile;

        let row_type = RowType::new(vec![
            DataField::new("id".to_string(), DataTypes::int(), None),
            DataField::new("name".to_string(), DataTypes::string(), None),
        ]);
        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
        let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
        let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path)));

        let mut builder = MemoryLogRecordsArrowBuilder::new(
            1,
            &row_type,
            false,
            ArrowCompressionInfo {
                compression_type: ArrowCompressionType::None,
                compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
            },
        )?;

        let mut row = GenericRow::new(2);
        row.set_field(0, 1_i32);
        row.set_field(1, "alice");
        let record = WriteRecord::for_append(
            Arc::clone(&table_info),
            physical_table_path.clone(),
            1,
            &row,
        );
        builder.append(&record)?;

        let mut row2 = GenericRow::new(2);
        row2.set_field(0, 2_i32);
        row2.set_field(1, "bob");
        let record2 =
            WriteRecord::for_append(Arc::clone(&table_info), physical_table_path, 2, &row2);
        builder.append(&record2)?;

        let data = builder.build()?;

        let mut tmp_file = NamedTempFile::new()?;
        tmp_file.write_all(&data)?;
        tmp_file.flush()?;

        let file_path = tmp_file.path().to_path_buf();
        let file = File::open(&file_path)?;
        let mut batches = LogRecordsBatches::from_file(file, 0, file_path)?;

        let batch = batches.next().expect("Should have at least one batch")?;
        assert!(batch.size_in_bytes() > 0);
        assert_eq!(batch.record_count(), 2);

        Ok(())
    }
}