hyperdb_api_core/client/async_client.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level asynchronous client for Hyper database.
5//!
6//! This module provides [`AsyncClient`], the async version of [`Client`](crate::client::Client).
7//! It uses tokio for async I/O operations.
8
9use std::sync::Arc;
10
11use tokio::net::TcpStream;
12use tokio::sync::Mutex;
13use tracing::{debug, info, warn};
14
15#[cfg(unix)]
16use tokio::net::UnixStream;
17
18use super::async_connection::AsyncRawConnection;
19use super::async_stream::AsyncStream;
20use super::async_stream_query::AsyncQueryStream;
21use super::cancel::Cancellable;
22use super::config::Config;
23use super::endpoint::ConnectionEndpoint;
24use super::error::{Error, Result};
25use super::notice::{Notice, NoticeReceiver};
26use super::row::{Row, StreamRow};
27
28use crate::protocol::message::Message;
29
/// An asynchronous client for Hyper database.
///
/// This is the async equivalent of [`Client`](crate::client::Client), designed for use
/// in tokio-based async applications. Connection setup and query I/O are non-blocking;
/// the exceptions are the cancel paths that deliberately use synchronous I/O (the
/// `Cancellable` implementation and the Windows named-pipe branch of
/// [`AsyncClient::cancel`]).
///
/// # Example
///
/// ```no_run
/// use hyperdb_api_core::client::{AsyncClient, Config};
///
/// #[tokio::main]
/// async fn main() -> hyperdb_api_core::client::Result<()> {
///     let config = Config::new()
///         .with_host("localhost")
///         .with_port(7483)
///         .with_database("test.hyper");
///
///     let client = AsyncClient::connect(&config).await?;
///     let rows = client.query("SELECT 1").await?;
///     client.close().await?;
///     Ok(())
/// }
/// ```
pub struct AsyncClient {
    /// The underlying async connection, protected by a mutex for concurrent access.
    /// It lives behind an `Arc` so that owned COPY writers (strong handle) and
    /// prepared statements (weak handle) can reference it independently of `&self`.
    connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>,
    /// Backend process ID (for cancel requests), captured once after startup.
    process_id: i32,
    /// Secret key for authenticating cancel requests, captured once after startup.
    secret_key: i32,
    /// Connection endpoint for cancel requests and reconnection.
    endpoint: ConnectionEndpoint,
    /// Optional notice receiver callback for server notices/warnings.
    /// `None` until [`AsyncClient::set_notice_receiver`] installs one.
    notice_receiver: Option<Arc<NoticeReceiver>>,
}
65
66impl std::fmt::Debug for AsyncClient {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 f.debug_struct("AsyncClient")
69 .field("process_id", &self.process_id)
70 .field("secret_key", &self.secret_key)
71 .field("endpoint", &self.endpoint)
72 .field(
73 "notice_receiver",
74 &self.notice_receiver.as_ref().map(|_| "<callback>"),
75 )
76 .finish_non_exhaustive()
77 }
78}
79
80impl AsyncClient {
81 /// Connects to a Hyper server using the given configuration (async).
82 ///
83 /// # Example
84 ///
85 /// ```no_run
86 /// # use hyperdb_api_core::client::{AsyncClient, Config};
87 /// # async fn example() -> hyperdb_api_core::client::Result<()> {
88 /// let config = Config::new()
89 /// .with_host("localhost")
90 /// .with_port(7483)
91 /// .with_database("test.hyper");
92 ///
93 /// let client = AsyncClient::connect(&config).await?;
94 /// # Ok(())
95 /// # }
96 /// ```
97 ///
98 /// # Errors
99 ///
100 /// - Returns [`Error`] (connection) if the TCP connection cannot be
101 /// established to `config.host():config.port()`.
102 /// - Propagates any [`Error`] from the startup handshake —
103 /// [`Error`] (auth) for missing/wrong credentials,
104 /// [`Error`] (server) for server-side startup errors, [`Error`] (protocol)
105 /// for out-of-sequence messages, or [`Error`] (I/O) for wire
106 /// failure.
107 pub async fn connect(config: &Config) -> Result<Self> {
108 info!(
109 target: "hyperdb_api",
110 host = %config.host(),
111 port = config.port(),
112 user = config.user().unwrap_or("(default)"),
113 database = config.database().unwrap_or("(none)"),
114 "connection-parameters"
115 );
116
117 let endpoint = ConnectionEndpoint::tcp(config.host(), config.port());
118 let addr = format!("{}:{}", config.host(), config.port());
119 let tcp_stream = TcpStream::connect(&addr).await.map_err(|e| {
120 warn!(target: "hyperdb_api", %addr, error = %e, "connection-failed");
121 Error::connection(format!("failed to connect to {addr}: {e}"))
122 })?;
123
124 // Set TCP options. See the sync mirror in
125 // [`super::client::Client::connect`] for the full rationale and
126 // empirical knee analysis. We bump `SO_RCVBUF` / `SO_SNDBUF` to
127 // 4 MiB (Windows default ~64 KiB throttles loopback throughput;
128 // Linux auto-tunes higher).
129 tcp_stream.set_nodelay(true).ok();
130 let sock = socket2::SockRef::from(&tcp_stream);
131 sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
132 sock.set_send_buffer_size(4 * 1024 * 1024).ok();
133
134 let stream = AsyncStream::tcp(tcp_stream);
135 let mut connection = AsyncRawConnection::new(stream);
136
137 // Perform startup with authentication
138 let params = config.startup_params();
139 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
140 connection.startup(¶ms_ref, config.password()).await?;
141
142 let process_id = connection.process_id();
143 let secret_key = connection.secret_key();
144
145 debug!(
146 target: "hyperdb_api",
147 process_id,
148 "connection-established"
149 );
150
151 Ok(AsyncClient {
152 connection: Arc::new(Mutex::new(connection)),
153 process_id,
154 secret_key,
155 endpoint,
156 notice_receiver: None,
157 })
158 }
159
160 /// Connects to a Hyper server via Unix Domain Socket (async, Unix only).
161 ///
162 /// # Example
163 ///
164 /// ```no_run
165 /// # use hyperdb_api_core::client::{AsyncClient, Config};
166 /// # use std::path::Path;
167 /// # async fn example() -> hyperdb_api_core::client::Result<()> {
168 /// let socket_path = Path::new("/tmp/hyper/.s.PGSQL.12345");
169 /// let config = Config::new().with_database("test.hyper");
170 /// let client = AsyncClient::connect_unix(socket_path, &config).await?;
171 /// # Ok(())
172 /// # }
173 /// ```
174 ///
175 /// # Errors
176 ///
177 /// - Returns [`Error`] (connection) if the Unix domain socket cannot
178 /// be connected.
179 /// - Propagates any error from the startup handshake (see
180 /// [`Self::connect`]).
181 #[cfg(unix)]
182 pub async fn connect_unix(
183 socket_path: impl AsRef<std::path::Path>,
184 config: &Config,
185 ) -> Result<Self> {
186 use std::path::Path;
187
188 let path = socket_path.as_ref();
189 info!(
190 target: "hyperdb_api",
191 socket_path = %path.display(),
192 user = config.user().unwrap_or("(default)"),
193 database = config.database().unwrap_or("(none)"),
194 "connection-parameters-unix"
195 );
196
197 let unix_stream = UnixStream::connect(path).await.map_err(|e| {
198 warn!(target: "hyperdb_api", socket_path = %path.display(), error = %e, "connection-failed");
199 Error::connection(format!("failed to connect to unix socket {}: {}", path.display(), e))
200 })?;
201
202 // Parse endpoint from socket path
203 let directory = path.parent().unwrap_or(Path::new("/"));
204 let name = path
205 .file_name()
206 .and_then(|n| n.to_str())
207 .unwrap_or("socket");
208 let endpoint = ConnectionEndpoint::domain_socket(directory, name);
209
210 let stream = AsyncStream::unix(unix_stream);
211 let mut connection = AsyncRawConnection::new(stream);
212
213 // Perform startup with authentication
214 let params = config.startup_params();
215 let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
216 connection.startup(¶ms_ref, config.password()).await?;
217
218 let process_id = connection.process_id();
219 let secret_key = connection.secret_key();
220
221 debug!(
222 target: "hyperdb_api",
223 process_id,
224 "connection-established-unix"
225 );
226
227 Ok(AsyncClient {
228 connection: Arc::new(Mutex::new(connection)),
229 process_id,
230 secret_key,
231 endpoint,
232 notice_receiver: None,
233 })
234 }
235
/// Connects to a Hyper server over a Windows named pipe and performs the
/// startup/authentication handshake (async, Windows only).
///
/// # Arguments
///
/// * `pipe_path` - The full pipe path (e.g., `\\.\pipe\hyper-12345`)
/// * `config` - Connection configuration
///
/// # Errors
///
/// Returns an error if the Named Pipe cannot be opened (e.g., pipe does not
/// exist, all instances are busy after the retry window, or permission is
/// denied) or if the authentication handshake fails.
#[cfg(windows)]
pub async fn connect_named_pipe(pipe_path: &str, config: &Config) -> Result<Self> {
    use std::time::{Duration, Instant};
    use tokio::net::windows::named_pipe::ClientOptions;

    // Retry on `ERROR_PIPE_BUSY` (231) — Windows named pipes have a finite
    // number of server-side instances and concurrent clients can hit the
    // cap. See the sync mirror in [`super::client::Client::connect_named_pipe`]
    // for the full rationale.
    const RETRY_INTERVAL: Duration = Duration::from_millis(20);
    const MAX_WAIT: Duration = Duration::from_secs(10);
    const ERROR_PIPE_BUSY: i32 = 231;

    info!(
        target: "hyperdb_api",
        pipe_path = %pipe_path,
        user = config.user().unwrap_or("(default)"),
        database = config.database().unwrap_or("(none)"),
        "connection-parameters-named-pipe"
    );

    let deadline = Instant::now() + MAX_WAIT;
    let pipe = loop {
        let e = match ClientOptions::new().open(pipe_path) {
            Ok(pipe) => break pipe,
            Err(e) => e,
        };

        // Only a busy pipe is worth retrying, and only until the deadline.
        let pipe_busy = e.raw_os_error() == Some(ERROR_PIPE_BUSY);
        if pipe_busy && Instant::now() < deadline {
            tokio::time::sleep(RETRY_INTERVAL).await;
            continue;
        }

        warn!(target: "hyperdb_api", pipe_path = %pipe_path, error = %e, "connection-failed");
        return Err(Error::connection(format!(
            "failed to connect to named pipe {pipe_path}: {e}"
        )));
    };

    // Derive the stored endpoint from the pipe path: first via the URL-style
    // parser, then by splitting `host\pipe\name` by hand if parsing fails.
    let unc_tail = pipe_path.trim_start_matches(r"\\");
    let endpoint_url = format!("tab.pipe://{}", unc_tail.replace('\\', "/"));
    let endpoint = match ConnectionEndpoint::parse(&endpoint_url) {
        Ok(parsed) => parsed,
        Err(_) => {
            let mut segments = unc_tail.splitn(3, '\\');
            match (segments.next(), segments.next(), segments.next()) {
                (Some(host), Some(_), Some(name)) => ConnectionEndpoint::named_pipe(host, name),
                _ => ConnectionEndpoint::named_pipe(".", pipe_path),
            }
        }
    };

    let mut connection = AsyncRawConnection::new(AsyncStream::named_pipe(pipe));

    // Startup handshake, including authentication.
    let startup_params = config.startup_params();
    let param_pairs: Vec<(&str, &str)> = startup_params
        .iter()
        .map(|(key, value)| (*key, *value))
        .collect();
    connection.startup(&param_pairs, config.password()).await?;

    let process_id = connection.process_id();
    let secret_key = connection.secret_key();

    debug!(
        target: "hyperdb_api",
        process_id,
        "connection-established-named-pipe"
    );

    Ok(Self {
        connection: Arc::new(Mutex::new(connection)),
        process_id,
        secret_key,
        endpoint,
        notice_receiver: None,
    })
}
329
330 /// Connects to a Hyper server using a `ConnectionEndpoint` (async).
331 ///
332 /// This is a lower-level method that accepts a pre-parsed endpoint.
333 ///
334 /// # Errors
335 ///
336 /// Delegates to [`Self::connect`], [`Self::connect_unix`], or
337 /// `Self::connect_named_pipe` depending on the endpoint variant,
338 /// and propagates their errors unchanged.
339 pub async fn connect_endpoint(endpoint: &ConnectionEndpoint, config: &Config) -> Result<Self> {
340 match endpoint {
341 ConnectionEndpoint::Tcp { host, port } => {
342 let mut cfg = config.clone();
343 cfg = cfg.with_host(host.clone()).with_port(*port);
344 Self::connect(&cfg).await
345 }
346 #[cfg(unix)]
347 ConnectionEndpoint::DomainSocket { directory, name } => {
348 let socket_path = directory.join(name);
349 Self::connect_unix(&socket_path, config).await
350 }
351 #[cfg(windows)]
352 ConnectionEndpoint::NamedPipe { host, name } => {
353 let pipe_path = format!(r"\\{host}\pipe\{name}");
354 Self::connect_named_pipe(&pipe_path, config).await
355 }
356 }
357 }
358
/// Returns the connection endpoint this client was connected through.
///
/// The same endpoint is reused by the cancel paths to open their
/// short-lived side connection.
#[must_use]
pub fn endpoint(&self) -> &ConnectionEndpoint {
    &self.endpoint
}
364
/// Returns the server process ID for this connection.
///
/// The value is the one reported by the connection right after the startup
/// handshake; it is sent as part of every cancel request.
#[must_use]
pub fn process_id(&self) -> i32 {
    self.process_id
}
370
/// Returns the secret key for cancel requests.
///
/// The key is sent together with [`Self::process_id`] in every cancel
/// request; treat it as a credential and avoid logging it.
#[must_use]
pub fn secret_key(&self) -> i32 {
    self.secret_key
}
376
377 /// Cancels the currently executing query on this connection (async).
378 ///
379 /// This method opens a separate connection to send a cancel request.
380 /// For TCP endpoints, it opens a new TCP connection.
381 /// For Unix domain sockets, it connects to the same socket path.
382 ///
383 /// # Errors
384 ///
385 /// - Returns [`Error`] (connection) if a fresh cancel-side socket
386 /// (TCP / UDS / named-pipe) cannot be opened to
387 /// [`Self::endpoint`].
388 /// - Returns [`Error`] (I/O) if writing the cancel request fails.
389 pub async fn cancel(&self) -> Result<()> {
390 use crate::protocol::message::frontend;
391 use bytes::BytesMut;
392 use tokio::io::AsyncWriteExt;
393
394 info!(
395 target: "hyperdb_api",
396 process_id = self.process_id,
397 "query-cancel-request"
398 );
399
400 let endpoint_str = self.endpoint.to_string();
401
402 match &self.endpoint {
403 ConnectionEndpoint::Tcp { host, port } => {
404 let addr = format!("{host}:{port}");
405 let mut stream = TcpStream::connect(&addr).await.map_err(|e| {
406 warn!(
407 target: "hyperdb_api",
408 addr = %endpoint_str,
409 error = %e,
410 "query-cancel-connect-failed"
411 );
412 Error::connection(format!(
413 "failed to connect for cancel request to {endpoint_str}: {e}"
414 ))
415 })?;
416 // Cancel is a 16-byte fire-and-forget — disable Nagle so the
417 // request hits the wire without waiting on a coalesce timer.
418 stream.set_nodelay(true).ok();
419
420 let mut buf = BytesMut::new();
421 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
422
423 stream.write_all(&buf).await.map_err(|e| {
424 warn!(
425 target: "hyperdb_api",
426 error = %e,
427 "query-cancel-send-failed"
428 );
429 Error::io(e)
430 })?;
431 }
432 #[cfg(unix)]
433 ConnectionEndpoint::DomainSocket { directory, name } => {
434 let socket_path = directory.join(name);
435 let mut stream = UnixStream::connect(&socket_path).await.map_err(|e| {
436 warn!(
437 target: "hyperdb_api",
438 addr = %endpoint_str,
439 error = %e,
440 "query-cancel-connect-failed"
441 );
442 Error::connection(format!(
443 "failed to connect for cancel request to {endpoint_str}: {e}"
444 ))
445 })?;
446
447 let mut buf = BytesMut::new();
448 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
449
450 stream.write_all(&buf).await.map_err(|e| {
451 warn!(
452 target: "hyperdb_api",
453 error = %e,
454 "query-cancel-send-failed"
455 );
456 Error::io(e)
457 })?;
458 }
459 #[cfg(windows)]
460 ConnectionEndpoint::NamedPipe { host, name } => {
461 let pipe_path = format!(r"\\{host}\pipe\{name}");
462 // Use sync file I/O for cancel (short-lived connection)
463 let mut file = std::fs::OpenOptions::new()
464 .read(true)
465 .write(true)
466 .open(&pipe_path)
467 .map_err(|e| {
468 warn!(
469 target: "hyperdb_api",
470 addr = %endpoint_str,
471 error = %e,
472 "query-cancel-connect-failed"
473 );
474 Error::connection(format!(
475 "failed to connect for cancel request to {endpoint_str}: {e}"
476 ))
477 })?;
478
479 let mut buf = BytesMut::new();
480 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
481
482 use std::io::Write;
483 file.write_all(&buf).map_err(|e| {
484 warn!(
485 target: "hyperdb_api",
486 error = %e,
487 "query-cancel-send-failed"
488 );
489 Error::io(e)
490 })?;
491
492 file.flush().map_err(Error::io)?;
493 }
494 }
495
496 debug!(target: "hyperdb_api", "query-cancel-sent");
497 Ok(())
498 }
499
500 /// Executes a query and returns all result rows (async).
501 ///
502 /// # Errors
503 ///
504 /// Propagates any [`Error`] from the underlying connection's
505 /// [`AsyncRawConnection::simple_query`] — [`Error`] (server) for
506 /// server-side SQL errors, [`Error`] (I/O) / [`Error`] (closed) for
507 /// transport failures, and [`Error`] (connection) if the connection
508 /// is unhealthy. Row construction may also raise an [`Error`] when
509 /// a `DataRow` cannot be decoded against its `RowDescription`.
510 pub async fn query(&self, sql: &str) -> Result<Vec<Row>> {
511 let mut conn = self.connection.lock().await;
512 let messages = conn.simple_query(sql).await?;
513 Self::process_query_messages(messages, self.notice_receiver.as_ref())
514 }
515
516 /// Executes a query with `HyperBinary` format for better performance (async).
517 ///
518 /// # Errors
519 ///
520 /// Same failure modes as [`Self::query`].
521 pub async fn query_fast(&self, sql: &str) -> Result<Vec<StreamRow>> {
522 let mut conn = self.connection.lock().await;
523 let messages = conn.query_binary(sql).await?;
524 Ok(Self::process_binary_messages(
525 messages,
526 self.notice_receiver.as_ref(),
527 ))
528 }
529
530 /// Executes a query with `HyperBinary` format and returns a streaming
531 /// result reader (async).
532 ///
533 /// This is the async mirror of
534 /// [`Client::query_streaming`](super::client::Client::query_streaming).
535 /// The returned [`AsyncQueryStream`] yields rows in chunks so callers
536 /// can process arbitrarily large result sets with constant memory. The
537 /// connection mutex is held for the duration of iteration; dropping the
538 /// stream before completion issues a best-effort cancel and marks the
539 /// connection desynchronized.
540 ///
541 /// # Errors
542 ///
543 /// - Returns [`Error`] (connection) if the connection is unhealthy.
544 /// - Returns [`Error`] (I/O) if writing the Parse/Bind/Execute/Sync
545 /// sequence fails on the transport.
546 pub async fn query_streaming(
547 &self,
548 sql: &str,
549 chunk_size: usize,
550 ) -> Result<AsyncQueryStream<'_>> {
551 let mut conn = self.connection.lock().await;
552 conn.start_query_binary(sql).await?;
553 Ok(AsyncQueryStream::new(conn, self, chunk_size))
554 }
555
556 /// Sends a best-effort `CancelRequest` using *synchronous* I/O so it
557 /// is usable from [`Drop`] impls (notably
558 /// [`AsyncQueryStream::drop`](super::async_stream_query::AsyncQueryStream)).
559 ///
560 /// Cancellation opens a short-lived TCP / UDS / Named-Pipe connection,
561 /// writes the cancel packet, and drops it — the server recognizes the
562 /// (`process_id`, `secret_key`) tuple and signals the long-running query
563 /// to abort. No response is expected.
564 fn cancel_sync(&self) -> Result<()> {
565 use crate::protocol::message::frontend;
566 use bytes::BytesMut;
567 use std::io::Write;
568
569 info!(
570 target: "hyperdb_api",
571 process_id = self.process_id,
572 "query-cancel-request"
573 );
574
575 let endpoint_str = self.endpoint.to_string();
576
577 match &self.endpoint {
578 ConnectionEndpoint::Tcp { host, port } => {
579 let addr = format!("{host}:{port}");
580 let mut stream = std::net::TcpStream::connect(&addr).map_err(|e| {
581 warn!(
582 target: "hyperdb_api",
583 addr = %endpoint_str,
584 error = %e,
585 "query-cancel-connect-failed"
586 );
587 Error::connection(format!(
588 "failed to connect for cancel request to {endpoint_str}: {e}"
589 ))
590 })?;
591 // Cancel is a 16-byte fire-and-forget — disable Nagle so the
592 // request hits the wire without waiting on a coalesce timer.
593 stream.set_nodelay(true).ok();
594
595 let mut buf = BytesMut::with_capacity(16);
596 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
597
598 stream.write_all(&buf).map_err(Error::io)?;
599 stream.flush().map_err(Error::io)?;
600 }
601 #[cfg(unix)]
602 ConnectionEndpoint::DomainSocket { directory, name } => {
603 let socket_path = directory.join(name);
604 let mut stream =
605 std::os::unix::net::UnixStream::connect(&socket_path).map_err(|e| {
606 warn!(
607 target: "hyperdb_api",
608 addr = %endpoint_str,
609 error = %e,
610 "query-cancel-connect-failed"
611 );
612 Error::connection(format!(
613 "failed to connect for cancel request to {endpoint_str}: {e}"
614 ))
615 })?;
616
617 let mut buf = BytesMut::with_capacity(16);
618 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
619
620 stream.write_all(&buf).map_err(Error::io)?;
621 stream.flush().map_err(Error::io)?;
622 }
623 #[cfg(windows)]
624 ConnectionEndpoint::NamedPipe { host, name } => {
625 let pipe_path = format!(r"\\{host}\pipe\{name}");
626 let mut file = std::fs::OpenOptions::new()
627 .read(true)
628 .write(true)
629 .open(&pipe_path)
630 .map_err(|e| {
631 warn!(
632 target: "hyperdb_api",
633 addr = %endpoint_str,
634 error = %e,
635 "query-cancel-connect-failed"
636 );
637 Error::connection(format!(
638 "failed to connect for cancel request to {endpoint_str}: {e}"
639 ))
640 })?;
641
642 let mut buf = BytesMut::with_capacity(16);
643 frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
644
645 file.write_all(&buf).map_err(Error::io)?;
646 file.flush().map_err(Error::io)?;
647 }
648 }
649
650 debug!(target: "hyperdb_api", "query-cancel-sent");
651 Ok(())
652 }
653
654 /// Executes a command (INSERT/UPDATE/DELETE/DDL) and returns affected row count (async).
655 ///
656 /// # Errors
657 ///
658 /// Same failure modes as [`Self::query`] — server-side SQL errors,
659 /// transport failures, and unhealthy-connection state all surface
660 /// as [`Error`].
661 pub async fn exec(&self, sql: &str) -> Result<u64> {
662 let mut conn = self.connection.lock().await;
663 let messages = conn.simple_query(sql).await?;
664 Ok(Self::extract_row_count(&messages))
665 }
666
667 /// Returns a server parameter value by name.
668 pub async fn parameter_status(&self, name: &str) -> Option<String> {
669 let conn = self.connection.lock().await;
670 conn.parameter_status(name)
671 .map(std::string::ToString::to_string)
672 }
673
/// Sets the notice receiver callback.
///
/// Passing `Some(callback)` installs (or replaces) the callback that
/// receives server notices found in query responses; passing `None`
/// removes it. The boxed callback is converted into an `Arc` for storage.
pub fn set_notice_receiver(&mut self, receiver: Option<Box<dyn Fn(Notice) + Send + Sync>>) {
    self.notice_receiver = receiver.map(Arc::from);
}
678
679 /// Closes the connection gracefully (async).
680 ///
681 /// # Errors
682 ///
683 /// Returns [`Error`] (I/O) if writing the `Terminate` frame or
684 /// flushing the async transport fails.
685 pub async fn close(self) -> Result<()> {
686 let mut conn = self.connection.lock().await;
687 conn.terminate().await
688 }
689
690 /// Executes a batch of statements separated by semicolons (async).
691 ///
692 /// # Errors
693 ///
694 /// Same failure modes as [`Self::query`].
695 pub async fn batch_execute(&self, sql: &str) -> Result<()> {
696 let mut conn = self.connection.lock().await;
697 let _messages = conn.simple_query(sql).await?;
698 Ok(())
699 }
700
701 /// Starts a COPY IN operation for bulk data insertion (async).
702 ///
703 /// # Errors
704 ///
705 /// Delegates to [`Self::copy_in_with_format`]; see that method
706 /// for concrete failure modes.
707 pub async fn copy_in(
708 &self,
709 table_name: &str,
710 columns: &[&str],
711 ) -> Result<AsyncCopyInWriter<'_>> {
712 self.copy_in_with_format(table_name, columns, "HYPERBINARY")
713 .await
714 }
715
716 /// Starts a COPY IN operation and returns an owned-handle writer
717 /// whose lifetime is independent of this client. The writer holds an
718 /// `Arc`-cloned reference to the underlying connection mutex, so it
719 /// can be stored in structs that need a `'static`-lifetime writer —
720 /// e.g. N-API classes that can't carry borrowed references across
721 /// JS callbacks.
722 ///
723 /// # Errors
724 ///
725 /// Same failure modes as [`Self::copy_in_with_format`].
726 pub async fn copy_in_arc_with_format(
727 &self,
728 table_name: &str,
729 columns: &[&str],
730 format: &str,
731 ) -> Result<AsyncCopyInWriterOwned> {
732 let mut conn = self.connection.lock().await;
733 conn.start_copy_in_with_format(table_name, columns, format)
734 .await?;
735 drop(conn);
736 Ok(AsyncCopyInWriterOwned::new(Arc::clone(&self.connection)))
737 }
738
739 /// Starts a COPY IN operation with a specified data format (async).
740 ///
741 /// # Errors
742 ///
743 /// - Returns [`Error`] (connection) if the connection is unhealthy.
744 /// - Returns [`Error`] (server) if the server rejects the generated
745 /// `COPY ... FROM STDIN` statement.
746 /// - Returns [`Error`] (I/O) on transport read/write failure.
747 pub async fn copy_in_with_format(
748 &self,
749 table_name: &str,
750 columns: &[&str],
751 format: &str,
752 ) -> Result<AsyncCopyInWriter<'_>> {
753 let mut conn = self.connection.lock().await;
754 conn.start_copy_in_with_format(table_name, columns, format)
755 .await?;
756 drop(conn);
757 Ok(AsyncCopyInWriter::new(&self.connection))
758 }
759
760 /// Executes a COPY ... TO STDOUT query and returns all output data (async).
761 ///
762 /// # Errors
763 ///
764 /// - Returns [`Error`] (connection) if the connection is unhealthy.
765 /// - Returns [`Error`] (server) when the server rejects the statement.
766 /// - Returns [`Error`] (I/O) / [`Error`] (closed) on transport
767 /// read/write failure.
768 pub async fn copy_out(&self, query: &str) -> Result<Vec<u8>> {
769 let mut conn = self.connection.lock().await;
770 conn.copy_out(query).await
771 }
772
773 /// Returns true if the connection is alive.
774 #[must_use]
775 pub fn is_alive(&self) -> bool {
776 // Try to acquire the lock - if we can, connection is alive
777 self.connection.try_lock().is_ok()
778 }
779
780 /// Prepares a statement for execution (async).
781 ///
782 /// # Errors
783 ///
784 /// Delegates to [`Self::prepare_typed`]; see that method for the
785 /// failure modes.
786 pub async fn prepare(&self, query: &str) -> Result<AsyncPreparedStatement> {
787 self.prepare_typed(query, &[]).await
788 }
789
790 /// Prepares a statement with explicit parameter types (async).
791 ///
792 /// # Errors
793 ///
794 /// - Returns [`Error`] (connection) if the connection is unhealthy.
795 /// - Returns [`Error`] (server) if the server rejects the `Parse`
796 /// request (SQL syntax, unknown parameter OIDs, etc.).
797 /// - Returns [`Error`] (I/O) on transport read/write failure.
798 pub async fn prepare_typed(
799 &self,
800 query: &str,
801 param_types: &[crate::types::Oid],
802 ) -> Result<AsyncPreparedStatement> {
803 use std::sync::atomic::{AtomicU64, Ordering};
804 static COUNTER: AtomicU64 = AtomicU64::new(0);
805
806 let name = format!(
807 "__hyper_async_stmt_{}",
808 COUNTER.fetch_add(1, Ordering::Relaxed)
809 );
810 let mut conn = self.connection.lock().await;
811 let (params, columns) = conn.prepare(&name, query, param_types).await?;
812
813 Ok(AsyncPreparedStatement {
814 name,
815 query: query.to_string(),
816 param_types: params,
817 columns,
818 connection: Arc::downgrade(&self.connection),
819 closed: false,
820 })
821 }
822
823 /// Closes a prepared statement on the server (async).
824 ///
825 /// Prefer the RAII [`AsyncPreparedStatement::close`] method on the
826 /// statement itself — it consumes the statement and prevents the
827 /// auto-close Drop path from double-closing.
828 ///
829 /// # Errors
830 ///
831 /// Propagates any error from
832 /// [`AsyncRawConnection::close_statement`] — unhealthy connection,
833 /// server-side error during `Close`/`Sync`, or transport failure.
834 pub async fn close_statement(&self, statement: &AsyncPreparedStatement) -> Result<()> {
835 let mut conn = self.connection.lock().await;
836 conn.close_statement(&statement.name).await
837 }
838
839 /// Executes a prepared statement with parameters (async).
840 ///
841 /// # Errors
842 ///
843 /// Propagates any error from
844 /// [`AsyncRawConnection::execute_prepared`] — unhealthy connection,
845 /// parameter/type mismatch, server-side execution failure, or
846 /// transport failure. Row construction may also raise an [`Error`]
847 /// when a `DataRow` cannot be decoded.
848 pub async fn execute_prepared<P: AsRef<[Option<Vec<u8>>]>>(
849 &self,
850 statement: &AsyncPreparedStatement,
851 params: P,
852 ) -> Result<Vec<Row>> {
853 let params_ref: Vec<Option<&[u8]>> = params
854 .as_ref()
855 .iter()
856 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
857 .collect();
858
859 let mut conn = self.connection.lock().await;
860 conn.execute_prepared(&statement.name, ¶ms_ref, statement.columns.len())
861 .await
862 }
863
864 /// Executes a prepared statement that doesn't return rows (async).
865 ///
866 /// # Errors
867 ///
868 /// Same failure modes as [`Self::execute_prepared`] (excluding
869 /// row-construction errors — this path never builds rows).
870 pub async fn execute_prepared_no_result<P: AsRef<[Option<Vec<u8>>]>>(
871 &self,
872 statement: &AsyncPreparedStatement,
873 params: P,
874 ) -> Result<u64> {
875 let params_ref: Vec<Option<&[u8]>> = params
876 .as_ref()
877 .iter()
878 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
879 .collect();
880
881 let mut conn = self.connection.lock().await;
882 conn.execute_prepared_no_result(&statement.name, ¶ms_ref)
883 .await
884 }
885
886 /// Executes a prepared statement with streaming results (async).
887 ///
888 /// Returns an [`AsyncPreparedQueryStream`](super::async_prepared_stream::AsyncPreparedQueryStream)
889 /// that yields rows in chunks, keeping memory bounded regardless of
890 /// result size. Async mirror of
891 /// [`Client::execute_streaming`](crate::client::Client::execute_streaming).
892 ///
893 /// # Errors
894 ///
895 /// - Returns [`Error`] (connection) if the connection is unhealthy.
896 /// - Returns [`Error`] (I/O) if writing the initial Bind/Execute/Sync
897 /// sequence fails on the transport.
898 pub async fn execute_prepared_streaming<'a, P: AsRef<[Option<Vec<u8>>]>>(
899 &'a self,
900 statement: &AsyncPreparedStatement,
901 params: P,
902 chunk_size: usize,
903 ) -> Result<super::async_prepared_stream::AsyncPreparedQueryStream<'a>> {
904 let params_ref: Vec<Option<&[u8]>> = params
905 .as_ref()
906 .iter()
907 .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
908 .collect();
909
910 let mut conn = self.connection.lock().await;
911 conn.start_execute_prepared(&statement.name, ¶ms_ref, statement.columns.len())
912 .await?;
913
914 let columns = std::sync::Arc::new(statement.columns.clone());
915 Ok(super::async_prepared_stream::AsyncPreparedQueryStream::new(
916 conn, self, chunk_size, columns,
917 ))
918 }
919}
920
921impl Cancellable for AsyncClient {
922 /// Fire-and-forget cancel via PG wire protocol `CancelRequest` on a
923 /// fresh connection. Uses synchronous I/O so it is callable from
924 /// `Drop` impls. Errors are logged and swallowed because cancellation
925 /// is best-effort and callers cannot meaningfully recover.
926 fn cancel(&self) {
927 if let Err(e) = AsyncClient::cancel_sync(self) {
928 warn!(
929 target: "hyperdb_api_core::client",
930 error = %e,
931 process_id = self.process_id,
932 "cancel request failed (best-effort, swallowed)",
933 );
934 }
935 }
936}
937
938impl AsyncClient {
939 fn process_query_messages(
940 messages: Vec<Message>,
941 notice_receiver: Option<&Arc<NoticeReceiver>>,
942 ) -> Result<Vec<Row>> {
943 use super::statement::{Column, ColumnFormat};
944
945 let mut rows = Vec::new();
946 let mut columns: Option<Arc<Vec<Column>>> = None;
947
948 for msg in messages {
949 match msg {
950 Message::RowDescription(desc) => {
951 let mut cols = Vec::new();
952 for field in desc.fields().filter_map(std::result::Result::ok) {
953 cols.push(Column::new(
954 field.name().to_string(),
955 field.type_oid(),
956 field.type_modifier(),
957 ColumnFormat::from_code(field.format()),
958 ));
959 }
960 columns = Some(Arc::new(cols));
961 }
962 Message::DataRow(data) => {
963 if let Some(ref cols) = columns {
964 rows.push(Row::new(Arc::clone(cols), data)?);
965 }
966 }
967 Message::NoticeResponse(body) => {
968 if let Some(receiver) = notice_receiver {
969 let notice = Notice::from_response_body(&body);
970 receiver(notice);
971 }
972 }
973 _ => {}
974 }
975 }
976 Ok(rows)
977 }
978
979 fn process_binary_messages(
980 messages: Vec<Message>,
981 notice_receiver: Option<&Arc<NoticeReceiver>>,
982 ) -> Vec<StreamRow> {
983 let mut rows = Vec::new();
984
985 for msg in messages {
986 match msg {
987 Message::DataRow(data) => {
988 rows.push(StreamRow::new(data));
989 }
990 Message::NoticeResponse(body) => {
991 if let Some(receiver) = notice_receiver {
992 let notice = Notice::from_response_body(&body);
993 receiver(notice);
994 }
995 }
996 _ => {}
997 }
998 }
999 rows
1000 }
1001
1002 fn extract_row_count(messages: &[Message]) -> u64 {
1003 for msg in messages {
1004 if let Message::CommandComplete(body) = msg {
1005 if let Ok(tag) = body.tag() {
1006 // Parse formats like "INSERT 0 5", "UPDATE 10", "DELETE 3"
1007 let parts: Vec<&str> = tag.split_whitespace().collect();
1008 if let Some(last) = parts.last() {
1009 if let Ok(count) = last.parse() {
1010 return count;
1011 }
1012 }
1013 }
1014 }
1015 }
1016 0
1017 }
1018}
1019
/// An async prepared statement.
///
/// Represents a server-side prepared statement that can be executed
/// multiple times with different parameters. **Auto-closes on `Drop`**
/// via a best-effort task spawned on the current tokio runtime.
///
/// Drop-time behavior, in order:
///
/// - If [`AsyncPreparedStatement::close`] already ran, nothing happens.
/// - If the owning connection has already been dropped, nothing happens.
/// - If a tokio runtime is available, a task is spawned that closes the
///   statement; a failure there is logged and swallowed.
/// - Otherwise a warning is logged, and the connection is flagged
///   desynchronized when its mutex can be acquired without waiting.
///
/// For callers who need confirmed close with error propagation, use
/// [`AsyncPreparedStatement::close`] (explicit async close).
#[derive(Debug)]
pub struct AsyncPreparedStatement {
    /// Statement name on the server. Moved out (left empty) by the `Drop`
    /// path when it performs the auto-close.
    pub(crate) name: String,
    /// Original SQL query string.
    query: String,
    /// Parameter type OIDs.
    param_types: Vec<crate::types::Oid>,
    /// Result column descriptions.
    pub(crate) columns: Vec<super::statement::Column>,
    /// Weak handle to the owning connection for the Drop path. `Weak`
    /// so that a lingering statement never keeps the connection alive
    /// past the `AsyncClient` it was prepared against.
    connection: std::sync::Weak<Mutex<AsyncRawConnection<AsyncStream>>>,
    /// Set to `true` by `close(self)` before it issues the close request,
    /// which suppresses the Drop-path auto-close.
    closed: bool,
}
1048
1049impl AsyncPreparedStatement {
1050 /// Returns the statement name.
1051 #[must_use]
1052 pub fn name(&self) -> &str {
1053 &self.name
1054 }
1055
1056 /// Returns the original query.
1057 #[must_use]
1058 pub fn query(&self) -> &str {
1059 &self.query
1060 }
1061
1062 /// Returns the parameter types.
1063 #[must_use]
1064 pub fn param_types(&self) -> &[crate::types::Oid] {
1065 &self.param_types
1066 }
1067
1068 /// Returns the number of parameters.
1069 #[must_use]
1070 pub fn param_count(&self) -> usize {
1071 self.param_types.len()
1072 }
1073
1074 /// Returns the result column descriptions.
1075 #[must_use]
1076 pub fn columns(&self) -> &[super::statement::Column] {
1077 &self.columns
1078 }
1079
1080 /// Returns the number of result columns.
1081 #[must_use]
1082 pub fn column_count(&self) -> usize {
1083 self.columns.len()
1084 }
1085
1086 /// Explicitly closes the prepared statement on the server (async).
1087 ///
1088 /// Consumes the statement — no further `execute_prepared` /
1089 /// `execute_prepared_streaming` calls are possible — and suppresses
1090 /// the Drop-path auto-close. Returns the `close_statement` result so
1091 /// callers can observe any transport errors.
1092 ///
1093 /// If you don't need error propagation, simply dropping the
1094 /// statement has the same effect (best-effort auto-close via
1095 /// `tokio::spawn`).
1096 ///
1097 /// # Errors
1098 ///
1099 /// Propagates any error from
1100 /// [`AsyncClient::close_statement`] — unhealthy connection,
1101 /// server-side error during `Close`/`Sync`, or transport failure.
1102 pub async fn close(mut self, client: &AsyncClient) -> Result<()> {
1103 self.closed = true;
1104 client.close_statement(&self).await
1105 }
1106}
1107
1108impl Drop for AsyncPreparedStatement {
1109 fn drop(&mut self) {
1110 // Explicit close already ran — nothing to do.
1111 if self.closed {
1112 return;
1113 }
1114
1115 // Best-effort close. Try to grab a tokio handle; if we're being
1116 // dropped outside a runtime (e.g. the runtime has already shut
1117 // down or the caller never had one), fall back to a warning and
1118 // flag the connection desynchronized so the next operation fails
1119 // loudly rather than racing with a lingering statement.
1120 let Some(conn) = self.connection.upgrade() else {
1121 // Connection has already been dropped — nothing to close.
1122 return;
1123 };
1124
1125 let name = std::mem::take(&mut self.name);
1126
1127 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1128 handle.spawn(async move {
1129 let mut c = conn.lock().await;
1130 if let Err(e) = c.close_statement(&name).await {
1131 warn!(
1132 target: "hyperdb_api_core::client",
1133 statement = %name,
1134 error = %e,
1135 "AsyncPreparedStatement drop-close failed (best-effort, swallowed)"
1136 );
1137 }
1138 });
1139 } else {
1140 // No runtime — can't do async I/O from here. Mark the
1141 // connection desynchronized so the next caller gets a
1142 // clear error instead of a latent leaked statement.
1143 if let Ok(mut c) = conn.try_lock() {
1144 c.mark_desynchronized();
1145 }
1146 warn!(
1147 target: "hyperdb_api_core::client",
1148 statement = %name,
1149 "AsyncPreparedStatement dropped outside of a tokio runtime; \
1150 server-side statement slot leaked and connection marked \
1151 desynchronized — call statement.close(&client) explicitly \
1152 for deterministic cleanup"
1153 );
1154 }
1155 }
1156}
1157
/// Async COPY IN writer for bulk data insertion.
///
/// Borrows the connection mutex for `'a`; every operation locks it for the
/// duration of that single call.
///
/// # Drop Safety
///
/// If this writer is dropped without calling [`finish()`](Self::finish) or
/// [`cancel()`](Self::cancel), it will attempt a best-effort synchronous cancel
/// by queuing a `CopyFail` message in the connection's write buffer. The next
/// async operation on the connection will flush and drain the cancel response,
/// restoring the connection to a usable state.
///
/// The queuing only happens when the connection mutex can be acquired
/// without waiting. If the mutex is held at drop time, only a warning is
/// logged and the connection may remain in COPY-IN state.
#[derive(Debug)]
pub struct AsyncCopyInWriter<'a> {
    /// Connection the COPY IN operation is running on.
    connection: &'a Mutex<AsyncRawConnection<AsyncStream>>,
    /// Set to `true` after `finish()` or `cancel()` consumes the writer.
    /// Checked in `Drop` to avoid queuing a spurious `CopyFail`.
    finished: bool,
}
1174
/// Owned-handle variant of [`AsyncCopyInWriter`] that holds an
/// `Arc<Mutex<_>>` to the underlying connection instead of a borrow.
/// Used by callers that need a `'static`-lifetime writer — e.g. N-API
/// classes that can't carry borrowed references across JS callbacks.
///
/// Semantics are identical to [`AsyncCopyInWriter`]; the only
/// difference is lifetime.
///
/// Dropping the writer without calling `finish()` or `cancel()` queues a
/// `CopyFail` on the connection, provided the connection mutex can be
/// acquired without waiting.
#[derive(Debug)]
pub struct AsyncCopyInWriterOwned {
    /// Shared handle to the connection the COPY IN operation is running on.
    connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>,
    /// Set to `true` once `finish()` or `cancel()` has consumed the writer.
    /// Checked in `Drop` to avoid queuing a spurious `CopyFail`.
    finished: bool,
}
1187
1188impl AsyncCopyInWriterOwned {
1189 /// Creates a new owned-handle COPY IN writer.
1190 pub(crate) fn new(connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>) -> Self {
1191 AsyncCopyInWriterOwned {
1192 connection,
1193 finished: false,
1194 }
1195 }
1196
1197 /// Sends data to the server.
1198 ///
1199 /// # Errors
1200 ///
1201 /// Currently infallible — frame construction is pure. The `Result`
1202 /// return type is preserved for forward compatibility.
1203 pub async fn send(&mut self, data: &[u8]) -> Result<()> {
1204 let mut conn = self.connection.lock().await;
1205 conn.send_copy_data(data)?;
1206 Ok(())
1207 }
1208
1209 /// Flushes any buffered data to the server.
1210 ///
1211 /// # Errors
1212 ///
1213 /// Returns [`Error`] (I/O) if flushing the async transport fails.
1214 pub async fn flush(&mut self) -> Result<()> {
1215 let mut conn = self.connection.lock().await;
1216 conn.flush().await
1217 }
1218
1219 /// Sends COPY data directly to the stream without internal buffering.
1220 ///
1221 /// # Errors
1222 ///
1223 /// - Returns [`Error`] (protocol) if `data.len() + 4` exceeds
1224 /// `u32::MAX`.
1225 /// - Returns [`Error`] (I/O) on transport write failure.
1226 pub async fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1227 let mut conn = self.connection.lock().await;
1228 conn.send_copy_data_direct(data).await
1229 }
1230
1231 /// Flushes the TCP stream.
1232 ///
1233 /// # Errors
1234 ///
1235 /// Returns [`Error`] (I/O) if flushing the async transport fails.
1236 pub async fn flush_stream(&mut self) -> Result<()> {
1237 let mut conn = self.connection.lock().await;
1238 conn.flush_stream().await
1239 }
1240
1241 /// Finishes the COPY operation and returns the number of rows inserted.
1242 ///
1243 /// # Errors
1244 ///
1245 /// Returns [`Error`] (server) if the server reports an `ErrorResponse`
1246 /// (e.g. constraint violation), or [`Error`] (I/O) / [`Error`] (closed)
1247 /// on transport failure.
1248 pub async fn finish(mut self) -> Result<u64> {
1249 self.finished = true;
1250 let mut conn = self.connection.lock().await;
1251 conn.finish_copy().await
1252 }
1253
1254 /// Cancels the COPY operation.
1255 ///
1256 /// # Errors
1257 ///
1258 /// Returns [`Error`] (I/O) on transport write failure, or
1259 /// [`Error`] (closed) if the server drops the connection before
1260 /// acknowledging the cancel.
1261 pub async fn cancel(mut self, reason: &str) -> Result<()> {
1262 self.finished = true;
1263 let mut conn = self.connection.lock().await;
1264 conn.cancel_copy(reason).await
1265 }
1266}
1267
1268impl Drop for AsyncCopyInWriterOwned {
1269 fn drop(&mut self) {
1270 if self.finished {
1271 return;
1272 }
1273 // Mirror AsyncCopyInWriter's Drop — queue a CopyFail via
1274 // try_lock; the next async operation on the connection will
1275 // flush and drain.
1276 if let Ok(mut conn) = self.connection.try_lock() {
1277 conn.queue_copy_fail("AsyncCopyInWriterOwned dropped without finish/cancel");
1278 }
1279 }
1280}
1281
1282impl<'a> AsyncCopyInWriter<'a> {
1283 /// Creates a new COPY IN writer.
1284 pub(crate) fn new(connection: &'a Mutex<AsyncRawConnection<AsyncStream>>) -> Self {
1285 AsyncCopyInWriter {
1286 connection,
1287 finished: false,
1288 }
1289 }
1290
1291 /// Sends data to the server.
1292 ///
1293 /// # Errors
1294 ///
1295 /// Currently infallible — frame construction is pure. The `Result`
1296 /// return type is preserved for forward compatibility.
1297 pub async fn send(&mut self, data: &[u8]) -> Result<()> {
1298 let mut conn = self.connection.lock().await;
1299 conn.send_copy_data(data)?;
1300 Ok(())
1301 }
1302
1303 /// Flushes any buffered data to the server.
1304 ///
1305 /// # Errors
1306 ///
1307 /// Returns [`Error`] (I/O) if flushing the async transport fails.
1308 pub async fn flush(&mut self) -> Result<()> {
1309 let mut conn = self.connection.lock().await;
1310 conn.flush().await
1311 }
1312
1313 /// Sends COPY data directly to the stream without internal buffering.
1314 ///
1315 /// This writes data directly to the TCP stream, letting the kernel handle
1316 /// buffering. More efficient for streaming large amounts of data.
1317 /// Call `flush_stream()` periodically to ensure data is sent.
1318 ///
1319 /// # Errors
1320 ///
1321 /// - Returns [`Error`] (protocol) if `data.len() + 4` exceeds
1322 /// `u32::MAX`.
1323 /// - Returns [`Error`] (I/O) on transport write failure.
1324 pub async fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1325 let mut conn = self.connection.lock().await;
1326 conn.send_copy_data_direct(data).await
1327 }
1328
1329 /// Flushes the TCP stream.
1330 ///
1331 /// Use with `send_direct()` to periodically ensure data reaches the server.
1332 ///
1333 /// # Errors
1334 ///
1335 /// Returns [`Error`] (I/O) if flushing the async transport fails.
1336 pub async fn flush_stream(&mut self) -> Result<()> {
1337 let mut conn = self.connection.lock().await;
1338 conn.flush_stream().await
1339 }
1340
1341 /// Finishes the COPY operation and returns the number of rows inserted.
1342 ///
1343 /// # Errors
1344 ///
1345 /// Returns [`Error`] (server) if the server reports an `ErrorResponse`,
1346 /// or [`Error`] (I/O) / [`Error`] (closed) on transport failure.
1347 pub async fn finish(mut self) -> Result<u64> {
1348 self.finished = true;
1349 let mut conn = self.connection.lock().await;
1350 conn.finish_copy().await
1351 }
1352
1353 /// Cancels the COPY operation.
1354 ///
1355 /// # Errors
1356 ///
1357 /// Returns [`Error`] (I/O) on transport write failure, or
1358 /// [`Error`] (closed) if the server drops the connection before
1359 /// acknowledging the cancel.
1360 pub async fn cancel(mut self, reason: &str) -> Result<()> {
1361 self.finished = true;
1362 let mut conn = self.connection.lock().await;
1363 conn.cancel_copy(reason).await
1364 }
1365}
1366
1367impl Drop for AsyncCopyInWriter<'_> {
1368 fn drop(&mut self) {
1369 if self.finished {
1370 return;
1371 }
1372 // Best-effort cancel: try to acquire the mutex synchronously and
1373 // queue a CopyFail message. We cannot do async I/O from Drop, so
1374 // the message is only written to the buffer — not flushed. The next
1375 // async operation will call drain_pending_copy_cancel() to flush it
1376 // and restore the connection to ReadyForQuery state.
1377 if let Ok(mut conn) = self.connection.try_lock() {
1378 conn.queue_copy_fail("COPY writer dropped without finish or cancel");
1379 warn!(
1380 target: "hyperdb_api_core::client",
1381 "AsyncCopyInWriter dropped without finish() or cancel(). \
1382 Queued best-effort CopyFail — connection will self-heal on next operation."
1383 );
1384 } else {
1385 warn!(
1386 target: "hyperdb_api_core::client",
1387 "AsyncCopyInWriter dropped without finish() or cancel(), \
1388 and the connection mutex was locked. The connection may be \
1389 left in an unusable COPY-IN state."
1390 );
1391 }
1392 }
1393}