hyperdb_api/async_arrow_inserter.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Async Arrow IPC stream inserter for bulk data loading.
5//!
6//! This module provides the [`AsyncArrowInserter`] struct for inserting pre-formatted
7//! Arrow IPC stream data into Hyper tables asynchronously.
8
9use std::sync::Arc;
10use std::time::Instant;
11
12use hyperdb_api_core::client::{AsyncClient, AsyncCopyInWriter, AsyncCopyInWriterOwned};
13use tracing::{debug, info};
14
15use crate::async_connection::AsyncConnection;
16use crate::data_format::DataFormat;
17use crate::error::{Error, Result};
18use crate::table_definition::TableDefinition;
19
/// Default flush threshold (16 MB) — matches `HyperBinary` Inserter.
/// Data accumulates in `buffered_bytes` and the TCP stream is only flushed
/// once at least this many bytes have been written since the last flush.
const DEFAULT_FLUSH_THRESHOLD: usize = 16 * 1024 * 1024;
22
23/// Async inserter for Arrow IPC stream data into a Hyper table.
24///
25/// This is the async version of [`ArrowInserter`](crate::ArrowInserter), designed for use
26/// in tokio-based async applications.
27///
28/// # Ownership & Drop
29///
30/// You **must** call either [`execute()`](Self::execute) or [`cancel()`](Self::cancel)
31/// to properly terminate the COPY session. If the inserter is dropped without
32/// calling one of these, a best-effort `CopyFail` is queued and the connection
33/// will self-heal on the next async operation. Data sent so far will be lost.
34///
35/// # Example
36///
37/// ```no_run
38/// use hyperdb_api::{AsyncArrowInserter, AsyncConnection, CreateMode, TableDefinition, SqlType, Result};
39///
40/// #[tokio::main]
41/// async fn main() -> Result<()> {
42/// let conn = AsyncConnection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists).await?;
43///
44/// let table_def = TableDefinition::new("data")
45/// .add_required_column("id", SqlType::int())
46/// .add_nullable_column("value", SqlType::double());
47///
48/// // Arrow IPC data from external source
49/// let arrow_data: Vec<u8> = vec![]; // Your Arrow IPC stream here
50///
51/// let mut inserter = AsyncArrowInserter::new(&conn, &table_def)?;
52/// inserter.insert_data(&arrow_data).await?;
53/// let rows = inserter.execute().await?;
54/// println!("Inserted {} rows", rows);
55/// Ok(())
56/// }
57/// ```
#[derive(Debug)]
pub struct AsyncArrowInserter<'conn> {
    /// Borrowed connection the COPY session runs on (must be TCP transport).
    connection: &'conn AsyncConnection,
    /// Fully-qualified target table name (from `TableDefinition::qualified_name`).
    table_name: String,
    /// Target column names, in table-definition order.
    columns: Vec<String>,
    /// Lazily-opened COPY writer; `None` until the first data write.
    writer: Option<AsyncCopyInWriter<'conn>>,
    /// Tracks whether an Arrow schema has been sent.
    schema_sent: bool,
    /// Total bytes sent (for logging).
    total_bytes: usize,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Start time for timing the insert operation.
    start_time: Instant,
    /// Flush threshold in bytes. Data is buffered until this threshold is reached.
    flush_threshold: usize,
    /// Bytes buffered since the last flush.
    buffered_bytes: usize,
}
77
78impl<'conn> AsyncArrowInserter<'conn> {
79 /// Creates a new async Arrow inserter for the given table.
80 ///
81 /// The underlying COPY session is started lazily on the first data write,
82 /// so construction is lightweight. However, the connection's transport is
83 /// validated eagerly — using a gRPC connection will return an error
84 /// immediately.
85 ///
86 /// # Arguments
87 ///
88 /// * `connection` - The async database connection (must be TCP, not gRPC).
89 /// * `table_def` - The table definition for the target table.
90 ///
91 /// # Errors
92 ///
93 /// - Returns [`Error::Other`] with message
94 /// `"Table definition must have at least one column"` if `table_def`
95 /// has no columns.
96 /// - Returns [`Error::Other`] if `connection` is using gRPC transport
97 /// (COPY is TCP-only).
98 pub fn new(connection: &'conn AsyncConnection, table_def: &TableDefinition) -> Result<Self> {
99 let column_count = table_def.column_count();
100 if column_count == 0 {
101 return Err(Error::new("Table definition must have at least one column"));
102 }
103
104 // Fail fast: verify the connection supports COPY (TCP only)
105 if connection.async_tcp_client().is_none() {
106 return Err(Error::new(
107 "AsyncArrowInserter requires a TCP connection. \
108 gRPC connections do not support COPY operations.",
109 ));
110 }
111
112 let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
113 let table_name = table_def.qualified_name();
114
115 Ok(AsyncArrowInserter {
116 connection,
117 table_name,
118 columns,
119 writer: None,
120 schema_sent: false,
121 total_bytes: 0,
122 chunk_count: 0,
123 start_time: Instant::now(),
124 flush_threshold: DEFAULT_FLUSH_THRESHOLD,
125 buffered_bytes: 0,
126 })
127 }
128
129 /// Sets a custom flush threshold in bytes.
130 ///
131 /// Data is buffered until the threshold is reached, then flushed to the server.
132 /// Default is 16 MB (matching `HyperBinary` Inserter).
133 #[must_use]
134 pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
135 self.flush_threshold = threshold;
136 self
137 }
138
139 /// Inserts a complete Arrow IPC stream (schema + record batches).
140 ///
141 /// Use this method for single-chunk inserts or for the first chunk of
142 /// multi-chunk inserts. The Arrow IPC stream must include the schema message
143 /// followed by one or more record batch messages.
144 ///
145 /// # Errors
146 ///
147 /// - Returns [`Error::Other`] if a schema was already sent (call
148 /// [`insert_record_batches`](Self::insert_record_batches) for
149 /// subsequent chunks instead).
150 /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
151 /// session cannot be opened.
152 /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
153 /// the data or the socket write fails.
154 pub async fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
155 if arrow_ipc_data.is_empty() {
156 return Ok(());
157 }
158
159 if self.schema_sent {
160 return Err(Error::new(
161 "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks without schema, \
162 or use insert_data() only once with the complete Arrow IPC stream.",
163 ));
164 }
165
166 self.ensure_writer().await?;
167
168 if let Some(ref mut writer) = self.writer {
169 writer.send_direct(arrow_ipc_data).await?;
170 }
171 self.buffered_bytes += arrow_ipc_data.len();
172 self.maybe_flush().await?;
173
174 self.schema_sent = true;
175 self.total_bytes += arrow_ipc_data.len();
176 self.chunk_count += 1;
177
178 debug!(
179 target: "hyperdb_api",
180 chunk = self.chunk_count,
181 bytes = arrow_ipc_data.len(),
182 total_bytes = self.total_bytes,
183 buffered_bytes = self.buffered_bytes,
184 "async-arrow-inserter-chunk"
185 );
186
187 Ok(())
188 }
189
190 /// Inserts Arrow record batch data without schema.
191 ///
192 /// Use this method for subsequent chunks after the first `insert_data()` call.
193 /// The data should contain only Arrow record batch messages, **not** the schema.
194 ///
195 /// # Errors
196 ///
197 /// - Returns [`Error::Other`] if no schema has been sent yet (call
198 /// [`insert_data`](Self::insert_data) first).
199 /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
200 /// the data or the socket write fails.
201 pub async fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
202 if arrow_batch_data.is_empty() {
203 return Ok(());
204 }
205
206 if !self.schema_sent {
207 return Err(Error::new(
208 "No Arrow schema has been sent yet. Call insert_data() first with a complete \
209 Arrow IPC stream that includes the schema.",
210 ));
211 }
212
213 if let Some(ref mut writer) = self.writer {
214 writer.send_direct(arrow_batch_data).await?;
215 }
216 self.buffered_bytes += arrow_batch_data.len();
217 self.maybe_flush().await?;
218
219 self.total_bytes += arrow_batch_data.len();
220 self.chunk_count += 1;
221
222 debug!(
223 target: "hyperdb_api",
224 chunk = self.chunk_count,
225 bytes = arrow_batch_data.len(),
226 total_bytes = self.total_bytes,
227 buffered_bytes = self.buffered_bytes,
228 "async-arrow-inserter-batch-chunk"
229 );
230
231 Ok(())
232 }
233
234 /// Inserts raw Arrow data without schema tracking.
235 ///
236 /// This is a low-level method that sends data directly without checking
237 /// whether schema has been sent. Use this only if you are managing schema
238 /// handling yourself.
239 ///
240 /// # Errors
241 ///
242 /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
243 /// session cannot be opened.
244 /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
245 /// the data or the socket write fails.
246 pub async fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
247 if data.is_empty() {
248 return Ok(());
249 }
250
251 self.ensure_writer().await?;
252
253 if let Some(ref mut writer) = self.writer {
254 writer.send_direct(data).await?;
255 }
256 self.buffered_bytes += data.len();
257 self.maybe_flush().await?;
258
259 self.total_bytes += data.len();
260 self.chunk_count += 1;
261
262 Ok(())
263 }
264
265 /// Finishes the insert operation and returns the number of rows inserted.
266 ///
267 /// This sends any remaining buffered data to the server and completes
268 /// the COPY session. Always call this (or [`cancel()`](Self::cancel))
269 /// to properly terminate the session.
270 ///
271 /// # Errors
272 ///
273 /// Returns [`Error::Client`] or [`Error::Io`] if the `CommandComplete`
274 /// round-trip fails (server rejected some buffered batch, or the socket
275 /// closed mid-flush). If no data was ever written, returns `Ok(0)`.
276 pub async fn execute(mut self) -> Result<u64> {
277 if self.writer.is_none() {
278 return Ok(0);
279 }
280
281 let rows = match self.writer.take() {
282 Some(w) => w.finish().await?,
283 None => 0,
284 };
285
286 let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
287 info!(
288 target: "hyperdb_api",
289 rows,
290 chunks = self.chunk_count,
291 total_bytes = self.total_bytes,
292 duration_ms,
293 table = %self.table_name,
294 "async-arrow-inserter-end"
295 );
296
297 Ok(rows)
298 }
299
300 /// Cancels the insert operation.
301 ///
302 /// All data sent so far will be discarded.
303 pub async fn cancel(mut self) {
304 if let Some(writer) = self.writer.take() {
305 let _ = writer.cancel("Arrow insert cancelled").await;
306 }
307 }
308
309 /// Returns whether any data has been sent.
310 #[must_use]
311 pub fn has_data(&self) -> bool {
312 self.chunk_count > 0
313 }
314
315 /// Returns the total bytes sent so far.
316 #[must_use]
317 pub fn total_bytes(&self) -> usize {
318 self.total_bytes
319 }
320
321 /// Returns the number of chunks sent so far.
322 #[must_use]
323 pub fn chunk_count(&self) -> usize {
324 self.chunk_count
325 }
326
327 /// Ensures the COPY writer is initialized.
328 async fn ensure_writer(&mut self) -> Result<()> {
329 if self.writer.is_none() {
330 let client = self.connection.async_tcp_client().ok_or_else(|| {
331 crate::Error::new("AsyncArrowInserter requires a TCP connection. gRPC connections do not support COPY operations.")
332 })?;
333 let columns: Vec<&str> = self
334 .columns
335 .iter()
336 .map(std::string::String::as_str)
337 .collect();
338 self.writer = Some(
339 client
340 .copy_in_with_format(
341 &self.table_name,
342 &columns,
343 DataFormat::ArrowStream.as_sql_str(),
344 )
345 .await?,
346 );
347 }
348 Ok(())
349 }
350
351 /// Flushes the TCP stream if the threshold is reached.
352 ///
353 /// With `send_direct()`, data is written directly to TCP. This periodic
354 /// flush ensures data is pushed to the server for high-latency connections.
355 async fn maybe_flush(&mut self) -> Result<()> {
356 if self.buffered_bytes >= self.flush_threshold {
357 if let Some(ref mut writer) = self.writer {
358 writer.flush_stream().await?;
359 }
360 debug!(
361 target: "hyperdb_api",
362 flushed_bytes = self.buffered_bytes,
363 threshold = self.flush_threshold,
364 "async-arrow-inserter-flush"
365 );
366 self.buffered_bytes = 0;
367 }
368 Ok(())
369 }
370}
371
372impl Drop for AsyncArrowInserter<'_> {
373 fn drop(&mut self) {
374 if self.writer.is_some() {
375 tracing::warn!(
376 target: "hyperdb_api",
377 chunks = self.chunk_count,
378 total_bytes = self.total_bytes,
379 table = %self.table_name,
380 "AsyncArrowInserter dropped without calling execute() or cancel(). \
381 Data may be lost. The underlying AsyncCopyInWriter will \
382 attempt a best-effort cancel to restore the connection."
383 );
384 // Take the writer so its Drop impl runs, which queues a CopyFail
385 // message via try_lock(). The next async operation on the connection
386 // will drain the cancel response and restore ReadyForQuery state.
387 drop(self.writer.take());
388 }
389 }
390}
391
392// =============================================================================
393// AsyncArrowInserterOwned — lifetime-free variant
394// =============================================================================
395
396/// Owned-handle variant of [`AsyncArrowInserter`] that holds an
397/// `Arc<AsyncConnection>` instead of a borrow.
398///
/// Semantics closely mirror [`AsyncArrowInserter`] — same `HyperBinary`
/// Arrow-stream COPY pipeline, same flush threshold, same Drop-time
/// best-effort cancel. Beyond holding an `Arc`, this variant is `'static`
/// and can therefore live in structs that can't carry lifetimes (N-API
/// classes, `tokio::spawn` tasks that outlive the constructor's stack
/// frame, etc). Minor behavioral differences: [`execute()`](Self::execute)
/// returns an error (rather than `Ok(0)`) when no data was ever written,
/// and [`insert_raw`](Self::insert_raw) marks the schema as sent.
#[derive(Debug)]
pub struct AsyncArrowInserterOwned {
    #[allow(
        dead_code,
        reason = "kept alive to anchor the client's Mutex Arc for the writer's lifetime"
    )]
    connection: Arc<AsyncConnection>,
    /// Fully-qualified target table name (from `TableDefinition::qualified_name`).
    table_name: String,
    /// Target column names, in table-definition order.
    columns: Vec<String>,
    /// Lazily-opened COPY writer; `None` until the first data write.
    writer: Option<AsyncCopyInWriterOwned>,
    /// Tracks whether an Arrow schema has been sent (also set by `insert_raw`).
    schema_sent: bool,
    /// Total bytes sent (for logging).
    total_bytes: usize,
    /// Number of chunks sent.
    chunk_count: usize,
    /// Start time for timing the insert operation.
    start_time: Instant,
    /// Flush threshold in bytes. Data is buffered until this threshold is reached.
    flush_threshold: usize,
    /// Bytes buffered since the last flush.
    buffered_bytes: usize,
}
422
423impl AsyncArrowInserterOwned {
424 /// Creates a new owned-handle async Arrow inserter.
425 ///
426 /// # Arguments
427 ///
428 /// * `connection` - `Arc`-shared async database connection. The
429 /// Arc is cloned into the inserter and kept alive for its
430 /// lifetime, so callers can drop their own handle immediately
431 /// after construction.
432 /// * `table_def` - The table definition for the target table.
433 ///
434 /// # Errors
435 ///
436 /// - Returns [`Error::Other`] with message
437 /// `"Table definition must have at least one column"` if `table_def`
438 /// has no columns.
439 /// - Returns [`Error::Other`] if `connection` is using gRPC transport.
440 pub fn new(connection: Arc<AsyncConnection>, table_def: &TableDefinition) -> Result<Self> {
441 let column_count = table_def.column_count();
442 if column_count == 0 {
443 return Err(Error::new("Table definition must have at least one column"));
444 }
445
446 if connection.async_tcp_client().is_none() {
447 return Err(Error::new(
448 "AsyncArrowInserterOwned requires a TCP connection. \
449 gRPC connections do not support COPY operations.",
450 ));
451 }
452
453 let columns: Vec<String> = table_def.columns.iter().map(|c| c.name.clone()).collect();
454 let table_name = table_def.qualified_name();
455
456 Ok(AsyncArrowInserterOwned {
457 connection,
458 table_name,
459 columns,
460 writer: None,
461 schema_sent: false,
462 total_bytes: 0,
463 chunk_count: 0,
464 start_time: Instant::now(),
465 flush_threshold: DEFAULT_FLUSH_THRESHOLD,
466 buffered_bytes: 0,
467 })
468 }
469
470 /// Sets a custom flush threshold in bytes. Default: 16 MB.
471 #[must_use]
472 pub fn with_flush_threshold(mut self, threshold: usize) -> Self {
473 self.flush_threshold = threshold;
474 self
475 }
476
477 /// Inserts a complete Arrow IPC stream (schema + record batches).
478 /// Use this for single-chunk inserts or as the first call of a
479 /// multi-chunk insert; subsequent chunks use
480 /// [`insert_record_batches`](Self::insert_record_batches).
481 ///
482 /// # Errors
483 ///
484 /// - Returns [`Error::Other`] if a schema was already sent.
485 /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
486 /// session cannot be opened.
487 /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
488 /// the data or the socket write fails.
489 pub async fn insert_data(&mut self, arrow_ipc_data: &[u8]) -> Result<()> {
490 if arrow_ipc_data.is_empty() {
491 return Ok(());
492 }
493 if self.schema_sent {
494 return Err(Error::new(
495 "Arrow schema was already sent. Use insert_record_batches() for subsequent chunks.",
496 ));
497 }
498 self.ensure_writer().await?;
499 if let Some(ref mut w) = self.writer {
500 w.send_direct(arrow_ipc_data).await?;
501 }
502 self.schema_sent = true;
503 self.buffered_bytes += arrow_ipc_data.len();
504 self.maybe_flush().await?;
505 self.total_bytes += arrow_ipc_data.len();
506 self.chunk_count += 1;
507 Ok(())
508 }
509
510 /// Inserts Arrow record-batch bytes *without* a schema header.
511 /// Must be called after [`insert_data`](Self::insert_data) or
512 /// [`insert_raw`](Self::insert_raw).
513 ///
514 /// # Errors
515 ///
516 /// - Returns [`Error::Other`] if no schema has been sent yet.
517 /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
518 /// the data or the socket write fails.
519 pub async fn insert_record_batches(&mut self, arrow_batch_data: &[u8]) -> Result<()> {
520 if arrow_batch_data.is_empty() {
521 return Ok(());
522 }
523 if !self.schema_sent {
524 return Err(Error::new(
525 "No Arrow schema has been sent yet. Call insert_data() first.",
526 ));
527 }
528 if let Some(ref mut w) = self.writer {
529 w.send_direct(arrow_batch_data).await?;
530 }
531 self.buffered_bytes += arrow_batch_data.len();
532 self.maybe_flush().await?;
533 self.total_bytes += arrow_batch_data.len();
534 self.chunk_count += 1;
535 Ok(())
536 }
537
538 /// Low-level: send raw bytes without schema tracking. The first
539 /// call transitions `schema_sent` to `true`. Use this when you are
540 /// managing Arrow IPC framing yourself.
541 ///
542 /// # Errors
543 ///
544 /// - Returns [`Error::Other`] / [`Error::Client`] if the lazy COPY
545 /// session cannot be opened.
546 /// - Returns [`Error::Client`] / [`Error::Io`] if the server rejects
547 /// the data or the socket write fails.
548 pub async fn insert_raw(&mut self, data: &[u8]) -> Result<()> {
549 if data.is_empty() {
550 return Ok(());
551 }
552 self.ensure_writer().await?;
553 if let Some(ref mut w) = self.writer {
554 w.send_direct(data).await?;
555 }
556 self.schema_sent = true;
557 self.buffered_bytes += data.len();
558 self.maybe_flush().await?;
559 self.total_bytes += data.len();
560 self.chunk_count += 1;
561 Ok(())
562 }
563
564 /// Finalizes the COPY session and returns the affected row count.
565 ///
566 /// # Errors
567 ///
568 /// - Returns [`Error::Other`] with message
569 /// `"No data was inserted before execute()"` if no COPY session was
570 /// ever opened.
571 /// - Returns [`Error::Client`] / [`Error::Io`] if the `CommandComplete`
572 /// round-trip fails.
573 pub async fn execute(mut self) -> Result<u64> {
574 let elapsed = self.start_time.elapsed();
575 info!(
576 target: "hyperdb_api",
577 chunks = self.chunk_count,
578 total_bytes = self.total_bytes,
579 elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
580 "async-arrow-inserter-execute"
581 );
582 let writer = self
583 .writer
584 .take()
585 .ok_or_else(|| Error::new("No data was inserted before execute()"))?;
586 writer.finish().await.map_err(Into::into)
587 }
588
589 /// Cancels the COPY session; any data sent so far is discarded.
590 pub async fn cancel(mut self) {
591 if let Some(writer) = self.writer.take() {
592 let _ = writer.cancel("AsyncArrowInserterOwned::cancel").await;
593 }
594 }
595
596 /// Returns `true` if any data has been inserted.
597 #[must_use]
598 pub fn has_data(&self) -> bool {
599 self.schema_sent
600 }
601
602 /// Returns the total bytes sent.
603 #[must_use]
604 pub fn total_bytes(&self) -> usize {
605 self.total_bytes
606 }
607
608 /// Returns the number of chunks sent.
609 #[must_use]
610 pub fn chunk_count(&self) -> usize {
611 self.chunk_count
612 }
613
614 async fn ensure_writer(&mut self) -> Result<()> {
615 if self.writer.is_none() {
616 let client: &AsyncClient = self.connection.async_tcp_client().ok_or_else(|| {
617 Error::new(
618 "AsyncArrowInserterOwned requires a TCP connection. \
619 gRPC connections do not support COPY operations.",
620 )
621 })?;
622 let columns: Vec<&str> = self
623 .columns
624 .iter()
625 .map(std::string::String::as_str)
626 .collect();
627 self.writer = Some(
628 client
629 .copy_in_arc_with_format(
630 &self.table_name,
631 &columns,
632 DataFormat::ArrowStream.as_sql_str(),
633 )
634 .await?,
635 );
636 }
637 Ok(())
638 }
639
640 async fn maybe_flush(&mut self) -> Result<()> {
641 if self.buffered_bytes >= self.flush_threshold {
642 if let Some(ref mut w) = self.writer {
643 w.flush_stream().await?;
644 }
645 debug!(
646 target: "hyperdb_api",
647 flushed_bytes = self.buffered_bytes,
648 threshold = self.flush_threshold,
649 "async-arrow-inserter-owned-flush"
650 );
651 self.buffered_bytes = 0;
652 }
653 Ok(())
654 }
655}
656
657impl Drop for AsyncArrowInserterOwned {
658 fn drop(&mut self) {
659 if self.writer.is_some() {
660 tracing::warn!(
661 target: "hyperdb_api",
662 chunks = self.chunk_count,
663 total_bytes = self.total_bytes,
664 table = %self.table_name,
665 "AsyncArrowInserterOwned dropped without calling execute() or cancel(). \
666 Data may be lost."
667 );
668 drop(self.writer.take());
669 }
670 }
671}