Skip to main content

hyperdb_api_core/client/
async_client.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-level asynchronous client for Hyper database.
5//!
6//! This module provides [`AsyncClient`], the async version of [`Client`](crate::client::Client).
7//! It uses tokio for async I/O operations.
8
9use std::sync::Arc;
10
11use tokio::net::TcpStream;
12use tokio::sync::Mutex;
13use tracing::{debug, info, warn};
14
15#[cfg(unix)]
16use tokio::net::UnixStream;
17
18use super::async_connection::AsyncRawConnection;
19use super::async_stream::AsyncStream;
20use super::async_stream_query::AsyncQueryStream;
21use super::cancel::Cancellable;
22use super::config::Config;
23use super::endpoint::ConnectionEndpoint;
24use super::error::{Error, Result};
25use super::notice::{Notice, NoticeReceiver};
26use super::row::{Row, StreamRow};
27
28use crate::protocol::message::Message;
29
30/// An asynchronous client for Hyper database.
31///
32/// This is the async equivalent of [`Client`](crate::client::Client), designed for use
33/// in tokio-based async applications. All I/O operations are non-blocking.
34///
35/// # Example
36///
37/// ```no_run
38/// use hyperdb_api_core::client::{AsyncClient, Config};
39///
40/// #[tokio::main]
41/// async fn main() -> hyperdb_api_core::client::Result<()> {
42///     let config = Config::new()
43///         .with_host("localhost")
44///         .with_port(7483)
45///         .with_database("test.hyper");
46///
47///     let client = AsyncClient::connect(&config).await?;
48///     let rows = client.query("SELECT 1").await?;
49///     client.close().await?;
50///     Ok(())
51/// }
52/// ```
pub struct AsyncClient {
    /// The underlying async connection, shared behind an `Arc` and guarded by
    /// a tokio mutex; the query/command methods lock it for the duration of
    /// their exchange with the server.
    connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>,
    /// Backend process ID read from the connection after startup; sent in
    /// cancel requests.
    process_id: i32,
    /// Secret key read from the connection after startup; sent together with
    /// `process_id` to authenticate cancel requests.
    secret_key: i32,
    /// Endpoint this client connected to; cancel requests open a fresh
    /// socket to the same endpoint.
    endpoint: ConnectionEndpoint,
    /// Optional callback for server notices/warnings. `None` after connecting,
    /// until `set_notice_receiver` installs one.
    notice_receiver: Option<Arc<NoticeReceiver>>,
}
65
66impl std::fmt::Debug for AsyncClient {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("AsyncClient")
69            .field("process_id", &self.process_id)
70            .field("secret_key", &self.secret_key)
71            .field("endpoint", &self.endpoint)
72            .field(
73                "notice_receiver",
74                &self.notice_receiver.as_ref().map(|_| "<callback>"),
75            )
76            .finish_non_exhaustive()
77    }
78}
79
80impl AsyncClient {
81    /// Connects to a Hyper server using the given configuration (async).
82    ///
83    /// # Example
84    ///
85    /// ```no_run
86    /// # use hyperdb_api_core::client::{AsyncClient, Config};
87    /// # async fn example() -> hyperdb_api_core::client::Result<()> {
88    /// let config = Config::new()
89    ///     .with_host("localhost")
90    ///     .with_port(7483)
91    ///     .with_database("test.hyper");
92    ///
93    /// let client = AsyncClient::connect(&config).await?;
94    /// # Ok(())
95    /// # }
96    /// ```
97    ///
98    /// # Errors
99    ///
100    /// - Returns [`Error`] (connection) if the TCP connection cannot be
101    ///   established to `config.host():config.port()`.
102    /// - Propagates any [`Error`] from the startup handshake —
103    ///   [`Error`] (auth) for missing/wrong credentials,
104    ///   [`Error`] (server) for server-side startup errors, [`Error`] (protocol)
105    ///   for out-of-sequence messages, or [`Error`] (I/O) for wire
106    ///   failure.
107    pub async fn connect(config: &Config) -> Result<Self> {
108        info!(
109            target: "hyperdb_api",
110            host = %config.host(),
111            port = config.port(),
112            user = config.user().unwrap_or("(default)"),
113            database = config.database().unwrap_or("(none)"),
114            "connection-parameters"
115        );
116
117        let endpoint = ConnectionEndpoint::tcp(config.host(), config.port());
118        let addr = format!("{}:{}", config.host(), config.port());
119        let tcp_stream = TcpStream::connect(&addr).await.map_err(|e| {
120            warn!(target: "hyperdb_api", %addr, error = %e, "connection-failed");
121            Error::connection(format!("failed to connect to {addr}: {e}"))
122        })?;
123
124        // Set TCP options. See the sync mirror in
125        // [`super::client::Client::connect`] for the full rationale and
126        // empirical knee analysis. We bump `SO_RCVBUF` / `SO_SNDBUF` to
127        // 4 MiB (Windows default ~64 KiB throttles loopback throughput;
128        // Linux auto-tunes higher).
129        tcp_stream.set_nodelay(true).ok();
130        let sock = socket2::SockRef::from(&tcp_stream);
131        sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
132        sock.set_send_buffer_size(4 * 1024 * 1024).ok();
133        // TCP keepalive: detect a half-open peer (laptop sleep, network blip,
134        // a hyperd that vanished without a FIN) in ~90s instead of the 2h OS
135        // idle default. See the rationale on `super::client::apply_tcp_keepalive`
136        // (the sync mirror). Best-effort: a rejected knob leaves OS defaults.
137        {
138            let keepalive = socket2::TcpKeepalive::new()
139                .with_time(std::time::Duration::from_secs(60))
140                .with_interval(std::time::Duration::from_secs(10));
141            #[cfg(not(any(target_os = "macos", target_os = "windows")))]
142            let keepalive = keepalive.with_retries(3);
143            sock.set_tcp_keepalive(&keepalive).ok();
144        }
145
146        let stream = AsyncStream::tcp(tcp_stream);
147        let mut connection = AsyncRawConnection::new(stream);
148
149        // Perform startup with authentication
150        let params = config.startup_params();
151        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
152        connection.startup(&params_ref, config.password()).await?;
153
154        let process_id = connection.process_id();
155        let secret_key = connection.secret_key();
156
157        debug!(
158            target: "hyperdb_api",
159            process_id,
160            "connection-established"
161        );
162
163        Ok(AsyncClient {
164            connection: Arc::new(Mutex::new(connection)),
165            process_id,
166            secret_key,
167            endpoint,
168            notice_receiver: None,
169        })
170    }
171
172    /// Connects to a Hyper server via Unix Domain Socket (async, Unix only).
173    ///
174    /// # Example
175    ///
176    /// ```no_run
177    /// # use hyperdb_api_core::client::{AsyncClient, Config};
178    /// # use std::path::Path;
179    /// # async fn example() -> hyperdb_api_core::client::Result<()> {
180    /// let socket_path = Path::new("/tmp/hyper/.s.PGSQL.12345");
181    /// let config = Config::new().with_database("test.hyper");
182    /// let client = AsyncClient::connect_unix(socket_path, &config).await?;
183    /// # Ok(())
184    /// # }
185    /// ```
186    ///
187    /// # Errors
188    ///
189    /// - Returns [`Error`] (connection) if the Unix domain socket cannot
190    ///   be connected.
191    /// - Propagates any error from the startup handshake (see
192    ///   [`Self::connect`]).
193    #[cfg(unix)]
194    pub async fn connect_unix(
195        socket_path: impl AsRef<std::path::Path>,
196        config: &Config,
197    ) -> Result<Self> {
198        use std::path::Path;
199
200        let path = socket_path.as_ref();
201        info!(
202            target: "hyperdb_api",
203            socket_path = %path.display(),
204            user = config.user().unwrap_or("(default)"),
205            database = config.database().unwrap_or("(none)"),
206            "connection-parameters-unix"
207        );
208
209        let unix_stream = UnixStream::connect(path).await.map_err(|e| {
210            warn!(target: "hyperdb_api", socket_path = %path.display(), error = %e, "connection-failed");
211            Error::connection(format!("failed to connect to unix socket {}: {}", path.display(), e))
212        })?;
213
214        // Parse endpoint from socket path
215        let directory = path.parent().unwrap_or(Path::new("/"));
216        let name = path
217            .file_name()
218            .and_then(|n| n.to_str())
219            .unwrap_or("socket");
220        let endpoint = ConnectionEndpoint::domain_socket(directory, name);
221
222        let stream = AsyncStream::unix(unix_stream);
223        let mut connection = AsyncRawConnection::new(stream);
224
225        // Perform startup with authentication
226        let params = config.startup_params();
227        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
228        connection.startup(&params_ref, config.password()).await?;
229
230        let process_id = connection.process_id();
231        let secret_key = connection.secret_key();
232
233        debug!(
234            target: "hyperdb_api",
235            process_id,
236            "connection-established-unix"
237        );
238
239        Ok(AsyncClient {
240            connection: Arc::new(Mutex::new(connection)),
241            process_id,
242            secret_key,
243            endpoint,
244            notice_receiver: None,
245        })
246    }
247
248    /// Connects to a Hyper server via Windows Named Pipe (async, Windows only).
249    ///
250    /// # Arguments
251    ///
252    /// * `pipe_path` - The full pipe path (e.g., `\\.\pipe\hyper-12345`)
253    /// * `config` - Connection configuration
254    ///
255    /// # Errors
256    ///
257    /// Returns an error if the Named Pipe cannot be opened (e.g., pipe does not
258    /// exist, all instances are busy after the retry window, or permission is
259    /// denied) or if the authentication handshake fails.
260    #[cfg(windows)]
261    pub async fn connect_named_pipe(pipe_path: &str, config: &Config) -> Result<Self> {
262        use std::time::{Duration, Instant};
263        use tokio::net::windows::named_pipe::ClientOptions;
264
265        info!(
266            target: "hyperdb_api",
267            pipe_path = %pipe_path,
268            user = config.user().unwrap_or("(default)"),
269            database = config.database().unwrap_or("(none)"),
270            "connection-parameters-named-pipe"
271        );
272
273        // Retry on `ERROR_PIPE_BUSY` (231) — Windows named pipes have a finite
274        // number of server-side instances and concurrent clients can hit the
275        // cap. See the sync mirror in [`super::client::Client::connect_named_pipe`]
276        // for the full rationale.
277        const RETRY_INTERVAL: Duration = Duration::from_millis(20);
278        const MAX_WAIT: Duration = Duration::from_secs(10);
279        const ERROR_PIPE_BUSY: i32 = 231;
280
281        let deadline = Instant::now() + MAX_WAIT;
282        let client = loop {
283            match ClientOptions::new().open(pipe_path) {
284                Ok(c) => break c,
285                Err(e)
286                    if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && Instant::now() < deadline =>
287                {
288                    tokio::time::sleep(RETRY_INTERVAL).await;
289                }
290                Err(e) => {
291                    warn!(target: "hyperdb_api", pipe_path = %pipe_path, error = %e, "connection-failed");
292                    return Err(Error::connection(format!(
293                        "failed to connect to named pipe {pipe_path}: {e}"
294                    )));
295                }
296            }
297        };
298
299        // Parse endpoint from pipe path
300        let endpoint = ConnectionEndpoint::parse(&format!(
301            "tab.pipe://{}",
302            pipe_path.trim_start_matches(r"\\").replace('\\', "/")
303        ))
304        .unwrap_or_else(|_| {
305            let parts: Vec<&str> = pipe_path
306                .trim_start_matches(r"\\")
307                .splitn(3, '\\')
308                .collect();
309            if parts.len() >= 3 {
310                ConnectionEndpoint::named_pipe(parts[0], parts[2])
311            } else {
312                ConnectionEndpoint::named_pipe(".", pipe_path)
313            }
314        });
315
316        let stream = AsyncStream::named_pipe(client);
317        let mut connection = AsyncRawConnection::new(stream);
318
319        // Perform startup with authentication
320        let params = config.startup_params();
321        let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
322        connection.startup(&params_ref, config.password()).await?;
323
324        let process_id = connection.process_id();
325        let secret_key = connection.secret_key();
326
327        debug!(
328            target: "hyperdb_api",
329            process_id,
330            "connection-established-named-pipe"
331        );
332
333        Ok(AsyncClient {
334            connection: Arc::new(Mutex::new(connection)),
335            process_id,
336            secret_key,
337            endpoint,
338            notice_receiver: None,
339        })
340    }
341
342    /// Connects to a Hyper server using a `ConnectionEndpoint` (async).
343    ///
344    /// This is a lower-level method that accepts a pre-parsed endpoint.
345    ///
346    /// # Errors
347    ///
348    /// Delegates to [`Self::connect`], [`Self::connect_unix`], or
349    /// `Self::connect_named_pipe` depending on the endpoint variant,
350    /// and propagates their errors unchanged.
351    pub async fn connect_endpoint(endpoint: &ConnectionEndpoint, config: &Config) -> Result<Self> {
352        match endpoint {
353            ConnectionEndpoint::Tcp { host, port } => {
354                let mut cfg = config.clone();
355                cfg = cfg.with_host(host.clone()).with_port(*port);
356                Self::connect(&cfg).await
357            }
358            #[cfg(unix)]
359            ConnectionEndpoint::DomainSocket { directory, name } => {
360                let socket_path = directory.join(name);
361                Self::connect_unix(&socket_path, config).await
362            }
363            #[cfg(windows)]
364            ConnectionEndpoint::NamedPipe { host, name } => {
365                let pipe_path = format!(r"\\{host}\pipe\{name}");
366                Self::connect_named_pipe(&pipe_path, config).await
367            }
368        }
369    }
370
    /// Returns the endpoint this client is connected to.
    ///
    /// This is the same endpoint [`Self::cancel`] uses to open its separate
    /// cancel connection.
    #[must_use]
    pub fn endpoint(&self) -> &ConnectionEndpoint {
        &self.endpoint
    }
376
    /// Returns the server process ID for this connection.
    ///
    /// The value is read from the connection once, right after the startup
    /// handshake, and is sent in cancel requests.
    #[must_use]
    pub fn process_id(&self) -> i32 {
        self.process_id
    }
382
    /// Returns the secret key for cancel requests.
    ///
    /// The key is sent alongside [`Self::process_id`] in every cancel
    /// request, so avoid writing it to logs.
    #[must_use]
    pub fn secret_key(&self) -> i32 {
        self.secret_key
    }
388
389    /// Cancels the currently executing query on this connection (async).
390    ///
391    /// This method opens a separate connection to send a cancel request.
392    /// For TCP endpoints, it opens a new TCP connection.
393    /// For Unix domain sockets, it connects to the same socket path.
394    ///
395    /// # Errors
396    ///
397    /// - Returns [`Error`] (connection) if a fresh cancel-side socket
398    ///   (TCP / UDS / named-pipe) cannot be opened to
399    ///   [`Self::endpoint`].
400    /// - Returns [`Error`] (I/O) if writing the cancel request fails.
401    pub async fn cancel(&self) -> Result<()> {
402        use crate::protocol::message::frontend;
403        use bytes::BytesMut;
404        use tokio::io::AsyncWriteExt;
405
406        info!(
407            target: "hyperdb_api",
408            process_id = self.process_id,
409            "query-cancel-request"
410        );
411
412        let endpoint_str = self.endpoint.to_string();
413
414        match &self.endpoint {
415            ConnectionEndpoint::Tcp { host, port } => {
416                let addr = format!("{host}:{port}");
417                let mut stream = TcpStream::connect(&addr).await.map_err(|e| {
418                    warn!(
419                        target: "hyperdb_api",
420                        addr = %endpoint_str,
421                        error = %e,
422                        "query-cancel-connect-failed"
423                    );
424                    Error::connection(format!(
425                        "failed to connect for cancel request to {endpoint_str}: {e}"
426                    ))
427                })?;
428                // Cancel is a 16-byte fire-and-forget — disable Nagle so the
429                // request hits the wire without waiting on a coalesce timer.
430                stream.set_nodelay(true).ok();
431
432                let mut buf = BytesMut::new();
433                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
434
435                stream.write_all(&buf).await.map_err(|e| {
436                    warn!(
437                        target: "hyperdb_api",
438                        error = %e,
439                        "query-cancel-send-failed"
440                    );
441                    Error::io(e)
442                })?;
443            }
444            #[cfg(unix)]
445            ConnectionEndpoint::DomainSocket { directory, name } => {
446                let socket_path = directory.join(name);
447                let mut stream = UnixStream::connect(&socket_path).await.map_err(|e| {
448                    warn!(
449                        target: "hyperdb_api",
450                        addr = %endpoint_str,
451                        error = %e,
452                        "query-cancel-connect-failed"
453                    );
454                    Error::connection(format!(
455                        "failed to connect for cancel request to {endpoint_str}: {e}"
456                    ))
457                })?;
458
459                let mut buf = BytesMut::new();
460                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
461
462                stream.write_all(&buf).await.map_err(|e| {
463                    warn!(
464                        target: "hyperdb_api",
465                        error = %e,
466                        "query-cancel-send-failed"
467                    );
468                    Error::io(e)
469                })?;
470            }
471            #[cfg(windows)]
472            ConnectionEndpoint::NamedPipe { host, name } => {
473                let pipe_path = format!(r"\\{host}\pipe\{name}");
474                // Use sync file I/O for cancel (short-lived connection)
475                let mut file = std::fs::OpenOptions::new()
476                    .read(true)
477                    .write(true)
478                    .open(&pipe_path)
479                    .map_err(|e| {
480                        warn!(
481                            target: "hyperdb_api",
482                            addr = %endpoint_str,
483                            error = %e,
484                            "query-cancel-connect-failed"
485                        );
486                        Error::connection(format!(
487                            "failed to connect for cancel request to {endpoint_str}: {e}"
488                        ))
489                    })?;
490
491                let mut buf = BytesMut::new();
492                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
493
494                use std::io::Write;
495                file.write_all(&buf).map_err(|e| {
496                    warn!(
497                        target: "hyperdb_api",
498                        error = %e,
499                        "query-cancel-send-failed"
500                    );
501                    Error::io(e)
502                })?;
503
504                file.flush().map_err(Error::io)?;
505            }
506        }
507
508        debug!(target: "hyperdb_api", "query-cancel-sent");
509        Ok(())
510    }
511
512    /// Executes a query and returns all result rows (async).
513    ///
514    /// # Errors
515    ///
516    /// Propagates any [`Error`] from the underlying connection's
517    /// [`AsyncRawConnection::simple_query`] — [`Error`] (server) for
518    /// server-side SQL errors, [`Error`] (I/O) / [`Error`] (closed) for
519    /// transport failures, and [`Error`] (connection) if the connection
520    /// is unhealthy. Row construction may also raise an [`Error`] when
521    /// a `DataRow` cannot be decoded against its `RowDescription`.
522    pub async fn query(&self, sql: &str) -> Result<Vec<Row>> {
523        let mut conn = self.connection.lock().await;
524        let messages = conn.simple_query(sql).await?;
525        Self::process_query_messages(messages, self.notice_receiver.as_ref())
526    }
527
528    /// Executes a query with `HyperBinary` format for better performance (async).
529    ///
530    /// # Errors
531    ///
532    /// Same failure modes as [`Self::query`].
533    pub async fn query_fast(&self, sql: &str) -> Result<Vec<StreamRow>> {
534        let mut conn = self.connection.lock().await;
535        let messages = conn.query_binary(sql).await?;
536        Ok(Self::process_binary_messages(
537            messages,
538            self.notice_receiver.as_ref(),
539        ))
540    }
541
542    /// Executes a query with `HyperBinary` format and returns a streaming
543    /// result reader (async).
544    ///
545    /// This is the async mirror of
546    /// [`Client::query_streaming`](super::client::Client::query_streaming).
547    /// The returned [`AsyncQueryStream`] yields rows in chunks so callers
548    /// can process arbitrarily large result sets with constant memory. The
549    /// connection mutex is held for the duration of iteration; dropping the
550    /// stream before completion issues a best-effort cancel and marks the
551    /// connection desynchronized.
552    ///
553    /// # Errors
554    ///
555    /// - Returns [`Error`] (connection) if the connection is unhealthy.
556    /// - Returns [`Error`] (I/O) if writing the Parse/Bind/Execute/Sync
557    ///   sequence fails on the transport.
558    pub async fn query_streaming(
559        &self,
560        sql: &str,
561        chunk_size: usize,
562    ) -> Result<AsyncQueryStream<'_>> {
563        let mut conn = self.connection.lock().await;
564        conn.start_query_binary(sql).await?;
565        Ok(AsyncQueryStream::new(conn, self, chunk_size))
566    }
567
568    /// Sends a best-effort `CancelRequest` using *synchronous* I/O so it
569    /// is usable from [`Drop`] impls (notably
570    /// [`AsyncQueryStream::drop`](super::async_stream_query::AsyncQueryStream)).
571    ///
572    /// Cancellation opens a short-lived TCP / UDS / Named-Pipe connection,
573    /// writes the cancel packet, and drops it — the server recognizes the
574    /// (`process_id`, `secret_key`) tuple and signals the long-running query
575    /// to abort. No response is expected.
576    fn cancel_sync(&self) -> Result<()> {
577        use crate::protocol::message::frontend;
578        use bytes::BytesMut;
579        use std::io::Write;
580
581        info!(
582            target: "hyperdb_api",
583            process_id = self.process_id,
584            "query-cancel-request"
585        );
586
587        let endpoint_str = self.endpoint.to_string();
588
589        match &self.endpoint {
590            ConnectionEndpoint::Tcp { host, port } => {
591                let addr = format!("{host}:{port}");
592                let mut stream = std::net::TcpStream::connect(&addr).map_err(|e| {
593                    warn!(
594                        target: "hyperdb_api",
595                        addr = %endpoint_str,
596                        error = %e,
597                        "query-cancel-connect-failed"
598                    );
599                    Error::connection(format!(
600                        "failed to connect for cancel request to {endpoint_str}: {e}"
601                    ))
602                })?;
603                // Cancel is a 16-byte fire-and-forget — disable Nagle so the
604                // request hits the wire without waiting on a coalesce timer.
605                stream.set_nodelay(true).ok();
606
607                let mut buf = BytesMut::with_capacity(16);
608                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
609
610                stream.write_all(&buf).map_err(Error::io)?;
611                stream.flush().map_err(Error::io)?;
612            }
613            #[cfg(unix)]
614            ConnectionEndpoint::DomainSocket { directory, name } => {
615                let socket_path = directory.join(name);
616                let mut stream =
617                    std::os::unix::net::UnixStream::connect(&socket_path).map_err(|e| {
618                        warn!(
619                            target: "hyperdb_api",
620                            addr = %endpoint_str,
621                            error = %e,
622                            "query-cancel-connect-failed"
623                        );
624                        Error::connection(format!(
625                            "failed to connect for cancel request to {endpoint_str}: {e}"
626                        ))
627                    })?;
628
629                let mut buf = BytesMut::with_capacity(16);
630                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
631
632                stream.write_all(&buf).map_err(Error::io)?;
633                stream.flush().map_err(Error::io)?;
634            }
635            #[cfg(windows)]
636            ConnectionEndpoint::NamedPipe { host, name } => {
637                let pipe_path = format!(r"\\{host}\pipe\{name}");
638                let mut file = std::fs::OpenOptions::new()
639                    .read(true)
640                    .write(true)
641                    .open(&pipe_path)
642                    .map_err(|e| {
643                        warn!(
644                            target: "hyperdb_api",
645                            addr = %endpoint_str,
646                            error = %e,
647                            "query-cancel-connect-failed"
648                        );
649                        Error::connection(format!(
650                            "failed to connect for cancel request to {endpoint_str}: {e}"
651                        ))
652                    })?;
653
654                let mut buf = BytesMut::with_capacity(16);
655                frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
656
657                file.write_all(&buf).map_err(Error::io)?;
658                file.flush().map_err(Error::io)?;
659            }
660        }
661
662        debug!(target: "hyperdb_api", "query-cancel-sent");
663        Ok(())
664    }
665
666    /// Executes a command (INSERT/UPDATE/DELETE/DDL) and returns affected row count (async).
667    ///
668    /// # Errors
669    ///
670    /// Same failure modes as [`Self::query`] — server-side SQL errors,
671    /// transport failures, and unhealthy-connection state all surface
672    /// as [`Error`].
673    pub async fn exec(&self, sql: &str) -> Result<u64> {
674        let mut conn = self.connection.lock().await;
675        let messages = conn.simple_query(sql).await?;
676        Ok(Self::extract_row_count(&messages))
677    }
678
679    /// Returns a server parameter value by name.
680    pub async fn parameter_status(&self, name: &str) -> Option<String> {
681        let conn = self.connection.lock().await;
682        conn.parameter_status(name)
683            .map(std::string::ToString::to_string)
684    }
685
686    /// Sets the notice receiver callback.
687    pub fn set_notice_receiver(&mut self, receiver: Option<Box<dyn Fn(Notice) + Send + Sync>>) {
688        self.notice_receiver = receiver.map(Arc::from);
689    }
690
691    /// Closes the connection gracefully (async).
692    ///
693    /// # Errors
694    ///
695    /// Returns [`Error`] (I/O) if writing the `Terminate` frame or
696    /// flushing the async transport fails.
697    pub async fn close(self) -> Result<()> {
698        let mut conn = self.connection.lock().await;
699        conn.terminate().await
700    }
701
702    /// Executes a batch of statements separated by semicolons (async).
703    ///
704    /// # Errors
705    ///
706    /// Same failure modes as [`Self::query`].
707    pub async fn batch_execute(&self, sql: &str) -> Result<()> {
708        let mut conn = self.connection.lock().await;
709        let _messages = conn.simple_query(sql).await?;
710        Ok(())
711    }
712
713    /// Starts a COPY IN operation for bulk data insertion (async).
714    ///
715    /// # Errors
716    ///
717    /// Delegates to [`Self::copy_in_with_format`]; see that method
718    /// for concrete failure modes.
719    pub async fn copy_in(
720        &self,
721        table_name: &str,
722        columns: &[&str],
723    ) -> Result<AsyncCopyInWriter<'_>> {
724        self.copy_in_with_format(table_name, columns, "HYPERBINARY")
725            .await
726    }
727
728    /// Starts a COPY IN operation and returns an owned-handle writer
729    /// whose lifetime is independent of this client. The writer holds an
730    /// `Arc`-cloned reference to the underlying connection mutex, so it
731    /// can be stored in structs that need a `'static`-lifetime writer —
732    /// e.g. N-API classes that can't carry borrowed references across
733    /// JS callbacks.
734    ///
735    /// # Errors
736    ///
737    /// Same failure modes as [`Self::copy_in_with_format`].
738    pub async fn copy_in_arc_with_format(
739        &self,
740        table_name: &str,
741        columns: &[&str],
742        format: &str,
743    ) -> Result<AsyncCopyInWriterOwned> {
744        let mut conn = self.connection.lock().await;
745        conn.start_copy_in_with_format(table_name, columns, format)
746            .await?;
747        drop(conn);
748        Ok(AsyncCopyInWriterOwned::new(Arc::clone(&self.connection)))
749    }
750
751    /// Starts a COPY IN operation with a specified data format (async).
752    ///
753    /// # Errors
754    ///
755    /// - Returns [`Error`] (connection) if the connection is unhealthy.
756    /// - Returns [`Error`] (server) if the server rejects the generated
757    ///   `COPY ... FROM STDIN` statement.
758    /// - Returns [`Error`] (I/O) on transport read/write failure.
759    pub async fn copy_in_with_format(
760        &self,
761        table_name: &str,
762        columns: &[&str],
763        format: &str,
764    ) -> Result<AsyncCopyInWriter<'_>> {
765        let mut conn = self.connection.lock().await;
766        conn.start_copy_in_with_format(table_name, columns, format)
767            .await?;
768        drop(conn);
769        Ok(AsyncCopyInWriter::new(&self.connection))
770    }
771
772    /// Executes a COPY ... TO STDOUT query and returns all output data (async).
773    ///
774    /// # Errors
775    ///
776    /// - Returns [`Error`] (connection) if the connection is unhealthy.
777    /// - Returns [`Error`] (server) when the server rejects the statement.
778    /// - Returns [`Error`] (I/O) / [`Error`] (closed) on transport
779    ///   read/write failure.
780    pub async fn copy_out(&self, query: &str) -> Result<Vec<u8>> {
781        let mut conn = self.connection.lock().await;
782        conn.copy_out(query).await
783    }
784
    /// Reports whether the connection mutex can be acquired right now.
    ///
    /// NOTE(review): this only probes the lock with `try_lock`; it performs
    /// no I/O and does not inspect the connection itself. It therefore
    /// returns `false` while another task holds the lock (for example during
    /// a running query), and a `true` result does not by itself show that the
    /// server is still reachable. Confirm whether a real health check is
    /// intended here.
    #[must_use]
    pub fn is_alive(&self) -> bool {
        // An uncontended lock is treated as "alive".
        self.connection.try_lock().is_ok()
    }
791
792    /// Prepares a statement for execution (async).
793    ///
794    /// # Errors
795    ///
796    /// Delegates to [`Self::prepare_typed`]; see that method for the
797    /// failure modes.
798    pub async fn prepare(&self, query: &str) -> Result<AsyncPreparedStatement> {
799        self.prepare_typed(query, &[]).await
800    }
801
802    /// Prepares a statement with explicit parameter types (async).
803    ///
804    /// # Errors
805    ///
806    /// - Returns [`Error`] (connection) if the connection is unhealthy.
807    /// - Returns [`Error`] (server) if the server rejects the `Parse`
808    ///   request (SQL syntax, unknown parameter OIDs, etc.).
809    /// - Returns [`Error`] (I/O) on transport read/write failure.
810    pub async fn prepare_typed(
811        &self,
812        query: &str,
813        param_types: &[crate::types::Oid],
814    ) -> Result<AsyncPreparedStatement> {
815        use std::sync::atomic::{AtomicU64, Ordering};
816        static COUNTER: AtomicU64 = AtomicU64::new(0);
817
818        let name = format!(
819            "__hyper_async_stmt_{}",
820            COUNTER.fetch_add(1, Ordering::Relaxed)
821        );
822        let mut conn = self.connection.lock().await;
823        let (params, columns) = conn.prepare(&name, query, param_types).await?;
824
825        Ok(AsyncPreparedStatement {
826            name,
827            query: query.to_string(),
828            param_types: params,
829            columns,
830            connection: Arc::downgrade(&self.connection),
831            closed: false,
832        })
833    }
834
835    /// Closes a prepared statement on the server (async).
836    ///
837    /// Prefer the RAII [`AsyncPreparedStatement::close`] method on the
838    /// statement itself — it consumes the statement and prevents the
839    /// auto-close Drop path from double-closing.
840    ///
841    /// # Errors
842    ///
843    /// Propagates any error from
844    /// [`AsyncRawConnection::close_statement`] — unhealthy connection,
845    /// server-side error during `Close`/`Sync`, or transport failure.
846    pub async fn close_statement(&self, statement: &AsyncPreparedStatement) -> Result<()> {
847        let mut conn = self.connection.lock().await;
848        conn.close_statement(&statement.name).await
849    }
850
851    /// Executes a prepared statement with parameters (async).
852    ///
853    /// # Errors
854    ///
855    /// Propagates any error from
856    /// [`AsyncRawConnection::execute_prepared`] — unhealthy connection,
857    /// parameter/type mismatch, server-side execution failure, or
858    /// transport failure. Row construction may also raise an [`Error`]
859    /// when a `DataRow` cannot be decoded.
860    pub async fn execute_prepared<P: AsRef<[Option<Vec<u8>>]>>(
861        &self,
862        statement: &AsyncPreparedStatement,
863        params: P,
864    ) -> Result<Vec<Row>> {
865        let params_ref: Vec<Option<&[u8]>> = params
866            .as_ref()
867            .iter()
868            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
869            .collect();
870
871        let mut conn = self.connection.lock().await;
872        conn.execute_prepared(&statement.name, &params_ref, statement.columns.len())
873            .await
874    }
875
876    /// Executes a prepared statement that doesn't return rows (async).
877    ///
878    /// # Errors
879    ///
880    /// Same failure modes as [`Self::execute_prepared`] (excluding
881    /// row-construction errors — this path never builds rows).
882    pub async fn execute_prepared_no_result<P: AsRef<[Option<Vec<u8>>]>>(
883        &self,
884        statement: &AsyncPreparedStatement,
885        params: P,
886    ) -> Result<u64> {
887        let params_ref: Vec<Option<&[u8]>> = params
888            .as_ref()
889            .iter()
890            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
891            .collect();
892
893        let mut conn = self.connection.lock().await;
894        conn.execute_prepared_no_result(&statement.name, &params_ref)
895            .await
896    }
897
898    /// Executes a prepared statement with streaming results (async).
899    ///
900    /// Returns an [`AsyncPreparedQueryStream`](super::async_prepared_stream::AsyncPreparedQueryStream)
901    /// that yields rows in chunks, keeping memory bounded regardless of
902    /// result size. Async mirror of
903    /// [`Client::execute_streaming`](crate::client::Client::execute_streaming).
904    ///
905    /// # Errors
906    ///
907    /// - Returns [`Error`] (connection) if the connection is unhealthy.
908    /// - Returns [`Error`] (I/O) if writing the initial Bind/Execute/Sync
909    ///   sequence fails on the transport.
910    pub async fn execute_prepared_streaming<'a, P: AsRef<[Option<Vec<u8>>]>>(
911        &'a self,
912        statement: &AsyncPreparedStatement,
913        params: P,
914        chunk_size: usize,
915    ) -> Result<super::async_prepared_stream::AsyncPreparedQueryStream<'a>> {
916        let params_ref: Vec<Option<&[u8]>> = params
917            .as_ref()
918            .iter()
919            .map(|p| p.as_ref().map(std::vec::Vec::as_slice))
920            .collect();
921
922        let mut conn = self.connection.lock().await;
923        conn.start_execute_prepared(&statement.name, &params_ref, statement.columns.len())
924            .await?;
925
926        let columns = std::sync::Arc::new(statement.columns.clone());
927        Ok(super::async_prepared_stream::AsyncPreparedQueryStream::new(
928            conn, self, chunk_size, columns,
929        ))
930    }
931}
932
933impl Cancellable for AsyncClient {
934    /// Fire-and-forget cancel via PG wire protocol `CancelRequest` on a
935    /// fresh connection. Uses synchronous I/O so it is callable from
936    /// `Drop` impls. Errors are logged and swallowed because cancellation
937    /// is best-effort and callers cannot meaningfully recover.
938    fn cancel(&self) {
939        if let Err(e) = AsyncClient::cancel_sync(self) {
940            warn!(
941                target: "hyperdb_api_core::client",
942                error = %e,
943                process_id = self.process_id,
944                "cancel request failed (best-effort, swallowed)",
945            );
946        }
947    }
948}
949
950impl AsyncClient {
951    fn process_query_messages(
952        messages: Vec<Message>,
953        notice_receiver: Option<&Arc<NoticeReceiver>>,
954    ) -> Result<Vec<Row>> {
955        use super::statement::{Column, ColumnFormat};
956
957        let mut rows = Vec::new();
958        let mut columns: Option<Arc<Vec<Column>>> = None;
959
960        for msg in messages {
961            match msg {
962                Message::RowDescription(desc) => {
963                    let mut cols = Vec::new();
964                    for field in desc.fields().filter_map(std::result::Result::ok) {
965                        cols.push(Column::new(
966                            field.name().to_string(),
967                            field.type_oid(),
968                            field.type_modifier(),
969                            ColumnFormat::from_code(field.format()),
970                        ));
971                    }
972                    columns = Some(Arc::new(cols));
973                }
974                Message::DataRow(data) => {
975                    if let Some(ref cols) = columns {
976                        rows.push(Row::new(Arc::clone(cols), data)?);
977                    }
978                }
979                Message::NoticeResponse(body) => {
980                    if let Some(receiver) = notice_receiver {
981                        let notice = Notice::from_response_body(&body);
982                        receiver(notice);
983                    }
984                }
985                _ => {}
986            }
987        }
988        Ok(rows)
989    }
990
991    fn process_binary_messages(
992        messages: Vec<Message>,
993        notice_receiver: Option<&Arc<NoticeReceiver>>,
994    ) -> Vec<StreamRow> {
995        let mut rows = Vec::new();
996
997        for msg in messages {
998            match msg {
999                Message::DataRow(data) => {
1000                    rows.push(StreamRow::new(data));
1001                }
1002                Message::NoticeResponse(body) => {
1003                    if let Some(receiver) = notice_receiver {
1004                        let notice = Notice::from_response_body(&body);
1005                        receiver(notice);
1006                    }
1007                }
1008                _ => {}
1009            }
1010        }
1011        rows
1012    }
1013
1014    fn extract_row_count(messages: &[Message]) -> u64 {
1015        for msg in messages {
1016            if let Message::CommandComplete(body) = msg {
1017                if let Ok(tag) = body.tag() {
1018                    // Parse formats like "INSERT 0 5", "UPDATE 10", "DELETE 3"
1019                    let parts: Vec<&str> = tag.split_whitespace().collect();
1020                    if let Some(last) = parts.last() {
1021                        if let Ok(count) = last.parse() {
1022                            return count;
1023                        }
1024                    }
1025                }
1026            }
1027        }
1028        0
1029    }
1030}
1031
/// An async prepared statement.
///
/// Represents a server-side prepared statement that can be executed
/// multiple times with different parameters. **Auto-closes on `Drop`**
/// via a best-effort task spawned on the current tokio runtime — if no
/// tokio runtime is available at drop time we log a warning and try to
/// flag the connection desynchronized rather than silently leaking the
/// server-side statement slot.
///
/// For callers who need confirmed close with error propagation, use
/// [`AsyncPreparedStatement::close`] (explicit async close).
///
/// Instances are created by [`AsyncClient::prepare`] and
/// [`AsyncClient::prepare_typed`].
#[derive(Debug)]
pub struct AsyncPreparedStatement {
    /// Statement name on the server. The `Drop` path moves this string
    /// out (leaving it empty) when it performs the auto-close.
    pub(crate) name: String,
    /// Original SQL query string.
    query: String,
    /// Parameter type OIDs, as returned by the connection when the
    /// statement was prepared.
    param_types: Vec<crate::types::Oid>,
    /// Result column descriptions.
    pub(crate) columns: Vec<super::statement::Column>,
    /// Weak handle to the owning connection for the Drop path. `Weak`
    /// so that a lingering statement never keeps the connection alive
    /// past the `AsyncClient` it was prepared against.
    connection: std::sync::Weak<Mutex<AsyncRawConnection<AsyncStream>>>,
    /// Flipped by `close(self)` to suppress the Drop-path auto-close.
    /// Starts out `false`.
    closed: bool,
}
1060
1061impl AsyncPreparedStatement {
1062    /// Returns the statement name.
1063    #[must_use]
1064    pub fn name(&self) -> &str {
1065        &self.name
1066    }
1067
1068    /// Returns the original query.
1069    #[must_use]
1070    pub fn query(&self) -> &str {
1071        &self.query
1072    }
1073
1074    /// Returns the parameter types.
1075    #[must_use]
1076    pub fn param_types(&self) -> &[crate::types::Oid] {
1077        &self.param_types
1078    }
1079
1080    /// Returns the number of parameters.
1081    #[must_use]
1082    pub fn param_count(&self) -> usize {
1083        self.param_types.len()
1084    }
1085
1086    /// Returns the result column descriptions.
1087    #[must_use]
1088    pub fn columns(&self) -> &[super::statement::Column] {
1089        &self.columns
1090    }
1091
1092    /// Returns the number of result columns.
1093    #[must_use]
1094    pub fn column_count(&self) -> usize {
1095        self.columns.len()
1096    }
1097
1098    /// Explicitly closes the prepared statement on the server (async).
1099    ///
1100    /// Consumes the statement — no further `execute_prepared` /
1101    /// `execute_prepared_streaming` calls are possible — and suppresses
1102    /// the Drop-path auto-close. Returns the `close_statement` result so
1103    /// callers can observe any transport errors.
1104    ///
1105    /// If you don't need error propagation, simply dropping the
1106    /// statement has the same effect (best-effort auto-close via
1107    /// `tokio::spawn`).
1108    ///
1109    /// # Errors
1110    ///
1111    /// Propagates any error from
1112    /// [`AsyncClient::close_statement`] — unhealthy connection,
1113    /// server-side error during `Close`/`Sync`, or transport failure.
1114    pub async fn close(mut self, client: &AsyncClient) -> Result<()> {
1115        self.closed = true;
1116        client.close_statement(&self).await
1117    }
1118}
1119
1120impl Drop for AsyncPreparedStatement {
1121    fn drop(&mut self) {
1122        // Explicit close already ran — nothing to do.
1123        if self.closed {
1124            return;
1125        }
1126
1127        // Best-effort close. Try to grab a tokio handle; if we're being
1128        // dropped outside a runtime (e.g. the runtime has already shut
1129        // down or the caller never had one), fall back to a warning and
1130        // flag the connection desynchronized so the next operation fails
1131        // loudly rather than racing with a lingering statement.
1132        let Some(conn) = self.connection.upgrade() else {
1133            // Connection has already been dropped — nothing to close.
1134            return;
1135        };
1136
1137        let name = std::mem::take(&mut self.name);
1138
1139        if let Ok(handle) = tokio::runtime::Handle::try_current() {
1140            handle.spawn(async move {
1141                let mut c = conn.lock().await;
1142                if let Err(e) = c.close_statement(&name).await {
1143                    warn!(
1144                        target: "hyperdb_api_core::client",
1145                        statement = %name,
1146                        error = %e,
1147                        "AsyncPreparedStatement drop-close failed (best-effort, swallowed)"
1148                    );
1149                }
1150            });
1151        } else {
1152            // No runtime — can't do async I/O from here. Mark the
1153            // connection desynchronized so the next caller gets a
1154            // clear error instead of a latent leaked statement.
1155            if let Ok(mut c) = conn.try_lock() {
1156                c.mark_desynchronized();
1157            }
1158            warn!(
1159                target: "hyperdb_api_core::client",
1160                statement = %name,
1161                "AsyncPreparedStatement dropped outside of a tokio runtime; \
1162                 server-side statement slot leaked and connection marked \
1163                 desynchronized — call statement.close(&client) explicitly \
1164                 for deterministic cleanup"
1165            );
1166        }
1167    }
1168}
1169
/// Async COPY IN writer for bulk data insertion.
///
/// Borrows the connection mutex of the client it was created from and
/// locks it for the duration of each call.
///
/// # Drop Safety
///
/// If this writer is dropped without calling [`finish()`](Self::finish) or
/// [`cancel()`](Self::cancel), it will attempt a best-effort synchronous cancel
/// by queuing a `CopyFail` message in the connection's write buffer. The next
/// async operation on the connection will flush and drain the cancel response,
/// restoring the connection to a usable state.
///
/// If the connection mutex is already locked at drop time, nothing is
/// queued and only a warning is logged.
#[derive(Debug)]
pub struct AsyncCopyInWriter<'a> {
    /// Borrowed mutex guarding the connection the COPY runs on.
    connection: &'a Mutex<AsyncRawConnection<AsyncStream>>,
    /// Set to `true` after `finish()` or `cancel()` consumes the writer.
    /// Checked in `Drop` to avoid queuing a spurious `CopyFail`.
    finished: bool,
}
1186
/// Owned-handle variant of [`AsyncCopyInWriter`] that holds an
/// `Arc<Mutex<_>>` to the underlying connection instead of a borrow.
/// Used by callers that need a `'static`-lifetime writer — e.g. N-API
/// classes that can't carry borrowed references across JS callbacks.
///
/// Semantics are identical to [`AsyncCopyInWriter`]; the only
/// difference is lifetime.
#[derive(Debug)]
pub struct AsyncCopyInWriterOwned {
    /// Shared, reference-counted handle to the connection mutex.
    connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>,
    /// Set to `true` by `finish()` or `cancel()`; checked in `Drop` so a
    /// completed writer does not queue a `CopyFail`.
    finished: bool,
}
1199
1200impl AsyncCopyInWriterOwned {
1201    /// Creates a new owned-handle COPY IN writer.
1202    pub(crate) fn new(connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>) -> Self {
1203        AsyncCopyInWriterOwned {
1204            connection,
1205            finished: false,
1206        }
1207    }
1208
1209    /// Sends data to the server.
1210    ///
1211    /// # Errors
1212    ///
1213    /// Currently infallible — frame construction is pure. The `Result`
1214    /// return type is preserved for forward compatibility.
1215    pub async fn send(&mut self, data: &[u8]) -> Result<()> {
1216        let mut conn = self.connection.lock().await;
1217        conn.send_copy_data(data)?;
1218        Ok(())
1219    }
1220
1221    /// Flushes any buffered data to the server.
1222    ///
1223    /// # Errors
1224    ///
1225    /// Returns [`Error`] (I/O) if flushing the async transport fails.
1226    pub async fn flush(&mut self) -> Result<()> {
1227        let mut conn = self.connection.lock().await;
1228        conn.flush().await
1229    }
1230
1231    /// Sends COPY data directly to the stream without internal buffering.
1232    ///
1233    /// # Errors
1234    ///
1235    /// - Returns [`Error`] (protocol) if `data.len() + 4` exceeds
1236    ///   `u32::MAX`.
1237    /// - Returns [`Error`] (I/O) on transport write failure.
1238    pub async fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1239        let mut conn = self.connection.lock().await;
1240        conn.send_copy_data_direct(data).await
1241    }
1242
1243    /// Flushes the TCP stream.
1244    ///
1245    /// # Errors
1246    ///
1247    /// Returns [`Error`] (I/O) if flushing the async transport fails.
1248    pub async fn flush_stream(&mut self) -> Result<()> {
1249        let mut conn = self.connection.lock().await;
1250        conn.flush_stream().await
1251    }
1252
1253    /// Finishes the COPY operation and returns the number of rows inserted.
1254    ///
1255    /// # Errors
1256    ///
1257    /// Returns [`Error`] (server) if the server reports an `ErrorResponse`
1258    /// (e.g. constraint violation), or [`Error`] (I/O) / [`Error`] (closed)
1259    /// on transport failure.
1260    pub async fn finish(mut self) -> Result<u64> {
1261        self.finished = true;
1262        let mut conn = self.connection.lock().await;
1263        conn.finish_copy().await
1264    }
1265
1266    /// Cancels the COPY operation.
1267    ///
1268    /// # Errors
1269    ///
1270    /// Returns [`Error`] (I/O) on transport write failure, or
1271    /// [`Error`] (closed) if the server drops the connection before
1272    /// acknowledging the cancel.
1273    pub async fn cancel(mut self, reason: &str) -> Result<()> {
1274        self.finished = true;
1275        let mut conn = self.connection.lock().await;
1276        conn.cancel_copy(reason).await
1277    }
1278}
1279
1280impl Drop for AsyncCopyInWriterOwned {
1281    fn drop(&mut self) {
1282        if self.finished {
1283            return;
1284        }
1285        // Mirror AsyncCopyInWriter's Drop — queue a CopyFail via
1286        // try_lock; the next async operation on the connection will
1287        // flush and drain.
1288        if let Ok(mut conn) = self.connection.try_lock() {
1289            conn.queue_copy_fail("AsyncCopyInWriterOwned dropped without finish/cancel");
1290        }
1291    }
1292}
1293
1294impl<'a> AsyncCopyInWriter<'a> {
1295    /// Creates a new COPY IN writer.
1296    pub(crate) fn new(connection: &'a Mutex<AsyncRawConnection<AsyncStream>>) -> Self {
1297        AsyncCopyInWriter {
1298            connection,
1299            finished: false,
1300        }
1301    }
1302
1303    /// Sends data to the server.
1304    ///
1305    /// # Errors
1306    ///
1307    /// Currently infallible — frame construction is pure. The `Result`
1308    /// return type is preserved for forward compatibility.
1309    pub async fn send(&mut self, data: &[u8]) -> Result<()> {
1310        let mut conn = self.connection.lock().await;
1311        conn.send_copy_data(data)?;
1312        Ok(())
1313    }
1314
1315    /// Flushes any buffered data to the server.
1316    ///
1317    /// # Errors
1318    ///
1319    /// Returns [`Error`] (I/O) if flushing the async transport fails.
1320    pub async fn flush(&mut self) -> Result<()> {
1321        let mut conn = self.connection.lock().await;
1322        conn.flush().await
1323    }
1324
1325    /// Sends COPY data directly to the stream without internal buffering.
1326    ///
1327    /// This writes data directly to the TCP stream, letting the kernel handle
1328    /// buffering. More efficient for streaming large amounts of data.
1329    /// Call `flush_stream()` periodically to ensure data is sent.
1330    ///
1331    /// # Errors
1332    ///
1333    /// - Returns [`Error`] (protocol) if `data.len() + 4` exceeds
1334    ///   `u32::MAX`.
1335    /// - Returns [`Error`] (I/O) on transport write failure.
1336    pub async fn send_direct(&mut self, data: &[u8]) -> Result<()> {
1337        let mut conn = self.connection.lock().await;
1338        conn.send_copy_data_direct(data).await
1339    }
1340
1341    /// Flushes the TCP stream.
1342    ///
1343    /// Use with `send_direct()` to periodically ensure data reaches the server.
1344    ///
1345    /// # Errors
1346    ///
1347    /// Returns [`Error`] (I/O) if flushing the async transport fails.
1348    pub async fn flush_stream(&mut self) -> Result<()> {
1349        let mut conn = self.connection.lock().await;
1350        conn.flush_stream().await
1351    }
1352
1353    /// Finishes the COPY operation and returns the number of rows inserted.
1354    ///
1355    /// # Errors
1356    ///
1357    /// Returns [`Error`] (server) if the server reports an `ErrorResponse`,
1358    /// or [`Error`] (I/O) / [`Error`] (closed) on transport failure.
1359    pub async fn finish(mut self) -> Result<u64> {
1360        self.finished = true;
1361        let mut conn = self.connection.lock().await;
1362        conn.finish_copy().await
1363    }
1364
1365    /// Cancels the COPY operation.
1366    ///
1367    /// # Errors
1368    ///
1369    /// Returns [`Error`] (I/O) on transport write failure, or
1370    /// [`Error`] (closed) if the server drops the connection before
1371    /// acknowledging the cancel.
1372    pub async fn cancel(mut self, reason: &str) -> Result<()> {
1373        self.finished = true;
1374        let mut conn = self.connection.lock().await;
1375        conn.cancel_copy(reason).await
1376    }
1377}
1378
1379impl Drop for AsyncCopyInWriter<'_> {
1380    fn drop(&mut self) {
1381        if self.finished {
1382            return;
1383        }
1384        // Best-effort cancel: try to acquire the mutex synchronously and
1385        // queue a CopyFail message. We cannot do async I/O from Drop, so
1386        // the message is only written to the buffer — not flushed. The next
1387        // async operation will call drain_pending_copy_cancel() to flush it
1388        // and restore the connection to ReadyForQuery state.
1389        if let Ok(mut conn) = self.connection.try_lock() {
1390            conn.queue_copy_fail("COPY writer dropped without finish or cancel");
1391            warn!(
1392                target: "hyperdb_api_core::client",
1393                "AsyncCopyInWriter dropped without finish() or cancel(). \
1394                 Queued best-effort CopyFail — connection will self-heal on next operation."
1395            );
1396        } else {
1397            warn!(
1398                target: "hyperdb_api_core::client",
1399                "AsyncCopyInWriter dropped without finish() or cancel(), \
1400                 and the connection mutex was locked. The connection may be \
1401                 left in an unusable COPY-IN state."
1402            );
1403        }
1404    }
1405}