Skip to main content

hyperdb_api/
arrow_inserter.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Arrow IPC stream inserter for bulk data loading.
5//!
6//! This module provides the [`ArrowInserter`] struct for inserting pre-formatted
7//! Arrow IPC stream data into Hyper tables.
8//!
9//! Unlike [`Inserter`](crate::Inserter) which builds rows incrementally in `HyperBinary`
10//! format, `ArrowInserter` accepts complete Arrow IPC stream data, making it ideal
11//! for integration with Arrow-based data pipelines.
12//!
13//! # Example
14//!
15//! ```no_run
16//! # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
17//! # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
18//! # fn get_arrow_data() -> Vec<u8> { vec![] }
19//! use hyperdb_api::{ArrowInserter, Connection, TableDefinition};
20//!
21//! // Arrow IPC data from external source (e.g., arrow crate, Parquet reader)
22//! let arrow_ipc_data: Vec<u8> = get_arrow_data();
23//!
24//! let mut inserter = ArrowInserter::new(&conn, &table_def)?;
25//! inserter.insert_data(&arrow_ipc_data)?;
26//! let rows = inserter.execute()?;
27//! # Ok(())
28//! # }
29//! ```
30
31use std::time::Instant;
32
33use arrow::ipc::writer::StreamWriter;
34use hyperdb_api_core::client::client::CopyInWriter;
35use tracing::{debug, info};
36
37/// Default flush threshold (16 MB) — matches `HyperBinary` Inserter.
38const DEFAULT_FLUSH_THRESHOLD: usize = 16 * 1024 * 1024;
39
40use crate::catalog::Catalog;
41use crate::connection::Connection;
42use crate::data_format::DataFormat;
43use crate::error::{Error, Result};
44use crate::table_definition::TableDefinition;
45
46/// Tracks which insertion pathway is active, preventing unsafe mixing of
47/// raw IPC methods (`insert_data`/`insert_record_batches`/`insert_raw`) with
48/// the `RecordBatch`-based method (`insert_batch`).
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50enum InsertMode {
51    /// Raw IPC bytes sent via `insert_data` / `insert_record_batches` / `insert_raw`.
52    RawIpc,
53    /// `RecordBatch` objects serialized through a `StreamWriter` via `insert_batch`.
54    BatchIpc,
55}
56
57/// Inserts Arrow IPC stream data into a Hyper table.
58///
59/// Unlike [`Inserter`](crate::Inserter) which builds rows incrementally in `HyperBinary`
60/// format, `ArrowInserter` accepts pre-formatted Arrow IPC stream data. This is useful
61/// when integrating with Arrow-based data pipelines or when you already have data in
62/// Arrow format.
63///
64/// # Arrow IPC Stream Format
65///
66/// Arrow IPC streams consist of:
67/// 1. A schema message (describing column names and types)
68/// 2. One or more record batch messages (containing the actual data)
69///
70/// The schema must match the target table's schema.
71///
72/// # Single vs Multiple Chunks
73///
74/// For single-chunk inserts, use [`insert_data()`](Self::insert_data) with a complete
75/// Arrow IPC stream (schema + record batches).
76///
77/// For multiple chunks (large datasets), use:
78/// 1. [`insert_data()`](Self::insert_data) for the first chunk (with schema)
79/// 2. [`insert_record_batches()`](Self::insert_record_batches) for subsequent chunks (without schema)
80///
81/// # Example
82///
83/// ```no_run
84/// use hyperdb_api::{ArrowInserter, Connection, CreateMode, Catalog, TableDefinition, SqlType, Result};
85///
86/// fn main() -> Result<()> {
87///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
88///
89///     let table_def = TableDefinition::new("data")
90///         .add_required_column("id", SqlType::int())
91///         .add_nullable_column("value", SqlType::double());
92///
93///     Catalog::new(&conn).create_table(&table_def)?;
94///
95///     // Arrow IPC data from external source
96///     let arrow_data: Vec<u8> = vec![]; // Your Arrow IPC stream here
97///
98///     let mut inserter = ArrowInserter::new(&conn, &table_def)?;
99///     inserter.insert_data(&arrow_data)?;
100///     let rows = inserter.execute()?;
101///     println!("Inserted {} rows", rows);
102///     Ok(())
103/// }
104/// ```
105pub struct ArrowInserter<'conn> {
106    connection: &'conn Connection,
107    table_name: String,
108    columns: Vec<String>,
109    writer: Option<CopyInWriter<'conn>>,
110    /// Tracks whether an Arrow schema has been sent.
111    /// Sending schema twice causes an error in Hyper.
112    schema_sent: bool,
113    /// Total bytes sent (for logging).
114    total_bytes: usize,
115    /// Number of chunks sent.
116    chunk_count: usize,
117    /// Start time for timing the insert operation.
118    start_time: Instant,
119    /// Flush threshold in bytes. Data is buffered until this threshold is reached.
120    flush_threshold: usize,
121    /// Bytes buffered since the last flush.
122    buffered_bytes: usize,
123    /// Persistent IPC `StreamWriter` for `insert_batch()`. Streams each `RecordBatch`
124    /// eagerly — the schema is written on first use, then each batch is serialized
125    /// and sent immediately via `send_direct()`. The internal `Vec<u8>` buffer is
126    /// drained after every write to keep memory usage bounded at `O(batch_size)`.
127    batch_ipc_writer: Option<StreamWriter<Vec<u8>>>,
128    /// Tracks which insertion pathway is active. `None` means no data has been
129    /// sent yet. Once set, mixing pathways is an error.
130    insert_mode: Option<InsertMode>,
131}
132
133impl std::fmt::Debug for ArrowInserter<'_> {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        f.debug_struct("ArrowInserter")
136            .field("table_name", &self.table_name)
137            .field("columns", &self.columns)
138            .field("schema_sent", &self.schema_sent)
139            .field("chunk_count", &self.chunk_count)
140            .field("total_bytes", &self.total_bytes)
141            .finish_non_exhaustive()
142    }
143}
144
145impl<'conn> ArrowInserter<'conn> {
146    /// Creates a new Arrow inserter for the given table.
147    ///
148    /// # Arguments
149    ///
150    /// * `connection` - The database connection.
151    /// * `table_def` - The table definition for the target table.
152    ///
153    /// # Example
154    ///
155    /// ```no_run
156    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
157    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
158    /// let inserter = ArrowInserter::new(&conn, &table_def)?;
159    /// # Ok(())
160    /// # }
161    /// ```
162    ///
163    /// # Errors
164    ///
165    /// - Returns [`Error::InvalidTableDefinition`] with message
166    ///   `"Table definition must have at least one column"` if `table_def`
167    ///   has no columns.
168    /// - Returns [`Error::FeatureNotSupported`] if `connection` is using gRPC transport
169    ///   (COPY is TCP-only).
170    pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
171        let column_count = table_def.column_count();
172        if column_count == 0 {
173            return Err(Error::invalid_table_definition(
174                "Table definition must have at least one column",
175            ));
176        }
177
178        // Fail fast: verify the connection supports COPY (TCP only).
179        // The actual COPY session is started lazily on the first data write
180        // to avoid locking the connection into COPY mode prematurely.
181        if connection.tcp_client().is_none() {
182            return Err(Error::feature_not_supported(
183                "ArrowInserter requires a TCP connection. \
184                 gRPC connections do not support COPY operations.",
185            ));
186        }
187
188        let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
189        let table_name = table_def.qualified_name();
190
191        Ok(ArrowInserter {
192            connection,
193            table_name,
194            columns,
195            writer: None,
196            schema_sent: false,
197            total_bytes: 0,
198            chunk_count: 0,
199            start_time: Instant::now(),
200            flush_threshold: DEFAULT_FLUSH_THRESHOLD,
201            buffered_bytes: 0,
202            batch_ipc_writer: None,
203            insert_mode: None,
204        })
205    }
206
207    /// Creates an Arrow inserter by querying the table schema from the database.
208    ///
209    /// This method queries the database to get the table definition automatically,
210    /// which is useful when you want to insert into an existing table without
211    /// manually specifying the schema.
212    ///
213    /// # Arguments
214    ///
215    /// * `connection` - The database connection.
216    /// * `table_name` - The table name (can include database and schema qualifiers).
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if the table doesn't exist or if the schema cannot be retrieved.
221    ///
222    /// # Example
223    ///
224    /// ```no_run
225    /// # use hyperdb_api::{ArrowInserter, Connection, Result};
226    /// # fn example(conn: &Connection) -> Result<()> {
227    /// let inserter = ArrowInserter::from_table(&conn, "public.my_table")?;
228    /// # Ok(())
229    /// # }
230    /// ```
231    pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
232    where
233        T: TryInto<crate::TableName>,
234        crate::Error: From<T::Error>,
235    {
236        let catalog = Catalog::new(connection);
237        let table_def = catalog.get_table_definition(table_name)?;
238        Self::new(connection, &table_def)
239    }
240
241    /// Sets a custom flush threshold in bytes.
242    ///
243    /// Data is buffered until the threshold is reached, then flushed to the server.
244    /// Default is 16 MB (matching `HyperBinary` Inserter).
245    ///
246    /// # Example
247    ///
248    /// ```no_run
249    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
250    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
251    /// let inserter = ArrowInserter::new(&conn, &table_def)?
252    ///     .with_flush_threshold(32 * 1024 * 1024);  // 32 MB
253    /// # Ok(())
254    /// # }
255    /// ```
256    #[must_use]
257    pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
258        self.flush_threshold = threshold;
259        self
260    }
261
262    /// Inserts a complete Arrow IPC stream (schema + record batches).
263    ///
264    /// Use this method for single-chunk inserts or for the first chunk of
265    /// multi-chunk inserts. The Arrow IPC stream must include the schema message
266    /// followed by one or more record batch messages.
267    ///
268    /// # Arguments
269    ///
270    /// * `arrow_ipc_data` - Complete Arrow IPC stream data (schema + record batches).
271    ///
272    /// # Errors
273    ///
274    /// Returns an error if:
275    /// - The connection fails to start COPY
276    /// - Sending data fails
277    /// - Schema was already sent (use `insert_record_batches()` for subsequent chunks)
278    ///
279    /// # Example
280    ///
281    /// ```no_run
282    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
283    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
284    /// # let first_arrow_ipc_stream = vec![];
285    /// # let second_chunk_batches_only = vec![];
286    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
287    ///
288    /// // First chunk with schema
289    /// inserter.insert_data(&first_arrow_ipc_stream)?;
290    ///
291    /// // For subsequent chunks without schema, use insert_record_batches()
292    /// inserter.insert_record_batches(&second_chunk_batches_only)?;
293    ///
294    /// let rows = inserter.execute()?;
295    /// # Ok(())
296    /// # }
297    /// ```
298    pub fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
299        if arrow_ipc_data.is_empty() {
300            return Ok(());
301        }
302
303        if self.insert_mode == Some(InsertMode::BatchIpc) {
304            return Err(Error::invalid_operation(
305                "Cannot mix insert_data() with insert_batch(). \
306                 Use either raw IPC methods (insert_data/insert_record_batches) \
307                 or RecordBatch methods (insert_batch), not both.",
308            ));
309        }
310
311        if self.schema_sent {
312            return Err(Error::invalid_operation(
313                "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks without schema, \
314                 or use insert_data() only once with the complete Arrow IPC stream.",
315            ));
316        }
317
318        self.ensure_writer()?;
319
320        if let Some(ref mut writer) = self.writer {
321            writer.send_direct(arrow_ipc_data)?;
322        }
323        self.buffered_bytes += arrow_ipc_data.len();
324        self.maybe_flush()?;
325
326        self.insert_mode = Some(InsertMode::RawIpc);
327        self.schema_sent = true;
328        self.total_bytes += arrow_ipc_data.len();
329        self.chunk_count += 1;
330
331        debug!(
332            target: "hyperdb_api",
333            chunk = self.chunk_count,
334            bytes = arrow_ipc_data.len(),
335            total_bytes = self.total_bytes,
336            buffered_bytes = self.buffered_bytes,
337            "arrow-inserter-chunk"
338        );
339
340        Ok(())
341    }
342
343    /// Inserts Arrow record batch data without schema.
344    ///
345    /// Use this method for subsequent chunks after the first `insert_data()` call.
346    /// The data should contain only Arrow record batch messages, **not** the schema.
347    ///
348    /// **Important**: Sending schema twice causes an error in Hyper. If you have
349    /// multiple complete Arrow IPC streams (each with schema), you need to strip
350    /// the schema from all but the first one.
351    ///
352    /// # Arguments
353    ///
354    /// * `arrow_batch_data` - Arrow record batch data (no schema message).
355    ///
356    /// # Errors
357    ///
358    /// Returns an error if:
359    /// - No schema has been sent yet (call `insert_data()` first)
360    /// - Sending data fails
361    ///
362    /// # Example
363    ///
364    /// ```no_run
365    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
366    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
367    /// # let first_chunk_with_schema = vec![];
368    /// # let second_chunk_batches = vec![];
369    /// # let third_chunk_batches = vec![];
370    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
371    ///
372    /// // First chunk must include schema
373    /// inserter.insert_data(&first_chunk_with_schema)?;
374    ///
375    /// // Subsequent chunks are record batches only
376    /// inserter.insert_record_batches(&second_chunk_batches)?;
377    /// inserter.insert_record_batches(&third_chunk_batches)?;
378    ///
379    /// let rows = inserter.execute()?;
380    /// # Ok(())
381    /// # }
382    /// ```
383    pub fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
384        if arrow_batch_data.is_empty() {
385            return Ok(());
386        }
387
388        if self.insert_mode == Some(InsertMode::BatchIpc) {
389            return Err(Error::invalid_operation(
390                "Cannot mix insert_record_batches() with insert_batch(). \
391                 Use either raw IPC methods (insert_data/insert_record_batches) \
392                 or RecordBatch methods (insert_batch), not both.",
393            ));
394        }
395
396        if !self.schema_sent {
397            return Err(Error::invalid_operation(
398                "No Arrow schema has been sent yet. Call insert_data() first with a complete \
399                 Arrow IPC stream that includes the schema.",
400            ));
401        }
402
403        if let Some(ref mut writer) = self.writer {
404            writer.send_direct(arrow_batch_data)?;
405        }
406        self.buffered_bytes += arrow_batch_data.len();
407        self.maybe_flush()?;
408
409        self.total_bytes += arrow_batch_data.len();
410        self.chunk_count += 1;
411
412        debug!(
413            target: "hyperdb_api",
414            chunk = self.chunk_count,
415            bytes = arrow_batch_data.len(),
416            total_bytes = self.total_bytes,
417            buffered_bytes = self.buffered_bytes,
418            "arrow-inserter-batch-chunk"
419        );
420
421        Ok(())
422    }
423
424    /// Inserts raw Arrow data without schema tracking.
425    ///
426    /// This is a low-level method that sends data directly without checking
427    /// whether schema has been sent. Use this only if you are managing schema
428    /// handling yourself.
429    ///
430    /// For most use cases, prefer [`insert_data()`](Self::insert_data) and
431    /// [`insert_record_batches()`](Self::insert_record_batches).
432    ///
433    /// # Arguments
434    ///
435    /// * `data` - Raw Arrow IPC data to send.
436    ///
437    /// # Errors
438    ///
439    /// - Returns [`Error::InvalidOperation`] if a previous `insert_batch`
440    ///   call already locked the inserter into `RecordBatch` IPC mode —
441    ///   raw IPC and `RecordBatch` paths cannot be mixed.
442    /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
443    ///   session fails to open.
444    /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
445    ///   the data or the socket write fails.
446    pub fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
447        if data.is_empty() {
448            return Ok(());
449        }
450
451        if self.insert_mode == Some(InsertMode::BatchIpc) {
452            return Err(Error::invalid_operation(
453                "Cannot mix insert_raw() with insert_batch(). \
454                 Use either raw IPC methods (insert_data/insert_record_batches/insert_raw) \
455                 or RecordBatch methods (insert_batch), not both.",
456            ));
457        }
458
459        self.ensure_writer()?;
460
461        if let Some(ref mut writer) = self.writer {
462            writer.send_direct(data)?;
463        }
464        self.buffered_bytes += data.len();
465        self.maybe_flush()?;
466
467        self.total_bytes += data.len();
468        self.chunk_count += 1;
469
470        Ok(())
471    }
472
473    /// Finishes the insert operation and returns the number of rows inserted.
474    ///
475    /// This method completes the COPY operation and returns the row count
476    /// reported by the server.
477    ///
478    /// # Errors
479    ///
480    /// Returns an error if:
481    /// - No data was sent
482    /// - The COPY operation fails to complete
483    ///
484    /// # Example
485    ///
486    /// ```no_run
487    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
488    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
489    /// # let arrow_data = vec![];
490    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
491    /// inserter.insert_data(&arrow_data)?;
492    /// let rows = inserter.execute()?;
493    /// # Ok(())
494    /// # }
495    /// ```
496    /// println!("Inserted {} rows", rows);
497    pub fn execute(mut self) -> Result<u64> {
498        // Finalize the IPC StreamWriter if insert_batch() was used.
499        // `into_inner()` calls `finish()` internally (writing the EOS marker)
500        // and returns the underlying buffer — a single fallible operation
501        // instead of the previous finish() + into_inner() pair.
502        //
503        // On error, `self` is dropped, and the Drop impl cancels the COPY
504        // writer, so the connection is always left in a clean state.
505        if let Some(ipc) = self.batch_ipc_writer.take() {
506            let buf = ipc.into_inner().map_err(|e| {
507                Error::conversion(format!("Failed to finalize Arrow IPC stream: {e}"))
508            })?;
509            if !buf.is_empty() {
510                if let Some(ref mut writer) = self.writer {
511                    writer.send_direct(&buf)?;
512                }
513            }
514        }
515
516        if self.writer.is_none() {
517            // No data was sent
518            return Ok(0);
519        }
520
521        let rows = self
522            .writer
523            .take()
524            .map(hyperdb_api_core::client::CopyInWriter::finish)
525            .transpose()?
526            .unwrap_or(0);
527
528        let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
529        info!(
530            target: "hyperdb_api",
531            rows,
532            chunks = self.chunk_count,
533            total_bytes = self.total_bytes,
534            duration_ms,
535            table = %self.table_name,
536            "arrow-inserter-end"
537        );
538
539        Ok(rows)
540    }
541
542    /// Cancels the insert operation.
543    ///
544    /// All data sent so far will be discarded.
545    pub fn cancel(mut self) {
546        if let Some(writer) = self.writer.take() {
547            let _ = writer.cancel("Arrow insert cancelled");
548        }
549    }
550
551    /// Returns whether any data has been sent.
552    #[must_use]
553    pub fn has_data(&self) -> bool {
554        self.chunk_count > 0
555    }
556
557    /// Returns the total bytes sent so far.
558    #[must_use]
559    pub fn total_bytes(&self) -> usize {
560        self.total_bytes
561    }
562
563    /// Returns the number of chunks sent so far.
564    #[must_use]
565    pub fn chunk_count(&self) -> usize {
566        self.chunk_count
567    }
568
569    /// Inserts an Arrow `RecordBatch` directly, streaming it immediately.
570    ///
571    /// Each batch is serialized to Arrow IPC format and sent to the server right
572    /// away — no accumulation in memory. The schema is written automatically on
573    /// the first call; subsequent batches send only record-batch IPC messages.
574    ///
575    /// Memory usage is `O(batch_size)` regardless of how many batches are inserted.
576    ///
577    /// # Arguments
578    ///
579    /// * `batch` - The Arrow `RecordBatch` to insert.
580    ///
581    /// # Example
582    ///
583    /// ```no_run
584    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, SqlType, Catalog, CreateMode, Result};
585    /// # fn example() -> Result<()> {
586    /// # let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
587    /// # let table_def = TableDefinition::new("data")
588    /// #     .add_required_column("id", SqlType::int())
589    /// #     .add_nullable_column("value", SqlType::double());
590    /// # Catalog::new(&conn).create_table(&table_def)?;
591    /// use arrow::array::{Int32Array, Float64Array};
592    /// use arrow::datatypes::{Schema, Field, DataType};
593    /// use arrow::record_batch::RecordBatch;
594    /// use std::sync::Arc;
595    ///
596    /// let schema = Arc::new(Schema::new(vec![
597    ///     Field::new("id", DataType::Int32, false),
598    ///     Field::new("value", DataType::Float64, true),
599    /// ]));
600    /// let batch = RecordBatch::try_new(schema, vec![
601    ///     Arc::new(Int32Array::from(vec![1, 2, 3])),
602    ///     Arc::new(Float64Array::from(vec![Some(1.5), None, Some(3.5)])),
603    /// ]).unwrap();
604    ///
605    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
606    /// inserter.insert_batch(&batch)?;
607    /// let rows = inserter.execute()?;
608    /// # Ok(())
609    /// # }
610    /// ```
611    ///
612    /// # Errors
613    ///
614    /// - Returns [`Error::InvalidOperation`] if a previous raw-IPC call
615    ///   locked this inserter into the other mode — raw IPC and
616    ///   `RecordBatch` paths cannot be mixed.
617    /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
618    ///   session cannot be opened.
619    /// - Returns [`Error::Conversion`] wrapping the underlying Arrow IPC
620    ///   writer error if the schema or batch cannot be serialized (e.g.
621    ///   dictionary misalignment, encoding failure).
622    /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
623    ///   the data or the socket write fails.
624    ///
625    /// # Panics
626    ///
627    /// Panics internally only if the IPC writer state is corrupted —
628    /// callers cannot trigger this from the public API. The `batch_ipc_writer`
629    /// is consulted via `as_mut().unwrap()` after it has just been set to
630    /// `Some`, so the unwrap is unreachable.
631    pub fn insert_batch(&mut self, batch: &arrow::record_batch::RecordBatch) -> Result<()> {
632        if self.insert_mode == Some(InsertMode::RawIpc) {
633            return Err(Error::invalid_operation(
634                "Cannot mix insert_batch() with raw IPC methods. \
635                 Use either RecordBatch methods (insert_batch) \
636                 or raw IPC methods (insert_data/insert_record_batches/insert_raw), not both.",
637            ));
638        }
639
640        self.ensure_writer()?;
641        self.insert_mode = Some(InsertMode::BatchIpc);
642
643        // Create the IPC StreamWriter on first use — this writes the schema message
644        if self.batch_ipc_writer.is_none() {
645            let ipc_writer = StreamWriter::try_new(Vec::new(), &batch.schema()).map_err(|e| {
646                Error::conversion(format!("Failed to create Arrow IPC writer: {e}"))
647            })?;
648            self.batch_ipc_writer = Some(ipc_writer);
649
650            // Drain the schema bytes that StreamWriter wrote during construction
651            self.drain_ipc_buffer()?;
652            self.schema_sent = true;
653        }
654
655        // Write the record batch — this appends batch IPC bytes to the internal Vec
656        self.batch_ipc_writer
657            .as_mut()
658            .expect("IPC writer must exist")
659            .write(batch)
660            .map_err(|e| Error::conversion(format!("Failed to write Arrow batch: {e}")))?;
661
662        // Drain the batch bytes and send them immediately
663        self.drain_ipc_buffer()?;
664        self.chunk_count += 1;
665
666        debug!(
667            target: "hyperdb_api",
668            chunk = self.chunk_count,
669            total_bytes = self.total_bytes,
670            buffered_bytes = self.buffered_bytes,
671            "arrow-inserter-batch"
672        );
673
674        Ok(())
675    }
676
677    /// Drains the internal IPC writer buffer and sends bytes via the COPY writer.
678    ///
679    /// # Safety contract with `StreamWriter`
680    ///
681    /// This method accesses the `StreamWriter`'s underlying `Vec<u8>` via
682    /// `get_mut()` (part of Arrow's public API) and drains it. This is safe
683    /// because `StreamWriter` writes sequentially via `Write::write_all()`
684    /// and does not cache buffer offsets or positions. After draining, the
685    /// Vec is empty but retains its allocation, so subsequent writes append
686    /// from offset 0 without reallocation.
687    fn drain_ipc_buffer(&mut self) -> Result<()> {
688        let ipc = self
689            .batch_ipc_writer
690            .as_mut()
691            .expect("IPC writer must exist");
692        let buf = ipc.get_mut();
693        if buf.is_empty() {
694            return Ok(());
695        }
696
697        // Send the current buffer contents, then clear while preserving the
698        // heap allocation so the next StreamWriter write avoids reallocation.
699        let len = buf.len();
700        if let Some(ref mut writer) = self.writer {
701            writer.send_direct(buf)?;
702        }
703        buf.clear();
704
705        self.buffered_bytes += len;
706        self.total_bytes += len;
707        self.maybe_flush()?;
708
709        Ok(())
710    }
711
712    /// Inserts multiple Arrow `RecordBatch`es, streaming each one immediately.
713    ///
714    /// This is a convenience method that calls [`insert_batch`](Self::insert_batch)
715    /// for each batch in the iterator. Memory usage stays bounded at `O(batch_size)`.
716    ///
717    /// # Example
718    ///
719    /// ```no_run
720    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
721    /// # fn example(conn: &Connection, table_def: &TableDefinition, batches: Vec<arrow::record_batch::RecordBatch>) -> Result<()> {
722    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
723    /// inserter.insert_batches(batches.iter())?;
724    /// let rows = inserter.execute()?;
725    /// # Ok(())
726    /// # }
727    /// ```
728    ///
729    /// # Errors
730    ///
731    /// Returns on the first batch that fails — see
732    /// [`insert_batch`](Self::insert_batch) for the failure modes.
733    pub fn insert_batches<'b>(
734        &mut self,
735        batches: impl IntoIterator<Item = &'b arrow::record_batch::RecordBatch>,
736    ) -> Result<()> {
737        for batch in batches {
738            self.insert_batch(batch)?;
739        }
740        Ok(())
741    }
742
743    /// Ensures the COPY writer is initialized.
744    fn ensure_writer(&mut self) -> Result<()> {
745        if self.writer.is_none() {
746            let client = self.connection.tcp_client().ok_or_else(|| {
747                crate::Error::feature_not_supported(
748                    "ArrowInserter requires a TCP connection. gRPC connections do not support COPY operations.",
749                )
750            })?;
751            let columns: Vec<&str> = self
752                .columns
753                .iter()
754                .map(std::string::String::as_str)
755                .collect();
756            let mut writer = client.copy_in_with_format(
757                &self.table_name,
758                &columns,
759                DataFormat::ArrowStream.as_sql_str(),
760            )?;
761            // Pre-allocate buffer to avoid reallocations during bulk insert
762            writer.reserve_buffer(self.flush_threshold + 1024 * 1024);
763            self.writer = Some(writer);
764        }
765        Ok(())
766    }
767
768    /// Flushes the TCP stream if the threshold is reached.
769    ///
770    /// With `send_direct()`, data is written directly to TCP. This periodic
771    /// flush ensures data is pushed to the server for high-latency connections.
772    fn maybe_flush(&mut self) -> Result<()> {
773        if self.buffered_bytes >= self.flush_threshold {
774            if let Some(ref mut writer) = self.writer {
775                writer.flush_stream()?;
776            }
777            debug!(
778                target: "hyperdb_api",
779                flushed_bytes = self.buffered_bytes,
780                threshold = self.flush_threshold,
781                "arrow-inserter-flush"
782            );
783            self.buffered_bytes = 0;
784        }
785        Ok(())
786    }
787}
788
789// Implement Drop to handle cleanup if the inserter is dropped without calling execute()
790impl Drop for ArrowInserter<'_> {
791    fn drop(&mut self) {
792        // If writer exists and we're being dropped without execute(),
793        // cancel the operation to avoid leaving the connection in a bad state.
794        if let Some(writer) = self.writer.take() {
795            let _ = writer.cancel("Arrow inserter dropped without execute");
796        }
797    }
798}
799
800#[cfg(test)]
801mod tests {
802    use super::*;
803
804    #[test]
805    fn test_data_format_sql_str() {
806        assert_eq!(DataFormat::ArrowStream.as_sql_str(), "ARROWSTREAM");
807    }
808}