use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
#[cfg(unix)]
use tokio::net::UnixStream;
use super::async_connection::AsyncRawConnection;
use super::async_stream::AsyncStream;
use super::async_stream_query::AsyncQueryStream;
use super::cancel::Cancellable;
use super::config::Config;
use super::endpoint::ConnectionEndpoint;
use super::error::{Error, Result};
use super::notice::{Notice, NoticeReceiver};
use super::row::{Row, StreamRow};
use crate::protocol::message::Message;
/// Asynchronous client that owns one server connection.
///
/// The raw connection sits behind a shared async mutex; the other fields are
/// captured once when the client is constructed.
pub struct AsyncClient {
// Raw connection; the `Arc` is also handed (strong or weak) to owned copy
// writers and prepared statements created from this client.
connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>,
// Process id read from the connection after startup; sent with cancel requests.
process_id: i32,
// Secret key read from the connection after startup; sent with cancel requests.
secret_key: i32,
// Endpoint this client connected to; cancel requests open a fresh
// connection to the same endpoint.
endpoint: ConnectionEndpoint,
// Optional callback invoked for each notice message seen while processing
// query results. `None` until `set_notice_receiver` installs one.
notice_receiver: Option<Arc<NoticeReceiver>>,
}
/// Hand-written `Debug` for [`AsyncClient`].
///
/// The connection itself is omitted (hence `finish_non_exhaustive`), the
/// notice callback is shown as a placeholder, and the secret key is redacted.
impl std::fmt::Debug for AsyncClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AsyncClient")
            .field("process_id", &self.process_id)
            // The secret key is the credential sent with cancel requests;
            // `{:?}` output routinely ends up in logs, so never print the value.
            // Callers that really need it can still use `secret_key()`.
            .field("secret_key", &"<redacted>")
            .field("endpoint", &self.endpoint)
            .field(
                "notice_receiver",
                &self.notice_receiver.as_ref().map(|_| "<callback>"),
            )
            .finish_non_exhaustive()
    }
}
impl AsyncClient {
/// Connects over TCP to the host and port in `config` and performs startup.
///
/// # Errors
///
/// Returns a connection error when the TCP connect fails, and propagates any
/// error returned by the connection's `startup` call.
pub async fn connect(config: &Config) -> Result<Self> {
    info!(
        target: "hyperdb_api",
        host = %config.host(),
        port = config.port(),
        user = config.user().unwrap_or("(default)"),
        database = config.database().unwrap_or("(none)"),
        "connection-parameters"
    );
    let endpoint = ConnectionEndpoint::tcp(config.host(), config.port());
    let addr = format!("{}:{}", config.host(), config.port());
    let tcp_stream = TcpStream::connect(&addr).await.map_err(|e| {
        warn!(target: "hyperdb_api", %addr, error = %e, "connection-failed");
        Error::connection(format!("failed to connect to {addr}: {e}"))
    })?;

    // Socket tuning is best-effort: a failure here is deliberately ignored
    // and the connection proceeds with whatever the OS defaults are.
    tcp_stream.set_nodelay(true).ok();
    let sock = socket2::SockRef::from(&tcp_stream);
    sock.set_recv_buffer_size(4 * 1024 * 1024).ok();
    sock.set_send_buffer_size(4 * 1024 * 1024).ok();

    let stream = AsyncStream::tcp(tcp_stream);
    let mut connection = AsyncRawConnection::new(stream);
    let params = config.startup_params();
    let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
    connection.startup(&params_ref, config.password()).await?;

    // Remember the identifiers needed later to build a cancel request.
    let process_id = connection.process_id();
    let secret_key = connection.secret_key();
    debug!(
        target: "hyperdb_api",
        process_id,
        "connection-established"
    );
    Ok(AsyncClient {
        connection: Arc::new(Mutex::new(connection)),
        process_id,
        secret_key,
        endpoint,
        notice_receiver: None,
    })
}
/// Connects over a Unix domain socket at `socket_path` and performs startup.
///
/// The stored endpoint is derived from the path: its parent directory
/// (falling back to `/`) and its file name (falling back to `"socket"` when
/// the name is missing or not valid UTF-8).
///
/// # Errors
///
/// Returns a connection error when the socket connect fails, and propagates
/// any error returned by the connection's `startup` call.
#[cfg(unix)]
pub async fn connect_unix(
    socket_path: impl AsRef<std::path::Path>,
    config: &Config,
) -> Result<Self> {
    use std::path::Path;
    let path = socket_path.as_ref();
    info!(
        target: "hyperdb_api",
        socket_path = %path.display(),
        user = config.user().unwrap_or("(default)"),
        database = config.database().unwrap_or("(none)"),
        "connection-parameters-unix"
    );
    let unix_stream = UnixStream::connect(path).await.map_err(|e| {
        warn!(target: "hyperdb_api", socket_path = %path.display(), error = %e, "connection-failed");
        Error::connection(format!("failed to connect to unix socket {}: {}", path.display(), e))
    })?;

    let directory = path.parent().unwrap_or(Path::new("/"));
    let name = path
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("socket");
    let endpoint = ConnectionEndpoint::domain_socket(directory, name);

    let stream = AsyncStream::unix(unix_stream);
    let mut connection = AsyncRawConnection::new(stream);
    let params = config.startup_params();
    let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
    connection.startup(&params_ref, config.password()).await?;

    // Remember the identifiers needed later to build a cancel request.
    let process_id = connection.process_id();
    let secret_key = connection.secret_key();
    debug!(
        target: "hyperdb_api",
        process_id,
        "connection-established-unix"
    );
    Ok(AsyncClient {
        connection: Arc::new(Mutex::new(connection)),
        process_id,
        secret_key,
        endpoint,
        notice_receiver: None,
    })
}
/// Connects over a Windows named pipe at `pipe_path` and performs startup.
///
/// While opening the pipe fails with OS error 231 (`ERROR_PIPE_BUSY`), the
/// open is retried every 20 ms for up to 10 s; any other error, or a busy
/// error after the deadline, is returned immediately.
///
/// # Errors
///
/// Returns a connection error when the pipe cannot be opened, and propagates
/// any error returned by the connection's `startup` call.
#[cfg(windows)]
pub async fn connect_named_pipe(pipe_path: &str, config: &Config) -> Result<Self> {
    use std::time::{Duration, Instant};
    use tokio::net::windows::named_pipe::ClientOptions;
    info!(
        target: "hyperdb_api",
        pipe_path = %pipe_path,
        user = config.user().unwrap_or("(default)"),
        database = config.database().unwrap_or("(none)"),
        "connection-parameters-named-pipe"
    );
    const RETRY_INTERVAL: Duration = Duration::from_millis(20);
    const MAX_WAIT: Duration = Duration::from_secs(10);
    const ERROR_PIPE_BUSY: i32 = 231;
    let deadline = Instant::now() + MAX_WAIT;
    let client = loop {
        match ClientOptions::new().open(pipe_path) {
            Ok(c) => break c,
            // Busy and still within the deadline: wait and try again.
            Err(e)
                if e.raw_os_error() == Some(ERROR_PIPE_BUSY) && Instant::now() < deadline =>
            {
                tokio::time::sleep(RETRY_INTERVAL).await;
            }
            Err(e) => {
                warn!(target: "hyperdb_api", pipe_path = %pipe_path, error = %e, "connection-failed");
                return Err(Error::connection(format!(
                    "failed to connect to named pipe {pipe_path}: {e}"
                )));
            }
        }
    };

    // First try to parse the path as a `tab.pipe://` URL; if that fails,
    // split `host\pipe\name` by hand, defaulting the host to ".".
    let endpoint = ConnectionEndpoint::parse(&format!(
        "tab.pipe://{}",
        pipe_path.trim_start_matches(r"\\").replace('\\', "/")
    ))
    .unwrap_or_else(|_| {
        let parts: Vec<&str> = pipe_path
            .trim_start_matches(r"\\")
            .splitn(3, '\\')
            .collect();
        if parts.len() >= 3 {
            ConnectionEndpoint::named_pipe(parts[0], parts[2])
        } else {
            ConnectionEndpoint::named_pipe(".", pipe_path)
        }
    });

    let stream = AsyncStream::named_pipe(client);
    let mut connection = AsyncRawConnection::new(stream);
    let params = config.startup_params();
    let params_ref: Vec<(&str, &str)> = params.iter().map(|(k, v)| (*k, *v)).collect();
    connection.startup(&params_ref, config.password()).await?;

    // Remember the identifiers needed later to build a cancel request.
    let process_id = connection.process_id();
    let secret_key = connection.secret_key();
    debug!(
        target: "hyperdb_api",
        process_id,
        "connection-established-named-pipe"
    );
    Ok(AsyncClient {
        connection: Arc::new(Mutex::new(connection)),
        process_id,
        secret_key,
        endpoint,
        notice_receiver: None,
    })
}
/// Connects to `endpoint`, dispatching to the transport-specific constructor.
///
/// For TCP the host and port of `endpoint` override those in a clone of
/// `config`; the other transports use `config` unchanged.
pub async fn connect_endpoint(endpoint: &ConnectionEndpoint, config: &Config) -> Result<Self> {
    match endpoint {
        ConnectionEndpoint::Tcp { host, port } => {
            let tcp_config = config.clone().with_host(host.clone()).with_port(*port);
            Self::connect(&tcp_config).await
        }
        #[cfg(unix)]
        ConnectionEndpoint::DomainSocket { directory, name } => {
            Self::connect_unix(directory.join(name), config).await
        }
        #[cfg(windows)]
        ConnectionEndpoint::NamedPipe { host, name } => {
            let full_pipe_name = format!(r"\\{host}\pipe\{name}");
            Self::connect_named_pipe(&full_pipe_name, config).await
        }
    }
}
/// Returns the endpoint this client was connected to.
#[must_use]
pub fn endpoint(&self) -> &ConnectionEndpoint {
&self.endpoint
}
/// Returns the process id captured from the connection after startup.
#[must_use]
pub fn process_id(&self) -> i32 {
self.process_id
}
/// Returns the secret key captured from the connection after startup.
///
/// This value is sent together with the process id in cancel requests.
#[must_use]
pub fn secret_key(&self) -> i32 {
self.secret_key
}
/// Sends a cancel request for this client's session.
///
/// A new, short-lived connection is opened to the same endpoint and a cancel
/// request carrying the process id and secret key is written to it. The
/// client's main connection (and its mutex) is not touched.
///
/// # Errors
///
/// Returns a connection error if the side connection cannot be opened, or an
/// I/O error if the request cannot be written.
pub async fn cancel(&self) -> Result<()> {
    use crate::protocol::message::frontend;
    use bytes::BytesMut;
    use tokio::io::AsyncWriteExt;

    info!(
        target: "hyperdb_api",
        process_id = self.process_id,
        "query-cancel-request"
    );

    let endpoint_str = self.endpoint.to_string();

    // Shared error mappers so every transport logs and reports identically.
    let connect_failed = |e: std::io::Error| {
        warn!(
            target: "hyperdb_api",
            addr = %endpoint_str,
            error = %e,
            "query-cancel-connect-failed"
        );
        Error::connection(format!(
            "failed to connect for cancel request to {endpoint_str}: {e}"
        ))
    };
    let send_failed = |e: std::io::Error| {
        warn!(
            target: "hyperdb_api",
            error = %e,
            "query-cancel-send-failed"
        );
        Error::io(e)
    };

    // The request bytes do not depend on the transport; build them once.
    let mut buf = BytesMut::new();
    frontend::cancel_request(self.process_id, self.secret_key, &mut buf);

    match &self.endpoint {
        ConnectionEndpoint::Tcp { host, port } => {
            let addr = format!("{host}:{port}");
            let mut stream = TcpStream::connect(&addr).await.map_err(&connect_failed)?;
            // Best-effort: a failure to disable Nagle is not worth aborting for.
            stream.set_nodelay(true).ok();
            stream.write_all(&buf).await.map_err(&send_failed)?;
        }
        #[cfg(unix)]
        ConnectionEndpoint::DomainSocket { directory, name } => {
            let socket_path = directory.join(name);
            let mut stream = UnixStream::connect(&socket_path)
                .await
                .map_err(&connect_failed)?;
            stream.write_all(&buf).await.map_err(&send_failed)?;
        }
        #[cfg(windows)]
        ConnectionEndpoint::NamedPipe { host, name } => {
            // Use tokio's named-pipe client (as `connect_named_pipe` does)
            // rather than blocking `std::fs` I/O, which would stall the
            // async executor thread while the pipe is opened and written.
            use tokio::net::windows::named_pipe::ClientOptions;
            let pipe_path = format!(r"\\{host}\pipe\{name}");
            let mut pipe = ClientOptions::new()
                .open(&pipe_path)
                .map_err(&connect_failed)?;
            pipe.write_all(&buf).await.map_err(&send_failed)?;
            pipe.flush().await.map_err(Error::io)?;
        }
    }

    debug!(target: "hyperdb_api", "query-cancel-sent");
    Ok(())
}
/// Runs `sql` through the connection's `simple_query` and returns the rows.
///
/// The connection lock is held until the reply has been converted; notices
/// in the reply are forwarded to the installed notice receiver, if any.
pub async fn query(&self, sql: &str) -> Result<Vec<Row>> {
    let mut guard = self.connection.lock().await;
    let reply = guard.simple_query(sql).await?;
    let receiver = self.notice_receiver.as_ref();
    Self::process_query_messages(reply, receiver)
}
/// Runs `sql` through the connection's `query_binary` and returns each data
/// row as a [`StreamRow`].
///
/// Notices in the reply are forwarded to the installed notice receiver, if any.
pub async fn query_fast(&self, sql: &str) -> Result<Vec<StreamRow>> {
    let mut guard = self.connection.lock().await;
    let reply = guard.query_binary(sql).await?;
    let receiver = self.notice_receiver.as_ref();
    let rows = Self::process_binary_messages(reply, receiver);
    Ok(rows)
}
/// Starts `sql` as a binary query and returns a stream over its results.
///
/// The connection lock guard is moved into the returned stream, so the
/// connection stays locked for as long as the stream holds it.
pub async fn query_streaming(
    &self,
    sql: &str,
    chunk_size: usize,
) -> Result<AsyncQueryStream<'_>> {
    let mut guard = self.connection.lock().await;
    guard.start_query_binary(sql).await?;
    let stream = AsyncQueryStream::new(guard, self, chunk_size);
    Ok(stream)
}
fn cancel_sync(&self) -> Result<()> {
use crate::protocol::message::frontend;
use bytes::BytesMut;
use std::io::Write;
info!(
target: "hyperdb_api",
process_id = self.process_id,
"query-cancel-request"
);
let endpoint_str = self.endpoint.to_string();
match &self.endpoint {
ConnectionEndpoint::Tcp { host, port } => {
let addr = format!("{host}:{port}");
let mut stream = std::net::TcpStream::connect(&addr).map_err(|e| {
warn!(
target: "hyperdb_api",
addr = %endpoint_str,
error = %e,
"query-cancel-connect-failed"
);
Error::connection(format!(
"failed to connect for cancel request to {endpoint_str}: {e}"
))
})?;
stream.set_nodelay(true).ok();
let mut buf = BytesMut::with_capacity(16);
frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
stream.write_all(&buf).map_err(Error::io)?;
stream.flush().map_err(Error::io)?;
}
#[cfg(unix)]
ConnectionEndpoint::DomainSocket { directory, name } => {
let socket_path = directory.join(name);
let mut stream =
std::os::unix::net::UnixStream::connect(&socket_path).map_err(|e| {
warn!(
target: "hyperdb_api",
addr = %endpoint_str,
error = %e,
"query-cancel-connect-failed"
);
Error::connection(format!(
"failed to connect for cancel request to {endpoint_str}: {e}"
))
})?;
let mut buf = BytesMut::with_capacity(16);
frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
stream.write_all(&buf).map_err(Error::io)?;
stream.flush().map_err(Error::io)?;
}
#[cfg(windows)]
ConnectionEndpoint::NamedPipe { host, name } => {
let pipe_path = format!(r"\\{host}\pipe\{name}");
let mut file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&pipe_path)
.map_err(|e| {
warn!(
target: "hyperdb_api",
addr = %endpoint_str,
error = %e,
"query-cancel-connect-failed"
);
Error::connection(format!(
"failed to connect for cancel request to {endpoint_str}: {e}"
))
})?;
let mut buf = BytesMut::with_capacity(16);
frontend::cancel_request(self.process_id, self.secret_key, &mut buf);
file.write_all(&buf).map_err(Error::io)?;
file.flush().map_err(Error::io)?;
}
}
debug!(target: "hyperdb_api", "query-cancel-sent");
Ok(())
}
/// Runs `sql` through `simple_query` and returns the row count extracted
/// from the reply (0 when no count can be parsed).
pub async fn exec(&self, sql: &str) -> Result<u64> {
    let mut guard = self.connection.lock().await;
    let reply = guard.simple_query(sql).await?;
    let affected = Self::extract_row_count(&reply);
    Ok(affected)
}
/// Returns an owned copy of the connection's parameter-status value for
/// `name`, or `None` when the connection has no value for it.
pub async fn parameter_status(&self, name: &str) -> Option<String> {
    let guard = self.connection.lock().await;
    let value = guard.parameter_status(name)?;
    Some(value.to_string())
}
pub fn set_notice_receiver(&mut self, receiver: Option<Box<dyn Fn(Notice) + Send + Sync>>) {
self.notice_receiver = receiver.map(Arc::from);
}
/// Consumes the client and calls `terminate` on its connection.
pub async fn close(self) -> Result<()> {
    let mut guard = self.connection.lock().await;
    guard.terminate().await
}
/// Runs `sql` through `simple_query` and discards the reply messages.
pub async fn batch_execute(&self, sql: &str) -> Result<()> {
    let mut guard = self.connection.lock().await;
    guard.simple_query(sql).await?;
    Ok(())
}
/// Starts a COPY-in into `table_name` using the `HYPERBINARY` format.
///
/// Shorthand for `copy_in_with_format` with that format name.
pub async fn copy_in(
    &self,
    table_name: &str,
    columns: &[&str],
) -> Result<AsyncCopyInWriter<'_>> {
    const DEFAULT_COPY_FORMAT: &str = "HYPERBINARY";
    self.copy_in_with_format(table_name, columns, DEFAULT_COPY_FORMAT)
        .await
}
/// Starts a COPY-in with the given `format` and returns a writer that owns
/// a clone of the connection `Arc`.
///
/// The connection lock is released before the writer is returned; the
/// writer re-acquires it for each operation.
pub async fn copy_in_arc_with_format(
    &self,
    table_name: &str,
    columns: &[&str],
    format: &str,
) -> Result<AsyncCopyInWriterOwned> {
    {
        let mut guard = self.connection.lock().await;
        guard
            .start_copy_in_with_format(table_name, columns, format)
            .await?;
    }
    let shared = Arc::clone(&self.connection);
    Ok(AsyncCopyInWriterOwned::new(shared))
}
/// Starts a COPY-in with the given `format` and returns a writer that
/// borrows this client's connection.
///
/// The connection lock is released before the writer is returned; the
/// writer re-acquires it for each operation.
pub async fn copy_in_with_format(
    &self,
    table_name: &str,
    columns: &[&str],
    format: &str,
) -> Result<AsyncCopyInWriter<'_>> {
    {
        let mut guard = self.connection.lock().await;
        guard
            .start_copy_in_with_format(table_name, columns, format)
            .await?;
    }
    Ok(AsyncCopyInWriter::new(&self.connection))
}
/// Runs `query` through the connection's `copy_out` and returns the bytes
/// it produces.
pub async fn copy_out(&self, query: &str) -> Result<Vec<u8>> {
    let mut guard = self.connection.lock().await;
    guard.copy_out(query).await
}
/// Returns `true` when the connection mutex can be acquired immediately.
///
/// The check is a non-blocking `try_lock`; the guard is dropped right away
/// and nothing is read from or written to the connection.
// NOTE(review): this yields `false` whenever anything else holds the lock
// (for example a streaming query that keeps the guard), and it never probes
// the socket itself — confirm that this is the intended meaning of "alive".
#[must_use]
pub fn is_alive(&self) -> bool {
self.connection.try_lock().is_ok()
}
pub async fn prepare(&self, query: &str) -> Result<AsyncPreparedStatement> {
self.prepare_typed(query, &[]).await
}
/// Prepares `query` with the given parameter types under a generated name.
///
/// Names are `__hyper_async_stmt_<n>`, where `<n>` comes from a counter
/// shared by every call to this function in the process. The returned
/// statement keeps only a weak handle to the connection.
pub async fn prepare_typed(
    &self,
    query: &str,
    param_types: &[crate::types::Oid],
) -> Result<AsyncPreparedStatement> {
    use std::sync::atomic::{AtomicU64, Ordering};
    static NEXT_STATEMENT_ID: AtomicU64 = AtomicU64::new(0);

    let sequence = NEXT_STATEMENT_ID.fetch_add(1, Ordering::Relaxed);
    let name = format!("__hyper_async_stmt_{sequence}");

    let mut guard = self.connection.lock().await;
    let (resolved_types, columns) = guard.prepare(&name, query, param_types).await?;

    let statement = AsyncPreparedStatement {
        name,
        query: query.to_owned(),
        param_types: resolved_types,
        columns,
        connection: Arc::downgrade(&self.connection),
        closed: false,
    };
    Ok(statement)
}
/// Asks the connection to close the statement identified by `statement`'s name.
pub async fn close_statement(&self, statement: &AsyncPreparedStatement) -> Result<()> {
    let mut guard = self.connection.lock().await;
    guard.close_statement(&statement.name).await
}
/// Executes a prepared statement and returns all of its rows.
///
/// Each entry of `params` is one parameter value as raw bytes, with `None`
/// passed through as an absent value. The statement's column count is
/// forwarded to the connection.
pub async fn execute_prepared<P: AsRef<[Option<Vec<u8>>]>>(
    &self,
    statement: &AsyncPreparedStatement,
    params: P,
) -> Result<Vec<Row>> {
    // Borrow each owned buffer as a slice; no bytes are copied.
    let params_ref: Vec<Option<&[u8]>> = params
        .as_ref()
        .iter()
        .map(|param| param.as_deref())
        .collect();
    let mut conn = self.connection.lock().await;
    conn.execute_prepared(&statement.name, &params_ref, statement.columns.len())
        .await
}
/// Executes a prepared statement that yields no rows and returns the `u64`
/// reported by the connection's `execute_prepared_no_result`.
///
/// Each entry of `params` is one parameter value as raw bytes, with `None`
/// passed through as an absent value.
pub async fn execute_prepared_no_result<P: AsRef<[Option<Vec<u8>>]>>(
    &self,
    statement: &AsyncPreparedStatement,
    params: P,
) -> Result<u64> {
    // Borrow each owned buffer as a slice; no bytes are copied.
    let params_ref: Vec<Option<&[u8]>> = params
        .as_ref()
        .iter()
        .map(|param| param.as_deref())
        .collect();
    let mut conn = self.connection.lock().await;
    conn.execute_prepared_no_result(&statement.name, &params_ref)
        .await
}
pub async fn execute_prepared_streaming<'a, P: AsRef<[Option<Vec<u8>>]>>(
&'a self,
statement: &AsyncPreparedStatement,
params: P,
chunk_size: usize,
) -> Result<super::async_prepared_stream::AsyncPreparedQueryStream<'a>> {
let params_ref: Vec<Option<&[u8]>> = params
.as_ref()
.iter()
.map(|p| p.as_ref().map(std::vec::Vec::as_slice))
.collect();
let mut conn = self.connection.lock().await;
conn.start_execute_prepared(&statement.name, ¶ms_ref, statement.columns.len())
.await?;
let columns = std::sync::Arc::new(statement.columns.clone());
Ok(super::async_prepared_stream::AsyncPreparedQueryStream::new(
conn, self, chunk_size, columns,
))
}
}
/// Best-effort cancellation: delegates to the blocking `cancel_sync` and
/// only logs a warning if it fails.
impl Cancellable for AsyncClient {
    fn cancel(&self) {
        let Err(failure) = self.cancel_sync() else {
            return;
        };
        warn!(
            target: "hyperdb_api_core::client",
            error = %failure,
            process_id = self.process_id,
            "cancel request failed (best-effort, swallowed)",
        );
    }
}
impl AsyncClient {
/// Converts a reply message list into rows.
///
/// A row description replaces the current column set (fields that fail to
/// decode are skipped). Each data row is paired with the current column set,
/// and data rows seen before any row description are ignored. Notices go to
/// `notice_receiver` when one is given. All other messages are ignored.
fn process_query_messages(
    messages: Vec<Message>,
    notice_receiver: Option<&Arc<NoticeReceiver>>,
) -> Result<Vec<Row>> {
    use super::statement::{Column, ColumnFormat};

    let mut current_columns: Option<Arc<Vec<Column>>> = None;
    let mut collected = Vec::new();

    for message in messages {
        match message {
            Message::RowDescription(description) => {
                let described: Vec<Column> = description
                    .fields()
                    .filter_map(std::result::Result::ok)
                    .map(|field| {
                        Column::new(
                            field.name().to_string(),
                            field.type_oid(),
                            field.type_modifier(),
                            ColumnFormat::from_code(field.format()),
                        )
                    })
                    .collect();
                current_columns = Some(Arc::new(described));
            }
            Message::DataRow(payload) => {
                if let Some(shared) = current_columns.as_ref() {
                    let row = Row::new(Arc::clone(shared), payload)?;
                    collected.push(row);
                }
            }
            Message::NoticeResponse(body) => {
                if let Some(callback) = notice_receiver {
                    callback(Notice::from_response_body(&body));
                }
            }
            _ => {}
        }
    }

    Ok(collected)
}
/// Converts a reply message list into [`StreamRow`]s.
///
/// Every data row becomes one `StreamRow`; notices go to `notice_receiver`
/// when one is given; all other messages are ignored. Messages are handled
/// in their original order.
fn process_binary_messages(
    messages: Vec<Message>,
    notice_receiver: Option<&Arc<NoticeReceiver>>,
) -> Vec<StreamRow> {
    messages
        .into_iter()
        .filter_map(|message| match message {
            Message::DataRow(payload) => Some(StreamRow::new(payload)),
            Message::NoticeResponse(body) => {
                if let Some(callback) = notice_receiver {
                    callback(Notice::from_response_body(&body));
                }
                None
            }
            _ => None,
        })
        .collect()
}
/// Returns the row count from the first command-complete message whose tag
/// ends in an unsigned integer, or 0 when there is none.
///
/// Only the last whitespace-separated token of each tag is examined.
/// Messages whose tag cannot be read or whose last token is not a number
/// are skipped.
fn extract_row_count(messages: &[Message]) -> u64 {
    messages
        .iter()
        .find_map(|msg| {
            let Message::CommandComplete(body) = msg else {
                return None;
            };
            let tag = body.tag().ok()?;
            // Read the last token directly instead of collecting every token.
            tag.split_whitespace().next_back()?.parse::<u64>().ok()
        })
        .unwrap_or(0)
}
}
/// Handle to a statement prepared through an [`AsyncClient`].
///
/// Dropping the handle without calling `close` triggers a best-effort close
/// in its `Drop` implementation.
#[derive(Debug)]
pub struct AsyncPreparedStatement {
// Statement name used for the connection's prepare/execute/close calls.
pub(crate) name: String,
// SQL text the statement was prepared from.
query: String,
// Parameter types returned by the connection's `prepare` call.
param_types: Vec<crate::types::Oid>,
// Result columns returned by the connection's `prepare` call.
pub(crate) columns: Vec<super::statement::Column>,
// Weak handle to the owning client's connection; used only by `Drop`.
connection: std::sync::Weak<Mutex<AsyncRawConnection<AsyncStream>>>,
// Set by `close` so that `Drop` does not attempt a second close.
closed: bool,
}
impl AsyncPreparedStatement {
/// Returns the statement's name.
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
/// Returns the SQL text the statement was prepared from.
#[must_use]
pub fn query(&self) -> &str {
&self.query
}
/// Returns the statement's parameter types.
#[must_use]
pub fn param_types(&self) -> &[crate::types::Oid] {
&self.param_types
}
/// Returns the number of parameters (the length of `param_types`).
#[must_use]
pub fn param_count(&self) -> usize {
self.param_types.len()
}
/// Returns the statement's result columns.
#[must_use]
pub fn columns(&self) -> &[super::statement::Column] {
&self.columns
}
/// Returns the number of result columns.
#[must_use]
pub fn column_count(&self) -> usize {
self.columns.len()
}
/// Closes the statement through `client`, consuming the handle.
///
/// The handle is marked closed before the request is issued, so the `Drop`
/// implementation will not attempt another close even if this call fails.
pub async fn close(mut self, client: &AsyncClient) -> Result<()> {
self.closed = true;
client.close_statement(&self).await
}
}
/// Best-effort cleanup for statements that were never explicitly closed.
///
/// Nothing happens if the statement is already closed or the connection is
/// gone. Inside a tokio runtime a task is spawned to close the statement;
/// outside one, the connection is marked desynchronized (if its lock is
/// free) and a warning is logged.
impl Drop for AsyncPreparedStatement {
    fn drop(&mut self) {
        if self.closed {
            return;
        }
        let Some(shared) = self.connection.upgrade() else {
            return;
        };
        // Move the name out so it can be sent into the spawned task.
        let name = std::mem::take(&mut self.name);

        match tokio::runtime::Handle::try_current() {
            Ok(runtime) => {
                runtime.spawn(async move {
                    let mut raw = shared.lock().await;
                    if let Err(e) = raw.close_statement(&name).await {
                        warn!(
                            target: "hyperdb_api_core::client",
                            statement = %name,
                            error = %e,
                            "AsyncPreparedStatement drop-close failed (best-effort, swallowed)"
                        );
                    }
                });
            }
            Err(_) => {
                if let Ok(mut raw) = shared.try_lock() {
                    raw.mark_desynchronized();
                }
                warn!(
                    target: "hyperdb_api_core::client",
                    statement = %name,
                    "AsyncPreparedStatement dropped outside of a tokio runtime; \
                     server-side statement slot leaked and connection marked \
                     desynchronized — call statement.close(&client) explicitly \
                     for deterministic cleanup"
                );
            }
        }
    }
}
/// COPY-in writer that borrows the client's connection mutex.
///
/// Each operation locks the mutex for its own duration.
#[derive(Debug)]
pub struct AsyncCopyInWriter<'a> {
// Borrowed connection mutex of the client that created this writer.
connection: &'a Mutex<AsyncRawConnection<AsyncStream>>,
// Set by `finish`/`cancel`; when still false on drop, `Drop` tries to queue a copy failure.
finished: bool,
}
/// COPY-in writer that holds its own `Arc` to the connection mutex, so it
/// carries no borrow of the client.
///
/// Each operation locks the mutex for its own duration.
#[derive(Debug)]
pub struct AsyncCopyInWriterOwned {
// Shared connection mutex, cloned from the client that created this writer.
connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>,
// Set by `finish`/`cancel`; when still false on drop, `Drop` tries to queue a copy failure.
finished: bool,
}
impl AsyncCopyInWriterOwned {
    /// Wraps a shared connection in a writer that is not yet finished.
    pub(crate) fn new(connection: Arc<Mutex<AsyncRawConnection<AsyncStream>>>) -> Self {
        Self {
            connection,
            finished: false,
        }
    }

    /// Passes `data` to the connection's `send_copy_data`.
    pub async fn send(&mut self, data: &[u8]) -> Result<()> {
        let mut raw = self.connection.lock().await;
        raw.send_copy_data(data)?;
        Ok(())
    }

    /// Calls `flush` on the connection.
    pub async fn flush(&mut self) -> Result<()> {
        let mut raw = self.connection.lock().await;
        raw.flush().await
    }

    /// Passes `data` to the connection's `send_copy_data_direct`.
    pub async fn send_direct(&mut self, data: &[u8]) -> Result<()> {
        let mut raw = self.connection.lock().await;
        raw.send_copy_data_direct(data).await
    }

    /// Calls `flush_stream` on the connection.
    pub async fn flush_stream(&mut self) -> Result<()> {
        let mut raw = self.connection.lock().await;
        raw.flush_stream().await
    }

    /// Completes the copy and returns the `u64` reported by `finish_copy`.
    ///
    /// The writer is marked finished first, so its `Drop` does nothing.
    pub async fn finish(mut self) -> Result<u64> {
        self.finished = true;
        let mut raw = self.connection.lock().await;
        raw.finish_copy().await
    }

    /// Cancels the copy, passing `reason` to `cancel_copy`.
    ///
    /// The writer is marked finished first, so its `Drop` does nothing.
    pub async fn cancel(mut self, reason: &str) -> Result<()> {
        self.finished = true;
        let mut raw = self.connection.lock().await;
        raw.cancel_copy(reason).await
    }
}
impl Drop for AsyncCopyInWriterOwned {
fn drop(&mut self) {
if self.finished {
return;
}
if let Ok(mut conn) = self.connection.try_lock() {
conn.queue_copy_fail("AsyncCopyInWriterOwned dropped without finish/cancel");
}
}
}
impl<'a> AsyncCopyInWriter<'a> {
    /// Wraps a borrowed connection in a writer that is not yet finished.
    pub(crate) fn new(connection: &'a Mutex<AsyncRawConnection<AsyncStream>>) -> Self {
        Self {
            connection,
            finished: false,
        }
    }

    /// Passes `data` to the connection's `send_copy_data`.
    pub async fn send(&mut self, data: &[u8]) -> Result<()> {
        let mut raw = self.connection.lock().await;
        raw.send_copy_data(data)?;
        Ok(())
    }

    /// Calls `flush` on the connection.
    pub async fn flush(&mut self) -> Result<()> {
        let mut raw = self.connection.lock().await;
        raw.flush().await
    }

    /// Passes `data` to the connection's `send_copy_data_direct`.
    pub async fn send_direct(&mut self, data: &[u8]) -> Result<()> {
        let mut raw = self.connection.lock().await;
        raw.send_copy_data_direct(data).await
    }

    /// Calls `flush_stream` on the connection.
    pub async fn flush_stream(&mut self) -> Result<()> {
        let mut raw = self.connection.lock().await;
        raw.flush_stream().await
    }

    /// Completes the copy and returns the `u64` reported by `finish_copy`.
    ///
    /// The writer is marked finished first, so its `Drop` does nothing.
    pub async fn finish(mut self) -> Result<u64> {
        self.finished = true;
        let mut raw = self.connection.lock().await;
        raw.finish_copy().await
    }

    /// Cancels the copy, passing `reason` to `cancel_copy`.
    ///
    /// The writer is marked finished first, so its `Drop` does nothing.
    pub async fn cancel(mut self, reason: &str) -> Result<()> {
        self.finished = true;
        let mut raw = self.connection.lock().await;
        raw.cancel_copy(reason).await
    }
}
/// Best-effort cleanup for a writer dropped without `finish` or `cancel`.
///
/// If the connection lock is free, a copy failure is queued on the
/// connection; otherwise nothing can be done. Either way a warning is logged.
impl Drop for AsyncCopyInWriter<'_> {
    fn drop(&mut self) {
        if self.finished {
            return;
        }
        match self.connection.try_lock() {
            Ok(mut raw) => {
                raw.queue_copy_fail("COPY writer dropped without finish or cancel");
                warn!(
                    target: "hyperdb_api_core::client",
                    "AsyncCopyInWriter dropped without finish() or cancel(). \
                     Queued best-effort CopyFail — connection will self-heal on next operation."
                );
            }
            Err(_) => {
                warn!(
                    target: "hyperdb_api_core::client",
                    "AsyncCopyInWriter dropped without finish() or cancel(), \
                     and the connection mutex was locked. The connection may be \
                     left in an unusable COPY-IN state."
                );
            }
        }
    }
}