proof_of_sql/base/arrow/record_batch_conversion.rs

use super::{
    arrow_array_to_column_conversion::ArrayRefExt,
    record_batch_errors::{AppendRecordBatchTableCommitmentError, RecordBatchToColumnsError},
};
use crate::base::{
    commitment::{
        AppendColumnCommitmentsError, AppendTableCommitmentError, Commitment, TableCommitment,
        TableCommitmentFromColumnsError,
    },
    database::Column,
    scalar::Scalar,
};
use arrow::record_batch::RecordBatch;
use bumpalo::Bump;
use sqlparser::ast::Ident;

/// Converts a [`RecordBatch`] into a list of named [`Column`]s.
///
/// This function will return an error if the conversion of an Arrow array to a
/// [`Column`] fails. (Field names convert infallibly into [`Ident`]s, so they
/// cannot cause an error.)
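/// # Example
///
/// A minimal sketch of the intended usage; `TestScalar` here stands in for any
/// [`Scalar`] implementation:
///
/// ```ignore
/// use arrow::{array::Int64Array, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch};
/// use bumpalo::Bump;
/// use std::sync::Arc;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
/// let batch =
///     RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))]).unwrap();
///
/// let alloc = Bump::new();
/// let columns = batch_to_columns::<TestScalar>(&batch, &alloc).unwrap();
/// assert_eq!(columns.len(), 1);
/// assert_eq!(columns[0].0, Ident::new("a"));
/// ```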
pub fn batch_to_columns<'a, S: Scalar + 'a>(
    batch: &'a RecordBatch,
    alloc: &'a Bump,
) -> Result<Vec<(Ident, Column<'a, S>)>, RecordBatchToColumnsError> {
    batch
        .schema()
        .fields()
        .into_iter()
        .zip(batch.columns())
        .map(|(field, array)| {
            let identifier: Ident = field.name().as_str().into();
            let column: Column<S> = array.to_column(alloc, &(0..array.len()), None)?;
            Ok((identifier, column))
        })
        .collect()
}

impl<C: Commitment> TableCommitment<C> {
    /// Append an arrow [`RecordBatch`] to the existing [`TableCommitment`].
    ///
    /// The row offset is assumed to be the end of the [`TableCommitment`]'s current range.
    ///
    /// Errors if the batch's columns do not match the commitment's existing schema.
    /// Mixed column lengths and duplicate identifiers are impossible in a
    /// [`RecordBatch`], so those cases panic rather than returning an error.
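    ///
    /// A minimal sketch of appending a second batch; `commitment`, `batch2`, and
    /// `setup` are illustrative names, with `batch2`'s schema matching the
    /// committed table:
    ///
    /// ```ignore
    /// // `commitment` covers rows 0..n; the batch provides rows n..(n + batch2.num_rows()).
    /// commitment.try_append_record_batch(&batch2, &setup)?;
    /// ```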
    #[expect(clippy::missing_panics_doc)]
    pub fn try_append_record_batch(
        &mut self,
        batch: &RecordBatch,
        setup: &C::PublicSetup<'_>,
    ) -> Result<(), AppendRecordBatchTableCommitmentError> {
        match self.try_append_rows(
            batch_to_columns::<C::Scalar>(batch, &Bump::new())?
                .iter()
                .map(|(a, b)| (a, b)),
            setup,
        ) {
            Ok(()) => Ok(()),
            Err(AppendTableCommitmentError::MixedLengthColumns { .. }) => {
                panic!("RecordBatches cannot have columns of mixed length")
            }
            Err(AppendTableCommitmentError::AppendColumnCommitments {
                source: AppendColumnCommitmentsError::DuplicateIdents { .. },
            }) => {
                panic!("RecordBatches cannot have duplicate identifiers")
            }
            Err(AppendTableCommitmentError::AppendColumnCommitments {
                source: AppendColumnCommitmentsError::Mismatch { source: e },
            }) => Err(e)?,
        }
    }

    /// Returns a [`TableCommitment`] to the provided arrow [`RecordBatch`].
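    ///
    /// Equivalent to [`Self::try_from_record_batch_with_offset`] with an offset of 0.
    /// A minimal sketch, using the `NaiveCommitment` and unit setup from this
    /// module's tests; `batch` is illustrative:
    ///
    /// ```ignore
    /// let commitment = TableCommitment::<NaiveCommitment>::try_from_record_batch(&batch, &())?;
    /// ```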
    pub fn try_from_record_batch(
        batch: &RecordBatch,
        setup: &C::PublicSetup<'_>,
    ) -> Result<TableCommitment<C>, RecordBatchToColumnsError> {
        Self::try_from_record_batch_with_offset(batch, 0, setup)
    }

    /// Returns a [`TableCommitment`] to the provided arrow [`RecordBatch`] with the given row offset.
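    ///
    /// A sketch of committing to a batch whose rows begin partway into the table;
    /// the offset of 3, `NaiveCommitment`, and `batch` are illustrative:
    ///
    /// ```ignore
    /// // The batch's rows are treated as rows 3..(3 + batch.num_rows()) of the table.
    /// let commitment =
    ///     TableCommitment::<NaiveCommitment>::try_from_record_batch_with_offset(&batch, 3, &())?;
    /// ```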
    #[expect(clippy::missing_panics_doc)]
    pub fn try_from_record_batch_with_offset(
        batch: &RecordBatch,
        offset: usize,
        setup: &C::PublicSetup<'_>,
    ) -> Result<TableCommitment<C>, RecordBatchToColumnsError> {
        match Self::try_from_columns_with_offset(
            batch_to_columns::<C::Scalar>(batch, &Bump::new())?
                .iter()
                .map(|(a, b)| (a, b)),
            offset,
            setup,
        ) {
            Ok(commitment) => Ok(commitment),
            Err(TableCommitmentFromColumnsError::MixedLengthColumns { .. }) => {
                panic!("RecordBatches cannot have columns of mixed length")
            }
            Err(TableCommitmentFromColumnsError::DuplicateIdents { .. }) => {
                panic!("RecordBatches cannot have duplicate identifiers")
            }
        }
    }
}

#[cfg(all(test, feature = "blitzar"))]
mod tests {
    use super::*;
    use crate::base::{
        commitment::naive_commitment::NaiveCommitment, scalar::test_scalar::TestScalar,
    };
    use arrow::{
        array::{Int64Array, StringArray},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use std::sync::Arc;

    #[test]
    fn we_can_create_and_append_table_commitments_with_record_batches() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["1", "2", "3"])),
            ],
        )
        .unwrap();

        let b_scals = ["1".into(), "2".into(), "3".into()];

        let columns = [
            (&"a".into(), &Column::<TestScalar>::BigInt(&[1, 2, 3])),
            (
                &"b".into(),
                &Column::<TestScalar>::VarChar((&["1", "2", "3"], &b_scals)),
            ),
        ];

        let mut expected_commitment =
            TableCommitment::<NaiveCommitment>::try_from_columns_with_offset(columns, 0, &())
                .unwrap();

        let mut commitment =
            TableCommitment::<NaiveCommitment>::try_from_record_batch(&batch, &()).unwrap();

        assert_eq!(commitment, expected_commitment);

        let schema2 = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, false),
        ]));

        let batch2 = RecordBatch::try_new(
            schema2,
            vec![
                Arc::new(Int64Array::from(vec![4, 5, 6])),
                Arc::new(StringArray::from(vec!["4", "5", "6"])),
            ],
        )
        .unwrap();

        let b_scals2 = ["4".into(), "5".into(), "6".into()];

        let columns2 = [
            (&"a".into(), &Column::<TestScalar>::BigInt(&[4, 5, 6])),
            (
                &"b".into(),
                &Column::<TestScalar>::VarChar((&["4", "5", "6"], &b_scals2)),
            ),
        ];

        expected_commitment.try_append_rows(columns2, &()).unwrap();
        commitment.try_append_record_batch(&batch2, &()).unwrap();

        assert_eq!(commitment, expected_commitment);
    }
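
    // A small additional check, not part of the original suite: a sketch
    // asserting that `batch_to_columns` yields one `(Ident, Column)` pair per
    // field, in schema order. Assumes `Column` implements `PartialEq`.
    #[test]
    fn we_can_convert_a_record_batch_to_columns() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(Int64Array::from(vec![7, 8, 9]))],
        )
        .unwrap();

        let alloc = Bump::new();
        let columns = batch_to_columns::<TestScalar>(&batch, &alloc).unwrap();

        assert_eq!(columns.len(), 1);
        assert_eq!(columns[0].0, Ident::new("a"));
        assert_eq!(columns[0].1, Column::BigInt(&[7, 8, 9]));
    }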
}