1use std::collections::HashMap;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use greptime_proto::v1::auth_header::AuthScheme;
26use greptime_proto::v1::SemanticType;
27use tokio::select;
28use tokio::time::timeout;
29
30use arrow_array::builder::{
31 BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder,
32 Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, StringBuilder,
33 Time32MillisecondBuilder, Time32SecondBuilder, Time64MicrosecondBuilder,
34 Time64NanosecondBuilder, TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
35 TimestampNanosecondBuilder, TimestampSecondBuilder, UInt16Builder, UInt32Builder,
36 UInt64Builder, UInt8Builder,
37};
38use arrow_array::{Array, RecordBatch};
39use arrow_flight::{FlightData, FlightDescriptor};
40use arrow_schema::{DataType, Field, Schema, TimeUnit};
41use futures::channel::mpsc;
42use futures::{FutureExt, SinkExt, Stream, StreamExt};
43
44use crate::api::v1::ColumnDataType;
45use crate::client::Client;
46use crate::database::Database;
47use crate::flight::do_put::{DoPutMetadata, DoPutResponse};
48use crate::flight::{FlightEncoder, FlightMessage};
49use crate::table::{Column, DataTypeExtension, Row, TableSchema, Value};
50use crate::{error, Result};
51use snafu::{ensure, OptionExt, ResultExt};
52
/// Default capacity of the bounded channel feeding `FlightData` frames to the
/// `do_put` stream; overridable at runtime via the
/// `GREPTIMEDB_CHANNEL_BUFFER_SIZE` environment variable.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1024;
60
/// Reads `env_var` and parses it as `T`.
///
/// Falls back to `default` when the variable is unset, not valid Unicode,
/// or its value fails to parse.
fn get_env_or_default<T>(env_var: &str, default: T) -> T
where
    T: std::str::FromStr,
{
    if let Ok(raw) = std::env::var(env_var) {
        if let Ok(parsed) = raw.parse::<T>() {
            return parsed;
        }
    }
    default
}
71
/// Identifier correlating an in-flight `do_put` request with its response.
/// `0` is reserved for the initial schema message (see `submit_record_batch`).
pub type RequestId = i64;
73
/// Entry point for bulk ingestion: wraps a [`Database`] handle and creates
/// [`BulkStreamWriter`]s bound to a table schema.
#[derive(Clone, Debug)]
pub struct BulkInserter {
    // Underlying database connection used to open `do_put` streams.
    database: Database,
}
79
impl BulkInserter {
    /// Creates a bulk inserter targeting `database_name` on the given client.
    #[must_use]
    pub fn new(client: Client, database_name: &str) -> Self {
        Self {
            database: Database::new_with_dbname(database_name, client),
        }
    }

    /// Sets the authentication scheme attached to subsequent requests.
    pub fn set_auth(&mut self, auth: AuthScheme) {
        self.database.set_auth(auth);
    }

    /// Opens a bulk write stream for `table_schema`.
    ///
    /// `None` options fall back to [`BulkWriteOptions::default`]. Awaits the
    /// initial `do_put` stream setup, so this can fail on connection errors.
    pub async fn create_bulk_stream_writer(
        &self,
        table_schema: &TableSchema,
        options: Option<BulkWriteOptions>,
    ) -> Result<BulkStreamWriter> {
        let options = options.unwrap_or_default();
        BulkStreamWriter::new(&self.database, table_schema, options).await
    }
}
106
/// Wire compression applied to encoded Flight messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionType {
    /// No compression.
    None,
    /// LZ4 compression (the default).
    #[default]
    Lz4,
    /// Zstandard compression.
    Zstd,
}
115
/// Tuning knobs for a [`BulkStreamWriter`].
#[derive(Debug, Clone)]
pub struct BulkWriteOptions {
    /// Compression codec for encoded Flight frames.
    pub compression: CompressionType,
    /// Per-request timeout applied while waiting for server responses.
    pub timeout: Duration,
    /// Maximum number of un-acknowledged requests allowed in flight.
    pub parallelism: usize,
}
123
124impl Default for BulkWriteOptions {
125 fn default() -> Self {
126 Self {
127 compression: CompressionType::default(),
128 timeout: Duration::from_secs(60),
129 parallelism: 4,
130 }
131 }
132}
133
134impl BulkWriteOptions {
135 #[must_use]
137 pub fn with_compression(mut self, compression: CompressionType) -> Self {
138 self.compression = compression;
139 self
140 }
141
142 #[must_use]
144 pub fn with_timeout(mut self, timeout: Duration) -> Self {
145 self.timeout = timeout;
146 self
147 }
148
149 #[must_use]
151 pub fn with_parallelism(mut self, parallelism: usize) -> Self {
152 self.parallelism = parallelism;
153 self
154 }
155}
156
/// Streaming bulk writer over a single Arrow Flight `do_put` exchange.
///
/// Sends the Arrow schema once, then encoded `RecordBatch` frames tagged with
/// monotonically increasing request ids, and matches server responses back to
/// those ids with timeout and parallelism bookkeeping.
pub struct BulkStreamWriter {
    // Outgoing half: encoded FlightData frames pushed to the server.
    sender: mpsc::Sender<FlightData>,
    // Incoming half: per-request acknowledgements from the server.
    response_stream: Pin<Box<dyn Stream<Item = Result<DoPutResponse>>>>,
    // Table schema this writer was created for.
    table_schema: TableSchema,
    // Arrow projection of `table_schema`; also used to validate `Rows` inputs.
    arrow_schema: Arc<Schema>,
    // Column name -> column index, for `RowBuilder::set`.
    field_map: HashMap<String, usize>,
    // Last id handed out; see `next_request_id()` (0 is reserved for the schema).
    next_request_id: RequestId,
    // Flight encoder configured with the chosen compression.
    encoder: FlightEncoder,
    // True once the schema frame has been sent and acknowledged.
    schema_sent: bool,
    // Max un-acknowledged requests in flight before `submit_record_batch` blocks.
    parallelism: usize,
    // Per-request response timeout.
    timeout: Duration,
    // Requests awaiting a response, keyed by id, with send time for timeout checks.
    pending_requests: HashMap<RequestId, Instant>,
    // Responses received while waiting for a different id, cached for later pickup.
    completed_responses: HashMap<RequestId, (DoPutResponse, Instant)>,
}
178
impl BulkStreamWriter {
    /// Builds a writer for `table_schema` and opens the underlying `do_put`
    /// exchange on `database`.
    ///
    /// Derives the Arrow schema (timestamp columns are non-nullable, all
    /// others nullable), the name->index map, and wires a bounded channel
    /// (size from `GREPTIMEDB_CHANNEL_BUFFER_SIZE`, default 1024) into the
    /// Flight stream. Fails if a column type has no Arrow mapping or the
    /// stream cannot be opened.
    pub async fn new(
        database: &Database,
        table_schema: &TableSchema,
        options: BulkWriteOptions,
    ) -> Result<Self> {
        let encoder = FlightEncoder::with_compression(options.compression);

        // Timestamp columns are required; everything else may be null.
        let fields: Result<Vec<Field>> = table_schema
            .columns()
            .iter()
            .map(|col| {
                let nullable = col.semantic_type != SemanticType::Timestamp;
                column_to_arrow_data_type(col)
                    .map(|data_type| Field::new(&col.name, data_type, nullable))
            })
            .collect();
        let arrow_schema = Arc::new(Schema::new(fields?));

        let field_map: HashMap<String, usize> = table_schema
            .columns()
            .iter()
            .enumerate()
            .map(|(i, col)| (col.name.clone(), i))
            .collect();

        let channel_buffer_size = get_env_or_default(
            "GREPTIMEDB_CHANNEL_BUFFER_SIZE",
            DEFAULT_CHANNEL_BUFFER_SIZE,
        );
        let (sender, receiver) = mpsc::channel::<FlightData>(channel_buffer_size);

        // The receiver side becomes the request stream of the do_put call.
        let flight_stream = receiver.boxed();
        let response_stream = database.do_put(flight_stream).await?;

        Ok(Self {
            sender,
            response_stream,
            table_schema: table_schema.clone(),
            arrow_schema,
            field_map,
            next_request_id: 0,
            encoder,
            schema_sent: false,
            parallelism: options.parallelism,
            timeout: options.timeout,
            pending_requests: HashMap::new(),
            completed_responses: HashMap::new(),
        })
    }

    /// Writes `rows` and blocks until the server acknowledges that request.
    pub async fn write_rows(&mut self, rows: Rows) -> Result<DoPutResponse> {
        let request_id = self.write_rows_async(rows).await?;
        self.wait_for_response(request_id).await
    }

    /// Submits `rows` without waiting for the acknowledgement; returns the
    /// request id to pass to [`Self::wait_for_response`] later.
    ///
    /// Errors on empty input or when the rows' Arrow schema differs from the
    /// writer's. May still block on backpressure when `parallelism` requests
    /// are already in flight.
    pub async fn write_rows_async(&mut self, rows: Rows) -> Result<RequestId> {
        ensure!(!rows.is_empty(), error::EmptyRowsSnafu);
        self.validate_rows_schema(&rows)?;

        let record_batch = RecordBatch::try_from(rows)?;
        let request_id = self.submit_record_batch(record_batch).await?;

        Ok(request_id)
    }

    /// Waits (up to the configured timeout) for the response to
    /// `target_request_id`.
    ///
    /// Responses for other requests that arrive in the meantime are cached in
    /// `completed_responses` so later waiters can pick them up.
    pub async fn wait_for_response(
        &mut self,
        target_request_id: RequestId,
    ) -> Result<DoPutResponse> {
        // Fast path: the response may have been drained by an earlier wait.
        if let Some((response, _)) = self.completed_responses.remove(&target_request_id) {
            return Ok(response);
        }

        let timeout_duration = self.timeout;
        let start_time = Instant::now();

        loop {
            // The timeout budget is shared across all loop iterations.
            let remaining_timeout = timeout_duration.saturating_sub(start_time.elapsed());
            if remaining_timeout.is_zero() {
                return error::RequestTimeoutSnafu {
                    request_ids: vec![target_request_id],
                    timeout: self.timeout,
                }
                .fail();
            }

            let next_result = timeout(remaining_timeout, self.response_stream.next()).await;
            let Ok(next_option) = next_result else {
                return error::RequestTimeoutSnafu {
                    request_ids: vec![target_request_id],
                    timeout: self.timeout,
                }
                .fail();
            };
            if let Some(response) = next_option {
                let response = response?;
                let request_id = response.request_id();
                self.pending_requests.remove(&request_id);
                if request_id == target_request_id {
                    return Ok(response);
                }
                // Not ours: stash it for whoever is waiting on that id.
                self.completed_responses
                    .insert(request_id, (response, Instant::now()));
            } else {
                return error::StreamEndedSnafu.fail();
            }
        }
    }

    /// Drains every cached response and waits for all still-pending requests,
    /// failing with a timeout error listing the ids that did not complete.
    pub async fn wait_for_all_pending(&mut self) -> Result<Vec<DoPutResponse>> {
        let mut responses =
            Vec::with_capacity(self.pending_requests.len() + self.completed_responses.len());

        // First hand back everything already received.
        let completed_responses = std::mem::take(&mut self.completed_responses);
        for (request_id, (response, _)) in completed_responses {
            self.pending_requests.remove(&request_id);
            responses.push(response);
        }

        let timeout_duration = self.timeout;
        let start_time = Instant::now();

        while !self.pending_requests.is_empty() {
            let remaining_timeout = timeout_duration.saturating_sub(start_time.elapsed());
            let timeout_sleep = tokio::time::sleep(remaining_timeout);

            select! {
                () = timeout_sleep => {
                    let pending_ids: Vec<RequestId> = self.pending_requests.keys().copied().collect();
                    return error::RequestTimeoutSnafu {
                        request_ids: pending_ids,
                        timeout: self.timeout,
                    }
                    .fail();
                }
                next_option = self.response_stream.next() => {
                    match next_option {
                        Some(response) => {
                            self.handle_single_response(response?, &mut responses);

                            // Opportunistically drain any further responses that
                            // are already buffered, without awaiting.
                            loop {
                                match self.response_stream.next().now_or_never() {
                                    Some(Some(response)) => self.handle_single_response(response?, &mut responses),
                                    Some(None) => return self.handle_stream_end(responses),
                                    None => break,
                                }
                            }
                        }
                        None => return self.handle_stream_end(responses),
                    }
                }
            }
        }

        Ok(responses)
    }

    /// Removes and returns all responses cached by earlier waits, without
    /// touching requests that are still pending.
    pub fn flush_completed_responses(&mut self) -> Vec<DoPutResponse> {
        let responses = std::mem::take(&mut self.completed_responses);
        responses
            .into_values()
            .map(|(response, _)| response)
            .collect()
    }

    /// Finishes the stream, discarding the collected responses.
    pub async fn finish(self) -> Result<()> {
        let _responses = self.finish_with_responses().await?;
        Ok(())
    }

    /// Waits for every outstanding request, closes the outgoing stream, and
    /// returns all responses (cached and newly received).
    pub async fn finish_with_responses(mut self) -> Result<Vec<DoPutResponse>> {
        let mut all_responses = Vec::new();

        let completed_responses = std::mem::take(&mut self.completed_responses);
        for (request_id, (response, _)) in completed_responses {
            self.pending_requests.remove(&request_id);
            all_responses.push(response);
        }

        if !self.pending_requests.is_empty() {
            let remaining_responses = self.wait_for_all_pending().await?;
            all_responses.extend(remaining_responses);
        }

        // Best-effort close; the server side may already have hung up.
        let _ = self.sender.close().await;

        Ok(all_responses)
    }

    /// Allocates an empty [`Rows`] buffer sharing this writer's Arrow schema,
    /// so `write_rows` can validate it by pointer equality (fast path).
    pub fn alloc_rows_buffer(&self, capacity: usize, row_buffer_size: usize) -> Result<Rows> {
        Rows::with_arrow_schema(
            self.column_schemas(),
            self.arrow_schema.clone(),
            capacity,
            row_buffer_size,
        )
    }

    /// Starts building a single row against this writer's schema.
    #[must_use]
    pub fn new_row(&self) -> RowBuilder<'_> {
        RowBuilder::new(self.column_schemas(), &self.field_map)
    }

    /// Name of the target table.
    #[must_use]
    pub fn table_name(&self) -> &str {
        self.table_schema.name()
    }

    /// Column definitions of the target table, in schema order.
    #[must_use]
    pub fn column_schemas(&self) -> &[Column] {
        self.table_schema.columns()
    }

    /// Marks a response's request as no longer pending and appends it to the
    /// caller-supplied result vector.
    fn handle_single_response(
        &mut self,
        response: DoPutResponse,
        responses: &mut Vec<DoPutResponse>,
    ) {
        let request_id = response.request_id();
        self.pending_requests.remove(&request_id);
        responses.push(response);
    }

    /// Marks a response's request as no longer pending and caches the response
    /// for later pickup via `wait_for_response`/`flush_completed_responses`.
    fn receive_response_and_remove_pending(&mut self, response: DoPutResponse) {
        let request_id = response.request_id();
        self.pending_requests.remove(&request_id);
        self.completed_responses
            .insert(request_id, (response, Instant::now()));

        self.cleanup_expired_responses_if_needed();
    }

    /// Evicts cached responses older than the timeout, but only once the
    /// cache grows past a threshold, to keep the common path cheap.
    fn cleanup_expired_responses_if_needed(&mut self) {
        const RESPONSE_CACHE_CLEANUP_THRESHOLD: usize = 1024;

        if self.completed_responses.len() > RESPONSE_CACHE_CLEANUP_THRESHOLD {
            let now = Instant::now();
            self.completed_responses
                .retain(|_, (_, cached_time)| now.duration_since(*cached_time) <= self.timeout);
        }
    }

    /// Terminal stream handling for `wait_for_all_pending`: the end of the
    /// response stream is only ok when nothing is still pending.
    fn handle_stream_end(&self, responses: Vec<DoPutResponse>) -> Result<Vec<DoPutResponse>> {
        ensure!(self.pending_requests.is_empty(), error::StreamEndedSnafu);
        Ok(responses)
    }

    /// Like `handle_stream_end`, but reports which request ids were dropped
    /// when the stream ended prematurely.
    fn handle_stream_end_during_processing(&self) -> Result<()> {
        if !self.pending_requests.is_empty() {
            let pending_ids: Vec<RequestId> = self.pending_requests.keys().copied().collect();
            return error::StreamEndedWithPendingRequestsSnafu {
                request_ids: pending_ids,
            }
            .fail();
        }
        Ok(())
    }

    /// Encodes and sends one batch; returns its request id.
    ///
    /// On first use this performs the schema handshake: a schema frame with
    /// metadata id 0 and the table name as Flight descriptor path, awaited
    /// synchronously before any data is sent. Applies backpressure so at most
    /// `parallelism` requests are un-acknowledged.
    async fn submit_record_batch(&mut self, batch: RecordBatch) -> Result<RequestId> {
        if !self.schema_sent {
            let mut schema_data = self.encoder.encode(FlightMessage::Schema(batch.schema()));
            // Id 0 is reserved for the schema message; data ids start at 1.
            let metadata = DoPutMetadata::new(0);
            schema_data.app_metadata = serde_json::to_vec(&metadata)
                .context(error::SerializeMetadataSnafu)?
                .into();

            schema_data.flight_descriptor = Some(FlightDescriptor {
                r#type: arrow_flight::flight_descriptor::DescriptorType::Path as i32,
                path: vec![self.table_name().to_string()],
                ..Default::default()
            });

            self.sender
                .send(schema_data)
                .await
                .context(error::SendDataSnafu)?;

            // Block until the server acknowledges the schema before streaming data.
            let response_result = timeout(self.timeout, self.response_stream.next()).await;
            match response_result {
                Ok(Some(response)) => {
                    let _schema_response = response?;
                }
                Ok(None) => return error::StreamEndedSnafu.fail(),
                Err(_) => {
                    return error::RequestTimeoutSnafu {
                        request_ids: vec![],
                        timeout: self.timeout,
                    }
                    .fail();
                }
            }

            self.schema_sent = true;
        }

        // Backpressure: drain responses until we are under the parallelism cap.
        while self.pending_requests.len() >= self.parallelism {
            self.process_pending_responses().await?;
        }

        let request_id = self.next_request_id();
        let message = FlightMessage::RecordBatch(batch);
        let mut data = self.encoder.encode(message);
        let metadata = DoPutMetadata::new(request_id);
        data.app_metadata = serde_json::to_vec(&metadata)
            .context(error::SerializeMetadataSnafu)?
            .into();

        self.sender.send(data).await.context(error::SendDataSnafu)?;

        self.pending_requests.insert(request_id, Instant::now());

        Ok(request_id)
    }

    /// Fails if any pending request has been waiting longer than the timeout.
    fn check_timeouts(&self) -> Result<()> {
        let timeout_duration = self.timeout;
        let now = Instant::now();

        let timed_out_requests: Vec<RequestId> = self
            .pending_requests
            .iter()
            .filter_map(|(&request_id, &sent_time)| {
                if now.duration_since(sent_time) > timeout_duration {
                    Some(request_id)
                } else {
                    None
                }
            })
            .collect();

        if !timed_out_requests.is_empty() {
            return error::RequestTimeoutSnafu {
                request_ids: timed_out_requests,
                timeout: self.timeout,
            }
            .fail();
        }

        Ok(())
    }

    /// Awaits (with timeout) at least one response, then greedily drains any
    /// further responses that are already buffered. Used for backpressure.
    async fn process_pending_responses(&mut self) -> Result<()> {
        self.check_timeouts()?;

        let response_result = timeout(self.timeout, self.response_stream.next()).await;
        match response_result {
            Ok(Some(response)) => self.receive_response_and_remove_pending(response?),
            Ok(None) => return self.handle_stream_end_during_processing(),
            Err(_) => {
                let pending_ids: Vec<RequestId> = self.pending_requests.keys().copied().collect();
                return error::RequestTimeoutSnafu {
                    request_ids: pending_ids,
                    timeout: self.timeout,
                }
                .fail();
            }
        }

        // Non-blocking drain of whatever else has already arrived.
        loop {
            match self.response_stream.next().now_or_never() {
                Some(Some(response)) => {
                    self.receive_response_and_remove_pending(response?);
                }
                Some(None) => return self.handle_stream_end_during_processing(),
                None => break,
            }
        }

        Ok(())
    }

    /// Checks that `rows` was built against this writer's Arrow schema.
    /// Pointer equality (buffers from `alloc_rows_buffer`) short-circuits the
    /// field-by-field comparison.
    fn validate_rows_schema(&self, rows: &Rows) -> Result<()> {
        if Arc::ptr_eq(&self.arrow_schema, &rows.schema) {
            return Ok(());
        }

        let expected_fields = self.arrow_schema.fields();
        let actual_fields = rows.schema.fields();

        if expected_fields.len() != actual_fields.len() {
            return Self::schema_mismatch_error(expected_fields, actual_fields);
        }

        for (expected, actual) in expected_fields.iter().zip(actual_fields.iter()) {
            if expected != actual {
                return Self::schema_mismatch_error(expected_fields, actual_fields);
            }
        }

        Ok(())
    }

    /// Builds the schema-mismatch error; `#[cold]` keeps the happy path tight.
    #[cold]
    fn schema_mismatch_error(
        expected_fields: &arrow_schema::Fields,
        actual_fields: &arrow_schema::Fields,
    ) -> Result<()> {
        error::SchemaMismatchSnafu {
            expected: format!("{expected_fields:?}"),
            actual: format!("{actual_fields:?}"),
        }
        .fail()
    }

    /// Hands out the next request id, skipping 0 on wrap-around since 0 is
    /// reserved for the schema message.
    fn next_request_id(&mut self) -> RequestId {
        self.next_request_id = self.next_request_id.wrapping_add(1);
        if self.next_request_id == 0 {
            self.next_request_id = 1;
        }
        self.next_request_id
    }
}
663
664fn column_to_arrow_data_type(column: &Column) -> Result<DataType> {
667 let data_type = column.data_type;
668 Ok(match data_type {
669 ColumnDataType::Int8 => DataType::Int8,
671 ColumnDataType::Int16 => DataType::Int16,
672 ColumnDataType::Int32 => DataType::Int32,
673 ColumnDataType::Int64 => DataType::Int64,
674 ColumnDataType::Uint8 => DataType::UInt8,
675 ColumnDataType::Uint16 => DataType::UInt16,
676 ColumnDataType::Uint32 => DataType::UInt32,
677 ColumnDataType::Uint64 => DataType::UInt64,
678
679 ColumnDataType::Float32 => DataType::Float32,
681 ColumnDataType::Float64 => DataType::Float64,
682
683 ColumnDataType::Boolean => DataType::Boolean,
685
686 ColumnDataType::String => DataType::Utf8,
688 ColumnDataType::Binary => DataType::Binary,
689
690 ColumnDataType::Date => DataType::Date32,
692
693 ColumnDataType::TimestampSecond => DataType::Timestamp(TimeUnit::Second, None),
695 ColumnDataType::TimestampMillisecond => DataType::Timestamp(TimeUnit::Millisecond, None),
696 ColumnDataType::Datetime | ColumnDataType::TimestampMicrosecond => {
698 DataType::Timestamp(TimeUnit::Microsecond, None)
699 }
700 ColumnDataType::TimestampNanosecond => DataType::Timestamp(TimeUnit::Nanosecond, None),
701
702 ColumnDataType::TimeSecond => DataType::Time32(arrow_schema::TimeUnit::Second),
704 ColumnDataType::TimeMillisecond => DataType::Time32(arrow_schema::TimeUnit::Millisecond),
705 ColumnDataType::TimeMicrosecond => DataType::Time64(arrow_schema::TimeUnit::Microsecond),
706 ColumnDataType::TimeNanosecond => DataType::Time64(arrow_schema::TimeUnit::Nanosecond),
707
708 ColumnDataType::Decimal128 => {
710 match &column.data_type_extension {
711 Some(DataTypeExtension::Decimal128 { precision, scale }) => {
712 DataType::Decimal128(*precision, *scale)
713 }
714 _ => DataType::Decimal128(38, 10), }
716 }
717
718 ColumnDataType::Json => DataType::Binary,
720
721 _ => {
723 return error::UnsupportedDataTypeSnafu {
724 data_type: format!("{data_type:?}. Not supported"),
725 }
726 .fail();
727 }
728 })
729}
730
/// Backing storage of a [`Rows`]: either an in-progress columnar builder or
/// an already-materialized Arrow batch.
enum RowsData {
    // Still accepting rows via `Rows::add_row`.
    Builder(RowBatchBuilder),
    // Fully built; read-only from the `Rows` API.
    RecordBatch(RecordBatch),
}
738
/// A batch of rows destined for one bulk write.
///
/// Rows are staged in `row_buffer` and flushed into the columnar builder in
/// chunks of `buffer_size` (see `add_row`/`flush_buffer`).
pub struct Rows {
    // Columnar storage (builder or finished batch).
    data: RowsData,
    // Arrow schema the rows conform to; compared (by pointer when possible)
    // against the writer's schema on write.
    schema: Arc<Schema>,
    // Number of columns each row must have.
    column_count: usize,
    // Staging area for rows not yet pushed into the builder.
    row_buffer: Vec<Row>,
    // Flush threshold for `row_buffer`; 0 for batches built elsewhere.
    buffer_size: usize,
}
749
impl Rows {
    /// Creates an empty buffer for `column_schemas`, deriving a fresh Arrow
    /// schema. `capacity` sizes the column builders; `row_buffer_size` sets
    /// the staging-flush threshold.
    pub fn new(column_schemas: &[Column], capacity: usize, row_buffer_size: usize) -> Result<Self> {
        let builder = RowBatchBuilder::new(column_schemas, capacity)?;
        let schema = builder.schema.clone();

        Ok(Self {
            data: RowsData::Builder(builder),
            schema,
            column_count: column_schemas.len(),
            row_buffer: Vec::with_capacity(row_buffer_size),
            buffer_size: row_buffer_size,
        })
    }

    /// Like `new`, but reuses an existing Arrow schema `Arc` so the writer's
    /// pointer-equality validation fast path applies.
    fn with_arrow_schema(
        column_schemas: &[Column],
        arrow_schema: Arc<Schema>,
        capacity: usize,
        row_buffer_size: usize,
    ) -> Result<Self> {
        let builder =
            RowBatchBuilder::with_arrow_schema(column_schemas, arrow_schema.clone(), capacity)?;

        Ok(Self {
            data: RowsData::Builder(builder),
            schema: arrow_schema,
            column_count: column_schemas.len(),
            row_buffer: Vec::with_capacity(row_buffer_size),
            buffer_size: row_buffer_size,
        })
    }

    /// Wraps an already-built `RecordBatch`; such a `Rows` cannot accept
    /// further rows (`add_row` will fail).
    pub fn from_record_batch(batch: RecordBatch) -> Result<Self> {
        let schema = batch.schema();
        let column_count = batch.num_columns();
        Ok(Self {
            data: RowsData::RecordBatch(batch),
            schema,
            column_count,
            row_buffer: Vec::new(),
            buffer_size: 0,
        })
    }

    /// Stages one row, flushing the staging buffer into the columnar builder
    /// once it reaches `buffer_size`.
    ///
    /// Fails on a `RecordBatch`-backed `Rows` or when the row's value count
    /// does not match the schema's column count.
    pub fn add_row(&mut self, row: Row) -> Result<()> {
        ensure!(
            matches!(self.data, RowsData::Builder(_)),
            error::AddRowToBuiltBatchSnafu,
        );

        ensure!(
            row.len() == self.column_count,
            error::InvalidColumnCountSnafu {
                expected: self.column_count,
                actual: row.len(),
            }
        );

        self.row_buffer.push(row);

        if self.row_buffer.len() >= self.buffer_size {
            self.flush_buffer()?;
        }

        Ok(())
    }

    /// Moves all staged rows into the columnar builder (no-op when empty or
    /// when backed by a finished batch).
    fn flush_buffer(&mut self) -> Result<()> {
        if self.row_buffer.is_empty() {
            return Ok(());
        }

        if let RowsData::Builder(ref mut builder) = self.data {
            let rows = std::mem::take(&mut self.row_buffer);
            builder.add_rows(rows)?;
        }

        Ok(())
    }

    /// Total row count, including rows still in the staging buffer.
    #[must_use]
    pub fn len(&self) -> usize {
        match &self.data {
            RowsData::RecordBatch(batch) => batch.num_rows(),
            RowsData::Builder(builder) => builder.len() + self.row_buffer.len(),
        }
    }

    /// True when no rows have been added.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// The Arrow schema these rows conform to.
    #[must_use]
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
}
871
impl TryFrom<Rows> for RecordBatch {
    type Error = crate::Error;

    /// Materializes the rows into a `RecordBatch`, flushing any staged rows
    /// first.
    fn try_from(mut rows: Rows) -> Result<Self> {
        rows.flush_buffer()?;

        match rows.data {
            RowsData::RecordBatch(batch) => {
                // Defensive: a batch-backed Rows should never have staged rows,
                // since add_row rejects that variant.
                ensure!(rows.row_buffer.is_empty(), error::UnflushedRowsSnafu);
                Ok(batch)
            }
            RowsData::Builder(builder) => {
                builder.build()
            }
        }
    }
}
894
/// Columnar accumulator: one typed Arrow array builder per column, filled
/// row-batch by row-batch and finished into a `RecordBatch`.
pub struct RowBatchBuilder {
    // One builder per column, in schema order.
    builders: Vec<ArrayBuilderEnum>,
    // Arrow schema the finished batch is built against.
    schema: Arc<Schema>,
    // Rows appended so far.
    current_rows: usize,
}
903
impl RowBatchBuilder {
    /// Creates a builder for `column_schemas`, deriving the Arrow schema
    /// (timestamp columns non-nullable, as in `BulkStreamWriter::new`).
    fn new(column_schemas: &[Column], capacity: usize) -> Result<Self> {
        let fields: Result<Vec<Field>> = column_schemas
            .iter()
            .map(|col| {
                let nullable = col.semantic_type != SemanticType::Timestamp;
                column_to_arrow_data_type(col)
                    .map(|data_type| Field::new(&col.name, data_type, nullable))
            })
            .collect();
        let schema = Arc::new(Schema::new(fields?));

        let builders: Result<Vec<ArrayBuilderEnum>> = column_schemas
            .iter()
            .enumerate()
            .map(|(col_idx, col)| create_array_builder(col, capacity, col_idx))
            .collect();

        Ok(Self {
            builders: builders?,
            schema,
            current_rows: 0,
        })
    }

    /// Like `new`, but adopts a pre-built Arrow schema instead of deriving one.
    fn with_arrow_schema(
        column_schemas: &[Column],
        schema: Arc<Schema>,
        capacity: usize,
    ) -> Result<Self> {
        let builders: Result<Vec<ArrayBuilderEnum>> = column_schemas
            .iter()
            .enumerate()
            .map(|(col_idx, col)| create_array_builder(col, capacity, col_idx))
            .collect();

        Ok(Self {
            builders: builders?,
            schema,
            current_rows: 0,
        })
    }

    /// Transposes `rows` into the per-column builders.
    ///
    /// NOTE(review): string/binary values are moved out of the rows by the
    /// `take_*` getters, so `rows` is consumed here even though only a slice
    /// is passed down.
    fn add_rows(&mut self, mut rows: Vec<Row>) -> Result<()> {
        for (col_idx, builder) in self.builders.iter_mut().enumerate() {
            builder.append_values_from_rows(&mut rows, col_idx)?;
        }
        self.current_rows += rows.len();
        Ok(())
    }

    /// Finishes every column builder and assembles the final `RecordBatch`.
    fn build(mut self) -> Result<RecordBatch> {
        let arrays: Result<Vec<Arc<dyn Array>>> = self
            .builders
            .iter_mut()
            .map(ArrayBuilderEnum::finish)
            .collect();

        RecordBatch::try_new(self.schema, arrays?).context(error::CreateRecordBatchSnafu)
    }

    /// Rows appended so far (excludes anything still staged in `Rows`).
    fn len(&self) -> usize {
        self.current_rows
    }
}
974
/// Implemented per Arrow builder type: appends the `col_idx`-th value of
/// every row in `rows` to the builder (see `impl_arrow_builder!`).
trait ArrayBuilder {
    fn append_values_from_rows(&mut self, rows: &mut [Row], col_idx: usize) -> Result<()>;
}
979
/// Closed set of the Arrow array builders used for bulk ingestion; enum
/// dispatch avoids `Box<dyn ...>` per column.
enum ArrayBuilderEnum {
    Boolean(BooleanBuilder),
    Int8(Int8Builder),
    Int16(Int16Builder),
    Int32(Int32Builder),
    Int64(Int64Builder),
    UInt8(UInt8Builder),
    UInt16(UInt16Builder),
    UInt32(UInt32Builder),
    UInt64(UInt64Builder),
    Float32(Float32Builder),
    Float64(Float64Builder),
    String(StringBuilder),
    Binary(BinaryBuilder),
    Decimal128(Decimal128Builder),
    Date(Date32Builder),
    TimestampSecond(TimestampSecondBuilder),
    TimestampMillisecond(TimestampMillisecondBuilder),
    TimestampMicrosecond(TimestampMicrosecondBuilder),
    TimestampNanosecond(TimestampNanosecondBuilder),
    TimeSecond(Time32SecondBuilder),
    TimeMillisecond(Time32MillisecondBuilder),
    TimeMicrosecond(Time64MicrosecondBuilder),
    TimeNanosecond(Time64NanosecondBuilder),
}
1005
1006impl ArrayBuilderEnum {
1007 fn append_values_from_rows(&mut self, rows: &mut [Row], col_idx: usize) -> Result<()> {
1008 match self {
1009 ArrayBuilderEnum::Boolean(builder) => builder.append_values_from_rows(rows, col_idx),
1010 ArrayBuilderEnum::Int8(builder) => builder.append_values_from_rows(rows, col_idx),
1011 ArrayBuilderEnum::Int16(builder) => builder.append_values_from_rows(rows, col_idx),
1012 ArrayBuilderEnum::Int32(builder) => builder.append_values_from_rows(rows, col_idx),
1013 ArrayBuilderEnum::Int64(builder) => builder.append_values_from_rows(rows, col_idx),
1014 ArrayBuilderEnum::UInt8(builder) => builder.append_values_from_rows(rows, col_idx),
1015 ArrayBuilderEnum::UInt16(builder) => builder.append_values_from_rows(rows, col_idx),
1016 ArrayBuilderEnum::UInt32(builder) => builder.append_values_from_rows(rows, col_idx),
1017 ArrayBuilderEnum::UInt64(builder) => builder.append_values_from_rows(rows, col_idx),
1018 ArrayBuilderEnum::Float32(builder) => builder.append_values_from_rows(rows, col_idx),
1019 ArrayBuilderEnum::Float64(builder) => builder.append_values_from_rows(rows, col_idx),
1020 ArrayBuilderEnum::String(builder) => builder.append_values_from_rows(rows, col_idx),
1021 ArrayBuilderEnum::Binary(builder) => builder.append_values_from_rows(rows, col_idx),
1022 ArrayBuilderEnum::Decimal128(builder) => builder.append_values_from_rows(rows, col_idx),
1023 ArrayBuilderEnum::Date(builder) => builder.append_values_from_rows(rows, col_idx),
1024 ArrayBuilderEnum::TimestampSecond(builder) => {
1025 builder.append_values_from_rows(rows, col_idx)
1026 }
1027 ArrayBuilderEnum::TimestampMillisecond(builder) => {
1028 builder.append_values_from_rows(rows, col_idx)
1029 }
1030 ArrayBuilderEnum::TimestampMicrosecond(builder) => {
1031 builder.append_values_from_rows(rows, col_idx)
1032 }
1033 ArrayBuilderEnum::TimestampNanosecond(builder) => {
1034 builder.append_values_from_rows(rows, col_idx)
1035 }
1036 ArrayBuilderEnum::TimeSecond(builder) => builder.append_values_from_rows(rows, col_idx),
1037 ArrayBuilderEnum::TimeMillisecond(builder) => {
1038 builder.append_values_from_rows(rows, col_idx)
1039 }
1040 ArrayBuilderEnum::TimeMicrosecond(builder) => {
1041 builder.append_values_from_rows(rows, col_idx)
1042 }
1043 ArrayBuilderEnum::TimeNanosecond(builder) => {
1044 builder.append_values_from_rows(rows, col_idx)
1045 }
1046 }
1047 }
1048
1049 fn finish(&mut self) -> Result<Arc<dyn Array>> {
1050 Ok(match self {
1051 ArrayBuilderEnum::Boolean(builder) => Arc::new(builder.finish()),
1052 ArrayBuilderEnum::Int8(builder) => Arc::new(builder.finish()),
1053 ArrayBuilderEnum::Int16(builder) => Arc::new(builder.finish()),
1054 ArrayBuilderEnum::Int32(builder) => Arc::new(builder.finish()),
1055 ArrayBuilderEnum::Int64(builder) => Arc::new(builder.finish()),
1056 ArrayBuilderEnum::UInt8(builder) => Arc::new(builder.finish()),
1057 ArrayBuilderEnum::UInt16(builder) => Arc::new(builder.finish()),
1058 ArrayBuilderEnum::UInt32(builder) => Arc::new(builder.finish()),
1059 ArrayBuilderEnum::UInt64(builder) => Arc::new(builder.finish()),
1060 ArrayBuilderEnum::Float32(builder) => Arc::new(builder.finish()),
1061 ArrayBuilderEnum::Float64(builder) => Arc::new(builder.finish()),
1062 ArrayBuilderEnum::String(builder) => Arc::new(builder.finish()),
1063 ArrayBuilderEnum::Binary(builder) => Arc::new(builder.finish()),
1064 ArrayBuilderEnum::Decimal128(builder) => Arc::new(builder.finish()),
1065 ArrayBuilderEnum::Date(builder) => Arc::new(builder.finish()),
1066 ArrayBuilderEnum::TimestampSecond(builder) => Arc::new(builder.finish()),
1067 ArrayBuilderEnum::TimestampMillisecond(builder) => Arc::new(builder.finish()),
1068 ArrayBuilderEnum::TimestampMicrosecond(builder) => Arc::new(builder.finish()),
1069 ArrayBuilderEnum::TimestampNanosecond(builder) => Arc::new(builder.finish()),
1070 ArrayBuilderEnum::TimeSecond(builder) => Arc::new(builder.finish()),
1071 ArrayBuilderEnum::TimeMillisecond(builder) => Arc::new(builder.finish()),
1072 ArrayBuilderEnum::TimeMicrosecond(builder) => Arc::new(builder.finish()),
1073 ArrayBuilderEnum::TimeNanosecond(builder) => Arc::new(builder.finish()),
1074 })
1075 }
1076}
1077
1078fn create_array_builder(
1081 column: &Column,
1082 capacity: usize,
1083 _column_index: usize,
1084) -> Result<ArrayBuilderEnum> {
1085 let data_type = column.data_type;
1086 Ok(match data_type {
1087 ColumnDataType::Boolean => {
1088 ArrayBuilderEnum::Boolean(BooleanBuilder::with_capacity(capacity))
1089 }
1090 ColumnDataType::Int8 => ArrayBuilderEnum::Int8(Int8Builder::with_capacity(capacity)),
1091 ColumnDataType::Int16 => ArrayBuilderEnum::Int16(Int16Builder::with_capacity(capacity)),
1092 ColumnDataType::Int32 => ArrayBuilderEnum::Int32(Int32Builder::with_capacity(capacity)),
1093 ColumnDataType::Int64 => ArrayBuilderEnum::Int64(Int64Builder::with_capacity(capacity)),
1094 ColumnDataType::Uint8 => ArrayBuilderEnum::UInt8(UInt8Builder::with_capacity(capacity)),
1095 ColumnDataType::Uint16 => ArrayBuilderEnum::UInt16(UInt16Builder::with_capacity(capacity)),
1096 ColumnDataType::Uint32 => ArrayBuilderEnum::UInt32(UInt32Builder::with_capacity(capacity)),
1097 ColumnDataType::Uint64 => ArrayBuilderEnum::UInt64(UInt64Builder::with_capacity(capacity)),
1098 ColumnDataType::Float32 => {
1099 ArrayBuilderEnum::Float32(Float32Builder::with_capacity(capacity))
1100 }
1101 ColumnDataType::Float64 => {
1102 ArrayBuilderEnum::Float64(Float64Builder::with_capacity(capacity))
1103 }
1104 ColumnDataType::String => {
1105 ArrayBuilderEnum::String(StringBuilder::with_capacity(capacity, capacity * 64))
1106 }
1107 ColumnDataType::Date => ArrayBuilderEnum::Date(Date32Builder::with_capacity(capacity)),
1108 ColumnDataType::TimestampSecond => {
1109 ArrayBuilderEnum::TimestampSecond(TimestampSecondBuilder::with_capacity(capacity))
1110 }
1111 ColumnDataType::TimestampMillisecond => ArrayBuilderEnum::TimestampMillisecond(
1112 TimestampMillisecondBuilder::with_capacity(capacity),
1113 ),
1114 ColumnDataType::Datetime | ColumnDataType::TimestampMicrosecond => {
1115 ArrayBuilderEnum::TimestampMicrosecond(TimestampMicrosecondBuilder::with_capacity(
1116 capacity,
1117 ))
1118 }
1119 ColumnDataType::TimestampNanosecond => ArrayBuilderEnum::TimestampNanosecond(
1120 TimestampNanosecondBuilder::with_capacity(capacity),
1121 ),
1122 ColumnDataType::TimeSecond => {
1123 ArrayBuilderEnum::TimeSecond(Time32SecondBuilder::with_capacity(capacity))
1124 }
1125 ColumnDataType::TimeMillisecond => {
1126 ArrayBuilderEnum::TimeMillisecond(Time32MillisecondBuilder::with_capacity(capacity))
1127 }
1128 ColumnDataType::TimeMicrosecond => {
1129 ArrayBuilderEnum::TimeMicrosecond(Time64MicrosecondBuilder::with_capacity(capacity))
1130 }
1131 ColumnDataType::TimeNanosecond => {
1132 ArrayBuilderEnum::TimeNanosecond(Time64NanosecondBuilder::with_capacity(capacity))
1133 }
1134 ColumnDataType::Decimal128 => {
1135 let (precision, scale) = match &column.data_type_extension {
1137 Some(DataTypeExtension::Decimal128 { precision, scale }) => (*precision, *scale),
1138 _ => (38, 10), };
1140
1141 ArrayBuilderEnum::Decimal128(
1142 Decimal128Builder::with_capacity(capacity)
1143 .with_data_type(arrow_schema::DataType::Decimal128(precision, scale)),
1144 )
1145 }
1146 ColumnDataType::Binary | ColumnDataType::Json => {
1147 ArrayBuilderEnum::Binary(BinaryBuilder::with_capacity(capacity, capacity * 64))
1148 }
1149 _ => {
1150 return error::UnsupportedDataTypeSnafu {
1151 data_type: format!("{data_type:?}. Not supported in RowBatchBuilder"),
1152 }
1153 .fail();
1154 }
1155 })
1156}
1157
/// Implements [`ArrayBuilder`] for a concrete Arrow builder by delegating each
/// row's value to the given `Row` getter.
///
/// `$value_type` documents the element type but is not used in the expansion.
/// NOTE(review): the getter call is `unsafe` with no bounds proof here — it
/// presumably relies on `Rows::add_row` having validated the per-row column
/// count against the schema; confirm the `Row` getter contracts.
macro_rules! impl_arrow_builder {
    ($builder_type:ty, $getter:ident, $value_type:ty) => {
        impl ArrayBuilder for $builder_type {
            fn append_values_from_rows(&mut self, rows: &mut [Row], col_idx: usize) -> Result<()> {
                for row in rows {
                    // `None` from the getter appends a null slot.
                    self.append_option(unsafe { row.$getter(col_idx) });
                }
                Ok(())
            }
        }
    };
}
1172
// Fixed-width primitives.
impl_arrow_builder!(BooleanBuilder, get_bool_unchecked, bool);
impl_arrow_builder!(Int8Builder, get_i8_unchecked, i8);
impl_arrow_builder!(Int16Builder, get_i16_unchecked, i16);
impl_arrow_builder!(Int32Builder, get_i32_unchecked, i32);
impl_arrow_builder!(Int64Builder, get_i64_unchecked, i64);
impl_arrow_builder!(UInt8Builder, get_u8_unchecked, u8);
impl_arrow_builder!(UInt16Builder, get_u16_unchecked, u16);
impl_arrow_builder!(UInt32Builder, get_u32_unchecked, u32);
impl_arrow_builder!(UInt64Builder, get_u64_unchecked, u64);
impl_arrow_builder!(Float32Builder, get_f32_unchecked, f32);
impl_arrow_builder!(Float64Builder, get_f64_unchecked, f64);

// Timestamps: all units share the same i64 getter.
impl_arrow_builder!(TimestampSecondBuilder, get_timestamp_unchecked, i64);
impl_arrow_builder!(TimestampMillisecondBuilder, get_timestamp_unchecked, i64);
impl_arrow_builder!(TimestampMicrosecondBuilder, get_timestamp_unchecked, i64);
impl_arrow_builder!(TimestampNanosecondBuilder, get_timestamp_unchecked, i64);

// Time-of-day: 32-bit for second/millisecond, 64-bit for finer units.
impl_arrow_builder!(Time32SecondBuilder, get_time32_unchecked, i32);
impl_arrow_builder!(Time32MillisecondBuilder, get_time32_unchecked, i32);
impl_arrow_builder!(Time64MicrosecondBuilder, get_time64_unchecked, i64);
impl_arrow_builder!(Time64NanosecondBuilder, get_time64_unchecked, i64);

impl_arrow_builder!(Date32Builder, get_date_unchecked, i32);

impl_arrow_builder!(Decimal128Builder, get_decimal128_unchecked, i128);

// Variable-width values are moved out of the row (`take_*`), not copied.
impl_arrow_builder!(StringBuilder, take_string_unchecked, String);
impl_arrow_builder!(BinaryBuilder, take_binary_unchecked, Vec<u8>);
1207
1208pub struct RowBuilder<'a> {
1212 schema: &'a [Column],
1213 field_map: &'a HashMap<String, usize>, values: Vec<Option<Value>>,
1215}
1216
1217impl<'a> RowBuilder<'a> {
1218 fn new(schema: &'a [Column], field_map: &'a HashMap<String, usize>) -> Self {
1219 Self {
1220 schema,
1221 field_map,
1222 values: vec![None; schema.len()],
1223 }
1224 }
1225
1226 pub fn set(mut self, field_name: &str, value: Value) -> Result<Self> {
1229 let field_index = self
1230 .field_map
1231 .get(field_name)
1232 .context(error::MissingFieldSnafu { field: field_name })?;
1233
1234 self.values[*field_index] = Some(value);
1235 Ok(self)
1236 }
1237
1238 pub fn set_by_index(mut self, index: usize, value: Value) -> Result<Self> {
1244 ensure!(
1245 index < self.values.len(),
1246 error::InvalidColumnIndexSnafu {
1247 index,
1248 total: self.values.len(),
1249 }
1250 );
1251
1252 self.values[index] = Some(value);
1253 Ok(self)
1254 }
1255
1256 #[must_use]
1258 pub fn column_count(&self) -> usize {
1259 self.schema.len()
1260 }
1261
1262 pub fn build(self) -> Result<Row> {
1264 let mut row_values = Vec::with_capacity(self.values.len());
1265
1266 for (i, opt_value) in self.values.into_iter().enumerate() {
1267 match opt_value {
1268 Some(value) => row_values.push(value),
1269 None => {
1270 return error::MissingFieldSnafu {
1271 field: self.schema[i].name.clone(),
1272 }
1273 .fail();
1274 }
1275 }
1276 }
1277
1278 Ok(Row::new().add_values(row_values))
1279 }
1280}
1281
/// Convenience re-export so callers can name column data types without
/// importing from `crate::api::v1` directly.
pub use crate::api::v1::ColumnDataType as ColumnType;
1284
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::v1::{ColumnDataType, SemanticType};
    use crate::table::{Column, Value};

    /// Shorthand for building a [`Column`] with no data-type extension.
    fn col(name: &str, data_type: ColumnDataType, semantic_type: SemanticType) -> Column {
        Column {
            name: name.to_string(),
            data_type,
            semantic_type,
            data_type_extension: None,
        }
    }

    #[test]
    fn test_rows_schema_validation() {
        let wide_schema = vec![
            col("id", ColumnDataType::Int64, SemanticType::Field),
            col("name", ColumnDataType::String, SemanticType::Field),
            col(
                "timestamp",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
        ];
        let narrow_schema = vec![
            col("id", ColumnDataType::Int64, SemanticType::Field),
            col("value", ColumnDataType::Float64, SemanticType::Field),
        ];

        let rows1 = Rows::new(&wide_schema, 10, 5).expect("Failed to create rows1");
        let rows2 = Rows::new(&narrow_schema, 10, 5).expect("Failed to create rows2");

        // Each Rows instance reflects the column count of its own schema.
        assert_eq!(rows1.schema().fields().len(), 3);
        assert_eq!(rows2.schema().fields().len(), 2);
    }

    #[test]
    fn test_rows_creation_and_capacity() {
        let schema = vec![
            col("id", ColumnDataType::Int64, SemanticType::Field),
            col("message", ColumnDataType::String, SemanticType::Field),
        ];

        let mut rows = Rows::new(&schema, 5, 5).expect("Failed to create rows");

        // Freshly created: empty until rows are appended.
        assert_eq!(rows.len(), 0);
        assert!(rows.is_empty());

        let first = crate::table::Row::new()
            .add_values(vec![Value::Int64(1), Value::String("first".to_string())]);
        let second = crate::table::Row::new()
            .add_values(vec![Value::Int64(2), Value::String("second".to_string())]);

        rows.add_row(first).expect("Failed to add row1");
        rows.add_row(second).expect("Failed to add row2");

        assert_eq!(rows.len(), 2);
        assert!(!rows.is_empty());
    }

    #[test]
    fn test_non_nullable_timestamp_field_with_null_should_error() {
        let schema = vec![
            col(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            col("value", ColumnDataType::Int64, SemanticType::Field),
        ];

        let mut rows = Rows::new(&schema, 5, 5).expect("Failed to create rows");

        // A null timestamp is accepted at append time...
        let null_ts_row =
            crate::table::Row::new().add_values(vec![Value::Null, Value::Int64(42)]);
        rows.add_row(null_ts_row).expect("Failed to add row");

        // ...but must be rejected when converting to a RecordBatch, since the
        // timestamp column is non-nullable.
        let result = RecordBatch::try_from(rows);
        assert!(
            result.is_err(),
            "Should fail when timestamp field contains null value"
        );
    }

    #[test]
    fn test_nullable_field_with_null_should_succeed() {
        let schema = vec![
            col(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            col("value", ColumnDataType::Int64, SemanticType::Field),
        ];

        let mut rows = Rows::new(&schema, 5, 5).expect("Failed to create rows");

        // Null in an ordinary field column is allowed: fields are nullable.
        let null_value_row = crate::table::Row::new()
            .add_values(vec![Value::TimestampMillisecond(1234567890), Value::Null]);
        rows.add_row(null_value_row).expect("Failed to add row");

        let result = RecordBatch::try_from(rows);
        assert!(
            result.is_ok(),
            "Should succeed when nullable field contains null value"
        );
    }

    #[test]
    fn test_arrow_schema_nullable_fields() {
        use arrow_schema::{DataType, Field};

        let columns = [
            col(
                "ts",
                ColumnDataType::TimestampMillisecond,
                SemanticType::Timestamp,
            ),
            col("value", ColumnDataType::Int64, SemanticType::Field),
            col("tag", ColumnDataType::String, SemanticType::Tag),
        ];

        // Mirror the schema-mapping rule: only the timestamp column is
        // non-nullable.
        let fields: Vec<Field> = columns
            .iter()
            .map(|c| {
                let nullable = c.semantic_type != SemanticType::Timestamp;
                let data_type = match c.data_type {
                    ColumnDataType::TimestampMillisecond => {
                        DataType::Timestamp(TimeUnit::Millisecond, None)
                    }
                    ColumnDataType::Int64 => DataType::Int64,
                    ColumnDataType::String => DataType::Utf8,
                    // Fallback for types this test does not exercise.
                    _ => DataType::Utf8,
                };
                Field::new(&c.name, data_type, nullable)
            })
            .collect();

        assert_eq!(fields.len(), 3);

        assert!(
            !fields[0].is_nullable(),
            "Timestamp field should be non-nullable"
        );
        assert_eq!(fields[0].name(), "ts");

        assert!(fields[1].is_nullable(), "Value field should be nullable");
        assert_eq!(fields[1].name(), "value");

        assert!(fields[2].is_nullable(), "Tag field should be nullable");
        assert_eq!(fields[2].name(), "tag");
    }

    #[test]
    fn test_rows_from_record_batch() {
        let schema_vec = vec![
            col("id", ColumnDataType::Int32, SemanticType::Field),
            col("msg", ColumnDataType::String, SemanticType::Field),
        ];

        // Build a small two-row batch to round-trip through Rows.
        let record_batch = {
            let mut seed_rows = Rows::new(&schema_vec, 2, 2).unwrap();
            seed_rows
                .add_row(
                    crate::table::Row::new()
                        .add_values(vec![Value::Int32(1), Value::String("hello".to_string())]),
                )
                .unwrap();
            seed_rows
                .add_row(
                    crate::table::Row::new()
                        .add_values(vec![Value::Int32(2), Value::String("world".to_string())]),
                )
                .unwrap();
            RecordBatch::try_from(seed_rows).unwrap()
        };

        let original_schema = record_batch.schema();
        let original_num_rows = record_batch.num_rows();
        let original_num_cols = record_batch.num_columns();

        let mut rows_from_batch = Rows::from_record_batch(record_batch.clone()).unwrap();

        // The wrapper mirrors the batch's schema and dimensions.
        assert_eq!(*rows_from_batch.schema(), *original_schema);
        assert_eq!(rows_from_batch.len(), original_num_rows);
        assert_eq!(rows_from_batch.column_count, original_num_cols);
        assert!(!rows_from_batch.is_empty());
        assert_eq!(rows_from_batch.len(), 2);

        // Rows built from a batch are read-only: appending must fail with a
        // descriptive error.
        let extra_row = crate::table::Row::new()
            .add_values(vec![Value::Int32(3), Value::String("new".to_string())]);
        let add_result = rows_from_batch.add_row(extra_row);
        assert!(add_result.is_err());
        assert_eq!(
            add_result.unwrap_err().to_string(),
            "Cannot add row to a Rows object that was created from a RecordBatch"
        );

        // Converting back yields a batch equal to the original.
        let converted_batch = RecordBatch::try_from(rows_from_batch).unwrap();
        assert_eq!(converted_batch, record_batch);
    }
}