hyperdb_api/async_arrow_inserter.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Async Arrow IPC stream inserter for bulk data loading.
5//!
6//! This module provides the [`AsyncArrowInserter`] struct for inserting pre-formatted
7//! Arrow IPC stream data into Hyper tables asynchronously.
8
9use std::sync::Arc;
10use std::time::Instant;
11
12use hyperdb_api_core::client::{AsyncClient, AsyncCopyInWriter, AsyncCopyInWriterOwned};
13use tracing::{debug, info};
14
15use crate::async_connection::AsyncConnection;
16use crate::data_format::DataFormat;
17use crate::error::{Error, Result};
18use crate::table_definition::TableDefinition;
19
/// Default flush threshold: 16 MiB (`16 << 20` bytes) — matches the
/// `HyperBinary` Inserter.
const DEFAULT_FLUSH_THRESHOLD: usize = 16 << 20;
22
/// Async inserter for Arrow IPC stream data into a Hyper table.
///
/// This is the async version of [`ArrowInserter`](crate::ArrowInserter), designed for use
/// in tokio-based async applications.
///
/// The COPY session is opened lazily by the first `insert_data()` /
/// `insert_raw()` call that carries a non-empty payload; until then the
/// inserter only holds the target table name and column list.
///
/// # Ownership & Drop
///
/// You **must** call either [`execute()`](Self::execute) or [`cancel()`](Self::cancel)
/// to properly terminate the COPY session. If the inserter is dropped without
/// calling one of these, a best-effort `CopyFail` is queued and the connection
/// will self-heal on the next async operation. Data sent so far will be lost.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::{AsyncArrowInserter, AsyncConnection, CreateMode, TableDefinition, SqlType, Result};
///
/// #[tokio::main]
/// async fn main() -> Result<()> {
///     let conn = AsyncConnection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists).await?;
///
///     let table_def = TableDefinition::new("data")
///         .add_required_column("id", SqlType::int())
///         .add_nullable_column("value", SqlType::double());
///
///     // Arrow IPC data from external source
///     let arrow_data: Vec<u8> = vec![]; // Your Arrow IPC stream here
///
///     let mut inserter = AsyncArrowInserter::new(&conn, &table_def)?;
///     inserter.insert_data(&arrow_data).await?;
///     let rows = inserter.execute().await?;
///     println!("Inserted {} rows", rows);
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct AsyncArrowInserter<'conn> {
    /// Borrowed connection; its TCP client is used to open the COPY session.
    connection: &'conn AsyncConnection,
    /// Qualified name of the target table, taken from the table definition.
    table_name: String,
    /// Column names of the target table, in table-definition order.
    columns: Vec<String>,
    /// Active COPY writer. `None` until the first write opens the session, and
    /// `None` again once `execute()`, `cancel()` or `Drop` has taken it.
    writer: Option<AsyncCopyInWriter<'conn>>,
    /// Tracks whether an Arrow schema has been sent.
    schema_sent: bool,
    /// Total bytes sent (for logging).
    total_bytes: usize,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Start time for timing the insert operation.
    start_time: Instant,
    /// Flush threshold in bytes. Data is buffered until this threshold is reached.
    flush_threshold: usize,
    /// Bytes buffered since the last flush.
    buffered_bytes: usize,
}
77
78impl<'conn> AsyncArrowInserter<'conn> {
79 /// Creates a new async Arrow inserter for the given table.
80 ///
81 /// The underlying COPY session is started lazily on the first data write,
82 /// so construction is lightweight. However, the connection's transport is
83 /// validated eagerly — using a gRPC connection will return an error
84 /// immediately.
85 ///
86 /// # Arguments
87 ///
88 /// * `connection` - The async database connection (must be TCP, not gRPC).
89 /// * `table_def` - The table definition for the target table.
90 ///
91 /// # Errors
92 ///
93 /// - Returns [`Error::InvalidTableDefinition`] with message
94 /// `"Table definition must have at least one column"` if `table_def`
95 /// has no columns.
96 /// - Returns [`Error::FeatureNotSupported`] if `connection` is using gRPC transport
97 /// (COPY is TCP-only).
98 pub fn new(connection: &'conn AsyncConnection, table_def: &TableDefinition) -> Result<Self> {
99 let column_count = table_def.column_count();
100 if column_count == 0 {
101 return Err(Error::invalid_table_definition(
102 "Table definition must have at least one column",
103 ));
104 }
105
106 // Fail fast: verify the connection supports COPY (TCP only)
107 if connection.async_tcp_client().is_none() {
108 return Err(Error::feature_not_supported(
109 "AsyncArrowInserter requires a TCP connection. \
110 gRPC connections do not support COPY operations.",
111 ));
112 }
113
114 let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
115 let table_name = table_def.qualified_name();
116
117 Ok(AsyncArrowInserter {
118 connection,
119 table_name,
120 columns,
121 writer: None,
122 schema_sent: false,
123 total_bytes: 0,
124 chunk_count: 0,
125 start_time: Instant::now(),
126 flush_threshold: DEFAULT_FLUSH_THRESHOLD,
127 buffered_bytes: 0,
128 })
129 }
130
131 /// Sets a custom flush threshold in bytes.
132 ///
133 /// Data is buffered until the threshold is reached, then flushed to the server.
134 /// Default is 16 MB (matching `HyperBinary` Inserter).
135 #[must_use]
136 pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
137 self.flush_threshold = threshold;
138 self
139 }
140
141 /// Inserts a complete Arrow IPC stream (schema + record batches).
142 ///
143 /// Use this method for single-chunk inserts or for the first chunk of
144 /// multi-chunk inserts. The Arrow IPC stream must include the schema message
145 /// followed by one or more record batch messages.
146 ///
147 /// # Errors
148 ///
149 /// - Returns [`Error::Internal`] if a schema was already sent (call
150 /// [`insert_record_batches`](Self::insert_record_batches) for
151 /// subsequent chunks instead).
152 /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
153 /// session cannot be opened.
154 /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
155 /// the data or the socket write fails.
156 pub async fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
157 if arrow_ipc_data.is_empty() {
158 return Ok(());
159 }
160
161 if self.schema_sent {
162 return Err(Error::internal(
163 "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks without schema, \
164 or use insert_data() only once with the complete Arrow IPC stream.",
165 ));
166 }
167
168 self.ensure_writer().await?;
169
170 if let Some(ref mut writer) = self.writer {
171 writer.send_direct(arrow_ipc_data).await?;
172 }
173 self.buffered_bytes += arrow_ipc_data.len();
174 self.maybe_flush().await?;
175
176 self.schema_sent = true;
177 self.total_bytes += arrow_ipc_data.len();
178 self.chunk_count += 1;
179
180 debug!(
181 target: "hyperdb_api",
182 chunk = self.chunk_count,
183 bytes = arrow_ipc_data.len(),
184 total_bytes = self.total_bytes,
185 buffered_bytes = self.buffered_bytes,
186 "async-arrow-inserter-chunk"
187 );
188
189 Ok(())
190 }
191
192 /// Inserts Arrow record batch data without schema.
193 ///
194 /// Use this method for subsequent chunks after the first `insert_data()` call.
195 /// The data should contain only Arrow record batch messages, **not** the schema.
196 ///
197 /// # Errors
198 ///
199 /// - Returns [`Error::Internal`] if no schema has been sent yet (call
200 /// [`insert_data`](Self::insert_data) first).
201 /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
202 /// the data or the socket write fails.
203 pub async fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
204 if arrow_batch_data.is_empty() {
205 return Ok(());
206 }
207
208 if !self.schema_sent {
209 return Err(Error::internal(
210 "No Arrow schema has been sent yet. Call insert_data() first with a complete \
211 Arrow IPC stream that includes the schema.",
212 ));
213 }
214
215 if let Some(ref mut writer) = self.writer {
216 writer.send_direct(arrow_batch_data).await?;
217 }
218 self.buffered_bytes += arrow_batch_data.len();
219 self.maybe_flush().await?;
220
221 self.total_bytes += arrow_batch_data.len();
222 self.chunk_count += 1;
223
224 debug!(
225 target: "hyperdb_api",
226 chunk = self.chunk_count,
227 bytes = arrow_batch_data.len(),
228 total_bytes = self.total_bytes,
229 buffered_bytes = self.buffered_bytes,
230 "async-arrow-inserter-batch-chunk"
231 );
232
233 Ok(())
234 }
235
236 /// Inserts raw Arrow data without schema tracking.
237 ///
238 /// This is a low-level method that sends data directly without checking
239 /// whether schema has been sent. Use this only if you are managing schema
240 /// handling yourself.
241 ///
242 /// # Errors
243 ///
244 /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
245 /// session cannot be opened.
246 /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
247 /// the data or the socket write fails.
248 pub async fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
249 if data.is_empty() {
250 return Ok(());
251 }
252
253 self.ensure_writer().await?;
254
255 if let Some(ref mut writer) = self.writer {
256 writer.send_direct(data).await?;
257 }
258 self.buffered_bytes += data.len();
259 self.maybe_flush().await?;
260
261 self.total_bytes += data.len();
262 self.chunk_count += 1;
263
264 Ok(())
265 }
266
267 /// Finishes the insert operation and returns the number of rows inserted.
268 ///
269 /// This sends any remaining buffered data to the server and completes
270 /// the COPY session. Always call this (or [`cancel()`](Self::cancel))
271 /// to properly terminate the session.
272 ///
273 /// # Errors
274 ///
275 /// Returns [`Error::Server`] or [`Error::Io`] if the `CommandComplete`
276 /// round-trip fails (server rejected some buffered batch, or the socket
277 /// closed mid-flush). If no data was ever written, returns `Ok(0)`.
278 pub async fn execute(mut self) -> Result<u64> {
279 if self.writer.is_none() {
280 return Ok(0);
281 }
282
283 let rows = match self.writer.take() {
284 Some(w) => w.finish().await?,
285 None => 0,
286 };
287
288 let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
289 info!(
290 target: "hyperdb_api",
291 rows,
292 chunks = self.chunk_count,
293 total_bytes = self.total_bytes,
294 duration_ms,
295 table = %self.table_name,
296 "async-arrow-inserter-end"
297 );
298
299 Ok(rows)
300 }
301
302 /// Cancels the insert operation.
303 ///
304 /// All data sent so far will be discarded.
305 pub async fn cancel(mut self) {
306 if let Some(writer) = self.writer.take() {
307 let _ = writer.cancel("Arrow insert cancelled").await;
308 }
309 }
310
311 /// Returns whether any data has been sent.
312 #[must_use]
313 pub fn has_data(&self) -> bool {
314 self.chunk_count > 0
315 }
316
    /// Returns the total bytes sent so far.
    ///
    /// The counter is advanced at the end of each insert call that carried a
    /// non-empty payload, after the threshold-based flush has succeeded.
    #[must_use]
    pub fn total_bytes(&self) -> usize {
        self.total_bytes
    }
322
    /// Returns the number of chunks sent so far.
    ///
    /// Every successful `insert_data()`, `insert_record_batches()` or
    /// `insert_raw()` call with a non-empty payload counts as one chunk.
    #[must_use]
    pub fn chunk_count(&self) -> usize {
        self.chunk_count
    }
328
329 /// Ensures the COPY writer is initialized.
330 async fn ensure_writer(&mut self) -> Result<()> {
331 if self.writer.is_none() {
332 let client = self.connection.async_tcp_client().ok_or_else(|| {
333 crate::Error::feature_not_supported(
334 "AsyncArrowInserter requires a TCP connection. gRPC connections do not support COPY operations.",
335 )
336 })?;
337 let columns: Vec<&str> = self
338 .columns
339 .iter()
340 .map(std::string::String::as_str)
341 .collect();
342 self.writer = Some(
343 client
344 .copy_in_with_format(
345 &self.table_name,
346 &columns,
347 DataFormat::ArrowStream.as_sql_str(),
348 )
349 .await?,
350 );
351 }
352 Ok(())
353 }
354
355 /// Flushes the TCP stream if the threshold is reached.
356 ///
357 /// With `send_direct()`, data is written directly to TCP. This periodic
358 /// flush ensures data is pushed to the server for high-latency connections.
359 async fn maybe_flush(&mut self) -> Result<()> {
360 if self.buffered_bytes >= self.flush_threshold {
361 if let Some(ref mut writer) = self.writer {
362 writer.flush_stream().await?;
363 }
364 debug!(
365 target: "hyperdb_api",
366 flushed_bytes = self.buffered_bytes,
367 threshold = self.flush_threshold,
368 "async-arrow-inserter-flush"
369 );
370 self.buffered_bytes = 0;
371 }
372 Ok(())
373 }
374}
375
376impl Drop for AsyncArrowInserter<'_> {
377 fn drop(&mut self) {
378 if self.writer.is_some() {
379 tracing::warn!(
380 target: "hyperdb_api",
381 chunks = self.chunk_count,
382 total_bytes = self.total_bytes,
383 table = %self.table_name,
384 "AsyncArrowInserter dropped without calling execute() or cancel(). \
385 Data may be lost. The underlying AsyncCopyInWriter will \
386 attempt a best-effort cancel to restore the connection."
387 );
388 // Take the writer so its Drop impl runs, which queues a CopyFail
389 // message via try_lock(). The next async operation on the connection
390 // will drain the cancel response and restore ReadyForQuery state.
391 drop(self.writer.take());
392 }
393 }
394}
395
396// =============================================================================
397// AsyncArrowInserterOwned — lifetime-free variant
398// =============================================================================
399
/// Owned-handle variant of [`AsyncArrowInserter`] that holds an
/// `Arc<AsyncConnection>` instead of a borrow.
///
/// It drives the same Arrow-stream COPY pipeline as [`AsyncArrowInserter`],
/// with the same default flush threshold and the same Drop-time release of a
/// still-open writer. Because it is `'static`, it can live in structs that
/// can't carry lifetimes (N-API classes, `tokio::spawn` tasks that
/// outlive the constructor's stack frame, etc).
///
/// # Differences from [`AsyncArrowInserter`]
///
/// - [`execute()`](Self::execute) returns [`Error::Internal`] when no COPY
///   session was ever opened, whereas the borrowed variant returns `Ok(0)`.
/// - [`insert_raw()`](Self::insert_raw) marks the schema as sent, so
///   [`insert_record_batches()`](Self::insert_record_batches) may follow it;
///   the borrowed variant's `insert_raw()` does not touch that flag.
/// - [`has_data()`](Self::has_data) reports the schema-sent flag rather than
///   a non-zero chunk count.
#[derive(Debug)]
pub struct AsyncArrowInserterOwned {
    /// Shared connection; its TCP client is used to open the COPY session.
    #[allow(
        dead_code,
        reason = "kept alive to anchor the client's Mutex Arc for the writer's lifetime"
    )]
    connection: Arc<AsyncConnection>,
    /// Qualified name of the target table, taken from the table definition.
    table_name: String,
    /// Column names of the target table, in table-definition order.
    columns: Vec<String>,
    /// Active COPY writer. `None` until the first write opens the session, and
    /// `None` again once `execute()`, `cancel()` or `Drop` has taken it.
    writer: Option<AsyncCopyInWriterOwned>,
    /// Set once `insert_data()` or `insert_raw()` has sent a non-empty payload.
    schema_sent: bool,
    /// Total bytes counted as sent (reported in log events).
    total_bytes: usize,
    /// Number of chunks counted as sent.
    chunk_count: usize,
    /// Construction time, used to report the elapsed time in `execute()`.
    start_time: Instant,
    /// Flush threshold in bytes.
    flush_threshold: usize,
    /// Bytes sent since the last flush.
    buffered_bytes: usize,
}
426
427impl AsyncArrowInserterOwned {
428 /// Creates a new owned-handle async Arrow inserter.
429 ///
430 /// # Arguments
431 ///
432 /// * `connection` - `Arc`-shared async database connection. The
433 /// Arc is cloned into the inserter and kept alive for its
434 /// lifetime, so callers can drop their own handle immediately
435 /// after construction.
436 /// * `table_def` - The table definition for the target table.
437 ///
438 /// # Errors
439 ///
440 /// - Returns [`Error::InvalidTableDefinition`] with message
441 /// `"Table definition must have at least one column"` if `table_def`
442 /// has no columns.
443 /// - Returns [`Error::FeatureNotSupported`] if `connection` is using gRPC transport.
444 pub fn new(connection: Arc<AsyncConnection>, table_def: &TableDefinition) -> Result<Self> {
445 let column_count = table_def.column_count();
446 if column_count == 0 {
447 return Err(Error::invalid_table_definition(
448 "Table definition must have at least one column",
449 ));
450 }
451
452 if connection.async_tcp_client().is_none() {
453 return Err(Error::feature_not_supported(
454 "AsyncArrowInserterOwned requires a TCP connection. \
455 gRPC connections do not support COPY operations.",
456 ));
457 }
458
459 let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
460 let table_name = table_def.qualified_name();
461
462 Ok(AsyncArrowInserterOwned {
463 connection,
464 table_name,
465 columns,
466 writer: None,
467 schema_sent: false,
468 total_bytes: 0,
469 chunk_count: 0,
470 start_time: Instant::now(),
471 flush_threshold: DEFAULT_FLUSH_THRESHOLD,
472 buffered_bytes: 0,
473 })
474 }
475
476 /// Sets a custom flush threshold in bytes. Default: 16 MB.
477 #[must_use]
478 pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
479 self.flush_threshold = threshold;
480 self
481 }
482
483 /// Inserts a complete Arrow IPC stream (schema + record batches).
484 /// Use this for single-chunk inserts or as the first call of a
485 /// multi-chunk insert; subsequent chunks use
486 /// [`insert_record_batches`](Self::insert_record_batches).
487 ///
488 /// # Errors
489 ///
490 /// - Returns [`Error::Internal`] if a schema was already sent.
491 /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
492 /// session cannot be opened.
493 /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
494 /// the data or the socket write fails.
495 pub async fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
496 if arrow_ipc_data.is_empty() {
497 return Ok(());
498 }
499 if self.schema_sent {
500 return Err(Error::internal(
501 "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks.",
502 ));
503 }
504 self.ensure_writer().await?;
505 if let Some(ref mut w) = self.writer {
506 w.send_direct(arrow_ipc_data).await?;
507 }
508 self.schema_sent = true;
509 self.buffered_bytes += arrow_ipc_data.len();
510 self.maybe_flush().await?;
511 self.total_bytes += arrow_ipc_data.len();
512 self.chunk_count += 1;
513 Ok(())
514 }
515
516 /// Inserts Arrow record-batch bytes *without* a schema header.
517 /// Must be called after [`insert_data`](Self::insert_data) or
518 /// [`insert_raw`](Self::insert_raw).
519 ///
520 /// # Errors
521 ///
522 /// - Returns [`Error::Internal`] if no schema has been sent yet.
523 /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
524 /// the data or the socket write fails.
525 pub async fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
526 if arrow_batch_data.is_empty() {
527 return Ok(());
528 }
529 if !self.schema_sent {
530 return Err(Error::internal(
531 "No Arrow schema has been sent yet. Call insert_data() first.",
532 ));
533 }
534 if let Some(ref mut w) = self.writer {
535 w.send_direct(arrow_batch_data).await?;
536 }
537 self.buffered_bytes += arrow_batch_data.len();
538 self.maybe_flush().await?;
539 self.total_bytes += arrow_batch_data.len();
540 self.chunk_count += 1;
541 Ok(())
542 }
543
544 /// Low-level: send raw bytes without schema tracking. The first
545 /// call transitions `schema_sent` to `true`. Use this when you are
546 /// managing Arrow IPC framing yourself.
547 ///
548 /// # Errors
549 ///
550 /// - Returns [`Error::FeatureNotSupported`] / [`Error::Server`] if the lazy COPY
551 /// session cannot be opened.
552 /// - Returns [`Error::Server`] / [`Error::Io`] if the server rejects
553 /// the data or the socket write fails.
554 pub async fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
555 if data.is_empty() {
556 return Ok(());
557 }
558 self.ensure_writer().await?;
559 if let Some(ref mut w) = self.writer {
560 w.send_direct(data).await?;
561 }
562 self.schema_sent = true;
563 self.buffered_bytes += data.len();
564 self.maybe_flush().await?;
565 self.total_bytes += data.len();
566 self.chunk_count += 1;
567 Ok(())
568 }
569
570 /// Finalizes the COPY session and returns the affected row count.
571 ///
572 /// # Errors
573 ///
574 /// - Returns [`Error::Internal`] with message
575 /// `"No data was inserted before execute()"` if no COPY session was
576 /// ever opened.
577 /// - Returns [`Error::Server`] / [`Error::Io`] if the `CommandComplete`
578 /// round-trip fails.
579 pub async fn execute(mut self) -> Result<u64> {
580 let elapsed = self.start_time.elapsed();
581 info!(
582 target: "hyperdb_api",
583 chunks = self.chunk_count,
584 total_bytes = self.total_bytes,
585 elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
586 "async-arrow-inserter-execute"
587 );
588 let writer = self
589 .writer
590 .take()
591 .ok_or_else(|| Error::internal("No data was inserted before execute()"))?;
592 writer.finish().await.map_err(Into::into)
593 }
594
595 /// Cancels the COPY session; any data sent so far is discarded.
596 pub async fn cancel(mut self) {
597 if let Some(writer) = self.writer.take() {
598 let _ = writer.cancel("AsyncArrowInserterOwned::cancel").await;
599 }
600 }
601
    /// Returns `true` if any data has been inserted.
    ///
    /// This reports the `schema_sent` flag, which is set once `insert_data()`
    /// or `insert_raw()` has sent a non-empty payload.
    #[must_use]
    pub fn has_data(&self) -> bool {
        self.schema_sent
    }
607
    /// Returns the total bytes sent.
    ///
    /// The counter is advanced at the end of each insert call that carried a
    /// non-empty payload, after the threshold-based flush has succeeded.
    #[must_use]
    pub fn total_bytes(&self) -> usize {
        self.total_bytes
    }
613
    /// Returns the number of chunks sent.
    ///
    /// Every successful `insert_data()`, `insert_record_batches()` or
    /// `insert_raw()` call with a non-empty payload counts as one chunk.
    #[must_use]
    pub fn chunk_count(&self) -> usize {
        self.chunk_count
    }
619
620 async fn ensure_writer(&mut self) -> Result<()> {
621 if self.writer.is_none() {
622 let client: &AsyncClient = self.connection.async_tcp_client().ok_or_else(|| {
623 Error::feature_not_supported(
624 "AsyncArrowInserterOwned requires a TCP connection. \
625 gRPC connections do not support COPY operations.",
626 )
627 })?;
628 let columns: Vec<&str> = self
629 .columns
630 .iter()
631 .map(std::string::String::as_str)
632 .collect();
633 self.writer = Some(
634 client
635 .copy_in_arc_with_format(
636 &self.table_name,
637 &columns,
638 DataFormat::ArrowStream.as_sql_str(),
639 )
640 .await?,
641 );
642 }
643 Ok(())
644 }
645
646 async fn maybe_flush(&mut self) -> Result<()> {
647 if self.buffered_bytes >= self.flush_threshold {
648 if let Some(ref mut w) = self.writer {
649 w.flush_stream().await?;
650 }
651 debug!(
652 target: "hyperdb_api",
653 flushed_bytes = self.buffered_bytes,
654 threshold = self.flush_threshold,
655 "async-arrow-inserter-owned-flush"
656 );
657 self.buffered_bytes = 0;
658 }
659 Ok(())
660 }
661}
662
663impl Drop for AsyncArrowInserterOwned {
664 fn drop(&mut self) {
665 if self.writer.is_some() {
666 tracing::warn!(
667 target: "hyperdb_api",
668 chunks = self.chunk_count,
669 total_bytes = self.total_bytes,
670 table = %self.table_name,
671 "AsyncArrowInserterOwned dropped without calling execute() or cancel(). \
672 Data may be lost."
673 );
674 drop(self.writer.take());
675 }
676 }
677}