zero_mysql/tokio/
conn.rs

1use tokio::net::TcpStream;
2#[cfg(unix)]
3use tokio::net::UnixStream;
4use tracing::instrument;
5use zerocopy::{FromBytes, FromZeros, IntoBytes};
6
7use crate::PreparedStatement;
8use crate::buffer::BufferSet;
9use crate::buffer_pool::PooledBufferSet;
10use crate::constant::CapabilityFlags;
11use crate::error::{Error, Result};
12use crate::protocol::TextRowPayload;
13use crate::protocol::command::Action;
14use crate::protocol::command::ColumnDefinition;
15use crate::protocol::command::bulk_exec::{BulkExec, BulkFlags, BulkParamsSet, write_bulk_execute};
16use crate::protocol::command::prepared::{Exec, read_prepare_ok, write_execute, write_prepare};
17use crate::protocol::command::query::{Query, write_query};
18use crate::protocol::command::utility::{
19    DropHandler, FirstHandler, write_ping, write_reset_connection,
20};
21use crate::protocol::connection::{Handshake, HandshakeAction, InitialHandshake};
22use crate::protocol::packet::PacketHeader;
23use crate::protocol::primitive::read_string_lenenc;
24use crate::protocol::response::{ErrPayloadBytes, OkPayloadBytes};
25use crate::protocol::r#trait::{BinaryResultSetHandler, TextResultSetHandler, param::Params};
26
27use super::stream::Stream;
28
29pub struct Conn {
30    stream: Stream,
31    buffer_set: PooledBufferSet,
32    initial_handshake: InitialHandshake,
33    capability_flags: CapabilityFlags,
34    mariadb_capabilities: crate::constant::MariadbCapabilityFlags,
35    in_transaction: bool,
36    is_broken: bool,
37}
38
39impl Conn {
40    /// Create a new MySQL connection from connection options (async)
41    pub async fn new<O: TryInto<crate::opts::Opts>>(opts: O) -> Result<Self>
42    where
43        Error: From<O::Error>,
44    {
45        let opts: crate::opts::Opts = opts.try_into()?;
46
47        #[cfg(unix)]
48        let stream = if let Some(socket_path) = &opts.socket {
49            let stream = UnixStream::connect(socket_path).await?;
50            Stream::unix(stream)
51        } else {
52            if opts.host.is_empty() {
53                return Err(Error::BadUsageError(
54                    "Missing host in connection options".to_string(),
55                ));
56            }
57            let addr = format!("{}:{}", opts.host, opts.port);
58            let stream = TcpStream::connect(&addr).await?;
59            stream.set_nodelay(opts.tcp_nodelay)?;
60            Stream::tcp(stream)
61        };
62
63        #[cfg(not(unix))]
64        let stream = {
65            if opts.socket.is_some() {
66                return Err(Error::BadUsageError(
67                    "Unix sockets are not supported on this platform".to_string(),
68                ));
69            }
70            if opts.host.is_empty() {
71                return Err(Error::BadUsageError(
72                    "Missing host in connection options".to_string(),
73                ));
74            }
75            let addr = format!("{}:{}", opts.host, opts.port);
76            let stream = TcpStream::connect(&addr).await?;
77            stream.set_nodelay(opts.tcp_nodelay)?;
78            Stream::tcp(stream)
79        };
80
81        Self::new_with_stream(stream, &opts).await
82    }
83
84    /// Create a new MySQL connection with an existing stream (async)
85    pub async fn new_with_stream(stream: Stream, opts: &crate::opts::Opts) -> Result<Self> {
86        let mut conn_stream = stream;
87        let mut buffer_set = opts.buffer_pool.get_buffer_set();
88
89        #[cfg(feature = "tokio-tls")]
90        let host = opts.host.clone();
91
92        let mut handshake = Handshake::new(opts);
93
94        loop {
95            match handshake.step(&mut buffer_set)? {
96                HandshakeAction::ReadPacket(buffer) => {
97                    buffer.clear();
98                    read_payload(&mut conn_stream, buffer).await?;
99                }
100                HandshakeAction::WritePacket { sequence_id } => {
101                    write_handshake_payload(&mut conn_stream, &mut buffer_set, sequence_id).await?;
102                    buffer_set.read_buffer.clear();
103                    read_payload(&mut conn_stream, &mut buffer_set.read_buffer).await?;
104                }
105                #[cfg(feature = "tokio-tls")]
106                HandshakeAction::UpgradeTls { sequence_id } => {
107                    write_handshake_payload(&mut conn_stream, &mut buffer_set, sequence_id).await?;
108                    conn_stream = conn_stream.upgrade_to_tls(&host).await?;
109                }
110                #[cfg(not(feature = "tokio-tls"))]
111                HandshakeAction::UpgradeTls { .. } => {
112                    return Err(Error::BadUsageError(
113                        "TLS requested but tokio-tls feature is not enabled".to_string(),
114                    ));
115                }
116                HandshakeAction::Finished => break,
117            }
118        }
119
120        let (initial_handshake, capability_flags, mariadb_capabilities) = handshake.finish()?;
121
122        let conn = Self {
123            stream: conn_stream,
124            buffer_set,
125            initial_handshake,
126            capability_flags,
127            mariadb_capabilities,
128            in_transaction: false,
129            is_broken: false,
130        };
131
132        // Upgrade to Unix socket if connected via TCP to loopback
133        #[cfg(unix)]
134        let mut conn = if opts.upgrade_to_unix_socket && conn.stream.is_tcp_loopback() {
135            conn.try_upgrade_to_unix_socket(opts).await
136        } else {
137            conn
138        };
139        #[cfg(not(unix))]
140        let mut conn = conn;
141
142        // Execute init command if specified
143        if let Some(init_command) = &opts.init_command {
144            conn.query_drop(init_command).await?;
145        }
146
147        Ok(conn)
148    }
149
150    pub fn server_version(&self) -> &[u8] {
151        &self.buffer_set.initial_handshake[self.initial_handshake.server_version.clone()]
152    }
153
154    /// Get the negotiated capability flags
155    pub fn capability_flags(&self) -> CapabilityFlags {
156        self.capability_flags
157    }
158
159    /// Check if the server is MySQL (as opposed to MariaDB)
160    pub fn is_mysql(&self) -> bool {
161        self.capability_flags.is_mysql()
162    }
163
164    /// Check if the server is MariaDB (as opposed to MySQL)
165    pub fn is_mariadb(&self) -> bool {
166        self.capability_flags.is_mariadb()
167    }
168
169    /// Get the connection ID assigned by the server
170    pub fn connection_id(&self) -> u64 {
171        self.initial_handshake.connection_id as u64
172    }
173
174    /// Get the server status flags from the initial handshake
175    pub fn status_flags(&self) -> crate::constant::ServerStatusFlags {
176        self.initial_handshake.status_flags
177    }
178
179    /// Check if the connection is broken due to a previous I/O error
180    pub fn is_broken(&self) -> bool {
181        self.is_broken
182    }
183
184    #[inline]
185    fn check_error<T>(&mut self, result: Result<T>) -> Result<T> {
186        if let Err(e) = &result
187            && e.is_conn_broken()
188        {
189            self.is_broken = true;
190        }
191        result
192    }
193
194    pub(crate) fn set_in_transaction(&mut self, value: bool) {
195        self.in_transaction = value;
196    }
197
198    /// Try to upgrade to Unix socket connection.
199    /// Returns upgraded conn on success, original conn on failure.
200    #[cfg(unix)]
201    async fn try_upgrade_to_unix_socket(mut self, opts: &crate::opts::Opts) -> Self {
202        // Query the server for its Unix socket path
203        let mut handler = SocketPathHandler { path: None };
204        if self.query("SELECT @@socket", &mut handler).await.is_err() {
205            return self;
206        }
207
208        let socket_path = match handler.path {
209            Some(p) if !p.is_empty() => p,
210            _ => return self,
211        };
212
213        // Connect via Unix socket
214        let unix_stream = match UnixStream::connect(&socket_path).await {
215            Ok(s) => s,
216            Err(_) => return self,
217        };
218        let stream = Stream::unix(unix_stream);
219
220        // Create new connection over Unix socket (re-handshakes)
221        // Disable upgrade_to_unix_socket to prevent infinite recursion
222        let mut opts_unix = opts.clone();
223        opts_unix.upgrade_to_unix_socket = false;
224
225        match Box::pin(Self::new_with_stream(stream, &opts_unix)).await {
226            Ok(new_conn) => new_conn,
227            Err(_) => self,
228        }
229    }
230
231    /// Write a MySQL packet from write_buffer asynchronously, splitting it into 16MB chunks if necessary
232    #[instrument(skip_all)]
233    async fn write_payload(&mut self) -> Result<()> {
234        let mut sequence_id = 0_u8;
235        let mut buffer = self.buffer_set.write_buffer_mut().as_mut_slice();
236
237        loop {
238            let chunk_size = buffer[4..].len().min(0xFFFFFF);
239            PacketHeader::mut_from_bytes(&mut buffer[0..4])?
240                .encode_in_place(chunk_size, sequence_id);
241            self.stream.write_all(&buffer[..4 + chunk_size]).await?;
242
243            if chunk_size < 0xFFFFFF {
244                break;
245            }
246
247            sequence_id = sequence_id.wrapping_add(1);
248            buffer = &mut buffer[0xFFFFFF..];
249        }
250        self.stream.flush().await?;
251        Ok(())
252    }
253
254    /// Prepare a statement and return the PreparedStatement (async)
255    ///
256    /// Returns `Ok(PreparedStatement)` on success.
257    pub async fn prepare(&mut self, sql: &str) -> Result<PreparedStatement> {
258        let result = self.prepare_inner(sql).await;
259        self.check_error(result)
260    }
261
262    async fn prepare_inner(&mut self, sql: &str) -> Result<PreparedStatement> {
263        use crate::protocol::command::ColumnDefinitions;
264
265        self.buffer_set.read_buffer.clear();
266
267        write_prepare(self.buffer_set.new_write_buffer(), sql);
268
269        self.write_payload().await?;
270
271        let _ = read_payload(&mut self.stream, &mut self.buffer_set.read_buffer).await?;
272
273        if !self.buffer_set.read_buffer.is_empty() && self.buffer_set.read_buffer[0] == 0xFF {
274            Err(ErrPayloadBytes(&self.buffer_set.read_buffer))?
275        }
276
277        let prepare_ok = read_prepare_ok(&self.buffer_set.read_buffer)?;
278        let statement_id = prepare_ok.statement_id();
279        let num_params = prepare_ok.num_params();
280        let num_columns = prepare_ok.num_columns();
281
282        // Skip param definitions (we don't cache them)
283        for _ in 0..num_params {
284            let _ = read_payload(&mut self.stream, &mut self.buffer_set.read_buffer).await?;
285        }
286
287        // Read and cache column definitions for MARIADB_CLIENT_CACHE_METADATA support
288        let column_definitions = if num_columns > 0 {
289            self.read_column_definition_packets(num_columns as usize)
290                .await?;
291            Some(ColumnDefinitions::new(
292                num_columns as usize,
293                std::mem::take(&mut self.buffer_set.column_definition_buffer),
294            )?)
295        } else {
296            None
297        };
298
299        let mut stmt = PreparedStatement::new(statement_id);
300        if let Some(col_defs) = column_definitions {
301            stmt.set_column_definitions(col_defs);
302        }
303        Ok(stmt)
304    }
305
306    #[tracing::instrument(skip_all)]
307    async fn read_column_definition_packets(&mut self, num_columns: usize) -> Result<u8> {
308        let mut header = PacketHeader::new_zeroed();
309        let out = &mut self.buffer_set.column_definition_buffer;
310        out.clear();
311
312        // For each column, write [4 bytes len][payload]
313        for _ in 0..num_columns {
314            self.stream.read_exact(header.as_mut_bytes()).await?;
315            let length = header.length();
316            out.extend((length as u32).to_ne_bytes());
317
318            out.reserve(length);
319            let spare = out.spare_capacity_mut();
320            self.stream.read_buf_exact(&mut spare[..length]).await?;
321            // SAFETY: read_buf_exact filled exactly `length` bytes
322            unsafe {
323                out.set_len(out.len() + length);
324            }
325        }
326
327        Ok(header.sequence_id)
328    }
329
330    async fn drive_exec<H: BinaryResultSetHandler>(
331        &mut self,
332        stmt: &mut crate::PreparedStatement,
333        handler: &mut H,
334    ) -> Result<()> {
335        let cache_metadata = self
336            .mariadb_capabilities
337            .contains(crate::constant::MariadbCapabilityFlags::MARIADB_CLIENT_CACHE_METADATA);
338        let mut exec = Exec::new(handler, stmt, cache_metadata);
339
340        loop {
341            match exec.step(&mut self.buffer_set)? {
342                Action::NeedPacket(buffer) => {
343                    buffer.clear();
344                    let _ = read_payload(&mut self.stream, buffer).await?;
345                }
346                Action::ReadColumnMetadata { num_columns } => {
347                    self.read_column_definition_packets(num_columns).await?;
348                }
349                Action::Finished => return Ok(()),
350            }
351        }
352    }
353
354    async fn drive_query<H: TextResultSetHandler>(&mut self, handler: &mut H) -> Result<()> {
355        let mut query = Query::new(handler);
356
357        loop {
358            match query.step(&mut self.buffer_set)? {
359                Action::NeedPacket(buffer) => {
360                    buffer.clear();
361                    let _ = read_payload(&mut self.stream, buffer).await?;
362                }
363                Action::ReadColumnMetadata { num_columns } => {
364                    self.read_column_definition_packets(num_columns).await?;
365                }
366                Action::Finished => return Ok(()),
367            }
368        }
369    }
370
371    /// Execute a prepared statement with a result set handler (async)
372    pub async fn exec<P, H>(
373        &mut self,
374        stmt: &mut PreparedStatement,
375        params: P,
376        handler: &mut H,
377    ) -> Result<()>
378    where
379        P: Params,
380        H: BinaryResultSetHandler,
381    {
382        let result = self.exec_inner(stmt, params, handler).await;
383        self.check_error(result)
384    }
385
386    async fn exec_inner<P, H>(
387        &mut self,
388        stmt: &mut PreparedStatement,
389        params: P,
390        handler: &mut H,
391    ) -> Result<()>
392    where
393        P: Params,
394        H: BinaryResultSetHandler,
395    {
396        write_execute(self.buffer_set.new_write_buffer(), stmt.id(), params)?;
397        self.write_payload().await?;
398        self.drive_exec(stmt, handler).await
399    }
400
401    async fn drive_bulk_exec<H: BinaryResultSetHandler>(
402        &mut self,
403        stmt: &mut crate::PreparedStatement,
404        handler: &mut H,
405    ) -> Result<()> {
406        let cache_metadata = self
407            .mariadb_capabilities
408            .contains(crate::constant::MariadbCapabilityFlags::MARIADB_CLIENT_CACHE_METADATA);
409        let mut bulk_exec = BulkExec::new(handler, stmt, cache_metadata);
410
411        loop {
412            match bulk_exec.step(&mut self.buffer_set)? {
413                Action::NeedPacket(buffer) => {
414                    buffer.clear();
415                    let _ = read_payload(&mut self.stream, buffer).await?;
416                }
417                Action::ReadColumnMetadata { num_columns } => {
418                    self.read_column_definition_packets(num_columns).await?;
419                }
420                Action::Finished => return Ok(()),
421            }
422        }
423    }
424
425    /// Execute a bulk prepared statement with a result set handler (async)
426    pub async fn exec_bulk_insert_or_update<P, I, H>(
427        &mut self,
428        stmt: &mut PreparedStatement,
429        params: P,
430        flags: BulkFlags,
431        handler: &mut H,
432    ) -> Result<()>
433    where
434        P: BulkParamsSet + IntoIterator<Item = I>,
435        I: Params,
436        H: BinaryResultSetHandler,
437    {
438        let result = self
439            .exec_bulk_insert_or_update_inner(stmt, params, flags, handler)
440            .await;
441        self.check_error(result)
442    }
443
444    async fn exec_bulk_insert_or_update_inner<P, I, H>(
445        &mut self,
446        stmt: &mut PreparedStatement,
447        params: P,
448        flags: BulkFlags,
449        handler: &mut H,
450    ) -> Result<()>
451    where
452        P: BulkParamsSet + IntoIterator<Item = I>,
453        I: Params,
454        H: BinaryResultSetHandler,
455    {
456        if !self.is_mariadb() {
457            // Fallback to multiple exec_drop for non-MariaDB servers
458            for param in params {
459                self.exec_inner(stmt, param, &mut DropHandler::default())
460                    .await?;
461            }
462            Ok(())
463        } else {
464            // Use MariaDB bulk execute protocol
465            write_bulk_execute(self.buffer_set.new_write_buffer(), stmt.id(), params, flags)?;
466            self.write_payload().await?;
467            self.drive_bulk_exec(stmt, handler).await
468        }
469    }
470
471    /// Execute a prepared statement and return only the first row, dropping the rest (async).
472    pub async fn exec_first<Row, P>(
473        &mut self,
474        stmt: &mut PreparedStatement,
475        params: P,
476    ) -> Result<Option<Row>>
477    where
478        Row: for<'buf> crate::raw::FromRawRow<'buf>,
479        P: Params,
480    {
481        let result = self.exec_first_inner(stmt, params).await;
482        self.check_error(result)
483    }
484
485    async fn exec_first_inner<Row, P>(
486        &mut self,
487        stmt: &mut PreparedStatement,
488        params: P,
489    ) -> Result<Option<Row>>
490    where
491        Row: for<'buf> crate::raw::FromRawRow<'buf>,
492        P: Params,
493    {
494        write_execute(self.buffer_set.new_write_buffer(), stmt.id(), params)?;
495        self.write_payload().await?;
496        let mut handler = FirstHandler::<Row>::default();
497        self.drive_exec(stmt, &mut handler).await?;
498        Ok(handler.take())
499    }
500
501    /// Execute a prepared statement and discard all results (async)
502    #[instrument(skip_all)]
503    pub async fn exec_drop<P>(&mut self, stmt: &mut PreparedStatement, params: P) -> Result<()>
504    where
505        P: Params,
506    {
507        self.exec(stmt, params, &mut DropHandler::default()).await
508    }
509
510    /// Execute a prepared statement and collect all rows into a Vec (async).
511    pub async fn exec_collect<Row, P>(
512        &mut self,
513        stmt: &mut PreparedStatement,
514        params: P,
515    ) -> Result<Vec<Row>>
516    where
517        Row: for<'buf> crate::raw::FromRawRow<'buf>,
518        P: Params,
519    {
520        let mut handler = crate::handler::CollectHandler::<Row>::default();
521        self.exec(stmt, params, &mut handler).await?;
522        Ok(handler.into_rows())
523    }
524
525    /// Execute a prepared statement and call a closure for each row (async).
526    ///
527    /// The closure can return an error to stop iteration early.
528    pub async fn exec_foreach<Row, P, F>(
529        &mut self,
530        stmt: &mut PreparedStatement,
531        params: P,
532        f: F,
533    ) -> Result<()>
534    where
535        Row: for<'buf> crate::raw::FromRawRow<'buf>,
536        P: Params,
537        F: FnMut(Row) -> Result<()>,
538    {
539        let mut handler = crate::handler::ForEachHandler::<Row, F>::new(f);
540        self.exec(stmt, params, &mut handler).await
541    }
542
543    /// Execute a text protocol SQL query (async)
544    pub async fn query<H>(&mut self, sql: &str, handler: &mut H) -> Result<()>
545    where
546        H: TextResultSetHandler,
547    {
548        let result = self.query_inner(sql, handler).await;
549        self.check_error(result)
550    }
551
552    async fn query_inner<H>(&mut self, sql: &str, handler: &mut H) -> Result<()>
553    where
554        H: TextResultSetHandler,
555    {
556        write_query(self.buffer_set.new_write_buffer(), sql);
557        self.write_payload().await?;
558        self.drive_query(handler).await
559    }
560
561    /// Execute a text protocol SQL query and discard all results (async)
562    #[instrument(skip_all)]
563    pub async fn query_drop(&mut self, sql: &str) -> Result<()> {
564        let result = self.query_drop_inner(sql).await;
565        self.check_error(result)
566    }
567
568    async fn query_drop_inner(&mut self, sql: &str) -> Result<()> {
569        write_query(self.buffer_set.new_write_buffer(), sql);
570        self.write_payload().await?;
571        self.drive_query(&mut DropHandler::default()).await
572    }
573
574    /// Send a ping to the server to check if the connection is alive (async)
575    ///
576    /// This sends a COM_PING command to the MySQL server and waits for an OK response.
577    pub async fn ping(&mut self) -> Result<()> {
578        let result = self.ping_inner().await;
579        self.check_error(result)
580    }
581
582    async fn ping_inner(&mut self) -> Result<()> {
583        write_ping(self.buffer_set.new_write_buffer());
584        self.write_payload().await?;
585        self.buffer_set.read_buffer.clear();
586        let _ = read_payload(&mut self.stream, &mut self.buffer_set.read_buffer).await?;
587        Ok(())
588    }
589
590    /// Reset the connection to its initial state (async)
591    pub async fn reset(&mut self) -> Result<()> {
592        let result = self.reset_inner().await;
593        self.check_error(result)
594    }
595
596    async fn reset_inner(&mut self) -> Result<()> {
597        write_reset_connection(self.buffer_set.new_write_buffer());
598        self.write_payload().await?;
599        self.buffer_set.read_buffer.clear();
600        let _ = read_payload(&mut self.stream, &mut self.buffer_set.read_buffer).await?;
601        self.in_transaction = false;
602        Ok(())
603    }
604
605    /// Execute a closure within a transaction (async)
606    ///
607    /// # Errors
608    /// Returns `Error::NestedTransaction` if called while already in a transaction
609    pub async fn run_transaction<F, Fut, R>(&mut self, f: F) -> Result<R>
610    where
611        F: FnOnce(&mut Conn, super::transaction::Transaction) -> Fut,
612        Fut: core::future::Future<Output = Result<R>>,
613    {
614        if self.in_transaction {
615            return Err(Error::NestedTransaction);
616        }
617
618        self.in_transaction = true;
619
620        if let Err(err) = self.query_drop("BEGIN").await {
621            self.in_transaction = false;
622            return Err(err);
623        }
624
625        let tx = super::transaction::Transaction::new(self.connection_id());
626        let result = f(self, tx).await;
627
628        // If the transaction was not explicitly committed or rolled back, roll it back
629        if self.in_transaction {
630            let rollback_result = self.query_drop("ROLLBACK").await;
631            self.in_transaction = false;
632
633            // Return the first error (either from closure or rollback)
634            if let Err(e) = result {
635                return Err(e);
636            }
637            rollback_result?;
638        }
639
640        result
641    }
642}
643
644/// Read a complete MySQL payload asynchronously, concatenating packets if they span multiple 16MB chunks
645/// Returns the sequence_id of the last packet read.
646#[instrument(skip_all)]
647async fn read_payload(reader: &mut Stream, buffer: &mut Vec<u8>) -> Result<u8> {
648    let mut packet_header = PacketHeader::new_zeroed();
649
650    buffer.clear();
651    reader.read_exact(packet_header.as_mut_bytes()).await?;
652
653    let length = packet_header.length();
654    let mut sequence_id = packet_header.sequence_id;
655
656    buffer.reserve(length);
657
658    // read the first payload
659    {
660        let spare = buffer.spare_capacity_mut();
661        reader.read_buf_exact(&mut spare[..length]).await?;
662        // SAFETY: read_buf_exact filled exactly `length` bytes
663        unsafe {
664            buffer.set_len(length);
665        }
666    }
667
668    let mut current_length = length;
669    while current_length == 0xFFFFFF {
670        reader.read_exact(packet_header.as_mut_bytes()).await?;
671
672        current_length = packet_header.length();
673        sequence_id = packet_header.sequence_id;
674
675        buffer.reserve(current_length);
676        let spare = buffer.spare_capacity_mut();
677        reader.read_buf_exact(&mut spare[..current_length]).await?;
678        // SAFETY: read_buf_exact filled exactly `current_length` bytes
679        unsafe {
680            buffer.set_len(buffer.len() + current_length);
681        }
682    }
683
684    Ok(sequence_id)
685}
686
687async fn write_handshake_payload(
688    stream: &mut Stream,
689    buffer_set: &mut BufferSet,
690    sequence_id: u8,
691) -> Result<()> {
692    let mut buffer = buffer_set.write_buffer_mut().as_mut_slice();
693    let mut seq_id = sequence_id;
694
695    loop {
696        let chunk_size = buffer[4..].len().min(0xFFFFFF);
697        PacketHeader::mut_from_bytes(&mut buffer[0..4])?.encode_in_place(chunk_size, seq_id);
698        stream.write_all(&buffer[..4 + chunk_size]).await?;
699
700        if chunk_size < 0xFFFFFF {
701            break;
702        }
703
704        seq_id = seq_id.wrapping_add(1);
705        buffer = &mut buffer[0xFFFFFF..];
706    }
707    stream.flush().await?;
708    Ok(())
709}
710
711/// Handler to capture socket path from SELECT @@socket query
712#[cfg(unix)]
713struct SocketPathHandler {
714    path: Option<String>,
715}
716
717#[cfg(unix)]
718impl TextResultSetHandler for SocketPathHandler {
719    fn no_result_set(&mut self, _: OkPayloadBytes) -> Result<()> {
720        Ok(())
721    }
722    fn resultset_start(&mut self, _: &[ColumnDefinition<'_>]) -> Result<()> {
723        Ok(())
724    }
725    fn resultset_end(&mut self, _: OkPayloadBytes) -> Result<()> {
726        Ok(())
727    }
728    fn row(&mut self, _: &[ColumnDefinition<'_>], row: TextRowPayload<'_>) -> Result<()> {
729        // 0xFB indicates NULL value
730        if row.0.first() == Some(&0xFB) {
731            return Ok(());
732        }
733        // Parse the first length-encoded string
734        let (value, _) = read_string_lenenc(row.0)?;
735        if !value.is_empty() {
736            self.path = Some(String::from_utf8_lossy(value).into_owned());
737        }
738        Ok(())
739    }
740}