// hyperdb_api/async_inserter.rs
// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! Async high-performance bulk data inserter using the `HyperBinary` COPY
//! protocol. The async mirror of the sync [`Inserter`](crate::Inserter).
//!
//! For zero-copy Arrow IPC bulk loads, see [`AsyncArrowInserter`](crate::AsyncArrowInserter).
//!
//! # Example
//!
//! ```no_run
//! # use hyperdb_api::{AsyncConnection, AsyncInserter, CreateMode, Result, TableDefinition, SqlType};
//! # async fn run() -> Result<()> {
//! let conn = AsyncConnection::connect("localhost:7483", "data.hyper", CreateMode::CreateIfNotExists).await?;
//! let table_def = TableDefinition::new("metrics")
//!     .add_required_column("id", SqlType::int())
//!     .add_required_column("value", SqlType::double());
//! conn.execute_command(&table_def.to_create_sql(false)?).await?;
//!
//! let mut inserter = AsyncInserter::new(&conn, &table_def)?;
//! for i in 0..100_000_i32 {
//!     inserter.add_i32(i)?;
//!     inserter.add_f64(f64::from(i) * 1.5)?;
//!     inserter.end_row().await?;
//! }
//! let rows = inserter.execute().await?;
//! # let _ = rows;
//! # Ok(())
//! # }
//! ```
31
32use std::time::Instant;
33
34use hyperdb_api_core::client::AsyncCopyInWriter;
35use hyperdb_api_core::protocol::copy;
36use hyperdb_api_core::types::bytes::BytesMut;
37use hyperdb_api_core::types::{Date, Interval, Numeric, OffsetTimestamp, Time, Timestamp};
38use tracing::info;
39
40use crate::async_connection::AsyncConnection;
41use crate::error::{Error, Result};
42use crate::inserter::InsertChunk;
43use crate::table_definition::TableDefinition;
44
/// An async high-performance bulk data inserter.
///
/// `AsyncInserter` is the async mirror of [`Inserter`](crate::Inserter). Both
/// produce identical wire output (HyperBinary COPY) and have identical
/// throughput characteristics; the difference is whether the network I/O is
/// blocking (`Inserter`) or `async/await` (`AsyncInserter`).
///
/// # Lifetime safety
///
/// The inserter borrows the underlying [`AsyncConnection`] for `'conn`. The
/// borrow checker prevents the connection from being dropped or moved while
/// the inserter is live.
///
/// # Single-use
///
/// Like the sync `Inserter`, `AsyncInserter` is intended for one COPY session
/// per instance. After [`execute`](Self::execute) returns, the row counter is
/// zeroed so a stray second call returns `Ok(0)` rather than corrupting the
/// stream. To insert a second batch, construct a new `AsyncInserter`.
#[derive(Debug)]
pub struct AsyncInserter<'conn> {
    // Borrowed connection; must be TCP-backed (validated eagerly in `new`).
    connection: &'conn AsyncConnection,
    // Owned clone of the table definition; used for the qualified table
    // name, the COPY column list, and NUMERIC precision lookups.
    table_def: TableDefinition,
    // Row-building buffer; owns column-order discipline and flush sizing.
    chunk: InsertChunk,
    // Rows completed via `end_row` since construction; reset by `execute`.
    row_count: u64,
    // Chunks sent to the server so far (reported in the end-of-insert log).
    chunk_count: usize,
    // COPY session writer; `None` until the first flush starts the session.
    writer: Option<AsyncCopyInWriter<'conn>>,
    // Construction time, used to report total insert duration in `execute`.
    start_time: Instant,
}
74
75#[allow(
76 clippy::missing_errors_doc,
77 reason = "per-column add_* methods all return the same error shape \
78 documented on Inserter::add_bool — repeating the same `# Errors` \
79 block on 15 thin delegators adds noise without adding info"
80)]
81impl<'conn> AsyncInserter<'conn> {
82 /// Creates a new async inserter for the given table.
83 ///
84 /// The underlying COPY session is started lazily on the first flush or
85 /// execute, so construction is lightweight. The connection's transport
86 /// is validated eagerly — a gRPC connection returns an error immediately.
87 ///
88 /// # Errors
89 ///
90 /// - Returns [`Error::InvalidTableDefinition`] if `table_def` has zero columns.
91 /// - Returns [`Error::Other`] if `connection` is using gRPC transport
92 /// (COPY is TCP-only).
93 pub fn new(connection: &'conn AsyncConnection, table_def: &TableDefinition) -> Result<Self> {
94 if table_def.column_count() == 0 {
95 return Err(Error::InvalidTableDefinition(
96 "Table definition must have at least one column".into(),
97 ));
98 }
99 if connection.async_tcp_client().is_none() {
100 return Err(Error::new(
101 "AsyncInserter requires a TCP connection. \
102 gRPC connections do not support COPY operations.",
103 ));
104 }
105 Ok(Self {
106 connection,
107 table_def: table_def.clone(),
108 chunk: InsertChunk::from_table_definition(table_def),
109 row_count: 0,
110 chunk_count: 0,
111 writer: None,
112 start_time: Instant::now(),
113 })
114 }
115
116 // -------------------------------------------------------------------
117 // Per-column adders — delegate straight to the inner chunk.
118 // The chunk owns column-order discipline; we just track row counts.
119 // -------------------------------------------------------------------
120
121 /// Adds a NULL value to the next column.
122 pub fn add_null(&mut self) -> Result<()> {
123 self.chunk.add_null()
124 }
125 /// Adds a boolean value.
126 pub fn add_bool(&mut self, value: bool) -> Result<()> {
127 self.chunk.add_bool(value)
128 }
129 /// Adds a 16-bit signed integer.
130 pub fn add_i16(&mut self, value: i16) -> Result<()> {
131 self.chunk.add_i16(value)
132 }
133 /// Adds a 32-bit signed integer.
134 pub fn add_i32(&mut self, value: i32) -> Result<()> {
135 self.chunk.add_i32(value)
136 }
137 /// Adds a 64-bit signed integer.
138 pub fn add_i64(&mut self, value: i64) -> Result<()> {
139 self.chunk.add_i64(value)
140 }
141 /// Adds a 32-bit float.
142 pub fn add_f32(&mut self, value: f32) -> Result<()> {
143 self.chunk.add_f32(value)
144 }
145 /// Adds a 64-bit float.
146 pub fn add_f64(&mut self, value: f64) -> Result<()> {
147 self.chunk.add_f64(value)
148 }
149 /// Adds a string (TEXT) value.
150 pub fn add_str(&mut self, value: &str) -> Result<()> {
151 self.chunk.add_str(value)
152 }
153 /// Adds a bytes (BYTEA) value.
154 pub fn add_bytes(&mut self, value: &[u8]) -> Result<()> {
155 self.chunk.add_bytes(value)
156 }
157 /// Adds a Date value.
158 pub fn add_date(&mut self, value: Date) -> Result<()> {
159 self.chunk.add_date(value)
160 }
161 /// Adds a Time value.
162 pub fn add_time(&mut self, value: Time) -> Result<()> {
163 self.chunk.add_time(value)
164 }
165 /// Adds a Timestamp value.
166 pub fn add_timestamp(&mut self, value: Timestamp) -> Result<()> {
167 self.chunk.add_timestamp(value)
168 }
169 /// Adds an OffsetTimestamp value.
170 pub fn add_offset_timestamp(&mut self, value: OffsetTimestamp) -> Result<()> {
171 self.chunk.add_offset_timestamp(value)
172 }
173 /// Adds an Interval value.
174 pub fn add_interval(&mut self, value: Interval) -> Result<()> {
175 self.chunk.add_interval(value)
176 }
177 /// Adds a `Numeric` value (NUMERIC). The encoding (small vs big) is
178 /// chosen from the table definition's column precision at this position.
179 ///
180 /// # Errors
181 ///
182 /// Returns an error if the column's precision cannot be determined from
183 /// the table definition (NUMERIC columns must be declared with explicit
184 /// precision/scale).
185 pub fn add_numeric(&mut self, value: Numeric) -> Result<()> {
186 let column_index = self.chunk.column_index();
187 let precision = self
188 .table_def
189 .columns
190 .get(column_index)
191 .and_then(super::table_definition::ColumnDefinition::sql_type)
192 .and_then(|t| t.precision())
193 .ok_or_else(|| {
194 let col_name = self
195 .table_def
196 .columns
197 .get(column_index)
198 .map_or("<unknown>", |c| c.name.as_str());
199 Error::new(format!(
200 "Cannot determine numeric precision for column '{col_name}' at index {column_index}. \
201 Ensure the column is defined with explicit SqlType including precision."
202 ))
203 })?;
204 if precision <= Numeric::SMALL_NUMERIC_MAX_PRECISION {
205 let unscaled = value.unscaled_value();
206 let narrowed = i64::try_from(unscaled).map_err(|_| {
207 Error::new(format!(
208 "Numeric value {unscaled} is out of range for i64 storage (precision {precision})"
209 ))
210 })?;
211 self.chunk.add_i64(narrowed)
212 } else {
213 self.chunk.add_data128(&value.to_packed())
214 }
215 }
216
217 /// Marks the end of the current row. Must be called after every full row.
218 /// May trigger an automatic flush of the buffered data to the server.
219 ///
220 /// # Errors
221 ///
222 /// - Returns [`Error::Other`] if the column count for the row doesn't match
223 /// the table definition.
224 /// - Returns [`Error::Client`] / [`Error::Io`] on transport failures
225 /// during the auto-flush.
226 pub async fn end_row(&mut self) -> Result<()> {
227 self.chunk.end_row()?;
228 self.row_count += 1;
229 if self.chunk.should_flush() {
230 self.flush().await?;
231 }
232 Ok(())
233 }
234
235 /// Sends any buffered chunk to the server. Idempotent: calling on an
236 /// already-empty chunk is a no-op.
237 async fn flush(&mut self) -> Result<()> {
238 // Lazily start the COPY session on first flush.
239 if self.writer.is_none() {
240 let client = self.connection.async_tcp_client().ok_or_else(|| {
241 Error::new(
242 "AsyncInserter requires a TCP connection. \
243 gRPC connections do not support COPY operations.",
244 )
245 })?;
246 let columns: Vec<&str> = self
247 .table_def
248 .columns
249 .iter()
250 .map(|c| c.name.as_str())
251 .collect();
252 let table_name = self.table_def.qualified_name();
253 self.writer = Some(client.copy_in(&table_name, &columns).await?);
254 }
255 if let Some(buffer) = self.chunk.take() {
256 if let Some(writer) = self.writer.as_mut() {
257 writer.send(&buffer).await?;
258 self.chunk_count += 1;
259 }
260 }
261 Ok(())
262 }
263
264 /// Executes the insert and commits all buffered rows.
265 ///
266 /// Sends any remaining buffered data and finishes the COPY operation.
267 /// Returns the number of rows inserted.
268 ///
269 /// The inserter is single-use: calling `execute` a second time returns
270 /// `Ok(0)`. Construct a new `AsyncInserter` to insert another batch.
271 ///
272 /// # Errors
273 ///
274 /// - Returns [`Error::Other`] if there's an incomplete row (partial column).
275 /// - Returns [`Error::Client`] / [`Error::Io`] if the COPY session or
276 /// transport fails.
277 pub async fn execute(&mut self) -> Result<u64> {
278 if self.chunk.column_index() != 0 {
279 return Err(Error::new("Incomplete row at execute time"));
280 }
281 if self.row_count == 0 {
282 return Ok(0);
283 }
284 // Flush any tail data, then send the trailer and finish.
285 self.flush().await?;
286
287 let mut trailer_buf = BytesMut::with_capacity(2);
288 copy::write_trailer(&mut trailer_buf);
289 if let Some(writer) = self.writer.as_mut() {
290 writer.send(&trailer_buf).await?;
291 }
292
293 let rows = if let Some(writer) = self.writer.take() {
294 writer.finish().await?
295 } else {
296 0
297 };
298
299 let duration_ms = u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX);
300 info!(
301 target: "hyperdb_api",
302 rows,
303 chunks = self.chunk_count,
304 duration_ms,
305 table = %self.table_def.qualified_name(),
306 "async-inserter-end"
307 );
308
309 // Reset so a stray second execute() returns Ok(0).
310 self.row_count = 0;
311 Ok(rows)
312 }
313
314 /// Cancels the insert and discards all buffered rows.
315 ///
316 /// The Drop impl on `AsyncCopyInWriter` queues a `CopyFail` on the
317 /// connection so the server tears the COPY down cleanly on the next
318 /// connection use.
319 pub fn cancel(&mut self) {
320 self.writer = None;
321 self.row_count = 0;
322 }
323}