zero_mysql/sync/
conn.rs

1use crate::PreparedStatement;
2use crate::buffer::BufferSet;
3use crate::buffer_pool::PooledBufferSet;
4use crate::constant::CapabilityFlags;
5use crate::error::{Error, Result};
6use crate::nightly::unlikely;
7use crate::protocol::TextRowPayload;
8use crate::protocol::command::Action;
9use crate::protocol::command::ColumnDefinition;
10use crate::protocol::command::bulk_exec::{BulkExec, BulkFlags, BulkParamsSet, write_bulk_execute};
11use crate::protocol::command::prepared::Exec;
12use crate::protocol::command::prepared::write_execute;
13use crate::protocol::command::prepared::{read_prepare_ok, write_prepare};
14use crate::protocol::command::query::Query;
15use crate::protocol::command::query::write_query;
16use crate::protocol::command::utility::DropHandler;
17use crate::protocol::command::utility::FirstHandler;
18use crate::protocol::command::utility::write_ping;
19use crate::protocol::command::utility::write_reset_connection;
20use crate::protocol::connection::{Handshake, HandshakeAction, InitialHandshake};
21use crate::protocol::packet::PacketHeader;
22use crate::protocol::primitive::read_string_lenenc;
23use crate::protocol::response::{ErrPayloadBytes, OkPayloadBytes};
24use crate::protocol::r#trait::{BinaryResultSetHandler, TextResultSetHandler, param::Params};
25use std::net::TcpStream;
26#[cfg(unix)]
27use std::os::unix::net::UnixStream;
28use zerocopy::FromZeros;
29use zerocopy::{FromBytes, IntoBytes};
30
31use super::stream::Stream;
32
33pub struct Conn {
34    stream: Stream,
35    buffer_set: PooledBufferSet,
36    initial_handshake: InitialHandshake,
37    capability_flags: CapabilityFlags,
38    mariadb_capabilities: crate::constant::MariadbCapabilityFlags,
39    in_transaction: bool,
40    is_broken: bool,
41}
42
43impl Conn {
44    pub(crate) fn set_in_transaction(&mut self, value: bool) {
45        self.in_transaction = value;
46    }
47
48    /// Returns true if the connection is currently in a transaction
49    pub fn in_transaction(&self) -> bool {
50        self.in_transaction
51    }
52
53    /// Create a new MySQL connection from connection options
54    pub fn new<O: TryInto<crate::opts::Opts>>(opts: O) -> Result<Self>
55    where
56        Error: From<O::Error>,
57    {
58        let opts: crate::opts::Opts = opts.try_into()?;
59
60        #[cfg(unix)]
61        let stream = if let Some(socket_path) = &opts.socket {
62            let stream = UnixStream::connect(socket_path)?;
63            Stream::unix(stream)
64        } else {
65            if opts.host.is_empty() {
66                return Err(Error::BadUsageError(
67                    "Missing host in connection options".to_string(),
68                ));
69            }
70            let addr = format!("{}:{}", opts.host, opts.port);
71            let stream = TcpStream::connect(&addr)?;
72            stream.set_nodelay(opts.tcp_nodelay)?;
73            Stream::tcp(stream)
74        };
75
76        #[cfg(not(unix))]
77        let stream = {
78            if opts.socket.is_some() {
79                return Err(Error::BadUsageError(
80                    "Unix sockets are not supported on this platform".to_string(),
81                ));
82            }
83            if opts.host.is_empty() {
84                return Err(Error::BadUsageError(
85                    "Missing host in connection options".to_string(),
86                ));
87            }
88            let addr = format!("{}:{}", opts.host, opts.port);
89            let stream = TcpStream::connect(&addr)?;
90            stream.set_nodelay(opts.tcp_nodelay)?;
91            Stream::tcp(stream)
92        };
93
94        Self::new_with_stream(stream, &opts)
95    }
96
97    /// Create a new MySQL connection with an existing stream
98    pub fn new_with_stream(stream: Stream, opts: &crate::opts::Opts) -> Result<Self> {
99        let mut conn_stream = stream;
100        let mut buffer_set = opts.buffer_pool.get_buffer_set();
101
102        #[cfg(feature = "sync-tls")]
103        let host = opts.host.clone();
104
105        let mut handshake = Handshake::new(opts);
106
107        loop {
108            match handshake.step(&mut buffer_set)? {
109                HandshakeAction::ReadPacket(buffer) => {
110                    buffer.clear();
111                    read_payload(&mut conn_stream, buffer)?;
112                }
113                HandshakeAction::WritePacket { sequence_id } => {
114                    write_handshake_payload(&mut conn_stream, &mut buffer_set, sequence_id)?;
115                    buffer_set.read_buffer.clear();
116                    read_payload(&mut conn_stream, &mut buffer_set.read_buffer)?;
117                }
118                #[cfg(feature = "sync-tls")]
119                HandshakeAction::UpgradeTls { sequence_id } => {
120                    write_handshake_payload(&mut conn_stream, &mut buffer_set, sequence_id)?;
121                    conn_stream = conn_stream.upgrade_to_tls(&host)?;
122                }
123                #[cfg(not(feature = "sync-tls"))]
124                HandshakeAction::UpgradeTls { .. } => {
125                    return Err(Error::BadUsageError(
126                        "TLS requested but sync-tls feature is not enabled".to_string(),
127                    ));
128                }
129                HandshakeAction::Finished => break,
130            }
131        }
132
133        let (initial_handshake, capability_flags, mariadb_capabilities) = handshake.finish()?;
134
135        let conn = Self {
136            stream: conn_stream,
137            buffer_set,
138            initial_handshake,
139            capability_flags,
140            mariadb_capabilities,
141            in_transaction: false,
142            is_broken: false,
143        };
144
145        // Upgrade to Unix socket if connected via TCP to loopback
146        #[cfg(unix)]
147        let mut conn = if opts.upgrade_to_unix_socket && conn.stream.is_tcp_loopback() {
148            conn.try_upgrade_to_unix_socket(opts)
149        } else {
150            conn
151        };
152        #[cfg(not(unix))]
153        let mut conn = conn;
154
155        // Execute init command if specified
156        if let Some(init_command) = &opts.init_command {
157            conn.query_drop(init_command)?;
158        }
159
160        Ok(conn)
161    }
162
163    /// Example: `"11.4.8-MariaDB"`
164    pub fn server_version(&self) -> &[u8] {
165        &self.buffer_set.initial_handshake[self.initial_handshake.server_version.clone()]
166    }
167
168    /// Get the negotiated capability flags
169    pub fn capability_flags(&self) -> CapabilityFlags {
170        self.capability_flags
171    }
172
173    /// Check if the server is MySQL (as opposed to MariaDB)
174    pub fn is_mysql(&self) -> bool {
175        self.capability_flags.is_mysql()
176    }
177
178    /// Check if the server is MariaDB (as opposed to MySQL)
179    pub fn is_mariadb(&self) -> bool {
180        self.capability_flags.is_mariadb()
181    }
182
183    /// Get the connection ID assigned by the server
184    pub fn connection_id(&self) -> u64 {
185        self.initial_handshake.connection_id as u64
186    }
187
188    /// Get the server status flags from the initial handshake
189    pub fn status_flags(&self) -> crate::constant::ServerStatusFlags {
190        self.initial_handshake.status_flags
191    }
192
193    /// Indicates if the connection is broken by errors
194    ///
195    /// This state is used by Pool to decide if this Conn can be reset and reused or dropped.
196    pub fn is_broken(&self) -> bool {
197        self.is_broken
198    }
199
200    #[inline]
201    fn check_error<T>(&mut self, result: Result<T>) -> Result<T> {
202        if let Err(e) = &result
203            && e.is_conn_broken()
204        {
205            self.is_broken = true;
206        }
207        result
208    }
209
210    /// Try to upgrade to Unix socket connection.
211    /// Returns upgraded conn on success, original conn on failure.
212    #[cfg(unix)]
213    fn try_upgrade_to_unix_socket(mut self, opts: &crate::opts::Opts) -> Self {
214        // Query the server for its Unix socket path
215        let mut handler = SocketPathHandler { path: None };
216        if self.query("SELECT @@socket", &mut handler).is_err() {
217            return self;
218        }
219
220        let socket_path = match handler.path {
221            Some(p) if !p.is_empty() => p,
222            _ => return self,
223        };
224
225        // Connect via Unix socket
226        let unix_stream = match UnixStream::connect(&socket_path) {
227            Ok(s) => s,
228            Err(_) => return self,
229        };
230        let stream = Stream::unix(unix_stream);
231
232        // Create new connection over Unix socket (re-handshakes)
233        // Disable upgrade_to_unix_socket to prevent infinite recursion
234        let mut opts_unix = opts.clone();
235        opts_unix.upgrade_to_unix_socket = false;
236
237        match Self::new_with_stream(stream, &opts_unix) {
238            Ok(new_conn) => new_conn,
239            Err(_) => self,
240        }
241    }
242
243    fn write_payload(&mut self) -> Result<()> {
244        let mut sequence_id = 0_u8;
245        let mut buffer = self.buffer_set.write_buffer_mut().as_mut_slice();
246
247        loop {
248            let chunk_size = buffer[4..].len().min(0xFFFFFF);
249            PacketHeader::mut_from_bytes(&mut buffer[0..4])?
250                .encode_in_place(chunk_size, sequence_id);
251            self.stream.write_all(&buffer[..4 + chunk_size])?;
252
253            if chunk_size < 0xFFFFFF {
254                break;
255            }
256
257            sequence_id = sequence_id.wrapping_add(1);
258            buffer = &mut buffer[0xFFFFFF..];
259        }
260        self.stream.flush()?;
261        Ok(())
262    }
263
264    /// Returns `Ok(statement_id)` on success
265    pub fn prepare(&mut self, sql: &str) -> Result<PreparedStatement> {
266        let result = self.prepare_inner(sql);
267        self.check_error(result)
268    }
269
270    fn prepare_inner(&mut self, sql: &str) -> Result<PreparedStatement> {
271        use crate::protocol::command::ColumnDefinitions;
272
273        self.buffer_set.read_buffer.clear();
274
275        write_prepare(self.buffer_set.new_write_buffer(), sql);
276
277        self.write_payload()?;
278        let _ = read_payload(&mut self.stream, &mut self.buffer_set.read_buffer)?;
279
280        if unlikely(
281            !self.buffer_set.read_buffer.is_empty() && self.buffer_set.read_buffer[0] == 0xFF,
282        ) {
283            Err(ErrPayloadBytes(&self.buffer_set.read_buffer))?
284        }
285
286        let prepare_ok = read_prepare_ok(&self.buffer_set.read_buffer)?;
287        let statement_id = prepare_ok.statement_id();
288        let num_params = prepare_ok.num_params();
289        let num_columns = prepare_ok.num_columns();
290
291        // Skip param definitions (we don't cache them)
292        if num_params > 0 {
293            for _ in 0..num_params {
294                let _ = read_payload(&mut self.stream, &mut self.buffer_set.read_buffer)?;
295            }
296        }
297
298        // Read and cache column definitions for MARIADB_CLIENT_CACHE_METADATA support
299        let column_definitions = if num_columns > 0 {
300            read_column_definition_packets(
301                &mut self.stream,
302                &mut self.buffer_set.column_definition_buffer,
303                num_columns as usize,
304            )?;
305            Some(ColumnDefinitions::new(
306                num_columns as usize,
307                std::mem::take(&mut self.buffer_set.column_definition_buffer),
308            )?)
309        } else {
310            None
311        };
312
313        let mut stmt = PreparedStatement::new(statement_id);
314        if let Some(col_defs) = column_definitions {
315            stmt.set_column_definitions(col_defs);
316        }
317        Ok(stmt)
318    }
319
320    fn drive_exec<H: BinaryResultSetHandler>(
321        &mut self,
322        stmt: &mut PreparedStatement,
323        handler: &mut H,
324    ) -> Result<()> {
325        let cache_metadata = self
326            .mariadb_capabilities
327            .contains(crate::constant::MariadbCapabilityFlags::MARIADB_CLIENT_CACHE_METADATA);
328        let mut exec = Exec::new(handler, stmt, cache_metadata);
329
330        loop {
331            match exec.step(&mut self.buffer_set)? {
332                Action::NeedPacket(buffer) => {
333                    buffer.clear();
334                    let _ = read_payload(&mut self.stream, buffer)?;
335                }
336                Action::ReadColumnMetadata { num_columns } => {
337                    read_column_definition_packets(
338                        &mut self.stream,
339                        &mut self.buffer_set.column_definition_buffer,
340                        num_columns,
341                    )?;
342                }
343                Action::Finished => return Ok(()),
344            }
345        }
346    }
347
348    /// Executes a prepared statement with parameters.
349    ///
350    /// This is the most general version of exec_*() methods.
351    pub fn exec<'conn, P, H>(
352        &'conn mut self,
353        stmt: &'conn mut PreparedStatement,
354        params: P,
355        handler: &mut H,
356    ) -> Result<()>
357    where
358        P: Params,
359        H: BinaryResultSetHandler,
360    {
361        let result = self.exec_inner(stmt, params, handler);
362        self.check_error(result)
363    }
364
365    fn exec_inner<'conn, P, H>(
366        &'conn mut self,
367        stmt: &'conn mut PreparedStatement,
368        params: P,
369        handler: &mut H,
370    ) -> Result<()>
371    where
372        P: Params,
373        H: BinaryResultSetHandler,
374    {
375        write_execute(self.buffer_set.new_write_buffer(), stmt.id(), params)?;
376        self.write_payload()?;
377        self.drive_exec(stmt, handler)
378    }
379
380    fn drive_bulk_exec<H: BinaryResultSetHandler>(
381        &mut self,
382        stmt: &mut PreparedStatement,
383        handler: &mut H,
384    ) -> Result<()> {
385        let cache_metadata = self
386            .mariadb_capabilities
387            .contains(crate::constant::MariadbCapabilityFlags::MARIADB_CLIENT_CACHE_METADATA);
388        let mut bulk_exec = BulkExec::new(handler, stmt, cache_metadata);
389
390        loop {
391            match bulk_exec.step(&mut self.buffer_set)? {
392                Action::NeedPacket(buffer) => {
393                    buffer.clear();
394                    let _ = read_payload(&mut self.stream, buffer)?;
395                }
396                Action::ReadColumnMetadata { num_columns } => {
397                    read_column_definition_packets(
398                        &mut self.stream,
399                        &mut self.buffer_set.column_definition_buffer,
400                        num_columns,
401                    )?;
402                }
403                Action::Finished => return Ok(()),
404            }
405        }
406    }
407
408    /// Execute a bulk prepared statement with a result set handler.
409    ///
410    /// On MariaDB, this sends all parameters in a single packet using the bulk command extension.
411    /// On Oracle MySQL, this falls back to multiple `exec()` calls.
412    pub fn exec_bulk_insert_or_update<P, I, H>(
413        &mut self,
414        stmt: &mut PreparedStatement,
415        params: P,
416        flags: BulkFlags,
417        handler: &mut H,
418    ) -> Result<()>
419    where
420        P: BulkParamsSet + IntoIterator<Item = I>,
421        I: Params,
422        H: BinaryResultSetHandler,
423    {
424        let result = self.exec_bulk_insert_or_update_inner(stmt, params, flags, handler);
425        self.check_error(result)
426    }
427
428    fn exec_bulk_insert_or_update_inner<P, I, H>(
429        &mut self,
430        stmt: &mut PreparedStatement,
431        params: P,
432        flags: BulkFlags,
433        handler: &mut H,
434    ) -> Result<()>
435    where
436        P: BulkParamsSet + IntoIterator<Item = I>,
437        I: Params,
438        H: BinaryResultSetHandler,
439    {
440        if !self.is_mariadb() {
441            // Fallback to multiple exec_drop for non-MariaDB servers
442            for param in params {
443                self.exec_inner(stmt, param, &mut DropHandler::default())?;
444            }
445            Ok(())
446        } else {
447            // Use MariaDB bulk execute protocol
448            write_bulk_execute(self.buffer_set.new_write_buffer(), stmt.id(), params, flags)?;
449            self.write_payload()?;
450            self.drive_bulk_exec(stmt, handler)
451        }
452    }
453
454    /// Execute a prepared statement and return only the first row, dropping the rest.
455    pub fn exec_first<Row, P>(
456        &mut self,
457        stmt: &mut PreparedStatement,
458        params: P,
459    ) -> Result<Option<Row>>
460    where
461        Row: for<'buf> crate::raw::FromRawRow<'buf>,
462        P: Params,
463    {
464        let result = self.exec_first_inner(stmt, params);
465        self.check_error(result)
466    }
467
468    fn exec_first_inner<Row, P>(
469        &mut self,
470        stmt: &mut PreparedStatement,
471        params: P,
472    ) -> Result<Option<Row>>
473    where
474        Row: for<'buf> crate::raw::FromRawRow<'buf>,
475        P: Params,
476    {
477        write_execute(self.buffer_set.new_write_buffer(), stmt.id(), params)?;
478        self.write_payload()?;
479        let mut handler = FirstHandler::<Row>::default();
480        self.drive_exec(stmt, &mut handler)?;
481        Ok(handler.take())
482    }
483
484    /// Execute a prepared statement and discard all results
485    pub fn exec_drop<P>(&mut self, stmt: &mut PreparedStatement, params: P) -> Result<()>
486    where
487        P: Params,
488    {
489        self.exec(stmt, params, &mut DropHandler::default())
490    }
491
492    /// Execute a prepared statement and collect all rows into a Vec.
493    pub fn exec_collect<Row, P>(
494        &mut self,
495        stmt: &mut PreparedStatement,
496        params: P,
497    ) -> Result<Vec<Row>>
498    where
499        Row: for<'buf> crate::raw::FromRawRow<'buf>,
500        P: Params,
501    {
502        let mut handler = crate::handler::CollectHandler::<Row>::default();
503        self.exec(stmt, params, &mut handler)?;
504        Ok(handler.into_rows())
505    }
506
507    /// Execute a prepared statement and call a closure for each row.
508    ///
509    /// The closure can return an error to stop iteration early.
510    pub fn exec_foreach<Row, P, F>(
511        &mut self,
512        stmt: &mut PreparedStatement,
513        params: P,
514        f: F,
515    ) -> Result<()>
516    where
517        Row: for<'buf> crate::raw::FromRawRow<'buf>,
518        P: Params,
519        F: FnMut(Row) -> Result<()>,
520    {
521        let mut handler = crate::handler::ForEachHandler::<Row, F>::new(f);
522        self.exec(stmt, params, &mut handler)
523    }
524
525    fn drive_query<H: TextResultSetHandler>(&mut self, handler: &mut H) -> Result<()> {
526        let mut query = Query::new(handler);
527
528        loop {
529            match query.step(&mut self.buffer_set)? {
530                Action::NeedPacket(buffer) => {
531                    buffer.clear();
532                    let _ = read_payload(&mut self.stream, buffer)?;
533                }
534                Action::ReadColumnMetadata { num_columns } => {
535                    read_column_definition_packets(
536                        &mut self.stream,
537                        &mut self.buffer_set.column_definition_buffer,
538                        num_columns,
539                    )?;
540                }
541                Action::Finished => return Ok(()),
542            }
543        }
544    }
545
546    /// Execute a text protocol SQL query
547    pub fn query<H>(&mut self, sql: &str, handler: &mut H) -> Result<()>
548    where
549        H: TextResultSetHandler,
550    {
551        let result = self.query_inner(sql, handler);
552        self.check_error(result)
553    }
554
555    fn query_inner<H>(&mut self, sql: &str, handler: &mut H) -> Result<()>
556    where
557        H: TextResultSetHandler,
558    {
559        write_query(self.buffer_set.new_write_buffer(), sql);
560        self.write_payload()?;
561        self.drive_query(handler)
562    }
563
564    /// Execute a text protocol SQL query and discard the result
565    pub fn query_drop(&mut self, sql: &str) -> Result<()> {
566        let result = self.query_drop_inner(sql);
567        self.check_error(result)
568    }
569
570    fn query_drop_inner(&mut self, sql: &str) -> Result<()> {
571        write_query(self.buffer_set.new_write_buffer(), sql);
572        self.write_payload()?;
573        self.drive_query(&mut DropHandler::default())
574    }
575
576    /// Send a ping to the server to check if the connection is alive
577    ///
578    /// This sends a COM_PING command to the MySQL server and waits for an OK response.
579    pub fn ping(&mut self) -> Result<()> {
580        let result = self.ping_inner();
581        self.check_error(result)
582    }
583
584    fn ping_inner(&mut self) -> Result<()> {
585        write_ping(self.buffer_set.new_write_buffer());
586        self.write_payload()?;
587        self.buffer_set.read_buffer.clear();
588        let _ = read_payload(&mut self.stream, &mut self.buffer_set.read_buffer)?;
589        Ok(())
590    }
591
592    /// Reset the connection to its initial state
593    pub fn reset(&mut self) -> Result<()> {
594        let result = self.reset_inner();
595        self.check_error(result)
596    }
597
598    fn reset_inner(&mut self) -> Result<()> {
599        write_reset_connection(self.buffer_set.new_write_buffer());
600        self.write_payload()?;
601        self.buffer_set.read_buffer.clear();
602        let _ = read_payload(&mut self.stream, &mut self.buffer_set.read_buffer)?;
603        self.in_transaction = false;
604        Ok(())
605    }
606
607    /// Execute a closure within a transaction
608    ///
609    /// # Errors
610    /// Returns `Error::NestedTransaction` if called while already in a transaction
611    pub fn transaction<F, R>(&mut self, f: F) -> Result<R>
612    where
613        F: FnOnce(&mut Conn, super::transaction::Transaction) -> Result<R>,
614    {
615        if self.in_transaction {
616            return Err(Error::NestedTransaction);
617        }
618
619        self.in_transaction = true;
620
621        if let Err(e) = self.query_drop("BEGIN") {
622            self.in_transaction = false;
623            return Err(e);
624        }
625
626        let tx = super::transaction::Transaction::new(self.connection_id());
627        let result = f(self, tx);
628
629        // If no explicit commit/rollback was called, commit on Ok, rollback on Err
630        if self.in_transaction {
631            self.in_transaction = false;
632            match &result {
633                Ok(_) => self.query_drop("COMMIT")?,
634                Err(_) => {
635                    let _ = self.query_drop("ROLLBACK");
636                }
637            }
638        }
639
640        result
641    }
642}
643
644/// Read a complete MySQL payload, concatenating payloads if they span multiple 16MB chunks
645/// Returns the sequence_id of the last packet read.
646fn read_payload(reader: &mut Stream, buffer: &mut Vec<u8>) -> Result<u8> {
647    buffer.clear();
648
649    let mut header = PacketHeader::new_zeroed();
650    reader.read_exact(header.as_mut_bytes())?;
651
652    let length = header.length();
653    let mut sequence_id = header.sequence_id;
654
655    buffer.reserve(length);
656
657    {
658        let spare = buffer.spare_capacity_mut();
659        reader.read_buf_exact(&mut spare[..length])?;
660        // SAFETY: read_buf_exact filled exactly `length` bytes
661        unsafe {
662            buffer.set_len(length);
663        }
664    }
665
666    let mut current_length = length;
667    while current_length == 0xFFFFFF {
668        reader.read_exact(header.as_mut_bytes())?;
669
670        current_length = header.length();
671        sequence_id = header.sequence_id;
672
673        buffer.reserve(current_length);
674        let spare = buffer.spare_capacity_mut();
675        reader.read_buf_exact(&mut spare[..current_length])?;
676        // SAFETY: read_buf_exact filled exactly `current_length` bytes
677        unsafe {
678            buffer.set_len(buffer.len() + current_length);
679        }
680    }
681
682    Ok(sequence_id)
683}
684
685fn read_column_definition_packets(
686    reader: &mut Stream,
687    out: &mut Vec<u8>,
688    num_columns: usize,
689) -> Result<u8> {
690    out.clear();
691    let mut header = PacketHeader::new_zeroed();
692
693    // For each column, write [4 bytes len][payload]
694    for _ in 0..num_columns {
695        reader.read_exact(header.as_mut_bytes())?;
696        let length = header.length();
697        out.extend((length as u32).to_ne_bytes());
698
699        out.reserve(length);
700        let spare = out.spare_capacity_mut();
701        reader.read_buf_exact(&mut spare[..length])?;
702        // SAFETY: read_buf_exact filled exactly `length` bytes
703        unsafe {
704            out.set_len(out.len() + length);
705        }
706    }
707
708    Ok(header.sequence_id)
709}
710
711fn write_handshake_payload(
712    stream: &mut Stream,
713    buffer_set: &mut BufferSet,
714    sequence_id: u8,
715) -> Result<()> {
716    let mut buffer = buffer_set.write_buffer_mut().as_mut_slice();
717    let mut seq_id = sequence_id;
718
719    loop {
720        let chunk_size = buffer[4..].len().min(0xFFFFFF);
721        PacketHeader::mut_from_bytes(&mut buffer[0..4])?.encode_in_place(chunk_size, seq_id);
722        stream.write_all(&buffer[..4 + chunk_size])?;
723
724        if chunk_size < 0xFFFFFF {
725            break;
726        }
727
728        seq_id = seq_id.wrapping_add(1);
729        buffer = &mut buffer[0xFFFFFF..];
730    }
731    stream.flush()?;
732    Ok(())
733}
734
735/// Handler to capture socket path from SELECT @@socket query
736#[cfg(unix)]
737struct SocketPathHandler {
738    path: Option<String>,
739}
740
741#[cfg(unix)]
742impl TextResultSetHandler for SocketPathHandler {
743    fn no_result_set(&mut self, _: OkPayloadBytes) -> Result<()> {
744        Ok(())
745    }
746    fn resultset_start(&mut self, _: &[ColumnDefinition<'_>]) -> Result<()> {
747        Ok(())
748    }
749    fn resultset_end(&mut self, _: OkPayloadBytes) -> Result<()> {
750        Ok(())
751    }
752    fn row(&mut self, _: &[ColumnDefinition<'_>], row: TextRowPayload<'_>) -> Result<()> {
753        // 0xFB indicates NULL value
754        if row.0.first() == Some(&0xFB) {
755            return Ok(());
756        }
757        // Parse the first length-encoded string
758        let (value, _) = read_string_lenenc(row.0)?;
759        if !value.is_empty() {
760            self.path = Some(String::from_utf8_lossy(value).into_owned());
761        }
762        Ok(())
763    }
764}