oracle_rs/
connection.rs

1//! Oracle database connection
2//!
3//! This module provides the main `Connection` type for interacting with Oracle databases.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use oracle_rs::{Connection, Config};
9//!
10//! #[tokio::main]
11//! async fn main() -> oracle_rs::Result<()> {
12//!     // Create a connection
13//!     let conn = Connection::connect("localhost:1521/ORCLPDB1", "user", "password").await?;
14//!
15//!     // Execute a query
16//!     let rows = conn.query("SELECT * FROM employees WHERE department_id = :1", &[&10]).await?;
17//!
18//!     for row in rows {
19//!         println!("{:?}", row);
20//!     }
21//!
22//!     // Commit and close
23//!     conn.commit().await?;
24//!     conn.close().await?;
25//!     Ok(())
26//! }
27//! ```
28
29use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
30use std::sync::Arc;
31use bytes::Bytes;
32use tokio::io::{AsyncReadExt, AsyncWriteExt};
33use tokio::net::TcpStream;
34use tokio::sync::Mutex;
35
36use crate::batch::{BatchBinds, BatchResult};
37use crate::buffer::{ReadBuffer, WriteBuffer};
38use crate::transport::{TlsConfig, TlsOracleStream, connect_tls};
39use crate::capabilities::Capabilities;
40use crate::config::{Config, ServiceMethod};
41use crate::constants::{BindDirection, FetchOrientation, FunctionCode, MessageType, OracleType, PacketType, PACKET_HEADER_SIZE};
42use crate::cursor::{ScrollableCursor, ScrollResult};
43use crate::error::{Error, Result};
44use crate::implicit::{ImplicitResult, ImplicitResults};
45use crate::messages::{AcceptMessage, AuthMessage, AuthPhase, ConnectMessage, ExecuteMessage, ExecuteOptions, FetchMessage, LobOpMessage};
46use crate::packet::Packet;
47use crate::row::{Row, Value};
48use crate::statement::{BindParam, ColumnInfo, Statement, StatementType};
49use crate::types::{LobData, LobLocator, LobValue};
50use crate::statement_cache::StatementCache;
51
/// Lifecycle stage of a connection.
///
/// A freshly built `ConnectionInner` starts out as `Disconnected`;
/// `Connection::connect_with_config` switches it to `Connected` as soon as the
/// TCP (or TLS) stream has been attached. The remaining variants are named after
/// the later handshake steps (protocol negotiation, data type negotiation,
/// authentication) and the final closed state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Not connected
    Disconnected,
    /// TCP connection established
    Connected,
    /// Protocol negotiation complete
    ProtocolNegotiated,
    /// Data types negotiated
    DataTypesNegotiated,
    /// Fully authenticated and ready
    Ready,
    /// Connection is closed
    Closed,
}
68
/// Tunable settings applied when a query is executed
#[derive(Debug, Clone)]
pub struct QueryOptions {
    /// How many rows to prefetch
    pub prefetch_rows: u32,
    /// Array size used for batch operations
    pub array_size: u32,
    /// Commit automatically after DML when `true`
    pub auto_commit: bool,
}

impl Default for QueryOptions {
    /// Defaults: prefetch 100 rows, array size 100, auto-commit switched off.
    fn default() -> Self {
        let prefetch_rows = 100;
        let array_size = 100;

        QueryOptions {
            prefetch_rows,
            array_size,
            auto_commit: false,
        }
    }
}
89
/// Result set from a query
///
/// All fields are public, so callers can read the rows and column metadata
/// directly or go through the accessor methods on this type.
#[derive(Debug)]
pub struct QueryResult {
    /// Column information
    pub columns: Vec<ColumnInfo>,
    /// Rows returned
    pub rows: Vec<Row>,
    /// Number of rows affected (for DML)
    pub rows_affected: u64,
    /// Whether there are more rows to fetch
    pub has_more_rows: bool,
    /// Cursor ID for subsequent fetches (needed for fetch_more)
    pub cursor_id: u16,
}
104
105impl QueryResult {
106    /// Create an empty query result
107    pub fn empty() -> Self {
108        Self {
109            columns: Vec::new(),
110            rows: Vec::new(),
111            rows_affected: 0,
112            has_more_rows: false,
113            cursor_id: 0,
114        }
115    }
116
117    /// Get the number of columns
118    pub fn column_count(&self) -> usize {
119        self.columns.len()
120    }
121
122    /// Get the number of rows
123    pub fn row_count(&self) -> usize {
124        self.rows.len()
125    }
126
127    /// Check if the result is empty
128    pub fn is_empty(&self) -> bool {
129        self.rows.is_empty()
130    }
131
132    /// Get a column by name
133    pub fn column_by_name(&self, name: &str) -> Option<&ColumnInfo> {
134        self.columns.iter().find(|c| c.name.eq_ignore_ascii_case(name))
135    }
136
137    /// Get column index by name
138    pub fn column_index(&self, name: &str) -> Option<usize> {
139        self.columns.iter().position(|c| c.name.eq_ignore_ascii_case(name))
140    }
141
142    /// Iterate over rows
143    pub fn iter(&self) -> impl Iterator<Item = &Row> {
144        self.rows.iter()
145    }
146
147    /// Get a single row (first row)
148    pub fn first(&self) -> Option<&Row> {
149        self.rows.first()
150    }
151}
152
153impl IntoIterator for QueryResult {
154    type Item = Row;
155    type IntoIter = std::vec::IntoIter<Row>;
156
157    fn into_iter(self) -> Self::IntoIter {
158        self.rows.into_iter()
159    }
160}
161
/// Result from executing a PL/SQL block with OUT parameters
///
/// The typed accessors on this type (`get_string`, `get_integer`, ...) index
/// into `out_values` and return `None` when the position is out of range or the
/// value cannot be viewed as the requested type.
#[derive(Debug)]
pub struct PlsqlResult {
    /// OUT parameter values indexed by position (0-based)
    pub out_values: Vec<Value>,
    /// Number of rows affected (if applicable)
    pub rows_affected: u64,
    /// Cursor ID (if the result contains a REF CURSOR)
    pub cursor_id: Option<u16>,
    /// Implicit result sets returned via DBMS_SQL.RETURN_RESULT
    pub implicit_results: ImplicitResults,
}
174
175impl PlsqlResult {
176    /// Create an empty PL/SQL result
177    pub fn empty() -> Self {
178        Self {
179            out_values: Vec::new(),
180            rows_affected: 0,
181            cursor_id: None,
182            implicit_results: ImplicitResults::new(),
183        }
184    }
185
186    /// Get an OUT value by position (0-based)
187    pub fn get(&self, index: usize) -> Option<&Value> {
188        self.out_values.get(index)
189    }
190
191    /// Get a string OUT value by position
192    pub fn get_string(&self, index: usize) -> Option<&str> {
193        self.out_values.get(index).and_then(|v| v.as_str())
194    }
195
196    /// Get an integer OUT value by position
197    pub fn get_integer(&self, index: usize) -> Option<i64> {
198        self.out_values.get(index).and_then(|v| v.as_i64())
199    }
200
201    /// Get a float OUT value by position
202    pub fn get_float(&self, index: usize) -> Option<f64> {
203        self.out_values.get(index).and_then(|v| v.as_f64())
204    }
205
206    /// Get a cursor ID from OUT value by position (for REF CURSOR)
207    pub fn get_cursor_id(&self, index: usize) -> Option<u16> {
208        self.out_values.get(index).and_then(|v| v.as_cursor_id())
209    }
210}
211
/// Server information obtained during connection
///
/// `Default` yields empty strings, zeroed numbers, `None` names and
/// `supports_oob == false`. The handshake reads `protocol_version` and
/// `supports_oob` to decide whether an out-of-band check has to be sent.
#[derive(Debug, Clone, Default)]
pub struct ServerInfo {
    /// Oracle version string
    pub version: String,
    /// Server banner
    pub banner: String,
    /// Session ID (SID)
    pub session_id: u32,
    /// Serial number
    pub serial_number: u32,
    /// Instance name
    pub instance_name: Option<String>,
    /// Service name
    pub service_name: Option<String>,
    /// Database name
    pub database_name: Option<String>,
    /// Negotiated protocol version
    pub protocol_version: u16,
    /// Whether server supports OOB (out of band) data
    pub supports_oob: bool,
}
234
/// Stream type that can be either plain TCP or TLS-encrypted
///
/// The inherent `read_exact` / `write_all` / `flush` methods dispatch to the
/// active variant so the rest of the connection code does not need to care
/// which transport is in use.
enum OracleStream {
    /// Plain TCP connection
    Plain(TcpStream),
    /// TLS-encrypted connection
    Tls(TlsOracleStream),
}
242
243impl OracleStream {
244    async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
245        match self {
246            OracleStream::Plain(stream) => {
247                AsyncReadExt::read_exact(stream, buf).await?;
248                Ok(())
249            }
250            OracleStream::Tls(stream) => {
251                AsyncReadExt::read_exact(stream, buf).await?;
252                Ok(())
253            }
254        }
255    }
256
257    async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
258        match self {
259            OracleStream::Plain(stream) => stream.write_all(buf).await,
260            OracleStream::Tls(stream) => stream.write_all(buf).await,
261        }
262    }
263
264    async fn flush(&mut self) -> std::io::Result<()> {
265        match self {
266            OracleStream::Plain(stream) => stream.flush().await,
267            OracleStream::Tls(stream) => stream.flush().await,
268        }
269    }
270
271}
272
/// Internal connection state shared across async operations
///
/// `Connection` keeps one of these behind an `Arc<Mutex<..>>`, so every method
/// here runs with exclusive access to the stream.
struct ConnectionInner {
    /// Active transport; `None` makes `send`/`receive` fail with
    /// `Error::ConnectionClosed`
    stream: Option<OracleStream>,
    capabilities: Capabilities,
    state: ConnectionState,
    server_info: ServerInfo,
    /// Session data unit size; initialised to 8192 and used to cap the size of
    /// each packet produced by `send_multi_packet`
    sdu_size: u16,
    /// When set, the packet length field occupies the first 4 header bytes
    /// instead of the first 2
    large_sdu: bool,
    /// Sequence number for TTC messages (increments per message)
    sequence_number: u8,
    /// Statement cache for prepared statement reuse
    /// (`None` when the configured cache size is 0)
    statement_cache: Option<StatementCache>,
}
286
287impl ConnectionInner {
288    fn new_with_cache(cache_size: usize) -> Self {
289        Self {
290            stream: None,
291            capabilities: Capabilities::default(),
292            state: ConnectionState::Disconnected,
293            server_info: ServerInfo::default(),
294            sdu_size: 8192,
295            large_sdu: false,
296            sequence_number: 0,
297            statement_cache: if cache_size > 0 {
298                Some(StatementCache::new(cache_size))
299            } else {
300                None
301            },
302        }
303    }
304
305    /// Get the next sequence number (auto-increments, wraps at 255 to 1)
306    fn next_sequence_number(&mut self) -> u8 {
307        self.sequence_number = self.sequence_number.wrapping_add(1);
308        if self.sequence_number == 0 {
309            self.sequence_number = 1;
310        }
311        self.sequence_number
312    }
313
314    async fn send(&mut self, data: &[u8]) -> Result<()> {
315        if let Some(stream) = &mut self.stream {
316            stream.write_all(data).await?;
317            stream.flush().await?;
318            Ok(())
319        } else {
320            Err(Error::ConnectionClosed)
321        }
322    }
323
324    /// Send a payload that may need to be split across multiple packets.
325    ///
326    /// This is used for large LOB writes and other operations where the payload
327    /// exceeds the SDU size. The payload is split into multiple DATA packets,
328    /// each with proper headers.
329    ///
330    /// # Arguments
331    /// * `payload` - The raw message payload (without packet header or data flags)
332    /// * `data_flags` - The data flags for the first packet (typically 0)
333    async fn send_multi_packet(&mut self, payload: &[u8], data_flags: u16) -> Result<()> {
334        let stream = self.stream.as_mut().ok_or(Error::ConnectionClosed)?;
335
336        // Calculate max payload per packet: SDU - header (8) - data flags (2)
337        let max_payload_per_packet = self.sdu_size as usize - PACKET_HEADER_SIZE - 2;
338
339        let mut offset = 0;
340        let mut is_first = true;
341
342        while offset < payload.len() {
343            let remaining = payload.len() - offset;
344            let chunk_size = std::cmp::min(remaining, max_payload_per_packet);
345            let is_last = offset + chunk_size >= payload.len();
346
347            // Build packet
348            let packet_len = PACKET_HEADER_SIZE + 2 + chunk_size; // header + data flags + payload
349            let mut packet = Vec::with_capacity(packet_len);
350
351            // Header
352            if self.large_sdu {
353                packet.extend_from_slice(&(packet_len as u32).to_be_bytes());
354            } else {
355                packet.extend_from_slice(&(packet_len as u16).to_be_bytes());
356                packet.extend_from_slice(&[0, 0]); // Checksum
357            }
358            packet.push(PacketType::Data as u8);
359            packet.push(0); // Flags
360            packet.extend_from_slice(&[0, 0]); // Header checksum
361
362            // Data flags - only include on first packet
363            if is_first {
364                packet.extend_from_slice(&data_flags.to_be_bytes());
365                is_first = false;
366            } else {
367                // Continuation packets still need data flags position but value is 0
368                packet.extend_from_slice(&0u16.to_be_bytes());
369            }
370
371            // Payload chunk
372            packet.extend_from_slice(&payload[offset..offset + chunk_size]);
373
374            // Send this packet
375            stream.write_all(&packet).await?;
376
377            offset += chunk_size;
378
379            // Don't flush until the last packet to improve performance
380            if is_last {
381                stream.flush().await?;
382            }
383        }
384
385        Ok(())
386    }
387
388    async fn receive(&mut self) -> Result<bytes::Bytes> {
389        if let Some(stream) = &mut self.stream {
390            // Read packet header first (always 8 bytes)
391            // large_sdu only affects how the length field is interpreted, not header size
392            let mut header_buf = vec![0u8; PACKET_HEADER_SIZE];
393            stream.read_exact(&mut header_buf).await?;
394
395            // Parse header to get payload length
396            // In large_sdu mode, first 4 bytes are length; otherwise first 2 bytes
397            let packet_len = if self.large_sdu {
398                u32::from_be_bytes([header_buf[0], header_buf[1], header_buf[2], header_buf[3]])
399                    as usize
400            } else {
401                u16::from_be_bytes([header_buf[0], header_buf[1]]) as usize
402            };
403
404            // Read remaining payload
405            let payload_len = packet_len.saturating_sub(PACKET_HEADER_SIZE);
406            let mut payload_buf = vec![0u8; payload_len];
407            if payload_len > 0 {
408                stream.read_exact(&mut payload_buf).await?;
409            }
410
411            // Combine header and payload
412            let mut full_packet = header_buf.clone();
413            full_packet.extend(payload_buf);
414
415            Ok(bytes::Bytes::from(full_packet))
416        } else {
417            Err(Error::ConnectionClosed)
418        }
419    }
420
    /// Receive a complete response that may span multiple packets
    ///
    /// This method accumulates DATA packets until the end of the response is
    /// detected, either through the data flags (END_OF_RESPONSE / EOF), through a
    /// bare EndOfResponse message, or by scanning the accumulated message data
    /// for a terminal message. It's used for operations like LOB reads that may
    /// return data spanning multiple TNS packets.
    ///
    /// Returns a synthetic DATA packet: a freshly built 8-byte header followed by
    /// the data flags of the first packet and the message data of all packets.
    ///
    /// If a non-DATA packet (e.g., MARKER) arrives, that packet is returned as-is
    /// and anything accumulated so far is discarded.
    ///
    /// # Errors
    /// * `Error::Protocol` for a packet shorter than the header or a DATA packet
    ///   without room for the data flags
    /// * whatever `receive` returns
    async fn receive_response(&mut self) -> Result<bytes::Bytes> {
        use crate::constants::{data_flags, MessageType};

        let mut accumulated_payload = Vec::new();
        let mut is_first_packet = true;

        loop {
            let packet = self.receive().await?;

            if packet.len() < PACKET_HEADER_SIZE {
                return Err(Error::Protocol("Packet too small".to_string()));
            }

            // Check packet type (header byte 4) - only DATA packets can be accumulated
            let packet_type = packet[4];
            if packet_type != PacketType::Data as u8 {
                // Non-DATA packet (e.g., MARKER) - return as-is for special handling
                return Ok(packet);
            }

            // Get payload (everything after the 8-byte header)
            let payload = &packet[PACKET_HEADER_SIZE..];

            if payload.len() < 2 {
                return Err(Error::Protocol("DATA packet payload too small".to_string()));
            }

            // Read data flags (first 2 bytes of payload, big-endian)
            let data_flags_value = u16::from_be_bytes([payload[0], payload[1]]);

            // Check for end of response - Python checks both flags and message type
            let has_end_flag = (data_flags_value & data_flags::END_OF_RESPONSE) != 0;
            let has_eof_flag = (data_flags_value & data_flags::EOF) != 0;

            // Also check for a bare EndOfResponse message: the payload is exactly
            // the 2 data-flag bytes plus 1 message-type byte
            let has_end_message = payload.len() == 3
                && payload[2] == MessageType::EndOfResponse as u8;

            // Accumulate payload first
            if is_first_packet {
                // First packet: include data flags in accumulated payload
                accumulated_payload.extend_from_slice(payload);
                is_first_packet = false;
            } else {
                // Subsequent packets: skip the data flags, append only the message data
                accumulated_payload.extend_from_slice(&payload[2..]);
            }

            // Check for end of response using data flags from this packet
            let is_end_of_response = has_end_flag || has_eof_flag || has_end_message;

            // If data flags don't indicate end, scan the ACCUMULATED message data
            // for terminal messages. We scan accumulated data (not just current packet)
            // because messages can span packet boundaries. The leading 2 bytes are
            // the first packet's data flags, hence the `[2..]`.
            let has_terminal_message = if !is_end_of_response && accumulated_payload.len() > 2 {
                self.scan_for_terminal_message(&accumulated_payload[2..])
            } else {
                false
            };

            // Check if this is the last packet
            if is_end_of_response || has_terminal_message {
                break;
            }
        }

        // Build a synthetic packet with combined payload
        let total_len = PACKET_HEADER_SIZE + accumulated_payload.len();
        let mut result = Vec::with_capacity(total_len);

        // Build header
        // NOTE(review): without large_sdu the length is cast to u16, so a combined
        // response above 65535 bytes gets a truncated length field - confirm that
        // downstream parsing does not rely on it.
        if self.large_sdu {
            result.extend_from_slice(&(total_len as u32).to_be_bytes());
        } else {
            result.extend_from_slice(&(total_len as u16).to_be_bytes());
            result.extend_from_slice(&[0, 0]); // Checksum
        }
        result.push(PacketType::Data as u8);
        result.push(0); // Flags
        result.extend_from_slice(&[0, 0]); // Header checksum

        // Add combined payload
        result.extend_from_slice(&accumulated_payload);

        Ok(bytes::Bytes::from(result))
    }
514
515    /// Scan message data for terminal message types (ERROR or END_OF_RESPONSE)
516    /// that indicate the response is complete.
517    ///
518    /// This is needed because Oracle doesn't always set the END_OF_RESPONSE flag
519    /// in the data flags for LOB operations. Instead, we must detect the terminal
520    /// message by parsing the message stream.
521    ///
522    /// NOTE: This is conservative - it only returns true if we can definitively
523    /// identify a terminal message. We avoid false positives by not scanning
524    /// raw byte values (which could match message type values by coincidence).
525    fn scan_for_terminal_message(&self, data: &[u8]) -> bool {
526        use crate::buffer::ReadBuffer;
527        use crate::constants::MessageType;
528
529        if data.is_empty() {
530            return false;
531        }
532
533        // Try to parse the message stream and look for ERROR or END_OF_RESPONSE
534        let mut buf = ReadBuffer::from_slice(data);
535
536        while buf.remaining() > 0 {
537            let msg_type = match buf.read_u8() {
538                Ok(t) => t,
539                Err(_) => return false, // Can't read, assume incomplete
540            };
541
542            // END_OF_RESPONSE is a standalone message with no additional data
543            if msg_type == MessageType::EndOfResponse as u8 {
544                return true;
545            }
546
547            // ERROR message indicates end of response for older Oracle
548            if msg_type == MessageType::Error as u8 {
549                // Error message found - this indicates end of response
550                return true;
551            }
552
553            // STATUS message also indicates end of response
554            if msg_type == MessageType::Status as u8 {
555                return true;
556            }
557
558            // LOB_DATA message - skip the data
559            if msg_type == MessageType::LobData as u8 {
560                // Read length-prefixed data and skip it
561                match buf.read_raw_bytes_chunked() {
562                    Ok(_) => continue,
563                    Err(_) => return false, // Incomplete LOB data, need more packets
564                }
565            }
566
567            // PARAMETER message (8) - this contains the updated locator and amount.
568            // For LOB write responses, PARAMETER is the first message and the response
569            // is relatively small (locator + error info). We can safely scan for
570            // ERROR/END_OF_RESPONSE bytes because the locator doesn't contain arbitrary
571            // binary data that would false-positive.
572            //
573            // For LOB read responses, LobData comes first and contains the actual data,
574            // which might contain bytes that match ERROR (4) or END_OF_RESPONSE (29).
575            // But since we skip LobData content, by the time we reach PARAMETER,
576            // the remaining data is just locator + error info.
577            if msg_type == MessageType::Parameter as u8 {
578                let remaining = buf.remaining_bytes();
579                // Check if ERROR (4) or END_OF_RESPONSE (29) appears in remaining bytes
580                // This is safe because PARAMETER data (locator + amount) doesn't contain
581                // arbitrary binary data that would false-positive.
582                if remaining.contains(&(MessageType::Error as u8))
583                    || remaining.contains(&(MessageType::EndOfResponse as u8))
584                {
585                    return true;
586                }
587                // If no terminal marker found, response might be incomplete
588                return false;
589            }
590
591            // For other unknown message types, we can't determine the end
592            return false;
593        }
594
595        false
596    }
597
598    /// Send a marker packet with the specified marker type
599    async fn send_marker(&mut self, marker_type: u8) -> Result<()> {
600        let mut buf = WriteBuffer::with_capacity(16);
601
602        // Build marker packet header
603        // Marker packet structure: [length][0x00][0x00][0x00][0x0c][flags][0x00][0x00] + payload
604        // Payload: [0x01][0x00][marker_type]
605        let payload_len = 3; // 0x01, 0x00, marker_type
606        let total_len = (PACKET_HEADER_SIZE + payload_len) as u16;
607
608        // Header
609        buf.write_u16_be(total_len)?;
610        buf.write_u16_be(0)?; // zeros in large_sdu position
611        buf.write_u8(PacketType::Marker as u8)?;
612        buf.write_u8(0)?; // flags
613        buf.write_u16_be(0)?; // reserved
614
615        // Payload
616        buf.write_u8(0x01)?; // constant
617        buf.write_u8(0x00)?; // constant
618        buf.write_u8(marker_type)?;
619
620        self.send(buf.as_slice()).await
621    }
622
623    /// Handle the reset protocol after receiving a MARKER packet
624    /// This sends a reset marker, waits for the reset response, then returns the error packet
625    /// Returns Err if the connection is closed after reset (some Oracle versions do this)
626    async fn handle_marker_reset(&mut self) -> Result<bytes::Bytes> {
627        const MARKER_TYPE_RESET: u8 = 2;
628
629        // Send reset marker
630        self.send_marker(MARKER_TYPE_RESET).await?;
631
632        // Read packets until we get a reset marker back
633        loop {
634            let packet = self.receive().await?;
635            if packet.len() < PACKET_HEADER_SIZE {
636                return Err(Error::Protocol("Invalid packet received".to_string()));
637            }
638
639            let packet_type = packet[4];
640
641            if packet_type == PacketType::Marker as u8 {
642                // Check if it's a reset marker
643                if packet.len() >= PACKET_HEADER_SIZE + 3 {
644                    let marker_type = packet[PACKET_HEADER_SIZE + 2];
645                    if marker_type == MARKER_TYPE_RESET {
646                        break;
647                    }
648                }
649            } else {
650                // Non-marker packet received unexpectedly during reset wait
651                return Ok(packet);
652            }
653        }
654
655        // Try to read the error packet (may need to skip additional marker packets first)
656        // Note: Some Oracle versions (like Oracle Free) may close the connection after reset
657        // instead of sending an error packet
658        loop {
659            match self.receive().await {
660                Ok(packet) => {
661                    let packet_type = packet[4];
662
663                    if packet_type != PacketType::Marker as u8 {
664                        // This should be the error data packet
665                        return Ok(packet);
666                    }
667                    // Skip additional marker packets
668                }
669                Err(_) => {
670                    // Connection closed after reset - Oracle Free and some versions
671                    // close the connection instead of sending the error details.
672                    // This typically happens when:
673                    // - Table or view doesn't exist
674                    // - Insufficient privileges to access the object
675                    // - Invalid SQL syntax
676                    return Err(Error::ConnectionClosedByServer(
677                        "Query failed - Oracle closed the connection without providing error details. \
678                         This typically indicates insufficient privileges or the object doesn't exist.".to_string()
679                    ));
680                }
681            }
682        }
683    }
684}
685
686/// An Oracle database connection.
687///
688/// This is the main type for interacting with Oracle databases. It provides
689/// methods for executing queries, DML statements, PL/SQL blocks, and managing
690/// transactions.
691///
692/// Connections are created using [`Connection::connect`] or
693/// [`Connection::connect_with_config`]. For connection pooling, use the
694/// `deadpool-oracle` crate.
695///
696/// # Example
697///
698/// ```rust,no_run
699/// use oracle_rs::{Config, Connection, Value};
700///
701/// # async fn example() -> oracle_rs::Result<()> {
702/// // Create a connection
703/// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password");
704/// let conn = Connection::connect_with_config(config).await?;
705///
706/// // Execute a query
707/// let result = conn.query("SELECT * FROM employees WHERE dept_id = :1", &[10.into()]).await?;
708/// for row in &result.rows {
709///     let name = row.get_by_name("name").and_then(|v| v.as_str()).unwrap_or("");
710///     println!("Employee: {}", name);
711/// }
712///
713/// // Execute DML with transaction
714/// conn.execute("INSERT INTO logs (msg) VALUES (:1)", &["Hello".into()]).await?;
715/// conn.commit().await?;
716///
717/// // Close the connection
718/// conn.close().await?;
719/// # Ok(())
720/// # }
721/// ```
722///
723/// # Thread Safety
724///
725/// `Connection` is `Send` and `Sync`, but operations are serialized internally
726/// via a mutex. For parallel query execution, use multiple connections (e.g.,
727/// via a connection pool).
pub struct Connection {
    // Stream and protocol state; the mutex serializes all operations
    inner: Arc<Mutex<ConnectionInner>>,
    // Configuration this connection was created from
    config: Config,
    // Set by `mark_closed`, read by `is_closed`
    closed: AtomicBool,
    // Process-wide identifier taken from `CONNECTION_ID_COUNTER`
    id: u32,
}
734
// Connection ID counter: starts at 1 and is bumped once per
// `connect_with_config` call, before the TCP connection is attempted
static CONNECTION_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
737
738impl Connection {
739    /// Create a new connection to an Oracle database
740    ///
741    /// # Arguments
742    ///
743    /// * `connect_string` - Connection string in EZConnect format (e.g., "host:port/service")
744    /// * `username` - Database username
745    /// * `password` - Database password
746    ///
747    /// # Example
748    ///
749    /// ```rust,ignore
750    /// let conn = Connection::connect("localhost:1521/ORCLPDB1", "scott", "tiger").await?;
751    /// ```
752    pub async fn connect(connect_string: &str, username: &str, password: &str) -> Result<Self> {
753        let mut config: Config = connect_string.parse()?;
754        config.username = username.to_string();
755        config.set_password(password);
756        Self::connect_with_config(config).await
757    }
758
759    /// Create a new connection using a [`Config`].
760    ///
761    /// This is the preferred way to create connections as it gives full control
762    /// over connection parameters including TLS, timeouts, and statement caching.
763    ///
764    /// # Example
765    ///
766    /// ```rust,no_run
767    /// use oracle_rs::{Config, Connection};
768    ///
769    /// # async fn example() -> oracle_rs::Result<()> {
770    /// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password")
771    ///     .with_statement_cache_size(50);
772    ///
773    /// let conn = Connection::connect_with_config(config).await?;
774    /// # Ok(())
775    /// # }
776    /// ```
777    pub async fn connect_with_config(config: Config) -> Result<Self> {
778        let id = CONNECTION_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
779
780        // Create TCP connection
781        let addr = config.socket_addr();
782        let tcp_stream = TcpStream::connect(&addr).await?;
783
784        // Set TCP options
785        tcp_stream.set_nodelay(true)?;
786
787        // Wrap with TLS if configured
788        let stream = if config.is_tls_enabled() {
789            let tls_config = config.tls_config.as_ref()
790                .cloned()
791                .unwrap_or_else(TlsConfig::new);
792
793            let tls_stream = connect_tls(tcp_stream, &config.host, &tls_config).await?;
794            OracleStream::Tls(tls_stream)
795        } else {
796            OracleStream::Plain(tcp_stream)
797        };
798
799        let mut inner = ConnectionInner::new_with_cache(config.stmtcachesize);
800        inner.stream = Some(stream);
801        inner.state = ConnectionState::Connected;
802
803        let conn = Connection {
804            inner: Arc::new(Mutex::new(inner)),
805            config,
806            closed: AtomicBool::new(false),
807            id,
808        };
809
810        // Perform connection handshake
811        conn.perform_handshake().await?;
812
813        Ok(conn)
814    }
815
    /// Get the connection ID
    ///
    /// The ID is assigned once in `connect_with_config` from a process-wide
    /// counter and never changes afterwards.
    pub fn id(&self) -> u32 {
        self.id
    }
820
    /// Check if the connection is closed
    ///
    /// Reads the local `closed` flag only; no I/O is performed, so a broken
    /// socket that has not been reported via `mark_closed` still reads as open.
    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Relaxed)
    }
825
    /// Mark the connection as closed
    ///
    /// This should be called when the underlying connection is detected as broken.
    /// Once marked closed, `is_closed()` returns true. Only the flag is set here;
    /// the stream itself is not touched.
    pub fn mark_closed(&self) {
        self.closed.store(true, Ordering::Relaxed);
    }
833
834    /// Helper to mark connection as closed if the result is a connection error
835    fn handle_result<T>(&self, result: Result<T>) -> Result<T> {
836        if let Err(ref e) = result {
837            if e.is_connection_error() {
838                self.mark_closed();
839            }
840        }
841        result
842    }
843
844    /// Get server information
845    pub async fn server_info(&self) -> ServerInfo {
846        let inner = self.inner.lock().await;
847        inner.server_info.clone()
848    }
849
850    /// Get the current connection state
851    pub async fn state(&self) -> ConnectionState {
852        let inner = self.inner.lock().await;
853        inner.state
854    }
855
856    /// Perform the connection handshake
857    async fn perform_handshake(&self) -> Result<()> {
858        // Step 1: Send CONNECT packet and parse ACCEPT response
859        self.send_connect_packet().await?;
860
861        // Step 2: OOB check (required for protocol version >= 318 AND server supports OOB)
862        // Both conditions must be met - server must have indicated OOB support in ACCEPT
863        let needs_oob_check = {
864            let inner = self.inner.lock().await;
865            inner.server_info.protocol_version >= crate::constants::version::MIN_OOB_CHECK
866                && inner.server_info.supports_oob
867        };
868        if needs_oob_check {
869            self.send_oob_check().await?;
870        }
871
872        // Step 3: Protocol negotiation
873        self.negotiate_protocol().await?;
874
875        // Step 4: Data types negotiation
876        self.negotiate_data_types().await?;
877
878        // Step 5: Authentication
879        self.authenticate().await?;
880
881        Ok(())
882    }
883
884    /// Send OOB (Out of Band) check
885    /// Required for protocol version >= 318
886    async fn send_oob_check(&self) -> Result<()> {
887        let mut inner = self.inner.lock().await;
888        let large_sdu = inner.large_sdu;
889
890        // Step 1: Send raw byte "!" (0x21) for OOB check
891        inner.send(&[0x21]).await?;
892
893        // Step 2: Send MARKER packet with Reset
894        let marker_payload = [1u8, 0u8, crate::constants::MarkerType::Reset as u8];
895        let mut packet_buf = WriteBuffer::new();
896
897        if large_sdu {
898            packet_buf.write_u32_be((PACKET_HEADER_SIZE + marker_payload.len()) as u32)?;
899        } else {
900            packet_buf.write_u16_be((PACKET_HEADER_SIZE + marker_payload.len()) as u16)?;
901            packet_buf.write_u16_be(0)?; // Checksum
902        }
903        packet_buf.write_u8(PacketType::Marker as u8)?;
904        packet_buf.write_u8(0)?; // Flags
905        packet_buf.write_u16_be(0)?; // Header checksum
906        packet_buf.write_bytes(&marker_payload)?;
907
908        inner.send(&packet_buf.freeze()).await?;
909
910        // Step 3: Wait for OOB reset response
911        // The server sends back a MARKER packet or reset acknowledgment
912        let response = inner.receive().await?;
913
914        // Validate response - should be a MARKER packet type (12)
915        if response.len() > 4 && response[4] == PacketType::Marker as u8 {
916            Ok(())
917        } else {
918            // Server might just acknowledge without a specific packet
919            // This is acceptable in some Oracle versions
920            Ok(())
921        }
922    }
923
924    /// Send the initial CONNECT packet
925    async fn send_connect_packet(&self) -> Result<()> {
926        let mut inner = self.inner.lock().await;
927
928        // Build connect packet using ConnectMessage for proper packet format
929        let connect_msg = ConnectMessage::from_config(&self.config);
930        let (connect_packet, continuation) = connect_msg.build_with_continuation()?;
931
932        // Send the CONNECT packet
933        inner.send(&connect_packet).await?;
934
935        // If we have a continuation DATA packet (for large connect strings), send it
936        if let Some(data_packet) = continuation {
937            inner.send(&data_packet).await?;
938        }
939
940        // Wait for response
941        let response = inner.receive().await?;
942
943        // Parse response packet type
944        if response.len() < PACKET_HEADER_SIZE {
945            return Err(Error::PacketTooShort {
946                expected: PACKET_HEADER_SIZE,
947                actual: response.len(),
948            });
949        }
950
951        let packet_type = response[4];
952
953        match packet_type {
954            2 => {
955                // ACCEPT - parse the accept message to get protocol version and capabilities
956                let packet = Packet::from_bytes(response)?;
957                let accept = AcceptMessage::parse(&packet)?;
958
959                // Set large_sdu mode if protocol version >= 315
960                inner.large_sdu = accept.uses_large_sdu();
961
962                // Update server info
963                inner.server_info.protocol_version = accept.protocol_version;
964                inner.server_info.supports_oob = accept.supports_oob;
965                inner.sdu_size = accept.sdu.min(65535) as u16;
966
967                inner.state = ConnectionState::Connected;
968                Ok(())
969            }
970            4 => {
971                // REFUSE
972                let mut buf = ReadBuffer::new(response.slice(PACKET_HEADER_SIZE..));
973                let _reason = buf.read_u8()?;
974                let _user_reason = buf.read_u8()?;
975
976                Err(Error::ConnectionRefused {
977                    error_code: None,
978                    message: Some("Connection refused by server".to_string()),
979                })
980            }
981            5 => {
982                // REDIRECT
983                Err(Error::ConnectionRedirect("redirect not implemented".to_string()))
984            }
985            _ => Err(Error::ProtocolError(format!(
986                "Unexpected packet type during connect: {}",
987                packet_type
988            ))),
989        }
990    }
991
992    /// Negotiate protocol version and capabilities
993    async fn negotiate_protocol(&self) -> Result<()> {
994        use crate::messages::ProtocolMessage;
995
996        let mut inner = self.inner.lock().await;
997        let large_sdu = inner.large_sdu;
998
999
1000        // Build protocol request (includes header)
1001        let protocol_msg = ProtocolMessage::new();
1002        let packet = protocol_msg.build_request(large_sdu)?;
1003        inner.send(&packet).await?;
1004
1005        // Receive response
1006        let response = inner.receive().await?;
1007
1008        // Validate packet type (at offset 4 for both SDU modes)
1009        if response.len() <= 4 || response[4] != PacketType::Data as u8 {
1010            return Err(Error::ProtocolError("Protocol negotiation failed".to_string()));
1011        }
1012
1013        // Parse the Protocol response to extract server capabilities
1014        // The payload starts after the 8-byte header
1015        let payload = &response[PACKET_HEADER_SIZE..];
1016        let mut protocol_msg = ProtocolMessage::new();
1017        protocol_msg.parse_response(payload, &mut inner.capabilities)?;
1018
1019        // Update server info with banner
1020        if let Some(banner) = &protocol_msg.server_banner {
1021            inner.server_info.banner = banner.clone();
1022        }
1023
1024        inner.state = ConnectionState::ProtocolNegotiated;
1025        Ok(())
1026    }
1027
1028    /// Negotiate data types
1029    async fn negotiate_data_types(&self) -> Result<()> {
1030        use crate::messages::DataTypesMessage;
1031
1032        let mut inner = self.inner.lock().await;
1033        let large_sdu = inner.large_sdu;
1034
1035
1036        // Build data types request using DataTypesMessage (includes all ~320 data types)
1037        let data_types_msg = DataTypesMessage::new();
1038        let packet = data_types_msg.build_request(&inner.capabilities, large_sdu)?;
1039        inner.send(&packet).await?;
1040
1041        // Receive response
1042        let response = inner.receive().await?;
1043
1044        // Basic validation - packet type is at offset 4 regardless of large_sdu
1045        if response.len() > 4 && response[4] == PacketType::Data as u8 {
1046            inner.state = ConnectionState::DataTypesNegotiated;
1047            Ok(())
1048        } else {
1049            Err(Error::ProtocolError("Data types negotiation failed".to_string()))
1050        }
1051    }
1052
1053    /// Perform authentication
1054    async fn authenticate(&self) -> Result<()> {
1055
1056        let service_name = match &self.config.service {
1057            ServiceMethod::ServiceName(name) => name.clone(),
1058            ServiceMethod::Sid(sid) => sid.clone(),
1059        };
1060
1061        let mut auth = AuthMessage::new(
1062            &self.config.username,
1063            self.config.password().as_bytes(),
1064            &service_name,
1065        );
1066
1067        // Phase one: send username and session info
1068        {
1069            let mut inner = self.inner.lock().await;
1070            let large_sdu = inner.large_sdu;
1071            let request = auth.build_request(&inner.capabilities, large_sdu)?;
1072            inner.send(&request).await?;
1073
1074            let response = inner.receive().await?;
1075            if response.len() <= PACKET_HEADER_SIZE {
1076                return Err(Error::Protocol("Empty auth response".to_string()));
1077            }
1078
1079            // Check for error message type
1080            if response.len() > PACKET_HEADER_SIZE + 2 {
1081                let msg_type = response[PACKET_HEADER_SIZE + 2];
1082                if msg_type == MessageType::Error as u8 {
1083                    return Err(Error::AuthenticationFailed(
1084                        "Server rejected authentication phase one".to_string(),
1085                    ));
1086                }
1087            }
1088
1089            auth.parse_response(&response[PACKET_HEADER_SIZE..])?;
1090        }
1091
1092        // Phase two: send encrypted password
1093        if auth.phase() == AuthPhase::Two {
1094            let mut inner = self.inner.lock().await;
1095            let large_sdu = inner.large_sdu;
1096            let request = auth.build_request(&inner.capabilities, large_sdu)?;
1097            inner.send(&request).await?;
1098
1099            let response = inner.receive().await?;
1100            if response.len() <= PACKET_HEADER_SIZE {
1101                return Err(Error::Protocol("Empty auth phase two response".to_string()));
1102            }
1103
1104            // Check for error message type or marker
1105            let packet_type = response[4];
1106            if packet_type == 12 {
1107                // Marker - authentication failed
1108                return Err(Error::AuthenticationFailed("Server sent MARKER - authentication rejected".to_string()));
1109            }
1110
1111            if response.len() > PACKET_HEADER_SIZE + 2 {
1112                let msg_type = response[PACKET_HEADER_SIZE + 2];
1113                if msg_type == MessageType::Error as u8 {
1114                    return Err(Error::InvalidCredentials);
1115                }
1116            }
1117
1118            auth.parse_response(&response[PACKET_HEADER_SIZE..])?;
1119        }
1120
1121        // Verify authentication completed
1122        if !auth.is_complete() {
1123            return Err(Error::AuthenticationFailed(
1124                "Authentication did not complete".to_string(),
1125            ));
1126        }
1127
1128        // Store combo key for later use (encrypted data)
1129        let mut inner = self.inner.lock().await;
1130        if let Some(combo_key) = auth.combo_key() {
1131            inner.capabilities.combo_key = Some(combo_key.to_vec());
1132        }
1133        // Auth used sequence numbers 1 and 2, set to 2 so next is 3
1134        inner.sequence_number = 2;
1135        inner.state = ConnectionState::Ready;
1136
1137        Ok(())
1138    }
1139
1140    /// Execute a SQL statement and return the result
1141    ///
1142    /// # Arguments
1143    ///
1144    /// * `sql` - SQL statement to execute
1145    /// * `params` - Bind parameters (use `Value::Integer`, `Value::String`, etc.)
1146    ///
1147    /// # Example
1148    ///
1149    /// ```rust,ignore
1150    /// use oracle_rs::Value;
1151    ///
1152    /// // Query with bind parameters
1153    /// let result = conn.execute(
1154    ///     "SELECT * FROM employees WHERE department_id = :1",
1155    ///     &[Value::Integer(10)]
1156    /// ).await?;
1157    ///
1158    /// // DML with bind parameters
1159    /// let result = conn.execute(
1160    ///     "UPDATE employees SET salary = :1 WHERE employee_id = :2",
1161    ///     &[Value::Integer(50000), Value::Integer(100)]
1162    /// ).await?;
1163    /// println!("Rows affected: {}", result.rows_affected);
1164    /// ```
1165    pub async fn execute(&self, sql: &str, params: &[Value]) -> Result<QueryResult> {
1166        self.ensure_ready().await?;
1167
1168        // Check statement cache for existing prepared statement
1169        let (statement, from_cache) = {
1170            let mut inner = self.inner.lock().await;
1171            if let Some(ref mut cache) = inner.statement_cache {
1172                if let Some(cached_stmt) = cache.get(sql) {
1173                    tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached statement (execute)");
1174                    (cached_stmt, true)
1175                } else {
1176                    (Statement::new(sql), false)
1177                }
1178            } else {
1179                (Statement::new(sql), false)
1180            }
1181        };
1182
1183        let result = match statement.statement_type() {
1184            StatementType::Query => self.execute_query_with_params(&statement, params).await,
1185            _ => self.execute_dml_with_params(&statement, params).await,
1186        };
1187
1188        // Return statement to cache or cache it for the first time
1189        match &result {
1190            Ok(query_result) => {
1191                let mut inner = self.inner.lock().await;
1192                if let Some(ref mut cache) = inner.statement_cache {
1193                    if from_cache {
1194                        cache.return_statement(sql);
1195                    } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1196                        let mut stmt_to_cache = statement.clone();
1197                        stmt_to_cache.set_cursor_id(query_result.cursor_id);
1198                        stmt_to_cache.set_executed(true);
1199                        cache.put(sql.to_string(), stmt_to_cache);
1200                    }
1201                }
1202            }
1203            Err(_) => {
1204                if from_cache {
1205                    let mut inner = self.inner.lock().await;
1206                    if let Some(ref mut cache) = inner.statement_cache {
1207                        cache.return_statement(sql);
1208                    }
1209                }
1210            }
1211        }
1212
1213        result
1214    }
1215
1216    /// Execute a query and return rows
1217    ///
1218    /// # Example
1219    ///
1220    /// ```rust,ignore
1221    /// use oracle_rs::Value;
1222    ///
1223    /// let result = conn.query(
1224    ///     "SELECT * FROM employees WHERE salary > :1",
1225    ///     &[Value::Integer(50000)]
1226    /// ).await?;
1227    /// ```
1228    pub async fn query(&self, sql: &str, params: &[Value]) -> Result<QueryResult> {
1229        self.ensure_ready().await?;
1230
1231        // Check statement cache for existing prepared statement
1232        let (statement, from_cache) = {
1233            let mut inner = self.inner.lock().await;
1234            if let Some(ref mut cache) = inner.statement_cache {
1235                if let Some(cached_stmt) = cache.get(sql) {
1236                    tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached statement");
1237                    (cached_stmt, true)
1238                } else {
1239                    (Statement::new(sql), false)
1240                }
1241            } else {
1242                (Statement::new(sql), false)
1243            }
1244        };
1245
1246        // If using cached statement, save the columns (Oracle won't resend on reexecute)
1247        let cached_columns = if from_cache {
1248            Some(statement.columns().to_vec())
1249        } else {
1250            None
1251        };
1252
1253        let mut result = self.execute_query_with_params(&statement, params).await;
1254
1255        // For cached statements, restore columns if Oracle didn't send them
1256        if let (Ok(ref mut query_result), Some(columns)) = (&mut result, cached_columns) {
1257            if query_result.columns.is_empty() && !columns.is_empty() {
1258                query_result.columns = columns;
1259            }
1260        }
1261
1262        // Return statement to cache or cache it for the first time
1263        match &result {
1264            Ok(query_result) => {
1265                let mut inner = self.inner.lock().await;
1266                if let Some(ref mut cache) = inner.statement_cache {
1267                    if from_cache {
1268                        // Return to cache
1269                        cache.return_statement(sql);
1270                    } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1271                        // Cache the newly prepared statement with cursor_id and columns
1272                        let mut stmt_to_cache = statement.clone();
1273                        stmt_to_cache.set_cursor_id(query_result.cursor_id);
1274                        stmt_to_cache.set_executed(true);
1275                        stmt_to_cache.set_columns(query_result.columns.clone());
1276                        cache.put(sql.to_string(), stmt_to_cache);
1277                    }
1278                }
1279            }
1280            Err(_) => {
1281                // On error, still return statement to cache if it came from there
1282                if from_cache {
1283                    let mut inner = self.inner.lock().await;
1284                    if let Some(ref mut cache) = inner.statement_cache {
1285                        cache.return_statement(sql);
1286                    }
1287                }
1288            }
1289        }
1290
1291        self.handle_result(result)
1292    }
1293
1294    /// Execute DML (INSERT, UPDATE, DELETE) and return rows affected
1295    pub async fn execute_dml_sql(&self, sql: &str, params: &[Value]) -> Result<u64> {
1296        self.ensure_ready().await?;
1297
1298        // Check statement cache for existing prepared statement
1299        let (statement, from_cache) = {
1300            let mut inner = self.inner.lock().await;
1301            if let Some(ref mut cache) = inner.statement_cache {
1302                if let Some(cached_stmt) = cache.get(sql) {
1303                    tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached DML statement");
1304                    (cached_stmt, true)
1305                } else {
1306                    (Statement::new(sql), false)
1307                }
1308            } else {
1309                (Statement::new(sql), false)
1310            }
1311        };
1312
1313        let result = self.execute_dml_with_params(&statement, params).await;
1314
1315        // Return statement to cache or cache it for the first time
1316        match &result {
1317            Ok(query_result) => {
1318                let mut inner = self.inner.lock().await;
1319                if let Some(ref mut cache) = inner.statement_cache {
1320                    if from_cache {
1321                        cache.return_statement(sql);
1322                    } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1323                        let mut stmt_to_cache = statement.clone();
1324                        stmt_to_cache.set_cursor_id(query_result.cursor_id);
1325                        stmt_to_cache.set_executed(true);
1326                        cache.put(sql.to_string(), stmt_to_cache);
1327                    }
1328                }
1329            }
1330            Err(_) => {
1331                if from_cache {
1332                    let mut inner = self.inner.lock().await;
1333                    if let Some(ref mut cache) = inner.statement_cache {
1334                        cache.return_statement(sql);
1335                    }
1336                }
1337            }
1338        }
1339
1340        self.handle_result(result).map(|r| r.rows_affected)
1341    }
1342
1343    /// Execute a PL/SQL block with IN/OUT/INOUT parameters
1344    ///
1345    /// This method allows execution of PL/SQL anonymous blocks or procedure calls
1346    /// that have OUT or IN OUT parameters. The `params` slice specifies the direction
1347    /// and type of each bind parameter.
1348    ///
1349    /// # Arguments
1350    ///
1351    /// * `sql` - The PL/SQL block or procedure call
1352    /// * `params` - The bind parameters with direction information
1353    ///
1354    /// # Example
1355    ///
1356    /// ```rust,ignore
1357    /// use oracle_rs::{Connection, BindParam, OracleType, Value};
1358    ///
1359    /// // Call a procedure with IN and OUT parameters
1360    /// let result = conn.execute_plsql(
1361    ///     "BEGIN get_employee_name(:1, :2); END;",
1362    ///     &[
1363    ///         BindParam::input(Value::Integer(100)),           // IN: employee_id
1364    ///         BindParam::output(OracleType::Varchar, 100),     // OUT: employee_name
1365    ///     ]
1366    /// ).await?;
1367    ///
1368    /// // Get the OUT parameter value
1369    /// let name = result.get_string(0).unwrap_or("Unknown");
1370    /// println!("Employee name: {}", name);
1371    /// ```
1372    ///
1373    /// # REF CURSOR Example
1374    ///
1375    /// ```rust,ignore
1376    /// use oracle_rs::{Connection, BindParam, Value};
1377    ///
1378    /// // Call a procedure that returns a REF CURSOR
1379    /// let result = conn.execute_plsql(
1380    ///     "BEGIN OPEN :1 FOR SELECT * FROM employees; END;",
1381    ///     &[BindParam::output_cursor()]
1382    /// ).await?;
1383    ///
1384    /// // Get the cursor ID and fetch rows
1385    /// if let Some(cursor_id) = result.get_cursor_id(0) {
1386    ///     let rows = conn.fetch_cursor(cursor_id, 100).await?;
1387    ///     for row in rows {
1388    ///         println!("{:?}", row);
1389    ///     }
1390    /// }
1391    /// ```
1392    pub async fn execute_plsql(&self, sql: &str, params: &[BindParam]) -> Result<PlsqlResult> {
1393        self.ensure_ready().await?;
1394
1395        let statement = Statement::new(sql);
1396
1397        // Build values for bind parameters
1398        // IN params: use provided value or Null
1399        // OUT params: use placeholder value (required for metadata, server ignores actual value)
1400        // INOUT params: use provided value or Null
1401        let bind_values: Vec<Value> = params
1402            .iter()
1403            .map(|p| {
1404                if p.direction == BindDirection::Output {
1405                    // OUT params get a placeholder based on their type
1406                    // Oracle still needs a value sent in the request (even though it's ignored)
1407                    p.placeholder_value()
1408                } else {
1409                    // IN and INOUT params use the provided value or Null
1410                    p.value.clone().unwrap_or(Value::Null)
1411                }
1412            })
1413            .collect();
1414
1415        // Build bind metadata for proper buffer sizes
1416        // For OUTPUT params, use the user-specified buffer_size
1417        // For INPUT params, derive buffer_size from the actual value
1418        let bind_metadata: Vec<crate::messages::BindMetadata> = params
1419            .iter()
1420            .zip(bind_values.iter())
1421            .map(|(p, v)| {
1422                let buffer_size = if p.buffer_size > 0 {
1423                    p.buffer_size
1424                } else {
1425                    // Derive from value
1426                    match v {
1427                        Value::String(s) => std::cmp::max(s.len() as u32, 1),
1428                        Value::Bytes(b) => std::cmp::max(b.len() as u32, 1),
1429                        Value::Integer(_) | Value::Number(_) => 22, // Oracle NUMBER max size
1430                        Value::Float(_) => 8, // BINARY_DOUBLE
1431                        Value::Boolean(_) => 1,
1432                        Value::Timestamp(_) => 13,
1433                        Value::Date(_) => 7,
1434                        Value::RowId(_) => 18,
1435                        _ => 100, // Default fallback
1436                    }
1437                };
1438                crate::messages::BindMetadata {
1439                    oracle_type: p.oracle_type,
1440                    buffer_size,
1441                }
1442            })
1443            .collect();
1444
1445        // Create execute message with PL/SQL options
1446        let options = ExecuteOptions::for_plsql();
1447        let mut execute_msg = ExecuteMessage::new(&statement, options);
1448        execute_msg.set_bind_values(bind_values);
1449        execute_msg.set_bind_metadata(bind_metadata);
1450
1451        let mut inner = self.inner.lock().await;
1452        let large_sdu = inner.large_sdu;
1453        let seq_num = inner.next_sequence_number();
1454        execute_msg.set_sequence_number(seq_num);
1455        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1456        inner.send(&request).await?;
1457
1458        // Receive response
1459        let response = inner.receive().await?;
1460        if response.len() <= PACKET_HEADER_SIZE {
1461            return Err(Error::Protocol("Empty PL/SQL response".to_string()));
1462        }
1463
1464        // Check for MARKER packet (indicates error - requires reset protocol)
1465        let packet_type = response[4];
1466        if packet_type == PacketType::Marker as u8 {
1467            // Handle marker reset protocol and get the error packet
1468            let error_response = inner.handle_marker_reset().await?;
1469            let payload = &error_response[PACKET_HEADER_SIZE..];
1470            // Parse error response to extract the actual Oracle error
1471            let _: QueryResult = self.parse_error_response(payload)?;
1472            return Err(Error::Protocol("PL/SQL execution failed".to_string()));
1473        }
1474
1475        // Parse the PL/SQL response
1476        let payload = &response[PACKET_HEADER_SIZE..];
1477        let caps = inner.capabilities.clone();
1478        drop(inner); // Release lock before parsing
1479
1480        self.parse_plsql_response(payload, &caps, params)
1481    }
1482
1483    /// Execute a batch of DML statements with multiple rows of bind values
1484    ///
1485    /// This method efficiently executes the same SQL statement multiple times
1486    /// with different bind values (executemany pattern).
1487    ///
1488    /// # Arguments
1489    ///
1490    /// * `batch` - The batch containing SQL and rows of bind values
1491    ///
1492    /// # Example
1493    ///
1494    /// ```rust,ignore
1495    /// use oracle_rs::{Connection, BatchBuilder, Value};
1496    ///
1497    /// let batch = BatchBuilder::new("INSERT INTO users (id, name) VALUES (:1, :2)")
1498    ///     .add_row(vec![Value::Integer(1), Value::String("Alice".to_string())])
1499    ///     .add_row(vec![Value::Integer(2), Value::String("Bob".to_string())])
1500    ///     .with_row_counts()
1501    ///     .build();
1502    ///
1503    /// let result = conn.execute_batch(&batch).await?;
1504    /// println!("Total rows affected: {}", result.total_rows_affected);
1505    /// ```
1506    pub async fn execute_batch(&self, batch: &BatchBinds) -> Result<BatchResult> {
1507        self.ensure_ready().await?;
1508
1509        // Validate the batch
1510        batch.validate()?;
1511
1512        if batch.rows.is_empty() {
1513            return Ok(BatchResult::new());
1514        }
1515
1516        // Build execute options for batch DML
1517        let mut options = ExecuteOptions::for_dml(batch.options.auto_commit);
1518        options.num_execs = batch.rows.len() as u32;
1519        options.batch_errors = batch.options.batch_errors;
1520        options.dml_row_counts = batch.options.array_dml_row_counts;
1521
1522        // Create execute message with batch bind values
1523        let mut execute_msg = ExecuteMessage::new(&batch.statement, options);
1524        execute_msg.set_batch_bind_values(batch.rows.clone());
1525
1526        let mut inner = self.inner.lock().await;
1527        let large_sdu = inner.large_sdu;
1528        let seq_num = inner.next_sequence_number();
1529        execute_msg.set_sequence_number(seq_num);
1530        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1531        inner.send(&request).await?;
1532
1533        // Receive response
1534        let mut response = inner.receive().await?;
1535        if response.len() <= PACKET_HEADER_SIZE {
1536            return Err(Error::Protocol("Empty batch response".to_string()));
1537        }
1538
1539        // Check packet type - handle MARKER packets
1540        let packet_type = response[4];
1541        if packet_type == PacketType::Marker as u8 {
1542            // Handle BREAK/RESET protocol same as regular DML
1543            response = self.handle_marker_protocol(&mut inner, response).await?;
1544        }
1545
1546        // Parse the batch response
1547        let payload = &response[PACKET_HEADER_SIZE..];
1548        drop(inner); // Release lock before parsing
1549
1550        self.parse_batch_response(payload, batch.rows.len(), batch.options.array_dml_row_counts)
1551    }
1552
1553    /// Handle MARKER packet protocol (BREAK/RESET)
1554    async fn handle_marker_protocol(
1555        &self,
1556        inner: &mut ConnectionInner,
1557        initial_response: Bytes,
1558    ) -> Result<Bytes> {
1559        // Extract marker type from packet
1560        let marker_type = if initial_response.len() >= PACKET_HEADER_SIZE + 3 {
1561            initial_response[PACKET_HEADER_SIZE + 2]
1562        } else {
1563            1 // Assume BREAK
1564        };
1565
1566        if marker_type == 1 {
1567            // BREAK marker - send RESET and wait for response
1568            self.send_marker(inner, 2).await?;
1569
1570            // Wait for RESET marker back
1571            loop {
1572                match inner.receive().await {
1573                    Ok(pkt) => {
1574                        if pkt.len() < PACKET_HEADER_SIZE + 1 {
1575                            break;
1576                        }
1577                        let pkt_type = pkt[4];
1578                        if pkt_type == PacketType::Marker as u8 {
1579                            if pkt.len() >= PACKET_HEADER_SIZE + 3 {
1580                                let mk_type = pkt[PACKET_HEADER_SIZE + 2];
1581                                if mk_type == 2 {
1582                                    // Got RESET marker, break out
1583                                    break;
1584                                }
1585                            }
1586                        } else if pkt_type == PacketType::Data as u8 {
1587                            // Got DATA packet - return it as the response
1588                            return Ok(pkt);
1589                        } else {
1590                            break;
1591                        }
1592                    }
1593                    Err(e) => {
1594                        inner.state = ConnectionState::Closed;
1595                        return Err(e);
1596                    }
1597                }
1598            }
1599
1600            // After RESET, continue receiving until we get DATA packet
1601            loop {
1602                match inner.receive().await {
1603                    Ok(pkt) => {
1604                        let pkt_type = pkt[4];
1605                        if pkt_type == PacketType::Marker as u8 {
1606                            continue;
1607                        } else if pkt_type == PacketType::Data as u8 {
1608                            return Ok(pkt);
1609                        } else {
1610                            return Err(Error::Protocol(format!(
1611                                "Unexpected packet type {} after reset",
1612                                pkt_type
1613                            )));
1614                        }
1615                    }
1616                    Err(e) => {
1617                        inner.state = ConnectionState::Closed;
1618                        return Err(e);
1619                    }
1620                }
1621            }
1622        }
1623
1624        Ok(initial_response)
1625    }
1626
1627    /// Parse batch execution response
1628    fn parse_batch_response(
1629        &self,
1630        payload: &[u8],
1631        batch_size: usize,
1632        want_row_counts: bool,
1633    ) -> Result<BatchResult> {
1634        if payload.len() < 3 {
1635            return Err(Error::Protocol("Batch response too short".to_string()));
1636        }
1637
1638        let mut buf = ReadBuffer::from_slice(payload);
1639
1640        // Skip data flags
1641        buf.skip(2)?;
1642
1643        let mut rows_affected: u64 = 0;
1644        let mut row_counts: Option<Vec<u64>> = None;
1645        let mut end_of_response = false;
1646
1647        // Process messages until end_of_response or out of data
1648        while !end_of_response && buf.remaining() > 0 {
1649            let msg_type = buf.read_u8()?;
1650
1651            match msg_type {
1652                // Error (4) - may contain error or success info
1653                x if x == MessageType::Error as u8 => {
1654                    let (error_code, error_msg, _cid, row_count) = self.parse_error_info_with_rowcount(&mut buf)?;
1655                    rows_affected = row_count;
1656                    if error_code != 0 && error_code != 1403 {
1657                        return Err(Error::OracleError {
1658                            code: error_code,
1659                            message: error_msg.unwrap_or_default(),
1660                        });
1661                    }
1662                }
1663
1664                // Parameter (8) - return parameters (may contain row counts)
1665                x if x == MessageType::Parameter as u8 => {
1666                    if let Some(counts) = self.parse_return_parameters_internal(&mut buf, want_row_counts)? {
1667                        row_counts = Some(counts);
1668                    }
1669                }
1670
1671                // Status (9) - call status
1672                x if x == MessageType::Status as u8 => {
1673                    let _call_status = buf.read_ub4()?;
1674                    let _end_to_end_seq = buf.read_ub2()?;
1675                }
1676
1677                // BitVector (21)
1678                21 => {
1679                    let _num_columns_sent = buf.read_ub2()?;
1680                    if buf.remaining() > 0 {
1681                        let _byte = buf.read_u8()?;
1682                    }
1683                }
1684
1685                // End of Response (29) - explicit end marker
1686                29 => {
1687                    end_of_response = true;
1688                }
1689
1690                _ => {
1691                    // Unknown message type - continue processing
1692                }
1693            }
1694        }
1695
1696        let mut result = BatchResult::new();
1697        result.total_rows_affected = rows_affected;
1698        result.success_count = batch_size;
1699        result.row_counts = row_counts;
1700
1701        Ok(result)
1702    }
1703
1704    /// Fetch more rows from an open cursor
1705    ///
1706    /// This method is used when a query result has `has_more_rows == true`
1707    /// to retrieve additional rows from the server.
1708    ///
1709    /// # Arguments
1710    ///
1711    /// * `cursor_id` - The cursor ID from a previous query result
1712    /// * `columns` - Column information from the original query
1713    /// * `fetch_size` - Number of rows to fetch
1714    ///
1715    /// # Example
1716    ///
1717    /// ```rust,ignore
1718    /// let mut result = conn.query("SELECT * FROM large_table", &[]).await?;
1719    /// let mut all_rows = result.rows.clone();
1720    ///
1721    /// while result.has_more_rows {
1722    ///     result = conn.fetch_more(result.cursor_id, &result.columns, 100).await?;
1723    ///     all_rows.extend(result.rows);
1724    /// }
1725    /// ```
1726    pub async fn fetch_more(
1727        &self,
1728        cursor_id: u16,
1729        columns: &[ColumnInfo],
1730        fetch_size: u32,
1731    ) -> Result<QueryResult> {
1732        self.ensure_ready().await?;
1733
1734        // Build fetch message
1735        let fetch_msg = FetchMessage::new(cursor_id, fetch_size);
1736
1737        let mut inner = self.inner.lock().await;
1738        let request = fetch_msg.build_request(&inner.capabilities)?;
1739        inner.send(&request).await?;
1740
1741        // Receive and parse response
1742        let response = inner.receive().await?;
1743        if response.len() <= PACKET_HEADER_SIZE {
1744            return Err(Error::Protocol("Empty fetch response".to_string()));
1745        }
1746
1747        // Parse row data from response
1748        let payload = &response[PACKET_HEADER_SIZE..];
1749        let caps = inner.capabilities.clone();
1750        drop(inner); // Release lock before parsing
1751        self.parse_fetch_response(payload, columns, &caps)
1752    }
1753
1754    /// Fetch rows from a REF CURSOR
1755    ///
1756    /// This method fetches rows from a REF CURSOR that was returned from a
1757    /// PL/SQL procedure or function. The cursor contains the column metadata
1758    /// and cursor ID needed to fetch the rows.
1759    ///
1760    /// # Arguments
1761    ///
1762    /// * `cursor` - The REF CURSOR returned from PL/SQL
1763    ///
1764    /// # Example
1765    ///
1766    /// ```rust,ignore
1767    /// use oracle_rs::{Connection, BindParam, Value};
1768    ///
1769    /// // Call a procedure that returns a REF CURSOR
1770    /// let result = conn.execute_plsql(
1771    ///     "BEGIN OPEN :1 FOR SELECT id, name FROM employees; END;",
1772    ///     &[BindParam::output_cursor()]
1773    /// ).await?;
1774    ///
1775    /// // Get the cursor and fetch rows
1776    /// if let Value::Cursor(cursor) = &result.out_values[0] {
1777    ///     let rows = conn.fetch_cursor(cursor).await?;
1778    ///     println!("Fetched {} rows", rows.row_count());
1779    ///     for row in &rows.rows {
1780    ///         println!("{:?}", row);
1781    ///     }
1782    /// }
1783    /// ```
1784    pub async fn fetch_cursor(&self, cursor: &crate::types::RefCursor) -> Result<QueryResult> {
1785        self.fetch_cursor_with_size(cursor, 100).await
1786    }
1787
1788    /// Fetch rows from a REF CURSOR with a specified fetch size
1789    ///
1790    /// This is the same as `fetch_cursor` but allows specifying how many
1791    /// rows to fetch at once.
1792    ///
1793    /// REF CURSORs use an ExecuteMessage with only the FETCH option because
1794    /// the cursor is already open from the PL/SQL execution. The cursor_id
1795    /// and column metadata were obtained when the REF CURSOR was returned.
1796    ///
1797    /// # Arguments
1798    ///
1799    /// * `cursor` - The REF CURSOR returned from PL/SQL
1800    /// * `fetch_size` - Number of rows to fetch (default is 100)
1801    pub async fn fetch_cursor_with_size(
1802        &self,
1803        cursor: &crate::types::RefCursor,
1804        fetch_size: u32,
1805    ) -> Result<QueryResult> {
1806        use crate::messages::ExecuteMessage;
1807
1808        if cursor.cursor_id() == 0 {
1809            return Err(Error::InvalidCursor("Cursor ID is 0 (not initialized)".to_string()));
1810        }
1811
1812        self.ensure_ready().await?;
1813
1814        // REF CURSOR uses ExecuteMessage with FETCH only (no SQL, no EXECUTE)
1815        // Create a statement with the cursor's metadata
1816        let mut stmt = Statement::new(""); // No SQL for REF CURSOR
1817        stmt.set_cursor_id(cursor.cursor_id());
1818        stmt.set_columns(cursor.columns().to_vec());
1819        stmt.set_executed(true); // Already executed by Oracle
1820        stmt.set_statement_type(crate::statement::StatementType::Query); // This is a query cursor
1821
1822        // Build execute message with only FETCH option
1823        let options = crate::messages::ExecuteOptions::for_ref_cursor(fetch_size);
1824        let mut execute_msg = ExecuteMessage::new(&stmt, options);
1825
1826        let mut inner = self.inner.lock().await;
1827        let large_sdu = inner.large_sdu;
1828        let seq_num = inner.next_sequence_number();
1829        execute_msg.set_sequence_number(seq_num);
1830
1831        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1832        inner.send(&request).await?;
1833
1834        // Receive and parse response
1835        let response = inner.receive().await?;
1836        if response.len() <= PACKET_HEADER_SIZE {
1837            return Err(Error::Protocol("Empty cursor response".to_string()));
1838        }
1839
1840        // Check for MARKER packet (indicates error)
1841        let packet_type = response[4];
1842        if packet_type == PacketType::Marker as u8 {
1843            let error_response = inner.handle_marker_reset().await?;
1844            let payload = &error_response[PACKET_HEADER_SIZE..];
1845            return self.parse_error_response(payload);
1846        }
1847
1848        // Parse query response - use cursor's columns since they're already defined
1849        let payload = &response[PACKET_HEADER_SIZE..];
1850        let caps = inner.capabilities.clone();
1851        drop(inner); // Release lock before parsing
1852        self.parse_fetch_response(payload, cursor.columns(), &caps)
1853    }
1854
1855    /// Fetch rows from an implicit result set
1856    ///
1857    /// Implicit results are returned via `DBMS_SQL.RETURN_RESULT` from PL/SQL.
1858    /// They contain cursor metadata but no rows until fetched.
1859    ///
1860    /// # Arguments
1861    ///
1862    /// * `result` - The implicit result from PL/SQL execution
1863    ///
1864    /// # Example
1865    ///
1866    /// ```rust,ignore
1867    /// let plsql_result = conn.execute_plsql(r#"
1868    ///     declare
1869    ///         c sys_refcursor;
1870    ///     begin
1871    ///         open c for select * from employees;
1872    ///         dbms_sql.return_result(c);
1873    ///     end;
1874    /// "#, &[]).await?;
1875    ///
1876    /// for implicit in plsql_result.implicit_results.iter() {
1877    ///     let rows = conn.fetch_implicit_result(implicit).await?;
1878    ///     println!("Fetched {} rows", rows.row_count());
1879    /// }
1880    /// ```
1881    pub async fn fetch_implicit_result(&self, result: &ImplicitResult) -> Result<QueryResult> {
1882        self.fetch_implicit_result_with_size(result, 100).await
1883    }
1884
1885    /// Fetch rows from an implicit result set with a specified fetch size
1886    pub async fn fetch_implicit_result_with_size(
1887        &self,
1888        result: &ImplicitResult,
1889        fetch_size: u32,
1890    ) -> Result<QueryResult> {
1891        // Convert implicit result to RefCursor and use fetch_cursor mechanism
1892        let cursor = crate::types::RefCursor::new(result.cursor_id, result.columns.clone());
1893        self.fetch_cursor_with_size(&cursor, fetch_size).await
1894    }
1895
1896    /// Parse fetch response to extract additional rows
1897    ///
1898    /// REF CURSOR fetch responses contain a series of messages:
1899    /// - RowHeader (6): Contains metadata about the following row data
1900    /// - RowData (7): Contains the actual row values
1901    /// - Error (4): Contains error info with cursor_id and row counts
1902    fn parse_fetch_response(&self, payload: &[u8], columns: &[ColumnInfo], caps: &Capabilities) -> Result<QueryResult> {
1903        if payload.len() < 3 {
1904            return Err(Error::Protocol("Fetch response too short".to_string()));
1905        }
1906
1907        let mut buf = ReadBuffer::from_slice(payload);
1908        let mut rows = Vec::new();
1909        let mut has_more_rows = false;
1910
1911        // Bit vector for duplicate column optimization
1912        let mut bit_vector: Option<Vec<u8>> = None;
1913        let mut previous_row_values: Option<Vec<Value>> = None;
1914
1915        // Skip data flags
1916        buf.skip(2)?;
1917
1918        // Process multiple messages in the response
1919        while buf.remaining() >= 1 {
1920            let msg_type = buf.read_u8()?;
1921
1922            match msg_type {
1923                x if x == MessageType::RowHeader as u8 => {
1924                    // Skip row header metadata (per Python's _process_row_header)
1925                    buf.skip(1)?; // flags
1926                    buf.skip_ub2()?; // num requests
1927                    buf.skip_ub4()?; // iteration number
1928                    buf.skip_ub4()?; // num iters
1929                    buf.skip_ub2()?; // buffer length
1930                    let num_bytes = buf.read_ub4()?;
1931                    if num_bytes > 0 {
1932                        buf.skip(1)?; // skip repeated length
1933                        // This bit vector in row header is for the following row data
1934                        let bv = buf.read_bytes_vec(num_bytes as usize)?;
1935                        bit_vector = Some(bv);
1936                    }
1937                    let rxhrid_bytes = buf.read_ub4()?;
1938                    if rxhrid_bytes > 0 {
1939                        buf.skip_raw_bytes_chunked()?;
1940                    }
1941                }
1942                x if x == MessageType::RowData as u8 => {
1943                    // Parse actual row data with bit vector support
1944                    let row = self.parse_row_data_with_bitvector(
1945                        &mut buf,
1946                        columns,
1947                        caps,
1948                        bit_vector.as_deref(),
1949                        previous_row_values.as_ref(),
1950                    )?;
1951                    previous_row_values = Some(row.values().to_vec());
1952                    bit_vector = None;
1953                    rows.push(row);
1954                }
1955                x if x == MessageType::BitVector as u8 => {
1956                    // BitVector indicates which columns have actual data vs duplicates
1957                    let _num_columns_sent = buf.read_ub2()?;
1958                    let num_bytes = (columns.len() + 7) / 8; // Round up
1959                    if num_bytes > 0 {
1960                        let bv = buf.read_bytes_vec(num_bytes)?;
1961                        bit_vector = Some(bv);
1962                    }
1963                    // Continue processing - RowData follows
1964                }
1965                x if x == MessageType::Error as u8 => {
1966                    // Error message contains row count and cursor info
1967                    let (error_code, error_msg, more_rows) = self.parse_error_message_info(&mut buf)?;
1968                    has_more_rows = more_rows;
1969                    if error_code != 0 && error_code != 1403 { // 1403 = no data found
1970                        return Err(Error::OracleError {
1971                            code: error_code,
1972                            message: error_msg,
1973                        });
1974                    }
1975                    break; // Error message marks end of response
1976                }
1977                x if x == MessageType::Status as u8 => {
1978                    // Status message - usually marks end
1979                    break;
1980                }
1981                x if x == MessageType::EndOfResponse as u8 => {
1982                    break;
1983                }
1984                _ => {
1985                    // Unknown message type - stop processing
1986                    break;
1987                }
1988            }
1989        }
1990
1991        Ok(QueryResult {
1992            columns: columns.to_vec(),
1993            rows,
1994            rows_affected: 0,
1995            has_more_rows,
1996            cursor_id: 0,
1997        })
1998    }
1999
2000    /// Parse error message info including cursor_id and row counts
2001    fn parse_error_message_info(&self, buf: &mut ReadBuffer) -> Result<(u32, String, bool)> {
2002        let _call_status = buf.read_ub4()?; // end of call status
2003        buf.skip_ub2()?; // end to end seq#
2004        buf.skip_ub4()?; // current row number
2005        buf.skip_ub2()?; // error number
2006        buf.skip_ub2()?; // array elem error
2007        buf.skip_ub2()?; // array elem error
2008        let _cursor_id = buf.read_ub2()?; // cursor id
2009        let _error_pos = buf.read_sb2()?; // error position
2010        buf.skip(1)?; // sql type
2011        buf.skip(1)?; // fatal?
2012        buf.skip(1)?; // flags
2013        buf.skip(1)?; // user cursor options
2014        buf.skip(1)?; // UPI parameter
2015        let flags = buf.read_u8()?; // flags
2016        // Skip rowid - fixed 10 bytes in Oracle format
2017        buf.skip(10)?; // rowid is 10 bytes
2018        buf.skip_ub4()?; // OS error
2019        buf.skip(1)?; // statement number
2020        buf.skip(1)?; // call number
2021        buf.skip_ub2()?; // padding
2022        buf.skip_ub4()?; // success iters
2023        let num_bytes = buf.read_ub4()?; // oerrdd
2024        if num_bytes > 0 {
2025            buf.skip_raw_bytes_chunked()?;
2026        }
2027
2028        // Skip batch error codes
2029        let num_errors = buf.read_ub2()?;
2030        if num_errors > 0 {
2031            buf.skip_raw_bytes_chunked()?;
2032        }
2033
2034        // Skip batch error offsets
2035        let num_offsets = buf.read_ub4()?;
2036        if num_offsets > 0 {
2037            buf.skip_raw_bytes_chunked()?;
2038        }
2039
2040        // Skip batch error messages
2041        let temp16 = buf.read_ub2()?;
2042        if temp16 > 0 {
2043            buf.skip_raw_bytes_chunked()?;
2044        }
2045
2046        // Read extended error info
2047        let error_num = buf.read_ub4()?;
2048        let row_count = buf.read_ub8()?;
2049        let more_rows = row_count > 0 || (flags & 0x20) != 0;
2050
2051        // Read error message if present
2052        let error_msg = if error_num != 0 {
2053            buf.read_string_with_length()?.unwrap_or_default()
2054        } else {
2055            String::new()
2056        };
2057
2058        Ok((error_num, error_msg, more_rows))
2059    }
2060
2061    /// Open a scrollable cursor for bidirectional navigation
2062    ///
2063    /// Scrollable cursors allow moving forward and backward through result sets,
2064    /// jumping to specific positions, and fetching from various locations.
2065    ///
2066    /// # Arguments
2067    ///
2068    /// * `sql` - SQL query to execute
2069    ///
2070    /// # Example
2071    ///
2072    /// ```rust,ignore
2073    /// let mut cursor = conn.open_scrollable_cursor("SELECT * FROM employees").await?;
2074    ///
2075    /// // Move to different positions
2076    /// let first = conn.scroll(&mut cursor, FetchOrientation::First, 0).await?;
2077    /// let last = conn.scroll(&mut cursor, FetchOrientation::Last, 0).await?;
2078    /// let row5 = conn.scroll(&mut cursor, FetchOrientation::Absolute, 5).await?;
2079    ///
2080    /// conn.close_cursor(&mut cursor).await?;
2081    /// ```
2082    pub async fn open_scrollable_cursor(&self, sql: &str) -> Result<ScrollableCursor> {
2083        self.ensure_ready().await?;
2084
2085        let statement = Statement::new(sql);
2086
2087        // For scrollable cursors, execute with scrollable flag and prefetch 1 row
2088        // to get column metadata. The scroll() method will fetch actual rows at
2089        // specific positions.
2090        let mut options = ExecuteOptions::for_query(1); // prefetch 1 row for column info
2091
2092        options.scrollable = true;
2093
2094        let mut execute_msg = ExecuteMessage::new(&statement, options);
2095
2096        let mut inner = self.inner.lock().await;
2097        let large_sdu = inner.large_sdu;
2098        let seq_num = inner.next_sequence_number();
2099        execute_msg.set_sequence_number(seq_num);
2100        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2101        inner.send(&request).await?;
2102
2103        // Receive and parse response
2104        let response = inner.receive().await?;
2105
2106        if response.len() <= PACKET_HEADER_SIZE {
2107            return Err(Error::Protocol("Empty scrollable cursor response".to_string()));
2108        }
2109
2110        // Check for MARKER packet (indicates error - requires reset protocol)
2111        let packet_type = response[4];
2112        if packet_type == PacketType::Marker as u8 {
2113            // Handle marker reset protocol and get the error packet
2114            let error_response = inner.handle_marker_reset().await?;
2115            let payload = &error_response[PACKET_HEADER_SIZE..];
2116            // Parse error response to extract the actual Oracle error
2117            let _: QueryResult = self.parse_error_response(payload)?;
2118            // If we get here without error, something unexpected happened
2119            return Err(Error::Protocol("Unexpected successful response after MARKER".to_string()));
2120        }
2121
2122        // Parse describe info to get columns
2123        let payload = &response[PACKET_HEADER_SIZE..];
2124        let result = self.parse_query_response(payload, &inner.capabilities)?;
2125
2126        Ok(ScrollableCursor::new(result.cursor_id, result.columns))
2127    }
2128
2129    /// Scroll to a position in a scrollable cursor and fetch rows
2130    ///
2131    /// # Arguments
2132    ///
2133    /// * `cursor` - The scrollable cursor to scroll
2134    /// * `orientation` - The direction/mode of scrolling
2135    /// * `offset` - Position offset (used for Absolute and Relative modes)
2136    ///
2137    /// # Example
2138    ///
2139    /// ```rust,ignore
2140    /// // Go to first row
2141    /// let first = conn.scroll(&mut cursor, FetchOrientation::First, 0).await?;
2142    ///
2143    /// // Go to absolute position 10
2144    /// let row10 = conn.scroll(&mut cursor, FetchOrientation::Absolute, 10).await?;
2145    ///
2146    /// // Move 5 rows forward from current position
2147    /// let plus5 = conn.scroll(&mut cursor, FetchOrientation::Relative, 5).await?;
2148    ///
2149    /// // Move 3 rows backward
2150    /// let minus3 = conn.scroll(&mut cursor, FetchOrientation::Relative, -3).await?;
2151    /// ```
2152    pub async fn scroll(
2153        &self,
2154        cursor: &mut ScrollableCursor,
2155        orientation: FetchOrientation,
2156        offset: i64,
2157    ) -> Result<ScrollResult> {
2158        self.ensure_ready().await?;
2159
2160        if !cursor.is_open() {
2161            return Err(Error::CursorClosed);
2162        }
2163
2164        // Create a statement for the scroll operation (uses the existing cursor)
2165        let mut stmt = Statement::new("");
2166        stmt.set_cursor_id(cursor.cursor_id);
2167        stmt.set_columns(cursor.columns.clone());
2168        stmt.set_executed(true);
2169        stmt.set_statement_type(crate::statement::StatementType::Query);
2170
2171        // Build execute message with scroll_operation=true
2172        let mut options = ExecuteOptions::for_query(1);
2173        options.scrollable = true;
2174        options.scroll_operation = true;
2175        options.fetch_orientation = orientation as u32;
2176        // Calculate fetch_pos based on orientation
2177        options.fetch_pos = match orientation {
2178            FetchOrientation::First => 1,
2179            FetchOrientation::Last => 0, // Server calculates
2180            FetchOrientation::Absolute => offset.max(0) as u32,
2181            FetchOrientation::Relative => (cursor.position + offset).max(0) as u32,
2182            FetchOrientation::Next => (cursor.position + 1).max(0) as u32,
2183            FetchOrientation::Prior => (cursor.position - 1).max(0) as u32,
2184            FetchOrientation::Current => cursor.position.max(0) as u32,
2185        };
2186
2187        let mut execute_msg = ExecuteMessage::new(&stmt, options);
2188
2189        let mut inner = self.inner.lock().await;
2190        let large_sdu = inner.large_sdu;
2191        let seq_num = inner.next_sequence_number();
2192        execute_msg.set_sequence_number(seq_num);
2193        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2194        inner.send(&request).await?;
2195
2196        // Receive and parse response
2197        let response = inner.receive().await?;
2198        if response.len() <= PACKET_HEADER_SIZE {
2199            return Err(Error::Protocol("Empty scroll response".to_string()));
2200        }
2201
2202        // Check for MARKER packet
2203        let packet_type = response[4];
2204        if packet_type == PacketType::Marker as u8 {
2205            let error_response = inner.handle_marker_reset().await?;
2206            let payload = &error_response[PACKET_HEADER_SIZE..];
2207            let _: QueryResult = self.parse_error_response(payload)?;
2208            return Err(Error::Protocol("Scroll operation failed".to_string()));
2209        }
2210
2211        let payload = &response[PACKET_HEADER_SIZE..];
2212        // Use cursor's columns since Oracle doesn't re-send column metadata for scroll operations
2213        let query_result = self.parse_query_response_with_columns(payload, &inner.capabilities, &cursor.columns)?;
2214
2215        // Use position from Oracle's response (rows_affected contains the row position)
2216        // For scrollable cursors, Oracle returns the row number in error_info.rowcount
2217        let new_position = if !query_result.rows.is_empty() {
2218            // Position is the actual row number from Oracle
2219            query_result.rows_affected as i64
2220        } else {
2221            // No rows returned - calculate position based on orientation
2222            match orientation {
2223                FetchOrientation::First => 0, // Before first
2224                FetchOrientation::Last => cursor.row_count.unwrap_or(0) as i64 + 1, // After last
2225                FetchOrientation::Next => cursor.position + 1,
2226                FetchOrientation::Prior => cursor.position - 1,
2227                FetchOrientation::Absolute => offset,
2228                FetchOrientation::Relative => cursor.position + offset,
2229                FetchOrientation::Current => cursor.position,
2230            }
2231        };
2232
2233        cursor.update_position(new_position);
2234
2235        let mut result = ScrollResult::new(query_result.rows, new_position);
2236        result.at_end = !query_result.has_more_rows;
2237        result.at_beginning = new_position <= 1;
2238
2239        Ok(result)
2240    }
2241
2242    /// Close a scrollable cursor
2243    ///
2244    /// # Arguments
2245    ///
2246    /// * `cursor` - The scrollable cursor to close
2247    pub async fn close_cursor(&self, cursor: &mut ScrollableCursor) -> Result<()> {
2248        if !cursor.is_open() {
2249            return Ok(());
2250        }
2251
2252        // Send close cursor message
2253        // For now, just mark it as closed - the cursor will be cleaned up
2254        // when the connection is closed or reused
2255        cursor.mark_closed();
2256        Ok(())
2257    }
2258
2259    /// Get type information for a database object or collection type
2260    ///
2261    /// This method queries Oracle's data dictionary to retrieve type metadata
2262    /// for collections (VARRAY, Nested Table) and user-defined object types.
2263    ///
2264    /// # Arguments
2265    ///
2266    /// * `type_name` - Fully qualified type name (e.g., "SCHEMA.TYPE_NAME" or just "TYPE_NAME")
2267    ///
2268    /// # Returns
2269    ///
2270    /// A `DbObjectType` containing the type metadata, including:
2271    /// - Schema and type name
2272    /// - Whether it's a collection
2273    /// - Collection type (VARRAY, Nested Table, etc.)
2274    /// - Element type for collections
2275    ///
2276    /// # Example
2277    ///
2278    /// ```rust,ignore
2279    /// let number_array = conn.get_type("MY_NUMBER_ARRAY").await?;
2280    /// assert!(number_array.is_collection);
2281    /// ```
2282    pub async fn get_type(&self, type_name: &str) -> Result<crate::dbobject::DbObjectType> {
2283        use crate::dbobject::{CollectionType, DbObjectType};
2284
2285        self.ensure_ready().await?;
2286
2287        // Parse type name into schema and name
2288        let (schema, name) = parse_type_name(type_name, &self.config.username);
2289
2290        // First, query ALL_TYPES to get basic type info
2291        let type_info = self
2292            .query(
2293                "SELECT typecode, type_oid FROM all_types WHERE owner = :1 AND type_name = :2",
2294                &[Value::String(schema.clone()), Value::String(name.clone())],
2295            )
2296            .await?;
2297
2298        if type_info.rows.is_empty() {
2299            return Err(Error::OracleError {
2300                code: 4043, // ORA-04043: object does not exist
2301                message: format!("Type {}.{} not found", schema, name),
2302            });
2303        }
2304
2305        let row = &type_info.rows[0];
2306        let typecode = row.get(0).and_then(|v| v.as_str()).unwrap_or("");
2307        let type_oid = row.get(1).and_then(|v| v.as_bytes()).map(|b| b.to_vec());
2308
2309        // Check if it's a collection
2310        if typecode == "COLLECTION" {
2311            // Query ALL_COLL_TYPES for collection details
2312            let coll_info = self.query(
2313                "SELECT coll_type, elem_type_name, elem_type_owner, upper_bound FROM all_coll_types WHERE owner = :1 AND type_name = :2",
2314                &[Value::String(schema.clone()), Value::String(name.clone())],
2315            ).await?;
2316
2317            if coll_info.rows.is_empty() {
2318                return Err(Error::OracleError {
2319                    code: 4043,
2320                    message: format!("Collection type {}.{} metadata not found", schema, name),
2321                });
2322            }
2323
2324            let coll_row = &coll_info.rows[0];
2325            let coll_type_str = coll_row.get(0).and_then(|v| v.as_str()).unwrap_or("");
2326            let elem_type_name = coll_row.get(1).and_then(|v| v.as_str()).unwrap_or("VARCHAR2");
2327            let _elem_type_owner = coll_row.get(2).and_then(|v| v.as_str());
2328
2329            let collection_type = match coll_type_str {
2330                "VARYING ARRAY" => CollectionType::Varray,
2331                "TABLE" => CollectionType::NestedTable,
2332                _ => CollectionType::Varray, // Default
2333            };
2334
2335            let element_type = oracle_type_from_name(elem_type_name);
2336
2337            let mut obj_type = DbObjectType::collection(schema, name, collection_type, element_type);
2338            obj_type.oid = type_oid;
2339            Ok(obj_type)
2340        } else {
2341            // Regular object type (not yet fully implemented)
2342            let mut obj_type = DbObjectType::new(schema, name);
2343            obj_type.oid = type_oid;
2344            Ok(obj_type)
2345        }
2346    }
2347
2348    /// Internal: Execute a query statement with optional bind parameters
2349    async fn execute_query_with_params(&self, statement: &Statement, params: &[Value]) -> Result<QueryResult> {
2350        let prefetch_rows = 100; // Default prefetch
2351
2352        // For first execution, check if we might have LOBs (no prefetch for safety)
2353        // This can be optimized later with describe-only first
2354        let options = ExecuteOptions::for_query(prefetch_rows);
2355        let mut execute_msg = ExecuteMessage::new(statement, options);
2356
2357        // Set bind values if provided
2358        if !params.is_empty() {
2359            execute_msg.set_bind_values(params.to_vec());
2360        }
2361
2362        let mut inner = self.inner.lock().await;
2363        let large_sdu = inner.large_sdu;
2364        let seq_num = inner.next_sequence_number();
2365        execute_msg.set_sequence_number(seq_num);
2366        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2367        inner.send(&request).await?;
2368
2369        // Receive and parse response
2370        let response = inner.receive().await?;
2371        if response.len() <= PACKET_HEADER_SIZE {
2372            return Err(Error::Protocol("Empty query response".to_string()));
2373        }
2374
2375        // Check for MARKER packet (indicates error - requires reset protocol)
2376        let packet_type = response[4];
2377        if packet_type == PacketType::Marker as u8 {
2378            // Handle marker reset protocol and get the error packet
2379            let error_response = inner.handle_marker_reset().await?;
2380            let payload = &error_response[PACKET_HEADER_SIZE..];
2381            return self.parse_error_response(payload);
2382        }
2383
2384        // Parse the response to extract columns and rows
2385        let payload = &response[PACKET_HEADER_SIZE..];
2386        let mut result = self.parse_query_response(payload, &inner.capabilities)?;
2387
2388        // Check if any columns are LOB types that require defines
2389        let has_lob_columns = result.columns.iter().any(|col| col.is_lob());
2390
2391        if has_lob_columns && !statement.requires_define() {
2392            // We need to re-execute with column defines
2393            // Create a modified statement with the define flag set
2394            let mut stmt_with_define = statement.clone();
2395            stmt_with_define.set_columns(result.columns.clone());
2396            stmt_with_define.set_cursor_id(result.cursor_id);
2397            stmt_with_define.set_requires_define(true);
2398            stmt_with_define.set_no_prefetch(true);
2399            stmt_with_define.set_executed(true);
2400
2401            // Re-execute with defines
2402            let define_options = ExecuteOptions::for_query(prefetch_rows);
2403            let mut define_msg = ExecuteMessage::new(&stmt_with_define, define_options);
2404            let seq_num = inner.next_sequence_number();
2405            define_msg.set_sequence_number(seq_num);
2406
2407            let define_request = define_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2408            inner.send(&define_request).await?;
2409
2410            // Receive the re-execute response
2411            let define_response = inner.receive().await?;
2412            if define_response.len() <= PACKET_HEADER_SIZE {
2413                return Err(Error::Protocol("Empty define response".to_string()));
2414            }
2415
2416            // Check for MARKER packet
2417            let packet_type = define_response[4];
2418            if packet_type == PacketType::Marker as u8 {
2419                let error_response = inner.handle_marker_reset().await?;
2420                let payload = &error_response[PACKET_HEADER_SIZE..];
2421                return self.parse_error_response(payload);
2422            }
2423
2424            // Parse the response with LOB data, using the columns we already know
2425            let payload = &define_response[PACKET_HEADER_SIZE..];
2426            result = self.parse_query_response_with_columns(
2427                payload,
2428                &inner.capabilities,
2429                &stmt_with_define.columns(),
2430            )?;
2431        }
2432
2433        Ok(result)
2434    }
2435
    /// Internal: Execute a DML statement with optional bind parameters
    ///
    /// Builds an execute message with DML options (auto-commit off), attaches
    /// `params` as bind values when the slice is non-empty, sends it, and
    /// parses the reply with `parse_dml_response`.
    ///
    /// If the first reply is a MARKER packet whose marker type is 1 (BREAK),
    /// a marker of type 2 (RESET) is sent and packets are read until either a
    /// RESET marker or a DATA packet arrives. A DATA packet seen while waiting
    /// is parsed and returned immediately; after a RESET marker, further
    /// markers are skipped until a DATA packet arrives, which then replaces
    /// `response`.
    ///
    /// # Errors
    ///
    /// - `Error::Protocol` for an empty reply, for five consecutive non-RESET
    ///   markers, for a receive failure while waiting for the RESET, or for an
    ///   unexpected packet type after the RESET.
    /// - `Error::OracleError` with code 0 when receiving fails after the RESET
    ///   marker was seen.
    /// - Both receive-failure paths also set the connection state to `Closed`.
    ///
    /// NOTE(review): `response` is only reassigned on the DATA-after-RESET
    /// path. When the marker type is not 1, or when either wait loop exits via
    /// `break`, the original MARKER packet is what reaches
    /// `parse_dml_response` at the end — confirm that this is intended.
    async fn execute_dml_with_params(&self, statement: &Statement, params: &[Value]) -> Result<QueryResult> {
        let options = ExecuteOptions::for_dml(false); // Don't auto-commit
        let mut execute_msg = ExecuteMessage::new(statement, options);

        // Set bind values if provided
        if !params.is_empty() {
            execute_msg.set_bind_values(params.to_vec());
        }

        // The guard is held until the function returns, covering the whole
        // request/response exchange including the marker handshake below.
        let mut inner = self.inner.lock().await;
        let large_sdu = inner.large_sdu;
        let seq_num = inner.next_sequence_number();
        execute_msg.set_sequence_number(seq_num);
        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;

        inner.send(&request).await?;

        // Receive response
        let mut response = inner.receive().await?;
        if response.len() <= PACKET_HEADER_SIZE {
            return Err(Error::Protocol("Empty DML response".to_string()));
        }

        // Check packet type - handle MARKER packets
        let packet_type = response[4];
        if packet_type == PacketType::Marker as u8 {
            // Need to do reset protocol and then get the actual response
            // Extract marker type from packet: after header (8 bytes) + data flags (2 bytes) + marker type (1 byte)
            let marker_type = if response.len() >= PACKET_HEADER_SIZE + 3 {
                response[PACKET_HEADER_SIZE + 2]
            } else {
                1 // Assume BREAK
            };

            if marker_type == 1 {
                // BREAK marker - send RESET and wait for response
                self.send_marker(&mut inner, 2).await?;

                // Wait for RESET marker back or DATA packet with error
                // The server may send: RESET marker, or BREAK marker followed by DATA, or just close
                let mut got_reset = false;
                // Budget of non-RESET markers tolerated before giving up.
                let mut max_attempts = 5;
                loop {
                    match inner.receive().await {
                        Ok(pkt) => {
                            // Not even a full header plus one byte: stop waiting.
                            if pkt.len() < PACKET_HEADER_SIZE + 1 {
                                break;
                            }
                            let pkt_type = pkt[4];
                            if pkt_type == PacketType::Marker as u8 {
                                // NOTE(review): a marker shorter than header + 3 bytes
                                // skips this inner block entirely, so it neither counts
                                // against `max_attempts` nor ends the loop.
                                if pkt.len() >= PACKET_HEADER_SIZE + 3 {
                                    let mk_type = pkt[PACKET_HEADER_SIZE + 2];
                                    if mk_type == 2 {
                                        // Got RESET marker, break out
                                        got_reset = true;
                                        break;
                                    }
                                    // Got another BREAK marker - server may be acknowledging
                                    // Keep trying a few times
                                    max_attempts -= 1;
                                    if max_attempts == 0 {
                                        // Give up and return a generic error
                                        return Err(Error::Protocol(
                                            "Server rejected operation (multiple BREAK markers)".to_string()
                                        ));
                                    }
                                    continue;
                                }
                            } else if pkt_type == PacketType::Data as u8 {
                                // Got DATA packet - use this as the response (may contain error)
                                let payload = &pkt[PACKET_HEADER_SIZE..];
                                return self.parse_dml_response(payload);
                            } else {
                                // Neither MARKER nor DATA: stop waiting without a reset.
                                break;
                            }
                        }
                        Err(e) => {
                            // If we get EOF, the server may have closed the connection
                            // This can happen with temp LOB binding issues
                            // Any receive error (not only EOF) takes this path.
                            inner.state = ConnectionState::Closed;
                            return Err(Error::Protocol(format!(
                                "Server closed connection during error handling: {}",
                                e
                            )));
                        }
                    }
                }

                // After RESET, try to receive DATA packet with error response
                // Note: Some Oracle servers "quit immediately" after reset without sending
                // an error packet. In that case, we'll get EOF which is handled below.
                if got_reset {
                    // This loop has no attempt budget: it reads for as long as
                    // the server keeps sending MARKER packets.
                    loop {
                        match inner.receive().await {
                            Ok(pkt) => {
                                if pkt.len() < PACKET_HEADER_SIZE + 1 {
                                    // Too short, connection may be closing
                                    break;
                                }
                                let pkt_type = pkt[4];
                                if pkt_type == PacketType::Marker as u8 {
                                    // More markers, keep reading
                                    continue;
                                } else if pkt_type == PacketType::Data as u8 {
                                    // Got DATA packet, use this as the response
                                    response = pkt;
                                    break;
                                } else {
                                    // Unknown packet type, return error
                                    return Err(Error::Protocol(format!(
                                        "Unexpected packet type {} after reset",
                                        pkt_type
                                    )));
                                }
                            }
                            Err(_e) => {
                                // EOF after reset is normal - server may close connection
                                // without sending error details. Return a descriptive error.
                                // The underlying receive error is discarded here.
                                inner.state = ConnectionState::Closed;
                                return Err(Error::OracleError {
                                    code: 0,
                                    message: "Server rejected the operation and closed the connection. \
                                              This may happen when binding a temporary LOB to an INSERT statement. \
                                              Try using a different approach (e.g., DBMS_LOB procedures).".to_string(),
                                });
                            }
                        }
                    }
                }
            }
        }

        // Parse the response to extract rows affected (or error)
        let payload = &response[PACKET_HEADER_SIZE..];
        self.parse_dml_response(payload)
    }
2573
2574    /// Parse query response to extract columns and rows
2575    ///
2576    /// Oracle sends multiple messages in a single response:
2577    /// - DescribeInfo (16): column metadata
2578    /// - RowHeader (6): row header info
2579    /// - RowData (7): actual column values
2580    /// - Error (4): completion status (may contain error or success)
2581    fn parse_query_response(&self, payload: &[u8], caps: &Capabilities) -> Result<QueryResult> {
2582        self.parse_query_response_with_columns(payload, caps, &[])
2583    }
2584
2585    /// Parse query response with pre-known columns (for re-execute after define)
2586    fn parse_query_response_with_columns(
2587        &self,
2588        payload: &[u8],
2589        caps: &Capabilities,
2590        known_columns: &[ColumnInfo],
2591    ) -> Result<QueryResult> {
2592        if payload.len() < 3 {
2593            return Err(Error::Protocol("Query response too short".to_string()));
2594        }
2595
2596        let mut buf = ReadBuffer::from_slice(payload);
2597
2598        // Skip data flags
2599        buf.skip(2)?;
2600
2601        // Use known columns if provided, otherwise parse from describe info
2602        let mut columns: Vec<ColumnInfo> = known_columns.to_vec();
2603        let mut rows: Vec<Row> = Vec::new();
2604        let mut cursor_id: u16 = 0;
2605        let mut row_count: u64 = 0;
2606        let mut end_of_response = false;
2607
2608        // Bit vector for duplicate column optimization
2609        // When Some, indicates which columns have actual data (bit=1) vs duplicates (bit=0)
2610        let mut bit_vector: Option<Vec<u8>> = None;
2611        // Previous row values for copying duplicates
2612        let mut previous_row_values: Option<Vec<Value>> = None;
2613
2614        // Process messages until we hit end of response or run out of data
2615        while !end_of_response && buf.remaining() > 0 {
2616            let msg_type = buf.read_u8()?;
2617
2618            match msg_type {
2619                // DescribeInfo (16) - column metadata
2620                x if x == MessageType::DescribeInfo as u8 => {
2621                    // Skip chunked bytes first
2622                    buf.skip_raw_bytes_chunked()?;
2623                    columns = self.parse_describe_info(&mut buf, caps.ttc_field_version)?;
2624                }
2625
2626                // RowHeader (6) - header info for rows
2627                x if x == MessageType::RowHeader as u8 => {
2628                    self.parse_row_header(&mut buf)?;
2629                }
2630
2631                // RowData (7) - actual row values
2632                x if x == MessageType::RowData as u8 => {
2633                    let row = self.parse_row_data_with_bitvector(
2634                        &mut buf,
2635                        &columns,
2636                        caps,
2637                        bit_vector.as_deref(),
2638                        previous_row_values.as_ref(),
2639                    )?;
2640                    // Store this row's values for potential duplicate copying
2641                    previous_row_values = Some(row.values().to_vec());
2642                    // Clear bit vector after using it (it's per-row)
2643                    bit_vector = None;
2644                    rows.push(row);
2645                }
2646
2647                // Error (4) - completion or error
2648                x if x == MessageType::Error as u8 => {
2649                    let (error_code, error_msg, cid, rc) = self.parse_error_info_with_rowcount(&mut buf)?;
2650                    cursor_id = cid;
2651                    row_count = rc;
2652                    if error_code != 0 && error_code != 1403 {
2653                        // 1403 is "no data found" which is not an error for queries
2654                        return Err(Error::OracleError {
2655                            code: error_code,
2656                            message: error_msg.unwrap_or_default(),
2657                        });
2658                    }
2659                    end_of_response = true;
2660                }
2661
2662                // Parameter (8) - return parameters
2663                x if x == MessageType::Parameter as u8 => {
2664                    self.parse_return_parameters(&mut buf)?;
2665                }
2666
2667                // Status (9) - call status
2668                x if x == MessageType::Status as u8 => {
2669                    // Read call status and end-to-end seq number
2670                    let _call_status = buf.read_ub4()?;
2671                    let _end_to_end_seq = buf.read_ub2()?;
2672                    // Note: end_of_response only if supports_end_of_response is false
2673                    // For now, we assume it's not the end
2674                }
2675
2676                // BitVector (21) - column presence bitmap for sparse results
2677                // Bit=1 means actual data is sent, bit=0 means duplicate from previous row
2678                21 => {
2679                    // Read num columns sent
2680                    let _num_columns_sent = buf.read_ub2()?;
2681                    // Read bit vector (1 byte per 8 columns, rounded up)
2682                    let num_bytes = (columns.len() + 7) / 8;
2683                    if num_bytes > 0 {
2684                        let bv = buf.read_bytes_vec(num_bytes)?;
2685                        bit_vector = Some(bv);
2686                    }
2687                }
2688
2689                _ => {
2690                    // Unknown message type - break to avoid parsing errors
2691                    break;
2692                }
2693            }
2694        }
2695
2696        Ok(QueryResult {
2697            columns,
2698            rows,
2699            rows_affected: row_count,
2700            has_more_rows: false,
2701            cursor_id,
2702        })
2703    }
2704
2705    /// Parse a PL/SQL response containing OUT parameter values
2706    ///
2707    /// PL/SQL responses may contain:
2708    /// - IoVector (11): bind directions for each parameter
2709    /// - RowData (7): OUT parameter values
2710    /// - FlushOutBinds (19): signals end of OUT bind data
2711    /// - Error (4): completion status
2712    fn parse_plsql_response(
2713        &self,
2714        payload: &[u8],
2715        caps: &Capabilities,
2716        params: &[BindParam],
2717    ) -> Result<PlsqlResult> {
2718        if payload.len() < 3 {
2719            return Err(Error::Protocol("PL/SQL response too short".to_string()));
2720        }
2721
2722        let mut buf = ReadBuffer::from_slice(payload);
2723
2724        // Skip data flags
2725        buf.skip(2)?;
2726
2727        let mut out_values: Vec<Value> = Vec::new();
2728        let mut _out_indices: Vec<usize> = Vec::new();
2729        let mut row_count: u64 = 0;
2730        let mut cursor_id: Option<u16> = None;
2731        let mut end_of_response = false;
2732        let mut implicit_results = ImplicitResults::new();
2733
2734        // Create column infos for OUT params based on their oracle types
2735        let mut out_columns: Vec<ColumnInfo> = Vec::new();
2736
2737        while !end_of_response && buf.remaining() > 0 {
2738            let msg_type = buf.read_u8()?;
2739
2740            match msg_type {
2741                // IoVector (11) - bind directions from server
2742                x if x == MessageType::IoVector as u8 => {
2743                    let (indices, cols) = self.parse_io_vector(&mut buf, params)?;
2744                    _out_indices = indices;
2745                    out_columns = cols;
2746                }
2747
2748                // RowHeader (6)
2749                x if x == MessageType::RowHeader as u8 => {
2750                    self.parse_row_header(&mut buf)?;
2751                }
2752
2753                // RowData (7) - OUT parameter values
2754                x if x == MessageType::RowData as u8 => {
2755                    if !out_columns.is_empty() {
2756                        let row = self.parse_row_data_single(&mut buf, &out_columns, caps)?;
2757                        // Extract values from the row into out_values
2758                        for (idx, value) in row.into_values().into_iter().enumerate() {
2759                            // Check if this is a cursor
2760                            if let Value::Cursor(cursor) = &value {
2761                                if cursor_id.is_none() && cursor.cursor_id() != 0 {
2762                                    cursor_id = Some(cursor.cursor_id());
2763                                }
2764                            }
2765                            // Map back to original param position if we have indices
2766                            if idx < out_values.len() {
2767                                out_values[idx] = value;
2768                            } else {
2769                                out_values.push(value);
2770                            }
2771                        }
2772                    } else {
2773                        // Skip the row data if we don't have column info
2774                        // This shouldn't normally happen
2775                        break;
2776                    }
2777                }
2778
2779                // DescribeInfo (16) - for REF CURSOR describe
2780                x if x == MessageType::DescribeInfo as u8 => {
2781                    buf.skip_raw_bytes_chunked()?;
2782                    let cursor_columns = self.parse_describe_info(&mut buf, caps.ttc_field_version)?;
2783                    // Store cursor columns if needed
2784                    let _ = cursor_columns; // For now, just skip
2785                }
2786
2787                // FlushOutBinds (19) - signals end of OUT bind data
2788                x if x == MessageType::FlushOutBinds as u8 => {
2789                    // This indicates that OUT bind data is done
2790                    // Just continue to get the error/completion status
2791                }
2792
2793                // Error (4) - completion or error
2794                x if x == MessageType::Error as u8 => {
2795                    let (error_code, error_msg, _cid, rc) = self.parse_error_info_with_rowcount(&mut buf)?;
2796                    row_count = rc;
2797                    if error_code != 0 {
2798                        return Err(Error::OracleError {
2799                            code: error_code,
2800                            message: error_msg.unwrap_or_default(),
2801                        });
2802                    }
2803                    end_of_response = true;
2804                }
2805
2806                // Parameter (8) - return parameters
2807                x if x == MessageType::Parameter as u8 => {
2808                    self.parse_return_parameters(&mut buf)?;
2809                }
2810
2811                // Status (9)
2812                x if x == MessageType::Status as u8 => {
2813                    let _call_status = buf.read_ub4()?;
2814                    let _end_to_end_seq = buf.read_ub2()?;
2815                }
2816
2817                // ImplicitResultset (27) - result sets from DBMS_SQL.RETURN_RESULT
2818                x if x == MessageType::ImplicitResultset as u8 => {
2819                    let parsed_results = self.parse_implicit_results(&mut buf, caps)?;
2820                    implicit_results = parsed_results;
2821                }
2822
2823                _ => {
2824                    // Unknown message type - break to avoid parsing errors
2825                    break;
2826                }
2827            }
2828        }
2829
2830        // If no IoVector was received, all params might be IN-only
2831        // In that case, out_values should be empty
2832        Ok(PlsqlResult {
2833            out_values,
2834            rows_affected: row_count,
2835            cursor_id,
2836            implicit_results,
2837        })
2838    }
2839
2840    /// Parse implicit result sets from DBMS_SQL.RETURN_RESULT
2841    ///
2842    /// Format per Python base.pyx _process_implicit_result:
2843    /// - num_results: ub4 (number of implicit result sets)
2844    /// - For each result:
2845    ///   - num_bytes: ub1 + raw bytes (metadata to skip)
2846    ///   - describe_info: column metadata
2847    ///   - cursor_id: ub2
2848    fn parse_implicit_results(&self, buf: &mut ReadBuffer, caps: &Capabilities) -> Result<ImplicitResults> {
2849        let num_results = buf.read_ub4()?;
2850        let mut results = ImplicitResults::new();
2851
2852        for _ in 0..num_results {
2853            // Skip metadata bytes
2854            let num_bytes = buf.read_u8()?;
2855            if num_bytes > 0 {
2856                buf.skip(num_bytes as usize)?;
2857            }
2858
2859            // Parse column metadata for this result set
2860            let columns = self.parse_describe_info(buf, caps.ttc_field_version)?;
2861
2862            // Read cursor ID
2863            let cursor_id = buf.read_ub2()?;
2864
2865            // Create implicit result with metadata but no rows yet
2866            // Rows will be fetched separately using fetch_implicit_result
2867            let result = ImplicitResult::new(cursor_id, columns, Vec::new());
2868            results.add(result);
2869        }
2870
2871        Ok(results)
2872    }
2873
2874    /// Parse IO Vector message to get bind directions
2875    ///
2876    /// Returns a tuple of:
2877    /// - indices of OUT/INOUT parameters in the params list
2878    /// - column infos for parsing OUT values
2879    fn parse_io_vector(
2880        &self,
2881        buf: &mut ReadBuffer,
2882        params: &[BindParam],
2883    ) -> Result<(Vec<usize>, Vec<ColumnInfo>)> {
2884        // I/O vector format (from Python reference):
2885        // - skip 1 byte (flag)
2886        // - read ub2 (num requests)
2887        // - read ub4 (num iters)
2888        // - num_binds = num_iters * 256 + num_requests
2889        // - skip ub4 (num iters this time)
2890        // - skip ub2 (uac buffer length)
2891        // - read ub2 (num_bytes for bit vector), skip if > 0
2892        // - read ub2 (num_bytes for rowid), skip if > 0
2893        // - for each bind: read ub1 (bind_dir)
2894
2895        buf.skip(1)?; // flag
2896        let num_requests = buf.read_ub2()? as u32;
2897        let num_iters = buf.read_ub4()?;
2898        let num_binds = num_iters * 256 + num_requests;
2899        let _ = buf.read_ub4()?; // num iters this time (discard)
2900        let _ = buf.read_ub2()?; // uac buffer length (discard)
2901
2902        // Bit vector
2903        let num_bytes = buf.read_ub2()? as usize;
2904        if num_bytes > 0 {
2905            buf.skip(num_bytes)?;
2906        }
2907
2908        // Rowid (raw bytes, not length-prefixed here)
2909        let num_bytes = buf.read_ub4()? as usize;
2910        if num_bytes > 0 {
2911            buf.skip_raw_bytes_chunked()?; // Skip rowid bytes using chunked read
2912        }
2913
2914        // Read bind directions
2915        let mut out_indices = Vec::new();
2916        let mut out_columns = Vec::new();
2917
2918        for i in 0..(num_binds as usize).min(params.len()) {
2919            let dir_byte = buf.read_u8()?;
2920            let dir = BindDirection::try_from(dir_byte).unwrap_or(BindDirection::Input);
2921
2922            // If this is not an INPUT-only parameter, it has OUT data
2923            if dir != BindDirection::Input {
2924                out_indices.push(i);
2925
2926                // Create a column info for parsing the OUT value
2927                let param = &params[i];
2928                let mut col = ColumnInfo::new(format!("OUT_{}", i), param.oracle_type);
2929                col.buffer_size = param.buffer_size;
2930                col.data_size = param.buffer_size;
2931                col.nullable = true;
2932
2933                // For collection OUT params, extract element type from the placeholder
2934                if let Some(Value::Collection(ref placeholder)) = param.value {
2935                    if let Some(Value::Integer(elem_type_code)) = placeholder.get("_element_type") {
2936                        col.element_type = crate::constants::OracleType::try_from(*elem_type_code as u8).ok();
2937                    }
2938                }
2939
2940                out_columns.push(col);
2941            }
2942        }
2943
2944        Ok((out_indices, out_columns))
2945    }
2946
2947    /// Parse row header (TNS_MSG_TYPE_ROW_HEADER = 6)
2948    fn parse_row_header(&self, buf: &mut ReadBuffer) -> Result<()> {
2949        buf.skip_ub1()?;  // flags
2950        buf.skip_ub2()?;  // num requests
2951        buf.skip_ub4()?;  // iteration number
2952        buf.skip_ub4()?;  // num iters
2953        buf.skip_ub2()?;  // buffer length
2954        let num_bytes = buf.read_ub4()? as usize;
2955        if num_bytes > 0 {
2956            buf.skip_ub1()?;  // skip repeated length
2957            buf.skip(num_bytes)?;  // bit vector
2958        }
2959        let num_bytes = buf.read_ub4()? as usize;
2960        if num_bytes > 0 {
2961            buf.skip_raw_bytes_chunked()?;  // rxhrid
2962        }
2963        Ok(())
2964    }
2965
2966    /// Parse return parameters (TNS_MSG_TYPE_PARAMETER = 8)
2967    fn parse_return_parameters(&self, buf: &mut ReadBuffer) -> Result<()> {
2968        self.parse_return_parameters_internal(buf, false).map(|_| ())
2969    }
2970
2971    /// Parse return parameters with optional row counts extraction
2972    /// When `want_row_counts` is true, attempts to read arraydmlrowcounts from the response.
2973    fn parse_return_parameters_internal(
2974        &self,
2975        buf: &mut ReadBuffer,
2976        want_row_counts: bool,
2977    ) -> Result<Option<Vec<u64>>> {
2978        // Per Python's _process_return_parameters
2979        let num_params = buf.read_ub2()?;  // al8o4l (ignored)
2980        for _ in 0..num_params {
2981            buf.skip_ub4()?;
2982        }
2983
2984        let al8txl = buf.read_ub2()?;  // al8txl (ignored)
2985        if al8txl > 0 {
2986            buf.skip(al8txl as usize)?;
2987        }
2988
2989        // num key/value pairs - skip for now
2990        let num_pairs = buf.read_ub2()?;
2991        for _ in 0..num_pairs {
2992            buf.read_bytes_with_length()?;  // text value
2993            buf.read_bytes_with_length()?;  // binary value
2994            buf.skip_ub2()?;  // keyword num
2995        }
2996
2997        // registration
2998        let num_bytes = buf.read_ub2()?;
2999        if num_bytes > 0 {
3000            buf.skip(num_bytes as usize)?;
3001        }
3002
3003        // If arraydmlrowcounts was requested, parse the row counts
3004        if want_row_counts && buf.remaining() >= 4 {
3005            let num_rows = buf.read_ub4()? as usize;
3006            let mut row_counts = Vec::with_capacity(num_rows);
3007            for _ in 0..num_rows {
3008                let count = buf.read_ub8()?;
3009                row_counts.push(count);
3010            }
3011            Ok(Some(row_counts))
3012        } else {
3013            Ok(None)
3014        }
3015    }
3016
3017    /// Parse a single row of data
3018    fn parse_row_data_single(
3019        &self,
3020        buf: &mut ReadBuffer,
3021        columns: &[ColumnInfo],
3022        caps: &Capabilities,
3023    ) -> Result<Row> {
3024        let mut values = Vec::with_capacity(columns.len());
3025
3026        for col in columns {
3027            let value = self.parse_column_value(buf, col, caps)?;
3028            values.push(value);
3029        }
3030
3031        Ok(Row::new(values))
3032    }
3033
3034    /// Parse a single row of data with bit vector support for duplicate column optimization
3035    ///
3036    /// Oracle sends a BitVector message before RowData when some columns have the same
3037    /// value as the previous row. Bits that are SET (1) indicate data is sent in the buffer;
3038    /// bits that are CLEAR (0) indicate the value should be copied from the previous row.
3039    fn parse_row_data_with_bitvector(
3040        &self,
3041        buf: &mut ReadBuffer,
3042        columns: &[ColumnInfo],
3043        caps: &Capabilities,
3044        bit_vector: Option<&[u8]>,
3045        previous_values: Option<&Vec<Value>>,
3046    ) -> Result<Row> {
3047        let mut values = Vec::with_capacity(columns.len());
3048
3049        for (col_idx, col) in columns.iter().enumerate() {
3050            // Check if this column is a duplicate (bit=0 means duplicate)
3051            let is_duplicate = if let Some(bv) = bit_vector {
3052                let byte_num = col_idx / 8;
3053                let bit_num = col_idx % 8;
3054                if byte_num < bv.len() {
3055                    // If bit is 0, it's a duplicate
3056                    (bv[byte_num] & (1 << bit_num)) == 0
3057                } else {
3058                    false
3059                }
3060            } else {
3061                false
3062            };
3063
3064            if is_duplicate {
3065                // Copy value from previous row
3066                if let Some(prev) = previous_values {
3067                    if col_idx < prev.len() {
3068                        values.push(prev[col_idx].clone());
3069                    } else {
3070                        // Shouldn't happen, but fallback to null
3071                        values.push(Value::Null);
3072                    }
3073                } else {
3074                    // No previous row (shouldn't happen for duplicate), fallback to null
3075                    values.push(Value::Null);
3076                }
3077            } else {
3078                // Read actual value from buffer
3079                let value = self.parse_column_value(buf, col, caps)?;
3080                values.push(value);
3081            }
3082        }
3083
3084        Ok(Row::new(values))
3085    }
3086
3087    /// Parse a single column value from the buffer
3088    fn parse_column_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo, caps: &Capabilities) -> Result<Value> {
3089        use crate::constants::OracleType;
3090
3091        // Handle LOB columns specially - they have a different format
3092        if col.is_lob() {
3093            return self.parse_lob_value(buf, col);
3094        }
3095
3096        // Handle CURSOR type - REF CURSOR from PL/SQL
3097        if col.oracle_type == OracleType::Cursor {
3098            return self.parse_cursor_value(buf, caps);
3099        }
3100
3101        // Handle Object type - collections (VARRAY, Nested Table) and UDTs
3102        if col.oracle_type == OracleType::Object {
3103            return self.parse_object_value(buf, col);
3104        }
3105
3106        // Read the value based on the column type
3107        // First, check if it's NULL
3108        let data = buf.read_bytes_with_length()?;
3109
3110        match data {
3111            None => Ok(Value::Null),
3112            Some(bytes) if bytes.is_empty() => Ok(Value::Null),
3113            Some(bytes) => {
3114                // Decode based on oracle type
3115                match col.oracle_type {
3116                    OracleType::Number => {
3117                        // Oracle NUMBER format - decode to string
3118                        let num = crate::types::decode_oracle_number(&bytes)?;
3119                        Ok(Value::String(num.value))
3120                    }
3121                    OracleType::Varchar | OracleType::Char | OracleType::Long => {
3122                        let s = String::from_utf8_lossy(&bytes).to_string();
3123                        Ok(Value::String(s))
3124                    }
3125                    OracleType::Raw | OracleType::LongRaw => {
3126                        // RAW/LONG RAW types - return as bytes
3127                        Ok(Value::Bytes(bytes.to_vec()))
3128                    }
3129                    OracleType::Date => {
3130                        // Oracle DATE format - 7 bytes
3131                        let date = crate::types::decode_oracle_date(&bytes)?;
3132                        Ok(Value::Date(date))
3133                    }
3134                    OracleType::Timestamp | OracleType::TimestampLtz => {
3135                        // Oracle TIMESTAMP format - 11 bytes (date + fractional seconds)
3136                        let ts = crate::types::decode_oracle_timestamp(&bytes)?;
3137                        Ok(Value::Timestamp(ts))
3138                    }
3139                    OracleType::TimestampTz => {
3140                        // Oracle TIMESTAMP WITH TIME ZONE - 13 bytes
3141                        let ts = crate::types::decode_oracle_timestamp(&bytes)?;
3142                        Ok(Value::Timestamp(ts))
3143                    }
3144                    _ => {
3145                        // Default: return as raw bytes or string
3146                        let s = String::from_utf8_lossy(&bytes).to_string();
3147                        Ok(Value::String(s))
3148                    }
3149                }
3150            }
3151        }
3152    }
3153
3154    /// Parse a REF CURSOR value from the buffer
3155    ///
3156    /// Per Python base.pyx lines 1038-1046:
3157    /// - Skip 1 byte (length indicator - fixed value)
3158    /// - Read describe info (column metadata for the cursor)
3159    /// - Read cursor_id (UB2)
3160    fn parse_cursor_value(&self, buf: &mut ReadBuffer, caps: &Capabilities) -> Result<Value> {
3161        use crate::types::RefCursor;
3162
3163        // Skip length indicator (fixed value for cursors)
3164        let _length = buf.read_u8()?;
3165
3166        // Read column metadata for this cursor
3167        let cursor_columns = self.parse_describe_info(buf, caps.ttc_field_version)?;
3168
3169        // Read the cursor ID
3170        let cursor_id = buf.read_ub2()?;
3171
3172        // Create RefCursor with the metadata
3173        let ref_cursor = RefCursor::new(cursor_id, cursor_columns);
3174
3175        Ok(Value::Cursor(ref_cursor))
3176    }
3177
3178    /// Parse an Object/Collection value from the buffer
3179    ///
3180    /// Object format from Oracle (per Python packet.pyx read_dbobject):
3181    /// - UB4: type OID length, then type OID bytes if > 0
3182    /// - UB4: OID length, then OID bytes if > 0
3183    /// - UB4: snapshot length, then snapshot bytes if > 0 (discarded)
3184    /// - UB2: version (skip)
3185    /// - UB4: packed data length
3186    /// - UB2: flags (skip)
3187    /// - Bytes: packed data (pickle format)
3188    fn parse_object_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo) -> Result<Value> {
3189        use crate::dbobject::{CollectionType, DbObject, DbObjectType};
3190        use crate::types::decode_collection;
3191
3192        // Read type OID
3193        let toid_len = buf.read_ub4()?;
3194        let _toid = if toid_len > 0 {
3195            Some(buf.read_bytes_vec(toid_len as usize)?)
3196        } else {
3197            None
3198        };
3199
3200        // Read OID
3201        let oid_len = buf.read_ub4()?;
3202        let _oid = if oid_len > 0 {
3203            Some(buf.read_bytes_vec(oid_len as usize)?)
3204        } else {
3205            None
3206        };
3207
3208        // Read and discard snapshot
3209        let snapshot_len = buf.read_ub4()?;
3210        if snapshot_len > 0 {
3211            buf.skip_raw_bytes_chunked()?;
3212        }
3213
3214        // Skip version (length-prefixed UB2)
3215        let _version = buf.read_ub2()?;
3216
3217        // Read packed data length
3218        let data_len = buf.read_ub4()?;
3219
3220        // Skip flags (length-prefixed UB2)
3221        let _flags = buf.read_ub2()?;
3222
3223        if data_len == 0 {
3224            return Ok(Value::Null);
3225        }
3226
3227        // Read packed data (chunked format like other byte sequences)
3228        let packed_data = buf.read_bytes_with_length()?;
3229
3230        match packed_data {
3231            None => Ok(Value::Null),
3232            Some(data) if data.is_empty() => Ok(Value::Null),
3233            Some(data) => {
3234                // Create a placeholder type based on column info
3235                let type_name = col.type_name.clone().unwrap_or_else(|| "UNKNOWN".to_string());
3236
3237                // Try to determine if this is a collection based on the pickle data
3238                // The first byte contains flags - check for IS_COLLECTION (0x08)
3239                let is_collection = !data.is_empty() && (data[0] & 0x08) != 0;
3240
3241                if is_collection {
3242                    // Get element type from column info or default to VARCHAR
3243                    let element_type = col.element_type.unwrap_or(crate::constants::OracleType::Varchar);
3244
3245                    // Determine collection type from pickle flags
3246                    // Collection flags are after header - but we'll default for now
3247                    let collection_type = CollectionType::Varray;
3248
3249                    let obj_type = DbObjectType::collection(
3250                        &col.type_schema.clone().unwrap_or_default(),
3251                        &type_name,
3252                        collection_type,
3253                        element_type,
3254                    );
3255
3256                    match decode_collection(&obj_type, &data) {
3257                        Ok(collection) => Ok(Value::Collection(collection)),
3258                        Err(e) => {
3259                            tracing::warn!("Failed to decode collection: {}, data: {:02x?}", e, &data[..std::cmp::min(20, data.len())]);
3260                            // Return raw bytes as fallback
3261                            Ok(Value::Bytes(data))
3262                        }
3263                    }
3264                } else {
3265                    // Regular object type - not yet fully implemented
3266                    let mut obj = DbObject::new(&type_name);
3267                    // Store raw pickle data for later inspection
3268                    obj.set("_raw_data", Value::Bytes(data));
3269                    Ok(Value::Collection(obj))
3270                }
3271            }
3272        }
3273    }
3274
3275    /// Parse a LOB column value from the buffer
3276    ///
3277    /// LOB format from Oracle (per Python's read_lob_with_length):
3278    /// - UB4: num_bytes (indicator that LOB data follows)
3279    /// - If num_bytes > 0:
3280    ///   - For non-BFILE: UB8 size, UB4 chunk_size
3281    ///   - Bytes: LOB locator (chunked format)
3282    ///
3283    /// The actual LOB content must be fetched separately using LOB operations.
3284    /// For JSON columns, the data is OSON-encoded and decoded directly.
3285    /// For VECTOR columns, the data is decoded from Oracle's vector binary format.
3286    fn parse_lob_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo) -> Result<Value> {
3287        use crate::constants::OracleType;
3288        use crate::types::{decode_vector, OsonDecoder};
3289
3290        // Read length indicator
3291        let num_bytes = buf.read_ub4()?;
3292
3293        if num_bytes == 0 {
3294            // For JSON, null is Value::Json(serde_json::Value::Null)
3295            if col.oracle_type == OracleType::Json || col.is_json {
3296                return Ok(Value::Json(serde_json::Value::Null));
3297            }
3298            // For VECTOR, null is Value::Null
3299            if col.oracle_type == OracleType::Vector {
3300                return Ok(Value::Null);
3301            }
3302            return Ok(Value::Lob(LobValue::Null));
3303        }
3304
3305        // For BFILE, there's no size/chunk_size metadata
3306        let (size, chunk_size) = if col.oracle_type == OracleType::Bfile {
3307            (0u64, 0u32)
3308        } else {
3309            // Read LOB size and chunk size
3310            let size = buf.read_ub8()?;
3311            let chunk_size = buf.read_ub4()?;
3312            (size, chunk_size)
3313        };
3314
3315        // Read LOB data (could be locator or inline data depending on size)
3316        let data_bytes = buf.read_bytes_with_length()?;
3317
3318        // Handle JSON columns - decode OSON format
3319        if col.oracle_type == OracleType::Json || col.is_json {
3320            if let Some(data) = data_bytes {
3321                if !data.is_empty() {
3322                    // Decode OSON to JSON
3323                    match OsonDecoder::decode(bytes::Bytes::from(data)) {
3324                        Ok(json_value) => return Ok(Value::Json(json_value)),
3325                        Err(e) => {
3326                            tracing::warn!("Failed to decode OSON: {}", e);
3327                            return Ok(Value::Json(serde_json::Value::Null));
3328                        }
3329                    }
3330                }
3331            }
3332            return Ok(Value::Json(serde_json::Value::Null));
3333        }
3334
3335        // Handle VECTOR columns - decode vector binary format
3336        if col.oracle_type == OracleType::Vector {
3337            // Read and discard LOB locator (not needed for VECTOR)
3338            let _locator = buf.read_bytes_with_length()?;
3339
3340            if let Some(data) = data_bytes {
3341                if !data.is_empty() {
3342                    match decode_vector(&data) {
3343                        Ok(vector) => return Ok(Value::Vector(vector)),
3344                        Err(e) => {
3345                            tracing::warn!("Failed to decode VECTOR: {}", e);
3346                            return Ok(Value::Null);
3347                        }
3348                    }
3349                }
3350            }
3351            return Ok(Value::Null);
3352        }
3353
3354        // Create a LOB locator for fetching the data later
3355        if let Some(locator_data) = data_bytes {
3356            if !locator_data.is_empty() {
3357                let locator = LobLocator::new(
3358                    bytes::Bytes::from(locator_data),
3359                    size,
3360                    chunk_size,
3361                    col.oracle_type,
3362                    col.csfrm,
3363                );
3364                return Ok(Value::Lob(LobValue::locator(locator)));
3365            }
3366        }
3367
3368        // If we have size but no locator, it might be an empty LOB
3369        if size == 0 {
3370            return Ok(Value::Lob(LobValue::Empty));
3371        }
3372
3373        // Empty LOB (shouldn't normally reach here)
3374        Ok(Value::Lob(LobValue::Empty))
3375    }
3376
3377    /// Parse error info message and extract cursor_id
3378    /// Format per Python's _process_error_info in base.pyx
3379    fn parse_error_info(&self, buf: &mut ReadBuffer) -> Result<(u32, Option<String>, u16)> {
3380        // End of call status
3381        let _call_status = buf.read_ub4()?;
3382        // End to end seq#
3383        buf.skip_ub2()?;
3384        // Current row number
3385        buf.skip_ub4()?;
3386        // Error number (short form)
3387        buf.skip_ub2()?;
3388        // Array elem error
3389        buf.skip_ub2()?;
3390        // Array elem error
3391        buf.skip_ub2()?;
3392        // Cursor ID
3393        let cursor_id = buf.read_ub2()?;
3394        // Error position
3395        let _error_pos = buf.read_sb2()?;
3396        // SQL type (19c and earlier)
3397        buf.skip_ub1()?;
3398        // Fatal?
3399        buf.skip_ub1()?;
3400        // Flags
3401        buf.skip_ub1()?;
3402        // User cursor options
3403        buf.skip_ub1()?;
3404        // UPI parameter
3405        buf.skip_ub1()?;
3406        // Flags (second)
3407        buf.skip_ub1()?;
3408        // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3409        buf.skip_ub4()?; // rba
3410        buf.skip_ub2()?; // partition_id
3411        buf.skip_ub1()?; // skip
3412        buf.skip_ub4()?; // block_num
3413        buf.skip_ub2()?; // slot_num
3414        // OS error
3415        buf.skip_ub4()?;
3416        // Statement number
3417        buf.skip_ub1()?;
3418        // Call number
3419        buf.skip_ub1()?;
3420        // Padding
3421        buf.skip_ub2()?;
3422        // Success iters
3423        buf.skip_ub4()?;
3424        // oerrdd (logical rowid)
3425        let oerrdd_len = buf.read_ub4()?;
3426        if oerrdd_len > 0 {
3427            buf.skip_raw_bytes_chunked()?;
3428        }
3429
3430        // Batch error codes array
3431        let num_batch_errors = buf.read_ub2()?;
3432        if num_batch_errors > 0 {
3433            buf.skip_ub1()?;  // first byte
3434            for _ in 0..num_batch_errors {
3435                buf.skip_ub2()?;  // error code
3436            }
3437        }
3438
3439        // Batch error row offset array
3440        let num_offsets = buf.read_ub4()?;
3441        if num_offsets > 0 {
3442            buf.skip_ub1()?;  // first byte
3443            for _ in 0..num_offsets {
3444                buf.skip_ub4()?;  // offset
3445            }
3446        }
3447
3448        // Batch error messages array
3449        let num_batch_msgs = buf.read_ub2()?;
3450        if num_batch_msgs > 0 {
3451            buf.skip_ub1()?;  // packed size
3452            for _ in 0..num_batch_msgs {
3453                buf.skip_ub2()?;  // chunk length
3454                buf.read_string_with_length()?;  // message
3455                buf.skip(2)?;  // end marker
3456            }
3457        }
3458
3459        // Extended error number (UB4)
3460        let error_code = buf.read_ub4()?;
3461        // Row count (UB8)
3462        let _row_count = buf.read_ub8()?;
3463
3464        // Error message
3465        let error_msg = if error_code != 0 {
3466            buf.read_string_with_length()?.map(|s| s.trim().to_string())
3467        } else {
3468            None
3469        };
3470
3471        Ok((error_code, error_msg, cursor_id))
3472    }
3473
3474    /// Parse error response packet (received after marker reset)
3475    fn parse_error_response(&self, payload: &[u8]) -> Result<QueryResult> {
3476        if payload.len() < 3 {
3477            return Err(Error::Protocol("Error response too short".to_string()));
3478        }
3479
3480        let mut buf = ReadBuffer::from_slice(payload);
3481
3482        // Skip data flags
3483        buf.skip(2)?;
3484
3485        // Read message type
3486        let msg_type = buf.read_u8()?;
3487
3488        // Check for error message type (4)
3489        if msg_type == MessageType::Error as u8 {
3490            // Parse error info per Python's _process_error_info
3491            let _call_status = buf.read_ub4()?;  // end of call status
3492            buf.skip_ub2()?;  // end to end seq#
3493            buf.skip_ub4()?;  // current row number
3494            buf.skip_ub2()?;  // error number (short form)
3495            buf.skip_ub2()?;  // array elem error
3496            buf.skip_ub2()?;  // array elem error
3497            let _cursor_id = buf.read_ub2()?;  // cursor id
3498            let _error_pos = buf.read_sb2()?;  // error position
3499            buf.skip_ub1()?;  // sql type (19c and earlier)
3500            buf.skip_ub1()?;  // fatal?
3501            buf.skip_ub1()?;  // flags
3502            buf.skip_ub1()?;  // user cursor options
3503            buf.skip_ub1()?;  // UPI parameter
3504            buf.skip_ub1()?;  // flags
3505
3506            // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3507            buf.skip_ub4()?; // rba
3508            buf.skip_ub2()?; // partition_id
3509            buf.skip_ub1()?; // skip
3510            buf.skip_ub4()?; // block_num
3511            buf.skip_ub2()?; // slot_num
3512
3513            buf.skip_ub4()?;  // OS error
3514            buf.skip_ub1()?;  // statement number
3515            buf.skip_ub1()?;  // call number
3516            buf.skip_ub2()?;  // padding
3517            buf.skip_ub4()?;  // success iters
3518
3519            // oerrdd (logical rowid)
3520            let oerrdd_len = buf.read_ub4()?;
3521            if oerrdd_len > 0 {
3522                buf.skip_raw_bytes_chunked()?;
3523            }
3524
3525            // batch error codes array
3526            let num_batch_errors = buf.read_ub2()?;
3527            if num_batch_errors > 0 {
3528                // Skip batch error data - we don't process it for now
3529                buf.skip_ub1()?;  // first byte
3530                for _ in 0..num_batch_errors {
3531                    buf.skip_ub2()?;  // error code
3532                }
3533            }
3534
3535            // batch error row offset array
3536            let num_offsets = buf.read_ub4()?;
3537            if num_offsets > 0 {
3538                buf.skip_ub1()?;  // first byte
3539                for _ in 0..num_offsets {
3540                    buf.skip_ub4()?;  // offset
3541                }
3542            }
3543
3544            // batch error messages array
3545            let num_batch_msgs = buf.read_ub2()?;
3546            if num_batch_msgs > 0 {
3547                // Skip batch error messages
3548                buf.skip_ub1()?;  // packed size
3549                for _ in 0..num_batch_msgs {
3550                    buf.skip_ub2()?;  // chunk length
3551                    buf.read_string_with_length()?;  // message
3552                    buf.skip(2)?;  // end marker
3553                }
3554            }
3555
3556            // Extended error number (UB4)
3557            let error_num = buf.read_ub4()?;
3558            let _row_count = buf.read_ub8()?;  // row number (extended)
3559
3560            // Read error message
3561            let error_msg = if error_num != 0 {
3562                buf.read_string_with_length()?.map(|s| s.trim().to_string())
3563            } else {
3564                None
3565            };
3566
3567            return Err(Error::OracleError {
3568                code: error_num,
3569                message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_num)),
3570            });
3571        }
3572
3573        // If not an error message type, return generic error
3574        Err(Error::Protocol(format!(
3575            "Expected error message type 4, got {}",
3576            msg_type
3577        )))
3578    }
3579
3580    /// Parse DML response to extract rows affected
3581    fn parse_dml_response(&self, payload: &[u8]) -> Result<QueryResult> {
3582        if payload.len() < 3 {
3583            return Err(Error::Protocol("DML response too short".to_string()));
3584        }
3585
3586        let mut buf = ReadBuffer::from_slice(payload);
3587
3588        // Skip data flags
3589        buf.skip(2)?;
3590
3591        let mut rows_affected: u64 = 0;
3592        let mut cursor_id: u16 = 0;
3593        let mut end_of_response = false;
3594
3595        // Process messages until end_of_response or out of data
3596        // Note: If supports_end_of_response is true, we must continue until msg type 29
3597        while !end_of_response && buf.remaining() > 0 {
3598            let msg_type = buf.read_u8()?;
3599
3600            match msg_type {
3601                // Error (4) - may contain error or success info
3602                x if x == MessageType::Error as u8 => {
3603                    let (error_code, error_msg, cid, row_count) = self.parse_error_info_with_rowcount(&mut buf)?;
3604                    cursor_id = cid;
3605                    rows_affected = row_count;
3606                    if error_code != 0 && error_code != 1403 {
3607                        return Err(Error::OracleError {
3608                            code: error_code,
3609                            message: error_msg.unwrap_or_default(),
3610                        });
3611                    }
3612                    // Only end if server doesn't support end_of_response
3613                    // Otherwise, continue until we get msg type 29
3614                }
3615
3616                // Parameter (8) - return parameters
3617                x if x == MessageType::Parameter as u8 => {
3618                    self.parse_return_parameters(&mut buf)?;
3619                }
3620
3621                // Status (9) - call status
3622                x if x == MessageType::Status as u8 => {
3623                    let _call_status = buf.read_ub4()?;
3624                    let _end_to_end_seq = buf.read_ub2()?;
3625                }
3626
3627                // BitVector (21)
3628                21 => {
3629                    let _num_columns_sent = buf.read_ub2()?;
3630                    // No columns for DML, but read the byte if present
3631                    if buf.remaining() > 0 {
3632                        let _byte = buf.read_u8()?;
3633                    }
3634                }
3635
3636                // End of Response (29) - explicit end marker
3637                29 => {
3638                    end_of_response = true;
3639                }
3640
3641                _ => {
3642                    // Unknown message type - continue processing
3643                }
3644            }
3645        }
3646
3647        Ok(QueryResult {
3648            columns: Vec::new(),
3649            rows: Vec::new(),
3650            rows_affected,
3651            has_more_rows: false,
3652            cursor_id,
3653        })
3654    }
3655
3656    /// Parse error info and return (error_code, error_msg, cursor_id, row_count)
3657    fn parse_error_info_with_rowcount(&self, buf: &mut ReadBuffer) -> Result<(u32, Option<String>, u16, u64)> {
3658        // End of call status
3659        let _call_status = buf.read_ub4()?;
3660        // End to end seq#
3661        buf.skip_ub2()?;
3662        // Current row number
3663        buf.skip_ub4()?;
3664        // Error number (short form)
3665        buf.skip_ub2()?;
3666        // Array elem error
3667        buf.skip_ub2()?;
3668        // Array elem error
3669        buf.skip_ub2()?;
3670        // Cursor ID
3671        let cursor_id = buf.read_ub2()?;
3672        // Error position
3673        let _error_pos = buf.read_sb2()?;
3674        // SQL type (19c and earlier)
3675        buf.skip_ub1()?;
3676        // Fatal?
3677        buf.skip_ub1()?;
3678        // Flags
3679        buf.skip_ub1()?;
3680        // User cursor options
3681        buf.skip_ub1()?;
3682        // UPI parameter
3683        buf.skip_ub1()?;
3684        // Flags (second)
3685        buf.skip_ub1()?;
3686        // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3687        buf.skip_ub4()?; // rba
3688        buf.skip_ub2()?; // partition_id
3689        buf.skip_ub1()?; // skip
3690        buf.skip_ub4()?; // block_num
3691        buf.skip_ub2()?; // slot_num
3692        // OS error
3693        buf.skip_ub4()?;
3694        // Statement number
3695        buf.skip_ub1()?;
3696        // Call number
3697        buf.skip_ub1()?;
3698        // Padding
3699        buf.skip_ub2()?;
3700        // Success iters
3701        buf.skip_ub4()?;
3702        // oerrdd (logical rowid)
3703        let oerrdd_len = buf.read_ub4()?;
3704        if oerrdd_len > 0 {
3705            buf.skip_raw_bytes_chunked()?;
3706        }
3707
3708        // Batch error codes array
3709        let num_batch_errors = buf.read_ub2()?;
3710        if num_batch_errors > 0 {
3711            buf.skip_ub1()?;  // first byte
3712            for _ in 0..num_batch_errors {
3713                buf.skip_ub2()?;  // error code
3714            }
3715        }
3716
3717        // Batch error row offset array
3718        let num_offsets = buf.read_ub4()?;
3719        if num_offsets > 0 {
3720            buf.skip_ub1()?;  // first byte
3721            for _ in 0..num_offsets {
3722                buf.skip_ub4()?;  // offset
3723            }
3724        }
3725
3726        // Batch error messages array
3727        let num_batch_msgs = buf.read_ub2()?;
3728        if num_batch_msgs > 0 {
3729            buf.skip_ub1()?;  // packed size
3730            for _ in 0..num_batch_msgs {
3731                buf.skip_ub2()?;  // chunk length
3732                buf.read_string_with_length()?;  // message
3733                buf.skip(2)?;  // end marker
3734            }
3735        }
3736
3737        // Extended error number (UB4)
3738        let error_code = buf.read_ub4()?;
3739        // Row count (UB8) - this is the rows affected!
3740        let row_count = buf.read_ub8()?;
3741
3742        // Fields added in Oracle Database 20c (TTC field version >= 16)
3743        // We always skip these since we support Oracle 20c+
3744        buf.skip_ub4()?; // sql_type
3745        buf.skip_ub4()?; // server_checksum
3746
3747        // Error message
3748        let error_msg = if error_code != 0 {
3749            buf.read_string_with_length()?.map(|s| s.trim().to_string())
3750        } else {
3751            None
3752        };
3753
3754        Ok((error_code, error_msg, cursor_id, row_count))
3755    }
3756
3757    /// Parse describe info from response to extract column metadata
3758    ///
3759    /// Per Python's _process_describe_info, the format is:
3760    /// - UB4: max row size (skip)
3761    /// - UB4: number of columns
3762    /// - If num_columns > 0: UB1 (skip one byte)
3763    /// - For each column: metadata fields
3764    /// - After columns: current date, dcb flags, etc.
3765    fn parse_describe_info(&self, buf: &mut ReadBuffer, ttc_field_version: u8) -> Result<Vec<ColumnInfo>> {
3766        use crate::constants::ccap_value;
3767
3768        // Skip max row size
3769        buf.skip_ub4()?;
3770
3771        // Read number of columns
3772        let num_columns = buf.read_ub4()? as usize;
3773        if num_columns == 0 {
3774            return Ok(Vec::new());
3775        }
3776
3777        // Skip one byte if we have columns
3778        buf.skip_ub1()?;
3779
3780        let mut columns = Vec::with_capacity(num_columns);
3781
3782        for _col_idx in 0..num_columns {
3783
3784            // Parse column metadata per Python's _process_metadata
3785            let ora_type_num = buf.read_u8()?;
3786            buf.skip_ub1()?; // flags
3787            let precision = buf.read_u8()?; // precision as SB1
3788            let scale = buf.read_u8()?; // scale as SB1
3789            let buffer_size = buf.read_ub4()?;
3790
3791            buf.skip_ub4()?; // max_num_array_elements
3792            buf.skip_ub8()?; // cont_flags
3793            let _oid = buf.read_bytes_with_length()?; // OID
3794            buf.skip_ub2()?; // version
3795            buf.skip_ub2()?; // charset_id
3796            let _csfrm = buf.read_u8()?; // charset form
3797            let max_size = buf.read_ub4()?;
3798
3799            // For TTC field version >= 12.2 (8), skip oaccolid
3800            if ttc_field_version >= ccap_value::FIELD_VERSION_12_2 {
3801                buf.skip_ub4()?; // oaccolid
3802            }
3803
3804            let _nulls_allowed = buf.read_u8()?;
3805            buf.skip_ub1()?; // v7 length of name
3806            let name = buf.read_string_with_ub4_length()?.unwrap_or_default();
3807            let _schema = buf.read_string_with_ub4_length()?; // schema
3808            let _type_name = buf.read_string_with_ub4_length()?; // type_name
3809            buf.skip_ub2()?; // column position
3810            buf.skip_ub4()?; // uds_flags
3811
3812            // For TTC field version >= 23.1 (17), read domain fields
3813            if ttc_field_version >= ccap_value::FIELD_VERSION_23_1 {
3814                let _domain_schema = buf.read_string_with_ub4_length()?;
3815                let _domain_name = buf.read_string_with_ub4_length()?;
3816            }
3817
3818            // For TTC field version >= 20 (23.1_EXT_3), read annotations
3819            if ttc_field_version >= 20 {
3820                let num_annotations = buf.read_ub4()?;
3821                if num_annotations > 0 {
3822                    buf.skip_ub1()?;
3823                    // Read the actual annotations count (yes, it's read twice in Python)
3824                    let actual_num = buf.read_ub4()?;
3825                    buf.skip_ub1()?;
3826                    for _ in 0..actual_num {
3827                        // Skip annotation key and value (both are string with UB4 length)
3828                        let _key = buf.read_string_with_ub4_length()?;
3829                        let _value = buf.read_string_with_ub4_length()?;
3830                        buf.skip_ub4()?; // flags per annotation
3831                    }
3832                    buf.skip_ub4()?; // final flags
3833                }
3834            }
3835
3836            // For TTC field version >= 24 (23.4), read vector fields
3837            if ttc_field_version >= ccap_value::FIELD_VERSION_23_4 {
3838                buf.skip_ub4()?; // vector_dimensions
3839                buf.skip_ub1()?; // vector_format
3840                buf.skip_ub1()?; // vector_flags
3841            }
3842
3843            // Convert data type to OracleType
3844            let oracle_type = crate::constants::OracleType::try_from(ora_type_num)
3845                .unwrap_or(crate::constants::OracleType::Varchar);
3846
3847            let mut col = ColumnInfo::new(&name, oracle_type);
3848            col.data_size = if max_size > 0 { max_size } else { buffer_size };
3849            col.precision = precision as i16;
3850            col.scale = scale as i16;
3851            columns.push(col);
3852        }
3853
3854        // After columns: skip remaining describe info fields
3855        // Python's _process_describe_info uses:
3856        //   buf.read_ub4(&num_bytes)
3857        //   if num_bytes > 0:
3858        //       buf.skip_raw_bytes_chunked()    # current date
3859        //   buf.skip_ub4()                      # dcbflag
3860        //   buf.skip_ub4()                      # dcbmdbz
3861        //   buf.skip_ub4()                      # dcbmnpr
3862        //   buf.skip_ub4()                      # dcbmxpr
3863        //   buf.read_ub4(&num_bytes)
3864        //   if num_bytes > 0:
3865        //       buf.skip_raw_bytes_chunked()    # dcbqcky
3866
3867        // current_date - read UB4 indicator first, then skip chunked bytes if > 0
3868        let current_date_indicator = buf.read_ub4()?;
3869        if current_date_indicator > 0 {
3870            buf.skip_raw_bytes_chunked()?;
3871        }
3872
3873        // dcb* fields as UB4
3874        buf.skip_ub4()?; // dcbflag
3875        buf.skip_ub4()?; // dcbmdbz
3876        buf.skip_ub4()?; // dcbmnpr
3877        buf.skip_ub4()?; // dcbmxpr
3878
3879        // dcbqcky - read UB4 indicator first, then skip chunked bytes if > 0
3880        let dcbqcky_indicator = buf.read_ub4()?;
3881        if dcbqcky_indicator > 0 {
3882            buf.skip_raw_bytes_chunked()?;
3883        }
3884
3885        // After dcbqcky, the next message (RowHeader) follows directly
3886        // No additional fields to skip here
3887
3888
3889        Ok(columns)
3890    }
3891
3892    /// Commit the current transaction.
3893    ///
3894    /// Makes all changes in the current transaction permanent. After commit,
3895    /// a new transaction begins automatically.
3896    ///
3897    /// # Example
3898    ///
3899    /// ```rust,no_run
3900    /// # use oracle_rs::{Connection, Value};
3901    /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
3902    /// conn.execute("INSERT INTO users (name) VALUES (:1)", &["Alice".into()]).await?;
3903    /// conn.execute("INSERT INTO users (name) VALUES (:1)", &["Bob".into()]).await?;
3904    /// conn.commit().await?; // Both inserts are now permanent
3905    /// # Ok(())
3906    /// # }
3907    /// ```
3908    pub async fn commit(&self) -> Result<()> {
3909        self.ensure_ready().await?;
3910        // Use SQL COMMIT statement instead of simple function
3911        // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
3912        self.execute("COMMIT", &[]).await?;
3913        Ok(())
3914    }
3915
3916    /// Rollback the current transaction.
3917    ///
3918    /// Discards all changes made in the current transaction. After rollback,
3919    /// a new transaction begins automatically.
3920    ///
3921    /// # Example
3922    ///
3923    /// ```rust,no_run
3924    /// # use oracle_rs::{Connection, Value};
3925    /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
3926    /// conn.execute("DELETE FROM users WHERE id = :1", &[1.into()]).await?;
3927    /// // Oops, wrong user!
3928    /// conn.rollback().await?; // Delete is undone
3929    /// # Ok(())
3930    /// # }
3931    /// ```
3932    pub async fn rollback(&self) -> Result<()> {
3933        self.ensure_ready().await?;
3934        // Use SQL ROLLBACK statement instead of simple function
3935        // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
3936        self.execute("ROLLBACK", &[]).await?;
3937        Ok(())
3938    }
3939
3940    /// Create a savepoint within the current transaction
3941    ///
3942    /// Savepoints allow partial rollback of a transaction. You can create multiple
3943    /// savepoints and rollback to any of them without affecting work done before
3944    /// that savepoint.
3945    ///
3946    /// # Arguments
3947    /// * `name` - The savepoint name (must be a valid Oracle identifier)
3948    ///
3949    /// # Example
3950    /// ```rust,ignore
3951    /// conn.execute("INSERT INTO t VALUES (1)", &[]).await?;
3952    /// conn.savepoint("sp1").await?;
3953    /// conn.execute("INSERT INTO t VALUES (2)", &[]).await?;
3954    /// conn.rollback_to_savepoint("sp1").await?; // Undoes the second insert
3955    /// conn.commit().await?; // Commits only the first insert
3956    /// ```
3957    pub async fn savepoint(&self, name: &str) -> Result<()> {
3958        self.ensure_ready().await?;
3959        self.execute(&format!("SAVEPOINT {}", name), &[]).await?;
3960        Ok(())
3961    }
3962
3963    /// Rollback to a previously created savepoint
3964    ///
3965    /// This undoes all changes made after the savepoint was created, but keeps
3966    /// the transaction active. Changes made before the savepoint are preserved.
3967    ///
3968    /// # Arguments
3969    /// * `name` - The savepoint name to rollback to
3970    pub async fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3971        self.ensure_ready().await?;
3972        self.execute(&format!("ROLLBACK TO SAVEPOINT {}", name), &[]).await?;
3973        Ok(())
3974    }
3975
3976    /// Ping the server to check if the connection is still alive.
3977    ///
3978    /// This executes a lightweight query (`SELECT 1 FROM DUAL`) to verify
3979    /// the connection is responsive. Useful for connection health checks
3980    /// in pooling scenarios.
3981    ///
3982    /// # Example
3983    ///
3984    /// ```rust,no_run
3985    /// # use oracle_rs::Connection;
3986    /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
3987    /// if conn.ping().await.is_ok() {
3988    ///     println!("Connection is alive");
3989    /// } else {
3990    ///     println!("Connection is dead");
3991    /// }
3992    /// # Ok(())
3993    /// # }
3994    /// ```
3995    pub async fn ping(&self) -> Result<()> {
3996        self.ensure_ready().await?;
3997        // Use SELECT FROM DUAL instead of simple function
3998        // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
3999        self.query("SELECT 1 FROM DUAL", &[]).await?;
4000        Ok(())
4001    }
4002
4003    /// Clear the statement cache
4004    ///
4005    /// This should be called when recycling a connection in a pool to ensure
4006    /// that any stale cursor state is cleared. This is useful after errors
4007    /// or when the connection state may be inconsistent.
4008    pub async fn clear_statement_cache(&self) {
4009        let mut inner = self.inner.lock().await;
4010        if let Some(ref mut cache) = inner.statement_cache {
4011            cache.clear();
4012        }
4013    }
4014
4015    /// Read data from a LOB (CLOB or BLOB)
4016    ///
4017    /// # Arguments
4018    /// * `locator` - The LOB locator obtained from a query result
4019    /// * `offset` - Starting position (1-based, in characters for CLOB, bytes for BLOB)
4020    /// * `amount` - Amount to read (0 for entire LOB)
4021    ///
4022    /// # Returns
4023    /// For CLOB: returns the text content as a String
4024    /// For BLOB: returns the binary content as bytes
4025    pub async fn read_lob(&self, locator: &LobLocator) -> Result<LobData> {
4026        self.ensure_ready().await?;
4027
4028        // Read the entire LOB starting at offset 1
4029        let offset = 1u64;
4030        let amount = locator.size();
4031
4032        self.read_lob_internal(locator, offset, amount).await
4033    }
4034
4035    /// Read a portion of a LOB
4036    pub async fn read_lob_range(
4037        &self,
4038        locator: &LobLocator,
4039        offset: u64,
4040        amount: u64,
4041    ) -> Result<LobData> {
4042        self.ensure_ready().await?;
4043        self.read_lob_internal(locator, offset, amount).await
4044    }
4045
4046    /// Read a CLOB and return as String
4047    ///
4048    /// This is a convenience method for reading CLOB data directly as a String.
4049    /// Returns an error if the LOB is not a CLOB.
4050    pub async fn read_clob(&self, locator: &LobLocator) -> Result<String> {
4051        if locator.is_blob() || locator.is_bfile() {
4052            return Err(Error::Protocol(
4053                "Cannot read BLOB/BFILE as string, use read_blob instead".to_string(),
4054            ));
4055        }
4056
4057        let data = self.read_lob(locator).await?;
4058        match data {
4059            LobData::String(s) => Ok(s),
4060            LobData::Bytes(_) => Err(Error::Protocol(
4061                "Unexpected bytes from CLOB read".to_string(),
4062            )),
4063        }
4064    }
4065
4066    /// Read a BLOB and return as bytes
4067    ///
4068    /// This is a convenience method for reading BLOB data directly as bytes.
4069    /// Returns an error if the LOB is a CLOB (use read_clob instead).
4070    pub async fn read_blob(&self, locator: &LobLocator) -> Result<bytes::Bytes> {
4071        if locator.is_clob() {
4072            return Err(Error::Protocol(
4073                "Cannot read CLOB as bytes, use read_clob instead".to_string(),
4074            ));
4075        }
4076
4077        let data = self.read_lob(locator).await?;
4078        match data {
4079            LobData::Bytes(b) => Ok(b),
4080            LobData::String(_) => Err(Error::Protocol(
4081                "Unexpected string from BLOB read".to_string(),
4082            )),
4083        }
4084    }
4085
4086    /// Read a LOB in chunks, calling a callback for each chunk
4087    ///
4088    /// This is useful for processing large LOBs without loading the entire
4089    /// content into memory. The callback receives each chunk as it's read.
4090    ///
4091    /// # Arguments
4092    /// * `locator` - The LOB locator
4093    /// * `chunk_size` - Size of each chunk to read (0 uses the LOB's natural chunk size)
4094    /// * `callback` - Async function called for each chunk
4095    ///
4096    /// # Example
4097    /// ```ignore
4098    /// let mut total_size = 0;
4099    /// conn.read_lob_chunked(&locator, 8192, |chunk| async move {
4100    ///     match chunk {
4101    ///         LobData::Bytes(b) => total_size += b.len(),
4102    ///         LobData::String(s) => total_size += s.len(),
4103    ///     }
4104    ///     Ok(())
4105    /// }).await?;
4106    /// ```
4107    pub async fn read_lob_chunked<F, Fut>(
4108        &self,
4109        locator: &LobLocator,
4110        chunk_size: u64,
4111        mut callback: F,
4112    ) -> Result<()>
4113    where
4114        F: FnMut(LobData) -> Fut,
4115        Fut: std::future::Future<Output = Result<()>>,
4116    {
4117        self.ensure_ready().await?;
4118
4119        let total_size = locator.size();
4120        if total_size == 0 {
4121            return Ok(());
4122        }
4123
4124        // Use LOB's natural chunk size if not specified
4125        let chunk_size = if chunk_size == 0 {
4126            self.lob_chunk_size(locator).await?.max(8192) as u64
4127        } else {
4128            chunk_size
4129        };
4130
4131        let mut offset = 1u64;
4132        while offset <= total_size {
4133            let remaining = total_size - offset + 1;
4134            let amount = std::cmp::min(remaining, chunk_size);
4135
4136            let chunk = self.read_lob_internal(locator, offset, amount).await?;
4137            callback(chunk).await?;
4138
4139            offset += amount;
4140        }
4141
4142        Ok(())
4143    }
4144
4145    /// Get the optimal chunk size for a LOB
4146    ///
4147    /// This returns the chunk size that Oracle recommends for efficient
4148    /// reading and writing to this LOB.
4149    pub async fn lob_chunk_size(&self, locator: &LobLocator) -> Result<u32> {
4150        self.ensure_ready().await?;
4151
4152        let mut inner = self.inner.lock().await;
4153        let large_sdu = inner.large_sdu;
4154
4155        // Create LOB operation message for get chunk size
4156        let mut lob_msg = LobOpMessage::new_get_chunk_size(locator);
4157        let seq_num = inner.next_sequence_number();
4158        lob_msg.set_sequence_number(seq_num);
4159
4160        // Build and send the request
4161        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4162        inner.send(&request).await?;
4163
4164        // Receive and parse response
4165        let response = inner.receive().await?;
4166        if response.len() <= PACKET_HEADER_SIZE {
4167            return Err(Error::Protocol("Empty LOB chunk size response".to_string()));
4168        }
4169
4170        // Check for MARKER packet
4171        let packet_type = response[4];
4172        if packet_type == PacketType::Marker as u8 {
4173            let error_response = inner.handle_marker_reset().await?;
4174            let payload = &error_response[PACKET_HEADER_SIZE..];
4175            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4176            buf.skip(2)?;
4177            return self.parse_lob_error(&mut buf);
4178        }
4179
4180        // Parse the amount response (chunk size is returned as amount)
4181        self.parse_lob_amount_response(&response[PACKET_HEADER_SIZE..], locator)
4182            .map(|v| v as u32)
4183    }
4184
4185    /// Internal LOB read implementation
4186    async fn read_lob_internal(
4187        &self,
4188        locator: &LobLocator,
4189        offset: u64,
4190        amount: u64,
4191    ) -> Result<LobData> {
4192        let mut inner = self.inner.lock().await;
4193        let large_sdu = inner.large_sdu;
4194
4195        // Create LOB operation message for read
4196        let mut lob_msg = LobOpMessage::new_read(locator, offset, amount);
4197        let seq_num = inner.next_sequence_number();
4198        lob_msg.set_sequence_number(seq_num);
4199
4200        // Build and send the request
4201        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4202        inner.send(&request).await?;
4203
4204        // Receive and parse response (may span multiple packets for large LOBs)
4205        let response = inner.receive_response().await?;
4206        if response.len() <= PACKET_HEADER_SIZE {
4207            return Err(Error::Protocol("Empty LOB read response".to_string()));
4208        }
4209
4210        // Check for MARKER packet (indicates error)
4211        let packet_type = response[4];
4212        if packet_type == PacketType::Marker as u8 {
4213            let error_response = inner.handle_marker_reset().await?;
4214            let payload = &error_response[PACKET_HEADER_SIZE..];
4215            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4216            // Skip data flags
4217            buf.skip(2)?;
4218            return self.parse_lob_error(&mut buf);
4219        }
4220
4221        // Parse LOB data response
4222        let payload = &response[PACKET_HEADER_SIZE..];
4223        self.parse_lob_read_response(payload, locator)
4224    }
4225
4226    /// Parse LOB read response
4227    fn parse_lob_read_response(&self, payload: &[u8], locator: &LobLocator) -> Result<LobData> {
4228        use crate::buffer::ReadBuffer;
4229
4230        let mut buf = ReadBuffer::from_slice(payload);
4231
4232        // Skip data flags
4233        buf.skip(2)?;
4234
4235        let mut lob_data: Option<Vec<u8>> = None;
4236
4237        // Process messages until end of response
4238        while buf.remaining() > 0 {
4239            let msg_type = buf.read_u8()?;
4240
4241            match msg_type {
4242                // LobData message (14)
4243                x if x == MessageType::LobData as u8 => {
4244                    // Read LOB data with length
4245                    let data = buf.read_raw_bytes_chunked()?;
4246                    lob_data = Some(data);
4247                }
4248
4249                // Parameter return (8) - contains updated locator and amount
4250                x if x == MessageType::Parameter as u8 => {
4251                    // Skip the updated locator (same length as original)
4252                    let locator_len = locator.locator_bytes().len();
4253                    buf.skip(locator_len)?;
4254
4255                    // Read back the amount (ub8)
4256                    let _returned_amount = buf.read_ub8()?;
4257                }
4258
4259                // Error/Status message (4) - code 0 means success
4260                x if x == MessageType::Error as u8 => {
4261                    // Parse error info - code 0 means success
4262                    if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4263                        if code != 0 {
4264                            let message = msg.unwrap_or_else(|| "LOB error".to_string());
4265                            return Err(Error::OracleError { code, message });
4266                        }
4267                        // code 0 = success, continue processing
4268                    }
4269                }
4270
4271                // End of response (29)
4272                x if x == MessageType::EndOfResponse as u8 => {
4273                    break;
4274                }
4275
4276                // Skip other message types
4277                _ => {
4278                    // Try to skip unknown messages
4279                    continue;
4280                }
4281            }
4282        }
4283
4284        // Convert to appropriate type based on LOB type
4285        match lob_data {
4286            Some(data) => {
4287                if locator.is_blob() || locator.is_bfile() {
4288                    Ok(LobData::Bytes(bytes::Bytes::from(data)))
4289                } else {
4290                    // CLOB - decode based on encoding
4291                    let text = if locator.uses_var_length_charset() {
4292                        // UTF-16 BE encoding
4293                        let chars: Vec<u16> = data
4294                            .chunks_exact(2)
4295                            .map(|c| u16::from_be_bytes([c[0], c[1]]))
4296                            .collect();
4297                        String::from_utf16_lossy(&chars)
4298                    } else {
4299                        // UTF-8 encoding
4300                        String::from_utf8_lossy(&data).to_string()
4301                    };
4302                    Ok(LobData::String(text))
4303                }
4304            }
4305            None => {
4306                // Empty LOB
4307                if locator.is_blob() || locator.is_bfile() {
4308                    Ok(LobData::Bytes(bytes::Bytes::new()))
4309                } else {
4310                    Ok(LobData::String(String::new()))
4311                }
4312            }
4313        }
4314    }
4315
4316    /// Write data to a LOB
4317    ///
4318    /// # Arguments
4319    /// * `locator` - The LOB locator obtained from a query result
4320    /// * `offset` - Starting position (1-based, in characters for CLOB, bytes for BLOB)
4321    /// * `data` - Data to write (bytes for BLOB, UTF-8 encoded bytes for CLOB)
4322    pub async fn write_lob(&self, locator: &LobLocator, offset: u64, data: &[u8]) -> Result<()> {
4323        self.ensure_ready().await?;
4324
4325        let mut inner = self.inner.lock().await;
4326        let large_sdu = inner.large_sdu;
4327        let sdu_size = inner.sdu_size as usize;
4328
4329        // Encode data for CLOB if necessary
4330        let encoded_data: Vec<u8>;
4331        let write_data = if locator.is_clob() && locator.uses_var_length_charset() {
4332            // Convert UTF-8 to UTF-16 BE for CLOB with var length charset
4333            let text = String::from_utf8_lossy(data);
4334            encoded_data = text
4335                .encode_utf16()
4336                .flat_map(|c| c.to_be_bytes())
4337                .collect();
4338            &encoded_data[..]
4339        } else {
4340            data
4341        };
4342
4343        // Create LOB operation message for write
4344        let mut lob_msg = LobOpMessage::new_write(locator, offset, write_data);
4345        let seq_num = inner.next_sequence_number();
4346        lob_msg.set_sequence_number(seq_num);
4347
4348        // Build message content (without packet header or data flags)
4349        let message = lob_msg.build_message_only(&inner.capabilities)?;
4350
4351        // Calculate if this fits in a single packet
4352        // Single packet max payload = SDU - header (8) - data flags (2)
4353        let max_single_packet_payload = sdu_size.saturating_sub(PACKET_HEADER_SIZE + 2);
4354
4355        let is_multi_packet = message.len() > max_single_packet_payload;
4356
4357        if is_multi_packet {
4358            // Needs multiple packets - use multi-packet sender
4359            inner.send_multi_packet(&message, 0).await?;
4360        } else {
4361            // Fits in one packet - use standard send
4362            let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4363            inner.send(&request).await?;
4364        }
4365
4366        // Receive and parse response
4367        // Use receive_response() to accumulate all packets until END_OF_RESPONSE.
4368        // This is necessary because Oracle may send multiple packets for the response,
4369        // and if we only read one packet, leftover data causes close() to hang.
4370        let response = inner.receive_response().await?;
4371        if response.len() <= PACKET_HEADER_SIZE {
4372            return Err(Error::Protocol("Empty LOB write response".to_string()));
4373        }
4374
4375        // Check for MARKER packet (indicates error)
4376        let packet_type = response[4];
4377        if packet_type == PacketType::Marker as u8 {
4378            let error_response = inner.handle_marker_reset().await?;
4379            let payload = &error_response[PACKET_HEADER_SIZE..];
4380            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4381            buf.skip(2)?;
4382            return self.parse_lob_error(&mut buf);
4383        }
4384
4385        // Parse response to check for errors
4386        self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4387    }
4388
4389    /// Write string data to a CLOB
4390    pub async fn write_clob(&self, locator: &LobLocator, offset: u64, text: &str) -> Result<()> {
4391        if locator.is_blob() || locator.is_bfile() {
4392            return Err(Error::Protocol(
4393                "Cannot write string to BLOB/BFILE, use write_blob instead".to_string(),
4394            ));
4395        }
4396        self.write_lob(locator, offset, text.as_bytes()).await
4397    }
4398
4399    /// Write binary data to a BLOB
4400    pub async fn write_blob(&self, locator: &LobLocator, offset: u64, data: &[u8]) -> Result<()> {
4401        if locator.is_clob() {
4402            return Err(Error::Protocol(
4403                "Cannot write bytes to CLOB, use write_clob instead".to_string(),
4404            ));
4405        }
4406        self.write_lob(locator, offset, data).await
4407    }
4408
4409    /// Get the length of a LOB
4410    ///
4411    /// For CLOB: returns length in characters
4412    /// For BLOB: returns length in bytes
4413    pub async fn lob_length(&self, locator: &LobLocator) -> Result<u64> {
4414        self.ensure_ready().await?;
4415
4416        let mut inner = self.inner.lock().await;
4417        let large_sdu = inner.large_sdu;
4418
4419        // Create LOB operation message for get length
4420        let mut lob_msg = LobOpMessage::new_get_length(locator);
4421        let seq_num = inner.next_sequence_number();
4422        lob_msg.set_sequence_number(seq_num);
4423
4424        // Build and send the request
4425        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4426        inner.send(&request).await?;
4427
4428        // Receive and parse response
4429        let response = inner.receive().await?;
4430        if response.len() <= PACKET_HEADER_SIZE {
4431            return Err(Error::Protocol("Empty LOB get_length response".to_string()));
4432        }
4433
4434        // Check for MARKER packet (indicates error)
4435        let packet_type = response[4];
4436        if packet_type == PacketType::Marker as u8 {
4437            let error_response = inner.handle_marker_reset().await?;
4438            let payload = &error_response[PACKET_HEADER_SIZE..];
4439            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4440            buf.skip(2)?;
4441            return self.parse_lob_error(&mut buf);
4442        }
4443
4444        // Parse response to get the length
4445        self.parse_lob_amount_response(&response[PACKET_HEADER_SIZE..], locator)
4446    }
4447
4448    /// Trim a LOB to a specified length
4449    ///
4450    /// # Arguments
4451    /// * `locator` - The LOB locator
4452    /// * `new_size` - The new size (in characters for CLOB, bytes for BLOB)
4453    pub async fn lob_trim(&self, locator: &LobLocator, new_size: u64) -> Result<()> {
4454        self.ensure_ready().await?;
4455
4456        let mut inner = self.inner.lock().await;
4457        let large_sdu = inner.large_sdu;
4458
4459        // Create LOB operation message for trim
4460        let mut lob_msg = LobOpMessage::new_trim(locator, new_size);
4461        let seq_num = inner.next_sequence_number();
4462        lob_msg.set_sequence_number(seq_num);
4463
4464        // Build and send the request
4465        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4466        inner.send(&request).await?;
4467
4468        // Receive and parse response
4469        let response = inner.receive().await?;
4470        if response.len() <= PACKET_HEADER_SIZE {
4471            return Err(Error::Protocol("Empty LOB trim response".to_string()));
4472        }
4473
4474        // Check for MARKER packet (indicates error)
4475        let packet_type = response[4];
4476        if packet_type == PacketType::Marker as u8 {
4477            let error_response = inner.handle_marker_reset().await?;
4478            let payload = &error_response[PACKET_HEADER_SIZE..];
4479            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4480            buf.skip(2)?;
4481            return self.parse_lob_error(&mut buf);
4482        }
4483
4484        // Parse response to check for errors
4485        self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4486    }
4487
4488    /// Create a temporary LOB on the server
4489    ///
4490    /// Creates a temporary LOB of the specified type that lives until the connection
4491    /// is closed or the LOB is explicitly freed.
4492    ///
4493    /// # Arguments
4494    /// * `oracle_type` - The LOB type to create (Clob or Blob)
4495    ///
4496    /// # Returns
4497    /// A `LobLocator` for the newly created temporary LOB
4498    ///
4499    /// # Example
4500    /// ```ignore
4501    /// use oracle_rs::OracleType;
4502    ///
4503    /// let locator = conn.create_temp_lob(OracleType::Clob).await?;
4504    /// conn.write_clob(&locator, 1, "Hello, World!").await?;
4505    /// // Now bind the locator to insert into a CLOB column
4506    /// ```
4507    pub async fn create_temp_lob(&self, oracle_type: OracleType) -> Result<LobLocator> {
4508        use crate::buffer::ReadBuffer;
4509
4510        // Validate oracle_type is a LOB type
4511        match oracle_type {
4512            OracleType::Clob | OracleType::Blob => {}
4513            _ => {
4514                return Err(Error::Protocol(format!(
4515                    "create_temp_lob: invalid type {:?}, must be Clob or Blob",
4516                    oracle_type
4517                )));
4518            }
4519        }
4520
4521        self.ensure_ready().await?;
4522
4523        let mut inner = self.inner.lock().await;
4524        let large_sdu = inner.large_sdu;
4525
4526        // Create the CREATE_TEMP message
4527        let mut lob_msg = LobOpMessage::new_create_temp(oracle_type);
4528        let seq_num = inner.next_sequence_number();
4529        lob_msg.set_sequence_number(seq_num);
4530
4531        // Build and send the request
4532        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4533        inner.send(&request).await?;
4534
4535        // Receive and parse response
4536        let response = inner.receive().await?;
4537        if response.len() <= PACKET_HEADER_SIZE {
4538            return Err(Error::Protocol("Empty CREATE_TEMP LOB response".to_string()));
4539        }
4540
4541        // Check for MARKER packet (indicates error)
4542        let packet_type = response[4];
4543        if packet_type == PacketType::Marker as u8 {
4544            let error_response = inner.handle_marker_reset().await?;
4545            let payload = &error_response[PACKET_HEADER_SIZE..];
4546            let mut buf = ReadBuffer::from_slice(payload);
4547            buf.skip(2)?;
4548            return self.parse_lob_error(&mut buf);
4549        }
4550
4551        // Parse response to extract the locator
4552        let payload = &response[PACKET_HEADER_SIZE..];
4553        let mut buf = ReadBuffer::from_slice(payload);
4554        buf.skip(2)?; // Skip data flags
4555
4556        let mut locator_bytes: Option<Vec<u8>> = None;
4557
4558        while buf.remaining() > 0 {
4559            let msg_type = buf.read_u8()?;
4560
4561            match msg_type {
4562                // Parameter return (8) - contains the populated locator
4563                x if x == MessageType::Parameter as u8 => {
4564                    // Read the 40-byte locator
4565                    let loc_data = buf.read_bytes_vec(40)?;
4566                    locator_bytes = Some(loc_data);
4567                    // Skip charset (ub2) and trailing flags (ub1)
4568                    buf.skip(2)?; // charset
4569                    buf.skip(1)?; // flags
4570                }
4571
4572                // Error/Status message (4) - code 0 means success
4573                x if x == MessageType::Error as u8 => {
4574                    if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4575                        if code != 0 {
4576                            let message = msg.unwrap_or_else(|| "CREATE_TEMP LOB error".to_string());
4577                            return Err(Error::OracleError { code, message });
4578                        }
4579                    }
4580                }
4581
4582                // End of response (29)
4583                x if x == MessageType::EndOfResponse as u8 => {
4584                    break;
4585                }
4586
4587                _ => continue,
4588            }
4589        }
4590
4591        // Create the LobLocator from the returned bytes
4592        let loc_bytes = locator_bytes.ok_or_else(|| {
4593            Error::Protocol("CREATE_TEMP LOB response did not contain locator".to_string())
4594        })?;
4595
4596        // Create LobLocator with size 0, chunk_size 0 (will be fetched if needed)
4597        let locator = LobLocator::new(
4598            bytes::Bytes::from(loc_bytes),
4599            0,      // size - unknown for new temp LOB
4600            0,      // chunk_size - unknown, can be fetched later
4601            oracle_type,
4602            1,      // csfrm - 1 for CLOB, 0 for BLOB (but we store it on the locator type)
4603        );
4604
4605        Ok(locator)
4606    }
4607
4608    // ==================== BFILE Operations ====================
4609
4610    /// Check if a BFILE exists on the server
4611    ///
4612    /// Returns true if the file referenced by the BFILE locator exists on the server.
4613    pub async fn bfile_exists(&self, locator: &LobLocator) -> Result<bool> {
4614        self.ensure_ready().await?;
4615
4616        if !locator.is_bfile() {
4617            return Err(Error::Protocol("bfile_exists called on non-BFILE locator".to_string()));
4618        }
4619
4620        let mut inner = self.inner.lock().await;
4621        let large_sdu = inner.large_sdu;
4622
4623        let mut lob_msg = LobOpMessage::new_file_exists(locator);
4624        let seq_num = inner.next_sequence_number();
4625        lob_msg.set_sequence_number(seq_num);
4626
4627        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4628        inner.send(&request).await?;
4629
4630        let response = inner.receive().await?;
4631        if response.len() <= PACKET_HEADER_SIZE {
4632            return Err(Error::Protocol("Empty BFILE exists response".to_string()));
4633        }
4634
4635        let packet_type = response[4];
4636        if packet_type == PacketType::Marker as u8 {
4637            let error_response = inner.handle_marker_reset().await?;
4638            let payload = &error_response[PACKET_HEADER_SIZE..];
4639            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4640            buf.skip(2)?;
4641            return self.parse_lob_error(&mut buf);
4642        }
4643
4644        self.parse_lob_bool_response(&response[PACKET_HEADER_SIZE..], locator)
4645    }
4646
4647    /// Open a BFILE for reading
4648    ///
4649    /// The BFILE must be opened before reading. After reading, close it with bfile_close.
4650    pub async fn bfile_open(&self, locator: &LobLocator) -> Result<()> {
4651        self.ensure_ready().await?;
4652
4653        if !locator.is_bfile() {
4654            return Err(Error::Protocol("bfile_open called on non-BFILE locator".to_string()));
4655        }
4656
4657        let mut inner = self.inner.lock().await;
4658        let large_sdu = inner.large_sdu;
4659
4660        let mut lob_msg = LobOpMessage::new_file_open(locator);
4661        let seq_num = inner.next_sequence_number();
4662        lob_msg.set_sequence_number(seq_num);
4663
4664        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4665        inner.send(&request).await?;
4666
4667        let response = inner.receive().await?;
4668        if response.len() <= PACKET_HEADER_SIZE {
4669            return Err(Error::Protocol("Empty BFILE open response".to_string()));
4670        }
4671
4672        let packet_type = response[4];
4673        if packet_type == PacketType::Marker as u8 {
4674            let error_response = inner.handle_marker_reset().await?;
4675            let payload = &error_response[PACKET_HEADER_SIZE..];
4676            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4677            buf.skip(2)?;
4678            return self.parse_lob_error(&mut buf);
4679        }
4680
4681        self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4682    }
4683
4684    /// Close a BFILE after reading
4685    pub async fn bfile_close(&self, locator: &LobLocator) -> Result<()> {
4686        self.ensure_ready().await?;
4687
4688        if !locator.is_bfile() {
4689            return Err(Error::Protocol("bfile_close called on non-BFILE locator".to_string()));
4690        }
4691
4692        let mut inner = self.inner.lock().await;
4693        let large_sdu = inner.large_sdu;
4694
4695        let mut lob_msg = LobOpMessage::new_file_close(locator);
4696        let seq_num = inner.next_sequence_number();
4697        lob_msg.set_sequence_number(seq_num);
4698
4699        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4700        inner.send(&request).await?;
4701
4702        let response = inner.receive().await?;
4703        if response.len() <= PACKET_HEADER_SIZE {
4704            return Err(Error::Protocol("Empty BFILE close response".to_string()));
4705        }
4706
4707        let packet_type = response[4];
4708        if packet_type == PacketType::Marker as u8 {
4709            let error_response = inner.handle_marker_reset().await?;
4710            let payload = &error_response[PACKET_HEADER_SIZE..];
4711            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4712            buf.skip(2)?;
4713            return self.parse_lob_error(&mut buf);
4714        }
4715
4716        self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4717    }
4718
4719    /// Check if a BFILE is currently open
4720    pub async fn bfile_is_open(&self, locator: &LobLocator) -> Result<bool> {
4721        self.ensure_ready().await?;
4722
4723        if !locator.is_bfile() {
4724            return Err(Error::Protocol("bfile_is_open called on non-BFILE locator".to_string()));
4725        }
4726
4727        let mut inner = self.inner.lock().await;
4728        let large_sdu = inner.large_sdu;
4729
4730        let mut lob_msg = LobOpMessage::new_file_is_open(locator);
4731        let seq_num = inner.next_sequence_number();
4732        lob_msg.set_sequence_number(seq_num);
4733
4734        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4735        inner.send(&request).await?;
4736
4737        let response = inner.receive().await?;
4738        if response.len() <= PACKET_HEADER_SIZE {
4739            return Err(Error::Protocol("Empty BFILE is_open response".to_string()));
4740        }
4741
4742        let packet_type = response[4];
4743        if packet_type == PacketType::Marker as u8 {
4744            let error_response = inner.handle_marker_reset().await?;
4745            let payload = &error_response[PACKET_HEADER_SIZE..];
4746            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4747            buf.skip(2)?;
4748            return self.parse_lob_error(&mut buf);
4749        }
4750
4751        self.parse_lob_bool_response(&response[PACKET_HEADER_SIZE..], locator)
4752    }
4753
4754    /// Read BFILE data
4755    ///
4756    /// This is a convenience method that opens the BFILE if needed, reads all content,
4757    /// and returns it as bytes. For large BFILEs, consider using read_lob_chunked.
4758    pub async fn read_bfile(&self, locator: &LobLocator) -> Result<bytes::Bytes> {
4759        if !locator.is_bfile() {
4760            return Err(Error::Protocol("read_bfile called on non-BFILE locator".to_string()));
4761        }
4762
4763        // Check if file is open, open if needed
4764        let should_close = if !self.bfile_is_open(locator).await? {
4765            self.bfile_open(locator).await?;
4766            true
4767        } else {
4768            false
4769        };
4770
4771        // Read all data
4772        let result = self.read_blob(locator).await;
4773
4774        // Close if we opened it
4775        if should_close {
4776            let _ = self.bfile_close(locator).await;
4777        }
4778
4779        result
4780    }
4781
4782    /// Parse a LOB operation response that returns a boolean (file_exists, is_open)
4783    fn parse_lob_bool_response(&self, payload: &[u8], locator: &LobLocator) -> Result<bool> {
4784        use crate::buffer::ReadBuffer;
4785
4786        let mut buf = ReadBuffer::from_slice(payload);
4787        buf.skip(2)?; // Skip data flags
4788
4789        let mut bool_result: bool = false;
4790
4791        while buf.remaining() > 0 {
4792            let msg_type = buf.read_u8()?;
4793
4794            match msg_type {
4795                // Parameter return (8) - contains updated locator and bool flag
4796                x if x == MessageType::Parameter as u8 => {
4797                    let locator_len = locator.locator_bytes().len();
4798                    buf.skip(locator_len)?;
4799                    // Boolean flag is a single byte, > 0 means true
4800                    let flag = buf.read_u8()?;
4801                    bool_result = flag > 0;
4802                }
4803
4804                // Error/Status message (4) - code 0 means success
4805                x if x == MessageType::Error as u8 => {
4806                    if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4807                        if code != 0 {
4808                            let message = msg.unwrap_or_else(|| "LOB error".to_string());
4809                            return Err(Error::OracleError { code, message });
4810                        }
4811                    }
4812                }
4813
4814                // End of response (29)
4815                x if x == MessageType::EndOfResponse as u8 => {
4816                    break;
4817                }
4818
4819                _ => continue,
4820            }
4821        }
4822
4823        Ok(bool_result)
4824    }
4825
4826    /// Parse a simple LOB operation response (write, trim)
4827    fn parse_lob_simple_response(&self, payload: &[u8], locator: &LobLocator) -> Result<()> {
4828        use crate::buffer::ReadBuffer;
4829
4830        let mut buf = ReadBuffer::from_slice(payload);
4831        buf.skip(2)?; // Skip data flags
4832
4833        while buf.remaining() > 0 {
4834            let msg_type = buf.read_u8()?;
4835
4836            match msg_type {
4837                // Parameter return (8) - contains updated locator and possibly amount
4838                x if x == MessageType::Parameter as u8 => {
4839                    let locator_len = locator.locator_bytes().len();
4840                    buf.skip(locator_len)?;
4841                    // After locator, there may be amount (ub8) if send_amount was true
4842                    // We just skip any remaining bytes until we hit Error or EndOfResponse
4843                }
4844
4845                // Error/Status message (4) - code 0 means success
4846                x if x == MessageType::Error as u8 => {
4847                    if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4848                        if code != 0 {
4849                            let message = msg.unwrap_or_else(|| "LOB error".to_string());
4850                            return Err(Error::OracleError { code, message });
4851                        }
4852                    }
4853                }
4854
4855                // End of response (29)
4856                x if x == MessageType::EndOfResponse as u8 => {
4857                    break;
4858                }
4859
4860                _ => continue,
4861            }
4862        }
4863
4864        Ok(())
4865    }
4866
4867    /// Parse a LOB operation response that returns an amount (get_length, get_chunk_size)
4868    fn parse_lob_amount_response(&self, payload: &[u8], locator: &LobLocator) -> Result<u64> {
4869        use crate::buffer::ReadBuffer;
4870
4871        let mut buf = ReadBuffer::from_slice(payload);
4872        buf.skip(2)?; // Skip data flags
4873
4874        let mut returned_amount: u64 = 0;
4875
4876        while buf.remaining() > 0 {
4877            let msg_type = buf.read_u8()?;
4878
4879            match msg_type {
4880                // Parameter return (8) - contains updated locator and amount
4881                x if x == MessageType::Parameter as u8 => {
4882                    let locator_len = locator.locator_bytes().len();
4883                    buf.skip(locator_len)?;
4884                    returned_amount = buf.read_ub8()?;
4885                }
4886
4887                // Error/Status message (4) - code 0 means success
4888                x if x == MessageType::Error as u8 => {
4889                    if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4890                        if code != 0 {
4891                            let message = msg.unwrap_or_else(|| "LOB error".to_string());
4892                            return Err(Error::OracleError { code, message });
4893                        }
4894                    }
4895                }
4896
4897                // End of response (29)
4898                x if x == MessageType::EndOfResponse as u8 => {
4899                    break;
4900                }
4901
4902                _ => continue,
4903            }
4904        }
4905
4906        Ok(returned_amount)
4907    }
4908
4909    /// Parse LOB error response
4910    fn parse_lob_error<T>(&self, buf: &mut crate::buffer::ReadBuffer) -> Result<T> {
4911        // Try to extract error info
4912        if let Ok((code, msg, _)) = self.parse_error_info(buf) {
4913            let message = msg.unwrap_or_else(|| "Unknown LOB error".to_string());
4914            Err(Error::OracleError { code, message })
4915        } else {
4916            Err(Error::Protocol("LOB operation failed".to_string()))
4917        }
4918    }
4919
4920    /// Close the connection.
4921    ///
4922    /// Sends a logoff message to the server and closes the underlying TCP
4923    /// connection. After calling close, the connection cannot be reused.
4924    ///
4925    /// If the connection is already closed, this method returns `Ok(())`
4926    /// without doing anything.
4927    ///
4928    /// # Note
4929    ///
4930    /// Any uncommitted transaction is rolled back by the server when the
4931    /// connection is closed.
4932    ///
4933    /// # Example
4934    ///
4935    /// ```rust,no_run
4936    /// # use oracle_rs::{Config, Connection};
4937    /// # async fn example() -> oracle_rs::Result<()> {
4938    /// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password");
4939    /// let conn = Connection::connect_with_config(config).await?;
4940    ///
4941    /// // Do work...
4942    /// conn.commit().await?;
4943    ///
4944    /// // Explicitly close when done
4945    /// conn.close().await?;
4946    /// # Ok(())
4947    /// # }
4948    /// ```
4949    pub async fn close(&self) -> Result<()> {
4950        if self.closed.swap(true, Ordering::Relaxed) {
4951            // Already closed
4952            return Ok(());
4953        }
4954
4955        let mut inner = self.inner.lock().await;
4956
4957        if inner.state == ConnectionState::Ready {
4958            // Send logoff
4959            let _ = self.send_simple_function_inner(&mut inner, FunctionCode::Logoff).await;
4960        }
4961
4962        inner.state = ConnectionState::Closed;
4963
4964        // Close the TCP stream
4965        if let Some(stream) = inner.stream.take() {
4966            drop(stream);
4967        }
4968
4969        Ok(())
4970    }
4971
4972    /// Send a marker packet to the server
4973    /// marker_type: 1=BREAK, 2=RESET, 3=INTERRUPT
4974    async fn send_marker(&self, inner: &mut ConnectionInner, marker_type: u8) -> Result<()> {
4975        let mut packet_buf = WriteBuffer::new();
4976
4977        // Build MARKER packet header
4978        let packet_len = PACKET_HEADER_SIZE + 3; // Header + 3 bytes payload
4979        packet_buf.write_u16_be(packet_len as u16)?;
4980        packet_buf.write_u16_be(0)?; // Checksum
4981        packet_buf.write_u8(PacketType::Marker as u8)?;
4982        packet_buf.write_u8(0)?; // Flags
4983        packet_buf.write_u16_be(0)?; // Header checksum
4984
4985        // Marker payload: [1, 0, marker_type] per Python's _send_marker
4986        packet_buf.write_u8(1)?;
4987        packet_buf.write_u8(0)?;
4988        packet_buf.write_u8(marker_type)?;
4989
4990        inner.send(&packet_buf.freeze()).await
4991    }
4992
    /// Send a parameterless function call (e.g. Logoff, Commit, Rollback,
    /// Ping) on an already-locked connection and interpret the reply.
    ///
    /// The request is a DATA packet whose payload is: data flags (u16, 0),
    /// the Function message type, the function code, the next sequence
    /// number, and — when `ttc_field_version >= 18` — a ub8 token of 0.
    ///
    /// Reply handling:
    ///
    /// * A reply of 4 bytes or fewer is an `Error::Protocol`.
    /// * MARKER packet with marker type 1 (BREAK): for Logoff the state is
    ///   set to `Closed` and `Ok(())` is returned immediately. For any other
    ///   function a RESET marker is sent and packets are read until a RESET
    ///   marker (or something unexpected) arrives; further markers are then
    ///   skipped and the following DATA packet is checked for an error
    ///   message with a non-zero code.
    /// * Any other MARKER packet: `Ok(())`.
    /// * DATA packet: `Error::OracleError` if it starts with an error
    ///   message with a non-zero code, otherwise `Ok(())`.
    /// * Any other packet type: `Error::Protocol`.
    ///
    /// Send/receive failures during the first part of the reset handshake
    /// set the state to `Closed` and are returned; a receive failure while
    /// waiting for the post-reset reply sets the state to `Closed` and is
    /// reported as success.
    async fn send_simple_function_inner(
        &self,
        inner: &mut ConnectionInner,
        function_code: FunctionCode,
    ) -> Result<()> {
        // Build function message
        let mut buf = WriteBuffer::new();

        // Get next sequence number (must be done before sending)
        let seq_num = inner.next_sequence_number();

        // Data flags
        buf.write_u16_be(0)?;
        // Message type: Function
        buf.write_u8(MessageType::Function as u8)?;
        // Function code
        buf.write_u8(function_code as u8)?;
        // Sequence number (tracked per connection)
        buf.write_u8(seq_num)?;

        // Token number (required for TTC field version >= 18, i.e. Oracle 23ai)
        if inner.capabilities.ttc_field_version >= 18 {
            buf.write_ub8(0)?;
        }

        // Build DATA packet
        let data_payload = buf.freeze();
        let mut packet_buf = WriteBuffer::new();
        let packet_len = PACKET_HEADER_SIZE + data_payload.len();
        packet_buf.write_u16_be(packet_len as u16)?;
        packet_buf.write_u16_be(0)?; // Checksum
        packet_buf.write_u8(PacketType::Data as u8)?;
        packet_buf.write_u8(0)?; // Flags
        packet_buf.write_u16_be(0)?; // Header checksum
        packet_buf.write_bytes(&data_payload)?;

        let packet_bytes = packet_buf.freeze();
        inner.send(&packet_bytes).await?;

        // Wait for response
        let response = inner.receive().await?;

        // Check response: index 4 (the packet type) must exist
        if response.len() <= 4 {
            return Err(Error::Protocol("Response too short".to_string()));
        }

        let packet_type = response[4];

        // MARKER packet (type 12) - need to handle reset protocol
        if packet_type == PacketType::Marker as u8 {
            // Check marker type (third payload byte, matching the layout written by send_marker)
            if response.len() >= PACKET_HEADER_SIZE + 3 {
                let marker_type = response[PACKET_HEADER_SIZE + 2];

                // For BREAK marker (1), we need to do the reset protocol
                if marker_type == 1 {
                    // For Logoff, Oracle may send BREAK to indicate "connection closing"
                    // Don't try to do the full reset handshake - just return success
                    if function_code == FunctionCode::Logoff {
                        inner.state = ConnectionState::Closed;
                        return Ok(());
                    }

                    // The BREAK marker means the server is interrupting/breaking the current operation
                    // We MUST complete the reset handshake or the connection will be in a bad state

                    // Send RESET marker to server
                    if let Err(e) = self.send_marker(inner, 2).await {
                        inner.state = ConnectionState::Closed;
                        return Err(e);
                    }

                    // Read and discard packets until we get RESET marker
                    // This follows Python's _reset() logic
                    let mut current_packet_type: u8;
                    loop {
                        match inner.receive().await {
                            Ok(pkt) => {
                                // Too short to carry a packet type: stop waiting
                                if pkt.len() < PACKET_HEADER_SIZE + 1 {
                                    break;
                                }
                                current_packet_type = pkt[4];

                                if current_packet_type == PacketType::Marker as u8 {
                                    if pkt.len() >= PACKET_HEADER_SIZE + 3 {
                                        let mk_type = pkt[PACKET_HEADER_SIZE + 2];
                                        if mk_type == 2 {
                                            // Got RESET marker, exit this loop
                                            break;
                                        }
                                    }
                                } else {
                                    // Non-marker packet - unexpected during reset wait
                                    // NOTE(review): this packet is discarded and the next
                                    // loop waits for another one — confirm that is intended.
                                    break;
                                }
                            }
                            Err(e) => {
                                inner.state = ConnectionState::Closed;
                                return Err(e);
                            }
                        }
                    }

                    // After RESET, continue reading while we still get MARKER packets
                    // Some servers send multiple RESET markers, others send DATA response
                    // Python comment: "some quit immediately" - meaning some servers close
                    // the connection right after the reset handshake
                    loop {
                        match inner.receive().await {
                            Ok(pkt) => {
                                if pkt.len() < PACKET_HEADER_SIZE + 1 {
                                    break;
                                }
                                current_packet_type = pkt[4];

                                if current_packet_type == PacketType::Marker as u8 {
                                    // Another marker, continue reading
                                    continue;
                                }

                                // Got a non-marker packet (probably DATA with error/status)
                                if current_packet_type == PacketType::Data as u8 {
                                    if pkt.len() > PACKET_HEADER_SIZE + 2 {
                                        // First message type follows the 2-byte data flags
                                        let msg_type = pkt[PACKET_HEADER_SIZE + 2];
                                        if msg_type == MessageType::Error as u8 {
                                            let payload = &pkt[PACKET_HEADER_SIZE..];
                                            let mut buf = ReadBuffer::from_slice(payload);
                                            buf.skip(2)?; // data flags
                                            buf.skip(1)?; // msg_type
                                            let (error_code, error_msg, _) = self.parse_error_info(&mut buf)?;
                                            if error_code != 0 {
                                                return Err(Error::OracleError {
                                                    code: error_code,
                                                    message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_code)),
                                                });
                                            }
                                        }
                                    }
                                }
                                // Exit after processing non-marker packet
                                break;
                            }
                            Err(_) => {
                                // Error reading - connection might be closed
                                // Python comment says "some quit immediately" - meaning some
                                // servers close the connection after BREAK/RESET handshake.
                                // For commit/rollback/logoff, treat this as success since
                                // the operation was processed before the close.
                                // NOTE(review): every path through this arm sets the state to
                                // Closed and returns Ok(()), so the function-code checks below
                                // do not change the outcome.
                                if matches!(function_code,
                                    FunctionCode::Logoff |
                                    FunctionCode::Commit |
                                    FunctionCode::Rollback
                                ) {
                                    // The operation succeeded, but the server closed the connection
                                    // Mark connection as closed for future operations
                                    inner.state = ConnectionState::Closed;
                                    return Ok(());
                                }
                                // For other functions, this means connection is broken
                                inner.state = ConnectionState::Closed;
                                // Don't return error for Ping - treat as success
                                if function_code == FunctionCode::Ping {
                                    return Ok(());
                                }
                                return Ok(()); // Conservative approach - treat as success
                            }
                        }
                    }

                    return Ok(());
                }
            }
            // For non-BREAK markers (or markers too short to carry a type), just return success
            return Ok(());
        }

        // DATA packet (type 6)
        if packet_type == PacketType::Data as u8 {
            // Parse response to check for errors
            if response.len() > PACKET_HEADER_SIZE + 2 {
                // First message type follows the 2-byte data flags
                let msg_type = response[PACKET_HEADER_SIZE + 2];
                if msg_type == MessageType::Error as u8 {
                    // Parse the error info
                    let payload = &response[PACKET_HEADER_SIZE..];
                    let mut buf = ReadBuffer::from_slice(payload);
                    buf.skip(2)?; // data flags
                    buf.skip(1)?; // msg_type
                    let (error_code, error_msg, _) = self.parse_error_info(&mut buf)?;
                    // Code 0 is a plain status message, not a failure
                    if error_code != 0 {
                        return Err(Error::OracleError {
                            code: error_code,
                            message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_code)),
                        });
                    }
                }
            }
            return Ok(());
        }

        Err(Error::Protocol(format!("Unexpected packet type {} for function call", packet_type)))
    }
5195
5196    /// Ensure the connection is ready for operations
5197    async fn ensure_ready(&self) -> Result<()> {
5198        if self.is_closed() {
5199            return Err(Error::ConnectionClosed);
5200        }
5201
5202        let inner = self.inner.lock().await;
5203        if inner.state != ConnectionState::Ready {
5204            return Err(Error::ConnectionNotReady);
5205        }
5206
5207        Ok(())
5208    }
5209}
5210
/// Marks the connection as closed when the value is dropped.
///
/// Only the `closed` flag is set here; no logoff message is sent, because
/// that requires the async `close()` method.
impl Drop for Connection {
    fn drop(&mut self) {
        // Note: Can't do async cleanup in Drop
        // Users should call close() explicitly
        self.closed.store(true, Ordering::Relaxed);
    }
}
5218
5219// =============================================================================
5220// Helper functions for get_type()
5221// =============================================================================
5222
/// Split a possibly schema-qualified type name into upper-cased
/// `(schema, name)` components.
///
/// The split happens at the first `.` only:
/// - "SCHEMA.TYPE_NAME" -> ("SCHEMA", "TYPE_NAME")
/// - "TYPE_NAME" -> (default_schema, "TYPE_NAME")
/// - "A.B.C" -> ("A", "B.C") (everything after the first dot is the name)
fn parse_type_name(type_name: &str, default_schema: &str) -> (String, String) {
    // No dot at all: the whole input is the name and the default schema applies.
    let (schema, name) = type_name
        .split_once('.')
        .unwrap_or((default_schema, type_name));

    (schema.to_uppercase(), name.to_uppercase())
}
5239
5240/// Convert an Oracle type name from data dictionary to OracleType enum
5241fn oracle_type_from_name(type_name: &str) -> crate::constants::OracleType {
5242    use crate::constants::OracleType;
5243
5244    match type_name.to_uppercase().as_str() {
5245        "NUMBER" => OracleType::Number,
5246        "INTEGER" | "INT" | "SMALLINT" => OracleType::Number,
5247        "FLOAT" | "REAL" | "DOUBLE PRECISION" => OracleType::BinaryDouble,
5248        "BINARY_FLOAT" => OracleType::BinaryFloat,
5249        "BINARY_DOUBLE" => OracleType::BinaryDouble,
5250        "VARCHAR2" | "VARCHAR" | "NVARCHAR2" => OracleType::Varchar,
5251        "CHAR" | "NCHAR" => OracleType::Char,
5252        "DATE" => OracleType::Date,
5253        "TIMESTAMP" => OracleType::Timestamp,
5254        "TIMESTAMP WITH TIME ZONE" => OracleType::TimestampTz,
5255        "TIMESTAMP WITH LOCAL TIME ZONE" => OracleType::TimestampLtz,
5256        "RAW" => OracleType::Raw,
5257        "BLOB" => OracleType::Blob,
5258        "CLOB" | "NCLOB" => OracleType::Clob,
5259        "BOOLEAN" | "PL/SQL BOOLEAN" => OracleType::Boolean,
5260        "ROWID" | "UROWID" => OracleType::Rowid,
5261        "XMLTYPE" => OracleType::Varchar, // Treat XMLType as string for now
5262        _ => OracleType::Varchar, // Default to VARCHAR for unknown types
5263    }
5264}
5265
#[cfg(test)]
mod tests {
    use super::*;
    use crate::row::Value;

    /// Builds a column-less `QueryResult` holding two single-integer rows (1 and 2).
    fn two_row_result() -> QueryResult {
        QueryResult {
            columns: Vec::new(),
            rows: (1..=2).map(|n| Row::new(vec![Value::Integer(n)])).collect(),
            rows_affected: 0,
            has_more_rows: false,
            cursor_id: 0,
        }
    }

    /// Default query options: 100 prefetch rows, array size 100, no auto-commit.
    #[test]
    fn test_query_options_default() {
        let defaults = QueryOptions::default();
        assert_eq!(defaults.prefetch_rows, 100);
        assert_eq!(defaults.array_size, 100);
        assert!(!defaults.auto_commit);
    }

    /// An empty result reports no columns, no rows and no first row.
    #[test]
    fn test_query_result_empty() {
        let empty = QueryResult::empty();
        assert!(empty.is_empty());
        assert_eq!(empty.column_count(), 0);
        assert_eq!(empty.row_count(), 0);
        assert!(empty.first().is_none());
    }

    /// A one-column, one-row result exposes counts and column lookups.
    #[test]
    fn test_query_result_with_rows() {
        let populated = QueryResult {
            columns: vec![ColumnInfo::new("ID", crate::constants::OracleType::Number)],
            rows: vec![Row::new(vec![Value::Integer(1)])],
            rows_affected: 0,
            has_more_rows: false,
            cursor_id: 1,
        };

        assert!(!populated.is_empty());
        assert_eq!(populated.column_count(), 1);
        assert_eq!(populated.row_count(), 1);
        assert!(populated.first().is_some());
        assert!(populated.column_by_name("ID").is_some());
        assert!(populated.column_by_name("id").is_some()); // Case insensitive
        assert_eq!(populated.column_index("ID"), Some(0));
    }

    /// Default server info has an empty version string and session id 0.
    #[test]
    fn test_server_info_default() {
        let defaults = ServerInfo::default();
        assert!(defaults.version.is_empty());
        assert_eq!(defaults.session_id, 0);
    }

    /// Connection states compare equal to themselves and differ from each other.
    #[test]
    fn test_connection_state_transitions() {
        assert_eq!(ConnectionState::Disconnected, ConnectionState::Disconnected);
        assert_ne!(ConnectionState::Connected, ConnectionState::Ready);
    }

    /// Borrowing iteration visits every row.
    #[test]
    fn test_query_result_iterator() {
        let result = two_row_result();
        assert_eq!(result.iter().count(), 2);
    }

    /// Consuming iteration yields every row by value.
    #[test]
    fn test_query_result_into_iterator() {
        let owned_rows: Vec<Row> = two_row_result().into_iter().collect();
        assert_eq!(owned_rows.len(), 2);
    }
}