Skip to main content

fluss/record/
arrow.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::client::{LogWriteRecord, Record, WriteRecord};
19use crate::compression::ArrowCompressionInfo;
20use crate::error::{Error, Result};
21use crate::metadata::{DataType, RowType};
22use crate::record::{ChangeType, ScanRecord};
23use crate::row::column_writer::ColumnWriter;
24use crate::row::{ColumnarRow, InternalRow};
25use arrow::array::{ArrayBuilder, ArrayRef};
26use arrow::{
27    array::RecordBatch,
28    buffer::Buffer,
29    ipc::{
30        reader::{StreamReader, read_record_batch},
31        root_as_message,
32        writer::StreamWriter,
33    },
34};
35use arrow_schema::ArrowError::ParseError;
36use arrow_schema::SchemaRef;
37use arrow_schema::{DataType as ArrowDataType, Field};
38use byteorder::WriteBytesExt;
39use byteorder::{ByteOrder, LittleEndian};
40use bytes::Bytes;
41use crc32c::crc32c;
42use std::{
43    collections::HashMap,
44    fs::File,
45    io::{Cursor, Read, Seek, SeekFrom, Write},
46    path::PathBuf,
47    sync::Arc,
48};
49
50use crate::error::Error::IllegalArgument;
51use arrow::ipc::writer::IpcWriteOptions;
52/// const for record batch
53pub const BASE_OFFSET_LENGTH: usize = 8;
54pub const LENGTH_LENGTH: usize = 4;
55pub const MAGIC_LENGTH: usize = 1;
56pub const COMMIT_TIMESTAMP_LENGTH: usize = 8;
57pub const CRC_LENGTH: usize = 4;
58pub const SCHEMA_ID_LENGTH: usize = 2;
59pub const ATTRIBUTE_LENGTH: usize = 1;
60pub const LAST_OFFSET_DELTA_LENGTH: usize = 4;
61pub const WRITE_CLIENT_ID_LENGTH: usize = 8;
62pub const BATCH_SEQUENCE_LENGTH: usize = 4;
63pub const RECORDS_COUNT_LENGTH: usize = 4;
64
65pub const BASE_OFFSET_OFFSET: usize = 0;
66pub const LENGTH_OFFSET: usize = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH;
67pub const MAGIC_OFFSET: usize = LENGTH_OFFSET + LENGTH_LENGTH;
68pub const COMMIT_TIMESTAMP_OFFSET: usize = MAGIC_OFFSET + MAGIC_LENGTH;
69pub const CRC_OFFSET: usize = COMMIT_TIMESTAMP_OFFSET + COMMIT_TIMESTAMP_LENGTH;
70pub const SCHEMA_ID_OFFSET: usize = CRC_OFFSET + CRC_LENGTH;
71pub const ATTRIBUTES_OFFSET: usize = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
72pub const LAST_OFFSET_DELTA_OFFSET: usize = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
73pub const WRITE_CLIENT_ID_OFFSET: usize = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
74pub const BATCH_SEQUENCE_OFFSET: usize = WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
75pub const RECORDS_COUNT_OFFSET: usize = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
76pub const RECORDS_OFFSET: usize = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
77
78pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET;
79pub const ARROW_CHANGETYPE_OFFSET: usize = RECORD_BATCH_HEADER_SIZE;
80pub const LOG_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH;
81
82/// Maximum batch size matches Java's Integer.MAX_VALUE limit.
83/// Java uses int type for batch size, so max value is 2^31 - 1 = 2,147,483,647 bytes (~2GB).
84/// This is the implicit limit in FileLogRecords.java and other Java components.
85pub const MAX_BATCH_SIZE: usize = i32::MAX as usize; // 2,147,483,647 bytes (~2GB)
86
87/// const for record
88/// The "magic" values.
89#[derive(Debug, Clone, Copy)]
90pub enum LogMagicValue {
91    V0 = 0,
92}
93
94/// Safely convert batch size from i32 to usize with validation.
95///
96/// Validates that:
97/// - batch_size_bytes is non-negative
98/// - batch_size_bytes + LOG_OVERHEAD doesn't overflow
99/// - Result is within reasonable bounds
100fn validate_batch_size(batch_size_bytes: i32) -> Result<usize> {
101    // Check for negative size (corrupted data)
102    if batch_size_bytes < 0 {
103        return Err(Error::UnexpectedError {
104            message: format!("Invalid negative batch size: {batch_size_bytes}"),
105            source: None,
106        });
107    }
108
109    let batch_size_u = batch_size_bytes as usize;
110
111    // Check for overflow when adding LOG_OVERHEAD
112    let total_size =
113        batch_size_u
114            .checked_add(LOG_OVERHEAD)
115            .ok_or_else(|| Error::UnexpectedError {
116                message: format!(
117                    "Batch size {batch_size_u} + LOG_OVERHEAD {LOG_OVERHEAD} would overflow"
118                ),
119                source: None,
120            })?;
121
122    // Sanity check: reject unreasonably large batches
123    if total_size > MAX_BATCH_SIZE {
124        return Err(Error::UnexpectedError {
125            message: format!(
126                "Batch size {total_size} exceeds maximum allowed size {MAX_BATCH_SIZE}"
127            ),
128            source: None,
129        });
130    }
131
132    Ok(total_size)
133}
134
135// NOTE: Rust layout/offsets currently match Java only for V0.
136// TODO: Add V1 layout/offsets to keep parity with Java's V1 format.
137pub const CURRENT_LOG_MAGIC_VALUE: u8 = LogMagicValue::V0 as u8;
138
139/// Value used if writer ID is not available or non-idempotent.
140pub const NO_WRITER_ID: i64 = -1;
141
142/// Value used if batch sequence is not available.
143pub const NO_BATCH_SEQUENCE: i32 = -1;
144
145pub const BUILDER_DEFAULT_OFFSET: i64 = 0;
146
147// TODO: Switch to byte-size-based is_full() like Java's ArrowWriter instead of a hard record cap.
148pub const DEFAULT_MAX_RECORD: i32 = 256;
149
150pub struct MemoryLogRecordsArrowBuilder {
151    base_log_offset: i64,
152    schema_id: i32,
153    magic: u8,
154    writer_id: i64,
155    batch_sequence: i32,
156    arrow_record_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder>,
157    is_closed: bool,
158    arrow_compression_info: ArrowCompressionInfo,
159}
160
161pub trait ArrowRecordBatchInnerBuilder: Send + Sync {
162    fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>>;
163
164    fn append(&mut self, row: &dyn InternalRow) -> Result<bool>;
165
166    fn append_batch(&mut self, record_batch: Arc<RecordBatch>) -> Result<bool>;
167
168    fn schema(&self) -> SchemaRef;
169
170    fn records_count(&self) -> i32;
171
172    fn is_full(&self) -> bool;
173
174    /// Get an estimate of the size in bytes of the arrow data.
175    fn estimated_size_in_bytes(&self) -> usize;
176}
177
178#[derive(Default)]
179pub struct PrebuiltRecordBatchBuilder {
180    arrow_record_batch: Option<Arc<RecordBatch>>,
181    records_count: i32,
182}
183
184impl ArrowRecordBatchInnerBuilder for PrebuiltRecordBatchBuilder {
185    fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
186        Ok(self.arrow_record_batch.as_ref().unwrap().clone())
187    }
188
189    fn append(&mut self, _row: &dyn InternalRow) -> Result<bool> {
190        // append one single row is not supported, return false directly
191        Ok(false)
192    }
193
194    fn append_batch(&mut self, record_batch: Arc<RecordBatch>) -> Result<bool> {
195        if self.arrow_record_batch.is_some() {
196            return Ok(false);
197        }
198        self.records_count = record_batch.num_rows() as i32;
199        self.arrow_record_batch = Some(record_batch);
200        Ok(true)
201    }
202
203    fn schema(&self) -> SchemaRef {
204        self.arrow_record_batch.as_ref().unwrap().schema()
205    }
206
207    fn records_count(&self) -> i32 {
208        self.records_count
209    }
210
211    fn is_full(&self) -> bool {
212        // full if has one record batch
213        self.arrow_record_batch.is_some()
214    }
215
216    fn estimated_size_in_bytes(&self) -> usize {
217        self.arrow_record_batch
218            .as_ref()
219            .map(|batch| batch.get_array_memory_size())
220            .unwrap_or(0)
221    }
222}
223
224pub struct RowAppendRecordBatchBuilder {
225    table_schema: SchemaRef,
226    column_writers: Vec<ColumnWriter>,
227    records_count: i32,
228}
229
230impl RowAppendRecordBatchBuilder {
231    pub fn new(row_type: &RowType) -> Result<Self> {
232        let capacity = DEFAULT_MAX_RECORD as usize;
233        let schema_ref = to_arrow_schema(row_type)?;
234        let writers: Result<Vec<_>> = row_type
235            .fields()
236            .iter()
237            .enumerate()
238            .map(|(pos, field)| {
239                let arrow_type = schema_ref.field(pos).data_type();
240                ColumnWriter::create(field.data_type(), arrow_type, pos, capacity)
241            })
242            .collect();
243        Ok(Self {
244            table_schema: schema_ref.clone(),
245            column_writers: writers?,
246            records_count: 0,
247        })
248    }
249    /// Appends a row to the builder.
250    pub fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
251        ArrowRecordBatchInnerBuilder::append(self, row)
252    }
253
254    /// Builds the final Arrow RecordBatch.
255    pub fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
256        ArrowRecordBatchInnerBuilder::build_arrow_record_batch(self)
257    }
258}
259
260impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
261    fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
262        let arrays: Result<Vec<ArrayRef>> = self
263            .column_writers
264            .iter_mut()
265            .enumerate()
266            .map(|(idx, writer)| {
267                let array = writer.finish();
268                let expected_type = self.table_schema.field(idx).data_type();
269
270                // Validate array type matches schema
271                if array.data_type() != expected_type {
272                    return Err(Error::IllegalArgument {
273                        message: format!(
274                            "Builder type mismatch at column {}: expected {:?}, got {:?}",
275                            idx,
276                            expected_type,
277                            array.data_type()
278                        ),
279                    });
280                }
281
282                Ok(array)
283            })
284            .collect();
285
286        Ok(Arc::new(RecordBatch::try_new(
287            self.table_schema.clone(),
288            arrays?,
289        )?))
290    }
291
292    fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
293        for writer in &mut self.column_writers {
294            writer.write_field(row)?;
295        }
296        self.records_count += 1;
297        Ok(true)
298    }
299
300    fn append_batch(&mut self, _record_batch: Arc<RecordBatch>) -> Result<bool> {
301        Ok(false)
302    }
303
304    fn schema(&self) -> SchemaRef {
305        self.table_schema.clone()
306    }
307
308    fn records_count(&self) -> i32 {
309        self.records_count
310    }
311
312    fn is_full(&self) -> bool {
313        self.records_count() >= DEFAULT_MAX_RECORD
314    }
315
316    fn estimated_size_in_bytes(&self) -> usize {
317        // Returns the uncompressed Arrow array memory size (same as Java's arrowWriter.estimatedSizeInBytes()).
318        // Note: This is the size before compression. After build(), the actual size may be smaller
319        // if compression is enabled.
320        self.column_writers
321            .iter()
322            .map(|writer| writer.finish_cloned().get_array_memory_size())
323            .sum()
324    }
325}
326
327impl MemoryLogRecordsArrowBuilder {
328    pub fn new(
329        schema_id: i32,
330        row_type: &RowType,
331        to_append_record_batch: bool,
332        arrow_compression_info: ArrowCompressionInfo,
333    ) -> Result<Self> {
334        let arrow_batch_builder: Box<dyn ArrowRecordBatchInnerBuilder> = {
335            if to_append_record_batch {
336                Box::new(PrebuiltRecordBatchBuilder::default())
337            } else {
338                Box::new(RowAppendRecordBatchBuilder::new(row_type)?)
339            }
340        };
341        Ok(MemoryLogRecordsArrowBuilder {
342            base_log_offset: BUILDER_DEFAULT_OFFSET,
343            schema_id,
344            magic: CURRENT_LOG_MAGIC_VALUE,
345            writer_id: NO_WRITER_ID,
346            batch_sequence: NO_BATCH_SEQUENCE,
347            is_closed: false,
348            arrow_record_batch_builder: arrow_batch_builder,
349            arrow_compression_info,
350        })
351    }
352
353    pub fn append(&mut self, record: &WriteRecord) -> Result<bool> {
354        match &record.record() {
355            Record::Log(log_write_record) => match log_write_record {
356                LogWriteRecord::InternalRow(row) => {
357                    Ok(self.arrow_record_batch_builder.append(*row)?)
358                }
359                LogWriteRecord::RecordBatch(record_batch) => Ok(self
360                    .arrow_record_batch_builder
361                    .append_batch(record_batch.clone())?),
362            },
363            Record::Kv(_) => Err(Error::UnsupportedOperation {
364                message: "Only LogRecord is supported to append".to_string(),
365            }),
366        }
367        // todo: consider write other change type
368    }
369
370    pub fn is_full(&self) -> bool {
371        self.arrow_record_batch_builder.records_count() >= DEFAULT_MAX_RECORD
372    }
373
374    pub fn is_closed(&self) -> bool {
375        self.is_closed
376    }
377
378    pub fn close(&mut self) {
379        self.is_closed = true;
380    }
381
382    pub fn build(&mut self) -> Result<Vec<u8>> {
383        // serialize arrow batch
384        let mut arrow_batch_bytes = vec![];
385        let table_schema = self.arrow_record_batch_builder.schema();
386        let compression_type = self.arrow_compression_info.get_compression_type();
387        let write_option =
388            IpcWriteOptions::try_with_compression(IpcWriteOptions::default(), compression_type);
389        let mut writer = StreamWriter::try_new_with_options(
390            &mut arrow_batch_bytes,
391            &table_schema,
392            write_option?,
393        )?;
394
395        // get header len
396        let header = writer.get_ref().len();
397        let record_batch = self.arrow_record_batch_builder.build_arrow_record_batch()?;
398        writer.write(record_batch.as_ref())?;
399        // get real arrow batch bytes
400        let real_arrow_batch_bytes = &arrow_batch_bytes[header..];
401
402        // now, write batch header and arrow batch
403        let mut batch_bytes = vec![0u8; RECORD_BATCH_HEADER_SIZE + real_arrow_batch_bytes.len()];
404        // write batch header
405        self.write_batch_header(&mut batch_bytes[..])?;
406
407        // write arrow batch bytes
408        let mut cursor = Cursor::new(&mut batch_bytes[..]);
409        cursor.set_position(RECORD_BATCH_HEADER_SIZE as u64);
410        cursor.write_all(real_arrow_batch_bytes)?;
411
412        let calcute_crc_bytes = &cursor.get_ref()[SCHEMA_ID_OFFSET..];
413        // then update crc
414        let crc = crc32c(calcute_crc_bytes);
415        cursor.set_position(CRC_OFFSET as u64);
416        cursor.write_u32::<LittleEndian>(crc)?;
417
418        Ok(batch_bytes.to_vec())
419    }
420
421    fn write_batch_header(&self, buffer: &mut [u8]) -> Result<()> {
422        let total_len = buffer.len();
423        let mut cursor = Cursor::new(buffer);
424        cursor.write_i64::<LittleEndian>(self.base_log_offset)?;
425        cursor
426            .write_i32::<LittleEndian>((total_len - BASE_OFFSET_LENGTH - LENGTH_LENGTH) as i32)?;
427        cursor.write_u8(self.magic)?;
428        cursor.write_i64::<LittleEndian>(0)?; // timestamp placeholder
429        cursor.write_u32::<LittleEndian>(0)?; // crc placeholder
430        cursor.write_i16::<LittleEndian>(self.schema_id as i16)?;
431
432        let record_count = self.arrow_record_batch_builder.records_count();
433        // todo: curerntly, always is append only
434        let append_only = true;
435        cursor.write_u8(if append_only { 1 } else { 0 })?;
436        cursor.write_i32::<LittleEndian>(if record_count > 0 {
437            record_count - 1
438        } else {
439            0
440        })?;
441
442        cursor.write_i64::<LittleEndian>(self.writer_id)?;
443        cursor.write_i32::<LittleEndian>(self.batch_sequence)?;
444        cursor.write_i32::<LittleEndian>(record_count)?;
445        Ok(())
446    }
447
448    pub fn set_writer_state(&mut self, writer_id: i64, batch_base_sequence: i32) {
449        self.writer_id = writer_id;
450        self.batch_sequence = batch_base_sequence;
451    }
452
453    /// Get an estimate of the number of bytes written to the underlying buffer.
454    /// This includes the batch header size plus the estimated arrow data size.
455    pub fn estimated_size_in_bytes(&self) -> usize {
456        RECORD_BATCH_HEADER_SIZE + self.arrow_record_batch_builder.estimated_size_in_bytes()
457    }
458}
459
460pub trait ToArrow {
461    fn append_to(&self, builder: &mut dyn ArrayBuilder) -> Result<()>;
462}
463
464/// In-memory log record source.
465/// Used for local tablet server fetches (existing path).
466struct MemorySource {
467    data: Bytes,
468}
469
470impl MemorySource {
471    fn new(data: Vec<u8>) -> Self {
472        Self {
473            data: Bytes::from(data),
474        }
475    }
476
477    fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
478        if pos + LOG_OVERHEAD > self.data.len() {
479            return Err(Error::UnexpectedError {
480                message: format!(
481                    "Position {} + LOG_OVERHEAD {} exceeds data size {}",
482                    pos,
483                    LOG_OVERHEAD,
484                    self.data.len()
485                ),
486                source: None,
487            });
488        }
489
490        let base_offset = LittleEndian::read_i64(&self.data[pos + BASE_OFFSET_OFFSET..]);
491        let batch_size_bytes = LittleEndian::read_i32(&self.data[pos + LENGTH_OFFSET..]);
492
493        // Validate batch size to prevent integer overflow and corruption
494        let batch_size = validate_batch_size(batch_size_bytes)?;
495
496        Ok((base_offset, batch_size))
497    }
498
499    fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
500        if pos + size > self.data.len() {
501            return Err(Error::UnexpectedError {
502                message: format!(
503                    "Read beyond data size: {} + {} > {}",
504                    pos,
505                    size,
506                    self.data.len()
507                ),
508                source: None,
509            });
510        }
511        // Zero-copy slice (Bytes is Arc-based)
512        Ok(self.data.slice(pos..pos + size))
513    }
514
515    fn total_size(&self) -> usize {
516        self.data.len()
517    }
518}
519
520/// RAII guard that deletes a file when dropped.
521/// Used to ensure file deletion happens AFTER the file handle is closed.
522struct FileCleanupGuard {
523    file_path: PathBuf,
524}
525
526impl Drop for FileCleanupGuard {
527    fn drop(&mut self) {
528        // File handle is already closed (this guard drops after the file field)
529        if let Err(e) = std::fs::remove_file(&self.file_path) {
530            log::warn!(
531                "Failed to delete remote log file {}: {}",
532                self.file_path.display(),
533                e
534            );
535        } else {
536            log::debug!("Deleted remote log file: {}", self.file_path.display());
537        }
538    }
539}
540
541/// File-backed log record source.
542/// Used for remote log segments downloaded to local disk.
543/// Streams data on-demand instead of loading entire file into memory.
544///
545/// Uses seek + read_exact for cross-platform compatibility.
546/// Access pattern is sequential iteration (single consumer).
547struct FileSource {
548    file: File,
549    file_size: usize,
550    base_offset: usize,
551    _cleanup: Option<FileCleanupGuard>, // Drops AFTER file (field order matters!)
552}
553
554impl FileSource {
555    /// Create a new FileSource.
556    ///
557    /// The file at `file_path` will be deleted when this FileSource is dropped.
558    fn new(file: File, base_offset: usize, file_path: PathBuf) -> Result<Self> {
559        let file_size = file.metadata()?.len() as usize;
560
561        // Validate base_offset to prevent underflow in total_size()
562        if base_offset > file_size {
563            return Err(Error::UnexpectedError {
564                message: format!("base_offset ({base_offset}) exceeds file_size ({file_size})"),
565                source: None,
566            });
567        }
568
569        Ok(Self {
570            file,
571            file_size,
572            base_offset,
573            _cleanup: Some(FileCleanupGuard { file_path }),
574        })
575    }
576
577    /// Read data at a specific position using seek + read_exact.
578    /// This is cross-platform and adequate for sequential access patterns.
579    fn read_at(&mut self, pos: u64, buf: &mut [u8]) -> Result<()> {
580        self.file.seek(SeekFrom::Start(pos))?;
581        self.file.read_exact(buf)?;
582        Ok(())
583    }
584
585    fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
586        let actual_pos = self.base_offset + pos;
587        if actual_pos + LOG_OVERHEAD > self.file_size {
588            return Err(Error::UnexpectedError {
589                message: format!(
590                    "Position {} exceeds file size {}",
591                    actual_pos, self.file_size
592                ),
593                source: None,
594            });
595        }
596
597        // Read only the header to extract base_offset and batch_size
598        let mut header_buf = vec![0u8; LOG_OVERHEAD];
599        self.read_at(actual_pos as u64, &mut header_buf)?;
600
601        let base_offset = LittleEndian::read_i64(&header_buf[BASE_OFFSET_OFFSET..]);
602        let batch_size_bytes = LittleEndian::read_i32(&header_buf[LENGTH_OFFSET..]);
603
604        // Validate batch size to prevent integer overflow and corruption
605        let batch_size = validate_batch_size(batch_size_bytes)?;
606
607        Ok((base_offset, batch_size))
608    }
609
610    fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
611        let actual_pos = self.base_offset + pos;
612        if actual_pos + size > self.file_size {
613            return Err(Error::UnexpectedError {
614                message: format!(
615                    "Read beyond file size: {} + {} > {}",
616                    actual_pos, size, self.file_size
617                ),
618                source: None,
619            });
620        }
621
622        // Read the full batch data
623        let mut batch_buf = vec![0u8; size];
624        self.read_at(actual_pos as u64, &mut batch_buf)?;
625
626        Ok(Bytes::from(batch_buf))
627    }
628
629    fn total_size(&self) -> usize {
630        self.file_size - self.base_offset
631    }
632}
633
634/// Enum for different log record sources.
635enum LogRecordsSource {
636    Memory(MemorySource),
637    File(FileSource),
638}
639
640impl LogRecordsSource {
641    fn read_batch_header(&mut self, pos: usize) -> Result<(i64, usize)> {
642        match self {
643            Self::Memory(s) => s.read_batch_header(pos),
644            Self::File(s) => s.read_batch_header(pos),
645        }
646    }
647
648    fn read_batch_data(&mut self, pos: usize, size: usize) -> Result<Bytes> {
649        match self {
650            Self::Memory(s) => s.read_batch_data(pos, size),
651            Self::File(s) => s.read_batch_data(pos, size),
652        }
653    }
654
655    fn total_size(&self) -> usize {
656        match self {
657            Self::Memory(s) => s.total_size(),
658            Self::File(s) => s.total_size(),
659        }
660    }
661}
662
663pub struct LogRecordsBatches {
664    source: LogRecordsSource,
665    current_pos: usize,
666    remaining_bytes: usize,
667}
668
669impl LogRecordsBatches {
670    /// Create from in-memory Vec (existing path - backward compatible).
671    pub fn new(data: Vec<u8>) -> Self {
672        let source = LogRecordsSource::Memory(MemorySource::new(data));
673        let remaining_bytes = source.total_size();
674        Self {
675            source,
676            current_pos: 0,
677            remaining_bytes,
678        }
679    }
680
681    /// Create from file.
682    /// Enables streaming without loading entire file into memory.
683    ///
684    /// The file at `file_path` will be deleted when dropped.
685    /// This ensures the file is closed before deletion.
686    pub fn from_file(file: File, base_offset: usize, file_path: PathBuf) -> Result<Self> {
687        let source = FileSource::new(file, base_offset, file_path)?;
688        let remaining_bytes = source.total_size();
689        Ok(Self {
690            source: LogRecordsSource::File(source),
691            current_pos: 0,
692            remaining_bytes,
693        })
694    }
695
696    /// Try to get the size of the next batch.
697    fn next_batch_size(&mut self) -> Result<Option<usize>> {
698        if self.remaining_bytes < LOG_OVERHEAD {
699            return Ok(None);
700        }
701
702        // Read only header to get size
703        match self.source.read_batch_header(self.current_pos) {
704            Ok((_base_offset, batch_size)) => {
705                if batch_size > self.remaining_bytes {
706                    Ok(None)
707                } else {
708                    Ok(Some(batch_size))
709                }
710            }
711            Err(e) => Err(e),
712        }
713    }
714}
715
716impl Iterator for LogRecordsBatches {
717    type Item = Result<LogRecordBatch>;
718
719    fn next(&mut self) -> Option<Self::Item> {
720        match self.next_batch_size() {
721            Ok(Some(batch_size)) => {
722                // Read full batch data on-demand
723                match self.source.read_batch_data(self.current_pos, batch_size) {
724                    Ok(data) => {
725                        let record_batch = LogRecordBatch::new(data);
726                        self.current_pos += batch_size;
727                        self.remaining_bytes -= batch_size;
728                        Some(Ok(record_batch))
729                    }
730                    Err(e) => Some(Err(e)),
731                }
732            }
733            Ok(None) => None,
734            Err(e) => Some(Err(e)),
735        }
736    }
737}
738
739pub struct LogRecordBatch {
740    data: Bytes,
741}
742
743#[allow(dead_code)]
744impl LogRecordBatch {
745    pub fn new(data: Bytes) -> Self {
746        LogRecordBatch { data }
747    }
748
749    pub fn magic(&self) -> u8 {
750        self.data[MAGIC_OFFSET]
751    }
752
753    pub fn commit_timestamp(&self) -> i64 {
754        let offset = COMMIT_TIMESTAMP_OFFSET;
755        LittleEndian::read_i64(&self.data[offset..offset + COMMIT_TIMESTAMP_LENGTH])
756    }
757
758    pub fn writer_id(&self) -> i64 {
759        let offset = WRITE_CLIENT_ID_OFFSET;
760        LittleEndian::read_i64(&self.data[offset..offset + WRITE_CLIENT_ID_LENGTH])
761    }
762
763    pub fn batch_sequence(&self) -> i32 {
764        let offset = BATCH_SEQUENCE_OFFSET;
765        LittleEndian::read_i32(&self.data[offset..offset + BATCH_SEQUENCE_LENGTH])
766    }
767
768    pub fn ensure_valid(&self) -> Result<()> {
769        // TODO enable validation once checksum handling is corrected.
770        Ok(())
771    }
772
773    pub fn is_valid(&self) -> bool {
774        self.size_in_bytes() >= RECORD_BATCH_HEADER_SIZE
775            && self.checksum() == self.compute_checksum()
776    }
777
778    fn compute_checksum(&self) -> u32 {
779        let start = SCHEMA_ID_OFFSET;
780        crc32c(&self.data[start..])
781    }
782
783    fn attributes(&self) -> u8 {
784        self.data[ATTRIBUTES_OFFSET]
785    }
786
787    pub fn next_log_offset(&self) -> i64 {
788        self.last_log_offset() + 1
789    }
790
791    pub fn checksum(&self) -> u32 {
792        let offset = CRC_OFFSET;
793        LittleEndian::read_u32(&self.data[offset..offset + CRC_LENGTH])
794    }
795
796    pub fn schema_id(&self) -> i16 {
797        let offset = SCHEMA_ID_OFFSET;
798        LittleEndian::read_i16(&self.data[offset..offset + SCHEMA_ID_LENGTH])
799    }
800
801    pub fn base_log_offset(&self) -> i64 {
802        let offset = BASE_OFFSET_OFFSET;
803        LittleEndian::read_i64(&self.data[offset..offset + BASE_OFFSET_LENGTH])
804    }
805
806    pub fn last_log_offset(&self) -> i64 {
807        self.base_log_offset() + self.last_offset_delta() as i64
808    }
809
810    fn last_offset_delta(&self) -> i32 {
811        let offset = LAST_OFFSET_DELTA_OFFSET;
812        LittleEndian::read_i32(&self.data[offset..offset + LAST_OFFSET_DELTA_LENGTH])
813    }
814
815    pub fn size_in_bytes(&self) -> usize {
816        let offset = LENGTH_OFFSET;
817        LittleEndian::read_i32(&self.data[offset..offset + LENGTH_LENGTH]) as usize + LOG_OVERHEAD
818    }
819
820    pub fn record_count(&self) -> i32 {
821        let offset = RECORDS_COUNT_OFFSET;
822        LittleEndian::read_i32(&self.data[offset..offset + RECORDS_COUNT_LENGTH])
823    }
824
825    pub fn records(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
826        if self.record_count() == 0 {
827            return Ok(LogRecordIterator::empty());
828        }
829
830        let data = &self.data[RECORDS_OFFSET..];
831
832        let record_batch = read_context.record_batch(data)?;
833        let arrow_reader = ArrowReader::new(Arc::new(record_batch));
834        let log_record_iterator = LogRecordIterator::Arrow(ArrowLogRecordIterator {
835            reader: arrow_reader,
836            base_offset: self.base_log_offset(),
837            timestamp: self.commit_timestamp(),
838            row_id: 0,
839            change_type: ChangeType::AppendOnly,
840        });
841
842        Ok(log_record_iterator)
843    }
844
845    pub fn records_for_remote_log(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
846        if self.record_count() == 0 {
847            return Ok(LogRecordIterator::empty());
848        }
849
850        let data = &self.data[RECORDS_OFFSET..];
851
852        let record_batch = read_context.record_batch_for_remote_log(data)?;
853        let log_record_iterator = match record_batch {
854            None => LogRecordIterator::empty(),
855            Some(record_batch) => {
856                let arrow_reader = ArrowReader::new(Arc::new(record_batch));
857                LogRecordIterator::Arrow(ArrowLogRecordIterator {
858                    reader: arrow_reader,
859                    base_offset: self.base_log_offset(),
860                    timestamp: self.commit_timestamp(),
861                    row_id: 0,
862                    change_type: ChangeType::AppendOnly,
863                })
864            }
865        };
866        Ok(log_record_iterator)
867    }
868
869    /// Returns the record batch directly without creating an iterator.
870    /// This is more efficient when you need the entire batch rather than
871    /// iterating row-by-row.
872    pub fn record_batch(&self, read_context: &ReadContext) -> Result<RecordBatch> {
873        if self.record_count() == 0 {
874            // Return empty batch with correct schema
875            return Ok(RecordBatch::new_empty(read_context.target_schema.clone()));
876        }
877
878        let data = self
879            .data
880            .get(RECORDS_OFFSET..)
881            .ok_or_else(|| Error::UnexpectedError {
882                message: format!(
883                    "Corrupt log record batch: data length {} is less than RECORDS_OFFSET {}",
884                    self.data.len(),
885                    RECORDS_OFFSET
886                ),
887                source: None,
888            })?;
889        read_context.record_batch(data)
890    }
891}
892
893/// Parse an Arrow IPC message from a byte slice.
894///
895/// Server returns RecordBatch message (without Schema message) in the encapsulated message format.
896/// Format: [continuation: 4 bytes (0xFFFFFFFF)][metadata_size: 4 bytes][RecordBatch metadata][body]
897///
898/// This format is documented at:
899/// https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format
900///
901/// # Arguments
902/// * `data` - The byte slice containing the IPC message.
903///
904/// # Returns
905/// Returns `Ok((batch_metadata, body_buffer, version))` on success:
906/// - `batch_metadata`: The RecordBatch metadata from the IPC message.
907/// - `body_buffer`: The buffer containing the record batch body data.
908/// - `version`: The Arrow IPC metadata version.
909///
910/// Returns `Err(arrow_error)` on errors
911/// - `arrow_error`: Error details e.g. malformed, too short or bad continuation marker.
912fn parse_ipc_message(
913    data: &[u8],
914) -> Result<(
915    arrow::ipc::RecordBatch<'_>,
916    Buffer,
917    arrow::ipc::MetadataVersion,
918)> {
919    const CONTINUATION_MARKER: u32 = 0xFFFFFFFF;
920
921    if data.len() < 8 {
922        Err(ParseError(format!("Invalid data length: {}", data.len())))?
923    }
924
925    let continuation = LittleEndian::read_u32(&data[0..4]);
926    let metadata_size = LittleEndian::read_u32(&data[4..8]) as usize;
927
928    if continuation != CONTINUATION_MARKER {
929        Err(ParseError(format!(
930            "Invalid continuation marker: {continuation}"
931        )))?
932    }
933
934    if data.len() < 8 + metadata_size {
935        Err(ParseError(format!(
936            "Invalid data length. Remaining data length {} is shorter than specified size {}",
937            data.len() - 8,
938            metadata_size
939        )))?
940    }
941
942    let metadata_bytes = &data[8..8 + metadata_size];
943    let message = root_as_message(metadata_bytes).map_err(|err| ParseError(err.to_string()))?;
944    let batch_metadata = message
945        .header_as_record_batch()
946        .ok_or(ParseError(String::from("Not a record batch")))?;
947
948    let metadata_padded_size = (metadata_size + 7) & !7;
949    let body_start = 8 + metadata_padded_size;
950    let body_data = &data[body_start..];
951    let body_buffer = Buffer::from(body_data);
952
953    Ok((batch_metadata, body_buffer, message.version()))
954}
955
956pub fn to_arrow_schema(fluss_schema: &RowType) -> Result<SchemaRef> {
957    let fields: Result<Vec<Field>> = fluss_schema
958        .fields()
959        .iter()
960        .map(|f| {
961            Ok(Field::new(
962                f.name(),
963                to_arrow_type(f.data_type())?,
964                f.data_type().is_nullable(),
965            ))
966        })
967        .collect();
968
969    Ok(SchemaRef::new(arrow_schema::Schema::new(fields?)))
970}
971
972pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
973    Ok(match fluss_type {
974        DataType::Boolean(_) => ArrowDataType::Boolean,
975        DataType::TinyInt(_) => ArrowDataType::Int8,
976        DataType::SmallInt(_) => ArrowDataType::Int16,
977        DataType::BigInt(_) => ArrowDataType::Int64,
978        DataType::Int(_) => ArrowDataType::Int32,
979        DataType::Float(_) => ArrowDataType::Float32,
980        DataType::Double(_) => ArrowDataType::Float64,
981        DataType::Char(_) => ArrowDataType::Utf8,
982        DataType::String(_) => ArrowDataType::Utf8,
983        DataType::Decimal(decimal_type) => {
984            let precision =
985                decimal_type
986                    .precision()
987                    .try_into()
988                    .map_err(|_| Error::IllegalArgument {
989                        message: format!(
990                            "Decimal precision {} exceeds Arrow's maximum (u8::MAX)",
991                            decimal_type.precision()
992                        ),
993                    })?;
994            let scale = decimal_type
995                .scale()
996                .try_into()
997                .map_err(|_| Error::IllegalArgument {
998                    message: format!(
999                        "Decimal scale {} exceeds Arrow's maximum (i8::MAX)",
1000                        decimal_type.scale()
1001                    ),
1002                })?;
1003            ArrowDataType::Decimal128(precision, scale)
1004        }
1005        DataType::Date(_) => ArrowDataType::Date32,
1006        DataType::Time(time_type) => match time_type.precision() {
1007            0 => ArrowDataType::Time32(arrow_schema::TimeUnit::Second),
1008            1..=3 => ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond),
1009            4..=6 => ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond),
1010            7..=9 => ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond),
1011            invalid => {
1012                return Err(Error::IllegalArgument {
1013                    message: format!("Invalid precision {invalid} for TimeType (must be 0-9)"),
1014                });
1015            }
1016        },
1017        DataType::Timestamp(timestamp_type) => match timestamp_type.precision() {
1018            0 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None),
1019            1..=3 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
1020            4..=6 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
1021            7..=9 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
1022            invalid => {
1023                return Err(Error::IllegalArgument {
1024                    message: format!("Invalid precision {invalid} for TimestampType (must be 0-9)"),
1025                });
1026            }
1027        },
1028        DataType::TimestampLTz(timestamp_ltz_type) => match timestamp_ltz_type.precision() {
1029            0 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None),
1030            1..=3 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None),
1031            4..=6 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None),
1032            7..=9 => ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None),
1033            invalid => {
1034                return Err(Error::IllegalArgument {
1035                    message: format!(
1036                        "Invalid precision {invalid} for TimestampLTzType (must be 0-9)"
1037                    ),
1038                });
1039            }
1040        },
1041        DataType::Bytes(_) => ArrowDataType::Binary,
1042        DataType::Binary(binary_type) => {
1043            let length = binary_type
1044                .length()
1045                .try_into()
1046                .map_err(|_| Error::IllegalArgument {
1047                    message: format!(
1048                        "Binary length {} exceeds Arrow's maximum (i32::MAX)",
1049                        binary_type.length()
1050                    ),
1051                })?;
1052            ArrowDataType::FixedSizeBinary(length)
1053        }
1054        DataType::Array(array_type) => ArrowDataType::List(
1055            Field::new_list_field(
1056                to_arrow_type(array_type.get_element_type())?,
1057                fluss_type.is_nullable(),
1058            )
1059            .into(),
1060        ),
1061        DataType::Map(map_type) => {
1062            let key_type = to_arrow_type(map_type.key_type())?;
1063            let value_type = to_arrow_type(map_type.value_type())?;
1064            let entry_fields = vec![
1065                Field::new("key", key_type, map_type.key_type().is_nullable()),
1066                Field::new("value", value_type, map_type.value_type().is_nullable()),
1067            ];
1068            ArrowDataType::Map(
1069                Arc::new(Field::new(
1070                    "entries",
1071                    ArrowDataType::Struct(arrow_schema::Fields::from(entry_fields)),
1072                    fluss_type.is_nullable(),
1073                )),
1074                false,
1075            )
1076        }
1077        DataType::Row(row_type) => {
1078            let fields: Result<Vec<Field>> = row_type
1079                .fields()
1080                .iter()
1081                .map(|f| {
1082                    Ok(Field::new(
1083                        f.name(),
1084                        to_arrow_type(f.data_type())?,
1085                        f.data_type().is_nullable(),
1086                    ))
1087                })
1088                .collect();
1089            ArrowDataType::Struct(arrow_schema::Fields::from(fields?))
1090        }
1091    })
1092}
1093
1094#[derive(Clone)]
1095pub struct ReadContext {
1096    target_schema: SchemaRef,
1097    full_schema: SchemaRef,
1098    projection: Option<Projection>,
1099    is_from_remote: bool,
1100}
1101
1102#[derive(Clone)]
1103struct Projection {
1104    ordered_schema: SchemaRef,
1105    projected_fields: Vec<usize>,
1106    ordered_fields: Vec<usize>,
1107
1108    reordering_indexes: Vec<usize>,
1109    reordering_needed: bool,
1110}
1111
1112impl ReadContext {
1113    pub fn new(arrow_schema: SchemaRef, is_from_remote: bool) -> ReadContext {
1114        ReadContext {
1115            target_schema: arrow_schema.clone(),
1116            full_schema: arrow_schema,
1117            projection: None,
1118            is_from_remote,
1119        }
1120    }
1121
1122    pub fn with_projection_pushdown(
1123        arrow_schema: SchemaRef,
1124        projected_fields: Vec<usize>,
1125        is_from_remote: bool,
1126    ) -> Result<ReadContext> {
1127        Self::validate_projection(&arrow_schema, projected_fields.as_slice())?;
1128        let target_schema =
1129            Self::project_schema(arrow_schema.clone(), projected_fields.as_slice())?;
1130        // the logic is little bit of hard to understand, to refactor it to follow
1131        // java side
1132        let (need_do_reorder, sorted_fields) = {
1133            // currently, for remote read, arrow log doesn't support projection pushdown,
1134            // so, only need to do reordering when is not from remote
1135            if !is_from_remote {
1136                let mut sorted_fields = projected_fields.clone();
1137                sorted_fields.sort_unstable();
1138                (!sorted_fields.eq(&projected_fields), sorted_fields)
1139            } else {
1140                // sorted_fields won't be used when need_do_reorder is false,
1141                // let's use an empty vec directly
1142                (false, vec![])
1143            }
1144        };
1145
1146        let project = {
1147            if need_do_reorder {
1148                // reordering is required
1149                // Calculate reordering indexes to transform from sorted order to user-requested order
1150                let mut reordering_indexes = Vec::with_capacity(projected_fields.len());
1151                for &original_idx in &projected_fields {
1152                    let pos = sorted_fields.binary_search(&original_idx).map_err(|_| {
1153                        IllegalArgument {
1154                            message: format!(
1155                                "Projection index {original_idx} is invalid for the current schema."
1156                            ),
1157                        }
1158                    })?;
1159                    reordering_indexes.push(pos);
1160                }
1161                Projection {
1162                    ordered_schema: Self::project_schema(
1163                        arrow_schema.clone(),
1164                        sorted_fields.as_slice(),
1165                    )?,
1166                    projected_fields,
1167                    ordered_fields: sorted_fields,
1168                    reordering_indexes,
1169                    reordering_needed: true,
1170                }
1171            } else {
1172                Projection {
1173                    ordered_schema: Self::project_schema(
1174                        arrow_schema.clone(),
1175                        projected_fields.as_slice(),
1176                    )?,
1177                    ordered_fields: projected_fields.clone(),
1178                    projected_fields,
1179                    reordering_indexes: vec![],
1180                    reordering_needed: false,
1181                }
1182            }
1183        };
1184
1185        Ok(ReadContext {
1186            target_schema,
1187            full_schema: arrow_schema,
1188            projection: Some(project),
1189            is_from_remote,
1190        })
1191    }
1192
1193    fn validate_projection(schema: &SchemaRef, projected_fields: &[usize]) -> Result<()> {
1194        let field_count = schema.fields().len();
1195        for &index in projected_fields {
1196            if index >= field_count {
1197                return Err(IllegalArgument {
1198                    message: format!(
1199                        "Projection index {index} is out of bounds for schema with {field_count} fields."
1200                    ),
1201                });
1202            }
1203        }
1204        Ok(())
1205    }
1206
1207    pub fn project_schema(schema: SchemaRef, projected_fields: &[usize]) -> Result<SchemaRef> {
1208        Ok(SchemaRef::new(schema.project(projected_fields).map_err(
1209            |e| IllegalArgument {
1210                message: format!("Invalid projection: {e}"),
1211            },
1212        )?))
1213    }
1214
1215    pub fn project_fields(&self) -> Option<&[usize]> {
1216        self.projection
1217            .as_ref()
1218            .map(|p| p.projected_fields.as_slice())
1219    }
1220
1221    pub fn project_fields_in_order(&self) -> Option<&[usize]> {
1222        self.projection
1223            .as_ref()
1224            .map(|p| p.ordered_fields.as_slice())
1225    }
1226
1227    pub fn record_batch(&self, data: &[u8]) -> Result<RecordBatch> {
1228        let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?;
1229
1230        let resolve_schema = {
1231            // if from remote, no projection, need to use full schema
1232            if self.is_from_remote {
1233                self.full_schema.clone()
1234            } else {
1235                // the record batch from server must be ordered by field pos,
1236                // according to project to decide what arrow schema to use
1237                // to parse the record batch
1238                match self.projection {
1239                    Some(ref projection) => {
1240                        // projection, should use ordered schema by project field pos
1241                        projection.ordered_schema.clone()
1242                    }
1243                    None => {
1244                        // no projection, use target output schema
1245                        self.target_schema.clone()
1246                    }
1247                }
1248            }
1249        };
1250
1251        let record_batch = read_record_batch(
1252            &body_buffer,
1253            batch_metadata,
1254            resolve_schema,
1255            &HashMap::new(),
1256            None,
1257            &version,
1258        )?;
1259
1260        let record_batch = match &self.projection {
1261            Some(projection) => {
1262                let reordered_columns = {
1263                    // need to do reorder
1264                    if self.is_from_remote {
1265                        Some(&projection.projected_fields)
1266                    } else if projection.reordering_needed {
1267                        Some(&projection.reordering_indexes)
1268                    } else {
1269                        None
1270                    }
1271                };
1272                match reordered_columns {
1273                    Some(reordered_columns) => {
1274                        let arrow_columns = reordered_columns
1275                            .iter()
1276                            .map(|&idx| record_batch.column(idx).clone())
1277                            .collect();
1278                        RecordBatch::try_new(self.target_schema.clone(), arrow_columns)?
1279                    }
1280                    _ => record_batch,
1281                }
1282            }
1283            _ => record_batch,
1284        };
1285        Ok(record_batch)
1286    }
1287
1288    pub fn record_batch_for_remote_log(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
1289        let (batch_metadata, body_buffer, version) = parse_ipc_message(data)?;
1290
1291        let record_batch = read_record_batch(
1292            &body_buffer,
1293            batch_metadata,
1294            self.full_schema.clone(),
1295            &HashMap::new(),
1296            None,
1297            &version,
1298        )?;
1299
1300        let record_batch = match &self.projection {
1301            Some(projection) => {
1302                let projected_columns: Vec<_> = projection
1303                    .projected_fields
1304                    .iter()
1305                    .map(|&idx| record_batch.column(idx).clone())
1306                    .collect();
1307                RecordBatch::try_new(self.target_schema.clone(), projected_columns)?
1308            }
1309            None => record_batch,
1310        };
1311        Ok(Some(record_batch))
1312    }
1313}
1314
1315pub enum LogRecordIterator {
1316    Empty,
1317    Arrow(ArrowLogRecordIterator),
1318}
1319
1320impl LogRecordIterator {
1321    pub fn empty() -> Self {
1322        LogRecordIterator::Empty
1323    }
1324}
1325
1326impl Iterator for LogRecordIterator {
1327    type Item = ScanRecord;
1328
1329    fn next(&mut self) -> Option<Self::Item> {
1330        match self {
1331            LogRecordIterator::Empty => None,
1332            LogRecordIterator::Arrow(iter) => iter.next(),
1333        }
1334    }
1335}
1336
1337pub struct ArrowLogRecordIterator {
1338    reader: ArrowReader,
1339    base_offset: i64,
1340    timestamp: i64,
1341    row_id: usize,
1342    change_type: ChangeType,
1343}
1344
1345#[allow(dead_code)]
1346impl ArrowLogRecordIterator {
1347    fn new(reader: ArrowReader, base_offset: i64, timestamp: i64, change_type: ChangeType) -> Self {
1348        Self {
1349            reader,
1350            base_offset,
1351            timestamp,
1352            row_id: 0,
1353            change_type,
1354        }
1355    }
1356}
1357
1358impl Iterator for ArrowLogRecordIterator {
1359    type Item = ScanRecord;
1360
1361    fn next(&mut self) -> Option<Self::Item> {
1362        if self.row_id >= self.reader.row_count() {
1363            return None;
1364        }
1365
1366        let columnar_row = self.reader.read(self.row_id);
1367        let scan_record = ScanRecord::new(
1368            columnar_row,
1369            self.base_offset + self.row_id as i64,
1370            self.timestamp,
1371            self.change_type,
1372        );
1373        self.row_id += 1;
1374        Some(scan_record)
1375    }
1376}
1377
1378pub struct ArrowReader {
1379    record_batch: Arc<RecordBatch>,
1380}
1381
1382impl ArrowReader {
1383    pub fn new(record_batch: Arc<RecordBatch>) -> Self {
1384        ArrowReader { record_batch }
1385    }
1386
1387    pub fn row_count(&self) -> usize {
1388        self.record_batch.num_rows()
1389    }
1390
1391    pub fn read(&self, row_id: usize) -> ColumnarRow {
1392        ColumnarRow::new_with_row_id(self.record_batch.clone(), row_id)
1393    }
1394}
1395pub struct MyVec<T>(pub StreamReader<T>);
1396
1397#[cfg(test)]
1398mod tests {
1399    use super::*;
1400    use crate::metadata::{DataField, DataTypes, RowType};
1401    use crate::test_utils::build_table_info;
1402
1403    #[test]
1404    fn test_to_array_type() {
1405        assert_eq!(
1406            to_arrow_type(&DataTypes::boolean()).unwrap(),
1407            ArrowDataType::Boolean
1408        );
1409        assert_eq!(
1410            to_arrow_type(&DataTypes::tinyint()).unwrap(),
1411            ArrowDataType::Int8
1412        );
1413        assert_eq!(
1414            to_arrow_type(&DataTypes::smallint()).unwrap(),
1415            ArrowDataType::Int16
1416        );
1417        assert_eq!(
1418            to_arrow_type(&DataTypes::bigint()).unwrap(),
1419            ArrowDataType::Int64
1420        );
1421        assert_eq!(
1422            to_arrow_type(&DataTypes::int()).unwrap(),
1423            ArrowDataType::Int32
1424        );
1425        assert_eq!(
1426            to_arrow_type(&DataTypes::float()).unwrap(),
1427            ArrowDataType::Float32
1428        );
1429        assert_eq!(
1430            to_arrow_type(&DataTypes::double()).unwrap(),
1431            ArrowDataType::Float64
1432        );
1433        assert_eq!(
1434            to_arrow_type(&DataTypes::char(16)).unwrap(),
1435            ArrowDataType::Utf8
1436        );
1437        assert_eq!(
1438            to_arrow_type(&DataTypes::string()).unwrap(),
1439            ArrowDataType::Utf8
1440        );
1441        assert_eq!(
1442            to_arrow_type(&DataTypes::decimal(10, 2)).unwrap(),
1443            ArrowDataType::Decimal128(10, 2)
1444        );
1445        assert_eq!(
1446            to_arrow_type(&DataTypes::date()).unwrap(),
1447            ArrowDataType::Date32
1448        );
1449        assert_eq!(
1450            to_arrow_type(&DataTypes::time()).unwrap(),
1451            ArrowDataType::Time32(arrow_schema::TimeUnit::Second)
1452        );
1453        assert_eq!(
1454            to_arrow_type(&DataTypes::time_with_precision(3)).unwrap(),
1455            ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond)
1456        );
1457        assert_eq!(
1458            to_arrow_type(&DataTypes::time_with_precision(6)).unwrap(),
1459            ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond)
1460        );
1461        assert_eq!(
1462            to_arrow_type(&DataTypes::time_with_precision(9)).unwrap(),
1463            ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond)
1464        );
1465        assert_eq!(
1466            to_arrow_type(&DataTypes::timestamp_with_precision(0)).unwrap(),
1467            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None)
1468        );
1469        assert_eq!(
1470            to_arrow_type(&DataTypes::timestamp_with_precision(3)).unwrap(),
1471            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
1472        );
1473        assert_eq!(
1474            to_arrow_type(&DataTypes::timestamp_with_precision(6)).unwrap(),
1475            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
1476        );
1477        assert_eq!(
1478            to_arrow_type(&DataTypes::timestamp_with_precision(9)).unwrap(),
1479            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
1480        );
1481        assert_eq!(
1482            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(0)).unwrap(),
1483            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, None)
1484        );
1485        assert_eq!(
1486            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(3)).unwrap(),
1487            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, None)
1488        );
1489        assert_eq!(
1490            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(6)).unwrap(),
1491            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
1492        );
1493        assert_eq!(
1494            to_arrow_type(&DataTypes::timestamp_ltz_with_precision(9)).unwrap(),
1495            ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, None)
1496        );
1497        assert_eq!(
1498            to_arrow_type(&DataTypes::bytes()).unwrap(),
1499            ArrowDataType::Binary
1500        );
1501        assert_eq!(
1502            to_arrow_type(&DataTypes::binary(16)).unwrap(),
1503            ArrowDataType::FixedSizeBinary(16)
1504        );
1505
1506        assert_eq!(
1507            to_arrow_type(&DataTypes::array(DataTypes::int())).unwrap(),
1508            ArrowDataType::List(Field::new_list_field(ArrowDataType::Int32, true).into())
1509        );
1510
1511        assert_eq!(
1512            to_arrow_type(&DataTypes::map(DataTypes::string(), DataTypes::int())).unwrap(),
1513            ArrowDataType::Map(
1514                Arc::new(Field::new(
1515                    "entries",
1516                    ArrowDataType::Struct(arrow_schema::Fields::from(vec![
1517                        Field::new("key", ArrowDataType::Utf8, true),
1518                        Field::new("value", ArrowDataType::Int32, true),
1519                    ])),
1520                    true,
1521                )),
1522                false,
1523            )
1524        );
1525
1526        assert_eq!(
1527            to_arrow_type(&DataTypes::row(vec![
1528                DataTypes::field("f1", DataTypes::int()),
1529                DataTypes::field("f2", DataTypes::string()),
1530            ]))
1531            .unwrap(),
1532            ArrowDataType::Struct(arrow_schema::Fields::from(vec![
1533                Field::new("f1", ArrowDataType::Int32, true),
1534                Field::new("f2", ArrowDataType::Utf8, true),
1535            ]))
1536        );
1537    }
1538
1539    #[test]
1540    fn test_parse_ipc_message() {
1541        let empty_body: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000000]);
1542        let result = parse_ipc_message(empty_body);
1543        assert_eq!(
1544            result.unwrap_err().to_string(),
1545            String::from(
1546                "Fluss hitting Arrow error Parser error: Range [0, 4) is out of bounds.\n\n: ParseError(\"Range [0, 4) is out of bounds.\\n\\n\")."
1547            )
1548        );
1549
1550        let invalid_data = &[];
1551        assert_eq!(
1552            parse_ipc_message(invalid_data).unwrap_err().to_string(),
1553            String::from(
1554                "Fluss hitting Arrow error Parser error: Invalid data length: 0: ParseError(\"Invalid data length: 0\")."
1555            )
1556        );
1557
1558        let data_with_invalid_continuation: &[u8] = &le_bytes(&[0x00000001, 0x00000000]);
1559        assert_eq!(
1560            parse_ipc_message(data_with_invalid_continuation)
1561                .unwrap_err()
1562                .to_string(),
1563            String::from(
1564                "Fluss hitting Arrow error Parser error: Invalid continuation marker: 1: ParseError(\"Invalid continuation marker: 1\")."
1565            )
1566        );
1567
1568        let data_with_invalid_length: &[u8] = &le_bytes(&[0xFFFFFFFF, 0x00000001]);
1569        assert_eq!(
1570            parse_ipc_message(data_with_invalid_length)
1571                .unwrap_err()
1572                .to_string(),
1573            String::from(
1574                "Fluss hitting Arrow error Parser error: Invalid data length. Remaining data length 0 is shorter than specified size 1: ParseError(\"Invalid data length. Remaining data length 0 is shorter than specified size 1\")."
1575            )
1576        );
1577
1578        let data_with_invalid_length = &le_bytes(&[0xFFFFFFFF, 0x00000004, 0x00000000]);
1579        assert_eq!(
1580            parse_ipc_message(data_with_invalid_length)
1581                .unwrap_err()
1582                .to_string(),
1583            String::from(
1584                "Fluss hitting Arrow error Parser error: Not a record batch: ParseError(\"Not a record batch\")."
1585            )
1586        );
1587    }
1588
1589    #[test]
1590    fn projection_rejects_out_of_bounds_index() {
1591        let row_type = RowType::new(vec![
1592            DataField::new("id", DataTypes::int(), None),
1593            DataField::new("name", DataTypes::string(), None),
1594        ]);
1595        let schema = to_arrow_schema(&row_type).unwrap();
1596        let result = ReadContext::with_projection_pushdown(schema, vec![0, 2], false);
1597
1598        assert!(matches!(result, Err(IllegalArgument { .. })));
1599    }
1600
1601    #[test]
1602    fn checksum_and_schema_id_read_minimum_header() {
1603        // Header-only batches with record_count == 0 are valid; this covers the minimal bytes
1604        // needed for checksum/schema_id access.
1605        let mut data = vec![0u8; SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH];
1606        let crc = 0xA1B2C3D4u32;
1607        let schema_id = 42i16;
1608        LittleEndian::write_u32(&mut data[CRC_OFFSET..CRC_OFFSET + CRC_LENGTH], crc);
1609        LittleEndian::write_i16(
1610            &mut data[SCHEMA_ID_OFFSET..SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH],
1611            schema_id,
1612        );
1613
1614        let batch = LogRecordBatch::new(Bytes::from(data));
1615        assert_eq!(batch.checksum(), crc);
1616        assert_eq!(batch.schema_id(), schema_id);
1617
1618        let expected = crc32c(&batch.data[SCHEMA_ID_OFFSET..]);
1619        assert_eq!(batch.compute_checksum(), expected);
1620    }
1621
1622    fn le_bytes(vals: &[u32]) -> Vec<u8> {
1623        let mut out = Vec::with_capacity(vals.len() * 4);
1624        for &v in vals {
1625            out.extend_from_slice(&v.to_le_bytes());
1626        }
1627        out
1628    }
1629
1630    #[test]
1631    fn test_temporal_and_decimal_builder_validation() {
1632        use crate::row::column_writer::ColumnWriter;
1633        use arrow::array::Array;
1634
1635        // Test valid builder creation with precision=10, scale=2
1636        let mut writer = ColumnWriter::create(
1637            &DataTypes::decimal(10, 2),
1638            &ArrowDataType::Decimal128(10, 2),
1639            0,
1640            256,
1641        )
1642        .unwrap();
1643        let array = writer.finish();
1644        assert_eq!(array.data_type(), &ArrowDataType::Decimal128(10, 2));
1645
1646        // Test error case: invalid Arrow precision/scale (exceeds Arrow's limit)
1647        let result = ColumnWriter::create(
1648            &DataTypes::decimal(10, 2),
1649            &ArrowDataType::Decimal128(100, 50),
1650            0,
1651            256,
1652        );
1653        assert!(result.is_err());
1654    }
1655
1656    #[test]
1657    fn test_decimal_rescaling_and_validation() -> Result<()> {
1658        use crate::row::{Datum, Decimal, GenericRow};
1659        use arrow::array::Decimal128Array;
1660        use bigdecimal::BigDecimal;
1661        use std::str::FromStr;
1662
1663        // Test 1: Rescaling from scale 3 to scale 2
1664        let row_type = RowType::new(vec![DataField::new(
1665            "amount",
1666            DataTypes::decimal(10, 2),
1667            None,
1668        )]);
1669        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
1670        let decimal = Decimal::from_big_decimal(BigDecimal::from_str("123.456").unwrap(), 10, 3)?;
1671        let row = GenericRow {
1672            values: vec![Datum::Decimal(decimal)],
1673        };
1674        builder.append(&row)?;
1675        let batch = builder.build_arrow_record_batch()?;
1676        let array = batch
1677            .column(0)
1678            .as_any()
1679            .downcast_ref::<Decimal128Array>()
1680            .unwrap();
1681        assert_eq!(array.value(0), 12346); // 123.456 rounded to 2 decimal places
1682        assert_eq!(array.scale(), 2);
1683
1684        // Test 2: Precision overflow (should error)
1685        let row_type = RowType::new(vec![DataField::new(
1686            "amount",
1687            DataTypes::decimal(5, 2),
1688            None,
1689        )]);
1690        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
1691        let decimal = Decimal::from_big_decimal(BigDecimal::from_str("123456.78").unwrap(), 10, 2)?;
1692        let row = GenericRow {
1693            values: vec![Datum::Decimal(decimal)],
1694        };
1695        let result = builder.append(&row);
1696        assert!(result.is_err());
1697        assert!(
1698            result
1699                .unwrap_err()
1700                .to_string()
1701                .contains("precision overflow")
1702        );
1703
1704        Ok(())
1705    }
1706
1707    // Tests for file-backed streaming
1708
1709    #[test]
1710    fn test_file_source_streaming() -> Result<()> {
1711        use tempfile::NamedTempFile;
1712
1713        // Test 1: Basic file reads work
1714        let test_data = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
1715        let mut tmp_file = NamedTempFile::new()?;
1716        tmp_file.write_all(&test_data)?;
1717        tmp_file.flush()?;
1718
1719        let file_path = tmp_file.path().to_path_buf();
1720        let file = File::open(&file_path)?;
1721        let mut source = FileSource::new(file, 0, file_path)?;
1722
1723        // Read full data
1724        let data = source.read_batch_data(0, 10)?;
1725        assert_eq!(data.to_vec(), test_data);
1726
1727        // Read partial data
1728        let partial = source.read_batch_data(2, 5)?;
1729        assert_eq!(partial.to_vec(), vec![3, 4, 5, 6, 7]);
1730
1731        // Test 2: base_offset works (critical for remote logs with pos_in_log_segment)
1732        let prefix = vec![0xFF; 100];
1733        let actual_data = vec![1, 2, 3, 4, 5];
1734        let mut tmp_file2 = NamedTempFile::new()?;
1735        tmp_file2.write_all(&prefix)?;
1736        tmp_file2.write_all(&actual_data)?;
1737        tmp_file2.flush()?;
1738
1739        let file_path2 = tmp_file2.path().to_path_buf();
1740        let file2 = File::open(&file_path2)?;
1741        let mut source2 = FileSource::new(file2, 100, file_path2)?; // Skip first 100 bytes
1742
1743        assert_eq!(source2.total_size(), 5); // Only counts data after offset
1744        let data2 = source2.read_batch_data(0, 5)?;
1745        assert_eq!(data2.to_vec(), actual_data);
1746
1747        Ok(())
1748    }
1749
1750    #[test]
1751    fn test_all_types_end_to_end() -> Result<()> {
1752        use crate::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, TimestampNtz};
1753        use arrow::array::{
1754            Date32Array, Decimal128Array, Int32Array, Time32MillisecondArray,
1755            Time64NanosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray,
1756        };
1757        use bigdecimal::BigDecimal;
1758        use std::str::FromStr;
1759
1760        // Schema with int, decimal, date, time (ms + ns), timestamps (μs + ns)
1761        let row_type = RowType::new(vec![
1762            DataField::new("id".to_string(), DataTypes::int(), None),
1763            DataField::new("amount".to_string(), DataTypes::decimal(10, 2), None),
1764            DataField::new("date".to_string(), DataTypes::date(), None),
1765            DataField::new(
1766                "time_ms".to_string(),
1767                DataTypes::time_with_precision(3),
1768                None,
1769            ),
1770            DataField::new(
1771                "time_ns".to_string(),
1772                DataTypes::time_with_precision(9),
1773                None,
1774            ),
1775            DataField::new(
1776                "ts_us".to_string(),
1777                DataTypes::timestamp_with_precision(6),
1778                None,
1779            ),
1780            DataField::new(
1781                "ts_ltz_ns".to_string(),
1782                DataTypes::timestamp_ltz_with_precision(9),
1783                None,
1784            ),
1785        ]);
1786
1787        let mut builder = RowAppendRecordBatchBuilder::new(&row_type)?;
1788
1789        // Append rows with various data types
1790        let row = GenericRow {
1791            values: vec![
1792                Datum::Int32(1),
1793                Datum::Decimal(Decimal::from_big_decimal(
1794                    BigDecimal::from_str("123.456").unwrap(),
1795                    10,
1796                    3,
1797                )?),
1798                // 18000 days since epoch = 2019-04-14
1799                Datum::Date(Date::new(18000)),
1800                // 43200000 ms = 12:00:00.000 (noon)
1801                Datum::Time(Time::new(43200000)),
1802                // 12345 ms = 00:00:12.345
1803                Datum::Time(Time::new(12345)),
1804                // 1609459200000 ms = 2021-01-01 00:00:00 UTC, with 123456 additional nanoseconds
1805                Datum::TimestampNtz(TimestampNtz::from_millis_nanos(1609459200000, 123456)?),
1806                // 1609459200000 ms = 2021-01-01 00:00:00 UTC, with 987654 additional nanoseconds
1807                Datum::TimestampLtz(TimestampLtz::from_millis_nanos(1609459200000, 987654)?),
1808            ],
1809        };
1810        builder.append(&row)?;
1811
1812        let batch = builder.build_arrow_record_batch()?;
1813
1814        // Verify all conversions
1815        assert_eq!(
1816            batch
1817                .column(0)
1818                .as_any()
1819                .downcast_ref::<Int32Array>()
1820                .unwrap()
1821                .value(0),
1822            1
1823        );
1824
1825        let dec = batch
1826            .column(1)
1827            .as_any()
1828            .downcast_ref::<Decimal128Array>()
1829            .unwrap();
1830        assert_eq!(dec.value(0), 12346); // 123.456 rounded to 2 decimal places
1831
1832        assert_eq!(
1833            batch
1834                .column(2)
1835                .as_any()
1836                .downcast_ref::<Date32Array>()
1837                .unwrap()
1838                .value(0),
1839            18000
1840        );
1841
1842        assert_eq!(
1843            batch
1844                .column(3)
1845                .as_any()
1846                .downcast_ref::<Time32MillisecondArray>()
1847                .unwrap()
1848                .value(0),
1849            43200000
1850        );
1851
1852        assert_eq!(
1853            batch
1854                .column(4)
1855                .as_any()
1856                .downcast_ref::<Time64NanosecondArray>()
1857                .unwrap()
1858                .value(0),
1859            12345000000
1860        );
1861
1862        // Timestamp with sub-millisecond nanos preserved
1863        assert_eq!(
1864            batch
1865                .column(5)
1866                .as_any()
1867                .downcast_ref::<TimestampMicrosecondArray>()
1868                .unwrap()
1869                .value(0),
1870            1609459200000123
1871        );
1872
1873        assert_eq!(
1874            batch
1875                .column(6)
1876                .as_any()
1877                .downcast_ref::<TimestampNanosecondArray>()
1878                .unwrap()
1879                .value(0),
1880            1609459200000987654
1881        );
1882
1883        Ok(())
1884    }
1885
1886    #[test]
1887    fn test_log_records_batches_from_file() -> Result<()> {
1888        use crate::client::WriteRecord;
1889        use crate::compression::{
1890            ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
1891        };
1892        use crate::metadata::{PhysicalTablePath, TablePath};
1893        use crate::row::GenericRow;
1894        use tempfile::NamedTempFile;
1895
1896        // Integration test: Real log record batch streamed from file
1897        let row_type = RowType::new(vec![
1898            DataField::new("id".to_string(), DataTypes::int(), None),
1899            DataField::new("name".to_string(), DataTypes::string(), None),
1900        ]);
1901        let table_path = TablePath::new("db".to_string(), "tbl".to_string());
1902        let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1));
1903        let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path)));
1904
1905        let mut builder = MemoryLogRecordsArrowBuilder::new(
1906            1,
1907            &row_type,
1908            false,
1909            ArrowCompressionInfo {
1910                compression_type: ArrowCompressionType::None,
1911                compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL,
1912            },
1913        )?;
1914
1915        let mut row = GenericRow::new(2);
1916        row.set_field(0, 1_i32);
1917        row.set_field(1, "alice");
1918        let record = WriteRecord::for_append(
1919            Arc::clone(&table_info),
1920            physical_table_path.clone(),
1921            1,
1922            &row,
1923        );
1924        builder.append(&record)?;
1925
1926        let mut row2 = GenericRow::new(2);
1927        row2.set_field(0, 2_i32);
1928        row2.set_field(1, "bob");
1929        let record2 =
1930            WriteRecord::for_append(Arc::clone(&table_info), physical_table_path, 2, &row2);
1931        builder.append(&record2)?;
1932
1933        let data = builder.build()?;
1934
1935        // Write to file
1936        let mut tmp_file = NamedTempFile::new()?;
1937        tmp_file.write_all(&data)?;
1938        tmp_file.flush()?;
1939
1940        // Create file-backed LogRecordsBatches (should stream, not load all into memory)
1941        let file_path = tmp_file.path().to_path_buf();
1942        let file = File::open(&file_path)?;
1943        let mut batches = LogRecordsBatches::from_file(file, 0, file_path)?;
1944
1945        // Iterate through batches (should work just like in-memory)
1946        let batch = batches.next().expect("Should have at least one batch")?;
1947        assert!(batch.size_in_bytes() > 0);
1948        assert_eq!(batch.record_count(), 2);
1949
1950        Ok(())
1951    }
1952}