use super::{
    arrow_array_to_column_conversion::ArrayRefExt,
    record_batch_errors::{AppendRecordBatchTableCommitmentError, RecordBatchToColumnsError},
};
use crate::base::{
    commitment::{
        AppendColumnCommitmentsError, AppendTableCommitmentError, Commitment, TableCommitment,
        TableCommitmentFromColumnsError,
    },
    database::Column,
    scalar::Scalar,
};
use arrow::record_batch::RecordBatch;
use bumpalo::Bump;
use sqlparser::ast::Ident;

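/// Converts the columns of a [`RecordBatch`] into a vector of `(identifier, Column)` pairs,
/// allocating the converted column data in `alloc`.
///
/// Returns a [`RecordBatchToColumnsError`] if any array cannot be converted to a [`Column`].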
pub fn batch_to_columns<'a, S: Scalar + 'a>(
    batch: &'a RecordBatch,
    alloc: &'a Bump,
) -> Result<Vec<(Ident, Column<'a, S>)>, RecordBatchToColumnsError> {
    batch
        .schema()
        .fields()
        .into_iter()
        .zip(batch.columns())
        .map(|(field, array)| {
            let identifier: Ident = field.name().as_str().into();
            let column: Column<S> = array.to_column(alloc, &(0..array.len()), None)?;
            Ok((identifier, column))
        })
        .collect()
}

impl<C: Commitment> TableCommitment<C> {
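    /// Appends an arrow [`RecordBatch`] to the existing [`TableCommitment`].
    ///
    /// The row offset is assumed to be the end of the commitment's current range.
    ///
    /// Returns an error if the batch's columns do not match the columns already
    /// recorded in the commitment.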
    #[expect(clippy::missing_panics_doc)]
    pub fn try_append_record_batch(
        &mut self,
        batch: &RecordBatch,
        setup: &C::PublicSetup<'_>,
    ) -> Result<(), AppendRecordBatchTableCommitmentError> {
        match self.try_append_rows(
            batch_to_columns::<C::Scalar>(batch, &Bump::new())?
                .iter()
                .map(|(a, b)| (a, b)),
            setup,
        ) {
            Ok(()) => Ok(()),
            Err(AppendTableCommitmentError::MixedLengthColumns { .. }) => {
                panic!("RecordBatches cannot have columns of mixed length")
            }
            Err(AppendTableCommitmentError::AppendColumnCommitments {
                source: AppendColumnCommitmentsError::DuplicateIdents { .. },
            }) => {
                panic!("RecordBatches cannot have duplicate identifiers")
            }
            Err(AppendTableCommitmentError::AppendColumnCommitments {
                source: AppendColumnCommitmentsError::Mismatch { source: e },
            }) => Err(e)?,
        }
    }
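
    /// Returns a [`TableCommitment`] for the provided [`RecordBatch`], with a row offset of 0.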
    pub fn try_from_record_batch(
        batch: &RecordBatch,
        setup: &C::PublicSetup<'_>,
    ) -> Result<TableCommitment<C>, RecordBatchToColumnsError> {
        Self::try_from_record_batch_with_offset(batch, 0, setup)
    }

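    /// Returns a [`TableCommitment`] for the provided [`RecordBatch`] with the given row
    /// offset.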
    #[expect(clippy::missing_panics_doc)]
    pub fn try_from_record_batch_with_offset(
        batch: &RecordBatch,
        offset: usize,
        setup: &C::PublicSetup<'_>,
    ) -> Result<TableCommitment<C>, RecordBatchToColumnsError> {
        match Self::try_from_columns_with_offset(
            batch_to_columns::<C::Scalar>(batch, &Bump::new())?
                .iter()
                .map(|(a, b)| (a, b)),
            offset,
            setup,
        ) {
            Ok(commitment) => Ok(commitment),
            Err(TableCommitmentFromColumnsError::MixedLengthColumns { .. }) => {
                panic!("RecordBatches cannot have columns of mixed length")
            }
            Err(TableCommitmentFromColumnsError::DuplicateIdents { .. }) => {
                panic!("RecordBatches cannot have duplicate identifiers")
            }
        }
    }
}

#[cfg(all(test, feature = "blitzar"))]
mod tests {
    use super::*;
    use crate::base::{
        commitment::naive_commitment::NaiveCommitment, scalar::test_scalar::TestScalar,
    };
    use arrow::{
        array::{Int64Array, StringArray},
        datatypes::{DataType, Field, Schema},
        record_batch::RecordBatch,
    };
    use std::sync::Arc;

    #[test]
    fn we_can_create_and_append_table_commitments_with_record_batches() {
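        // Build a batch with a BigInt column "a" and a VarChar column "b".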
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
                Arc::new(StringArray::from(vec!["1", "2", "3"])),
            ],
        )
        .unwrap();

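        // Manually build the columns we expect the batch to convert into; `VarChar`
        // columns carry both the raw strings and their scalar representations.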
        let b_scals = ["1".into(), "2".into(), "3".into()];

        let columns = [
            (&"a".into(), &Column::<TestScalar>::BigInt(&[1, 2, 3])),
            (
                &"b".into(),
                &Column::<TestScalar>::VarChar((&["1", "2", "3"], &b_scals)),
            ),
        ];

        let mut expected_commitment =
            TableCommitment::<NaiveCommitment>::try_from_columns_with_offset(columns, 0, &())
                .unwrap();

        let mut commitment =
            TableCommitment::<NaiveCommitment>::try_from_record_batch(&batch, &()).unwrap();

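        // The commitment built from the batch should match the one built from columns.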
        assert_eq!(commitment, expected_commitment);

        let schema2 = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, false),
        ]));

        let batch2 = RecordBatch::try_new(
            schema2,
            vec![
                Arc::new(Int64Array::from(vec![4, 5, 6])),
                Arc::new(StringArray::from(vec!["4", "5", "6"])),
            ],
        )
        .unwrap();

        let b_scals2 = ["4".into(), "5".into(), "6".into()];

        let columns2 = [
            (&"a".into(), &Column::<TestScalar>::BigInt(&[4, 5, 6])),
            (
                &"b".into(),
                &Column::<TestScalar>::VarChar((&["4", "5", "6"], &b_scals2)),
            ),
        ];

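        // Appending the second batch should be equivalent to appending the columns directly.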
        expected_commitment.try_append_rows(columns2, &()).unwrap();
        commitment.try_append_record_batch(&batch2, &()).unwrap();

        assert_eq!(commitment, expected_commitment);
    }
}