Skip to main content

hyperdb_api_core/client/
async_client.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level asynchronous client for Hyper database.
5//!
6//! This module provides [`AsyncClient`], the async version of [`Client`](crate::client::Client).
7//! It uses tokio for async I/O operations.
8
9use std::sync::Arc;
10
11use tokio::net::TcpStream;
12use tokio::sync::Mutex;
13use tracing::{debug, info, warn};
14
15#[cfg(unix)]
16use tokio::net::UnixStream;
17
18use super::async_connection::AsyncRawConnection;
19use super::async_stream::AsyncStream;
20use super::async_stream_query::AsyncQueryStream;
21use super::cancel::Cancellable;
22use super::config::Config;
23use super::endpoint::ConnectionEndpoint;
24use super::error::{Error, Result};
25use super::notice::{Notice, NoticeReceiver};
26use super::row::{Row, StreamRow};
27
28use crate::protocol::message::Message;
29
/// An asynchronous client for Hyper database.
///
/// This is the async equivalent of [`Client`](crate::client::Client), designed for use
/// in tokio-based async applications. All I/O operations are non-blocking.
///
/// The connection is stored behind an `Arc<Mutex<..>>` (tokio's async mutex),
/// so methods taking `&self` acquire that lock before talking to the server.
///
/// # Example
///
/// ```no_run
/// use hyperdb_api_core::client::{AsyncClient, Config};
///
/// #[tokio::main]
/// async fn main() -> hyperdb_api_core::client::Result<()> {
///     let config = Config::new()
///         .with_host("localhost")
///         .with_port(7483)
///         .with_database("test.hyper");
///
///     let client = AsyncClient::connect(&config).await?;
///     let rows = client.query("SELECT 1").await?;
///     client.close().await?;
///     Ok(())
/// }
/// ```
pub struct AsyncClient {
    /// The underlying async connection, shared via `Arc` and guarded by an
    /// async mutex so it can be locked from `&self` methods.
    connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>,
    /// Backend process ID reported by the connection after startup; sent in
    /// cancel requests.
    process_id: i32,
    /// Secret key reported by the connection after startup; sent together
    /// with `process_id` to authenticate cancel requests.
    secret_key: i32,
    /// Endpoint this client connected to; cancel requests open a fresh
    /// connection to it.
    endpoint: ConnectionEndpoint,
    /// Optional callback for server notices/warnings. `None` until
    /// `set_notice_receiver` installs one.
    notice_receiver: Option<Arc<NoticeReceiver>>,
}
65
66impl std::fmt::Debug for AsyncClient {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("AsyncClient")
69            .field("process_id", &self.process_id)
70            .field("secret_key", &self.secret_key)
71            .field("endpoint", &self.endpoint)
72            .field(
73                "notice_receiver",
74                &self.notice_receiver.as_ref().map(|_| "<callback>"),
75            )
76            .finish_non_exhaustive()
77    }
78}
79
80impl AsyncClient {
81    /// Connects to a Hyper server using the given configuration (async).
82    ///
83    /// # Example
84    ///
85    /// ```no_run
86    /// # use hyperdb_api_core::client::{AsyncClient, Config};
87    /// # async fn example() -> hyperdb_api_core::client::Result<()> {
88    /// let config = Config::new()
89    ///     .with_host("localhost")
90    ///     .with_port(7483)
91    ///     .with_database("test.hyper");
92    ///
93    /// let client = AsyncClient::connect(&config).await?;
94    /// # Ok(())
95    /// # }
96    /// ```
97    ///
98    /// # Errors
99    ///
100    /// - Returns [`Error`] (connection) if the TCP connection cannot be
101    ///   established to `config.host():config.port()`.
102    /// - Propagates any [`Error`] from the startup handshake —
103    ///   [`Error`] (auth) for missing/wrong credentials,
104    ///   [`Error`] (server) for server-side startup errors, [`Error`] (protocol)
105    ///   for out-of-sequence messages, or [`Error`] (I/O) for wire
106    ///   failure.
107    pub async fn connect(config: &Config) -> Result<Self> {
108        info!(
109            target: "hyperdb_api",
110            host = %config.host(),
111            port = config.port(),
112            user = config.user().unwrap_or("(default)"),
113            database = config.database().unwrap_or("(none)"),
114            "connection-parameters"
115        );
116
117        let endpoint = ConnectionEndpoint::tcp(config.host(), config.port());
118        let addr = format!("{}:{}", config.host(), config.port());
119        let tcp_stream = TcpStream::connect(&addr).await.map_err(|e| {
120            warn!(target: "hyperdb_api", %addr, error = %e, "connection-failed");
121            Error::connection(format!("failed to connect to {addr}: {e}"))
122        })?;
123
124        // Set TCP options. See the sync mirror in
125        // [`super::client::Client::connect`] for the full rationale and
126        // empirical knee analysis. We bump `SO_RCVBUF` / `SO_SNDBUF` to
127        // 4 MiB (Windows default ~64 KiB throttles loopback throughput;
128        // Linux auto-tunes higher).
129        tcp_stream.set_nodelay(true).ok();
130        let sock = socket2::SockRef::from(&tcp_stream);
131        sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
132        sock.set_send_buffer_size(4 * 1024 * 1024).ok();
133
134        let stream = AsyncStream::tcp(tcp_stream);
135        let mut connection = AsyncRawConnection::new(stream);
136
137        // Perform startup with authentication
138        let params = config.startup_params();
139        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
140        connection.startup(&params_ref, config.password()).await?;
141
142        let process_id = connection.process_id();
143        let secret_key = connection.secret_key();
144
145        debug!(
146            target: "hyperdb_api",
147            process_id,
148            "connection-established"
149        );
150
151        Ok(AsyncClient {
152            connection: Arc::new(Mutex::new(connection)),
153            process_id,
154            secret_key,
155            endpoint,
156            notice_receiver: None,
157        })
158    }
159
160    /// Connects to a Hyper server via Unix Domain Socket (async, Unix only).
161    ///
162    /// # Example
163    ///
164    /// ```no_run
165    /// # use hyperdb_api_core::client::{AsyncClient, Config};
166    /// # use std::path::Path;
167    /// # async fn example() -> hyperdb_api_core::client::Result<()> {
168    /// let socket_path = Path::new("/tmp/hyper/.s.PGSQL.12345");
169    /// let config = Config::new().with_database("test.hyper");
170    /// let client = AsyncClient::connect_unix(socket_path, &config).await?;
171    /// # Ok(())
172    /// # }
173    /// ```
174    ///
175    /// # Errors
176    ///
177    /// - Returns [`Error`] (connection) if the Unix domain socket cannot
178    ///   be connected.
179    /// - Propagates any error from the startup handshake (see
180    ///   [`Self::connect`]).
181    #[cfg(unix)]
182    pub async fn connect_unix(
183        socket_path: impl AsRef<std::path::Path>,
184        config: &Config,
185    ) -> Result<Self> {
186        use std::path::Path;
187
188        let path = socket_path.as_ref();
189        info!(
190            target: "hyperdb_api",
191            socket_path = %path.display(),
192            user = config.user().unwrap_or("(default)"),
193            database = config.database().unwrap_or("(none)"),
194            "connection-parameters-unix"
195        );
196
197        let unix_stream = UnixStream::connect(path).await.map_err(|e| {
198            warn!(target: "hyperdb_api", socket_path = %path.display(), error = %e, "connection-failed");
199            Error::connection(format!("failed to connect to unix socket {}: {}", path.display(), e))
200        })?;
201
202        // Parse endpoint from socket path
203        let directory = path.parent().unwrap_or(Path::new("/"));
204        let name = path
205            .file_name()
206            .and_then(|n| n.to_str())
207            .unwrap_or("socket");
208        let endpoint = ConnectionEndpoint::domain_socket(directory, name);
209
210        let stream = AsyncStream::unix(unix_stream);
211        let mut connection = AsyncRawConnection::new(stream);
212
213        // Perform startup with authentication
214        let params = config.startup_params();
215        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
216        connection.startup(&params_ref, config.password()).await?;
217
218        let process_id = connection.process_id();
219        let secret_key = connection.secret_key();
220
221        debug!(
222            target: "hyperdb_api",
223            process_id,
224            "connection-established-unix"
225        );
226
227        Ok(AsyncClient {
228            connection: Arc::new(Mutex::new(connection)),
229            process_id,
230            secret_key,
231            endpoint,
232            notice_receiver: None,
233        })
234    }
235
236    /// Connects to a Hyper server via Windows Named Pipe (async, Windows only).
237    ///
238    /// # Arguments
239    ///
240    /// * `pipe_path` - The full pipe path (e.g., `\\.\pipe\hyper-12345`)
241    /// * `config` - Connection configuration
242    ///
243    /// # Errors
244    ///
245    /// Returns an error if the Named Pipe cannot be opened (e.g., pipe does not
246    /// exist, all instances are busy after the retry window, or permission is
247    /// denied) or if the authentication handshake fails.
248    #[cfg(windows)]
249    pub async fn connect_named_pipe(pipe_path: &str, config: &Config) -> Result<Self> {
250        use std::time::{Duration, Instant};
251        use tokio::net::windows::named_pipe::ClientOptions;
252
253        info!(
254            target: "hyperdb_api",
255            pipe_path = %pipe_path,
256            user = config.user().unwrap_or("(default)"),
257            database = config.database().unwrap_or("(none)"),
258            "connection-parameters-named-pipe"
259        );
260
261        // Retry on `ERROR_PIPE_BUSY` (231) — Windows named pipes have a finite
262        // number of server-side instances and concurrent clients can hit the
263        // cap. See the sync mirror in [`super::client::Client::connect_named_pipe`]
264        // for the full rationale.
265        const RETRY_INTERVAL: Duration = Duration::from_millis(20);
266        const MAX_WAIT: Duration = Duration::from_secs(10);
267        const ERROR_PIPE_BUSY: i32 = 231;
268
269        let deadline = Instant::now() + MAX_WAIT;
270        let client = loop {
271            match ClientOptions::new().open(pipe_path) {
272                Ok(c) => break c,
273                Err(e)
274                    if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && Instant::now() < deadline =>
275                {
276                    tokio::time::sleep(RETRY_INTERVAL).await;
277                }
278                Err(e) => {
279                    warn!(target: "hyperdb_api", pipe_path = %pipe_path, error = %e, "connection-failed");
280                    return Err(Error::connection(format!(
281                        "failed to connect to named pipe {pipe_path}: {e}"
282                    )));
283                }
284            }
285        };
286
287        // Parse endpoint from pipe path
288        let endpoint = ConnectionEndpoint::parse(&format!(
289            "tab.pipe://{}",
290            pipe_path.trim_start_matches(r"\\").replace('\\', "/")
291        ))
292        .unwrap_or_else(|_| {
293            let parts: Vec<&str> = pipe_path
294                .trim_start_matches(r"\\")
295                .splitn(3, '\\')
296                .collect();
297            if parts.len() >= 3 {
298                ConnectionEndpoint::named_pipe(parts[0], parts[2])
299            } else {
300                ConnectionEndpoint::named_pipe(".", pipe_path)
301            }
302        });
303
304        let stream = AsyncStream::named_pipe(client);
305        let mut connection = AsyncRawConnection::new(stream);
306
307        // Perform startup with authentication
308        let params = config.startup_params();
309        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
310        connection.startup(&params_ref, config.password()).await?;
311
312        let process_id = connection.process_id();
313        let secret_key = connection.secret_key();
314
315        debug!(
316            target: "hyperdb_api",
317            process_id,
318            "connection-established-named-pipe"
319        );
320
321        Ok(AsyncClient {
322            connection: Arc::new(Mutex::new(connection)),
323            process_id,
324            secret_key,
325            endpoint,
326            notice_receiver: None,
327        })
328    }
329
330    /// Connects to a Hyper server using a `ConnectionEndpoint` (async).
331    ///
332    /// This is a lower-level method that accepts a pre-parsed endpoint.
333    ///
334    /// # Errors
335    ///
336    /// Delegates to [`Self::connect`], [`Self::connect_unix`], or
337    /// `Self::connect_named_pipe` depending on the endpoint variant,
338    /// and propagates their errors unchanged.
339    pub async fn connect_endpoint(endpoint: &ConnectionEndpoint, config: &Config) -> Result<Self> {
340        match endpoint {
341            ConnectionEndpoint::Tcp { host, port } => {
342                let mut cfg = config.clone();
343                cfg = cfg.with_host(host.clone()).with_port(*port);
344                Self::connect(&cfg).await
345            }
346            #[cfg(unix)]
347            ConnectionEndpoint::DomainSocket { directory, name } => {
348                let socket_path = directory.join(name);
349                Self::connect_unix(&socket_path, config).await
350            }
351            #[cfg(windows)]
352            ConnectionEndpoint::NamedPipe { host, name } => {
353                let pipe_path = format!(r"\\{host}\pipe\{name}");
354                Self::connect_named_pipe(&pipe_path, config).await
355            }
356        }
357    }
358
    /// Returns the endpoint this client was connected to.
    ///
    /// This is the same endpoint that cancel requests reconnect to.
    #[must_use]
    pub fn endpoint(&self) -> &ConnectionEndpoint {
        &self.endpoint
    }
364
    /// Returns the server process ID for this connection.
    ///
    /// The value is captured from the connection right after the startup
    /// handshake and is sent in cancel requests.
    #[must_use]
    pub fn process_id(&self) -> i32 {
        self.process_id
    }
370
    /// Returns the secret key for cancel requests.
    ///
    /// The value is captured from the connection right after the startup
    /// handshake. Together with [`Self::process_id`] it is what a cancel
    /// request sends to the server, so treat it as sensitive.
    #[must_use]
    pub fn secret_key(&self) -> i32 {
        self.secret_key
    }
376
377    /// Cancels the currently executing query on this connection (async).
378    ///
379    /// This method opens a separate connection to send a cancel request.
380    /// For TCP endpoints, it opens a new TCP connection.
381    /// For Unix domain sockets, it connects to the same socket path.
382    ///
383    /// # Errors
384    ///
385    /// - Returns [`Error`] (connection) if a fresh cancel-side socket
386    ///   (TCP / UDS / named-pipe) cannot be opened to
387    ///   [`Self::endpoint`].
388    /// - Returns [`Error`] (I/O) if writing the cancel request fails.
389    pub async fn cancel(&self) -> Result<()> {
390        use crate::protocol::message::frontend;
391        use bytes::BytesMut;
392        use tokio::io::AsyncWriteExt;
393
394        info!(
395            target: "hyperdb_api",
396            process_id = self.process_id,
397            "query-cancel-request"
398        );
399
400        let endpoint_str = self.endpoint.to_string();
401
402        match &self.endpoint {
403            ConnectionEndpoint::Tcp { host, port } => {
404                let addr = format!("{host}:{port}");
405                let mut stream = TcpStream::connect(&addr).await.map_err(|e| {
406                    warn!(
407                        target: "hyperdb_api",
408                        addr = %endpoint_str,
409                        error = %e,
410                        "query-cancel-connect-failed"
411                    );
412                    Error::connection(format!(
413                        "failed to connect for cancel request to {endpoint_str}: {e}"
414                    ))
415                })?;
416                // Cancel is a 16-byte fire-and-forget — disable Nagle so the
417                // request hits the wire without waiting on a coalesce timer.
418                stream.set_nodelay(true).ok();
419
420                let mut buf = BytesMut::new();
421                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
422
423                stream.write_all(&buf).await.map_err(|e| {
424                    warn!(
425                        target: "hyperdb_api",
426                        error = %e,
427                        "query-cancel-send-failed"
428                    );
429                    Error::io(e)
430                })?;
431            }
432            #[cfg(unix)]
433            ConnectionEndpoint::DomainSocket { directory, name } => {
434                let socket_path = directory.join(name);
435                let mut stream = UnixStream::connect(&socket_path).await.map_err(|e| {
436                    warn!(
437                        target: "hyperdb_api",
438                        addr = %endpoint_str,
439                        error = %e,
440                        "query-cancel-connect-failed"
441                    );
442                    Error::connection(format!(
443                        "failed to connect for cancel request to {endpoint_str}: {e}"
444                    ))
445                })?;
446
447                let mut buf = BytesMut::new();
448                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
449
450                stream.write_all(&buf).await.map_err(|e| {
451                    warn!(
452                        target: "hyperdb_api",
453                        error = %e,
454                        "query-cancel-send-failed"
455                    );
456                    Error::io(e)
457                })?;
458            }
459            #[cfg(windows)]
460            ConnectionEndpoint::NamedPipe { host, name } => {
461                let pipe_path = format!(r"\\{host}\pipe\{name}");
462                // Use sync file I/O for cancel (short-lived connection)
463                let mut file = std::fs::OpenOptions::new()
464                    .read(true)
465                    .write(true)
466                    .open(&pipe_path)
467                    .map_err(|e| {
468                        warn!(
469                            target: "hyperdb_api",
470                            addr = %endpoint_str,
471                            error = %e,
472                            "query-cancel-connect-failed"
473                        );
474                        Error::connection(format!(
475                            "failed to connect for cancel request to {endpoint_str}: {e}"
476                        ))
477                    })?;
478
479                let mut buf = BytesMut::new();
480                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
481
482                use std::io::Write;
483                file.write_all(&buf).map_err(|e| {
484                    warn!(
485                        target: "hyperdb_api",
486                        error = %e,
487                        "query-cancel-send-failed"
488                    );
489                    Error::io(e)
490                })?;
491
492                file.flush().map_err(Error::io)?;
493            }
494        }
495
496        debug!(target: "hyperdb_api", "query-cancel-sent");
497        Ok(())
498    }
499
500    /// Executes a query and returns all result rows (async).
501    ///
502    /// # Errors
503    ///
504    /// Propagates any [`Error`] from the underlying connection's
505    /// [`AsyncRawConnection::simple_query`] — [`Error`] (server) for
506    /// server-side SQL errors, [`Error`] (I/O) / [`Error`] (closed) for
507    /// transport failures, and [`Error`] (connection) if the connection
508    /// is unhealthy. Row construction may also raise an [`Error`] when
509    /// a `DataRow` cannot be decoded against its `RowDescription`.
510    pub async fn query(&self, sql: &str) -> Result<Vec<Row>> {
511        let mut conn = self.connection.lock().await;
512        let messages = conn.simple_query(sql).await?;
513        Self::process_query_messages(messages, self.notice_receiver.as_ref())
514    }
515
516    /// Executes a query with `HyperBinary` format for better performance (async).
517    ///
518    /// # Errors
519    ///
520    /// Same failure modes as [`Self::query`].
521    pub async fn query_fast(&self, sql: &str) -> Result<Vec<StreamRow>> {
522        let mut conn = self.connection.lock().await;
523        let messages = conn.query_binary(sql).await?;
524        Ok(Self::process_binary_messages(
525            messages,
526            self.notice_receiver.as_ref(),
527        ))
528    }
529
530    /// Executes a query with `HyperBinary` format and returns a streaming
531    /// result reader (async).
532    ///
533    /// This is the async mirror of
534    /// [`Client::query_streaming`](super::client::Client::query_streaming).
535    /// The returned [`AsyncQueryStream`] yields rows in chunks so callers
536    /// can process arbitrarily large result sets with constant memory. The
537    /// connection mutex is held for the duration of iteration; dropping the
538    /// stream before completion issues a best-effort cancel and marks the
539    /// connection desynchronized.
540    ///
541    /// # Errors
542    ///
543    /// - Returns [`Error`] (connection) if the connection is unhealthy.
544    /// - Returns [`Error`] (I/O) if writing the Parse/Bind/Execute/Sync
545    ///   sequence fails on the transport.
546    pub async fn query_streaming(
547        &self,
548        sql: &str,
549        chunk_size: usize,
550    ) -> Result<AsyncQueryStream<'_>> {
551        let mut conn = self.connection.lock().await;
552        conn.start_query_binary(sql).await?;
553        Ok(AsyncQueryStream::new(conn, self, chunk_size))
554    }
555
556    /// Sends a best-effort `CancelRequest` using *synchronous* I/O so it
557    /// is usable from [`Drop`] impls (notably
558    /// [`AsyncQueryStream::drop`](super::async_stream_query::AsyncQueryStream)).
559    ///
560    /// Cancellation opens a short-lived TCP / UDS / Named-Pipe connection,
561    /// writes the cancel packet, and drops it — the server recognizes the
562    /// (`process_id`, `secret_key`) tuple and signals the long-running query
563    /// to abort. No response is expected.
564    fn cancel_sync(&self) -> Result<()> {
565        use crate::protocol::message::frontend;
566        use bytes::BytesMut;
567        use std::io::Write;
568
569        info!(
570            target: "hyperdb_api",
571            process_id = self.process_id,
572            "query-cancel-request"
573        );
574
575        let endpoint_str = self.endpoint.to_string();
576
577        match &self.endpoint {
578            ConnectionEndpoint::Tcp { host, port } => {
579                let addr = format!("{host}:{port}");
580                let mut stream = std::net::TcpStream::connect(&addr).map_err(|e| {
581                    warn!(
582                        target: "hyperdb_api",
583                        addr = %endpoint_str,
584                        error = %e,
585                        "query-cancel-connect-failed"
586                    );
587                    Error::connection(format!(
588                        "failed to connect for cancel request to {endpoint_str}: {e}"
589                    ))
590                })?;
591                // Cancel is a 16-byte fire-and-forget — disable Nagle so the
592                // request hits the wire without waiting on a coalesce timer.
593                stream.set_nodelay(true).ok();
594
595                let mut buf = BytesMut::with_capacity(16);
596                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
597
598                stream.write_all(&buf).map_err(Error::io)?;
599                stream.flush().map_err(Error::io)?;
600            }
601            #[cfg(unix)]
602            ConnectionEndpoint::DomainSocket { directory, name } => {
603                let socket_path = directory.join(name);
604                let mut stream =
605                    std::os::unix::net::UnixStream::connect(&socket_path).map_err(|e| {
606                        warn!(
607                            target: "hyperdb_api",
608                            addr = %endpoint_str,
609                            error = %e,
610                            "query-cancel-connect-failed"
611                        );
612                        Error::connection(format!(
613                            "failed to connect for cancel request to {endpoint_str}: {e}"
614                        ))
615                    })?;
616
617                let mut buf = BytesMut::with_capacity(16);
618                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
619
620                stream.write_all(&buf).map_err(Error::io)?;
621                stream.flush().map_err(Error::io)?;
622            }
623            #[cfg(windows)]
624            ConnectionEndpoint::NamedPipe { host, name } => {
625                let pipe_path = format!(r"\\{host}\pipe\{name}");
626                let mut file = std::fs::OpenOptions::new()
627                    .read(true)
628                    .write(true)
629                    .open(&pipe_path)
630                    .map_err(|e| {
631                        warn!(
632                            target: "hyperdb_api",
633                            addr = %endpoint_str,
634                            error = %e,
635                            "query-cancel-connect-failed"
636                        );
637                        Error::connection(format!(
638                            "failed to connect for cancel request to {endpoint_str}: {e}"
639                        ))
640                    })?;
641
642                let mut buf = BytesMut::with_capacity(16);
643                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
644
645                file.write_all(&buf).map_err(Error::io)?;
646                file.flush().map_err(Error::io)?;
647            }
648        }
649
650        debug!(target: "hyperdb_api", "query-cancel-sent");
651        Ok(())
652    }
653
654    /// Executes a command (INSERT/UPDATE/DELETE/DDL) and returns affected row count (async).
655    ///
656    /// # Errors
657    ///
658    /// Same failure modes as [`Self::query`] — server-side SQL errors,
659    /// transport failures, and unhealthy-connection state all surface
660    /// as [`Error`].
661    pub async fn exec(&self, sql: &str) -> Result<u64> {
662        let mut conn = self.connection.lock().await;
663        let messages = conn.simple_query(sql).await?;
664        Ok(Self::extract_row_count(&messages))
665    }
666
667    /// Returns a server parameter value by name.
668    pub async fn parameter_status(&self, name: &str) -> Option<String> {
669        let conn = self.connection.lock().await;
670        conn.parameter_status(name)
671            .map(std::string::ToString::to_string)
672    }
673
674    /// Sets the notice receiver callback.
675    pub fn set_notice_receiver(&mut self, receiver: Option<Box<dyn Fn(Notice) + Send + Sync>>) {
676        self.notice_receiver = receiver.map(Arc::from);
677    }
678
679    /// Closes the connection gracefully (async).
680    ///
681    /// # Errors
682    ///
683    /// Returns [`Error`] (I/O) if writing the `Terminate` frame or
684    /// flushing the async transport fails.
685    pub async fn close(self) -> Result<()> {
686        let mut conn = self.connection.lock().await;
687        conn.terminate().await
688    }
689
690    /// Executes a batch of statements separated by semicolons (async).
691    ///
692    /// # Errors
693    ///
694    /// Same failure modes as [`Self::query`].
695    pub async fn batch_execute(&self, sql: &str) -> Result<()> {
696        let mut conn = self.connection.lock().await;
697        let _messages = conn.simple_query(sql).await?;
698        Ok(())
699    }
700
701    /// Starts a COPY IN operation for bulk data insertion (async).
702    ///
703    /// # Errors
704    ///
705    /// Delegates to [`Self::copy_in_with_format`]; see that method
706    /// for concrete failure modes.
707    pub async fn copy_in(
708        &self,
709        table_name: &str,
710        columns: &[&str],
711    ) -> Result<AsyncCopyInWriter<'_>> {
712        self.copy_in_with_format(table_name, columns, "HYPERBINARY")
713            .await
714    }
715
716    /// Starts a COPY IN operation and returns an owned-handle writer
717    /// whose lifetime is independent of this client. The writer holds an
718    /// `Arc`-cloned reference to the underlying connection mutex, so it
719    /// can be stored in structs that need a `'static`-lifetime writer —
720    /// e.g. N-API classes that can't carry borrowed references across
721    /// JS callbacks.
722    ///
723    /// # Errors
724    ///
725    /// Same failure modes as [`Self::copy_in_with_format`].
726    pub async fn copy_in_arc_with_format(
727        &self,
728        table_name: &str,
729        columns: &[&str],
730        format: &str,
731    ) -> Result<AsyncCopyInWriterOwned> {
732        let mut conn = self.connection.lock().await;
733        conn.start_copy_in_with_format(table_name, columns, format)
734            .await?;
735        drop(conn);
736        Ok(AsyncCopyInWriterOwned::new(Arc::clone(&self.connection)))
737    }
738
739    /// Starts a COPY IN operation with a specified data format (async).
740    ///
741    /// # Errors
742    ///
743    /// - Returns [`Error`] (connection) if the connection is unhealthy.
744    /// - Returns [`Error`] (server) if the server rejects the generated
745    ///   `COPY ... FROM STDIN` statement.
746    /// - Returns [`Error`] (I/O) on transport read/write failure.
747    pub async fn copy_in_with_format(
748        &self,
749        table_name: &str,
750        columns: &[&str],
751        format: &str,
752    ) -> Result<AsyncCopyInWriter<'_>> {
753        let mut conn = self.connection.lock().await;
754        conn.start_copy_in_with_format(table_name, columns, format)
755            .await?;
756        drop(conn);
757        Ok(AsyncCopyInWriter::new(&self.connection))
758    }
759
760    /// Executes a COPY ... TO STDOUT query and returns all output data (async).
761    ///
762    /// # Errors
763    ///
764    /// - Returns [`Error`] (connection) if the connection is unhealthy.
765    /// - Returns [`Error`] (server) when the server rejects the statement.
766    /// - Returns [`Error`] (I/O) / [`Error`] (closed) on transport
767    ///   read/write failure.
768    pub async fn copy_out(&self, query: &str) -> Result<Vec<u8>> {
769        let mut conn = self.connection.lock().await;
770        conn.copy_out(query).await
771    }
772
    /// Returns `true` if the connection mutex can be acquired right now.
    ///
    /// NOTE(review): despite the name, this does not probe the server or
    /// inspect connection health. It only reports whether `try_lock`
    /// succeeds, so it returns `false` while another operation holds the
    /// lock (for example a query in flight) and `true` for an idle
    /// connection whether or not the transport still works. Confirm what
    /// callers expect before relying on it as a liveness check.
    #[must_use]
    pub fn is_alive(&self) -> bool {
        // A successful `try_lock` only proves nobody else holds the guard at
        // this instant; the guard is dropped again immediately.
        self.connection.try_lock().is_ok()
    }
779
780    /// Prepares a statement for execution (async).
781    ///
782    /// # Errors
783    ///
784    /// Delegates to [`Self::prepare_typed`]; see that method for the
785    /// failure modes.
786    pub async fn prepare(&self, query: &str) -> Result<AsyncPreparedStatement> {
787        self.prepare_typed(query, &[]).await
788    }
789
790    /// Prepares a statement with explicit parameter types (async).
791    ///
792    /// # Errors
793    ///
794    /// - Returns [`Error`] (connection) if the connection is unhealthy.
795    /// - Returns [`Error`] (server) if the server rejects the `Parse`
796    ///   request (SQL syntax, unknown parameter OIDs, etc.).
797    /// - Returns [`Error`] (I/O) on transport read/write failure.
798    pub async fn prepare_typed(
799        &self,
800        query: &str,
801        param_types: &[crate::types::Oid],
802    ) -> Result<AsyncPreparedStatement> {
803        use std::sync::atomic::{AtomicU64, Ordering};
804        static COUNTER: AtomicU64 = AtomicU64::new(0);
805
806        let name = format!(
807            "__hyper_async_stmt_{}",
808            COUNTER.fetch_add(1, Ordering::Relaxed)
809        );
810        let mut conn = self.connection.lock().await;
811        let (params, columns) = conn.prepare(&name, query, param_types).await?;
812
813        Ok(AsyncPreparedStatement {
814            name,
815            query: query.to_string(),
816            param_types: params,
817            columns,
818            connection: Arc::downgrade(&self.connection),
819            closed: false,
820        })
821    }
822
823    /// Closes a prepared statement on the server (async).
824    ///
825    /// Prefer the RAII [`AsyncPreparedStatement::close`] method on the
826    /// statement itself — it consumes the statement and prevents the
827    /// auto-close Drop path from double-closing.
828    ///
829    /// # Errors
830    ///
831    /// Propagates any error from
832    /// [`AsyncRawConnection::close_statement`] — unhealthy connection,
833    /// server-side error during `Close`/`Sync`, or transport failure.
834    pub async fn close_statement(&self, statement: &AsyncPreparedStatement) -> Result<()> {
835        let mut conn = self.connection.lock().await;
836        conn.close_statement(&statement.name).await
837    }
838
839    /// Executes a prepared statement with parameters (async).
840    ///
841    /// # Errors
842    ///
843    /// Propagates any error from
844    /// [`AsyncRawConnection::execute_prepared`] — unhealthy connection,
845    /// parameter/type mismatch, server-side execution failure, or
846    /// transport failure. Row construction may also raise an [`Error`]
847    /// when a `DataRow` cannot be decoded.
848    pub async fn execute_prepared<P: AsRef<[Option<Vec<u8>>]>>(
849        &self,
850        statement: &AsyncPreparedStatement,
851        params: P,
852    ) -> Result<Vec<Row>> {
853        let params_ref: Vec<Option<&[u8]>> = params
854            .as_ref()
855            .iter()
856            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
857            .collect();
858
859        let mut conn = self.connection.lock().await;
860        conn.execute_prepared(&statement.name, &params_ref, statement.columns.len())
861            .await
862    }
863
864    /// Executes a prepared statement that doesn't return rows (async).
865    ///
866    /// # Errors
867    ///
868    /// Same failure modes as [`Self::execute_prepared`] (excluding
869    /// row-construction errors — this path never builds rows).
870    pub async fn execute_prepared_no_result<P: AsRef<[Option<Vec<u8>>]>>(
871        &self,
872        statement: &AsyncPreparedStatement,
873        params: P,
874    ) -> Result<u64> {
875        let params_ref: Vec<Option<&[u8]>> = params
876            .as_ref()
877            .iter()
878            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
879            .collect();
880
881        let mut conn = self.connection.lock().await;
882        conn.execute_prepared_no_result(&statement.name, &params_ref)
883            .await
884    }
885
886    /// Executes a prepared statement with streaming results (async).
887    ///
888    /// Returns an [`AsyncPreparedQueryStream`](super::async_prepared_stream::AsyncPreparedQueryStream)
889    /// that yields rows in chunks, keeping memory bounded regardless of
890    /// result size. Async mirror of
891    /// [`Client::execute_streaming`](crate::client::Client::execute_streaming).
892    ///
893    /// # Errors
894    ///
895    /// - Returns [`Error`] (connection) if the connection is unhealthy.
896    /// - Returns [`Error`] (I/O) if writing the initial Bind/Execute/Sync
897    ///   sequence fails on the transport.
898    pub async fn execute_prepared_streaming<'a, P: AsRef<[Option<Vec<u8>>]>>(
899        &'a self,
900        statement: &AsyncPreparedStatement,
901        params: P,
902        chunk_size: usize,
903    ) -> Result<super::async_prepared_stream::AsyncPreparedQueryStream<'a>> {
904        let params_ref: Vec<Option<&[u8]>> = params
905            .as_ref()
906            .iter()
907            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
908            .collect();
909
910        let mut conn = self.connection.lock().await;
911        conn.start_execute_prepared(&statement.name, &params_ref, statement.columns.len())
912            .await?;
913
914        let columns = std::sync::Arc::new(statement.columns.clone());
915        Ok(super::async_prepared_stream::AsyncPreparedQueryStream::new(
916            conn, self, chunk_size, columns,
917        ))
918    }
919}
920
921impl Cancellable for AsyncClient {
922    /// Fire-and-forget cancel via PG wire protocol `CancelRequest` on a
923    /// fresh connection. Uses synchronous I/O so it is callable from
924    /// `Drop` impls. Errors are logged and swallowed because cancellation
925    /// is best-effort and callers cannot meaningfully recover.
926    fn cancel(&self) {
927        if let Err(e) = AsyncClient::cancel_sync(self) {
928            warn!(
929                target: "hyperdb_api_core::client",
930                error = %e,
931                process_id = self.process_id,
932                "cancel request failed (best-effort, swallowed)",
933            );
934        }
935    }
936}
937
938impl AsyncClient {
939    fn process_query_messages(
940        messages: Vec<Message>,
941        notice_receiver: Option<&Arc<NoticeReceiver>>,
942    ) -> Result<Vec<Row>> {
943        use super::statement::{Column, ColumnFormat};
944
945        let mut rows = Vec::new();
946        let mut columns: Option<Arc<Vec<Column>>> = None;
947
948        for msg in messages {
949            match msg {
950                Message::RowDescription(desc) => {
951                    let mut cols = Vec::new();
952                    for field in desc.fields().filter_map(std::result::Result::ok) {
953                        cols.push(Column::new(
954                            field.name().to_string(),
955                            field.type_oid(),
956                            field.type_modifier(),
957                            ColumnFormat::from_code(field.format()),
958                        ));
959                    }
960                    columns = Some(Arc::new(cols));
961                }
962                Message::DataRow(data) => {
963                    if let Some(ref cols) = columns {
964                        rows.push(Row::new(Arc::clone(cols), data)?);
965                    }
966                }
967                Message::NoticeResponse(body) => {
968                    if let Some(receiver) = notice_receiver {
969                        let notice = Notice::from_response_body(&body);
970                        receiver(notice);
971                    }
972                }
973                _ => {}
974            }
975        }
976        Ok(rows)
977    }
978
979    fn process_binary_messages(
980        messages: Vec<Message>,
981        notice_receiver: Option<&Arc<NoticeReceiver>>,
982    ) -> Vec<StreamRow> {
983        let mut rows = Vec::new();
984
985        for msg in messages {
986            match msg {
987                Message::DataRow(data) => {
988                    rows.push(StreamRow::new(data));
989                }
990                Message::NoticeResponse(body) => {
991                    if let Some(receiver) = notice_receiver {
992                        let notice = Notice::from_response_body(&body);
993                        receiver(notice);
994                    }
995                }
996                _ => {}
997            }
998        }
999        rows
1000    }
1001
1002    fn extract_row_count(messages: &[Message]) -> u64 {
1003        for msg in messages {
1004            if let Message::CommandComplete(body) = msg {
1005                if let Ok(tag) = body.tag() {
1006                    // Parse formats like "INSERT 0 5", "UPDATE 10", "DELETE 3"
1007                    let parts: Vec<&str> = tag.split_whitespace().collect();
1008                    if let Some(last) = parts.last() {
1009                        if let Ok(count) = last.parse() {
1010                            return count;
1011                        }
1012                    }
1013                }
1014            }
1015        }
1016        0
1017    }
1018}
1019
1020/// An async prepared statement.
1021///
1022/// Represents a server-side prepared statement that can be executed
1023/// multiple times with different parameters. **Auto-closes on `Drop`**
1024/// via a best-effort `tokio::spawn` task — if no tokio runtime is
1025/// available at drop time we log a warning and flag the connection
1026/// desynchronized rather than silently leaking the server-side
1027/// statement slot.
1028///
1029/// For callers who need confirmed close with error propagation, use
1030/// [`AsyncPreparedStatement::close`] (explicit async close).
1031#[derive(Debug)]
1032pub struct AsyncPreparedStatement {
1033    /// Statement name on the server.
1034    pub(crate) name: String,
1035    /// Original SQL query string.
1036    query: String,
1037    /// Parameter type OIDs.
1038    param_types: Vec<crate::types::Oid>,
1039    /// Result column descriptions.
1040    pub(crate) columns: Vec<super::statement::Column>,
1041    /// Weak handle to the owning connection for the Drop path. `Weak`
1042    /// so that a lingering statement never keeps the connection alive
1043    /// past the `AsyncClient` it was prepared against.
1044    connection: std::sync::Weak<Mutex<AsyncRawConnection<AsyncStream>>>,
1045    /// Flipped by `close(self)` to suppress the Drop-path auto-close.
1046    closed: bool,
1047}
1048
1049impl AsyncPreparedStatement {
1050    /// Returns the statement name.
1051    #[must_use]
1052    pub fn name(&self) -> &str {
1053        &self.name
1054    }
1055
1056    /// Returns the original query.
1057    #[must_use]
1058    pub fn query(&self) -> &str {
1059        &self.query
1060    }
1061
1062    /// Returns the parameter types.
1063    #[must_use]
1064    pub fn param_types(&self) -> &[crate::types::Oid] {
1065        &self.param_types
1066    }
1067
1068    /// Returns the number of parameters.
1069    #[must_use]
1070    pub fn param_count(&self) -> usize {
1071        self.param_types.len()
1072    }
1073
1074    /// Returns the result column descriptions.
1075    #[must_use]
1076    pub fn columns(&self) -> &[super::statement::Column] {
1077        &self.columns
1078    }
1079
1080    /// Returns the number of result columns.
1081    #[must_use]
1082    pub fn column_count(&self) -> usize {
1083        self.columns.len()
1084    }
1085
1086    /// Explicitly closes the prepared statement on the server (async).
1087    ///
1088    /// Consumes the statement — no further `execute_prepared` /
1089    /// `execute_prepared_streaming` calls are possible — and suppresses
1090    /// the Drop-path auto-close. Returns the `close_statement` result so
1091    /// callers can observe any transport errors.
1092    ///
1093    /// If you don't need error propagation, simply dropping the
1094    /// statement has the same effect (best-effort auto-close via
1095    /// `tokio::spawn`).
1096    ///
1097    /// # Errors
1098    ///
1099    /// Propagates any error from
1100    /// [`AsyncClient::close_statement`] — unhealthy connection,
1101    /// server-side error during `Close`/`Sync`, or transport failure.
1102    pub async fn close(mut self, client: &AsyncClient) -> Result<()> {
1103        self.closed = true;
1104        client.close_statement(&self).await
1105    }
1106}
1107
1108impl Drop for AsyncPreparedStatement {
1109    fn drop(&mut self) {
1110        // Explicit close already ran — nothing to do.
1111        if self.closed {
1112            return;
1113        }
1114
1115        // Best-effort close. Try to grab a tokio handle; if we're being
1116        // dropped outside a runtime (e.g. the runtime has already shut
1117        // down or the caller never had one), fall back to a warning and
1118        // flag the connection desynchronized so the next operation fails
1119        // loudly rather than racing with a lingering statement.
1120        let Some(conn) = self.connection.upgrade() else {
1121            // Connection has already been dropped — nothing to close.
1122            return;
1123        };
1124
1125        let name = std::mem::take(&mut self.name);
1126
1127        if let Ok(handle) = tokio::runtime::Handle::try_current() {
1128            handle.spawn(async move {
1129                let mut c = conn.lock().await;
1130                if let Err(e) = c.close_statement(&name).await {
1131                    warn!(
1132                        target: "hyperdb_api_core::client",
1133                        statement = %name,
1134                        error = %e,
1135                        "AsyncPreparedStatement drop-close failed (best-effort, swallowed)"
1136                    );
1137                }
1138            });
1139        } else {
1140            // No runtime — can't do async I/O from here. Mark the
1141            // connection desynchronized so the next caller gets a
1142            // clear error instead of a latent leaked statement.
1143            if let Ok(mut c) = conn.try_lock() {
1144                c.mark_desynchronized();
1145            }
1146            warn!(
1147                target: "hyperdb_api_core::client",
1148                statement = %name,
1149                "AsyncPreparedStatement dropped outside of a tokio runtime; \
1150                 server-side statement slot leaked and connection marked \
1151                 desynchronized — call statement.close(&client) explicitly \
1152                 for deterministic cleanup"
1153            );
1154        }
1155    }
1156}
1157
1158/// Async COPY IN writer for bulk data insertion.
1159///
1160/// # Drop Safety
1161///
1162/// If this writer is dropped without calling [`finish()`](Self::finish) or
1163/// [`cancel()`](Self::cancel), it will attempt a best-effort synchronous cancel
1164/// by queuing a `CopyFail` message in the connection's write buffer. The next
1165/// async operation on the connection will flush and drain the cancel response,
1166/// restoring the connection to a usable state.
1167#[derive(Debug)]
1168pub struct AsyncCopyInWriter<'a> {
1169    connection: &'a Mutex<AsyncRawConnection<AsyncStream>>,
1170    /// Set to `true` after `finish()` or `cancel()` consumes the writer.
1171    /// Checked in `Drop` to avoid queuing a spurious `CopyFail`.
1172    finished: bool,
1173}
1174
1175/// Owned-handle variant of [`AsyncCopyInWriter`] that holds an
1176/// `Arc<Mutex<_>>` to the underlying connection instead of a borrow.
1177/// Used by callers that need a `'static`-lifetime writer — e.g. N-API
1178/// classes that can't carry borrowed references across JS callbacks.
1179///
1180/// Semantics are identical to [`AsyncCopyInWriter`]; the only
1181/// difference is lifetime.
1182#[derive(Debug)]
1183pub struct AsyncCopyInWriterOwned {
1184    connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>,
1185    finished: bool,
1186}
1187
1188impl AsyncCopyInWriterOwned {
1189    /// Creates a new owned-handle COPY IN writer.
1190    pub(crate) fn new(connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>) -> Self {
1191        AsyncCopyInWriterOwned {
1192            connection,
1193            finished: false,
1194        }
1195    }
1196
1197    /// Sends data to the server.
1198    ///
1199    /// # Errors
1200    ///
1201    /// Currently infallible — frame construction is pure. The `Result`
1202    /// return type is preserved for forward compatibility.
1203    pub async fn send(&mut self, data: &[u8]) -> Result<()> {
1204        let mut conn = self.connection.lock().await;
1205        conn.send_copy_data(data)?;
1206        Ok(())
1207    }
1208
1209    /// Flushes any buffered data to the server.
1210    ///
1211    /// # Errors
1212    ///
1213    /// Returns [`Error`] (I/O) if flushing the async transport fails.
1214    pub async fn flush(&mut self) -> Result<()> {
1215        let mut conn = self.connection.lock().await;
1216        conn.flush().await
1217    }
1218
1219    /// Sends COPY data directly to the stream without internal buffering.
1220    ///
1221    /// # Errors
1222    ///
1223    /// - Returns [`Error`] (protocol) if `data.len() + 4` exceeds
1224    ///   `u32::MAX`.
1225    /// - Returns [`Error`] (I/O) on transport write failure.
1226    pub async fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1227        let mut conn = self.connection.lock().await;
1228        conn.send_copy_data_direct(data).await
1229    }
1230
1231    /// Flushes the TCP stream.
1232    ///
1233    /// # Errors
1234    ///
1235    /// Returns [`Error`] (I/O) if flushing the async transport fails.
1236    pub async fn flush_stream(&mut self) -> Result<()> {
1237        let mut conn = self.connection.lock().await;
1238        conn.flush_stream().await
1239    }
1240
1241    /// Finishes the COPY operation and returns the number of rows inserted.
1242    ///
1243    /// # Errors
1244    ///
1245    /// Returns [`Error`] (server) if the server reports an `ErrorResponse`
1246    /// (e.g. constraint violation), or [`Error`] (I/O) / [`Error`] (closed)
1247    /// on transport failure.
1248    pub async fn finish(mut self) -> Result<u64> {
1249        self.finished = true;
1250        let mut conn = self.connection.lock().await;
1251        conn.finish_copy().await
1252    }
1253
1254    /// Cancels the COPY operation.
1255    ///
1256    /// # Errors
1257    ///
1258    /// Returns [`Error`] (I/O) on transport write failure, or
1259    /// [`Error`] (closed) if the server drops the connection before
1260    /// acknowledging the cancel.
1261    pub async fn cancel(mut self, reason: &str) -> Result<()> {
1262        self.finished = true;
1263        let mut conn = self.connection.lock().await;
1264        conn.cancel_copy(reason).await
1265    }
1266}
1267
1268impl Drop for AsyncCopyInWriterOwned {
1269    fn drop(&mut self) {
1270        if self.finished {
1271            return;
1272        }
1273        // Mirror AsyncCopyInWriter's Drop — queue a CopyFail via
1274        // try_lock; the next async operation on the connection will
1275        // flush and drain.
1276        if let Ok(mut conn) = self.connection.try_lock() {
1277            conn.queue_copy_fail("AsyncCopyInWriterOwned dropped without finish/cancel");
1278        }
1279    }
1280}
1281
1282impl<'a> AsyncCopyInWriter<'a> {
1283    /// Creates a new COPY IN writer.
1284    pub(crate) fn new(connection: &'a Mutex<AsyncRawConnection<AsyncStream>>) -> Self {
1285        AsyncCopyInWriter {
1286            connection,
1287            finished: false,
1288        }
1289    }
1290
1291    /// Sends data to the server.
1292    ///
1293    /// # Errors
1294    ///
1295    /// Currently infallible — frame construction is pure. The `Result`
1296    /// return type is preserved for forward compatibility.
1297    pub async fn send(&mut self, data: &[u8]) -> Result<()> {
1298        let mut conn = self.connection.lock().await;
1299        conn.send_copy_data(data)?;
1300        Ok(())
1301    }
1302
1303    /// Flushes any buffered data to the server.
1304    ///
1305    /// # Errors
1306    ///
1307    /// Returns [`Error`] (I/O) if flushing the async transport fails.
1308    pub async fn flush(&mut self) -> Result<()> {
1309        let mut conn = self.connection.lock().await;
1310        conn.flush().await
1311    }
1312
1313    /// Sends COPY data directly to the stream without internal buffering.
1314    ///
1315    /// This writes data directly to the TCP stream, letting the kernel handle
1316    /// buffering. More efficient for streaming large amounts of data.
1317    /// Call `flush_stream()` periodically to ensure data is sent.
1318    ///
1319    /// # Errors
1320    ///
1321    /// - Returns [`Error`] (protocol) if `data.len() + 4` exceeds
1322    ///   `u32::MAX`.
1323    /// - Returns [`Error`] (I/O) on transport write failure.
1324    pub async fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1325        let mut conn = self.connection.lock().await;
1326        conn.send_copy_data_direct(data).await
1327    }
1328
1329    /// Flushes the TCP stream.
1330    ///
1331    /// Use with `send_direct()` to periodically ensure data reaches the server.
1332    ///
1333    /// # Errors
1334    ///
1335    /// Returns [`Error`] (I/O) if flushing the async transport fails.
1336    pub async fn flush_stream(&mut self) -> Result<()> {
1337        let mut conn = self.connection.lock().await;
1338        conn.flush_stream().await
1339    }
1340
1341    /// Finishes the COPY operation and returns the number of rows inserted.
1342    ///
1343    /// # Errors
1344    ///
1345    /// Returns [`Error`] (server) if the server reports an `ErrorResponse`,
1346    /// or [`Error`] (I/O) / [`Error`] (closed) on transport failure.
1347    pub async fn finish(mut self) -> Result<u64> {
1348        self.finished = true;
1349        let mut conn = self.connection.lock().await;
1350        conn.finish_copy().await
1351    }
1352
1353    /// Cancels the COPY operation.
1354    ///
1355    /// # Errors
1356    ///
1357    /// Returns [`Error`] (I/O) on transport write failure, or
1358    /// [`Error`] (closed) if the server drops the connection before
1359    /// acknowledging the cancel.
1360    pub async fn cancel(mut self, reason: &str) -> Result<()> {
1361        self.finished = true;
1362        let mut conn = self.connection.lock().await;
1363        conn.cancel_copy(reason).await
1364    }
1365}
1366
1367impl Drop for AsyncCopyInWriter<'_> {
1368    fn drop(&mut self) {
1369        if self.finished {
1370            return;
1371        }
1372        // Best-effort cancel: try to acquire the mutex synchronously and
1373        // queue a CopyFail message. We cannot do async I/O from Drop, so
1374        // the message is only written to the buffer — not flushed. The next
1375        // async operation will call drain_pending_copy_cancel() to flush it
1376        // and restore the connection to ReadyForQuery state.
1377        if let Ok(mut conn) = self.connection.try_lock() {
1378            conn.queue_copy_fail("COPY writer dropped without finish or cancel");
1379            warn!(
1380                target: "hyperdb_api_core::client",
1381                "AsyncCopyInWriter dropped without finish() or cancel(). \
1382                 Queued best-effort CopyFail — connection will self-heal on next operation."
1383            );
1384        } else {
1385            warn!(
1386                target: "hyperdb_api_core::client",
1387                "AsyncCopyInWriter dropped without finish() or cancel(), \
1388                 and the connection mutex was locked. The connection may be \
1389                 left in an unusable COPY-IN state."
1390            );
1391        }
1392    }
1393}