// greptimedb_ingester/bulk.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level bulk insert API for `GreptimeDB`
16//!
17//! This module provides a user-friendly API for bulk inserting data into `GreptimeDB`,
18//! abstracting away the low-level Arrow Flight details.
19
20use std::collections::HashMap;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use greptime_proto::v1::auth_header::AuthScheme;
26use greptime_proto::v1::SemanticType;
27use tokio::select;
28use tokio::time::timeout;
29
30use arrow_array::builder::{
31    BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, Float32Builder,
32    Float64Builder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, StringBuilder,
33    Time32MillisecondBuilder, Time32SecondBuilder, Time64MicrosecondBuilder,
34    Time64NanosecondBuilder, TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
35    TimestampNanosecondBuilder, TimestampSecondBuilder, UInt16Builder, UInt32Builder,
36    UInt64Builder, UInt8Builder,
37};
38use arrow_array::{Array, RecordBatch};
39use arrow_flight::{FlightData, FlightDescriptor};
40use arrow_schema::{DataType, Field, Schema, TimeUnit};
41use futures::channel::mpsc;
42use futures::{FutureExt, SinkExt, Stream, StreamExt};
43
44use crate::api::v1::ColumnDataType;
45use crate::client::Client;
46use crate::database::Database;
47use crate::flight::do_put::{DoPutMetadata, DoPutResponse};
48use crate::flight::{FlightEncoder, FlightMessage};
49use crate::table::{Column, DataTypeExtension, Row, TableSchema, Value};
50use crate::{error, Result};
51use snafu::{ensure, OptionExt, ResultExt};
52
/// Default channel buffer size for streaming FlightData
/// This controls how many FlightData messages can be buffered in the channel
/// before blocking the sender. A larger buffer allows for better throughput
/// at the cost of memory usage.
///
/// Can be overridden by setting the GREPTIMEDB_CHANNEL_BUFFER_SIZE environment variable.
/// Used when `BulkStreamWriter::new` creates its internal mpsc channel.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1024;
60
/// Read a configuration value from an environment variable.
///
/// Returns `default` when the variable is unset, is not valid Unicode,
/// or cannot be parsed as `T`.
fn get_env_or_default<T>(env_var: &str, default: T) -> T
where
    T: std::str::FromStr,
{
    match std::env::var(env_var) {
        Ok(raw) => raw.parse().unwrap_or(default),
        Err(_) => default,
    }
}
71
/// Identifier correlating an in-flight bulk request with its response.
///
/// ID 0 is reserved for the schema message, so generated request IDs start at 1.
pub type RequestId = i64;
73
/// High-level bulk inserter for `GreptimeDB`
///
/// Thin wrapper around a [`Database`] handle used to create
/// [`BulkStreamWriter`]s bound to a specific table schema.
#[derive(Clone, Debug)]
pub struct BulkInserter {
    // Database handle carrying the connection and auth settings.
    database: Database,
}
79
80impl BulkInserter {
81    /// Create a new bulk inserter
82    #[must_use]
83    pub fn new(client: Client, database_name: &str) -> Self {
84        Self {
85            database: Database::new_with_dbname(database_name, client),
86        }
87    }
88
89    pub fn set_auth(&mut self, auth: AuthScheme) {
90        self.database.set_auth(auth);
91    }
92
93    /// Create a bulk stream writer from a table template
94    ///
95    /// This is a convenience method that extracts the schema from a table
96    /// and creates a `BulkStreamWriter` bound to that schema.
97    pub async fn create_bulk_stream_writer(
98        &self,
99        table_schema: &TableSchema,
100        options: Option<BulkWriteOptions>,
101    ) -> Result<BulkStreamWriter> {
102        let options = options.unwrap_or_default();
103        BulkStreamWriter::new(&self.database, table_schema, options).await
104    }
105}
106
/// Compression algorithm options for bulk write operations
///
/// Passed to `FlightEncoder::with_compression` when the writer is created.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionType {
    /// Do not compress encoded payloads.
    None,
    /// LZ4 compression (the default).
    #[default]
    Lz4,
    /// Zstandard compression.
    Zstd,
}
115
/// Configuration options for bulk write operations
#[derive(Debug, Clone)]
pub struct BulkWriteOptions {
    /// Compression applied to encoded Flight messages (default: LZ4).
    pub compression: CompressionType,
    /// Maximum time to wait for any single response (default: 60 s).
    pub timeout: Duration,
    /// Maximum number of in-flight requests before submission blocks (default: 4).
    pub parallelism: usize,
}
123
124impl Default for BulkWriteOptions {
125    fn default() -> Self {
126        Self {
127            compression: CompressionType::default(),
128            timeout: Duration::from_secs(60),
129            parallelism: 4,
130        }
131    }
132}
133
134impl BulkWriteOptions {
135    /// Set compression type
136    #[must_use]
137    pub fn with_compression(mut self, compression: CompressionType) -> Self {
138        self.compression = compression;
139        self
140    }
141
142    /// Set timeout duration
143    #[must_use]
144    pub fn with_timeout(mut self, timeout: Duration) -> Self {
145        self.timeout = timeout;
146        self
147    }
148
149    /// Set parallelism for concurrent requests
150    #[must_use]
151    pub fn with_parallelism(mut self, parallelism: usize) -> Self {
152        self.parallelism = parallelism;
153        self
154    }
155}
156
/// High-performance bulk stream writer that maintains a persistent connection
/// Each writer is bound to a specific table with fixed schema
pub struct BulkStreamWriter {
    // Sending half of the channel feeding FlightData into the do_put stream.
    sender: mpsc::Sender<FlightData>,
    // Server responses, correlated to requests via their request id.
    response_stream: Pin<Box<dyn Stream<Item = Result<DoPutResponse>>>>,
    // Table schema this writer is bound to; all submitted rows must match it.
    table_schema: TableSchema,
    // Cache the Arrow schema to avoid recreating it for each batch
    arrow_schema: Arc<Schema>,
    // Pre-computed field name to index mapping for O(1) lookup in RowBuilder
    field_map: HashMap<String, usize>,
    // Source for request ids; 0 is reserved for the schema message.
    next_request_id: RequestId,
    // Encoder configured with this writer's compression settings.
    encoder: FlightEncoder,
    // True once the Arrow schema message has been sent on the stream.
    schema_sent: bool,
    // Parallel processing fields
    // Maximum number of concurrently pending requests.
    parallelism: usize,
    // Timeout applied to every wait on the response stream.
    timeout: Duration,
    // Track pending requests: request_id -> sent_time
    pending_requests: HashMap<RequestId, Instant>,
    // Cache completed responses that were processed but not yet retrieved
    completed_responses: HashMap<RequestId, (DoPutResponse, Instant)>,
}
178
179impl BulkStreamWriter {
180    /// Create a new bulk stream writer bound to a specific table schema
181    pub async fn new(
182        database: &Database,
183        table_schema: &TableSchema,
184        options: BulkWriteOptions,
185    ) -> Result<Self> {
186        // Create the encoder with compression settings
187        let encoder = FlightEncoder::with_compression(options.compression);
188
189        // Convert table schema to Arrow schema
190        let fields: Result<Vec<Field>> = table_schema
191            .columns()
192            .iter()
193            .map(|col| {
194                let nullable = col.semantic_type != SemanticType::Timestamp;
195                column_to_arrow_data_type(col)
196                    .map(|data_type| Field::new(&col.name, data_type, nullable))
197            })
198            .collect();
199        let arrow_schema = Arc::new(Schema::new(fields?));
200
201        // Pre-compute field name to index mapping for O(1) lookups in RowBuilder
202        let field_map: HashMap<String, usize> = table_schema
203            .columns()
204            .iter()
205            .enumerate()
206            .map(|(i, col)| (col.name.clone(), i))
207            .collect();
208
209        // Create a channel for streaming FlightData
210        let channel_buffer_size = get_env_or_default(
211            "GREPTIMEDB_CHANNEL_BUFFER_SIZE",
212            DEFAULT_CHANNEL_BUFFER_SIZE,
213        );
214        let (sender, receiver) = mpsc::channel::<FlightData>(channel_buffer_size);
215
216        // Convert receiver to a stream and start the do_put operation
217        let flight_stream = receiver.boxed();
218        let response_stream = database.do_put(flight_stream).await?;
219
220        Ok(Self {
221            sender,
222            response_stream,
223            table_schema: table_schema.clone(),
224            arrow_schema,
225            field_map,
226            next_request_id: 0,
227            encoder,
228            schema_sent: false,
229            parallelism: options.parallelism,
230            timeout: options.timeout,
231            pending_requests: HashMap::new(),
232            completed_responses: HashMap::new(),
233        })
234    }
235
236    /// Write rows to the stream using the fixed table schema
237    pub async fn write_rows(&mut self, rows: Rows) -> Result<DoPutResponse> {
238        // Use the async implementation and wait for the response
239        let request_id = self.write_rows_async(rows).await?;
240        self.wait_for_response(request_id).await
241    }
242
243    /// Submit rows for writing without waiting for response
244    /// Returns a `request_id` that can be used to wait for the specific response
245    pub async fn write_rows_async(&mut self, rows: Rows) -> Result<RequestId> {
246        // Ensure that the rows are not empty
247        ensure!(!rows.is_empty(), error::EmptyRowsSnafu);
248        // Validate that the rows schema matches the writer's schema
249        self.validate_rows_schema(&rows)?;
250
251        let record_batch = RecordBatch::try_from(rows)?; // Zero-cost conversion
252        let request_id = self.submit_record_batch(record_batch).await?;
253
254        Ok(request_id)
255    }
256
257    /// Wait for a specific request's response by `request_id`
258    pub async fn wait_for_response(
259        &mut self,
260        target_request_id: RequestId,
261    ) -> Result<DoPutResponse> {
262        // Check if the response is already cached
263        if let Some((response, _)) = self.completed_responses.remove(&target_request_id) {
264            return Ok(response);
265        }
266
267        let timeout_duration = self.timeout;
268        let start_time = Instant::now();
269
270        loop {
271            let remaining_timeout = timeout_duration.saturating_sub(start_time.elapsed());
272            // Check timeout
273            if remaining_timeout.is_zero() {
274                return error::RequestTimeoutSnafu {
275                    request_ids: vec![target_request_id],
276                    timeout: self.timeout,
277                }
278                .fail();
279            }
280
281            let next_result = timeout(remaining_timeout, self.response_stream.next()).await;
282            let Ok(next_option) = next_result else {
283                return error::RequestTimeoutSnafu {
284                    request_ids: vec![target_request_id],
285                    timeout: self.timeout,
286                }
287                .fail();
288            };
289            if let Some(response) = next_option {
290                let response = response?;
291                let request_id = response.request_id();
292                self.pending_requests.remove(&request_id);
293                if request_id == target_request_id {
294                    return Ok(response);
295                }
296                self.completed_responses
297                    .insert(request_id, (response, Instant::now()));
298            } else {
299                return error::StreamEndedSnafu.fail();
300            }
301        }
302    }
303
304    /// Wait for all pending requests to complete and return the responses
305    pub async fn wait_for_all_pending(&mut self) -> Result<Vec<DoPutResponse>> {
306        let mut responses =
307            Vec::with_capacity(self.pending_requests.len() + self.completed_responses.len());
308
309        // First, drain all cached responses that have corresponding pending requests
310        let completed_responses = std::mem::take(&mut self.completed_responses);
311        for (request_id, (response, _)) in completed_responses {
312            // Always add response to results, and remove from pending if exists
313            self.pending_requests.remove(&request_id);
314            responses.push(response);
315        }
316
317        let timeout_duration = self.timeout;
318        let start_time = Instant::now();
319
320        // Then wait for remaining responses
321        while !self.pending_requests.is_empty() {
322            let remaining_timeout = timeout_duration.saturating_sub(start_time.elapsed());
323            let timeout_sleep = tokio::time::sleep(remaining_timeout);
324
325            select! {
326                () = timeout_sleep => {
327                    let pending_ids: Vec<RequestId> = self.pending_requests.keys().copied().collect();
328                    return error::RequestTimeoutSnafu {
329                        request_ids: pending_ids,
330                        timeout: self.timeout,
331                    }
332                    .fail();
333                }
334                next_option = self.response_stream.next() => {
335                    match next_option {
336                        Some(response) => {
337                            // Process the first response
338                            self.handle_single_response(response?, &mut responses);
339
340                            // Drain immediately available responses to avoid false timeouts
341                            loop {
342                                match self.response_stream.next().now_or_never() {
343                                    Some(Some(response)) => self.handle_single_response(response?, &mut responses),
344                                    Some(None) => return self.handle_stream_end(responses),
345                                    None => break, // No immediately available responses
346                                }
347                            }
348                        }
349                        None => return self.handle_stream_end(responses),
350                    }
351                }
352            }
353        }
354
355        Ok(responses)
356    }
357
358    /// Flush completed responses from cache and return them
359    ///
360    /// This method removes all cached responses that have been processed
361    /// but not yet retrieved, and returns them to the caller.
362    /// Useful for long-running bulk operations to prevent excessive
363    /// memory usage while still allowing access to response data.
364    ///
365    /// Returns a vector of completed responses that were flushed.
366    pub fn flush_completed_responses(&mut self) -> Vec<DoPutResponse> {
367        let responses = std::mem::take(&mut self.completed_responses);
368        responses
369            .into_values()
370            .map(|(response, _)| response)
371            .collect()
372    }
373
374    /// Finish the bulk write operation and close the connection
375    pub async fn finish(self) -> Result<()> {
376        let _responses = self.finish_with_responses().await?;
377        // Discard responses since finish() doesn't return them
378        Ok(())
379    }
380
381    /// Finish the bulk write operation and return all responses
382    pub async fn finish_with_responses(mut self) -> Result<Vec<DoPutResponse>> {
383        let mut all_responses = Vec::new();
384
385        // First, collect any already cached responses
386        let completed_responses = std::mem::take(&mut self.completed_responses);
387        for (request_id, (response, _)) in completed_responses {
388            // Remove from pending_requests if it exists, but collect the response regardless
389            // This handles both normal cases and orphaned responses
390            self.pending_requests.remove(&request_id);
391            all_responses.push(response);
392        }
393
394        // Then wait for any remaining pending requests
395        if !self.pending_requests.is_empty() {
396            let remaining_responses = self.wait_for_all_pending().await?;
397            all_responses.extend(remaining_responses);
398        }
399
400        // Close the sender to signal the end of the stream
401        // The result is ignored, as the stream being closed on the other
402        // end is not a critical error. We still want to return the responses.
403        let _ = self.sender.close().await;
404
405        Ok(all_responses)
406    }
407
408    /// Allocate a new rows buffer that is bound to this writer's schema
409    /// This ensures schema compatibility and provides optimal performance
410    pub fn alloc_rows_buffer(&self, capacity: usize, row_buffer_size: usize) -> Result<Rows> {
411        Rows::with_arrow_schema(
412            self.column_schemas(),
413            self.arrow_schema.clone(),
414            capacity,
415            row_buffer_size,
416        )
417    }
418
419    /// Create a new Row builder that is compatible with this writer's schema
420    /// Returns a `RowBuilder` that can efficiently build rows for this writer
421    /// Uses O(1) field name lookup for optimal performance
422    #[must_use]
423    pub fn new_row(&self) -> RowBuilder<'_> {
424        RowBuilder::new(self.column_schemas(), &self.field_map)
425    }
426
427    /// Get the table name that this writer is bound to
428    #[must_use]
429    pub fn table_name(&self) -> &str {
430        self.table_schema.name()
431    }
432
433    /// Get the column schemas that this writer is bound to
434    #[must_use]
435    pub fn column_schemas(&self) -> &[Column] {
436        self.table_schema.columns()
437    }
438
439    /// Helper method to handle a single response
440    fn handle_single_response(
441        &mut self,
442        response: DoPutResponse,
443        responses: &mut Vec<DoPutResponse>,
444    ) {
445        let request_id = response.request_id();
446        self.pending_requests.remove(&request_id);
447        responses.push(response);
448    }
449
450    /// Helper method to receive a single response and remove the pending request
451    fn receive_response_and_remove_pending(&mut self, response: DoPutResponse) {
452        let request_id = response.request_id();
453        self.pending_requests.remove(&request_id);
454        self.completed_responses
455            .insert(request_id, (response, Instant::now()));
456
457        // Clean up expired responses if cache is getting large
458        self.cleanup_expired_responses_if_needed();
459    }
460
461    /// Clean up expired responses when cache exceeds threshold to prevent unbounded growth
462    fn cleanup_expired_responses_if_needed(&mut self) {
463        const RESPONSE_CACHE_CLEANUP_THRESHOLD: usize = 1024;
464
465        if self.completed_responses.len() > RESPONSE_CACHE_CLEANUP_THRESHOLD {
466            let now = Instant::now();
467            self.completed_responses
468                .retain(|_, (_, cached_time)| now.duration_since(*cached_time) <= self.timeout);
469        }
470    }
471
472    /// Helper method to handle stream end cases
473    fn handle_stream_end(&self, responses: Vec<DoPutResponse>) -> Result<Vec<DoPutResponse>> {
474        ensure!(self.pending_requests.is_empty(), error::StreamEndedSnafu);
475        Ok(responses)
476    }
477
478    /// Helper method to handle stream end during processing
479    /// Returns Ok(()) if no pending requests, otherwise returns appropriate error
480    fn handle_stream_end_during_processing(&self) -> Result<()> {
481        if !self.pending_requests.is_empty() {
482            let pending_ids: Vec<RequestId> = self.pending_requests.keys().copied().collect();
483            return error::StreamEndedWithPendingRequestsSnafu {
484                request_ids: pending_ids,
485            }
486            .fail();
487        }
488        Ok(())
489    }
490
491    /// Submit a record batch without waiting for response
492    /// Returns the `request_id` for later tracking
493    async fn submit_record_batch(&mut self, batch: RecordBatch) -> Result<RequestId> {
494        // Send schema first if not already sent
495        if !self.schema_sent {
496            let mut schema_data = self.encoder.encode(FlightMessage::Schema(batch.schema()));
497            let metadata = DoPutMetadata::new(0);
498            schema_data.app_metadata = serde_json::to_vec(&metadata)
499                .context(error::SerializeMetadataSnafu)?
500                .into();
501
502            schema_data.flight_descriptor = Some(FlightDescriptor {
503                r#type: arrow_flight::flight_descriptor::DescriptorType::Path as i32,
504                path: vec![self.table_name().to_string()],
505                ..Default::default()
506            });
507
508            self.sender
509                .send(schema_data)
510                .await
511                .context(error::SendDataSnafu)?;
512
513            let response_result = timeout(self.timeout, self.response_stream.next()).await;
514            match response_result {
515                Ok(Some(response)) => {
516                    let _schema_response = response?;
517                }
518                Ok(None) => return error::StreamEndedSnafu.fail(),
519                Err(_) => {
520                    return error::RequestTimeoutSnafu {
521                        request_ids: vec![],
522                        timeout: self.timeout,
523                    }
524                    .fail();
525                }
526            }
527
528            self.schema_sent = true;
529        }
530
531        // Wait for available slot if we've reached parallelism limit
532        while self.pending_requests.len() >= self.parallelism {
533            self.process_pending_responses().await?;
534        }
535
536        // Send the request
537        let request_id = self.next_request_id();
538        let message = FlightMessage::RecordBatch(batch);
539        let mut data = self.encoder.encode(message);
540        let metadata = DoPutMetadata::new(request_id);
541        data.app_metadata = serde_json::to_vec(&metadata)
542            .context(error::SerializeMetadataSnafu)?
543            .into();
544
545        self.sender.send(data).await.context(error::SendDataSnafu)?;
546
547        // Track this request but don't wait for response
548        self.pending_requests.insert(request_id, Instant::now());
549
550        Ok(request_id)
551    }
552
553    /// Check for timed out requests
554    fn check_timeouts(&self) -> Result<()> {
555        let timeout_duration = self.timeout;
556        let now = Instant::now();
557
558        let timed_out_requests: Vec<RequestId> = self
559            .pending_requests
560            .iter()
561            .filter_map(|(&request_id, &sent_time)| {
562                if now.duration_since(sent_time) > timeout_duration {
563                    Some(request_id)
564                } else {
565                    None
566                }
567            })
568            .collect();
569
570        if !timed_out_requests.is_empty() {
571            return error::RequestTimeoutSnafu {
572                request_ids: timed_out_requests,
573                timeout: self.timeout,
574            }
575            .fail();
576        }
577
578        Ok(())
579    }
580
581    /// Process pending responses to make room for new requests
582    async fn process_pending_responses(&mut self) -> Result<()> {
583        // First check for any timed out requests
584        self.check_timeouts()?;
585
586        // Process responses to make room for new requests
587        // First, wait for at least one response (blocking)
588        let response_result = timeout(self.timeout, self.response_stream.next()).await;
589        match response_result {
590            Ok(Some(response)) => self.receive_response_and_remove_pending(response?),
591            Ok(None) => return self.handle_stream_end_during_processing(),
592            Err(_) => {
593                let pending_ids: Vec<RequestId> = self.pending_requests.keys().copied().collect();
594                return error::RequestTimeoutSnafu {
595                    request_ids: pending_ids,
596                    timeout: self.timeout,
597                }
598                .fail();
599            }
600        }
601
602        // Then drain any additional responses quickly
603        loop {
604            match self.response_stream.next().now_or_never() {
605                Some(Some(response)) => {
606                    self.receive_response_and_remove_pending(response?);
607                }
608                Some(None) => return self.handle_stream_end_during_processing(),
609                None => break, // No immediately available responses
610            }
611        }
612
613        Ok(())
614    }
615
616    /// Validate that the provided Rows schema matches the writer's bound schema
617    fn validate_rows_schema(&self, rows: &Rows) -> Result<()> {
618        // Fast path: if it's the exact same Arc, skip validation
619        if Arc::ptr_eq(&self.arrow_schema, &rows.schema) {
620            return Ok(());
621        }
622
623        // Fast path: check field count first (cheapest comparison)
624        let expected_fields = self.arrow_schema.fields();
625        let actual_fields = rows.schema.fields();
626
627        if expected_fields.len() != actual_fields.len() {
628            return Self::schema_mismatch_error(expected_fields, actual_fields);
629        }
630
631        // Check each field for compatibility
632        for (expected, actual) in expected_fields.iter().zip(actual_fields.iter()) {
633            if expected != actual {
634                return Self::schema_mismatch_error(expected_fields, actual_fields);
635            }
636        }
637
638        Ok(())
639    }
640
641    /// Helper to create schema mismatch error with lazy formatting
642    #[cold]
643    fn schema_mismatch_error(
644        expected_fields: &arrow_schema::Fields,
645        actual_fields: &arrow_schema::Fields,
646    ) -> Result<()> {
647        error::SchemaMismatchSnafu {
648            expected: format!("{expected_fields:?}"),
649            actual: format!("{actual_fields:?}"),
650        }
651        .fail()
652    }
653
654    fn next_request_id(&mut self) -> RequestId {
655        // Skip ID 0 as it's reserved for special cases
656        self.next_request_id = self.next_request_id.wrapping_add(1);
657        if self.next_request_id == 0 {
658            self.next_request_id = 1;
659        }
660        self.next_request_id
661    }
662}
663
664// Helper function to convert ColumnDataType to Arrow DataType
665// Based on GreptimeDB Java implementation - only supports actually implemented types
666fn column_to_arrow_data_type(column: &Column) -> Result<DataType> {
667    let data_type = column.data_type;
668    Ok(match data_type {
669        // Integer types
670        ColumnDataType::Int8 => DataType::Int8,
671        ColumnDataType::Int16 => DataType::Int16,
672        ColumnDataType::Int32 => DataType::Int32,
673        ColumnDataType::Int64 => DataType::Int64,
674        ColumnDataType::Uint8 => DataType::UInt8,
675        ColumnDataType::Uint16 => DataType::UInt16,
676        ColumnDataType::Uint32 => DataType::UInt32,
677        ColumnDataType::Uint64 => DataType::UInt64,
678
679        // Float types
680        ColumnDataType::Float32 => DataType::Float32,
681        ColumnDataType::Float64 => DataType::Float64,
682
683        // Boolean type
684        ColumnDataType::Boolean => DataType::Boolean,
685
686        // String and Binary types
687        ColumnDataType::String => DataType::Utf8,
688        ColumnDataType::Binary => DataType::Binary,
689
690        // Date type
691        ColumnDataType::Date => DataType::Date32,
692
693        // Timestamp types
694        ColumnDataType::TimestampSecond => DataType::Timestamp(TimeUnit::Second, None),
695        ColumnDataType::TimestampMillisecond => DataType::Timestamp(TimeUnit::Millisecond, None),
696        // DateTime is an alias of TIMESTAMP_MICROSECOND per GreptimeDB docs
697        ColumnDataType::Datetime | ColumnDataType::TimestampMicrosecond => {
698            DataType::Timestamp(TimeUnit::Microsecond, None)
699        }
700        ColumnDataType::TimestampNanosecond => DataType::Timestamp(TimeUnit::Nanosecond, None),
701
702        // Time types (without date)
703        ColumnDataType::TimeSecond => DataType::Time32(arrow_schema::TimeUnit::Second),
704        ColumnDataType::TimeMillisecond => DataType::Time32(arrow_schema::TimeUnit::Millisecond),
705        ColumnDataType::TimeMicrosecond => DataType::Time64(arrow_schema::TimeUnit::Microsecond),
706        ColumnDataType::TimeNanosecond => DataType::Time64(arrow_schema::TimeUnit::Nanosecond),
707
708        // Decimal type - extract precision and scale from column extension
709        ColumnDataType::Decimal128 => {
710            match &column.data_type_extension {
711                Some(DataTypeExtension::Decimal128 { precision, scale }) => {
712                    DataType::Decimal128(*precision, *scale)
713                }
714                _ => DataType::Decimal128(38, 10), // Default fallback
715            }
716        }
717
718        // JSON type (represented as Binary per Java implementation)
719        ColumnDataType::Json => DataType::Binary,
720
721        // Unsupported types - these should not be used
722        _ => {
723            return error::UnsupportedDataTypeSnafu {
724                data_type: format!("{data_type:?}. Not supported"),
725            }
726            .fail();
727        }
728    })
729}
730
/// Represents the underlying data storage for `Rows`.
/// This can be either a `RowBatchBuilder` for building rows,
/// or a pre-existing `RecordBatch`.
enum RowsData {
    /// Mutable builder state: rows can still be appended.
    Builder(RowBatchBuilder),
    /// Finalized batch: `add_row` rejects further appends in this mode.
    RecordBatch(RecordBatch),
}
738
/// High-level rows abstraction with buffered batch conversion
/// This provides a user-friendly API while maintaining optimal performance
pub struct Rows {
    // Either an in-progress builder or a finalized RecordBatch.
    data: RowsData,
    // Arrow schema; shared with the writer so validation can use a
    // cheap Arc pointer-equality fast path.
    schema: Arc<Schema>,
    // Number of values each appended row must contain.
    column_count: usize,
    // Row buffering for improved performance
    // Rows staged here are flushed to the builder once `buffer_size` is reached.
    row_buffer: Vec<Row>,
    buffer_size: usize,
}
749
750impl Rows {
751    /// Create a new Rows collection with the given schema and capacity
752    pub fn new(column_schemas: &[Column], capacity: usize, row_buffer_size: usize) -> Result<Self> {
753        let builder = RowBatchBuilder::new(column_schemas, capacity)?;
754        let schema = builder.schema.clone();
755
756        Ok(Self {
757            data: RowsData::Builder(builder),
758            schema,
759            column_count: column_schemas.len(),
760            row_buffer: Vec::with_capacity(row_buffer_size),
761            buffer_size: row_buffer_size,
762        })
763    }
764
765    /// Create a new Rows collection with a pre-computed Arrow schema
766    fn with_arrow_schema(
767        column_schemas: &[Column],
768        arrow_schema: Arc<Schema>,
769        capacity: usize,
770        row_buffer_size: usize,
771    ) -> Result<Self> {
772        let builder =
773            RowBatchBuilder::with_arrow_schema(column_schemas, arrow_schema.clone(), capacity)?;
774
775        Ok(Self {
776            data: RowsData::Builder(builder),
777            schema: arrow_schema,
778            column_count: column_schemas.len(),
779            row_buffer: Vec::with_capacity(row_buffer_size),
780            buffer_size: row_buffer_size,
781        })
782    }
783
784    /// Create a `Rows` instance from a pre-existing `RecordBatch`.
785    ///
786    /// This is useful when you already have data in `RecordBatch` format and want to
787    /// send it using the `BulkStreamWriter`. This avoids the overhead of converting
788    /// the `RecordBatch` back to individual rows.
789    ///
790    /// # Note
791    ///
792    /// Once a `Rows` object is created from a `RecordBatch`, you cannot add more
793    /// rows to it.
794    pub fn from_record_batch(batch: RecordBatch) -> Result<Self> {
795        let schema = batch.schema();
796        let column_count = batch.num_columns();
797        Ok(Self {
798            data: RowsData::RecordBatch(batch),
799            schema,
800            column_count,
801            row_buffer: Vec::new(), // empty
802            buffer_size: 0,         // not applicable
803        })
804    }
805
806    /// Add a row to the collection using move semantics
807    pub fn add_row(&mut self, row: Row) -> Result<()> {
808        // Ensure we are in builder mode
809        ensure!(
810            matches!(self.data, RowsData::Builder(_)),
811            error::AddRowToBuiltBatchSnafu,
812        );
813
814        // Validate column count matches schema
815        ensure!(
816            row.len() == self.column_count,
817            error::InvalidColumnCountSnafu {
818                expected: self.column_count,
819                actual: row.len(),
820            }
821        );
822
823        self.row_buffer.push(row);
824
825        // If buffer is full, flush it to a RecordBatch
826        if self.row_buffer.len() >= self.buffer_size {
827            self.flush_buffer()?;
828        }
829
830        Ok(())
831    }
832
833    /// Flush the current row buffer to the builder
834    fn flush_buffer(&mut self) -> Result<()> {
835        if self.row_buffer.is_empty() {
836            return Ok(());
837        }
838
839        if let RowsData::Builder(ref mut builder) = self.data {
840            // Process all rows in the buffer at once for better performance
841            let rows = std::mem::take(&mut self.row_buffer);
842            builder.add_rows(rows)?;
843        }
844
845        Ok(())
846    }
847
848    /// Get the current number of rows
849    #[must_use]
850    pub fn len(&self) -> usize {
851        match &self.data {
852            RowsData::RecordBatch(batch) => batch.num_rows(),
853            RowsData::Builder(builder) => builder.len() + self.row_buffer.len(),
854        }
855    }
856
857    /// Check if the collection is empty
858    #[must_use]
859    pub fn is_empty(&self) -> bool {
860        self.len() == 0
861    }
862
863    // Note: No capacity limits - can grow dynamically as needed
864
865    /// Get the schema
866    #[must_use]
867    pub fn schema(&self) -> &Schema {
868        &self.schema
869    }
870}
871
872/// Convert Rows to RecordBatch, handling buffered data
873impl TryFrom<Rows> for RecordBatch {
874    type Error = crate::Error;
875
876    fn try_from(mut rows: Rows) -> Result<Self> {
877        // Flush any remaining buffered rows to the builder
878        rows.flush_buffer()?;
879
880        match rows.data {
881            RowsData::RecordBatch(batch) => {
882                // If we have a pre-made batch, just return it.
883                // Ensure no pending rows in buffer, which would be a logic error.
884                ensure!(rows.row_buffer.is_empty(), error::UnflushedRowsSnafu);
885                Ok(batch)
886            }
887            RowsData::Builder(builder) => {
888                // Build the single RecordBatch
889                builder.build()
890            }
891        }
892    }
893}
894
/// Efficient batch builder that directly constructs Arrow arrays
/// This avoids the overhead of creating intermediate Row objects and converting them
/// Arrow builders automatically manage capacity and expand as needed
pub struct RowBatchBuilder {
    // One typed Arrow builder per column, in schema order.
    builders: Vec<ArrayBuilderEnum>,
    // Arrow schema describing the columns being built.
    schema: Arc<Schema>,
    // Number of rows appended so far across all builders.
    current_rows: usize,
}
903
904impl RowBatchBuilder {
905    /// Create a new RowBatchBuilder with the given schema and capacity
906    fn new(column_schemas: &[Column], capacity: usize) -> Result<Self> {
907        let fields: Result<Vec<Field>> = column_schemas
908            .iter()
909            .map(|col| {
910                let nullable = col.semantic_type != SemanticType::Timestamp;
911                column_to_arrow_data_type(col)
912                    .map(|data_type| Field::new(&col.name, data_type, nullable))
913            })
914            .collect();
915        let schema = Arc::new(Schema::new(fields?));
916
917        let builders: Result<Vec<ArrayBuilderEnum>> = column_schemas
918            .iter()
919            .enumerate()
920            .map(|(col_idx, col)| create_array_builder(col, capacity, col_idx))
921            .collect();
922
923        Ok(Self {
924            builders: builders?,
925            schema,
926            current_rows: 0,
927        })
928    }
929
930    /// Create a new RowBatchBuilder with a pre-computed Arrow schema
931    fn with_arrow_schema(
932        column_schemas: &[Column],
933        schema: Arc<Schema>,
934        capacity: usize,
935    ) -> Result<Self> {
936        let builders: Result<Vec<ArrayBuilderEnum>> = column_schemas
937            .iter()
938            .enumerate()
939            .map(|(col_idx, col)| create_array_builder(col, capacity, col_idx))
940            .collect();
941
942        Ok(Self {
943            builders: builders?,
944            schema,
945            current_rows: 0,
946        })
947    }
948
949    /// Add multiple rows to the batch builder using batch operations
950    fn add_rows(&mut self, mut rows: Vec<Row>) -> Result<()> {
951        for (col_idx, builder) in self.builders.iter_mut().enumerate() {
952            builder.append_values_from_rows(&mut rows, col_idx)?;
953        }
954        self.current_rows += rows.len();
955        Ok(())
956    }
957
958    /// Build the RecordBatch from accumulated rows
959    fn build(mut self) -> Result<RecordBatch> {
960        let arrays: Result<Vec<Arc<dyn Array>>> = self
961            .builders
962            .iter_mut()
963            .map(ArrayBuilderEnum::finish)
964            .collect();
965
966        RecordBatch::try_new(self.schema, arrays?).context(error::CreateRecordBatchSnafu)
967    }
968
969    /// Get the current number of rows in the builder
970    fn len(&self) -> usize {
971        self.current_rows
972    }
973}
974
/// Trait for type-erased array builders
/// Implemented for each concrete Arrow builder via `impl_arrow_builder!` below.
trait ArrayBuilder {
    // Append the value at `col_idx` of every row in `rows` into this builder.
    // `rows` is mutable because the string/binary `take_*` getters move the
    // value out of the row instead of cloning it.
    fn append_values_from_rows(&mut self, rows: &mut [Row], col_idx: usize) -> Result<()>;
}
979
/// Enum-dispatched wrapper over the concrete Arrow array builders, with one
/// variant per supported column data type. Enum dispatch avoids the cost of
/// boxed trait objects while keeping a uniform interface.
enum ArrayBuilderEnum {
    // Boolean and integer types
    Boolean(BooleanBuilder),
    Int8(Int8Builder),
    Int16(Int16Builder),
    Int32(Int32Builder),
    Int64(Int64Builder),
    UInt8(UInt8Builder),
    UInt16(UInt16Builder),
    UInt32(UInt32Builder),
    UInt64(UInt64Builder),
    // Floating point types
    Float32(Float32Builder),
    Float64(Float64Builder),
    // Variable-width and decimal types
    String(StringBuilder),
    Binary(BinaryBuilder),
    Decimal128(Decimal128Builder),
    // Date, timestamp and time-of-day types
    Date(Date32Builder),
    TimestampSecond(TimestampSecondBuilder),
    TimestampMillisecond(TimestampMillisecondBuilder),
    TimestampMicrosecond(TimestampMicrosecondBuilder),
    TimestampNanosecond(TimestampNanosecondBuilder),
    TimeSecond(Time32SecondBuilder),
    TimeMillisecond(Time32MillisecondBuilder),
    TimeMicrosecond(Time64MicrosecondBuilder),
    TimeNanosecond(Time64NanosecondBuilder),
}
1005
1006impl ArrayBuilderEnum {
1007    fn append_values_from_rows(&mut self, rows: &mut [Row], col_idx: usize) -> Result<()> {
1008        match self {
1009            ArrayBuilderEnum::Boolean(builder) => builder.append_values_from_rows(rows, col_idx),
1010            ArrayBuilderEnum::Int8(builder) => builder.append_values_from_rows(rows, col_idx),
1011            ArrayBuilderEnum::Int16(builder) => builder.append_values_from_rows(rows, col_idx),
1012            ArrayBuilderEnum::Int32(builder) => builder.append_values_from_rows(rows, col_idx),
1013            ArrayBuilderEnum::Int64(builder) => builder.append_values_from_rows(rows, col_idx),
1014            ArrayBuilderEnum::UInt8(builder) => builder.append_values_from_rows(rows, col_idx),
1015            ArrayBuilderEnum::UInt16(builder) => builder.append_values_from_rows(rows, col_idx),
1016            ArrayBuilderEnum::UInt32(builder) => builder.append_values_from_rows(rows, col_idx),
1017            ArrayBuilderEnum::UInt64(builder) => builder.append_values_from_rows(rows, col_idx),
1018            ArrayBuilderEnum::Float32(builder) => builder.append_values_from_rows(rows, col_idx),
1019            ArrayBuilderEnum::Float64(builder) => builder.append_values_from_rows(rows, col_idx),
1020            ArrayBuilderEnum::String(builder) => builder.append_values_from_rows(rows, col_idx),
1021            ArrayBuilderEnum::Binary(builder) => builder.append_values_from_rows(rows, col_idx),
1022            ArrayBuilderEnum::Decimal128(builder) => builder.append_values_from_rows(rows, col_idx),
1023            ArrayBuilderEnum::Date(builder) => builder.append_values_from_rows(rows, col_idx),
1024            ArrayBuilderEnum::TimestampSecond(builder) => {
1025                builder.append_values_from_rows(rows, col_idx)
1026            }
1027            ArrayBuilderEnum::TimestampMillisecond(builder) => {
1028                builder.append_values_from_rows(rows, col_idx)
1029            }
1030            ArrayBuilderEnum::TimestampMicrosecond(builder) => {
1031                builder.append_values_from_rows(rows, col_idx)
1032            }
1033            ArrayBuilderEnum::TimestampNanosecond(builder) => {
1034                builder.append_values_from_rows(rows, col_idx)
1035            }
1036            ArrayBuilderEnum::TimeSecond(builder) => builder.append_values_from_rows(rows, col_idx),
1037            ArrayBuilderEnum::TimeMillisecond(builder) => {
1038                builder.append_values_from_rows(rows, col_idx)
1039            }
1040            ArrayBuilderEnum::TimeMicrosecond(builder) => {
1041                builder.append_values_from_rows(rows, col_idx)
1042            }
1043            ArrayBuilderEnum::TimeNanosecond(builder) => {
1044                builder.append_values_from_rows(rows, col_idx)
1045            }
1046        }
1047    }
1048
1049    fn finish(&mut self) -> Result<Arc<dyn Array>> {
1050        Ok(match self {
1051            ArrayBuilderEnum::Boolean(builder) => Arc::new(builder.finish()),
1052            ArrayBuilderEnum::Int8(builder) => Arc::new(builder.finish()),
1053            ArrayBuilderEnum::Int16(builder) => Arc::new(builder.finish()),
1054            ArrayBuilderEnum::Int32(builder) => Arc::new(builder.finish()),
1055            ArrayBuilderEnum::Int64(builder) => Arc::new(builder.finish()),
1056            ArrayBuilderEnum::UInt8(builder) => Arc::new(builder.finish()),
1057            ArrayBuilderEnum::UInt16(builder) => Arc::new(builder.finish()),
1058            ArrayBuilderEnum::UInt32(builder) => Arc::new(builder.finish()),
1059            ArrayBuilderEnum::UInt64(builder) => Arc::new(builder.finish()),
1060            ArrayBuilderEnum::Float32(builder) => Arc::new(builder.finish()),
1061            ArrayBuilderEnum::Float64(builder) => Arc::new(builder.finish()),
1062            ArrayBuilderEnum::String(builder) => Arc::new(builder.finish()),
1063            ArrayBuilderEnum::Binary(builder) => Arc::new(builder.finish()),
1064            ArrayBuilderEnum::Decimal128(builder) => Arc::new(builder.finish()),
1065            ArrayBuilderEnum::Date(builder) => Arc::new(builder.finish()),
1066            ArrayBuilderEnum::TimestampSecond(builder) => Arc::new(builder.finish()),
1067            ArrayBuilderEnum::TimestampMillisecond(builder) => Arc::new(builder.finish()),
1068            ArrayBuilderEnum::TimestampMicrosecond(builder) => Arc::new(builder.finish()),
1069            ArrayBuilderEnum::TimestampNanosecond(builder) => Arc::new(builder.finish()),
1070            ArrayBuilderEnum::TimeSecond(builder) => Arc::new(builder.finish()),
1071            ArrayBuilderEnum::TimeMillisecond(builder) => Arc::new(builder.finish()),
1072            ArrayBuilderEnum::TimeMicrosecond(builder) => Arc::new(builder.finish()),
1073            ArrayBuilderEnum::TimeNanosecond(builder) => Arc::new(builder.finish()),
1074        })
1075    }
1076}
1077
1078/// Create an array builder enum for the given column with adaptive sizing
1079/// Uses enum dispatch for maximum performance (zero-cost polymorphism)
1080fn create_array_builder(
1081    column: &Column,
1082    capacity: usize,
1083    _column_index: usize,
1084) -> Result<ArrayBuilderEnum> {
1085    let data_type = column.data_type;
1086    Ok(match data_type {
1087        ColumnDataType::Boolean => {
1088            ArrayBuilderEnum::Boolean(BooleanBuilder::with_capacity(capacity))
1089        }
1090        ColumnDataType::Int8 => ArrayBuilderEnum::Int8(Int8Builder::with_capacity(capacity)),
1091        ColumnDataType::Int16 => ArrayBuilderEnum::Int16(Int16Builder::with_capacity(capacity)),
1092        ColumnDataType::Int32 => ArrayBuilderEnum::Int32(Int32Builder::with_capacity(capacity)),
1093        ColumnDataType::Int64 => ArrayBuilderEnum::Int64(Int64Builder::with_capacity(capacity)),
1094        ColumnDataType::Uint8 => ArrayBuilderEnum::UInt8(UInt8Builder::with_capacity(capacity)),
1095        ColumnDataType::Uint16 => ArrayBuilderEnum::UInt16(UInt16Builder::with_capacity(capacity)),
1096        ColumnDataType::Uint32 => ArrayBuilderEnum::UInt32(UInt32Builder::with_capacity(capacity)),
1097        ColumnDataType::Uint64 => ArrayBuilderEnum::UInt64(UInt64Builder::with_capacity(capacity)),
1098        ColumnDataType::Float32 => {
1099            ArrayBuilderEnum::Float32(Float32Builder::with_capacity(capacity))
1100        }
1101        ColumnDataType::Float64 => {
1102            ArrayBuilderEnum::Float64(Float64Builder::with_capacity(capacity))
1103        }
1104        ColumnDataType::String => {
1105            ArrayBuilderEnum::String(StringBuilder::with_capacity(capacity, capacity * 64))
1106        }
1107        ColumnDataType::Date => ArrayBuilderEnum::Date(Date32Builder::with_capacity(capacity)),
1108        ColumnDataType::TimestampSecond => {
1109            ArrayBuilderEnum::TimestampSecond(TimestampSecondBuilder::with_capacity(capacity))
1110        }
1111        ColumnDataType::TimestampMillisecond => ArrayBuilderEnum::TimestampMillisecond(
1112            TimestampMillisecondBuilder::with_capacity(capacity),
1113        ),
1114        ColumnDataType::Datetime | ColumnDataType::TimestampMicrosecond => {
1115            ArrayBuilderEnum::TimestampMicrosecond(TimestampMicrosecondBuilder::with_capacity(
1116                capacity,
1117            ))
1118        }
1119        ColumnDataType::TimestampNanosecond => ArrayBuilderEnum::TimestampNanosecond(
1120            TimestampNanosecondBuilder::with_capacity(capacity),
1121        ),
1122        ColumnDataType::TimeSecond => {
1123            ArrayBuilderEnum::TimeSecond(Time32SecondBuilder::with_capacity(capacity))
1124        }
1125        ColumnDataType::TimeMillisecond => {
1126            ArrayBuilderEnum::TimeMillisecond(Time32MillisecondBuilder::with_capacity(capacity))
1127        }
1128        ColumnDataType::TimeMicrosecond => {
1129            ArrayBuilderEnum::TimeMicrosecond(Time64MicrosecondBuilder::with_capacity(capacity))
1130        }
1131        ColumnDataType::TimeNanosecond => {
1132            ArrayBuilderEnum::TimeNanosecond(Time64NanosecondBuilder::with_capacity(capacity))
1133        }
1134        ColumnDataType::Decimal128 => {
1135            // Extract precision and scale from column definition
1136            let (precision, scale) = match &column.data_type_extension {
1137                Some(DataTypeExtension::Decimal128 { precision, scale }) => (*precision, *scale),
1138                _ => (38, 10), // Default precision and scale if not specified
1139            };
1140
1141            ArrayBuilderEnum::Decimal128(
1142                Decimal128Builder::with_capacity(capacity)
1143                    .with_data_type(arrow_schema::DataType::Decimal128(precision, scale)),
1144            )
1145        }
1146        ColumnDataType::Binary | ColumnDataType::Json => {
1147            ArrayBuilderEnum::Binary(BinaryBuilder::with_capacity(capacity, capacity * 64))
1148        }
1149        _ => {
1150            return error::UnsupportedDataTypeSnafu {
1151                data_type: format!("{data_type:?}. Not supported in RowBatchBuilder"),
1152            }
1153            .fail();
1154        }
1155    })
1156}
1157
// Generate ArrayBuilder implementations for Arrow primitive types.
// The third argument (`$value_type`) is never expanded in the macro body;
// it only documents, at each invocation site below, the native value type
// the getter yields.
macro_rules! impl_arrow_builder {
    ($builder_type:ty, $getter:ident, $value_type:ty) => {
        impl ArrayBuilder for $builder_type {
            fn append_values_from_rows(&mut self, rows: &mut [Row], col_idx: usize) -> Result<()> {
                for row in rows {
                    // Use unchecked version for performance - col_idx is guaranteed to be valid by schema
                    self.append_option(unsafe { row.$getter(col_idx) });
                }
                Ok(())
            }
        }
    };
}

// Basic primitive types
impl_arrow_builder!(BooleanBuilder, get_bool_unchecked, bool);
impl_arrow_builder!(Int8Builder, get_i8_unchecked, i8);
impl_arrow_builder!(Int16Builder, get_i16_unchecked, i16);
impl_arrow_builder!(Int32Builder, get_i32_unchecked, i32);
impl_arrow_builder!(Int64Builder, get_i64_unchecked, i64);
impl_arrow_builder!(UInt8Builder, get_u8_unchecked, u8);
impl_arrow_builder!(UInt16Builder, get_u16_unchecked, u16);
impl_arrow_builder!(UInt32Builder, get_u32_unchecked, u32);
impl_arrow_builder!(UInt64Builder, get_u64_unchecked, u64);
impl_arrow_builder!(Float32Builder, get_f32_unchecked, f32);
impl_arrow_builder!(Float64Builder, get_f64_unchecked, f64);

// Timestamp types (all units share the same i64 getter)
impl_arrow_builder!(TimestampSecondBuilder, get_timestamp_unchecked, i64);
impl_arrow_builder!(TimestampMillisecondBuilder, get_timestamp_unchecked, i64);
impl_arrow_builder!(TimestampMicrosecondBuilder, get_timestamp_unchecked, i64);
impl_arrow_builder!(TimestampNanosecondBuilder, get_timestamp_unchecked, i64);

// Time types
impl_arrow_builder!(Time32SecondBuilder, get_time32_unchecked, i32);
impl_arrow_builder!(Time32MillisecondBuilder, get_time32_unchecked, i32);
impl_arrow_builder!(Time64MicrosecondBuilder, get_time64_unchecked, i64);
impl_arrow_builder!(Time64NanosecondBuilder, get_time64_unchecked, i64);

// Date types
impl_arrow_builder!(Date32Builder, get_date_unchecked, i32);

// Decimal128 type (uses column-defined precision and scale)
impl_arrow_builder!(Decimal128Builder, get_decimal128_unchecked, i128);

// String and Binary types (take_* getters move the value out of the row)
impl_arrow_builder!(StringBuilder, take_string_unchecked, String);
impl_arrow_builder!(BinaryBuilder, take_binary_unchecked, Vec<u8>);
1207
/// A helper for building rows with schema-aware field access
/// This prevents common mistakes like incorrect field order or types
/// Uses O(1) field name lookup for optimal performance
pub struct RowBuilder<'a> {
    // Column definitions, in row order; used for counts and error messages.
    schema: &'a [Column],
    field_map: &'a HashMap<String, usize>, // Pre-computed field name to index mapping
    // One slot per column; `None` until the caller sets the field.
    values: Vec<Option<Value>>,
}
1216
1217impl<'a> RowBuilder<'a> {
1218    fn new(schema: &'a [Column], field_map: &'a HashMap<String, usize>) -> Self {
1219        Self {
1220            schema,
1221            field_map,
1222            values: vec![None; schema.len()],
1223        }
1224    }
1225
1226    /// Set a field value by name with O(1) lookup performance.
1227    /// This ensures correct field mapping and prevents field order mistakes.
1228    pub fn set(mut self, field_name: &str, value: Value) -> Result<Self> {
1229        let field_index = self
1230            .field_map
1231            .get(field_name)
1232            .context(error::MissingFieldSnafu { field: field_name })?;
1233
1234        self.values[*field_index] = Some(value);
1235        Ok(self)
1236    }
1237
1238    /// Set a field value by index. This is faster than `set` as it avoids a map lookup.
1239    ///
1240    /// # Errors
1241    ///
1242    /// Returns `Err` if `index` is out of bounds.
1243    pub fn set_by_index(mut self, index: usize, value: Value) -> Result<Self> {
1244        ensure!(
1245            index < self.values.len(),
1246            error::InvalidColumnIndexSnafu {
1247                index,
1248                total: self.values.len(),
1249            }
1250        );
1251
1252        self.values[index] = Some(value);
1253        Ok(self)
1254    }
1255
1256    /// Get the number of columns
1257    #[must_use]
1258    pub fn column_count(&self) -> usize {
1259        self.schema.len()
1260    }
1261
1262    /// Build the final Row, ensuring all required fields are set
1263    pub fn build(self) -> Result<Row> {
1264        let mut row_values = Vec::with_capacity(self.values.len());
1265
1266        for (i, opt_value) in self.values.into_iter().enumerate() {
1267            match opt_value {
1268                Some(value) => row_values.push(value),
1269                None => {
1270                    return error::MissingFieldSnafu {
1271                        field: self.schema[i].name.clone(),
1272                    }
1273                    .fail();
1274                }
1275            }
1276        }
1277
1278        Ok(Row::new().add_values(row_values))
1279    }
1280}
1281
// Re-export the proto ColumnDataType for convenience, so callers can name
// column types without importing from `crate::api::v1` directly.
pub use crate::api::v1::ColumnDataType as ColumnType;
1284
#[cfg(test)]
mod tests {
    //! Unit tests for `Rows` construction, buffering, nullability rules,
    //! and round-tripping through `RecordBatch`.

    use super::*;
    use crate::api::v1::{ColumnDataType, SemanticType};
    use crate::table::{Column, Value};

    #[test]
    fn test_rows_schema_validation() {
        // Create a schema with 3 columns
        let schema1 = vec![
            Column {
                name: "id".to_string(),
                data_type: ColumnDataType::Int64,
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
            Column {
                name: "name".to_string(),
                data_type: ColumnDataType::String,
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
            Column {
                name: "timestamp".to_string(),
                data_type: ColumnDataType::TimestampMillisecond,
                semantic_type: SemanticType::Timestamp,
                data_type_extension: None,
            },
        ];

        // Create a different schema
        let schema2 = vec![
            Column {
                name: "id".to_string(),
                data_type: ColumnDataType::Int64,
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
            Column {
                name: "value".to_string(),          // Different column name
                data_type: ColumnDataType::Float64, // Different data type
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
        ];

        // Test 1: Compatible rows should work
        let rows1 = Rows::new(&schema1, 10, 5).expect("Failed to create rows1");

        // Test 2: Incompatible rows should fail validation
        let rows2 = Rows::new(&schema2, 10, 5).expect("Failed to create rows2");

        // Mock the validation (since we can't easily create a BulkStreamWriter in tests)
        // In practice, this would be tested with a real BulkStreamWriter
        assert_eq!(rows1.schema().fields().len(), 3);
        assert_eq!(rows2.schema().fields().len(), 2);

        // The actual schema validation would happen in validate_rows_schema()
        // which checks that field names and types match exactly
    }

    #[test]
    fn test_rows_creation_and_capacity() {
        let schema = vec![
            Column {
                name: "id".to_string(),
                data_type: ColumnDataType::Int64,
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
            Column {
                name: "message".to_string(),
                data_type: ColumnDataType::String,
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
        ];

        let mut rows = Rows::new(&schema, 5, 5).expect("Failed to create rows");

        // Test initial state
        assert_eq!(rows.len(), 0);
        assert!(rows.is_empty());

        // Add some rows
        let row1 = crate::table::Row::new()
            .add_values(vec![Value::Int64(1), Value::String("first".to_string())]);

        let row2 = crate::table::Row::new()
            .add_values(vec![Value::Int64(2), Value::String("second".to_string())]);

        rows.add_row(row1).expect("Failed to add row1");
        rows.add_row(row2).expect("Failed to add row2");

        // Test state after adding rows (len counts buffered rows too)
        assert_eq!(rows.len(), 2);
        assert!(!rows.is_empty());
    }

    #[test]
    fn test_non_nullable_timestamp_field_with_null_should_error() {
        // Create schema with timestamp field (non-nullable)
        let schema = vec![
            Column {
                name: "ts".to_string(),
                data_type: ColumnDataType::TimestampMillisecond,
                semantic_type: SemanticType::Timestamp,
                data_type_extension: None,
            },
            Column {
                name: "value".to_string(),
                data_type: ColumnDataType::Int64,
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
        ];

        let mut rows = Rows::new(&schema, 5, 5).expect("Failed to create rows");

        // Add a row with null timestamp (should cause error when converting to RecordBatch)
        let row_with_null_timestamp =
            crate::table::Row::new().add_values(vec![Value::Null, Value::Int64(42)]);

        rows.add_row(row_with_null_timestamp)
            .expect("Failed to add row");

        // Converting to RecordBatch should fail because timestamp is null but field is non-nullable
        let result = RecordBatch::try_from(rows);
        assert!(
            result.is_err(),
            "Should fail when timestamp field contains null value"
        );
    }

    #[test]
    fn test_nullable_field_with_null_should_succeed() {
        // Create schema with nullable field
        let schema = vec![
            Column {
                name: "ts".to_string(),
                data_type: ColumnDataType::TimestampMillisecond,
                semantic_type: SemanticType::Timestamp,
                data_type_extension: None,
            },
            Column {
                name: "value".to_string(),
                data_type: ColumnDataType::Int64,
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
        ];

        let mut rows = Rows::new(&schema, 5, 5).expect("Failed to create rows");

        // Add a row with null value field (should succeed since value field is nullable)
        let row_with_null_value = crate::table::Row::new()
            .add_values(vec![Value::TimestampMillisecond(1234567890), Value::Null]);

        rows.add_row(row_with_null_value)
            .expect("Failed to add row");

        // Converting to RecordBatch should succeed because value field is nullable
        let result = RecordBatch::try_from(rows);
        assert!(
            result.is_ok(),
            "Should succeed when nullable field contains null value"
        );
    }

    #[test]
    fn test_arrow_schema_nullable_fields() {
        use arrow_schema::{DataType, Field};

        // Create columns with different semantic types
        let columns = [
            Column {
                name: "ts".to_string(),
                data_type: ColumnDataType::TimestampMillisecond,
                semantic_type: SemanticType::Timestamp,
                data_type_extension: None,
            },
            Column {
                name: "value".to_string(),
                data_type: ColumnDataType::Int64,
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
            Column {
                name: "tag".to_string(),
                data_type: ColumnDataType::String,
                semantic_type: SemanticType::Tag,
                data_type_extension: None,
            },
        ];

        // Test the logic that creates Arrow schema fields
        // (mirrors RowBatchBuilder::new: only Timestamp columns are non-nullable)
        let fields: Vec<Field> = columns
            .iter()
            .map(|col| {
                let nullable = col.semantic_type != SemanticType::Timestamp;
                let data_type = match col.data_type {
                    ColumnDataType::TimestampMillisecond => {
                        DataType::Timestamp(TimeUnit::Millisecond, None)
                    }
                    ColumnDataType::Int64 => DataType::Int64,
                    ColumnDataType::String => DataType::Utf8,
                    _ => DataType::Utf8, // fallback
                };
                Field::new(&col.name, data_type, nullable)
            })
            .collect();

        assert_eq!(fields.len(), 3);

        // Timestamp field should be non-nullable
        assert!(
            !fields[0].is_nullable(),
            "Timestamp field should be non-nullable"
        );
        assert_eq!(fields[0].name(), "ts");

        // Value field should be nullable
        assert!(fields[1].is_nullable(), "Value field should be nullable");
        assert_eq!(fields[1].name(), "value");

        // Tag field should be nullable
        assert!(fields[2].is_nullable(), "Tag field should be nullable");
        assert_eq!(fields[2].name(), "tag");
    }

    #[test]
    fn test_rows_from_record_batch() {
        // 1. Create a sample RecordBatch
        let schema_vec = vec![
            Column {
                name: "id".to_string(),
                data_type: ColumnDataType::Int32,
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
            Column {
                name: "msg".to_string(),
                data_type: ColumnDataType::String,
                semantic_type: SemanticType::Field,
                data_type_extension: None,
            },
        ];
        let record_batch = {
            let mut sample_rows = Rows::new(&schema_vec, 2, 2).unwrap();
            let row1 = crate::table::Row::new()
                .add_values(vec![Value::Int32(1), Value::String("hello".to_string())]);
            let row2 = crate::table::Row::new()
                .add_values(vec![Value::Int32(2), Value::String("world".to_string())]);
            sample_rows.add_row(row1).unwrap();
            sample_rows.add_row(row2).unwrap();
            RecordBatch::try_from(sample_rows).unwrap()
        };

        let original_schema = record_batch.schema();
        let original_num_rows = record_batch.num_rows();
        let original_num_cols = record_batch.num_columns();

        // 2. Create Rows from the RecordBatch
        let mut rows_from_batch = Rows::from_record_batch(record_batch.clone()).unwrap();

        // 3. Verify schema, row count, and column count
        assert_eq!(*rows_from_batch.schema(), *original_schema);
        assert_eq!(rows_from_batch.len(), original_num_rows);
        assert_eq!(rows_from_batch.column_count, original_num_cols);
        assert!(!rows_from_batch.is_empty());
        assert_eq!(rows_from_batch.len(), 2);

        // 4. Ensure adding a new row fails
        let row_to_add = crate::table::Row::new()
            .add_values(vec![Value::Int32(3), Value::String("new".to_string())]);
        let add_result = rows_from_batch.add_row(row_to_add);
        assert!(add_result.is_err());
        assert_eq!(
            add_result.unwrap_err().to_string(),
            "Cannot add row to a Rows object that was created from a RecordBatch"
        );

        // 5. Verify that converting back yields the original RecordBatch
        let converted_batch = RecordBatch::try_from(rows_from_batch).unwrap();
        assert_eq!(converted_batch, record_batch);
    }
}