// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! Async Arrow IPC stream inserter for bulk data loading.
//!
//! This module provides the [`AsyncArrowInserter`] struct for inserting pre-formatted
//! Arrow IPC stream data into Hyper tables asynchronously.
9use std::sync::Arc;
10use std::time::Instant;
11
12use hyperdb_api_core::client::{AsyncClient, AsyncCopyInWriter, AsyncCopyInWriterOwned};
13use tracing::{debug, info};
14
15use crate::async_connection::AsyncConnection;
16use crate::data_format::DataFormat;
17use crate::error::{Error, Result};
18use crate::table_definition::TableDefinition;
19
/// Default flush threshold (16 MB) — matches `HyperBinary` Inserter.
///
/// Both inserter variants buffer until at least this many bytes have been
/// sent since the last flush, then push the TCP stream to the server.
const DEFAULT_FLUSH_THRESHOLD: usize = 16 * 1024 * 1024;
22
/// Async inserter for Arrow IPC stream data into a Hyper table.
///
/// This is the async version of [`ArrowInserter`](crate::ArrowInserter), designed for use
/// in tokio-based async applications.
///
/// # Ownership & Drop
///
/// You **must** call either [`execute()`](Self::execute) or [`cancel()`](Self::cancel)
/// to properly terminate the COPY session. If the inserter is dropped without
/// calling one of these, a best-effort `CopyFail` is queued and the connection
/// will self-heal on the next async operation. Data sent so far will be lost.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{AsyncArrowInserter, AsyncConnection, CreateMode, TableDefinition, SqlType, Result};
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
///     let conn = AsyncConnection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists).await?;
///
///     let table_def = TableDefinition::new("data")
///         .add_required_column("id", SqlType::int())
///         .add_nullable_column("value", SqlType::double());
///
///     // Arrow IPC data from external source
///     let arrow_data: Vec<u8> = vec![]; // Your Arrow IPC stream here
///
///     let mut inserter = AsyncArrowInserter::new(&conn, &table_def)?;
///     inserter.insert_data(&arrow_data).await?;
///     let rows = inserter.execute().await?;
///     println!("Inserted {} rows", rows);
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct AsyncArrowInserter<'conn> {
    /// Borrowed async connection; COPY requires its TCP transport.
    connection: &'conn AsyncConnection,
    /// Fully qualified target table name (from `TableDefinition::qualified_name`).
    table_name: String,
    /// Column names in table-definition order, sent with the COPY statement.
    columns: Vec<String>,
    /// Lazily-opened COPY writer; `None` until the first data write.
    writer: Option<AsyncCopyInWriter<'conn>>,
    /// Tracks whether an Arrow schema has been sent.
    schema_sent: bool,
    /// Total bytes sent (for logging).
    total_bytes: usize,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Start time for timing the insert operation.
    start_time: Instant,
    /// Flush threshold in bytes. Data is buffered until this threshold is reached.
    flush_threshold: usize,
    /// Bytes buffered since the last flush.
    buffered_bytes: usize,
}
77
78impl<'conn> AsyncArrowInserter<'conn> {
79    /// Creates a new async Arrow inserter for the given table.
80    ///
81    /// The underlying COPY session is started lazily on the first data write,
82    /// so construction is lightweight. However, the connection's transport is
83    /// validated eagerly — using a gRPC connection will return an error
84    /// immediately.
85    ///
86    /// # Arguments
87    ///
88    /// * `connection` - The async database connection (must be TCP, not gRPC).
89    /// * `table_def` - The table definition for the target table.
90    ///
91    /// # Errors
92    ///
93    /// - Returns [`Error::Other`] with message
94    ///   `"Table definition must have at least one column"` if `table_def`
95    ///   has no columns.
96    /// - Returns [`Error::Other`] if `connection` is using gRPC transport
97    ///   (COPY is TCP-only).
98    pub fn new(connection: &'conn AsyncConnection, table_def: &TableDefinition) -> Result<Self> {
99        let column_count = table_def.column_count();
100        if column_count == 0 {
101            return Err(Error::new("Table definition must have at least one column"));
102        }
103
104        // Fail fast: verify the connection supports COPY (TCP only)
105        if connection.async_tcp_client().is_none() {
106            return Err(Error::new(
107                "AsyncArrowInserter requires a TCP connection. \
108                 gRPC connections do not support COPY operations.",
109            ));
110        }
111
112        let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
113        let table_name = table_def.qualified_name();
114
115        Ok(AsyncArrowInserter {
116            connection,
117            table_name,
118            columns,
119            writer: None,
120            schema_sent: false,
121            total_bytes: 0,
122            chunk_count: 0,
123            start_time: Instant::now(),
124            flush_threshold: DEFAULT_FLUSH_THRESHOLD,
125            buffered_bytes: 0,
126        })
127    }
128
129    /// Sets a custom flush threshold in bytes.
130    ///
131    /// Data is buffered until the threshold is reached, then flushed to the server.
132    /// Default is 16 MB (matching `HyperBinary` Inserter).
133    #[must_use]
134    pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
135        self.flush_threshold = threshold;
136        self
137    }
138
139    /// Inserts a complete Arrow IPC stream (schema + record batches).
140    ///
141    /// Use this method for single-chunk inserts or for the first chunk of
142    /// multi-chunk inserts. The Arrow IPC stream must include the schema message
143    /// followed by one or more record batch messages.
144    ///
145    /// # Errors
146    ///
147    /// - Returns [`Error::Other`] if a schema was already sent (call
148    ///   [`insert_record_batches`](Self::insert_record_batches) for
149    ///   subsequent chunks instead).
150    /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
151    ///   session cannot be opened.
152    /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
153    ///   the data or the socket write fails.
154    pub async fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
155        if arrow_ipc_data.is_empty() {
156            return Ok(());
157        }
158
159        if self.schema_sent {
160            return Err(Error::new(
161                "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks without schema, \
162                 or use insert_data() only once with the complete Arrow IPC stream.",
163            ));
164        }
165
166        self.ensure_writer().await?;
167
168        if let Some(ref mut writer) = self.writer {
169            writer.send_direct(arrow_ipc_data).await?;
170        }
171        self.buffered_bytes += arrow_ipc_data.len();
172        self.maybe_flush().await?;
173
174        self.schema_sent = true;
175        self.total_bytes += arrow_ipc_data.len();
176        self.chunk_count += 1;
177
178        debug!(
179            target: "hyperdb_api",
180            chunk = self.chunk_count,
181            bytes = arrow_ipc_data.len(),
182            total_bytes = self.total_bytes,
183            buffered_bytes = self.buffered_bytes,
184            "async-arrow-inserter-chunk"
185        );
186
187        Ok(())
188    }
189
190    /// Inserts Arrow record batch data without schema.
191    ///
192    /// Use this method for subsequent chunks after the first `insert_data()` call.
193    /// The data should contain only Arrow record batch messages, **not** the schema.
194    ///
195    /// # Errors
196    ///
197    /// - Returns [`Error::Other`] if no schema has been sent yet (call
198    ///   [`insert_data`](Self::insert_data) first).
199    /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
200    ///   the data or the socket write fails.
201    pub async fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
202        if arrow_batch_data.is_empty() {
203            return Ok(());
204        }
205
206        if !self.schema_sent {
207            return Err(Error::new(
208                "No Arrow schema has been sent yet. Call insert_data() first with a complete \
209                 Arrow IPC stream that includes the schema.",
210            ));
211        }
212
213        if let Some(ref mut writer) = self.writer {
214            writer.send_direct(arrow_batch_data).await?;
215        }
216        self.buffered_bytes += arrow_batch_data.len();
217        self.maybe_flush().await?;
218
219        self.total_bytes += arrow_batch_data.len();
220        self.chunk_count += 1;
221
222        debug!(
223            target: "hyperdb_api",
224            chunk = self.chunk_count,
225            bytes = arrow_batch_data.len(),
226            total_bytes = self.total_bytes,
227            buffered_bytes = self.buffered_bytes,
228            "async-arrow-inserter-batch-chunk"
229        );
230
231        Ok(())
232    }
233
234    /// Inserts raw Arrow data without schema tracking.
235    ///
236    /// This is a low-level method that sends data directly without checking
237    /// whether schema has been sent. Use this only if you are managing schema
238    /// handling yourself.
239    ///
240    /// # Errors
241    ///
242    /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
243    ///   session cannot be opened.
244    /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
245    ///   the data or the socket write fails.
246    pub async fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
247        if data.is_empty() {
248            return Ok(());
249        }
250
251        self.ensure_writer().await?;
252
253        if let Some(ref mut writer) = self.writer {
254            writer.send_direct(data).await?;
255        }
256        self.buffered_bytes += data.len();
257        self.maybe_flush().await?;
258
259        self.total_bytes += data.len();
260        self.chunk_count += 1;
261
262        Ok(())
263    }
264
265    /// Finishes the insert operation and returns the number of rows inserted.
266    ///
267    /// This sends any remaining buffered data to the server and completes
268    /// the COPY session. Always call this (or [`cancel()`](Self::cancel))
269    /// to properly terminate the session.
270    ///
271    /// # Errors
272    ///
273    /// Returns [`Error::Client`] or [`Error::Io`] if the `CommandComplete`
274    /// round-trip fails (server rejected some buffered batch, or the socket
275    /// closed mid-flush). If no data was ever written, returns `Ok(0)`.
276    pub async fn execute(mut self) -> Result<u64> {
277        if self.writer.is_none() {
278            return Ok(0);
279        }
280
281        let rows = match self.writer.take() {
282            Some(w) => w.finish().await?,
283            None => 0,
284        };
285
286        let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
287        info!(
288            target: "hyperdb_api",
289            rows,
290            chunks = self.chunk_count,
291            total_bytes = self.total_bytes,
292            duration_ms,
293            table = %self.table_name,
294            "async-arrow-inserter-end"
295        );
296
297        Ok(rows)
298    }
299
300    /// Cancels the insert operation.
301    ///
302    /// All data sent so far will be discarded.
303    pub async fn cancel(mut self) {
304        if let Some(writer) = self.writer.take() {
305            let _ = writer.cancel("Arrow insert cancelled").await;
306        }
307    }
308
309    /// Returns whether any data has been sent.
310    #[must_use]
311    pub fn has_data(&self) -> bool {
312        self.chunk_count > 0
313    }
314
315    /// Returns the total bytes sent so far.
316    #[must_use]
317    pub fn total_bytes(&self) -> usize {
318        self.total_bytes
319    }
320
321    /// Returns the number of chunks sent so far.
322    #[must_use]
323    pub fn chunk_count(&self) -> usize {
324        self.chunk_count
325    }
326
327    /// Ensures the COPY writer is initialized.
328    async fn ensure_writer(&mut self) -> Result<()> {
329        if self.writer.is_none() {
330            let client = self.connection.async_tcp_client().ok_or_else(|| {
331                crate::Error::new("AsyncArrowInserter requires a TCP connection. gRPC connections do not support COPY operations.")
332            })?;
333            let columns: Vec<&str> = self
334                .columns
335                .iter()
336                .map(std::string::String::as_str)
337                .collect();
338            self.writer = Some(
339                client
340                    .copy_in_with_format(
341                        &self.table_name,
342                        &columns,
343                        DataFormat::ArrowStream.as_sql_str(),
344                    )
345                    .await?,
346            );
347        }
348        Ok(())
349    }
350
351    /// Flushes the TCP stream if the threshold is reached.
352    ///
353    /// With `send_direct()`, data is written directly to TCP. This periodic
354    /// flush ensures data is pushed to the server for high-latency connections.
355    async fn maybe_flush(&mut self) -> Result<()> {
356        if self.buffered_bytes >= self.flush_threshold {
357            if let Some(ref mut writer) = self.writer {
358                writer.flush_stream().await?;
359            }
360            debug!(
361                target: "hyperdb_api",
362                flushed_bytes = self.buffered_bytes,
363                threshold = self.flush_threshold,
364                "async-arrow-inserter-flush"
365            );
366            self.buffered_bytes = 0;
367        }
368        Ok(())
369    }
370}
371
372impl Drop for AsyncArrowInserter<'_> {
373    fn drop(&mut self) {
374        if self.writer.is_some() {
375            tracing::warn!(
376                target: "hyperdb_api",
377                chunks = self.chunk_count,
378                total_bytes = self.total_bytes,
379                table = %self.table_name,
380                "AsyncArrowInserter dropped without calling execute() or cancel(). \
381                 Data may be lost. The underlying AsyncCopyInWriter will \
382                 attempt a best-effort cancel to restore the connection."
383            );
384            // Take the writer so its Drop impl runs, which queues a CopyFail
385            // message via try_lock(). The next async operation on the connection
386            // will drain the cancel response and restore ReadyForQuery state.
387            drop(self.writer.take());
388        }
389    }
390}
391
// =============================================================================
// AsyncArrowInserterOwned — lifetime-free variant
// =============================================================================
395
/// Owned-handle variant of [`AsyncArrowInserter`] that holds an
/// `Arc<AsyncConnection>` instead of a borrow.
///
/// Semantics are identical to [`AsyncArrowInserter`] — same
/// `HyperBinary` Arrow-stream COPY pipeline, same flush threshold,
/// same Drop-time best-effort cancel. The only difference is that
/// this variant is `'static` and can therefore live in structs that
/// can't carry lifetimes (N-API classes, `tokio::spawn` tasks that
/// outlive the constructor's stack frame, etc).
#[derive(Debug)]
pub struct AsyncArrowInserterOwned {
    #[allow(
        dead_code,
        reason = "kept alive to anchor the client's Mutex Arc for the writer's lifetime"
    )]
    connection: Arc<AsyncConnection>,
    /// Fully qualified target table name (from `TableDefinition::qualified_name`).
    table_name: String,
    /// Column names in table-definition order, sent with the COPY statement.
    columns: Vec<String>,
    /// Lazily-opened COPY writer; `None` until the first data write.
    writer: Option<AsyncCopyInWriterOwned>,
    /// Tracks whether an Arrow schema (or a raw first chunk) has been sent.
    schema_sent: bool,
    /// Total bytes sent (for logging).
    total_bytes: usize,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Start time for timing the insert operation.
    start_time: Instant,
    /// Flush threshold in bytes; data is buffered until this is reached.
    flush_threshold: usize,
    /// Bytes buffered since the last flush.
    buffered_bytes: usize,
}
422
423impl AsyncArrowInserterOwned {
424    /// Creates a new owned-handle async Arrow inserter.
425    ///
426    /// # Arguments
427    ///
428    /// * `connection` - `Arc`-shared async database connection. The
429    ///   Arc is cloned into the inserter and kept alive for its
430    ///   lifetime, so callers can drop their own handle immediately
431    ///   after construction.
432    /// * `table_def` - The table definition for the target table.
433    ///
434    /// # Errors
435    ///
436    /// - Returns [`Error::Other`] with message
437    ///   `"Table definition must have at least one column"` if `table_def`
438    ///   has no columns.
439    /// - Returns [`Error::Other`] if `connection` is using gRPC transport.
440    pub fn new(connection: Arc<AsyncConnection>, table_def: &TableDefinition) -> Result<Self> {
441        let column_count = table_def.column_count();
442        if column_count == 0 {
443            return Err(Error::new("Table definition must have at least one column"));
444        }
445
446        if connection.async_tcp_client().is_none() {
447            return Err(Error::new(
448                "AsyncArrowInserterOwned requires a TCP connection. \
449                 gRPC connections do not support COPY operations.",
450            ));
451        }
452
453        let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
454        let table_name = table_def.qualified_name();
455
456        Ok(AsyncArrowInserterOwned {
457            connection,
458            table_name,
459            columns,
460            writer: None,
461            schema_sent: false,
462            total_bytes: 0,
463            chunk_count: 0,
464            start_time: Instant::now(),
465            flush_threshold: DEFAULT_FLUSH_THRESHOLD,
466            buffered_bytes: 0,
467        })
468    }
469
470    /// Sets a custom flush threshold in bytes. Default: 16 MB.
471    #[must_use]
472    pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
473        self.flush_threshold = threshold;
474        self
475    }
476
477    /// Inserts a complete Arrow IPC stream (schema + record batches).
478    /// Use this for single-chunk inserts or as the first call of a
479    /// multi-chunk insert; subsequent chunks use
480    /// [`insert_record_batches`](Self::insert_record_batches).
481    ///
482    /// # Errors
483    ///
484    /// - Returns [`Error::Other`] if a schema was already sent.
485    /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
486    ///   session cannot be opened.
487    /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
488    ///   the data or the socket write fails.
489    pub async fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
490        if arrow_ipc_data.is_empty() {
491            return Ok(());
492        }
493        if self.schema_sent {
494            return Err(Error::new(
495                "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks.",
496            ));
497        }
498        self.ensure_writer().await?;
499        if let Some(ref mut w) = self.writer {
500            w.send_direct(arrow_ipc_data).await?;
501        }
502        self.schema_sent = true;
503        self.buffered_bytes += arrow_ipc_data.len();
504        self.maybe_flush().await?;
505        self.total_bytes += arrow_ipc_data.len();
506        self.chunk_count += 1;
507        Ok(())
508    }
509
510    /// Inserts Arrow record-batch bytes *without* a schema header.
511    /// Must be called after [`insert_data`](Self::insert_data) or
512    /// [`insert_raw`](Self::insert_raw).
513    ///
514    /// # Errors
515    ///
516    /// - Returns [`Error::Other`] if no schema has been sent yet.
517    /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
518    ///   the data or the socket write fails.
519    pub async fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
520        if arrow_batch_data.is_empty() {
521            return Ok(());
522        }
523        if !self.schema_sent {
524            return Err(Error::new(
525                "No Arrow schema has been sent yet. Call insert_data() first.",
526            ));
527        }
528        if let Some(ref mut w) = self.writer {
529            w.send_direct(arrow_batch_data).await?;
530        }
531        self.buffered_bytes += arrow_batch_data.len();
532        self.maybe_flush().await?;
533        self.total_bytes += arrow_batch_data.len();
534        self.chunk_count += 1;
535        Ok(())
536    }
537
538    /// Low-level: send raw bytes without schema tracking. The first
539    /// call transitions `schema_sent` to `true`. Use this when you are
540    /// managing Arrow IPC framing yourself.
541    ///
542    /// # Errors
543    ///
544    /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
545    ///   session cannot be opened.
546    /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
547    ///   the data or the socket write fails.
548    pub async fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
549        if data.is_empty() {
550            return Ok(());
551        }
552        self.ensure_writer().await?;
553        if let Some(ref mut w) = self.writer {
554            w.send_direct(data).await?;
555        }
556        self.schema_sent = true;
557        self.buffered_bytes += data.len();
558        self.maybe_flush().await?;
559        self.total_bytes += data.len();
560        self.chunk_count += 1;
561        Ok(())
562    }
563
564    /// Finalizes the COPY session and returns the affected row count.
565    ///
566    /// # Errors
567    ///
568    /// - Returns [`Error::Other`] with message
569    ///   `"No data was inserted before execute()"` if no COPY session was
570    ///   ever opened.
571    /// - Returns [`Error::Client`] / [`Error::Io`] if the `CommandComplete`
572    ///   round-trip fails.
573    pub async fn execute(mut self) -> Result<u64> {
574        let elapsed = self.start_time.elapsed();
575        info!(
576            target: "hyperdb_api",
577            chunks = self.chunk_count,
578            total_bytes = self.total_bytes,
579            elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
580            "async-arrow-inserter-execute"
581        );
582        let writer = self
583            .writer
584            .take()
585            .ok_or_else(|| Error::new("No data was inserted before execute()"))?;
586        writer.finish().await.map_err(Into::into)
587    }
588
589    /// Cancels the COPY session; any data sent so far is discarded.
590    pub async fn cancel(mut self) {
591        if let Some(writer) = self.writer.take() {
592            let _ = writer.cancel("AsyncArrowInserterOwned::cancel").await;
593        }
594    }
595
596    /// Returns `true` if any data has been inserted.
597    #[must_use]
598    pub fn has_data(&self) -> bool {
599        self.schema_sent
600    }
601
602    /// Returns the total bytes sent.
603    #[must_use]
604    pub fn total_bytes(&self) -> usize {
605        self.total_bytes
606    }
607
608    /// Returns the number of chunks sent.
609    #[must_use]
610    pub fn chunk_count(&self) -> usize {
611        self.chunk_count
612    }
613
614    async fn ensure_writer(&mut self) -> Result<()> {
615        if self.writer.is_none() {
616            let client: &AsyncClient = self.connection.async_tcp_client().ok_or_else(|| {
617                Error::new(
618                    "AsyncArrowInserterOwned requires a TCP connection. \
619                     gRPC connections do not support COPY operations.",
620                )
621            })?;
622            let columns: Vec<&str> = self
623                .columns
624                .iter()
625                .map(std::string::String::as_str)
626                .collect();
627            self.writer = Some(
628                client
629                    .copy_in_arc_with_format(
630                        &self.table_name,
631                        &columns,
632                        DataFormat::ArrowStream.as_sql_str(),
633                    )
634                    .await?,
635            );
636        }
637        Ok(())
638    }
639
640    async fn maybe_flush(&mut self) -> Result<()> {
641        if self.buffered_bytes >= self.flush_threshold {
642            if let Some(ref mut w) = self.writer {
643                w.flush_stream().await?;
644            }
645            debug!(
646                target: "hyperdb_api",
647                flushed_bytes = self.buffered_bytes,
648                threshold = self.flush_threshold,
649                "async-arrow-inserter-owned-flush"
650            );
651            self.buffered_bytes = 0;
652        }
653        Ok(())
654    }
655}
656
657impl Drop for AsyncArrowInserterOwned {
658    fn drop(&mut self) {
659        if self.writer.is_some() {
660            tracing::warn!(
661                target: "hyperdb_api",
662                chunks = self.chunk_count,
663                total_bytes = self.total_bytes,
664                table = %self.table_name,
665                "AsyncArrowInserterOwned dropped without calling execute() or cancel(). \
666                 Data may be lost."
667            );
668            drop(self.writer.take());
669        }
670    }
671}