// hyperdb_api/async_inserter.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Async high-performance bulk data inserter using the `HyperBinary` COPY
5//! protocol. The async mirror of the sync [`Inserter`](crate::Inserter).
6//!
7//! For zero-copy Arrow IPC bulk loads, see [`AsyncArrowInserter`](crate::AsyncArrowInserter).
8//!
9//! # Example
10//!
11//! ```no_run
12//! # use hyperdb_api::{AsyncConnection, AsyncInserter, CreateMode, Result, TableDefinition, SqlType};
13//! # async fn run() -> Result<()> {
14//! let conn = AsyncConnection::connect("localhost:7483", "data.hyper", CreateMode::CreateIfNotExists).await?;
15//! let table_def = TableDefinition::new("metrics")
16//!     .add_required_column("id", SqlType::int())
17//!     .add_required_column("value", SqlType::double());
18//! conn.execute_command(&table_def.to_create_sql(false)?).await?;
19//!
20//! let mut inserter = AsyncInserter::new(&conn, &table_def)?;
21//! for i in 0..100_000_i32 {
22//!     inserter.add_i32(i)?;
23//!     inserter.add_f64(f64::from(i) * 1.5)?;
24//!     inserter.end_row().await?;
25//! }
26//! let rows = inserter.execute().await?;
27//! # let _ = rows;
28//! # Ok(())
29//! # }
30//! ```
31
32use std::time::Instant;
33
34use hyperdb_api_core::client::AsyncCopyInWriter;
35use hyperdb_api_core::protocol::copy;
36use hyperdb_api_core::types::bytes::BytesMut;
37use hyperdb_api_core::types::{Date, Interval, Numeric, OffsetTimestamp, Time, Timestamp};
38use tracing::info;
39
40use crate::async_connection::AsyncConnection;
41use crate::error::{Error, Result};
42use crate::inserter::InsertChunk;
43use crate::table_definition::TableDefinition;
44
45/// An async high-performance bulk data inserter.
46///
47/// `AsyncInserter` is the async mirror of [`Inserter`](crate::Inserter). Both
48/// produce identical wire output (HyperBinary COPY) and have identical
49/// throughput characteristics; the difference is whether the network I/O is
50/// blocking (`Inserter`) or `async/await` (`AsyncInserter`).
51///
52/// # Lifetime safety
53///
54/// The inserter borrows the underlying [`AsyncConnection`] for `'conn`. The
55/// borrow checker prevents the connection from being dropped or moved while
56/// the inserter is live.
57///
58/// # Single-use
59///
60/// Like the sync `Inserter`, `AsyncInserter` is intended for one COPY session
61/// per instance. After [`execute`](Self::execute) returns, the row counter is
62/// zeroed so a stray second call returns `Ok(0)` rather than corrupting the
63/// stream. To insert a second batch, construct a new `AsyncInserter`.
#[derive(Debug)]
pub struct AsyncInserter<'conn> {
    /// Borrowed connection for the whole inserter lifetime; must be TCP
    /// (validated eagerly in [`Self::new`]).
    connection: &'conn AsyncConnection,
    /// Owned copy of the target table's definition; consulted for the column
    /// list, qualified table name, and NUMERIC precision lookups.
    table_def: TableDefinition,
    /// In-memory HyperBinary buffer holding rows not yet sent to the server.
    chunk: InsertChunk,
    /// Rows completed via `end_row` since construction; zeroed by `execute`
    /// and `cancel` so a stray second `execute` returns `Ok(0)`.
    row_count: u64,
    /// Number of chunks flushed to the server; reported in the end-of-run log.
    chunk_count: usize,
    /// COPY session writer; `None` until the first flush (session starts lazily).
    writer: Option<AsyncCopyInWriter<'conn>>,
    /// Construction time, used to compute `duration_ms` for the end-of-run log.
    start_time: Instant,
}
74
75#[allow(
76    clippy::missing_errors_doc,
77    reason = "per-column add_* methods all return the same error shape \
78              documented on Inserter::add_bool — repeating the same `# Errors` \
79              block on 15 thin delegators adds noise without adding info"
80)]
81impl<'conn> AsyncInserter<'conn> {
82    /// Creates a new async inserter for the given table.
83    ///
84    /// The underlying COPY session is started lazily on the first flush or
85    /// execute, so construction is lightweight. The connection's transport
86    /// is validated eagerly — a gRPC connection returns an error immediately.
87    ///
88    /// # Errors
89    ///
90    /// - Returns [`Error::InvalidTableDefinition`] if `table_def` has zero columns.
91    /// - Returns [`Error::Other`] if `connection` is using gRPC transport
92    ///   (COPY is TCP-only).
93    pub fn new(connection: &'conn AsyncConnection, table_def: &TableDefinition) -> Result<Self> {
94        if table_def.column_count() == 0 {
95            return Err(Error::InvalidTableDefinition(
96                "Table definition must have at least one column".into(),
97            ));
98        }
99        if connection.async_tcp_client().is_none() {
100            return Err(Error::new(
101                "AsyncInserter requires a TCP connection. \
102                 gRPC connections do not support COPY operations.",
103            ));
104        }
105        Ok(Self {
106            connection,
107            table_def: table_def.clone(),
108            chunk: InsertChunk::from_table_definition(table_def),
109            row_count: 0,
110            chunk_count: 0,
111            writer: None,
112            start_time: Instant::now(),
113        })
114    }
115
116    // -------------------------------------------------------------------
117    // Per-column adders — delegate straight to the inner chunk.
118    // The chunk owns column-order discipline; we just track row counts.
119    // -------------------------------------------------------------------
120
121    /// Adds a NULL value to the next column.
122    pub fn add_null(&mut self) -> Result<()> {
123        self.chunk.add_null()
124    }
125    /// Adds a boolean value.
126    pub fn add_bool(&mut self, value: bool) -> Result<()> {
127        self.chunk.add_bool(value)
128    }
129    /// Adds a 16-bit signed integer.
130    pub fn add_i16(&mut self, value: i16) -> Result<()> {
131        self.chunk.add_i16(value)
132    }
133    /// Adds a 32-bit signed integer.
134    pub fn add_i32(&mut self, value: i32) -> Result<()> {
135        self.chunk.add_i32(value)
136    }
137    /// Adds a 64-bit signed integer.
138    pub fn add_i64(&mut self, value: i64) -> Result<()> {
139        self.chunk.add_i64(value)
140    }
141    /// Adds a 32-bit float.
142    pub fn add_f32(&mut self, value: f32) -> Result<()> {
143        self.chunk.add_f32(value)
144    }
145    /// Adds a 64-bit float.
146    pub fn add_f64(&mut self, value: f64) -> Result<()> {
147        self.chunk.add_f64(value)
148    }
149    /// Adds a string (TEXT) value.
150    pub fn add_str(&mut self, value: &str) -> Result<()> {
151        self.chunk.add_str(value)
152    }
153    /// Adds a bytes (BYTEA) value.
154    pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
155        self.chunk.add_bytes(value)
156    }
157    /// Adds a Date value.
158    pub fn add_date(&mut self, value: Date) -> Result<()> {
159        self.chunk.add_date(value)
160    }
161    /// Adds a Time value.
162    pub fn add_time(&mut self, value: Time) -> Result<()> {
163        self.chunk.add_time(value)
164    }
165    /// Adds a Timestamp value.
166    pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
167        self.chunk.add_timestamp(value)
168    }
169    /// Adds an OffsetTimestamp value.
170    pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
171        self.chunk.add_offset_timestamp(value)
172    }
173    /// Adds an Interval value.
174    pub fn add_interval(&mut self, value: Interval) -> Result<()> {
175        self.chunk.add_interval(value)
176    }
177    /// Adds a `Numeric` value (NUMERIC). The encoding (small vs big) is
178    /// chosen from the table definition's column precision at this position.
179    ///
180    /// # Errors
181    ///
182    /// Returns an error if the column's precision cannot be determined from
183    /// the table definition (NUMERIC columns must be declared with explicit
184    /// precision/scale).
185    pub fn add_numeric(&mut self, value: Numeric) -> Result<()> {
186        let column_index = self.chunk.column_index();
187        let precision = self
188            .table_def
189            .columns
190            .get(column_index)
191            .and_then(super::table_definition::ColumnDefinition::sql_type)
192            .and_then(|t| t.precision())
193            .ok_or_else(|| {
194                let col_name = self
195                    .table_def
196                    .columns
197                    .get(column_index)
198                    .map_or("<unknown>", |c| c.name.as_str());
199                Error::new(format!(
200                    "Cannot determine numeric precision for column '{col_name}' at index {column_index}. \
201                     Ensure the column is defined with explicit SqlType including precision."
202                ))
203            })?;
204        if precision <= Numeric::SMALL_NUMERIC_MAX_PRECISION {
205            let unscaled = value.unscaled_value();
206            let narrowed = i64::try_from(unscaled).map_err(|_| {
207                Error::new(format!(
208                    "Numeric value {unscaled} is out of range for i64 storage (precision {precision})"
209                ))
210            })?;
211            self.chunk.add_i64(narrowed)
212        } else {
213            self.chunk.add_data128(&value.to_packed())
214        }
215    }
216
217    /// Marks the end of the current row. Must be called after every full row.
218    /// May trigger an automatic flush of the buffered data to the server.
219    ///
220    /// # Errors
221    ///
222    /// - Returns [`Error::Other`] if the column count for the row doesn't match
223    ///   the table definition.
224    /// - Returns [`Error::Client`] / [`Error::Io`] on transport failures
225    ///   during the auto-flush.
226    pub async fn end_row(&mut self) -> Result<()> {
227        self.chunk.end_row()?;
228        self.row_count += 1;
229        if self.chunk.should_flush() {
230            self.flush().await?;
231        }
232        Ok(())
233    }
234
235    /// Sends any buffered chunk to the server. Idempotent: calling on an
236    /// already-empty chunk is a no-op.
237    async fn flush(&mut self) -> Result<()> {
238        // Lazily start the COPY session on first flush.
239        if self.writer.is_none() {
240            let client = self.connection.async_tcp_client().ok_or_else(|| {
241                Error::new(
242                    "AsyncInserter requires a TCP connection. \
243                     gRPC connections do not support COPY operations.",
244                )
245            })?;
246            let columns: Vec<&str> = self
247                .table_def
248                .columns
249                .iter()
250                .map(|c| c.name.as_str())
251                .collect();
252            let table_name = self.table_def.qualified_name();
253            self.writer = Some(client.copy_in(&table_name, &columns).await?);
254        }
255        if let Some(buffer) = self.chunk.take() {
256            if let Some(writer) = self.writer.as_mut() {
257                writer.send(&buffer).await?;
258                self.chunk_count += 1;
259            }
260        }
261        Ok(())
262    }
263
264    /// Executes the insert and commits all buffered rows.
265    ///
266    /// Sends any remaining buffered data and finishes the COPY operation.
267    /// Returns the number of rows inserted.
268    ///
269    /// The inserter is single-use: calling `execute` a second time returns
270    /// `Ok(0)`. Construct a new `AsyncInserter` to insert another batch.
271    ///
272    /// # Errors
273    ///
274    /// - Returns [`Error::Other`] if there's an incomplete row (partial column).
275    /// - Returns [`Error::Client`] / [`Error::Io`] if the COPY session or
276    ///   transport fails.
277    pub async fn execute(&mut self) -> Result<u64> {
278        if self.chunk.column_index() != 0 {
279            return Err(Error::new("Incomplete row at execute time"));
280        }
281        if self.row_count == 0 {
282            return Ok(0);
283        }
284        // Flush any tail data, then send the trailer and finish.
285        self.flush().await?;
286
287        let mut trailer_buf = BytesMut::with_capacity(2);
288        copy::write_trailer(&mut trailer_buf);
289        if let Some(writer) = self.writer.as_mut() {
290            writer.send(&trailer_buf).await?;
291        }
292
293        let rows = if let Some(writer) = self.writer.take() {
294            writer.finish().await?
295        } else {
296            0
297        };
298
299        let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
300        info!(
301            target: "hyperdb_api",
302            rows,
303            chunks = self.chunk_count,
304            duration_ms,
305            table = %self.table_def.qualified_name(),
306            "async-inserter-end"
307        );
308
309        // Reset so a stray second execute() returns Ok(0).
310        self.row_count = 0;
311        Ok(rows)
312    }
313
314    /// Cancels the insert and discards all buffered rows.
315    ///
316    /// The Drop impl on `AsyncCopyInWriter` queues a `CopyFail` on the
317    /// connection so the server tears the COPY down cleanly on the next
318    /// connection use.
319    pub fn cancel(&mut self) {
320        self.writer = None;
321        self.row_count = 0;
322    }
323}