hyperdb_api_core/client/async_client.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level asynchronous client for Hyper database.
5//!
6//! This module provides [`AsyncClient`], the async version of [`Client`](crate::client::Client).
7//! It uses tokio for async I/O operations.
8
9use std::sync::Arc;
10
11use tokio::net::TcpStream;
12use tokio::sync::Mutex;
13use tracing::{debug, info, warn};
14
15#[cfg(unix)]
16use tokio::net::UnixStream;
17
18use super::async_connection::AsyncRawConnection;
19use super::async_stream::AsyncStream;
20use super::async_stream_query::AsyncQueryStream;
21use super::cancel::Cancellable;
22use super::config::Config;
23use super::endpoint::ConnectionEndpoint;
24use super::error::{Error, Result};
25use super::notice::{Notice, NoticeReceiver};
26use super::row::{Row, StreamRow};
27
28use crate::protocol::message::Message;
29
30/// An asynchronous client for Hyper database.
31///
32/// This is the async equivalent of [`Client`](crate::client::Client), designed for use
33/// in tokio-based async applications. All I/O operations are non-blocking.
34///
35/// # Example
36///
37/// ```no_run
38/// use hyperdb_api_core::client::{AsyncClient, Config};
39///
40/// #[tokio::main]
41/// async fn main() -> hyperdb_api_core::client::Result<()> {
42/// let config = Config::new()
43/// .with_host("localhost")
44/// .with_port(7483)
45/// .with_database("test.hyper");
46///
47/// let client = AsyncClient::connect(&config).await?;
48/// let rows = client.query("SELECT 1").await?;
49/// client.close().await?;
50/// Ok(())
51/// }
52/// ```
53pub struct AsyncClient {
54 /// The underlying async connection, protected by a mutex for concurrent access.
55 connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>,
56 /// Backend process ID (for cancel requests).
57 process_id: i32,
58 /// Secret key for authenticating cancel requests.
59 secret_key: i32,
60 /// Connection endpoint for cancel requests and reconnection.
61 endpoint: ConnectionEndpoint,
62 /// Optional notice receiver callback for server notices/warnings.
63 notice_receiver: Option<Arc<NoticeReceiver>>,
64}
65
66impl std::fmt::Debug for AsyncClient {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("AsyncClient")
69 .field("process_id", &self.process_id)
70 .field("secret_key", &self.secret_key)
71 .field("endpoint", &self.endpoint)
72 .field(
73 "notice_receiver",
74 &self.notice_receiver.as_ref().map(|_| "<callback>"),
75 )
76 .finish_non_exhaustive()
77 }
78}
79
80impl AsyncClient {
81 /// Connects to a Hyper server using the given configuration (async).
82 ///
83 /// # Example
84 ///
85 /// ```no_run
86 /// # use hyperdb_api_core::client::{AsyncClient, Config};
87 /// # async fn example() -> hyperdb_api_core::client::Result<()> {
88 /// let config = Config::new()
89 /// .with_host("localhost")
90 /// .with_port(7483)
91 /// .with_database("test.hyper");
92 ///
93 /// let client = AsyncClient::connect(&config).await?;
94 /// # Ok(())
95 /// # }
96 /// ```
97 ///
98 /// # Errors
99 ///
100 /// - Returns [`Error`] (connection) if the TCP connection cannot be
101 /// established to `config.host():config.port()`.
102 /// - Propagates any [`Error`] from the startup handshake —
103 /// [`Error`] (auth) for missing/wrong credentials,
104 /// [`Error`] (server) for server-side startup errors, [`Error`] (protocol)
105 /// for out-of-sequence messages, or [`Error`] (I/O) for wire
106 /// failure.
107 pub async fn connect(config: &Config) -> Result<Self> {
108 info!(
109 target: "hyperdb_api",
110 host = %config.host(),
111 port = config.port(),
112 user = config.user().unwrap_or("(default)"),
113 database = config.database().unwrap_or("(none)"),
114 "connection-parameters"
115 );
116
117 let endpoint = ConnectionEndpoint::tcp(config.host(), config.port());
118 let addr = format!("{}:{}", config.host(), config.port());
119 let tcp_stream = TcpStream::connect(&addr).await.map_err(|e| {
120 warn!(target: "hyperdb_api", %addr, error = %e, "connection-failed");
121 Error::connection(format!("failed to connect to {addr}: {e}"))
122 })?;
123
124 // Set TCP options. See the sync mirror in
125 // [`super::client::Client::connect`] for the full rationale and
126 // empirical knee analysis. We bump `SO_RCVBUF` / `SO_SNDBUF` to
127 // 4 MiB (Windows default ~64 KiB throttles loopback throughput;
128 // Linux auto-tunes higher).
129 tcp_stream.set_nodelay(true).ok();
130 let sock = socket2::SockRef::from(&tcp_stream);
131 sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
132 sock.set_send_buffer_size(4 * 1024 * 1024).ok();
133 // TCP keepalive: detect a half-open peer (laptop sleep, network blip,
134 // a hyperd that vanished without a FIN) in ~90s instead of the 2h OS
135 // idle default. See the rationale on `super::client::apply_tcp_keepalive`
136 // (the sync mirror). Best-effort: a rejected knob leaves OS defaults.
137 {
138 let keepalive = socket2::TcpKeepalive::new()
139 .with_time(std::time::Duration::from_secs(60))
140 .with_interval(std::time::Duration::from_secs(10));
141 #[cfg(not(any(target_os = "macos", target_os = "windows")))]
142 let keepalive = keepalive.with_retries(3);
143 sock.set_tcp_keepalive(&keepalive).ok();
144 }
145
146 let stream = AsyncStream::tcp(tcp_stream);
147 let mut connection = AsyncRawConnection::new(stream);
148
149 // Perform startup with authentication
150 let params = config.startup_params();
151 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
152 connection.startup(¶ms_ref, config.password()).await?;
153
154 let process_id = connection.process_id();
155 let secret_key = connection.secret_key();
156
157 debug!(
158 target: "hyperdb_api",
159 process_id,
160 "connection-established"
161 );
162
163 Ok(AsyncClient {
164 connection: Arc::new(Mutex::new(connection)),
165 process_id,
166 secret_key,
167 endpoint,
168 notice_receiver: None,
169 })
170 }
171
172 /// Connects to a Hyper server via Unix Domain Socket (async, Unix only).
173 ///
174 /// # Example
175 ///
176 /// ```no_run
177 /// # use hyperdb_api_core::client::{AsyncClient, Config};
178 /// # use std::path::Path;
179 /// # async fn example() -> hyperdb_api_core::client::Result<()> {
180 /// let socket_path = Path::new("/tmp/hyper/.s.PGSQL.12345");
181 /// let config = Config::new().with_database("test.hyper");
182 /// let client = AsyncClient::connect_unix(socket_path, &config).await?;
183 /// # Ok(())
184 /// # }
185 /// ```
186 ///
187 /// # Errors
188 ///
189 /// - Returns [`Error`] (connection) if the Unix domain socket cannot
190 /// be connected.
191 /// - Propagates any error from the startup handshake (see
192 /// [`Self::connect`]).
193 #[cfg(unix)]
194 pub async fn connect_unix(
195 socket_path: impl AsRef<std::path::Path>,
196 config: &Config,
197 ) -> Result<Self> {
198 use std::path::Path;
199
200 let path = socket_path.as_ref();
201 info!(
202 target: "hyperdb_api",
203 socket_path = %path.display(),
204 user = config.user().unwrap_or("(default)"),
205 database = config.database().unwrap_or("(none)"),
206 "connection-parameters-unix"
207 );
208
209 let unix_stream = UnixStream::connect(path).await.map_err(|e| {
210 warn!(target: "hyperdb_api", socket_path = %path.display(), error = %e, "connection-failed");
211 Error::connection(format!("failed to connect to unix socket {}: {}", path.display(), e))
212 })?;
213
214 // Parse endpoint from socket path
215 let directory = path.parent().unwrap_or(Path::new("/"));
216 let name = path
217 .file_name()
218 .and_then(|n| n.to_str())
219 .unwrap_or("socket");
220 let endpoint = ConnectionEndpoint::domain_socket(directory, name);
221
222 let stream = AsyncStream::unix(unix_stream);
223 let mut connection = AsyncRawConnection::new(stream);
224
225 // Perform startup with authentication
226 let params = config.startup_params();
227 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
228 connection.startup(¶ms_ref, config.password()).await?;
229
230 let process_id = connection.process_id();
231 let secret_key = connection.secret_key();
232
233 debug!(
234 target: "hyperdb_api",
235 process_id,
236 "connection-established-unix"
237 );
238
239 Ok(AsyncClient {
240 connection: Arc::new(Mutex::new(connection)),
241 process_id,
242 secret_key,
243 endpoint,
244 notice_receiver: None,
245 })
246 }
247
248 /// Connects to a Hyper server via Windows Named Pipe (async, Windows only).
249 ///
250 /// # Arguments
251 ///
252 /// * `pipe_path` - The full pipe path (e.g., `\\.\pipe\hyper-12345`)
253 /// * `config` - Connection configuration
254 ///
255 /// # Errors
256 ///
257 /// Returns an error if the Named Pipe cannot be opened (e.g., pipe does not
258 /// exist, all instances are busy after the retry window, or permission is
259 /// denied) or if the authentication handshake fails.
260 #[cfg(windows)]
261 pub async fn connect_named_pipe(pipe_path: &str, config: &Config) -> Result<Self> {
262 use std::time::{Duration, Instant};
263 use tokio::net::windows::named_pipe::ClientOptions;
264
265 info!(
266 target: "hyperdb_api",
267 pipe_path = %pipe_path,
268 user = config.user().unwrap_or("(default)"),
269 database = config.database().unwrap_or("(none)"),
270 "connection-parameters-named-pipe"
271 );
272
273 // Retry on `ERROR_PIPE_BUSY` (231) — Windows named pipes have a finite
274 // number of server-side instances and concurrent clients can hit the
275 // cap. See the sync mirror in [`super::client::Client::connect_named_pipe`]
276 // for the full rationale.
277 const RETRY_INTERVAL: Duration = Duration::from_millis(20);
278 const MAX_WAIT: Duration = Duration::from_secs(10);
279 const ERROR_PIPE_BUSY: i32 = 231;
280
281 let deadline = Instant::now() + MAX_WAIT;
282 let client = loop {
283 match ClientOptions::new().open(pipe_path) {
284 Ok(c) => break c,
285 Err(e)
286 if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && Instant::now() < deadline =>
287 {
288 tokio::time::sleep(RETRY_INTERVAL).await;
289 }
290 Err(e) => {
291 warn!(target: "hyperdb_api", pipe_path = %pipe_path, error = %e, "connection-failed");
292 return Err(Error::connection(format!(
293 "failed to connect to named pipe {pipe_path}: {e}"
294 )));
295 }
296 }
297 };
298
299 // Parse endpoint from pipe path
300 let endpoint = ConnectionEndpoint::parse(&format!(
301 "tab.pipe://{}",
302 pipe_path.trim_start_matches(r"\\").replace('\\', "/")
303 ))
304 .unwrap_or_else(|_| {
305 let parts: Vec<&str> = pipe_path
306 .trim_start_matches(r"\\")
307 .splitn(3, '\\')
308 .collect();
309 if parts.len() >= 3 {
310 ConnectionEndpoint::named_pipe(parts[0], parts[2])
311 } else {
312 ConnectionEndpoint::named_pipe(".", pipe_path)
313 }
314 });
315
316 let stream = AsyncStream::named_pipe(client);
317 let mut connection = AsyncRawConnection::new(stream);
318
319 // Perform startup with authentication
320 let params = config.startup_params();
321 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
322 connection.startup(¶ms_ref, config.password()).await?;
323
324 let process_id = connection.process_id();
325 let secret_key = connection.secret_key();
326
327 debug!(
328 target: "hyperdb_api",
329 process_id,
330 "connection-established-named-pipe"
331 );
332
333 Ok(AsyncClient {
334 connection: Arc::new(Mutex::new(connection)),
335 process_id,
336 secret_key,
337 endpoint,
338 notice_receiver: None,
339 })
340 }
341
342 /// Connects to a Hyper server using a `ConnectionEndpoint` (async).
343 ///
344 /// This is a lower-level method that accepts a pre-parsed endpoint.
345 ///
346 /// # Errors
347 ///
348 /// Delegates to [`Self::connect`], [`Self::connect_unix`], or
349 /// `Self::connect_named_pipe` depending on the endpoint variant,
350 /// and propagates their errors unchanged.
351 pub async fn connect_endpoint(endpoint: &ConnectionEndpoint, config: &Config) -> Result<Self> {
352 match endpoint {
353 ConnectionEndpoint::Tcp { host, port } => {
354 let mut cfg = config.clone();
355 cfg = cfg.with_host(host.clone()).with_port(*port);
356 Self::connect(&cfg).await
357 }
358 #[cfg(unix)]
359 ConnectionEndpoint::DomainSocket { directory, name } => {
360 let socket_path = directory.join(name);
361 Self::connect_unix(&socket_path, config).await
362 }
363 #[cfg(windows)]
364 ConnectionEndpoint::NamedPipe { host, name } => {
365 let pipe_path = format!(r"\\{host}\pipe\{name}");
366 Self::connect_named_pipe(&pipe_path, config).await
367 }
368 }
369 }
370
371 /// Returns the connection endpoint.
372 #[must_use]
373 pub fn endpoint(&self) -> &ConnectionEndpoint {
374 &self.endpoint
375 }
376
377 /// Returns the server process ID for this connection.
378 #[must_use]
379 pub fn process_id(&self) -> i32 {
380 self.process_id
381 }
382
383 /// Returns the secret key for cancel requests.
384 #[must_use]
385 pub fn secret_key(&self) -> i32 {
386 self.secret_key
387 }
388
389 /// Cancels the currently executing query on this connection (async).
390 ///
391 /// This method opens a separate connection to send a cancel request.
392 /// For TCP endpoints, it opens a new TCP connection.
393 /// For Unix domain sockets, it connects to the same socket path.
394 ///
395 /// # Errors
396 ///
397 /// - Returns [`Error`] (connection) if a fresh cancel-side socket
398 /// (TCP / UDS / named-pipe) cannot be opened to
399 /// [`Self::endpoint`].
400 /// - Returns [`Error`] (I/O) if writing the cancel request fails.
401 pub async fn cancel(&self) -> Result<()> {
402 use crate::protocol::message::frontend;
403 use bytes::BytesMut;
404 use tokio::io::AsyncWriteExt;
405
406 info!(
407 target: "hyperdb_api",
408 process_id = self.process_id,
409 "query-cancel-request"
410 );
411
412 let endpoint_str = self.endpoint.to_string();
413
414 match &self.endpoint {
415 ConnectionEndpoint::Tcp { host, port } => {
416 let addr = format!("{host}:{port}");
417 let mut stream = TcpStream::connect(&addr).await.map_err(|e| {
418 warn!(
419 target: "hyperdb_api",
420 addr = %endpoint_str,
421 error = %e,
422 "query-cancel-connect-failed"
423 );
424 Error::connection(format!(
425 "failed to connect for cancel request to {endpoint_str}: {e}"
426 ))
427 })?;
428 // Cancel is a 16-byte fire-and-forget — disable Nagle so the
429 // request hits the wire without waiting on a coalesce timer.
430 stream.set_nodelay(true).ok();
431
432 let mut buf = BytesMut::new();
433 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
434
435 stream.write_all(&buf).await.map_err(|e| {
436 warn!(
437 target: "hyperdb_api",
438 error = %e,
439 "query-cancel-send-failed"
440 );
441 Error::io(e)
442 })?;
443 }
444 #[cfg(unix)]
445 ConnectionEndpoint::DomainSocket { directory, name } => {
446 let socket_path = directory.join(name);
447 let mut stream = UnixStream::connect(&socket_path).await.map_err(|e| {
448 warn!(
449 target: "hyperdb_api",
450 addr = %endpoint_str,
451 error = %e,
452 "query-cancel-connect-failed"
453 );
454 Error::connection(format!(
455 "failed to connect for cancel request to {endpoint_str}: {e}"
456 ))
457 })?;
458
459 let mut buf = BytesMut::new();
460 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
461
462 stream.write_all(&buf).await.map_err(|e| {
463 warn!(
464 target: "hyperdb_api",
465 error = %e,
466 "query-cancel-send-failed"
467 );
468 Error::io(e)
469 })?;
470 }
471 #[cfg(windows)]
472 ConnectionEndpoint::NamedPipe { host, name } => {
473 let pipe_path = format!(r"\\{host}\pipe\{name}");
474 // Use sync file I/O for cancel (short-lived connection)
475 let mut file = std::fs::OpenOptions::new()
476 .read(true)
477 .write(true)
478 .open(&pipe_path)
479 .map_err(|e| {
480 warn!(
481 target: "hyperdb_api",
482 addr = %endpoint_str,
483 error = %e,
484 "query-cancel-connect-failed"
485 );
486 Error::connection(format!(
487 "failed to connect for cancel request to {endpoint_str}: {e}"
488 ))
489 })?;
490
491 let mut buf = BytesMut::new();
492 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
493
494 use std::io::Write;
495 file.write_all(&buf).map_err(|e| {
496 warn!(
497 target: "hyperdb_api",
498 error = %e,
499 "query-cancel-send-failed"
500 );
501 Error::io(e)
502 })?;
503
504 file.flush().map_err(Error::io)?;
505 }
506 }
507
508 debug!(target: "hyperdb_api", "query-cancel-sent");
509 Ok(())
510 }
511
512 /// Executes a query and returns all result rows (async).
513 ///
514 /// # Errors
515 ///
516 /// Propagates any [`Error`] from the underlying connection's
517 /// [`AsyncRawConnection::simple_query`] — [`Error`] (server) for
518 /// server-side SQL errors, [`Error`] (I/O) / [`Error`] (closed) for
519 /// transport failures, and [`Error`] (connection) if the connection
520 /// is unhealthy. Row construction may also raise an [`Error`] when
521 /// a `DataRow` cannot be decoded against its `RowDescription`.
522 pub async fn query(&self, sql: &str) -> Result<Vec<Row>> {
523 let mut conn = self.connection.lock().await;
524 let messages = conn.simple_query(sql).await?;
525 Self::process_query_messages(messages, self.notice_receiver.as_ref())
526 }
527
528 /// Executes a query with `HyperBinary` format for better performance (async).
529 ///
530 /// # Errors
531 ///
532 /// Same failure modes as [`Self::query`].
533 pub async fn query_fast(&self, sql: &str) -> Result<Vec<StreamRow>> {
534 let mut conn = self.connection.lock().await;
535 let messages = conn.query_binary(sql).await?;
536 Ok(Self::process_binary_messages(
537 messages,
538 self.notice_receiver.as_ref(),
539 ))
540 }
541
542 /// Executes a query with `HyperBinary` format and returns a streaming
543 /// result reader (async).
544 ///
545 /// This is the async mirror of
546 /// [`Client::query_streaming`](super::client::Client::query_streaming).
547 /// The returned [`AsyncQueryStream`] yields rows in chunks so callers
548 /// can process arbitrarily large result sets with constant memory. The
549 /// connection mutex is held for the duration of iteration; dropping the
550 /// stream before completion issues a best-effort cancel and marks the
551 /// connection desynchronized.
552 ///
553 /// # Errors
554 ///
555 /// - Returns [`Error`] (connection) if the connection is unhealthy.
556 /// - Returns [`Error`] (I/O) if writing the Parse/Bind/Execute/Sync
557 /// sequence fails on the transport.
558 pub async fn query_streaming(
559 &self,
560 sql: &str,
561 chunk_size: usize,
562 ) -> Result<AsyncQueryStream<'_>> {
563 let mut conn = self.connection.lock().await;
564 conn.start_query_binary(sql).await?;
565 Ok(AsyncQueryStream::new(conn, self, chunk_size))
566 }
567
568 /// Sends a best-effort `CancelRequest` using *synchronous* I/O so it
569 /// is usable from [`Drop`] impls (notably
570 /// [`AsyncQueryStream::drop`](super::async_stream_query::AsyncQueryStream)).
571 ///
572 /// Cancellation opens a short-lived TCP / UDS / Named-Pipe connection,
573 /// writes the cancel packet, and drops it — the server recognizes the
574 /// (`process_id`, `secret_key`) tuple and signals the long-running query
575 /// to abort. No response is expected.
576 fn cancel_sync(&self) -> Result<()> {
577 use crate::protocol::message::frontend;
578 use bytes::BytesMut;
579 use std::io::Write;
580
581 info!(
582 target: "hyperdb_api",
583 process_id = self.process_id,
584 "query-cancel-request"
585 );
586
587 let endpoint_str = self.endpoint.to_string();
588
589 match &self.endpoint {
590 ConnectionEndpoint::Tcp { host, port } => {
591 let addr = format!("{host}:{port}");
592 let mut stream = std::net::TcpStream::connect(&addr).map_err(|e| {
593 warn!(
594 target: "hyperdb_api",
595 addr = %endpoint_str,
596 error = %e,
597 "query-cancel-connect-failed"
598 );
599 Error::connection(format!(
600 "failed to connect for cancel request to {endpoint_str}: {e}"
601 ))
602 })?;
603 // Cancel is a 16-byte fire-and-forget — disable Nagle so the
604 // request hits the wire without waiting on a coalesce timer.
605 stream.set_nodelay(true).ok();
606
607 let mut buf = BytesMut::with_capacity(16);
608 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
609
610 stream.write_all(&buf).map_err(Error::io)?;
611 stream.flush().map_err(Error::io)?;
612 }
613 #[cfg(unix)]
614 ConnectionEndpoint::DomainSocket { directory, name } => {
615 let socket_path = directory.join(name);
616 let mut stream =
617 std::os::unix::net::UnixStream::connect(&socket_path).map_err(|e| {
618 warn!(
619 target: "hyperdb_api",
620 addr = %endpoint_str,
621 error = %e,
622 "query-cancel-connect-failed"
623 );
624 Error::connection(format!(
625 "failed to connect for cancel request to {endpoint_str}: {e}"
626 ))
627 })?;
628
629 let mut buf = BytesMut::with_capacity(16);
630 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
631
632 stream.write_all(&buf).map_err(Error::io)?;
633 stream.flush().map_err(Error::io)?;
634 }
635 #[cfg(windows)]
636 ConnectionEndpoint::NamedPipe { host, name } => {
637 let pipe_path = format!(r"\\{host}\pipe\{name}");
638 let mut file = std::fs::OpenOptions::new()
639 .read(true)
640 .write(true)
641 .open(&pipe_path)
642 .map_err(|e| {
643 warn!(
644 target: "hyperdb_api",
645 addr = %endpoint_str,
646 error = %e,
647 "query-cancel-connect-failed"
648 );
649 Error::connection(format!(
650 "failed to connect for cancel request to {endpoint_str}: {e}"
651 ))
652 })?;
653
654 let mut buf = BytesMut::with_capacity(16);
655 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
656
657 file.write_all(&buf).map_err(Error::io)?;
658 file.flush().map_err(Error::io)?;
659 }
660 }
661
662 debug!(target: "hyperdb_api", "query-cancel-sent");
663 Ok(())
664 }
665
666 /// Executes a command (INSERT/UPDATE/DELETE/DDL) and returns affected row count (async).
667 ///
668 /// # Errors
669 ///
670 /// Same failure modes as [`Self::query`] — server-side SQL errors,
671 /// transport failures, and unhealthy-connection state all surface
672 /// as [`Error`].
673 pub async fn exec(&self, sql: &str) -> Result<u64> {
674 let mut conn = self.connection.lock().await;
675 let messages = conn.simple_query(sql).await?;
676 Ok(Self::extract_row_count(&messages))
677 }
678
679 /// Returns a server parameter value by name.
680 pub async fn parameter_status(&self, name: &str) -> Option<String> {
681 let conn = self.connection.lock().await;
682 conn.parameter_status(name)
683 .map(std::string::ToString::to_string)
684 }
685
686 /// Sets the notice receiver callback.
687 pub fn set_notice_receiver(&mut self, receiver: Option<Box<dyn Fn(Notice) + Send + Sync>>) {
688 self.notice_receiver = receiver.map(Arc::from);
689 }
690
691 /// Closes the connection gracefully (async).
692 ///
693 /// # Errors
694 ///
695 /// Returns [`Error`] (I/O) if writing the `Terminate` frame or
696 /// flushing the async transport fails.
697 pub async fn close(self) -> Result<()> {
698 let mut conn = self.connection.lock().await;
699 conn.terminate().await
700 }
701
702 /// Executes a batch of statements separated by semicolons (async).
703 ///
704 /// # Errors
705 ///
706 /// Same failure modes as [`Self::query`].
707 pub async fn batch_execute(&self, sql: &str) -> Result<()> {
708 let mut conn = self.connection.lock().await;
709 let _messages = conn.simple_query(sql).await?;
710 Ok(())
711 }
712
713 /// Starts a COPY IN operation for bulk data insertion (async).
714 ///
715 /// # Errors
716 ///
717 /// Delegates to [`Self::copy_in_with_format`]; see that method
718 /// for concrete failure modes.
719 pub async fn copy_in(
720 &self,
721 table_name: &str,
722 columns: &[&str],
723 ) -> Result<AsyncCopyInWriter<'_>> {
724 self.copy_in_with_format(table_name, columns, "HYPERBINARY")
725 .await
726 }
727
728 /// Starts a COPY IN operation and returns an owned-handle writer
729 /// whose lifetime is independent of this client. The writer holds an
730 /// `Arc`-cloned reference to the underlying connection mutex, so it
731 /// can be stored in structs that need a `'static`-lifetime writer —
732 /// e.g. N-API classes that can't carry borrowed references across
733 /// JS callbacks.
734 ///
735 /// # Errors
736 ///
737 /// Same failure modes as [`Self::copy_in_with_format`].
738 pub async fn copy_in_arc_with_format(
739 &self,
740 table_name: &str,
741 columns: &[&str],
742 format: &str,
743 ) -> Result<AsyncCopyInWriterOwned> {
744 let mut conn = self.connection.lock().await;
745 conn.start_copy_in_with_format(table_name, columns, format)
746 .await?;
747 drop(conn);
748 Ok(AsyncCopyInWriterOwned::new(Arc::clone(&self.connection)))
749 }
750
751 /// Starts a COPY IN operation with a specified data format (async).
752 ///
753 /// # Errors
754 ///
755 /// - Returns [`Error`] (connection) if the connection is unhealthy.
756 /// - Returns [`Error`] (server) if the server rejects the generated
757 /// `COPY ... FROM STDIN` statement.
758 /// - Returns [`Error`] (I/O) on transport read/write failure.
759 pub async fn copy_in_with_format(
760 &self,
761 table_name: &str,
762 columns: &[&str],
763 format: &str,
764 ) -> Result<AsyncCopyInWriter<'_>> {
765 let mut conn = self.connection.lock().await;
766 conn.start_copy_in_with_format(table_name, columns, format)
767 .await?;
768 drop(conn);
769 Ok(AsyncCopyInWriter::new(&self.connection))
770 }
771
772 /// Executes a COPY ... TO STDOUT query and returns all output data (async).
773 ///
774 /// # Errors
775 ///
776 /// - Returns [`Error`] (connection) if the connection is unhealthy.
777 /// - Returns [`Error`] (server) when the server rejects the statement.
778 /// - Returns [`Error`] (I/O) / [`Error`] (closed) on transport
779 /// read/write failure.
780 pub async fn copy_out(&self, query: &str) -> Result<Vec<u8>> {
781 let mut conn = self.connection.lock().await;
782 conn.copy_out(query).await
783 }
784
785 /// Returns true if the connection is alive.
786 #[must_use]
787 pub fn is_alive(&self) -> bool {
788 // Try to acquire the lock - if we can, connection is alive
789 self.connection.try_lock().is_ok()
790 }
791
792 /// Prepares a statement for execution (async).
793 ///
794 /// # Errors
795 ///
796 /// Delegates to [`Self::prepare_typed`]; see that method for the
797 /// failure modes.
798 pub async fn prepare(&self, query: &str) -> Result<AsyncPreparedStatement> {
799 self.prepare_typed(query, &[]).await
800 }
801
802 /// Prepares a statement with explicit parameter types (async).
803 ///
804 /// # Errors
805 ///
806 /// - Returns [`Error`] (connection) if the connection is unhealthy.
807 /// - Returns [`Error`] (server) if the server rejects the `Parse`
808 /// request (SQL syntax, unknown parameter OIDs, etc.).
809 /// - Returns [`Error`] (I/O) on transport read/write failure.
810 pub async fn prepare_typed(
811 &self,
812 query: &str,
813 param_types: &[crate::types::Oid],
814 ) -> Result<AsyncPreparedStatement> {
815 use std::sync::atomic::{AtomicU64, Ordering};
816 static COUNTER: AtomicU64 = AtomicU64::new(0);
817
818 let name = format!(
819 "__hyper_async_stmt_{}",
820 COUNTER.fetch_add(1, Ordering::Relaxed)
821 );
822 let mut conn = self.connection.lock().await;
823 let (params, columns) = conn.prepare(&name, query, param_types).await?;
824
825 Ok(AsyncPreparedStatement {
826 name,
827 query: query.to_string(),
828 param_types: params,
829 columns,
830 connection: Arc::downgrade(&self.connection),
831 closed: false,
832 })
833 }
834
835 /// Closes a prepared statement on the server (async).
836 ///
837 /// Prefer the RAII [`AsyncPreparedStatement::close`] method on the
838 /// statement itself — it consumes the statement and prevents the
839 /// auto-close Drop path from double-closing.
840 ///
841 /// # Errors
842 ///
843 /// Propagates any error from
844 /// [`AsyncRawConnection::close_statement`] — unhealthy connection,
845 /// server-side error during `Close`/`Sync`, or transport failure.
846 pub async fn close_statement(&self, statement: &AsyncPreparedStatement) -> Result<()> {
847 let mut conn = self.connection.lock().await;
848 conn.close_statement(&statement.name).await
849 }
850
851 /// Executes a prepared statement with parameters (async).
852 ///
853 /// # Errors
854 ///
855 /// Propagates any error from
856 /// [`AsyncRawConnection::execute_prepared`] — unhealthy connection,
857 /// parameter/type mismatch, server-side execution failure, or
858 /// transport failure. Row construction may also raise an [`Error`]
859 /// when a `DataRow` cannot be decoded.
860 pub async fn execute_prepared<P: AsRef<[Option<Vec<u8>>]>>(
861 &self,
862 statement: &AsyncPreparedStatement,
863 params: P,
864 ) -> Result<Vec<Row>> {
865 let params_ref: Vec<Option<&[u8]>> = params
866 .as_ref()
867 .iter()
868 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
869 .collect();
870
871 let mut conn = self.connection.lock().await;
872 conn.execute_prepared(&statement.name, ¶ms_ref, statement.columns.len())
873 .await
874 }
875
876 /// Executes a prepared statement that doesn't return rows (async).
877 ///
878 /// # Errors
879 ///
880 /// Same failure modes as [`Self::execute_prepared`] (excluding
881 /// row-construction errors — this path never builds rows).
882 pub async fn execute_prepared_no_result<P: AsRef<[Option<Vec<u8>>]>>(
883 &self,
884 statement: &AsyncPreparedStatement,
885 params: P,
886 ) -> Result<u64> {
887 let params_ref: Vec<Option<&[u8]>> = params
888 .as_ref()
889 .iter()
890 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
891 .collect();
892
893 let mut conn = self.connection.lock().await;
894 conn.execute_prepared_no_result(&statement.name, ¶ms_ref)
895 .await
896 }
897
898 /// Executes a prepared statement with streaming results (async).
899 ///
900 /// Returns an [`AsyncPreparedQueryStream`](super::async_prepared_stream::AsyncPreparedQueryStream)
901 /// that yields rows in chunks, keeping memory bounded regardless of
902 /// result size. Async mirror of
903 /// [`Client::execute_streaming`](crate::client::Client::execute_streaming).
904 ///
905 /// # Errors
906 ///
907 /// - Returns [`Error`] (connection) if the connection is unhealthy.
908 /// - Returns [`Error`] (I/O) if writing the initial Bind/Execute/Sync
909 /// sequence fails on the transport.
910 pub async fn execute_prepared_streaming<'a, P: AsRef<[Option<Vec<u8>>]>>(
911 &'a self,
912 statement: &AsyncPreparedStatement,
913 params: P,
914 chunk_size: usize,
915 ) -> Result<super::async_prepared_stream::AsyncPreparedQueryStream<'a>> {
916 let params_ref: Vec<Option<&[u8]>> = params
917 .as_ref()
918 .iter()
919 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
920 .collect();
921
922 let mut conn = self.connection.lock().await;
923 conn.start_execute_prepared(&statement.name, ¶ms_ref, statement.columns.len())
924 .await?;
925
926 let columns = std::sync::Arc::new(statement.columns.clone());
927 Ok(super::async_prepared_stream::AsyncPreparedQueryStream::new(
928 conn, self, chunk_size, columns,
929 ))
930 }
931}
932
933impl Cancellable for AsyncClient {
934 /// Fire-and-forget cancel via PG wire protocol `CancelRequest` on a
935 /// fresh connection. Uses synchronous I/O so it is callable from
936 /// `Drop` impls. Errors are logged and swallowed because cancellation
937 /// is best-effort and callers cannot meaningfully recover.
938 fn cancel(&self) {
939 if let Err(e) = AsyncClient::cancel_sync(self) {
940 warn!(
941 target: "hyperdb_api_core::client",
942 error = %e,
943 process_id = self.process_id,
944 "cancel request failed (best-effort, swallowed)",
945 );
946 }
947 }
948}
949
950impl AsyncClient {
951 fn process_query_messages(
952 messages: Vec<Message>,
953 notice_receiver: Option<&Arc<NoticeReceiver>>,
954 ) -> Result<Vec<Row>> {
955 use super::statement::{Column, ColumnFormat};
956
957 let mut rows = Vec::new();
958 let mut columns: Option<Arc<Vec<Column>>> = None;
959
960 for msg in messages {
961 match msg {
962 Message::RowDescription(desc) => {
963 let mut cols = Vec::new();
964 for field in desc.fields().filter_map(std::result::Result::ok) {
965 cols.push(Column::new(
966 field.name().to_string(),
967 field.type_oid(),
968 field.type_modifier(),
969 ColumnFormat::from_code(field.format()),
970 ));
971 }
972 columns = Some(Arc::new(cols));
973 }
974 Message::DataRow(data) => {
975 if let Some(ref cols) = columns {
976 rows.push(Row::new(Arc::clone(cols), data)?);
977 }
978 }
979 Message::NoticeResponse(body) => {
980 if let Some(receiver) = notice_receiver {
981 let notice = Notice::from_response_body(&body);
982 receiver(notice);
983 }
984 }
985 _ => {}
986 }
987 }
988 Ok(rows)
989 }
990
991 fn process_binary_messages(
992 messages: Vec<Message>,
993 notice_receiver: Option<&Arc<NoticeReceiver>>,
994 ) -> Vec<StreamRow> {
995 let mut rows = Vec::new();
996
997 for msg in messages {
998 match msg {
999 Message::DataRow(data) => {
1000 rows.push(StreamRow::new(data));
1001 }
1002 Message::NoticeResponse(body) => {
1003 if let Some(receiver) = notice_receiver {
1004 let notice = Notice::from_response_body(&body);
1005 receiver(notice);
1006 }
1007 }
1008 _ => {}
1009 }
1010 }
1011 rows
1012 }
1013
1014 fn extract_row_count(messages: &[Message]) -> u64 {
1015 for msg in messages {
1016 if let Message::CommandComplete(body) = msg {
1017 if let Ok(tag) = body.tag() {
1018 // Parse formats like "INSERT 0 5", "UPDATE 10", "DELETE 3"
1019 let parts: Vec<&str> = tag.split_whitespace().collect();
1020 if let Some(last) = parts.last() {
1021 if let Ok(count) = last.parse() {
1022 return count;
1023 }
1024 }
1025 }
1026 }
1027 }
1028 0
1029 }
1030}
1031
1032/// An async prepared statement.
1033///
1034/// Represents a server-side prepared statement that can be executed
1035/// multiple times with different parameters. **Auto-closes on `Drop`**
1036/// via a best-effort `tokio::spawn` task — if no tokio runtime is
1037/// available at drop time we log a warning and flag the connection
1038/// desynchronized rather than silently leaking the server-side
1039/// statement slot.
1040///
1041/// For callers who need confirmed close with error propagation, use
1042/// [`AsyncPreparedStatement::close`] (explicit async close).
1043#[derive(Debug)]
1044pub struct AsyncPreparedStatement {
1045 /// Statement name on the server.
1046 pub(crate) name: String,
1047 /// Original SQL query string.
1048 query: String,
1049 /// Parameter type OIDs.
1050 param_types: Vec<crate::types::Oid>,
1051 /// Result column descriptions.
1052 pub(crate) columns: Vec<super::statement::Column>,
1053 /// Weak handle to the owning connection for the Drop path. `Weak`
1054 /// so that a lingering statement never keeps the connection alive
1055 /// past the `AsyncClient` it was prepared against.
1056 connection: std::sync::Weak<Mutex<AsyncRawConnection<AsyncStream>>>,
1057 /// Flipped by `close(self)` to suppress the Drop-path auto-close.
1058 closed: bool,
1059}
1060
1061impl AsyncPreparedStatement {
1062 /// Returns the statement name.
1063 #[must_use]
1064 pub fn name(&self) -> &str {
1065 &self.name
1066 }
1067
1068 /// Returns the original query.
1069 #[must_use]
1070 pub fn query(&self) -> &str {
1071 &self.query
1072 }
1073
1074 /// Returns the parameter types.
1075 #[must_use]
1076 pub fn param_types(&self) -> &[crate::types::Oid] {
1077 &self.param_types
1078 }
1079
1080 /// Returns the number of parameters.
1081 #[must_use]
1082 pub fn param_count(&self) -> usize {
1083 self.param_types.len()
1084 }
1085
1086 /// Returns the result column descriptions.
1087 #[must_use]
1088 pub fn columns(&self) -> &[super::statement::Column] {
1089 &self.columns
1090 }
1091
1092 /// Returns the number of result columns.
1093 #[must_use]
1094 pub fn column_count(&self) -> usize {
1095 self.columns.len()
1096 }
1097
1098 /// Explicitly closes the prepared statement on the server (async).
1099 ///
1100 /// Consumes the statement — no further `execute_prepared` /
1101 /// `execute_prepared_streaming` calls are possible — and suppresses
1102 /// the Drop-path auto-close. Returns the `close_statement` result so
1103 /// callers can observe any transport errors.
1104 ///
1105 /// If you don't need error propagation, simply dropping the
1106 /// statement has the same effect (best-effort auto-close via
1107 /// `tokio::spawn`).
1108 ///
1109 /// # Errors
1110 ///
1111 /// Propagates any error from
1112 /// [`AsyncClient::close_statement`] — unhealthy connection,
1113 /// server-side error during `Close`/`Sync`, or transport failure.
1114 pub async fn close(mut self, client: &AsyncClient) -> Result<()> {
1115 self.closed = true;
1116 client.close_statement(&self).await
1117 }
1118}
1119
1120impl Drop for AsyncPreparedStatement {
1121 fn drop(&mut self) {
1122 // Explicit close already ran — nothing to do.
1123 if self.closed {
1124 return;
1125 }
1126
1127 // Best-effort close. Try to grab a tokio handle; if we're being
1128 // dropped outside a runtime (e.g. the runtime has already shut
1129 // down or the caller never had one), fall back to a warning and
1130 // flag the connection desynchronized so the next operation fails
1131 // loudly rather than racing with a lingering statement.
1132 let Some(conn) = self.connection.upgrade() else {
1133 // Connection has already been dropped — nothing to close.
1134 return;
1135 };
1136
1137 let name = std::mem::take(&mut self.name);
1138
1139 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1140 handle.spawn(async move {
1141 let mut c = conn.lock().await;
1142 if let Err(e) = c.close_statement(&name).await {
1143 warn!(
1144 target: "hyperdb_api_core::client",
1145 statement = %name,
1146 error = %e,
1147 "AsyncPreparedStatement drop-close failed (best-effort, swallowed)"
1148 );
1149 }
1150 });
1151 } else {
1152 // No runtime — can't do async I/O from here. Mark the
1153 // connection desynchronized so the next caller gets a
1154 // clear error instead of a latent leaked statement.
1155 if let Ok(mut c) = conn.try_lock() {
1156 c.mark_desynchronized();
1157 }
1158 warn!(
1159 target: "hyperdb_api_core::client",
1160 statement = %name,
1161 "AsyncPreparedStatement dropped outside of a tokio runtime; \
1162 server-side statement slot leaked and connection marked \
1163 desynchronized — call statement.close(&client) explicitly \
1164 for deterministic cleanup"
1165 );
1166 }
1167 }
1168}
1169
1170/// Async COPY IN writer for bulk data insertion.
1171///
1172/// # Drop Safety
1173///
1174/// If this writer is dropped without calling [`finish()`](Self::finish) or
1175/// [`cancel()`](Self::cancel), it will attempt a best-effort synchronous cancel
1176/// by queuing a `CopyFail` message in the connection's write buffer. The next
1177/// async operation on the connection will flush and drain the cancel response,
1178/// restoring the connection to a usable state.
1179#[derive(Debug)]
1180pub struct AsyncCopyInWriter<'a> {
1181 connection: &'a Mutex<AsyncRawConnection<AsyncStream>>,
1182 /// Set to `true` after `finish()` or `cancel()` consumes the writer.
1183 /// Checked in `Drop` to avoid queuing a spurious `CopyFail`.
1184 finished: bool,
1185}
1186
1187/// Owned-handle variant of [`AsyncCopyInWriter`] that holds an
1188/// `Arc<Mutex<_>>` to the underlying connection instead of a borrow.
1189/// Used by callers that need a `'static`-lifetime writer — e.g. N-API
1190/// classes that can't carry borrowed references across JS callbacks.
1191///
1192/// Semantics are identical to [`AsyncCopyInWriter`]; the only
1193/// difference is lifetime.
1194#[derive(Debug)]
1195pub struct AsyncCopyInWriterOwned {
1196 connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>,
1197 finished: bool,
1198}
1199
1200impl AsyncCopyInWriterOwned {
1201 /// Creates a new owned-handle COPY IN writer.
1202 pub(crate) fn new(connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>) -> Self {
1203 AsyncCopyInWriterOwned {
1204 connection,
1205 finished: false,
1206 }
1207 }
1208
1209 /// Sends data to the server.
1210 ///
1211 /// # Errors
1212 ///
1213 /// Currently infallible — frame construction is pure. The `Result`
1214 /// return type is preserved for forward compatibility.
1215 pub async fn send(&mut self, data: &[u8]) -> Result<()> {
1216 let mut conn = self.connection.lock().await;
1217 conn.send_copy_data(data)?;
1218 Ok(())
1219 }
1220
1221 /// Flushes any buffered data to the server.
1222 ///
1223 /// # Errors
1224 ///
1225 /// Returns [`Error`] (I/O) if flushing the async transport fails.
1226 pub async fn flush(&mut self) -> Result<()> {
1227 let mut conn = self.connection.lock().await;
1228 conn.flush().await
1229 }
1230
1231 /// Sends COPY data directly to the stream without internal buffering.
1232 ///
1233 /// # Errors
1234 ///
1235 /// - Returns [`Error`] (protocol) if `data.len() + 4` exceeds
1236 /// `u32::MAX`.
1237 /// - Returns [`Error`] (I/O) on transport write failure.
1238 pub async fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1239 let mut conn = self.connection.lock().await;
1240 conn.send_copy_data_direct(data).await
1241 }
1242
1243 /// Flushes the TCP stream.
1244 ///
1245 /// # Errors
1246 ///
1247 /// Returns [`Error`] (I/O) if flushing the async transport fails.
1248 pub async fn flush_stream(&mut self) -> Result<()> {
1249 let mut conn = self.connection.lock().await;
1250 conn.flush_stream().await
1251 }
1252
1253 /// Finishes the COPY operation and returns the number of rows inserted.
1254 ///
1255 /// # Errors
1256 ///
1257 /// Returns [`Error`] (server) if the server reports an `ErrorResponse`
1258 /// (e.g. constraint violation), or [`Error`] (I/O) / [`Error`] (closed)
1259 /// on transport failure.
1260 pub async fn finish(mut self) -> Result<u64> {
1261 self.finished = true;
1262 let mut conn = self.connection.lock().await;
1263 conn.finish_copy().await
1264 }
1265
1266 /// Cancels the COPY operation.
1267 ///
1268 /// # Errors
1269 ///
1270 /// Returns [`Error`] (I/O) on transport write failure, or
1271 /// [`Error`] (closed) if the server drops the connection before
1272 /// acknowledging the cancel.
1273 pub async fn cancel(mut self, reason: &str) -> Result<()> {
1274 self.finished = true;
1275 let mut conn = self.connection.lock().await;
1276 conn.cancel_copy(reason).await
1277 }
1278}
1279
1280impl Drop for AsyncCopyInWriterOwned {
1281 fn drop(&mut self) {
1282 if self.finished {
1283 return;
1284 }
1285 // Mirror AsyncCopyInWriter's Drop — queue a CopyFail via
1286 // try_lock; the next async operation on the connection will
1287 // flush and drain.
1288 if let Ok(mut conn) = self.connection.try_lock() {
1289 conn.queue_copy_fail("AsyncCopyInWriterOwned dropped without finish/cancel");
1290 }
1291 }
1292}
1293
1294impl<'a> AsyncCopyInWriter<'a> {
1295 /// Creates a new COPY IN writer.
1296 pub(crate) fn new(connection: &'a Mutex<AsyncRawConnection<AsyncStream>>) -> Self {
1297 AsyncCopyInWriter {
1298 connection,
1299 finished: false,
1300 }
1301 }
1302
1303 /// Sends data to the server.
1304 ///
1305 /// # Errors
1306 ///
1307 /// Currently infallible — frame construction is pure. The `Result`
1308 /// return type is preserved for forward compatibility.
1309 pub async fn send(&mut self, data: &[u8]) -> Result<()> {
1310 let mut conn = self.connection.lock().await;
1311 conn.send_copy_data(data)?;
1312 Ok(())
1313 }
1314
1315 /// Flushes any buffered data to the server.
1316 ///
1317 /// # Errors
1318 ///
1319 /// Returns [`Error`] (I/O) if flushing the async transport fails.
1320 pub async fn flush(&mut self) -> Result<()> {
1321 let mut conn = self.connection.lock().await;
1322 conn.flush().await
1323 }
1324
1325 /// Sends COPY data directly to the stream without internal buffering.
1326 ///
1327 /// This writes data directly to the TCP stream, letting the kernel handle
1328 /// buffering. More efficient for streaming large amounts of data.
1329 /// Call `flush_stream()` periodically to ensure data is sent.
1330 ///
1331 /// # Errors
1332 ///
1333 /// - Returns [`Error`] (protocol) if `data.len() + 4` exceeds
1334 /// `u32::MAX`.
1335 /// - Returns [`Error`] (I/O) on transport write failure.
1336 pub async fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1337 let mut conn = self.connection.lock().await;
1338 conn.send_copy_data_direct(data).await
1339 }
1340
1341 /// Flushes the TCP stream.
1342 ///
1343 /// Use with `send_direct()` to periodically ensure data reaches the server.
1344 ///
1345 /// # Errors
1346 ///
1347 /// Returns [`Error`] (I/O) if flushing the async transport fails.
1348 pub async fn flush_stream(&mut self) -> Result<()> {
1349 let mut conn = self.connection.lock().await;
1350 conn.flush_stream().await
1351 }
1352
1353 /// Finishes the COPY operation and returns the number of rows inserted.
1354 ///
1355 /// # Errors
1356 ///
1357 /// Returns [`Error`] (server) if the server reports an `ErrorResponse`,
1358 /// or [`Error`] (I/O) / [`Error`] (closed) on transport failure.
1359 pub async fn finish(mut self) -> Result<u64> {
1360 self.finished = true;
1361 let mut conn = self.connection.lock().await;
1362 conn.finish_copy().await
1363 }
1364
1365 /// Cancels the COPY operation.
1366 ///
1367 /// # Errors
1368 ///
1369 /// Returns [`Error`] (I/O) on transport write failure, or
1370 /// [`Error`] (closed) if the server drops the connection before
1371 /// acknowledging the cancel.
1372 pub async fn cancel(mut self, reason: &str) -> Result<()> {
1373 self.finished = true;
1374 let mut conn = self.connection.lock().await;
1375 conn.cancel_copy(reason).await
1376 }
1377}
1378
1379impl Drop for AsyncCopyInWriter<'_> {
1380 fn drop(&mut self) {
1381 if self.finished {
1382 return;
1383 }
1384 // Best-effort cancel: try to acquire the mutex synchronously and
1385 // queue a CopyFail message. We cannot do async I/O from Drop, so
1386 // the message is only written to the buffer — not flushed. The next
1387 // async operation will call drain_pending_copy_cancel() to flush it
1388 // and restore the connection to ReadyForQuery state.
1389 if let Ok(mut conn) = self.connection.try_lock() {
1390 conn.queue_copy_fail("COPY writer dropped without finish or cancel");
1391 warn!(
1392 target: "hyperdb_api_core::client",
1393 "AsyncCopyInWriter dropped without finish() or cancel(). \
1394 Queued best-effort CopyFail — connection will self-heal on next operation."
1395 );
1396 } else {
1397 warn!(
1398 target: "hyperdb_api_core::client",
1399 "AsyncCopyInWriter dropped without finish() or cancel(), \
1400 and the connection mutex was locked. The connection may be \
1401 left in an unusable COPY-IN state."
1402 );
1403 }
1404 }
1405}