Skip to main content

hyperdb_api/
arrow_inserter.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Arrow IPC stream inserter for bulk data loading.
5//!
6//! This module provides the [`ArrowInserter`] struct for inserting pre-formatted
7//! Arrow IPC stream data into Hyper tables.
8//!
9//! Unlike [`Inserter`](crate::Inserter) which builds rows incrementally in `HyperBinary`
10//! format, `ArrowInserter` accepts complete Arrow IPC stream data, making it ideal
11//! for integration with Arrow-based data pipelines.
12//!
13//! # Example
14//!
15//! ```no_run
16//! # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
17//! # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
18//! # fn get_arrow_data() -> Vec<u8> { vec![] }
19//! use hyperdb_api::{ArrowInserter, Connection, TableDefinition};
20//!
21//! // Arrow IPC data from external source (e.g., arrow crate, Parquet reader)
22//! let arrow_ipc_data: Vec<u8> = get_arrow_data();
23//!
24//! let mut inserter = ArrowInserter::new(&conn, &table_def)?;
25//! inserter.insert_data(&arrow_ipc_data)?;
26//! let rows = inserter.execute()?;
27//! # Ok(())
28//! # }
29//! ```
30
31use std::time::Instant;
32
33use arrow::ipc::writer::StreamWriter;
34use hyperdb_api_core::client::client::CopyInWriter;
35use tracing::{debug, info};
36
/// Default flush threshold (16 MB) — matches `HyperBinary` Inserter.
///
/// Every new `ArrowInserter` starts with this value as its `flush_threshold`;
/// it can be overridden per inserter via `with_flush_threshold()`.
const DEFAULT_FLUSH_THRESHOLD: usize = 16 * 1024 * 1024;
39
40use crate::catalog::Catalog;
41use crate::connection::Connection;
42use crate::data_format::DataFormat;
43use crate::error::{Error, Result};
44use crate::table_definition::TableDefinition;
45
/// Tracks which insertion pathway is active, preventing unsafe mixing of
/// raw IPC methods (`insert_data`/`insert_record_batches`/`insert_raw`) with
/// the `RecordBatch`-based method (`insert_batch`).
///
/// The inserter stores this as `Option<InsertMode>`; `None` there means no
/// pathway has been chosen yet.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum InsertMode {
    /// Raw IPC bytes sent via `insert_data` / `insert_record_batches` / `insert_raw`.
    /// The caller supplies already-serialized Arrow IPC bytes.
    RawIpc,
    /// `RecordBatch` objects serialized through a `StreamWriter` via `insert_batch`.
    /// The inserter performs the IPC serialization itself.
    BatchIpc,
}
56
57/// Inserts Arrow IPC stream data into a Hyper table.
58///
59/// Unlike [`Inserter`](crate::Inserter) which builds rows incrementally in `HyperBinary`
60/// format, `ArrowInserter` accepts pre-formatted Arrow IPC stream data. This is useful
61/// when integrating with Arrow-based data pipelines or when you already have data in
62/// Arrow format.
63///
64/// # Arrow IPC Stream Format
65///
66/// Arrow IPC streams consist of:
67/// 1. A schema message (describing column names and types)
68/// 2. One or more record batch messages (containing the actual data)
69///
70/// The schema must match the target table's schema.
71///
72/// # Single vs Multiple Chunks
73///
74/// For single-chunk inserts, use [`insert_data()`](Self::insert_data) with a complete
75/// Arrow IPC stream (schema + record batches).
76///
77/// For multiple chunks (large datasets), use:
78/// 1. [`insert_data()`](Self::insert_data) for the first chunk (with schema)
79/// 2. [`insert_record_batches()`](Self::insert_record_batches) for subsequent chunks (without schema)
80///
81/// # Example
82///
83/// ```no_run
84/// use hyperdb_api::{ArrowInserter, Connection, CreateMode, Catalog, TableDefinition, SqlType, Result};
85///
86/// fn main() -> Result<()> {
87///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
88///
89///     let table_def = TableDefinition::new("data")
90///         .add_required_column("id", SqlType::int())
91///         .add_nullable_column("value", SqlType::double());
92///
93///     Catalog::new(&conn).create_table(&table_def)?;
94///
95///     // Arrow IPC data from external source
96///     let arrow_data: Vec<u8> = vec![]; // Your Arrow IPC stream here
97///
98///     let mut inserter = ArrowInserter::new(&conn, &table_def)?;
99///     inserter.insert_data(&arrow_data)?;
100///     let rows = inserter.execute()?;
101///     println!("Inserted {} rows", rows);
102///     Ok(())
103/// }
104/// ```
pub struct ArrowInserter<'conn> {
    /// Connection on which the COPY session is opened (lazily, on the first write).
    connection: &'conn Connection,
    /// Qualified name of the target table, taken from the table definition.
    table_name: String,
    /// Names of the target columns, in table-definition order.
    columns: Vec<String>,
    /// Active COPY writer. `None` until the first write opens the COPY session,
    /// and taken out again by `execute()`, `cancel()`, or `Drop`.
    writer: Option<CopyInWriter<'conn>>,
    /// Tracks whether an Arrow schema has been sent.
    /// Sending schema twice causes an error in Hyper.
    schema_sent: bool,
    /// Total bytes sent (for logging).
    total_bytes: usize,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Start time for timing the insert operation.
    /// Captured when the inserter is constructed, not when the first chunk is sent.
    start_time: Instant,
    /// Flush threshold in bytes. Data is buffered until this threshold is reached.
    flush_threshold: usize,
    /// Bytes buffered since the last flush.
    /// Reset to zero each time the threshold check triggers a flush.
    buffered_bytes: usize,
    /// Persistent IPC `StreamWriter` for `insert_batch()`. Streams each `RecordBatch`
    /// eagerly — the schema is written on first use, then each batch is serialized
    /// and sent immediately via `send_direct()`. The internal `Vec<u8>` buffer is
    /// drained after every write to keep memory usage bounded at `O(batch_size)`.
    batch_ipc_writer: Option<StreamWriter<Vec<u8>>>,
    /// Tracks which insertion pathway is active. `None` means no data has been
    /// sent yet. Once set, mixing pathways is an error.
    insert_mode: Option<InsertMode>,
}
132
133impl std::fmt::Debug for ArrowInserter<'_> {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        f.debug_struct("ArrowInserter")
136            .field("table_name", &self.table_name)
137            .field("columns", &self.columns)
138            .field("schema_sent", &self.schema_sent)
139            .field("chunk_count", &self.chunk_count)
140            .field("total_bytes", &self.total_bytes)
141            .finish_non_exhaustive()
142    }
143}
144
145impl<'conn> ArrowInserter<'conn> {
146    /// Creates a new Arrow inserter for the given table.
147    ///
148    /// # Arguments
149    ///
150    /// * `connection` - The database connection.
151    /// * `table_def` - The table definition for the target table.
152    ///
153    /// # Example
154    ///
155    /// ```no_run
156    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
157    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
158    /// let inserter = ArrowInserter::new(&conn, &table_def)?;
159    /// # Ok(())
160    /// # }
161    /// ```
162    ///
163    /// # Errors
164    ///
165    /// - Returns [`Error::Other`] with message
166    ///   `"Table definition must have at least one column"` if `table_def`
167    ///   has no columns.
168    /// - Returns [`Error::Other`] if `connection` is using gRPC transport
169    ///   (COPY is TCP-only).
170    pub fn new(connection: &'conn Connection, table_def: &TableDefinition) -> Result<Self> {
171        let column_count = table_def.column_count();
172        if column_count == 0 {
173            return Err(Error::new("Table definition must have at least one column"));
174        }
175
176        // Fail fast: verify the connection supports COPY (TCP only).
177        // The actual COPY session is started lazily on the first data write
178        // to avoid locking the connection into COPY mode prematurely.
179        if connection.tcp_client().is_none() {
180            return Err(Error::new(
181                "ArrowInserter requires a TCP connection. \
182                 gRPC connections do not support COPY operations.",
183            ));
184        }
185
186        let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
187        let table_name = table_def.qualified_name();
188
189        Ok(ArrowInserter {
190            connection,
191            table_name,
192            columns,
193            writer: None,
194            schema_sent: false,
195            total_bytes: 0,
196            chunk_count: 0,
197            start_time: Instant::now(),
198            flush_threshold: DEFAULT_FLUSH_THRESHOLD,
199            buffered_bytes: 0,
200            batch_ipc_writer: None,
201            insert_mode: None,
202        })
203    }
204
205    /// Creates an Arrow inserter by querying the table schema from the database.
206    ///
207    /// This method queries the database to get the table definition automatically,
208    /// which is useful when you want to insert into an existing table without
209    /// manually specifying the schema.
210    ///
211    /// # Arguments
212    ///
213    /// * `connection` - The database connection.
214    /// * `table_name` - The table name (can include database and schema qualifiers).
215    ///
216    /// # Errors
217    ///
218    /// Returns an error if the table doesn't exist or if the schema cannot be retrieved.
219    ///
220    /// # Example
221    ///
222    /// ```no_run
223    /// # use hyperdb_api::{ArrowInserter, Connection, Result};
224    /// # fn example(conn: &Connection) -> Result<()> {
225    /// let inserter = ArrowInserter::from_table(&conn, "public.my_table")?;
226    /// # Ok(())
227    /// # }
228    /// ```
229    pub fn from_table<T>(connection: &'conn Connection, table_name: T) -> Result<Self>
230    where
231        T: TryInto<crate::TableName>,
232        crate::Error: From<T::Error>,
233    {
234        let catalog = Catalog::new(connection);
235        let table_def = catalog.get_table_definition(table_name)?;
236        Self::new(connection, &table_def)
237    }
238
    /// Sets a custom flush threshold in bytes.
    ///
    /// Data is buffered until the threshold is reached, then flushed to the server.
    /// Default is 16 MB (matching `HyperBinary` Inserter).
    ///
    /// The flush check is `buffered_bytes >= threshold`, so a threshold of `0`
    /// triggers a flush after every chunk. The threshold is also used (plus
    /// 1 MiB) as the buffer reservation requested when the COPY writer is
    /// created on the first write.
    ///
    /// This is a builder-style method: it consumes the inserter and returns it
    /// with the new threshold applied.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
    /// let inserter = ArrowInserter::new(&conn, &table_def)?
    ///     .with_flush_threshold(32 * 1024 * 1024);  // 32 MB
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
        self.flush_threshold = threshold;
        self
    }
259
260    /// Inserts a complete Arrow IPC stream (schema + record batches).
261    ///
262    /// Use this method for single-chunk inserts or for the first chunk of
263    /// multi-chunk inserts. The Arrow IPC stream must include the schema message
264    /// followed by one or more record batch messages.
265    ///
266    /// # Arguments
267    ///
268    /// * `arrow_ipc_data` - Complete Arrow IPC stream data (schema + record batches).
269    ///
270    /// # Errors
271    ///
272    /// Returns an error if:
273    /// - The connection fails to start COPY
274    /// - Sending data fails
275    /// - Schema was already sent (use `insert_record_batches()` for subsequent chunks)
276    ///
277    /// # Example
278    ///
279    /// ```no_run
280    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
281    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
282    /// # let first_arrow_ipc_stream = vec![];
283    /// # let second_chunk_batches_only = vec![];
284    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
285    ///
286    /// // First chunk with schema
287    /// inserter.insert_data(&first_arrow_ipc_stream)?;
288    ///
289    /// // For subsequent chunks without schema, use insert_record_batches()
290    /// inserter.insert_record_batches(&second_chunk_batches_only)?;
291    ///
292    /// let rows = inserter.execute()?;
293    /// # Ok(())
294    /// # }
295    /// ```
296    pub fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
297        if arrow_ipc_data.is_empty() {
298            return Ok(());
299        }
300
301        if self.insert_mode == Some(InsertMode::BatchIpc) {
302            return Err(Error::new(
303                "Cannot mix insert_data() with insert_batch(). \
304                 Use either raw IPC methods (insert_data/insert_record_batches) \
305                 or RecordBatch methods (insert_batch), not both.",
306            ));
307        }
308
309        if self.schema_sent {
310            return Err(Error::new(
311                "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks without schema, \
312                 or use insert_data() only once with the complete Arrow IPC stream.",
313            ));
314        }
315
316        self.ensure_writer()?;
317
318        if let Some(ref mut writer) = self.writer {
319            writer.send_direct(arrow_ipc_data)?;
320        }
321        self.buffered_bytes += arrow_ipc_data.len();
322        self.maybe_flush()?;
323
324        self.insert_mode = Some(InsertMode::RawIpc);
325        self.schema_sent = true;
326        self.total_bytes += arrow_ipc_data.len();
327        self.chunk_count += 1;
328
329        debug!(
330            target: "hyperdb_api",
331            chunk = self.chunk_count,
332            bytes = arrow_ipc_data.len(),
333            total_bytes = self.total_bytes,
334            buffered_bytes = self.buffered_bytes,
335            "arrow-inserter-chunk"
336        );
337
338        Ok(())
339    }
340
341    /// Inserts Arrow record batch data without schema.
342    ///
343    /// Use this method for subsequent chunks after the first `insert_data()` call.
344    /// The data should contain only Arrow record batch messages, **not** the schema.
345    ///
346    /// **Important**: Sending schema twice causes an error in Hyper. If you have
347    /// multiple complete Arrow IPC streams (each with schema), you need to strip
348    /// the schema from all but the first one.
349    ///
350    /// # Arguments
351    ///
352    /// * `arrow_batch_data` - Arrow record batch data (no schema message).
353    ///
354    /// # Errors
355    ///
356    /// Returns an error if:
357    /// - No schema has been sent yet (call `insert_data()` first)
358    /// - Sending data fails
359    ///
360    /// # Example
361    ///
362    /// ```no_run
363    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
364    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
365    /// # let first_chunk_with_schema = vec![];
366    /// # let second_chunk_batches = vec![];
367    /// # let third_chunk_batches = vec![];
368    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
369    ///
370    /// // First chunk must include schema
371    /// inserter.insert_data(&first_chunk_with_schema)?;
372    ///
373    /// // Subsequent chunks are record batches only
374    /// inserter.insert_record_batches(&second_chunk_batches)?;
375    /// inserter.insert_record_batches(&third_chunk_batches)?;
376    ///
377    /// let rows = inserter.execute()?;
378    /// # Ok(())
379    /// # }
380    /// ```
381    pub fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
382        if arrow_batch_data.is_empty() {
383            return Ok(());
384        }
385
386        if self.insert_mode == Some(InsertMode::BatchIpc) {
387            return Err(Error::new(
388                "Cannot mix insert_record_batches() with insert_batch(). \
389                 Use either raw IPC methods (insert_data/insert_record_batches) \
390                 or RecordBatch methods (insert_batch), not both.",
391            ));
392        }
393
394        if !self.schema_sent {
395            return Err(Error::new(
396                "No Arrow schema has been sent yet. Call insert_data() first with a complete \
397                 Arrow IPC stream that includes the schema.",
398            ));
399        }
400
401        if let Some(ref mut writer) = self.writer {
402            writer.send_direct(arrow_batch_data)?;
403        }
404        self.buffered_bytes += arrow_batch_data.len();
405        self.maybe_flush()?;
406
407        self.total_bytes += arrow_batch_data.len();
408        self.chunk_count += 1;
409
410        debug!(
411            target: "hyperdb_api",
412            chunk = self.chunk_count,
413            bytes = arrow_batch_data.len(),
414            total_bytes = self.total_bytes,
415            buffered_bytes = self.buffered_bytes,
416            "arrow-inserter-batch-chunk"
417        );
418
419        Ok(())
420    }
421
422    /// Inserts raw Arrow data without schema tracking.
423    ///
424    /// This is a low-level method that sends data directly without checking
425    /// whether schema has been sent. Use this only if you are managing schema
426    /// handling yourself.
427    ///
428    /// For most use cases, prefer [`insert_data()`](Self::insert_data) and
429    /// [`insert_record_batches()`](Self::insert_record_batches).
430    ///
431    /// # Arguments
432    ///
433    /// * `data` - Raw Arrow IPC data to send.
434    ///
435    /// # Errors
436    ///
437    /// - Returns [`Error::Other`] if a previous `insert_batch` call already
438    ///   locked the inserter into `RecordBatch` IPC mode — raw IPC and
439    ///   `RecordBatch` paths cannot be mixed.
440    /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
441    ///   session fails to open.
442    /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
443    ///   the data or the socket write fails.
444    pub fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
445        if data.is_empty() {
446            return Ok(());
447        }
448
449        if self.insert_mode == Some(InsertMode::BatchIpc) {
450            return Err(Error::new(
451                "Cannot mix insert_raw() with insert_batch(). \
452                 Use either raw IPC methods (insert_data/insert_record_batches/insert_raw) \
453                 or RecordBatch methods (insert_batch), not both.",
454            ));
455        }
456
457        self.ensure_writer()?;
458
459        if let Some(ref mut writer) = self.writer {
460            writer.send_direct(data)?;
461        }
462        self.buffered_bytes += data.len();
463        self.maybe_flush()?;
464
465        self.total_bytes += data.len();
466        self.chunk_count += 1;
467
468        Ok(())
469    }
470
471    /// Finishes the insert operation and returns the number of rows inserted.
472    ///
473    /// This method completes the COPY operation and returns the row count
474    /// reported by the server.
475    ///
476    /// # Errors
477    ///
478    /// Returns an error if:
479    /// - No data was sent
480    /// - The COPY operation fails to complete
481    ///
482    /// # Example
483    ///
484    /// ```no_run
485    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
486    /// # fn example(conn: &Connection, table_def: &TableDefinition) -> Result<()> {
487    /// # let arrow_data = vec![];
488    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
489    /// inserter.insert_data(&arrow_data)?;
490    /// let rows = inserter.execute()?;
491    /// # Ok(())
492    /// # }
493    /// ```
494    /// println!("Inserted {} rows", rows);
495    pub fn execute(mut self) -> Result<u64> {
496        // Finalize the IPC StreamWriter if insert_batch() was used.
497        // `into_inner()` calls `finish()` internally (writing the EOS marker)
498        // and returns the underlying buffer — a single fallible operation
499        // instead of the previous finish() + into_inner() pair.
500        //
501        // On error, `self` is dropped, and the Drop impl cancels the COPY
502        // writer, so the connection is always left in a clean state.
503        if let Some(ipc) = self.batch_ipc_writer.take() {
504            let buf = ipc
505                .into_inner()
506                .map_err(|e| Error::new(format!("Failed to finalize Arrow IPC stream: {e}")))?;
507            if !buf.is_empty() {
508                if let Some(ref mut writer) = self.writer {
509                    writer.send_direct(&buf)?;
510                }
511            }
512        }
513
514        if self.writer.is_none() {
515            // No data was sent
516            return Ok(0);
517        }
518
519        let rows = self
520            .writer
521            .take()
522            .map(hyperdb_api_core::client::CopyInWriter::finish)
523            .transpose()?
524            .unwrap_or(0);
525
526        let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
527        info!(
528            target: "hyperdb_api",
529            rows,
530            chunks = self.chunk_count,
531            total_bytes = self.total_bytes,
532            duration_ms,
533            table = %self.table_name,
534            "arrow-inserter-end"
535        );
536
537        Ok(rows)
538    }
539
540    /// Cancels the insert operation.
541    ///
542    /// All data sent so far will be discarded.
543    pub fn cancel(mut self) {
544        if let Some(writer) = self.writer.take() {
545            let _ = writer.cancel("Arrow insert cancelled");
546        }
547    }
548
549    /// Returns whether any data has been sent.
550    #[must_use]
551    pub fn has_data(&self) -> bool {
552        self.chunk_count > 0
553    }
554
    /// Returns the total bytes sent so far.
    ///
    /// This is the running sum of the sizes of all chunks accepted by the
    /// insert methods. For `insert_batch()` it also includes the serialized
    /// schema bytes that are sent ahead of the first batch.
    #[must_use]
    pub fn total_bytes(&self) -> usize {
        self.total_bytes
    }
560
    /// Returns the number of chunks sent so far.
    ///
    /// The counter is incremented once per successful, non-empty raw insert
    /// call and once per successful `insert_batch()` call.
    #[must_use]
    pub fn chunk_count(&self) -> usize {
        self.chunk_count
    }
566
567    /// Inserts an Arrow `RecordBatch` directly, streaming it immediately.
568    ///
569    /// Each batch is serialized to Arrow IPC format and sent to the server right
570    /// away — no accumulation in memory. The schema is written automatically on
571    /// the first call; subsequent batches send only record-batch IPC messages.
572    ///
573    /// Memory usage is `O(batch_size)` regardless of how many batches are inserted.
574    ///
575    /// # Arguments
576    ///
577    /// * `batch` - The Arrow `RecordBatch` to insert.
578    ///
579    /// # Example
580    ///
581    /// ```no_run
582    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, SqlType, Catalog, CreateMode, Result};
583    /// # fn example() -> Result<()> {
584    /// # let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
585    /// # let table_def = TableDefinition::new("data")
586    /// #     .add_required_column("id", SqlType::int())
587    /// #     .add_nullable_column("value", SqlType::double());
588    /// # Catalog::new(&conn).create_table(&table_def)?;
589    /// use arrow::array::{Int32Array, Float64Array};
590    /// use arrow::datatypes::{Schema, Field, DataType};
591    /// use arrow::record_batch::RecordBatch;
592    /// use std::sync::Arc;
593    ///
594    /// let schema = Arc::new(Schema::new(vec![
595    ///     Field::new("id", DataType::Int32, false),
596    ///     Field::new("value", DataType::Float64, true),
597    /// ]));
598    /// let batch = RecordBatch::try_new(schema, vec![
599    ///     Arc::new(Int32Array::from(vec![1, 2, 3])),
600    ///     Arc::new(Float64Array::from(vec![Some(1.5), None, Some(3.5)])),
601    /// ]).unwrap();
602    ///
603    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
604    /// inserter.insert_batch(&batch)?;
605    /// let rows = inserter.execute()?;
606    /// # Ok(())
607    /// # }
608    /// ```
609    ///
610    /// # Errors
611    ///
612    /// - Returns [`Error::Other`] if a previous raw-IPC call locked this
613    ///   inserter into the other mode — raw IPC and `RecordBatch` paths
614    ///   cannot be mixed.
615    /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
616    ///   session cannot be opened.
617    /// - Returns [`Error::Other`] wrapping the underlying Arrow IPC
618    ///   writer error if the schema or batch cannot be serialized (e.g.
619    ///   dictionary misalignment, encoding failure).
620    /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
621    ///   the data or the socket write fails.
622    ///
623    /// # Panics
624    ///
625    /// Panics internally only if the IPC writer state is corrupted —
626    /// callers cannot trigger this from the public API. The `batch_ipc_writer`
627    /// is consulted via `as_mut().unwrap()` after it has just been set to
628    /// `Some`, so the unwrap is unreachable.
629    pub fn insert_batch(&mut self, batch: &arrow::record_batch::RecordBatch) -> Result<()> {
630        if self.insert_mode == Some(InsertMode::RawIpc) {
631            return Err(Error::new(
632                "Cannot mix insert_batch() with raw IPC methods. \
633                 Use either RecordBatch methods (insert_batch) \
634                 or raw IPC methods (insert_data/insert_record_batches/insert_raw), not both.",
635            ));
636        }
637
638        self.ensure_writer()?;
639        self.insert_mode = Some(InsertMode::BatchIpc);
640
641        // Create the IPC StreamWriter on first use — this writes the schema message
642        if self.batch_ipc_writer.is_none() {
643            let ipc_writer = StreamWriter::try_new(Vec::new(), &batch.schema())
644                .map_err(|e| Error::new(format!("Failed to create Arrow IPC writer: {e}")))?;
645            self.batch_ipc_writer = Some(ipc_writer);
646
647            // Drain the schema bytes that StreamWriter wrote during construction
648            self.drain_ipc_buffer()?;
649            self.schema_sent = true;
650        }
651
652        // Write the record batch — this appends batch IPC bytes to the internal Vec
653        self.batch_ipc_writer
654            .as_mut()
655            .expect("IPC writer must exist")
656            .write(batch)
657            .map_err(|e| Error::new(format!("Failed to write Arrow batch: {e}")))?;
658
659        // Drain the batch bytes and send them immediately
660        self.drain_ipc_buffer()?;
661        self.chunk_count += 1;
662
663        debug!(
664            target: "hyperdb_api",
665            chunk = self.chunk_count,
666            total_bytes = self.total_bytes,
667            buffered_bytes = self.buffered_bytes,
668            "arrow-inserter-batch"
669        );
670
671        Ok(())
672    }
673
674    /// Drains the internal IPC writer buffer and sends bytes via the COPY writer.
675    ///
676    /// # Safety contract with `StreamWriter`
677    ///
678    /// This method accesses the `StreamWriter`'s underlying `Vec<u8>` via
679    /// `get_mut()` (part of Arrow's public API) and drains it. This is safe
680    /// because `StreamWriter` writes sequentially via `Write::write_all()`
681    /// and does not cache buffer offsets or positions. After draining, the
682    /// Vec is empty but retains its allocation, so subsequent writes append
683    /// from offset 0 without reallocation.
684    fn drain_ipc_buffer(&mut self) -> Result<()> {
685        let ipc = self
686            .batch_ipc_writer
687            .as_mut()
688            .expect("IPC writer must exist");
689        let buf = ipc.get_mut();
690        if buf.is_empty() {
691            return Ok(());
692        }
693
694        // Send the current buffer contents, then clear while preserving the
695        // heap allocation so the next StreamWriter write avoids reallocation.
696        let len = buf.len();
697        if let Some(ref mut writer) = self.writer {
698            writer.send_direct(buf)?;
699        }
700        buf.clear();
701
702        self.buffered_bytes += len;
703        self.total_bytes += len;
704        self.maybe_flush()?;
705
706        Ok(())
707    }
708
709    /// Inserts multiple Arrow `RecordBatch`es, streaming each one immediately.
710    ///
711    /// This is a convenience method that calls [`insert_batch`](Self::insert_batch)
712    /// for each batch in the iterator. Memory usage stays bounded at `O(batch_size)`.
713    ///
714    /// # Example
715    ///
716    /// ```no_run
717    /// # use hyperdb_api::{ArrowInserter, Connection, TableDefinition, Result};
718    /// # fn example(conn: &Connection, table_def: &TableDefinition, batches: Vec<arrow::record_batch::RecordBatch>) -> Result<()> {
719    /// let mut inserter = ArrowInserter::new(&conn, &table_def)?;
720    /// inserter.insert_batches(batches.iter())?;
721    /// let rows = inserter.execute()?;
722    /// # Ok(())
723    /// # }
724    /// ```
725    ///
726    /// # Errors
727    ///
728    /// Returns on the first batch that fails — see
729    /// [`insert_batch`](Self::insert_batch) for the failure modes.
730    pub fn insert_batches<'b>(
731        &mut self,
732        batches: impl IntoIterator<Item = &'b arrow::record_batch::RecordBatch>,
733    ) -> Result<()> {
734        for batch in batches {
735            self.insert_batch(batch)?;
736        }
737        Ok(())
738    }
739
740    /// Ensures the COPY writer is initialized.
741    fn ensure_writer(&mut self) -> Result<()> {
742        if self.writer.is_none() {
743            let client = self.connection.tcp_client().ok_or_else(|| {
744                crate::Error::new("ArrowInserter requires a TCP connection. gRPC connections do not support COPY operations.")
745            })?;
746            let columns: Vec<&str> = self
747                .columns
748                .iter()
749                .map(std::string::String::as_str)
750                .collect();
751            let mut writer = client.copy_in_with_format(
752                &self.table_name,
753                &columns,
754                DataFormat::ArrowStream.as_sql_str(),
755            )?;
756            // Pre-allocate buffer to avoid reallocations during bulk insert
757            writer.reserve_buffer(self.flush_threshold + 1024 * 1024);
758            self.writer = Some(writer);
759        }
760        Ok(())
761    }
762
763    /// Flushes the TCP stream if the threshold is reached.
764    ///
765    /// With `send_direct()`, data is written directly to TCP. This periodic
766    /// flush ensures data is pushed to the server for high-latency connections.
767    fn maybe_flush(&mut self) -> Result<()> {
768        if self.buffered_bytes >= self.flush_threshold {
769            if let Some(ref mut writer) = self.writer {
770                writer.flush_stream()?;
771            }
772            debug!(
773                target: "hyperdb_api",
774                flushed_bytes = self.buffered_bytes,
775                threshold = self.flush_threshold,
776                "arrow-inserter-flush"
777            );
778            self.buffered_bytes = 0;
779        }
780        Ok(())
781    }
782}
783
784// Implement Drop to handle cleanup if the inserter is dropped without calling execute()
785impl Drop for ArrowInserter<'_> {
786    fn drop(&mut self) {
787        // If writer exists and we're being dropped without execute(),
788        // cancel the operation to avoid leaving the connection in a bad state.
789        if let Some(writer) = self.writer.take() {
790            let _ = writer.cancel("Arrow inserter dropped without execute");
791        }
792    }
793}
794
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the format keyword that the inserter passes when it opens the
    /// COPY session for Arrow stream data.
    #[test]
    fn test_data_format_sql_str() {
        assert_eq!(DataFormat::ArrowStream.as_sql_str(), "ARROWSTREAM");
    }
}
803}