Skip to main content

hyperdb_api/
async_arrow_inserter.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Async Arrow IPC stream inserter for bulk data loading.
5//!
6//! This module provides the [`AsyncArrowInserter`] struct for inserting pre-formatted
7//! Arrow IPC stream data into Hyper tables asynchronously.
8
9use std::sync::Arc;
10use std::time::Instant;
11
12use hyperdb_api_core::client::{AsyncClient, AsyncCopyInWriter, AsyncCopyInWriterOwned};
13use tracing::{debug, info};
14
15use crate::async_connection::AsyncConnection;
16use crate::data_format::DataFormat;
17use crate::error::{Error, Result};
18use crate::table_definition::TableDefinition;
19
/// Number of bytes written between explicit stream flushes when the caller
/// does not override it: 16 MiB, the same value the `HyperBinary` Inserter uses.
const DEFAULT_FLUSH_THRESHOLD: usize = 16 << 20;
22
23/// Async inserter for Arrow IPC stream data into a Hyper table.
24///
25/// This is the async version of [`ArrowInserter`](crate::ArrowInserter), designed for use
26/// in tokio-based async applications.
27///
28/// # Ownership & Drop
29///
30/// You **must** call either [`execute()`](Self::execute) or [`cancel()`](Self::cancel)
31/// to properly terminate the COPY session. If the inserter is dropped without
32/// calling one of these, a best-effort `CopyFail` is queued and the connection
33/// will self-heal on the next async operation. Data sent so far will be lost.
34///
35/// # Example
36///
37/// ```no_run
38/// use hyperdb_api::{AsyncArrowInserter, AsyncConnection, CreateMode, TableDefinition, SqlType, Result};
39///
40/// #[tokio::main]
41/// async fn main() -> Result<()> {
42///     let conn = AsyncConnection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists).await?;
43///
44///     let table_def = TableDefinition::new("data")
45///         .add_required_column("id", SqlType::int())
46///         .add_nullable_column("value", SqlType::double());
47///
48///     // Arrow IPC data from external source
49///     let arrow_data: Vec<u8> = vec![]; // Your Arrow IPC stream here
50///
51///     let mut inserter = AsyncArrowInserter::new(&conn, &table_def)?;
52///     inserter.insert_data(&arrow_data).await?;
53///     let rows = inserter.execute().await?;
54///     println!("Inserted {} rows", rows);
55///     Ok(())
56/// }
57/// ```
58#[derive(Debug)]
59pub struct AsyncArrowInserter<'conn> {
60    connection: &'conn AsyncConnection,
61    table_name: String,
62    columns: Vec<String>,
63    writer: Option<AsyncCopyInWriter<'conn>>,
64    /// Tracks whether an Arrow schema has been sent.
65    schema_sent: bool,
66    /// Total bytes sent (for logging).
67    total_bytes: usize,
68    /// Number of chunks sent.
69    chunk_count: usize,
70    /// Start time for timing the insert operation.
71    start_time: Instant,
72    /// Flush threshold in bytes. Data is buffered until this threshold is reached.
73    flush_threshold: usize,
74    /// Bytes buffered since the last flush.
75    buffered_bytes: usize,
76}
77
78impl<'conn> AsyncArrowInserter<'conn> {
79    /// Creates a new async Arrow inserter for the given table.
80    ///
81    /// The underlying COPY session is started lazily on the first data write,
82    /// so construction is lightweight. However, the connection's transport is
83    /// validated eagerly — using a gRPC connection will return an error
84    /// immediately.
85    ///
86    /// # Arguments
87    ///
88    /// * `connection` - The async database connection (must be TCP, not gRPC).
89    /// * `table_def` - The table definition for the target table.
90    ///
91    /// # Errors
92    ///
93    /// - Returns [`Error::InvalidTableDefinition`] with message
94    ///   `"Table definition must have at least one column"` if `table_def`
95    ///   has no columns.
96    /// - Returns [`Error::FeatureNotSupported`] if `connection` is using gRPC transport
97    ///   (COPY is TCP-only).
98    pub fn new(connection: &'conn AsyncConnection, table_def: &TableDefinition) -> Result<Self> {
99        let column_count = table_def.column_count();
100        if column_count == 0 {
101            return Err(Error::invalid_table_definition(
102                "Table definition must have at least one column",
103            ));
104        }
105
106        // Fail fast: verify the connection supports COPY (TCP only)
107        if connection.async_tcp_client().is_none() {
108            return Err(Error::feature_not_supported(
109                "AsyncArrowInserter requires a TCP connection. \
110                 gRPC connections do not support COPY operations.",
111            ));
112        }
113
114        let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
115        let table_name = table_def.qualified_name();
116
117        Ok(AsyncArrowInserter {
118            connection,
119            table_name,
120            columns,
121            writer: None,
122            schema_sent: false,
123            total_bytes: 0,
124            chunk_count: 0,
125            start_time: Instant::now(),
126            flush_threshold: DEFAULT_FLUSH_THRESHOLD,
127            buffered_bytes: 0,
128        })
129    }
130
131    /// Sets a custom flush threshold in bytes.
132    ///
133    /// Data is buffered until the threshold is reached, then flushed to the server.
134    /// Default is 16 MB (matching `HyperBinary` Inserter).
135    #[must_use]
136    pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
137        self.flush_threshold = threshold;
138        self
139    }
140
141    /// Inserts a complete Arrow IPC stream (schema + record batches).
142    ///
143    /// Use this method for single-chunk inserts or for the first chunk of
144    /// multi-chunk inserts. The Arrow IPC stream must include the schema message
145    /// followed by one or more record batch messages.
146    ///
147    /// # Errors
148    ///
149    /// - Returns [`Error::Internal`] if a schema was already sent (call
150    ///   [`insert_record_batches`](Self::insert_record_batches) for
151    ///   subsequent chunks instead).
152    /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
153    ///   session cannot be opened.
154    /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
155    ///   the data or the socket write fails.
156    pub async fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
157        if arrow_ipc_data.is_empty() {
158            return Ok(());
159        }
160
161        if self.schema_sent {
162            return Err(Error::internal(
163                "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks without schema, \
164                 or use insert_data() only once with the complete Arrow IPC stream.",
165            ));
166        }
167
168        self.ensure_writer().await?;
169
170        if let Some(ref mut writer) = self.writer {
171            writer.send_direct(arrow_ipc_data).await?;
172        }
173        self.buffered_bytes += arrow_ipc_data.len();
174        self.maybe_flush().await?;
175
176        self.schema_sent = true;
177        self.total_bytes += arrow_ipc_data.len();
178        self.chunk_count += 1;
179
180        debug!(
181            target: "hyperdb_api",
182            chunk = self.chunk_count,
183            bytes = arrow_ipc_data.len(),
184            total_bytes = self.total_bytes,
185            buffered_bytes = self.buffered_bytes,
186            "async-arrow-inserter-chunk"
187        );
188
189        Ok(())
190    }
191
192    /// Inserts Arrow record batch data without schema.
193    ///
194    /// Use this method for subsequent chunks after the first `insert_data()` call.
195    /// The data should contain only Arrow record batch messages, **not** the schema.
196    ///
197    /// # Errors
198    ///
199    /// - Returns [`Error::Internal`] if no schema has been sent yet (call
200    ///   [`insert_data`](Self::insert_data) first).
201    /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
202    ///   the data or the socket write fails.
203    pub async fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
204        if arrow_batch_data.is_empty() {
205            return Ok(());
206        }
207
208        if !self.schema_sent {
209            return Err(Error::internal(
210                "No Arrow schema has been sent yet. Call insert_data() first with a complete \
211                 Arrow IPC stream that includes the schema.",
212            ));
213        }
214
215        if let Some(ref mut writer) = self.writer {
216            writer.send_direct(arrow_batch_data).await?;
217        }
218        self.buffered_bytes += arrow_batch_data.len();
219        self.maybe_flush().await?;
220
221        self.total_bytes += arrow_batch_data.len();
222        self.chunk_count += 1;
223
224        debug!(
225            target: "hyperdb_api",
226            chunk = self.chunk_count,
227            bytes = arrow_batch_data.len(),
228            total_bytes = self.total_bytes,
229            buffered_bytes = self.buffered_bytes,
230            "async-arrow-inserter-batch-chunk"
231        );
232
233        Ok(())
234    }
235
236    /// Inserts raw Arrow data without schema tracking.
237    ///
238    /// This is a low-level method that sends data directly without checking
239    /// whether schema has been sent. Use this only if you are managing schema
240    /// handling yourself.
241    ///
242    /// # Errors
243    ///
244    /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
245    ///   session cannot be opened.
246    /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
247    ///   the data or the socket write fails.
248    pub async fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
249        if data.is_empty() {
250            return Ok(());
251        }
252
253        self.ensure_writer().await?;
254
255        if let Some(ref mut writer) = self.writer {
256            writer.send_direct(data).await?;
257        }
258        self.buffered_bytes += data.len();
259        self.maybe_flush().await?;
260
261        self.total_bytes += data.len();
262        self.chunk_count += 1;
263
264        Ok(())
265    }
266
267    /// Finishes the insert operation and returns the number of rows inserted.
268    ///
269    /// This sends any remaining buffered data to the server and completes
270    /// the COPY session. Always call this (or [`cancel()`](Self::cancel))
271    /// to properly terminate the session.
272    ///
273    /// # Errors
274    ///
275    /// Returns [`Error::Server`] or [`Error::Io`] if the `CommandComplete`
276    /// round-trip fails (server rejected some buffered batch, or the socket
277    /// closed mid-flush). If no data was ever written, returns `Ok(0)`.
278    pub async fn execute(mut self) -> Result<u64> {
279        if self.writer.is_none() {
280            return Ok(0);
281        }
282
283        let rows = match self.writer.take() {
284            Some(w) => w.finish().await?,
285            None => 0,
286        };
287
288        let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
289        info!(
290            target: "hyperdb_api",
291            rows,
292            chunks = self.chunk_count,
293            total_bytes = self.total_bytes,
294            duration_ms,
295            table = %self.table_name,
296            "async-arrow-inserter-end"
297        );
298
299        Ok(rows)
300    }
301
302    /// Cancels the insert operation.
303    ///
304    /// All data sent so far will be discarded.
305    pub async fn cancel(mut self) {
306        if let Some(writer) = self.writer.take() {
307            let _ = writer.cancel("Arrow insert cancelled").await;
308        }
309    }
310
311    /// Returns whether any data has been sent.
312    #[must_use]
313    pub fn has_data(&self) -> bool {
314        self.chunk_count > 0
315    }
316
317    /// Returns the total bytes sent so far.
318    #[must_use]
319    pub fn total_bytes(&self) -> usize {
320        self.total_bytes
321    }
322
323    /// Returns the number of chunks sent so far.
324    #[must_use]
325    pub fn chunk_count(&self) -> usize {
326        self.chunk_count
327    }
328
329    /// Ensures the COPY writer is initialized.
330    async fn ensure_writer(&mut self) -> Result<()> {
331        if self.writer.is_none() {
332            let client = self.connection.async_tcp_client().ok_or_else(|| {
333                crate::Error::feature_not_supported(
334                    "AsyncArrowInserter requires a TCP connection. gRPC connections do not support COPY operations.",
335                )
336            })?;
337            let columns: Vec<&str> = self
338                .columns
339                .iter()
340                .map(std::string::String::as_str)
341                .collect();
342            self.writer = Some(
343                client
344                    .copy_in_with_format(
345                        &self.table_name,
346                        &columns,
347                        DataFormat::ArrowStream.as_sql_str(),
348                    )
349                    .await?,
350            );
351        }
352        Ok(())
353    }
354
355    /// Flushes the TCP stream if the threshold is reached.
356    ///
357    /// With `send_direct()`, data is written directly to TCP. This periodic
358    /// flush ensures data is pushed to the server for high-latency connections.
359    async fn maybe_flush(&mut self) -> Result<()> {
360        if self.buffered_bytes >= self.flush_threshold {
361            if let Some(ref mut writer) = self.writer {
362                writer.flush_stream().await?;
363            }
364            debug!(
365                target: "hyperdb_api",
366                flushed_bytes = self.buffered_bytes,
367                threshold = self.flush_threshold,
368                "async-arrow-inserter-flush"
369            );
370            self.buffered_bytes = 0;
371        }
372        Ok(())
373    }
374}
375
376impl Drop for AsyncArrowInserter<'_> {
377    fn drop(&mut self) {
378        if self.writer.is_some() {
379            tracing::warn!(
380                target: "hyperdb_api",
381                chunks = self.chunk_count,
382                total_bytes = self.total_bytes,
383                table = %self.table_name,
384                "AsyncArrowInserter dropped without calling execute() or cancel(). \
385                 Data may be lost. The underlying AsyncCopyInWriter will \
386                 attempt a best-effort cancel to restore the connection."
387            );
388            // Take the writer so its Drop impl runs, which queues a CopyFail
389            // message via try_lock(). The next async operation on the connection
390            // will drain the cancel response and restore ReadyForQuery state.
391            drop(self.writer.take());
392        }
393    }
394}
395
396// =============================================================================
397// AsyncArrowInserterOwned — lifetime-free variant
398// =============================================================================
399
400/// Owned-handle variant of [`AsyncArrowInserter`] that holds an
401/// `Arc<AsyncConnection>` instead of a borrow.
402///
403/// Semantics are identical to [`AsyncArrowInserter`] — same
404/// `HyperBinary` Arrow-stream COPY pipeline, same flush threshold,
405/// same Drop-time best-effort cancel. The only difference is that
406/// this variant is `'static` and can therefore live in structs that
407/// can't carry lifetimes (N-API classes, `tokio::spawn` tasks that
408/// outlive the constructor's stack frame, etc).
409#[derive(Debug)]
410pub struct AsyncArrowInserterOwned {
411    #[allow(
412        dead_code,
413        reason = "kept alive to anchor the client's Mutex Arc for the writer's lifetime"
414    )]
415    connection: Arc<AsyncConnection>,
416    table_name: String,
417    columns: Vec<String>,
418    writer: Option<AsyncCopyInWriterOwned>,
419    schema_sent: bool,
420    total_bytes: usize,
421    chunk_count: usize,
422    start_time: Instant,
423    flush_threshold: usize,
424    buffered_bytes: usize,
425}
426
427impl AsyncArrowInserterOwned {
428    /// Creates a new owned-handle async Arrow inserter.
429    ///
430    /// # Arguments
431    ///
432    /// * `connection` - `Arc`-shared async database connection. The
433    ///   Arc is cloned into the inserter and kept alive for its
434    ///   lifetime, so callers can drop their own handle immediately
435    ///   after construction.
436    /// * `table_def` - The table definition for the target table.
437    ///
438    /// # Errors
439    ///
440    /// - Returns [`Error::InvalidTableDefinition`] with message
441    ///   `"Table definition must have at least one column"` if `table_def`
442    ///   has no columns.
443    /// - Returns [`Error::FeatureNotSupported`] if `connection` is using gRPC transport.
444    pub fn new(connection: Arc<AsyncConnection>, table_def: &TableDefinition) -> Result<Self> {
445        let column_count = table_def.column_count();
446        if column_count == 0 {
447            return Err(Error::invalid_table_definition(
448                "Table definition must have at least one column",
449            ));
450        }
451
452        if connection.async_tcp_client().is_none() {
453            return Err(Error::feature_not_supported(
454                "AsyncArrowInserterOwned requires a TCP connection. \
455                 gRPC connections do not support COPY operations.",
456            ));
457        }
458
459        let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
460        let table_name = table_def.qualified_name();
461
462        Ok(AsyncArrowInserterOwned {
463            connection,
464            table_name,
465            columns,
466            writer: None,
467            schema_sent: false,
468            total_bytes: 0,
469            chunk_count: 0,
470            start_time: Instant::now(),
471            flush_threshold: DEFAULT_FLUSH_THRESHOLD,
472            buffered_bytes: 0,
473        })
474    }
475
476    /// Sets a custom flush threshold in bytes. Default: 16 MB.
477    #[must_use]
478    pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
479        self.flush_threshold = threshold;
480        self
481    }
482
483    /// Inserts a complete Arrow IPC stream (schema + record batches).
484    /// Use this for single-chunk inserts or as the first call of a
485    /// multi-chunk insert; subsequent chunks use
486    /// [`insert_record_batches`](Self::insert_record_batches).
487    ///
488    /// # Errors
489    ///
490    /// - Returns [`Error::Internal`] if a schema was already sent.
491    /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
492    ///   session cannot be opened.
493    /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
494    ///   the data or the socket write fails.
495    pub async fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
496        if arrow_ipc_data.is_empty() {
497            return Ok(());
498        }
499        if self.schema_sent {
500            return Err(Error::internal(
501                "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks.",
502            ));
503        }
504        self.ensure_writer().await?;
505        if let Some(ref mut w) = self.writer {
506            w.send_direct(arrow_ipc_data).await?;
507        }
508        self.schema_sent = true;
509        self.buffered_bytes += arrow_ipc_data.len();
510        self.maybe_flush().await?;
511        self.total_bytes += arrow_ipc_data.len();
512        self.chunk_count += 1;
513        Ok(())
514    }
515
516    /// Inserts Arrow record-batch bytes *without* a schema header.
517    /// Must be called after [`insert_data`](Self::insert_data) or
518    /// [`insert_raw`](Self::insert_raw).
519    ///
520    /// # Errors
521    ///
522    /// - Returns [`Error::Internal`] if no schema has been sent yet.
523    /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
524    ///   the data or the socket write fails.
525    pub async fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
526        if arrow_batch_data.is_empty() {
527            return Ok(());
528        }
529        if !self.schema_sent {
530            return Err(Error::internal(
531                "No Arrow schema has been sent yet. Call insert_data() first.",
532            ));
533        }
534        if let Some(ref mut w) = self.writer {
535            w.send_direct(arrow_batch_data).await?;
536        }
537        self.buffered_bytes += arrow_batch_data.len();
538        self.maybe_flush().await?;
539        self.total_bytes += arrow_batch_data.len();
540        self.chunk_count += 1;
541        Ok(())
542    }
543
544    /// Low-level: send raw bytes without schema tracking. The first
545    /// call transitions `schema_sent` to `true`. Use this when you are
546    /// managing Arrow IPC framing yourself.
547    ///
548    /// # Errors
549    ///
550    /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
551    ///   session cannot be opened.
552    /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
553    ///   the data or the socket write fails.
554    pub async fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
555        if data.is_empty() {
556            return Ok(());
557        }
558        self.ensure_writer().await?;
559        if let Some(ref mut w) = self.writer {
560            w.send_direct(data).await?;
561        }
562        self.schema_sent = true;
563        self.buffered_bytes += data.len();
564        self.maybe_flush().await?;
565        self.total_bytes += data.len();
566        self.chunk_count += 1;
567        Ok(())
568    }
569
570    /// Finalizes the COPY session and returns the affected row count.
571    ///
572    /// # Errors
573    ///
574    /// - Returns [`Error::Internal`] with message
575    ///   `"No data was inserted before execute()"` if no COPY session was
576    ///   ever opened.
577    /// - Returns [`Error::Server`] / [`Error::Io`] if the `CommandComplete`
578    ///   round-trip fails.
579    pub async fn execute(mut self) -> Result<u64> {
580        let elapsed = self.start_time.elapsed();
581        info!(
582            target: "hyperdb_api",
583            chunks = self.chunk_count,
584            total_bytes = self.total_bytes,
585            elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
586            "async-arrow-inserter-execute"
587        );
588        let writer = self
589            .writer
590            .take()
591            .ok_or_else(|| Error::internal("No data was inserted before execute()"))?;
592        writer.finish().await.map_err(Into::into)
593    }
594
595    /// Cancels the COPY session; any data sent so far is discarded.
596    pub async fn cancel(mut self) {
597        if let Some(writer) = self.writer.take() {
598            let _ = writer.cancel("AsyncArrowInserterOwned::cancel").await;
599        }
600    }
601
602    /// Returns `true` if any data has been inserted.
603    #[must_use]
604    pub fn has_data(&self) -> bool {
605        self.schema_sent
606    }
607
608    /// Returns the total bytes sent.
609    #[must_use]
610    pub fn total_bytes(&self) -> usize {
611        self.total_bytes
612    }
613
614    /// Returns the number of chunks sent.
615    #[must_use]
616    pub fn chunk_count(&self) -> usize {
617        self.chunk_count
618    }
619
620    async fn ensure_writer(&mut self) -> Result<()> {
621        if self.writer.is_none() {
622            let client: &AsyncClient = self.connection.async_tcp_client().ok_or_else(|| {
623                Error::feature_not_supported(
624                    "AsyncArrowInserterOwned requires a TCP connection. \
625                     gRPC connections do not support COPY operations.",
626                )
627            })?;
628            let columns: Vec<&str> = self
629                .columns
630                .iter()
631                .map(std::string::String::as_str)
632                .collect();
633            self.writer = Some(
634                client
635                    .copy_in_arc_with_format(
636                        &self.table_name,
637                        &columns,
638                        DataFormat::ArrowStream.as_sql_str(),
639                    )
640                    .await?,
641            );
642        }
643        Ok(())
644    }
645
646    async fn maybe_flush(&mut self) -> Result<()> {
647        if self.buffered_bytes >= self.flush_threshold {
648            if let Some(ref mut w) = self.writer {
649                w.flush_stream().await?;
650            }
651            debug!(
652                target: "hyperdb_api",
653                flushed_bytes = self.buffered_bytes,
654                threshold = self.flush_threshold,
655                "async-arrow-inserter-owned-flush"
656            );
657            self.buffered_bytes = 0;
658        }
659        Ok(())
660    }
661}
662
663impl Drop for AsyncArrowInserterOwned {
664    fn drop(&mut self) {
665        if self.writer.is_some() {
666            tracing::warn!(
667                target: "hyperdb_api",
668                chunks = self.chunk_count,
669                total_bytes = self.total_bytes,
670                table = %self.table_name,
671                "AsyncArrowInserterOwned dropped without calling execute() or cancel(). \
672                 Data may be lost."
673            );
674            drop(self.writer.take());
675        }
676    }
677}