Skip to main content

oracle_rs/
connection.rs

1//! Oracle database connection
2//!
3//! This module provides the main `Connection` type for interacting with Oracle databases.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use oracle_rs::{Connection, Config};
9//!
10//! #[tokio::main]
11//! async fn main() -> oracle_rs::Result<()> {
12//!     // Create a connection
13//!     let conn = Connection::connect("localhost:1521/ORCLPDB1", "user", "password").await?;
14//!
15//!     // Execute a query
16//!     let rows = conn.query("SELECT * FROM employees WHERE department_id = :1", &[&10]).await?;
17//!
18//!     for row in rows {
19//!         println!("{:?}", row);
20//!     }
21//!
22//!     // Commit and close
23//!     conn.commit().await?;
24//!     conn.close().await?;
25//!     Ok(())
26//! }
27//! ```
28
29use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
30use std::sync::Arc;
31use bytes::Bytes;
32use tokio::io::{AsyncReadExt, AsyncWriteExt};
33use tokio::net::TcpStream;
34use tokio::sync::Mutex;
35
36use crate::batch::{BatchBinds, BatchResult};
37use crate::buffer::{ReadBuffer, WriteBuffer};
38use crate::transport::{TlsConfig, TlsOracleStream, connect_tls};
39use crate::capabilities::Capabilities;
40use crate::config::{Config, ServiceMethod};
41use crate::constants::{BindDirection, FetchOrientation, FunctionCode, MessageType, OracleType, PacketType, PACKET_HEADER_SIZE};
42use crate::cursor::{ScrollableCursor, ScrollResult};
43use crate::error::{Error, Result};
44use crate::implicit::{ImplicitResult, ImplicitResults};
45use crate::messages::{AcceptMessage, AuthMessage, AuthPhase, ConnectMessage, ExecuteMessage, ExecuteOptions, FetchMessage, LobOpMessage};
46use crate::packet::Packet;
47use crate::row::{Row, Value};
48use crate::statement::{BindParam, ColumnInfo, Statement, StatementType};
49use crate::types::{LobData, LobLocator, LobValue};
50use crate::statement_cache::StatementCache;
51
/// Stage a connection has reached, from initial setup through shutdown.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// No connection has been established.
    Disconnected,
    /// The TCP connection is up, but nothing has been negotiated yet.
    Connected,
    /// Protocol negotiation has completed.
    ProtocolNegotiated,
    /// Data type negotiation has completed.
    DataTypesNegotiated,
    /// Authentication succeeded; the connection is ready for use.
    Ready,
    /// The connection has been closed.
    Closed,
}
68
/// Tunables applied when executing a query.
#[derive(Debug, Clone)]
pub struct QueryOptions {
    /// How many rows to prefetch.
    pub prefetch_rows: u32,
    /// Array size used for batch operations.
    pub array_size: u32,
    /// When `true`, commit automatically after DML.
    pub auto_commit: bool,
}

impl Default for QueryOptions {
    /// Defaults: prefetch 100 rows, array size 100, auto-commit disabled.
    fn default() -> Self {
        const DEFAULT_PREFETCH_ROWS: u32 = 100;
        const DEFAULT_ARRAY_SIZE: u32 = 100;

        QueryOptions {
            prefetch_rows: DEFAULT_PREFETCH_ROWS,
            array_size: DEFAULT_ARRAY_SIZE,
            auto_commit: false,
        }
    }
}
89
90/// Result set from a query
91#[derive(Debug)]
92pub struct QueryResult {
93    /// Column information
94    pub columns: Vec<ColumnInfo>,
95    /// Rows returned
96    pub rows: Vec<Row>,
97    /// Number of rows affected (for DML)
98    pub rows_affected: u64,
99    /// Whether there are more rows to fetch
100    pub has_more_rows: bool,
101    /// Cursor ID for subsequent fetches (needed for fetch_more)
102    pub cursor_id: u16,
103}
104
105impl QueryResult {
106    /// Create an empty query result
107    pub fn empty() -> Self {
108        Self {
109            columns: Vec::new(),
110            rows: Vec::new(),
111            rows_affected: 0,
112            has_more_rows: false,
113            cursor_id: 0,
114        }
115    }
116
117    /// Get the number of columns
118    pub fn column_count(&self) -> usize {
119        self.columns.len()
120    }
121
122    /// Get the number of rows
123    pub fn row_count(&self) -> usize {
124        self.rows.len()
125    }
126
127    /// Check if the result is empty
128    pub fn is_empty(&self) -> bool {
129        self.rows.is_empty()
130    }
131
132    /// Get a column by name
133    pub fn column_by_name(&self, name: &str) -> Option<&ColumnInfo> {
134        self.columns.iter().find(|c| c.name.eq_ignore_ascii_case(name))
135    }
136
137    /// Get column index by name
138    pub fn column_index(&self, name: &str) -> Option<usize> {
139        self.columns.iter().position(|c| c.name.eq_ignore_ascii_case(name))
140    }
141
142    /// Iterate over rows
143    pub fn iter(&self) -> impl Iterator<Item = &Row> {
144        self.rows.iter()
145    }
146
147    /// Get a single row (first row)
148    pub fn first(&self) -> Option<&Row> {
149        self.rows.first()
150    }
151}
152
153impl IntoIterator for QueryResult {
154    type Item = Row;
155    type IntoIter = std::vec::IntoIter<Row>;
156
157    fn into_iter(self) -> Self::IntoIter {
158        self.rows.into_iter()
159    }
160}
161
162/// Result from executing a PL/SQL block with OUT parameters
163#[derive(Debug)]
164pub struct PlsqlResult {
165    /// OUT parameter values indexed by position (0-based)
166    pub out_values: Vec<Value>,
167    /// Number of rows affected (if applicable)
168    pub rows_affected: u64,
169    /// Cursor ID (if the result contains a REF CURSOR)
170    pub cursor_id: Option<u16>,
171    /// Implicit result sets returned via DBMS_SQL.RETURN_RESULT
172    pub implicit_results: ImplicitResults,
173}
174
175impl PlsqlResult {
176    /// Create an empty PL/SQL result
177    pub fn empty() -> Self {
178        Self {
179            out_values: Vec::new(),
180            rows_affected: 0,
181            cursor_id: None,
182            implicit_results: ImplicitResults::new(),
183        }
184    }
185
186    /// Get an OUT value by position (0-based)
187    pub fn get(&self, index: usize) -> Option<&Value> {
188        self.out_values.get(index)
189    }
190
191    /// Get a string OUT value by position
192    pub fn get_string(&self, index: usize) -> Option<&str> {
193        self.out_values.get(index).and_then(|v| v.as_str())
194    }
195
196    /// Get an integer OUT value by position
197    pub fn get_integer(&self, index: usize) -> Option<i64> {
198        self.out_values.get(index).and_then(|v| v.as_i64())
199    }
200
201    /// Get a float OUT value by position
202    pub fn get_float(&self, index: usize) -> Option<f64> {
203        self.out_values.get(index).and_then(|v| v.as_f64())
204    }
205
206    /// Get a cursor ID from OUT value by position (for REF CURSOR)
207    pub fn get_cursor_id(&self, index: usize) -> Option<u16> {
208        self.out_values.get(index).and_then(|v| v.as_cursor_id())
209    }
210}
211
/// Details about the server gathered while connecting.
///
/// The `Default` value has empty strings, zeroed numbers, `None` for the
/// optional names, and `supports_oob == false`.
#[derive(Debug, Clone, Default)]
pub struct ServerInfo {
    /// Oracle version string.
    pub version: String,
    /// Banner reported by the server.
    pub banner: String,
    /// Session ID (SID).
    pub session_id: u32,
    /// Session serial number.
    pub serial_number: u32,
    /// Instance name, when known.
    pub instance_name: Option<String>,
    /// Service name, when known.
    pub service_name: Option<String>,
    /// Database name, when known.
    pub database_name: Option<String>,
    /// Protocol version agreed on with the server.
    pub protocol_version: u16,
    /// Whether the server supports OOB (out of band) data.
    pub supports_oob: bool,
}
234
235/// Stream type that can be either plain TCP or TLS-encrypted
236enum OracleStream {
237    /// Plain TCP connection
238    Plain(TcpStream),
239    /// TLS-encrypted connection
240    Tls(TlsOracleStream),
241}
242
243impl OracleStream {
244    async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
245        match self {
246            OracleStream::Plain(stream) => {
247                AsyncReadExt::read_exact(stream, buf).await?;
248                Ok(())
249            }
250            OracleStream::Tls(stream) => {
251                AsyncReadExt::read_exact(stream, buf).await?;
252                Ok(())
253            }
254        }
255    }
256
257    async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
258        match self {
259            OracleStream::Plain(stream) => stream.write_all(buf).await,
260            OracleStream::Tls(stream) => stream.write_all(buf).await,
261        }
262    }
263
264    async fn flush(&mut self) -> std::io::Result<()> {
265        match self {
266            OracleStream::Plain(stream) => stream.flush().await,
267            OracleStream::Tls(stream) => stream.flush().await,
268        }
269    }
270
271}
272
273/// Internal connection state shared across async operations
274struct ConnectionInner {
275    stream: Option<OracleStream>,
276    capabilities: Capabilities,
277    state: ConnectionState,
278    server_info: ServerInfo,
279    sdu_size: u16,
280    large_sdu: bool,
281    /// Sequence number for TTC messages (increments per message)
282    sequence_number: u8,
283    /// Statement cache for prepared statement reuse
284    statement_cache: Option<StatementCache>,
285}
286
287impl ConnectionInner {
288    fn new_with_cache(cache_size: usize) -> Self {
289        Self {
290            stream: None,
291            capabilities: Capabilities::default(),
292            state: ConnectionState::Disconnected,
293            server_info: ServerInfo::default(),
294            sdu_size: 8192,
295            large_sdu: false,
296            sequence_number: 0,
297            statement_cache: if cache_size > 0 {
298                Some(StatementCache::new(cache_size))
299            } else {
300                None
301            },
302        }
303    }
304
305    /// Get the next sequence number (auto-increments, wraps at 255 to 1)
306    fn next_sequence_number(&mut self) -> u8 {
307        self.sequence_number = self.sequence_number.wrapping_add(1);
308        if self.sequence_number == 0 {
309            self.sequence_number = 1;
310        }
311        self.sequence_number
312    }
313
314    async fn send(&mut self, data: &[u8]) -> Result<()> {
315        if let Some(stream) = &mut self.stream {
316            stream.write_all(data).await?;
317            stream.flush().await?;
318            Ok(())
319        } else {
320            Err(Error::ConnectionClosed)
321        }
322    }
323
324    /// Send a payload that may need to be split across multiple packets.
325    ///
326    /// This is used for large LOB writes and other operations where the payload
327    /// exceeds the SDU size. The payload is split into multiple DATA packets,
328    /// each with proper headers.
329    ///
330    /// # Arguments
331    /// * `payload` - The raw message payload (without packet header or data flags)
332    /// * `data_flags` - The data flags for the first packet (typically 0)
333    async fn send_multi_packet(&mut self, payload: &[u8], data_flags: u16) -> Result<()> {
334        let stream = self.stream.as_mut().ok_or(Error::ConnectionClosed)?;
335
336        // Calculate max payload per packet: SDU - header (8) - data flags (2)
337        let max_payload_per_packet = self.sdu_size as usize - PACKET_HEADER_SIZE - 2;
338
339        let mut offset = 0;
340        let mut is_first = true;
341
342        while offset < payload.len() {
343            let remaining = payload.len() - offset;
344            let chunk_size = std::cmp::min(remaining, max_payload_per_packet);
345            let is_last = offset + chunk_size >= payload.len();
346
347            // Build packet
348            let packet_len = PACKET_HEADER_SIZE + 2 + chunk_size; // header + data flags + payload
349            let mut packet = Vec::with_capacity(packet_len);
350
351            // Header
352            if self.large_sdu {
353                packet.extend_from_slice(&(packet_len as u32).to_be_bytes());
354            } else {
355                packet.extend_from_slice(&(packet_len as u16).to_be_bytes());
356                packet.extend_from_slice(&[0, 0]); // Checksum
357            }
358            packet.push(PacketType::Data as u8);
359            packet.push(0); // Flags
360            packet.extend_from_slice(&[0, 0]); // Header checksum
361
362            // Data flags - only include on first packet
363            if is_first {
364                packet.extend_from_slice(&data_flags.to_be_bytes());
365                is_first = false;
366            } else {
367                // Continuation packets still need data flags position but value is 0
368                packet.extend_from_slice(&0u16.to_be_bytes());
369            }
370
371            // Payload chunk
372            packet.extend_from_slice(&payload[offset..offset + chunk_size]);
373
374            // Send this packet
375            stream.write_all(&packet).await?;
376
377            offset += chunk_size;
378
379            // Don't flush until the last packet to improve performance
380            if is_last {
381                stream.flush().await?;
382            }
383        }
384
385        Ok(())
386    }
387
388    async fn receive(&mut self) -> Result<bytes::Bytes> {
389        if let Some(stream) = &mut self.stream {
390            // Read packet header first (always 8 bytes)
391            // large_sdu only affects how the length field is interpreted, not header size
392            let mut header_buf = vec![0u8; PACKET_HEADER_SIZE];
393            stream.read_exact(&mut header_buf).await?;
394
395            // Parse header to get payload length
396            // In large_sdu mode, first 4 bytes are length; otherwise first 2 bytes
397            let packet_len = if self.large_sdu {
398                u32::from_be_bytes([header_buf[0], header_buf[1], header_buf[2], header_buf[3]])
399                    as usize
400            } else {
401                u16::from_be_bytes([header_buf[0], header_buf[1]]) as usize
402            };
403
404            // Read remaining payload
405            let payload_len = packet_len.saturating_sub(PACKET_HEADER_SIZE);
406            let mut payload_buf = vec![0u8; payload_len];
407            if payload_len > 0 {
408                stream.read_exact(&mut payload_buf).await?;
409            }
410
411            // Combine header and payload
412            let mut full_packet = header_buf.clone();
413            full_packet.extend(payload_buf);
414
415            Ok(bytes::Bytes::from(full_packet))
416        } else {
417            Err(Error::ConnectionClosed)
418        }
419    }
420
421    /// Receive a complete response that may span multiple packets
422    ///
423    /// This method accumulates packets until the END_OF_RESPONSE flag is detected
424    /// in the data flags. It's used for operations like LOB reads that may return
425    /// data spanning multiple TNS packets.
426    ///
427    /// Returns the combined payload of all packets (excluding headers).
428    async fn receive_response(&mut self) -> Result<bytes::Bytes> {
429        use crate::constants::{data_flags, MessageType};
430
431        let mut accumulated_payload = Vec::new();
432        let mut is_first_packet = true;
433
434        loop {
435            let packet = self.receive().await?;
436
437            if packet.len() < PACKET_HEADER_SIZE {
438                return Err(Error::Protocol("Packet too small".to_string()));
439            }
440
441            // Check packet type - only DATA packets can be accumulated
442            let packet_type = packet[4];
443            if packet_type != PacketType::Data as u8 {
444                // Non-DATA packet (e.g., MARKER) - return as-is for special handling
445                return Ok(packet);
446            }
447
448            // Get payload (everything after the 8-byte header)
449            let payload = &packet[PACKET_HEADER_SIZE..];
450
451            if payload.len() < 2 {
452                return Err(Error::Protocol("DATA packet payload too small".to_string()));
453            }
454
455            // Read data flags (first 2 bytes of payload)
456            let data_flags_value = u16::from_be_bytes([payload[0], payload[1]]);
457
458            // Check for end of response - Python checks both flags and message type
459            let has_end_flag = (data_flags_value & data_flags::END_OF_RESPONSE) != 0;
460            let has_eof_flag = (data_flags_value & data_flags::EOF) != 0;
461
462            // Also check for EndOfResponse message type (header + 3 bytes with msg type 29)
463            let has_end_message = payload.len() == 3
464                && payload[2] == MessageType::EndOfResponse as u8;
465
466            // Accumulate payload first
467            if is_first_packet {
468                // First packet: include data flags in accumulated payload
469                accumulated_payload.extend_from_slice(payload);
470                is_first_packet = false;
471            } else {
472                // Subsequent packets: skip the data flags, append only the message data
473                accumulated_payload.extend_from_slice(&payload[2..]);
474            }
475
476            // Check for end of response using data flags from this packet
477            let is_end_of_response = has_end_flag || has_eof_flag || has_end_message;
478
479            // If data flags don't indicate end, scan the ACCUMULATED message data
480            // for terminal messages. We scan accumulated data (not just current packet)
481            // because messages can span packet boundaries.
482            let has_terminal_message = if !is_end_of_response && accumulated_payload.len() > 2 {
483                self.scan_for_terminal_message(&accumulated_payload[2..])
484            } else {
485                false
486            };
487
488            // Check if this is the last packet
489            if is_end_of_response || has_terminal_message {
490                break;
491            }
492        }
493
494        // Build a synthetic packet with combined payload
495        let total_len = PACKET_HEADER_SIZE + accumulated_payload.len();
496        let mut result = Vec::with_capacity(total_len);
497
498        // Build header
499        if self.large_sdu {
500            result.extend_from_slice(&(total_len as u32).to_be_bytes());
501        } else {
502            result.extend_from_slice(&(total_len as u16).to_be_bytes());
503            result.extend_from_slice(&[0, 0]); // Checksum
504        }
505        result.push(PacketType::Data as u8);
506        result.push(0); // Flags
507        result.extend_from_slice(&[0, 0]); // Header checksum
508
509        // Add combined payload
510        result.extend_from_slice(&accumulated_payload);
511
512        Ok(bytes::Bytes::from(result))
513    }
514
515    /// Scan message data for terminal message types (ERROR or END_OF_RESPONSE)
516    /// that indicate the response is complete.
517    ///
518    /// This is needed because Oracle doesn't always set the END_OF_RESPONSE flag
519    /// in the data flags for LOB operations. Instead, we must detect the terminal
520    /// message by parsing the message stream.
521    ///
522    /// NOTE: This is conservative - it only returns true if we can definitively
523    /// identify a terminal message. We avoid false positives by not scanning
524    /// raw byte values (which could match message type values by coincidence).
525    fn scan_for_terminal_message(&self, data: &[u8]) -> bool {
526        use crate::buffer::ReadBuffer;
527        use crate::constants::MessageType;
528
529        if data.is_empty() {
530            return false;
531        }
532
533        // Try to parse the message stream and look for ERROR or END_OF_RESPONSE
534        let mut buf = ReadBuffer::from_slice(data);
535
536        while buf.remaining() > 0 {
537            let msg_type = match buf.read_u8() {
538                Ok(t) => t,
539                Err(_) => return false, // Can't read, assume incomplete
540            };
541
542            // END_OF_RESPONSE is a standalone message with no additional data
543            if msg_type == MessageType::EndOfResponse as u8 {
544                return true;
545            }
546
547            // ERROR message indicates end of response for older Oracle
548            if msg_type == MessageType::Error as u8 {
549                // Error message found - this indicates end of response
550                return true;
551            }
552
553            // STATUS message also indicates end of response
554            if msg_type == MessageType::Status as u8 {
555                return true;
556            }
557
558            // LOB_DATA message - skip the data
559            if msg_type == MessageType::LobData as u8 {
560                // Read length-prefixed data and skip it
561                match buf.read_raw_bytes_chunked() {
562                    Ok(_) => continue,
563                    Err(_) => return false, // Incomplete LOB data, need more packets
564                }
565            }
566
567            // PARAMETER message (8) - this contains the updated locator and amount.
568            // For LOB write responses, PARAMETER is the first message and the response
569            // is relatively small (locator + error info). We can safely scan for
570            // ERROR/END_OF_RESPONSE bytes because the locator doesn't contain arbitrary
571            // binary data that would false-positive.
572            //
573            // For LOB read responses, LobData comes first and contains the actual data,
574            // which might contain bytes that match ERROR (4) or END_OF_RESPONSE (29).
575            // But since we skip LobData content, by the time we reach PARAMETER,
576            // the remaining data is just locator + error info.
577            if msg_type == MessageType::Parameter as u8 {
578                let remaining = buf.remaining_bytes();
579                // Check if ERROR (4) or END_OF_RESPONSE (29) appears in remaining bytes
580                // This is safe because PARAMETER data (locator + amount) doesn't contain
581                // arbitrary binary data that would false-positive.
582                if remaining.contains(&(MessageType::Error as u8))
583                    || remaining.contains(&(MessageType::EndOfResponse as u8))
584                {
585                    return true;
586                }
587                // If no terminal marker found, response might be incomplete
588                return false;
589            }
590
591            // For other unknown message types, we can't determine the end
592            return false;
593        }
594
595        false
596    }
597
598    /// Send a marker packet with the specified marker type
599    async fn send_marker(&mut self, marker_type: u8) -> Result<()> {
600        let mut buf = WriteBuffer::with_capacity(16);
601
602        // Build marker packet header
603        // Marker packet structure: [length][0x00][0x00][0x00][0x0c][flags][0x00][0x00] + payload
604        // Payload: [0x01][0x00][marker_type]
605        let payload_len = 3; // 0x01, 0x00, marker_type
606        let total_len = (PACKET_HEADER_SIZE + payload_len) as u16;
607
608        // Header
609        buf.write_u16_be(total_len)?;
610        buf.write_u16_be(0)?; // zeros in large_sdu position
611        buf.write_u8(PacketType::Marker as u8)?;
612        buf.write_u8(0)?; // flags
613        buf.write_u16_be(0)?; // reserved
614
615        // Payload
616        buf.write_u8(0x01)?; // constant
617        buf.write_u8(0x00)?; // constant
618        buf.write_u8(marker_type)?;
619
620        self.send(buf.as_slice()).await
621    }
622
623    /// Handle the reset protocol after receiving a MARKER packet
624    /// This sends a reset marker, waits for the reset response, then returns the error packet
625    /// Returns Err if the connection is closed after reset (some Oracle versions do this)
626    async fn handle_marker_reset(&mut self) -> Result<bytes::Bytes> {
627        const MARKER_TYPE_RESET: u8 = 2;
628
629        // Send reset marker
630        self.send_marker(MARKER_TYPE_RESET).await?;
631
632        // Read packets until we get a reset marker back
633        loop {
634            let packet = self.receive().await?;
635            if packet.len() < PACKET_HEADER_SIZE {
636                return Err(Error::Protocol("Invalid packet received".to_string()));
637            }
638
639            let packet_type = packet[4];
640
641            if packet_type == PacketType::Marker as u8 {
642                // Check if it's a reset marker
643                if packet.len() >= PACKET_HEADER_SIZE + 3 {
644                    let marker_type = packet[PACKET_HEADER_SIZE + 2];
645                    if marker_type == MARKER_TYPE_RESET {
646                        break;
647                    }
648                }
649            } else {
650                // Non-marker packet received unexpectedly during reset wait
651                return Ok(packet);
652            }
653        }
654
655        // Try to read the error packet (may need to skip additional marker packets first)
656        // Note: Some Oracle versions (like Oracle Free) may close the connection after reset
657        // instead of sending an error packet
658        loop {
659            match self.receive().await {
660                Ok(packet) => {
661                    let packet_type = packet[4];
662
663                    if packet_type != PacketType::Marker as u8 {
664                        // This should be the error data packet
665                        return Ok(packet);
666                    }
667                    // Skip additional marker packets
668                }
669                Err(_) => {
670                    // Connection closed after reset - Oracle Free and some versions
671                    // close the connection instead of sending the error details.
672                    // This typically happens when:
673                    // - Table or view doesn't exist
674                    // - Insufficient privileges to access the object
675                    // - Invalid SQL syntax
676                    return Err(Error::ConnectionClosedByServer(
677                        "Query failed - Oracle closed the connection without providing error details. \
678                         This typically indicates insufficient privileges or the object doesn't exist.".to_string()
679                    ));
680                }
681            }
682        }
683    }
684}
685
686/// An Oracle database connection.
687///
688/// This is the main type for interacting with Oracle databases. It provides
689/// methods for executing queries, DML statements, PL/SQL blocks, and managing
690/// transactions.
691///
692/// Connections are created using [`Connection::connect`] or
693/// [`Connection::connect_with_config`]. For connection pooling, use the
694/// `deadpool-oracle` crate.
695///
696/// # Example
697///
698/// ```rust,no_run
699/// use oracle_rs::{Config, Connection, Value};
700///
701/// # async fn example() -> oracle_rs::Result<()> {
702/// // Create a connection
703/// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password");
704/// let conn = Connection::connect_with_config(config).await?;
705///
706/// // Execute a query
707/// let result = conn.query("SELECT * FROM employees WHERE dept_id = :1", &[10.into()]).await?;
708/// for row in &result.rows {
709///     let name = row.get_by_name("name").and_then(|v| v.as_str()).unwrap_or("");
710///     println!("Employee: {}", name);
711/// }
712///
713/// // Execute DML with transaction
714/// conn.execute("INSERT INTO logs (msg) VALUES (:1)", &["Hello".into()]).await?;
715/// conn.commit().await?;
716///
717/// // Close the connection
718/// conn.close().await?;
719/// # Ok(())
720/// # }
721/// ```
722///
723/// # Thread Safety
724///
725/// `Connection` is `Send` and `Sync`, but operations are serialized internally
726/// via a mutex. For parallel query execution, use multiple connections (e.g.,
727/// via a connection pool).
728pub struct Connection {
729    inner: Arc<Mutex<ConnectionInner>>,
730    config: Config,
731    closed: AtomicBool,
732    id: u32,
733}
734
/// Process-wide source of connection IDs; the first ID handed out is 1.
static CONNECTION_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
737
738impl Connection {
739    /// Create a new connection to an Oracle database
740    ///
741    /// # Arguments
742    ///
743    /// * `connect_string` - Connection string in EZConnect format (e.g., "host:port/service")
744    /// * `username` - Database username
745    /// * `password` - Database password
746    ///
747    /// # Example
748    ///
749    /// ```rust,ignore
750    /// let conn = Connection::connect("localhost:1521/ORCLPDB1", "scott", "tiger").await?;
751    /// ```
752    pub async fn connect(connect_string: &str, username: &str, password: &str) -> Result<Self> {
753        let mut config: Config = connect_string.parse()?;
754        config.username = username.to_string();
755        config.set_password(password);
756        Self::connect_with_config(config).await
757    }
758
759    /// Create a new connection using a [`Config`].
760    ///
761    /// This is the preferred way to create connections as it gives full control
762    /// over connection parameters including TLS, timeouts, and statement caching.
763    ///
764    /// # Example
765    ///
766    /// ```rust,no_run
767    /// use oracle_rs::{Config, Connection};
768    ///
769    /// # async fn example() -> oracle_rs::Result<()> {
770    /// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password")
771    ///     .with_statement_cache_size(50);
772    ///
773    /// let conn = Connection::connect_with_config(config).await?;
774    /// # Ok(())
775    /// # }
776    /// ```
777    pub async fn connect_with_config(config: Config) -> Result<Self> {
778        let id = CONNECTION_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
779
780        // Create TCP connection
781        let addr = config.socket_addr();
782        let tcp_stream = TcpStream::connect(&addr).await?;
783
784        // Set TCP options
785        tcp_stream.set_nodelay(true)?;
786
787        // Wrap with TLS if configured
788        let stream = if config.is_tls_enabled() {
789            let tls_config = config.tls_config.as_ref()
790                .cloned()
791                .unwrap_or_else(TlsConfig::new);
792
793            let tls_stream = connect_tls(tcp_stream, &config.host, &tls_config).await?;
794            OracleStream::Tls(tls_stream)
795        } else {
796            OracleStream::Plain(tcp_stream)
797        };
798
799        let mut inner = ConnectionInner::new_with_cache(config.stmtcachesize);
800        inner.stream = Some(stream);
801        inner.state = ConnectionState::Connected;
802
803        let conn = Connection {
804            inner: Arc::new(Mutex::new(inner)),
805            config,
806            closed: AtomicBool::new(false),
807            id,
808        };
809
810        // Perform connection handshake
811        conn.perform_handshake().await?;
812
813        Ok(conn)
814    }
815
816    /// Get the connection ID
817    pub fn id(&self) -> u32 {
818        self.id
819    }
820
821    /// Check if the connection is closed
822    pub fn is_closed(&self) -> bool {
823        self.closed.load(Ordering::Relaxed)
824    }
825
826    /// Mark the connection as closed
827    ///
828    /// This should be called when the underlying connection is detected as broken.
829    /// Once marked closed, `is_closed()` returns true and operations will fail fast.
830    pub fn mark_closed(&self) {
831        self.closed.store(true, Ordering::Relaxed);
832    }
833
834    /// Helper to mark connection as closed if the result is a connection error
835    fn handle_result<T>(&self, result: Result<T>) -> Result<T> {
836        if let Err(ref e) = result {
837            if e.is_connection_error() {
838                self.mark_closed();
839            }
840        }
841        result
842    }
843
844    /// Get server information
845    pub async fn server_info(&self) -> ServerInfo {
846        let inner = self.inner.lock().await;
847        inner.server_info.clone()
848    }
849
850    /// Get the current connection state
851    pub async fn state(&self) -> ConnectionState {
852        let inner = self.inner.lock().await;
853        inner.state
854    }
855
856    /// Perform the connection handshake
857    async fn perform_handshake(&self) -> Result<()> {
858        // Step 1: Send CONNECT packet and parse ACCEPT response
859        self.send_connect_packet().await?;
860
861        // Step 2: OOB check (required for protocol version >= 318 AND server supports OOB)
862        // Both conditions must be met - server must have indicated OOB support in ACCEPT
863        let needs_oob_check = {
864            let inner = self.inner.lock().await;
865            inner.server_info.protocol_version >= crate::constants::version::MIN_OOB_CHECK
866                && inner.server_info.supports_oob
867        };
868        if needs_oob_check {
869            self.send_oob_check().await?;
870        }
871
872        // Step 3: Protocol negotiation
873        self.negotiate_protocol().await?;
874
875        // Step 4: Data types negotiation
876        self.negotiate_data_types().await?;
877
878        // Step 5: Authentication
879        self.authenticate().await?;
880
881        Ok(())
882    }
883
884    /// Send OOB (Out of Band) check
885    /// Required for protocol version >= 318
886    async fn send_oob_check(&self) -> Result<()> {
887        let mut inner = self.inner.lock().await;
888        let large_sdu = inner.large_sdu;
889
890        // Step 1: Send raw byte "!" (0x21) for OOB check
891        inner.send(&[0x21]).await?;
892
893        // Step 2: Send MARKER packet with Reset
894        let marker_payload = [1u8, 0u8, crate::constants::MarkerType::Reset as u8];
895        let mut packet_buf = WriteBuffer::new();
896
897        if large_sdu {
898            packet_buf.write_u32_be((PACKET_HEADER_SIZE + marker_payload.len()) as u32)?;
899        } else {
900            packet_buf.write_u16_be((PACKET_HEADER_SIZE + marker_payload.len()) as u16)?;
901            packet_buf.write_u16_be(0)?; // Checksum
902        }
903        packet_buf.write_u8(PacketType::Marker as u8)?;
904        packet_buf.write_u8(0)?; // Flags
905        packet_buf.write_u16_be(0)?; // Header checksum
906        packet_buf.write_bytes(&marker_payload)?;
907
908        inner.send(&packet_buf.freeze()).await?;
909
910        // Step 3: Wait for OOB reset response
911        // The server sends back a MARKER packet or reset acknowledgment
912        let response = inner.receive().await?;
913
914        // Validate response - should be a MARKER packet type (12)
915        if response.len() > 4 && response[4] == PacketType::Marker as u8 {
916            Ok(())
917        } else {
918            // Server might just acknowledge without a specific packet
919            // This is acceptable in some Oracle versions
920            Ok(())
921        }
922    }
923
924    /// Send the initial CONNECT packet
925    async fn send_connect_packet(&self) -> Result<()> {
926        let mut inner = self.inner.lock().await;
927
928        // Build connect packet using ConnectMessage for proper packet format
929        let connect_msg = ConnectMessage::from_config(&self.config);
930        let (connect_packet, continuation) = connect_msg.build_with_continuation()?;
931
932        // Send the CONNECT packet
933        inner.send(&connect_packet).await?;
934
935        // If we have a continuation DATA packet (for large connect strings), send it
936        if let Some(ref data_packet) = continuation {
937            inner.send(data_packet).await?;
938        }
939
940        const MAX_RESENDS: u8 = 3;
941        let mut resend_count: u8 = 0;
942
943        loop {
944            // Wait for response
945            let response = inner.receive().await?;
946
947            // Parse response packet type
948            if response.len() < PACKET_HEADER_SIZE {
949                return Err(Error::PacketTooShort {
950                    expected: PACKET_HEADER_SIZE,
951                    actual: response.len(),
952                });
953            }
954
955            let packet_type = response[4];
956
957            match packet_type {
958                2 => {
959                    // ACCEPT - parse the accept message to get protocol version and capabilities
960                    let packet = Packet::from_bytes(response)?;
961                    let accept = AcceptMessage::parse(&packet)?;
962
963                    // Set large_sdu mode if protocol version >= 315
964                    inner.large_sdu = accept.uses_large_sdu();
965
966                    // Update server info
967                    inner.server_info.protocol_version = accept.protocol_version;
968                    inner.server_info.supports_oob = accept.supports_oob;
969                    inner.sdu_size = accept.sdu.min(65535) as u16;
970
971                    inner.state = ConnectionState::Connected;
972                    return Ok(());
973                }
974                4 => {
975                    // REFUSE
976                    let mut buf = ReadBuffer::new(response.slice(PACKET_HEADER_SIZE..));
977                    let _reason = buf.read_u8()?;
978                    let _user_reason = buf.read_u8()?;
979
980                    return Err(Error::ConnectionRefused {
981                        error_code: None,
982                        message: Some("Connection refused by server".to_string()),
983                    });
984                }
985                5 => {
986                    // REDIRECT
987                    return Err(Error::ConnectionRedirect(
988                        "redirect not implemented".to_string(),
989                    ));
990                }
991                11 => {
992                    // RESEND - server requests retransmission of the connect packet
993                    resend_count += 1;
994                    if resend_count > MAX_RESENDS {
995                        return Err(Error::ProtocolError(
996                            "Server requested too many resends during connect".to_string(),
997                        ));
998                    }
999                    inner.send(&connect_packet).await?;
1000                    if let Some(ref data_packet) = continuation {
1001                        inner.send(data_packet).await?;
1002                    }
1003                }
1004                _ => {
1005                    return Err(Error::ProtocolError(format!(
1006                        "Unexpected packet type during connect: {}",
1007                        packet_type,
1008                    )));
1009                }
1010            }
1011        }
1012    }
1013
1014    /// Negotiate protocol version and capabilities
1015    async fn negotiate_protocol(&self) -> Result<()> {
1016        use crate::messages::ProtocolMessage;
1017
1018        let mut inner = self.inner.lock().await;
1019        let large_sdu = inner.large_sdu;
1020
1021
1022        // Build protocol request (includes header)
1023        let protocol_msg = ProtocolMessage::new();
1024        let packet = protocol_msg.build_request(large_sdu)?;
1025        inner.send(&packet).await?;
1026
1027        // Receive response
1028        let response = inner.receive().await?;
1029
1030        // Validate packet type (at offset 4 for both SDU modes)
1031        if response.len() <= 4 || response[4] != PacketType::Data as u8 {
1032            return Err(Error::ProtocolError("Protocol negotiation failed".to_string()));
1033        }
1034
1035        // Parse the Protocol response to extract server capabilities
1036        // The payload starts after the 8-byte header
1037        let payload = &response[PACKET_HEADER_SIZE..];
1038        let mut protocol_msg = ProtocolMessage::new();
1039        protocol_msg.parse_response(payload, &mut inner.capabilities)?;
1040
1041        // Update server info with banner
1042        if let Some(banner) = &protocol_msg.server_banner {
1043            inner.server_info.banner = banner.clone();
1044        }
1045
1046        inner.state = ConnectionState::ProtocolNegotiated;
1047        Ok(())
1048    }
1049
1050    /// Negotiate data types
1051    async fn negotiate_data_types(&self) -> Result<()> {
1052        use crate::messages::DataTypesMessage;
1053
1054        let mut inner = self.inner.lock().await;
1055        let large_sdu = inner.large_sdu;
1056
1057
1058        // Build data types request using DataTypesMessage (includes all ~320 data types)
1059        let data_types_msg = DataTypesMessage::new();
1060        let packet = data_types_msg.build_request(&inner.capabilities, large_sdu)?;
1061        inner.send(&packet).await?;
1062
1063        // Receive response
1064        let response = inner.receive().await?;
1065
1066        // Basic validation - packet type is at offset 4 regardless of large_sdu
1067        if response.len() > 4 && response[4] == PacketType::Data as u8 {
1068            inner.state = ConnectionState::DataTypesNegotiated;
1069            Ok(())
1070        } else {
1071            Err(Error::ProtocolError("Data types negotiation failed".to_string()))
1072        }
1073    }
1074
1075    /// Perform authentication
1076    async fn authenticate(&self) -> Result<()> {
1077
1078        let service_name = match &self.config.service {
1079            ServiceMethod::ServiceName(name) => name.clone(),
1080            ServiceMethod::Sid(sid) => sid.clone(),
1081        };
1082
1083        let mut auth = AuthMessage::new(
1084            &self.config.username,
1085            self.config.password().as_bytes(),
1086            &service_name,
1087        );
1088
1089        // Phase one: send username and session info
1090        {
1091            let mut inner = self.inner.lock().await;
1092            let large_sdu = inner.large_sdu;
1093            let request = auth.build_request(&inner.capabilities, large_sdu)?;
1094            inner.send(&request).await?;
1095
1096            let response = inner.receive().await?;
1097            if response.len() <= PACKET_HEADER_SIZE {
1098                return Err(Error::Protocol("Empty auth response".to_string()));
1099            }
1100
1101            // Check for error message type
1102            if response.len() > PACKET_HEADER_SIZE + 2 {
1103                let msg_type = response[PACKET_HEADER_SIZE + 2];
1104                if msg_type == MessageType::Error as u8 {
1105                    return Err(Error::AuthenticationFailed(
1106                        "Server rejected authentication phase one".to_string(),
1107                    ));
1108                }
1109            }
1110
1111            auth.parse_response(&response[PACKET_HEADER_SIZE..])?;
1112        }
1113
1114        // Phase two: send encrypted password
1115        if auth.phase() == AuthPhase::Two {
1116            let mut inner = self.inner.lock().await;
1117            let large_sdu = inner.large_sdu;
1118            let request = auth.build_request(&inner.capabilities, large_sdu)?;
1119            inner.send(&request).await?;
1120
1121            let response = inner.receive().await?;
1122            if response.len() <= PACKET_HEADER_SIZE {
1123                return Err(Error::Protocol("Empty auth phase two response".to_string()));
1124            }
1125
1126            // Check for error message type or marker
1127            let packet_type = response[4];
1128            if packet_type == 12 {
1129                // Marker - authentication failed
1130                return Err(Error::AuthenticationFailed("Server sent MARKER - authentication rejected".to_string()));
1131            }
1132
1133            if response.len() > PACKET_HEADER_SIZE + 2 {
1134                let msg_type = response[PACKET_HEADER_SIZE + 2];
1135                if msg_type == MessageType::Error as u8 {
1136                    return Err(Error::InvalidCredentials);
1137                }
1138            }
1139
1140            auth.parse_response(&response[PACKET_HEADER_SIZE..])?;
1141        }
1142
1143        // Verify authentication completed
1144        if !auth.is_complete() {
1145            return Err(Error::AuthenticationFailed(
1146                "Authentication did not complete".to_string(),
1147            ));
1148        }
1149
1150        // Store combo key for later use (encrypted data)
1151        let mut inner = self.inner.lock().await;
1152        if let Some(combo_key) = auth.combo_key() {
1153            inner.capabilities.combo_key = Some(combo_key.to_vec());
1154        }
1155        // Auth used sequence numbers 1 and 2, set to 2 so next is 3
1156        inner.sequence_number = 2;
1157        inner.state = ConnectionState::Ready;
1158
1159        Ok(())
1160    }
1161
1162    /// Execute a SQL statement and return the result
1163    ///
1164    /// # Arguments
1165    ///
1166    /// * `sql` - SQL statement to execute
1167    /// * `params` - Bind parameters (use `Value::Integer`, `Value::String`, etc.)
1168    ///
1169    /// # Example
1170    ///
1171    /// ```rust,ignore
1172    /// use oracle_rs::Value;
1173    ///
1174    /// // Query with bind parameters
1175    /// let result = conn.execute(
1176    ///     "SELECT * FROM employees WHERE department_id = :1",
1177    ///     &[Value::Integer(10)]
1178    /// ).await?;
1179    ///
1180    /// // DML with bind parameters
1181    /// let result = conn.execute(
1182    ///     "UPDATE employees SET salary = :1 WHERE employee_id = :2",
1183    ///     &[Value::Integer(50000), Value::Integer(100)]
1184    /// ).await?;
1185    /// println!("Rows affected: {}", result.rows_affected);
1186    /// ```
1187    pub async fn execute(&self, sql: &str, params: &[Value]) -> Result<QueryResult> {
1188        self.ensure_ready().await?;
1189
1190        // Check statement cache for existing prepared statement
1191        let (statement, from_cache) = {
1192            let mut inner = self.inner.lock().await;
1193            if let Some(ref mut cache) = inner.statement_cache {
1194                if let Some(cached_stmt) = cache.get(sql) {
1195                    tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached statement (execute)");
1196                    (cached_stmt, true)
1197                } else {
1198                    (Statement::new(sql), false)
1199                }
1200            } else {
1201                (Statement::new(sql), false)
1202            }
1203        };
1204
1205        let result = match statement.statement_type() {
1206            StatementType::Query => self.execute_query_with_params(&statement, params).await,
1207            _ => self.execute_dml_with_params(&statement, params).await,
1208        };
1209
1210        // Return statement to cache or cache it for the first time
1211        match &result {
1212            Ok(query_result) => {
1213                let mut inner = self.inner.lock().await;
1214                if let Some(ref mut cache) = inner.statement_cache {
1215                    let should_close_cursor = if statement.statement_type() == StatementType::Query {
1216                        !query_result.has_more_rows
1217                    } else {
1218                        true // DML/DDL/PL-SQL: always close
1219                    };
1220
1221                    if from_cache {
1222                        cache.return_statement(sql);
1223                        if should_close_cursor {
1224                            cache.mark_cursor_closed(sql);
1225                        }
1226                    } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1227                        let mut stmt_to_cache = statement.clone();
1228                        stmt_to_cache.set_cursor_id(query_result.cursor_id);
1229                        stmt_to_cache.set_executed(true);
1230                        cache.put(sql.to_string(), stmt_to_cache);
1231                        if should_close_cursor {
1232                            cache.mark_cursor_closed(sql);
1233                        }
1234                    }
1235                }
1236            }
1237            Err(_) => {
1238                if from_cache {
1239                    let mut inner = self.inner.lock().await;
1240                    if let Some(ref mut cache) = inner.statement_cache {
1241                        cache.return_statement(sql);
1242                        cache.mark_cursor_closed(sql);
1243                    }
1244                }
1245            }
1246        }
1247
1248        result
1249    }
1250
1251    /// Execute a query and return rows
1252    ///
1253    /// # Example
1254    ///
1255    /// ```rust,ignore
1256    /// use oracle_rs::Value;
1257    ///
1258    /// let result = conn.query(
1259    ///     "SELECT * FROM employees WHERE salary > :1",
1260    ///     &[Value::Integer(50000)]
1261    /// ).await?;
1262    /// ```
1263    pub async fn query(&self, sql: &str, params: &[Value]) -> Result<QueryResult> {
1264        self.ensure_ready().await?;
1265
1266        // Check statement cache for existing prepared statement
1267        let (statement, from_cache) = {
1268            let mut inner = self.inner.lock().await;
1269            if let Some(ref mut cache) = inner.statement_cache {
1270                if let Some(cached_stmt) = cache.get(sql) {
1271                    tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached statement");
1272                    (cached_stmt, true)
1273                } else {
1274                    (Statement::new(sql), false)
1275                }
1276            } else {
1277                (Statement::new(sql), false)
1278            }
1279        };
1280
1281        // If using cached statement, save the columns (Oracle won't resend on reexecute)
1282        let cached_columns = if from_cache {
1283            Some(statement.columns().to_vec())
1284        } else {
1285            None
1286        };
1287
1288        let mut result = self.execute_query_with_params(&statement, params).await;
1289
1290        // For cached statements, restore columns if Oracle didn't send them
1291        if let (Ok(ref mut query_result), Some(columns)) = (&mut result, cached_columns) {
1292            if query_result.columns.is_empty() && !columns.is_empty() {
1293                query_result.columns = columns;
1294            }
1295        }
1296
1297        // Return statement to cache or cache it for the first time
1298        match &result {
1299            Ok(query_result) => {
1300                let mut inner = self.inner.lock().await;
1301                if let Some(ref mut cache) = inner.statement_cache {
1302                    if from_cache {
1303                        cache.return_statement(sql);
1304                        if !query_result.has_more_rows {
1305                            cache.mark_cursor_closed(sql);
1306                        }
1307                    } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1308                        let mut stmt_to_cache = statement.clone();
1309                        stmt_to_cache.set_cursor_id(query_result.cursor_id);
1310                        stmt_to_cache.set_executed(true);
1311                        stmt_to_cache.set_columns(query_result.columns.clone());
1312                        cache.put(sql.to_string(), stmt_to_cache);
1313                        if !query_result.has_more_rows {
1314                            cache.mark_cursor_closed(sql);
1315                        }
1316                    }
1317                }
1318            }
1319            Err(_) => {
1320                if from_cache {
1321                    let mut inner = self.inner.lock().await;
1322                    if let Some(ref mut cache) = inner.statement_cache {
1323                        cache.return_statement(sql);
1324                        cache.mark_cursor_closed(sql);
1325                    }
1326                }
1327            }
1328        }
1329
1330        self.handle_result(result)
1331    }
1332
1333    /// Execute DML (INSERT, UPDATE, DELETE) and return rows affected
1334    pub async fn execute_dml_sql(&self, sql: &str, params: &[Value]) -> Result<u64> {
1335        self.ensure_ready().await?;
1336
1337        // Check statement cache for existing prepared statement
1338        let (statement, from_cache) = {
1339            let mut inner = self.inner.lock().await;
1340            if let Some(ref mut cache) = inner.statement_cache {
1341                if let Some(cached_stmt) = cache.get(sql) {
1342                    tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached DML statement");
1343                    (cached_stmt, true)
1344                } else {
1345                    (Statement::new(sql), false)
1346                }
1347            } else {
1348                (Statement::new(sql), false)
1349            }
1350        };
1351
1352        let result = self.execute_dml_with_params(&statement, params).await;
1353
1354        // Return statement to cache or cache it for the first time
1355        // DML cursors are always closed after execution (no fetch phase)
1356        match &result {
1357            Ok(query_result) => {
1358                let mut inner = self.inner.lock().await;
1359                if let Some(ref mut cache) = inner.statement_cache {
1360                    if from_cache {
1361                        cache.return_statement(sql);
1362                        cache.mark_cursor_closed(sql);
1363                    } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1364                        let mut stmt_to_cache = statement.clone();
1365                        stmt_to_cache.set_cursor_id(query_result.cursor_id);
1366                        stmt_to_cache.set_executed(true);
1367                        cache.put(sql.to_string(), stmt_to_cache);
1368                        cache.mark_cursor_closed(sql);
1369                    }
1370                }
1371            }
1372            Err(_) => {
1373                if from_cache {
1374                    let mut inner = self.inner.lock().await;
1375                    if let Some(ref mut cache) = inner.statement_cache {
1376                        cache.return_statement(sql);
1377                        cache.mark_cursor_closed(sql);
1378                    }
1379                }
1380            }
1381        }
1382
1383        self.handle_result(result).map(|r| r.rows_affected)
1384    }
1385
1386    /// Execute a PL/SQL block with IN/OUT/INOUT parameters
1387    ///
1388    /// This method allows execution of PL/SQL anonymous blocks or procedure calls
1389    /// that have OUT or IN OUT parameters. The `params` slice specifies the direction
1390    /// and type of each bind parameter.
1391    ///
1392    /// # Arguments
1393    ///
1394    /// * `sql` - The PL/SQL block or procedure call
1395    /// * `params` - The bind parameters with direction information
1396    ///
1397    /// # Example
1398    ///
1399    /// ```rust,ignore
1400    /// use oracle_rs::{Connection, BindParam, OracleType, Value};
1401    ///
1402    /// // Call a procedure with IN and OUT parameters
1403    /// let result = conn.execute_plsql(
1404    ///     "BEGIN get_employee_name(:1, :2); END;",
1405    ///     &[
1406    ///         BindParam::input(Value::Integer(100)),           // IN: employee_id
1407    ///         BindParam::output(OracleType::Varchar, 100),     // OUT: employee_name
1408    ///     ]
1409    /// ).await?;
1410    ///
1411    /// // Get the OUT parameter value
1412    /// let name = result.get_string(0).unwrap_or("Unknown");
1413    /// println!("Employee name: {}", name);
1414    /// ```
1415    ///
1416    /// # REF CURSOR Example
1417    ///
1418    /// ```rust,ignore
1419    /// use oracle_rs::{Connection, BindParam, Value};
1420    ///
1421    /// // Call a procedure that returns a REF CURSOR
1422    /// let result = conn.execute_plsql(
1423    ///     "BEGIN OPEN :1 FOR SELECT * FROM employees; END;",
1424    ///     &[BindParam::output_cursor()]
1425    /// ).await?;
1426    ///
1427    /// // Get the cursor ID and fetch rows
1428    /// if let Some(cursor_id) = result.get_cursor_id(0) {
1429    ///     let rows = conn.fetch_cursor(cursor_id, 100).await?;
1430    ///     for row in rows {
1431    ///         println!("{:?}", row);
1432    ///     }
1433    /// }
1434    /// ```
1435    pub async fn execute_plsql(&self, sql: &str, params: &[BindParam]) -> Result<PlsqlResult> {
1436        self.ensure_ready().await?;
1437
1438        let statement = Statement::new(sql);
1439
1440        // Build values for bind parameters
1441        // IN params: use provided value or Null
1442        // OUT params: use placeholder value (required for metadata, server ignores actual value)
1443        // INOUT params: use provided value or Null
1444        let bind_values: Vec<Value> = params
1445            .iter()
1446            .map(|p| {
1447                if p.direction == BindDirection::Output {
1448                    // OUT params get a placeholder based on their type
1449                    // Oracle still needs a value sent in the request (even though it's ignored)
1450                    p.placeholder_value()
1451                } else {
1452                    // IN and INOUT params use the provided value or Null
1453                    p.value.clone().unwrap_or(Value::Null)
1454                }
1455            })
1456            .collect();
1457
1458        // Build bind metadata for proper buffer sizes
1459        // For OUTPUT params, use the user-specified buffer_size
1460        // For INPUT params, derive buffer_size from the actual value
1461        let bind_metadata: Vec<crate::messages::BindMetadata> = params
1462            .iter()
1463            .zip(bind_values.iter())
1464            .map(|(p, v)| {
1465                let buffer_size = if p.buffer_size > 0 {
1466                    p.buffer_size
1467                } else {
1468                    // Derive from value
1469                    match v {
1470                        Value::String(s) => std::cmp::max(s.len() as u32, 1),
1471                        Value::Bytes(b) => std::cmp::max(b.len() as u32, 1),
1472                        Value::Integer(_) | Value::Number(_) => 22, // Oracle NUMBER max size
1473                        Value::Float(_) => 8, // BINARY_DOUBLE
1474                        Value::Boolean(_) => 1,
1475                        Value::Timestamp(_) => 13,
1476                        Value::Date(_) => 7,
1477                        Value::RowId(_) => 18,
1478                        _ => 100, // Default fallback
1479                    }
1480                };
1481                crate::messages::BindMetadata {
1482                    oracle_type: p.oracle_type,
1483                    buffer_size,
1484                }
1485            })
1486            .collect();
1487
1488        // Create execute message with PL/SQL options
1489        let options = ExecuteOptions::for_plsql();
1490        let mut execute_msg = ExecuteMessage::new(&statement, options);
1491        execute_msg.set_bind_values(bind_values);
1492        execute_msg.set_bind_metadata(bind_metadata);
1493
1494        let mut inner = self.inner.lock().await;
1495        let large_sdu = inner.large_sdu;
1496        let seq_num = inner.next_sequence_number();
1497        execute_msg.set_sequence_number(seq_num);
1498        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1499        inner.send(&request).await?;
1500
1501        // Receive response
1502        let response = inner.receive().await?;
1503        if response.len() <= PACKET_HEADER_SIZE {
1504            return Err(Error::Protocol("Empty PL/SQL response".to_string()));
1505        }
1506
1507        // Check for MARKER packet (indicates error - requires reset protocol)
1508        let packet_type = response[4];
1509        if packet_type == PacketType::Marker as u8 {
1510            // Handle marker reset protocol and get the error packet
1511            let error_response = inner.handle_marker_reset().await?;
1512            let payload = &error_response[PACKET_HEADER_SIZE..];
1513            // Parse error response to extract the actual Oracle error
1514            let _: QueryResult = self.parse_error_response(payload)?;
1515            return Err(Error::Protocol("PL/SQL execution failed".to_string()));
1516        }
1517
1518        // Parse the PL/SQL response
1519        let payload = &response[PACKET_HEADER_SIZE..];
1520        let caps = inner.capabilities.clone();
1521        drop(inner); // Release lock before parsing
1522
1523        self.parse_plsql_response(payload, &caps, params)
1524    }
1525
1526    /// Execute a batch of DML statements with multiple rows of bind values
1527    ///
1528    /// This method efficiently executes the same SQL statement multiple times
1529    /// with different bind values (executemany pattern).
1530    ///
1531    /// # Arguments
1532    ///
1533    /// * `batch` - The batch containing SQL and rows of bind values
1534    ///
1535    /// # Example
1536    ///
1537    /// ```rust,ignore
1538    /// use oracle_rs::{Connection, BatchBuilder, Value};
1539    ///
1540    /// let batch = BatchBuilder::new("INSERT INTO users (id, name) VALUES (:1, :2)")
1541    ///     .add_row(vec![Value::Integer(1), Value::String("Alice".to_string())])
1542    ///     .add_row(vec![Value::Integer(2), Value::String("Bob".to_string())])
1543    ///     .with_row_counts()
1544    ///     .build();
1545    ///
1546    /// let result = conn.execute_batch(&batch).await?;
1547    /// println!("Total rows affected: {}", result.total_rows_affected);
1548    /// ```
1549    pub async fn execute_batch(&self, batch: &BatchBinds) -> Result<BatchResult> {
1550        self.ensure_ready().await?;
1551
1552        // Validate the batch
1553        batch.validate()?;
1554
1555        if batch.rows.is_empty() {
1556            return Ok(BatchResult::new());
1557        }
1558
1559        // Build execute options for batch DML
1560        let mut options = ExecuteOptions::for_dml(batch.options.auto_commit);
1561        options.num_execs = batch.rows.len() as u32;
1562        options.batch_errors = batch.options.batch_errors;
1563        options.dml_row_counts = batch.options.array_dml_row_counts;
1564
1565        // Create execute message with batch bind values
1566        let mut execute_msg = ExecuteMessage::new(&batch.statement, options);
1567        execute_msg.set_batch_bind_values(batch.rows.clone());
1568
1569        let mut inner = self.inner.lock().await;
1570        let large_sdu = inner.large_sdu;
1571        let seq_num = inner.next_sequence_number();
1572        execute_msg.set_sequence_number(seq_num);
1573        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1574        inner.send(&request).await?;
1575
1576        // Receive response
1577        let mut response = inner.receive().await?;
1578        if response.len() <= PACKET_HEADER_SIZE {
1579            return Err(Error::Protocol("Empty batch response".to_string()));
1580        }
1581
1582        // Check packet type - handle MARKER packets
1583        let packet_type = response[4];
1584        if packet_type == PacketType::Marker as u8 {
1585            // Handle BREAK/RESET protocol same as regular DML
1586            response = self.handle_marker_protocol(&mut inner, response).await?;
1587        }
1588
1589        // Parse the batch response
1590        let payload = &response[PACKET_HEADER_SIZE..];
1591        drop(inner); // Release lock before parsing
1592
1593        self.parse_batch_response(payload, batch.rows.len(), batch.options.array_dml_row_counts)
1594    }
1595
1596    /// Handle MARKER packet protocol (BREAK/RESET)
1597    async fn handle_marker_protocol(
1598        &self,
1599        inner: &mut ConnectionInner,
1600        initial_response: Bytes,
1601    ) -> Result<Bytes> {
1602        // Extract marker type from packet
1603        let marker_type = if initial_response.len() >= PACKET_HEADER_SIZE + 3 {
1604            initial_response[PACKET_HEADER_SIZE + 2]
1605        } else {
1606            1 // Assume BREAK
1607        };
1608
1609        if marker_type == 1 {
1610            // BREAK marker - send RESET and wait for response
1611            self.send_marker(inner, 2).await?;
1612
1613            // Wait for RESET marker back
1614            loop {
1615                match inner.receive().await {
1616                    Ok(pkt) => {
1617                        if pkt.len() < PACKET_HEADER_SIZE + 1 {
1618                            break;
1619                        }
1620                        let pkt_type = pkt[4];
1621                        if pkt_type == PacketType::Marker as u8 {
1622                            if pkt.len() >= PACKET_HEADER_SIZE + 3 {
1623                                let mk_type = pkt[PACKET_HEADER_SIZE + 2];
1624                                if mk_type == 2 {
1625                                    // Got RESET marker, break out
1626                                    break;
1627                                }
1628                            }
1629                        } else if pkt_type == PacketType::Data as u8 {
1630                            // Got DATA packet - return it as the response
1631                            return Ok(pkt);
1632                        } else {
1633                            break;
1634                        }
1635                    }
1636                    Err(e) => {
1637                        inner.state = ConnectionState::Closed;
1638                        return Err(e);
1639                    }
1640                }
1641            }
1642
1643            // After RESET, continue receiving until we get DATA packet
1644            loop {
1645                match inner.receive().await {
1646                    Ok(pkt) => {
1647                        let pkt_type = pkt[4];
1648                        if pkt_type == PacketType::Marker as u8 {
1649                            continue;
1650                        } else if pkt_type == PacketType::Data as u8 {
1651                            return Ok(pkt);
1652                        } else {
1653                            return Err(Error::Protocol(format!(
1654                                "Unexpected packet type {} after reset",
1655                                pkt_type
1656                            )));
1657                        }
1658                    }
1659                    Err(e) => {
1660                        inner.state = ConnectionState::Closed;
1661                        return Err(e);
1662                    }
1663                }
1664            }
1665        }
1666
1667        Ok(initial_response)
1668    }
1669
1670    /// Parse batch execution response
1671    fn parse_batch_response(
1672        &self,
1673        payload: &[u8],
1674        batch_size: usize,
1675        want_row_counts: bool,
1676    ) -> Result<BatchResult> {
1677        if payload.len() < 3 {
1678            return Err(Error::Protocol("Batch response too short".to_string()));
1679        }
1680
1681        let mut buf = ReadBuffer::from_slice(payload);
1682
1683        // Skip data flags
1684        buf.skip(2)?;
1685
1686        let mut rows_affected: u64 = 0;
1687        let mut row_counts: Option<Vec<u64>> = None;
1688        let mut end_of_response = false;
1689
1690        // Process messages until end_of_response or out of data
1691        while !end_of_response && buf.remaining() > 0 {
1692            let msg_type = buf.read_u8()?;
1693
1694            match msg_type {
1695                // Error (4) - may contain error or success info
1696                x if x == MessageType::Error as u8 => {
1697                    let (error_code, error_msg, _cid, row_count) = self.parse_error_info_with_rowcount(&mut buf)?;
1698                    rows_affected = row_count;
1699                    if error_code != 0 && error_code != 1403 {
1700                        return Err(Error::OracleError {
1701                            code: error_code,
1702                            message: error_msg.unwrap_or_default(),
1703                        });
1704                    }
1705                }
1706
1707                // Parameter (8) - return parameters (may contain row counts)
1708                x if x == MessageType::Parameter as u8 => {
1709                    if let Some(counts) = self.parse_return_parameters_internal(&mut buf, want_row_counts)? {
1710                        row_counts = Some(counts);
1711                    }
1712                }
1713
1714                // Status (9) - call status
1715                x if x == MessageType::Status as u8 => {
1716                    let _call_status = buf.read_ub4()?;
1717                    let _end_to_end_seq = buf.read_ub2()?;
1718                }
1719
1720                // BitVector (21)
1721                21 => {
1722                    let _num_columns_sent = buf.read_ub2()?;
1723                    if buf.remaining() > 0 {
1724                        let _byte = buf.read_u8()?;
1725                    }
1726                }
1727
1728                // End of Response (29) - explicit end marker
1729                29 => {
1730                    end_of_response = true;
1731                }
1732
1733                _ => {
1734                    // Unknown message type - continue processing
1735                }
1736            }
1737        }
1738
1739        let mut result = BatchResult::new();
1740        result.total_rows_affected = rows_affected;
1741        result.success_count = batch_size;
1742        result.row_counts = row_counts;
1743
1744        Ok(result)
1745    }
1746
1747    /// Fetch more rows from an open cursor
1748    ///
1749    /// This method is used when a query result has `has_more_rows == true`
1750    /// to retrieve additional rows from the server.
1751    ///
1752    /// # Arguments
1753    ///
1754    /// * `cursor_id` - The cursor ID from a previous query result
1755    /// * `columns` - Column information from the original query
1756    /// * `fetch_size` - Number of rows to fetch
1757    ///
1758    /// # Example
1759    ///
1760    /// ```rust,ignore
1761    /// let mut result = conn.query("SELECT * FROM large_table", &[]).await?;
1762    /// let mut all_rows = result.rows.clone();
1763    ///
1764    /// while result.has_more_rows {
1765    ///     result = conn.fetch_more(result.cursor_id, &result.columns, 100).await?;
1766    ///     all_rows.extend(result.rows);
1767    /// }
1768    /// ```
1769    pub async fn fetch_more(
1770        &self,
1771        cursor_id: u16,
1772        columns: &[ColumnInfo],
1773        fetch_size: u32,
1774    ) -> Result<QueryResult> {
1775        self.ensure_ready().await?;
1776
1777        // Build fetch message
1778        let fetch_msg = FetchMessage::new(cursor_id, fetch_size);
1779
1780        let mut inner = self.inner.lock().await;
1781        let request = fetch_msg.build_request(&inner.capabilities)?;
1782        inner.send(&request).await?;
1783
1784        // Receive and parse response
1785        let response = inner.receive().await?;
1786        if response.len() <= PACKET_HEADER_SIZE {
1787            return Err(Error::Protocol("Empty fetch response".to_string()));
1788        }
1789
1790        // Parse row data from response
1791        let payload = &response[PACKET_HEADER_SIZE..];
1792        let caps = inner.capabilities.clone();
1793        drop(inner); // Release lock before parsing
1794        self.parse_fetch_response(payload, columns, &caps)
1795    }
1796
1797    /// Fetch rows from a REF CURSOR
1798    ///
1799    /// This method fetches rows from a REF CURSOR that was returned from a
1800    /// PL/SQL procedure or function. The cursor contains the column metadata
1801    /// and cursor ID needed to fetch the rows.
1802    ///
1803    /// # Arguments
1804    ///
1805    /// * `cursor` - The REF CURSOR returned from PL/SQL
1806    ///
1807    /// # Example
1808    ///
1809    /// ```rust,ignore
1810    /// use oracle_rs::{Connection, BindParam, Value};
1811    ///
1812    /// // Call a procedure that returns a REF CURSOR
1813    /// let result = conn.execute_plsql(
1814    ///     "BEGIN OPEN :1 FOR SELECT id, name FROM employees; END;",
1815    ///     &[BindParam::output_cursor()]
1816    /// ).await?;
1817    ///
1818    /// // Get the cursor and fetch rows
1819    /// if let Value::Cursor(cursor) = &result.out_values[0] {
1820    ///     let rows = conn.fetch_cursor(cursor).await?;
1821    ///     println!("Fetched {} rows", rows.row_count());
1822    ///     for row in &rows.rows {
1823    ///         println!("{:?}", row);
1824    ///     }
1825    /// }
1826    /// ```
1827    pub async fn fetch_cursor(&self, cursor: &crate::types::RefCursor) -> Result<QueryResult> {
1828        self.fetch_cursor_with_size(cursor, 100).await
1829    }
1830
1831    /// Fetch rows from a REF CURSOR with a specified fetch size
1832    ///
1833    /// This is the same as `fetch_cursor` but allows specifying how many
1834    /// rows to fetch at once.
1835    ///
1836    /// REF CURSORs use an ExecuteMessage with only the FETCH option because
1837    /// the cursor is already open from the PL/SQL execution. The cursor_id
1838    /// and column metadata were obtained when the REF CURSOR was returned.
1839    ///
1840    /// # Arguments
1841    ///
1842    /// * `cursor` - The REF CURSOR returned from PL/SQL
1843    /// * `fetch_size` - Number of rows to fetch (default is 100)
1844    pub async fn fetch_cursor_with_size(
1845        &self,
1846        cursor: &crate::types::RefCursor,
1847        fetch_size: u32,
1848    ) -> Result<QueryResult> {
1849        use crate::messages::ExecuteMessage;
1850
1851        if cursor.cursor_id() == 0 {
1852            return Err(Error::InvalidCursor("Cursor ID is 0 (not initialized)".to_string()));
1853        }
1854
1855        self.ensure_ready().await?;
1856
1857        // REF CURSOR uses ExecuteMessage with FETCH only (no SQL, no EXECUTE)
1858        // Create a statement with the cursor's metadata
1859        let mut stmt = Statement::new(""); // No SQL for REF CURSOR
1860        stmt.set_cursor_id(cursor.cursor_id());
1861        stmt.set_columns(cursor.columns().to_vec());
1862        stmt.set_executed(true); // Already executed by Oracle
1863        stmt.set_statement_type(crate::statement::StatementType::Query); // This is a query cursor
1864
1865        // Build execute message with only FETCH option
1866        let options = crate::messages::ExecuteOptions::for_ref_cursor(fetch_size);
1867        let mut execute_msg = ExecuteMessage::new(&stmt, options);
1868
1869        let mut inner = self.inner.lock().await;
1870        let large_sdu = inner.large_sdu;
1871        let seq_num = inner.next_sequence_number();
1872        execute_msg.set_sequence_number(seq_num);
1873
1874        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1875        inner.send(&request).await?;
1876
1877        // Receive and parse response
1878        let response = inner.receive().await?;
1879        if response.len() <= PACKET_HEADER_SIZE {
1880            return Err(Error::Protocol("Empty cursor response".to_string()));
1881        }
1882
1883        // Check for MARKER packet (indicates error)
1884        let packet_type = response[4];
1885        if packet_type == PacketType::Marker as u8 {
1886            let error_response = inner.handle_marker_reset().await?;
1887            let payload = &error_response[PACKET_HEADER_SIZE..];
1888            return self.parse_error_response(payload);
1889        }
1890
1891        // Parse query response - use cursor's columns since they're already defined
1892        let payload = &response[PACKET_HEADER_SIZE..];
1893        let caps = inner.capabilities.clone();
1894        drop(inner); // Release lock before parsing
1895        self.parse_fetch_response(payload, cursor.columns(), &caps)
1896    }
1897
1898    /// Fetch rows from an implicit result set
1899    ///
1900    /// Implicit results are returned via `DBMS_SQL.RETURN_RESULT` from PL/SQL.
1901    /// They contain cursor metadata but no rows until fetched.
1902    ///
1903    /// # Arguments
1904    ///
1905    /// * `result` - The implicit result from PL/SQL execution
1906    ///
1907    /// # Example
1908    ///
1909    /// ```rust,ignore
1910    /// let plsql_result = conn.execute_plsql(r#"
1911    ///     declare
1912    ///         c sys_refcursor;
1913    ///     begin
1914    ///         open c for select * from employees;
1915    ///         dbms_sql.return_result(c);
1916    ///     end;
1917    /// "#, &[]).await?;
1918    ///
1919    /// for implicit in plsql_result.implicit_results.iter() {
1920    ///     let rows = conn.fetch_implicit_result(implicit).await?;
1921    ///     println!("Fetched {} rows", rows.row_count());
1922    /// }
1923    /// ```
1924    pub async fn fetch_implicit_result(&self, result: &ImplicitResult) -> Result<QueryResult> {
1925        self.fetch_implicit_result_with_size(result, 100).await
1926    }
1927
1928    /// Fetch rows from an implicit result set with a specified fetch size
1929    pub async fn fetch_implicit_result_with_size(
1930        &self,
1931        result: &ImplicitResult,
1932        fetch_size: u32,
1933    ) -> Result<QueryResult> {
1934        // Convert implicit result to RefCursor and use fetch_cursor mechanism
1935        let cursor = crate::types::RefCursor::new(result.cursor_id, result.columns.clone());
1936        self.fetch_cursor_with_size(&cursor, fetch_size).await
1937    }
1938
1939    /// Parse fetch response to extract additional rows
1940    ///
1941    /// REF CURSOR fetch responses contain a series of messages:
1942    /// - RowHeader (6): Contains metadata about the following row data
1943    /// - RowData (7): Contains the actual row values
1944    /// - Error (4): Contains error info with cursor_id and row counts
1945    fn parse_fetch_response(&self, payload: &[u8], columns: &[ColumnInfo], caps: &Capabilities) -> Result<QueryResult> {
1946        if payload.len() < 3 {
1947            return Err(Error::Protocol("Fetch response too short".to_string()));
1948        }
1949
1950        let mut buf = ReadBuffer::from_slice(payload);
1951        let mut rows = Vec::new();
1952        let mut has_more_rows = false;
1953
1954        // Bit vector for duplicate column optimization
1955        let mut bit_vector: Option<Vec<u8>> = None;
1956        let mut previous_row_values: Option<Vec<Value>> = None;
1957
1958        // Skip data flags
1959        buf.skip(2)?;
1960
1961        // Process multiple messages in the response
1962        while buf.remaining() >= 1 {
1963            let msg_type = buf.read_u8()?;
1964
1965            match msg_type {
1966                x if x == MessageType::RowHeader as u8 => {
1967                    // Skip row header metadata (per Python's _process_row_header)
1968                    buf.skip(1)?; // flags
1969                    buf.skip_ub2()?; // num requests
1970                    buf.skip_ub4()?; // iteration number
1971                    buf.skip_ub4()?; // num iters
1972                    buf.skip_ub2()?; // buffer length
1973                    let num_bytes = buf.read_ub4()?;
1974                    if num_bytes > 0 {
1975                        buf.skip(1)?; // skip repeated length
1976                        // This bit vector in row header is for the following row data
1977                        let bv = buf.read_bytes_vec(num_bytes as usize)?;
1978                        bit_vector = Some(bv);
1979                    }
1980                    let rxhrid_bytes = buf.read_ub4()?;
1981                    if rxhrid_bytes > 0 {
1982                        buf.skip_raw_bytes_chunked()?;
1983                    }
1984                }
1985                x if x == MessageType::RowData as u8 => {
1986                    // Parse actual row data with bit vector support
1987                    let row = self.parse_row_data_with_bitvector(
1988                        &mut buf,
1989                        columns,
1990                        caps,
1991                        bit_vector.as_deref(),
1992                        previous_row_values.as_ref(),
1993                    )?;
1994                    previous_row_values = Some(row.values().to_vec());
1995                    bit_vector = None;
1996                    rows.push(row);
1997                }
1998                x if x == MessageType::BitVector as u8 => {
1999                    // BitVector indicates which columns have actual data vs duplicates
2000                    let _num_columns_sent = buf.read_ub2()?;
2001                    let num_bytes = (columns.len() + 7) / 8; // Round up
2002                    if num_bytes > 0 {
2003                        let bv = buf.read_bytes_vec(num_bytes)?;
2004                        bit_vector = Some(bv);
2005                    }
2006                    // Continue processing - RowData follows
2007                }
2008                x if x == MessageType::Error as u8 => {
2009                    // Error message contains row count and cursor info
2010                    let (error_code, error_msg, more_rows) = self.parse_error_message_info(&mut buf)?;
2011                    has_more_rows = more_rows;
2012                    if error_code != 0 && error_code != 1403 { // 1403 = no data found
2013                        return Err(Error::OracleError {
2014                            code: error_code,
2015                            message: error_msg,
2016                        });
2017                    }
2018                    break; // Error message marks end of response
2019                }
2020                x if x == MessageType::Status as u8 => {
2021                    // Status message - usually marks end
2022                    break;
2023                }
2024                x if x == MessageType::EndOfResponse as u8 => {
2025                    break;
2026                }
2027                _ => {
2028                    // Unknown message type - stop processing
2029                    break;
2030                }
2031            }
2032        }
2033
2034        Ok(QueryResult {
2035            columns: columns.to_vec(),
2036            rows,
2037            rows_affected: 0,
2038            has_more_rows,
2039            cursor_id: 0,
2040        })
2041    }
2042
2043    /// Parse error message info including cursor_id and row counts
2044    fn parse_error_message_info(&self, buf: &mut ReadBuffer) -> Result<(u32, String, bool)> {
2045        let _call_status = buf.read_ub4()?; // end of call status
2046        buf.skip_ub2()?; // end to end seq#
2047        buf.skip_ub4()?; // current row number
2048        buf.skip_ub2()?; // error number
2049        buf.skip_ub2()?; // array elem error
2050        buf.skip_ub2()?; // array elem error
2051        let _cursor_id = buf.read_ub2()?; // cursor id
2052        let _error_pos = buf.read_sb2()?; // error position
2053        buf.skip(1)?; // sql type
2054        buf.skip(1)?; // fatal?
2055        buf.skip(1)?; // flags
2056        buf.skip(1)?; // user cursor options
2057        buf.skip(1)?; // UPI parameter
2058        let flags = buf.read_u8()?; // flags
2059        // Skip rowid - fixed 10 bytes in Oracle format
2060        buf.skip(10)?; // rowid is 10 bytes
2061        buf.skip_ub4()?; // OS error
2062        buf.skip(1)?; // statement number
2063        buf.skip(1)?; // call number
2064        buf.skip_ub2()?; // padding
2065        buf.skip_ub4()?; // success iters
2066        let num_bytes = buf.read_ub4()?; // oerrdd
2067        if num_bytes > 0 {
2068            buf.skip_raw_bytes_chunked()?;
2069        }
2070
2071        // Skip batch error codes
2072        let num_errors = buf.read_ub2()?;
2073        if num_errors > 0 {
2074            buf.skip_raw_bytes_chunked()?;
2075        }
2076
2077        // Skip batch error offsets
2078        let num_offsets = buf.read_ub4()?;
2079        if num_offsets > 0 {
2080            buf.skip_raw_bytes_chunked()?;
2081        }
2082
2083        // Skip batch error messages
2084        let temp16 = buf.read_ub2()?;
2085        if temp16 > 0 {
2086            buf.skip_raw_bytes_chunked()?;
2087        }
2088
2089        // Read extended error info
2090        let error_num = buf.read_ub4()?;
2091        let row_count = buf.read_ub8()?;
2092        let more_rows = row_count > 0 || (flags & 0x20) != 0;
2093
2094        // Read error message if present
2095        let error_msg = if error_num != 0 {
2096            buf.read_string_with_length()?.unwrap_or_default()
2097        } else {
2098            String::new()
2099        };
2100
2101        Ok((error_num, error_msg, more_rows))
2102    }
2103
2104    /// Open a scrollable cursor for bidirectional navigation
2105    ///
2106    /// Scrollable cursors allow moving forward and backward through result sets,
2107    /// jumping to specific positions, and fetching from various locations.
2108    ///
2109    /// # Arguments
2110    ///
2111    /// * `sql` - SQL query to execute
2112    ///
2113    /// # Example
2114    ///
2115    /// ```rust,ignore
2116    /// let mut cursor = conn.open_scrollable_cursor("SELECT * FROM employees").await?;
2117    ///
2118    /// // Move to different positions
2119    /// let first = conn.scroll(&mut cursor, FetchOrientation::First, 0).await?;
2120    /// let last = conn.scroll(&mut cursor, FetchOrientation::Last, 0).await?;
2121    /// let row5 = conn.scroll(&mut cursor, FetchOrientation::Absolute, 5).await?;
2122    ///
2123    /// conn.close_cursor(&mut cursor).await?;
2124    /// ```
2125    pub async fn open_scrollable_cursor(&self, sql: &str) -> Result<ScrollableCursor> {
2126        self.ensure_ready().await?;
2127
2128        let statement = Statement::new(sql);
2129
2130        // For scrollable cursors, execute with scrollable flag and prefetch 1 row
2131        // to get column metadata. The scroll() method will fetch actual rows at
2132        // specific positions.
2133        let mut options = ExecuteOptions::for_query(1); // prefetch 1 row for column info
2134
2135        options.scrollable = true;
2136
2137        let mut execute_msg = ExecuteMessage::new(&statement, options);
2138
2139        let mut inner = self.inner.lock().await;
2140        let large_sdu = inner.large_sdu;
2141        let seq_num = inner.next_sequence_number();
2142        execute_msg.set_sequence_number(seq_num);
2143        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2144        inner.send(&request).await?;
2145
2146        // Receive and parse response
2147        let response = inner.receive().await?;
2148
2149        if response.len() <= PACKET_HEADER_SIZE {
2150            return Err(Error::Protocol("Empty scrollable cursor response".to_string()));
2151        }
2152
2153        // Check for MARKER packet (indicates error - requires reset protocol)
2154        let packet_type = response[4];
2155        if packet_type == PacketType::Marker as u8 {
2156            // Handle marker reset protocol and get the error packet
2157            let error_response = inner.handle_marker_reset().await?;
2158            let payload = &error_response[PACKET_HEADER_SIZE..];
2159            // Parse error response to extract the actual Oracle error
2160            let _: QueryResult = self.parse_error_response(payload)?;
2161            // If we get here without error, something unexpected happened
2162            return Err(Error::Protocol("Unexpected successful response after MARKER".to_string()));
2163        }
2164
2165        // Parse describe info to get columns
2166        let payload = &response[PACKET_HEADER_SIZE..];
2167        let result = self.parse_query_response(payload, &inner.capabilities)?;
2168
2169        Ok(ScrollableCursor::new(result.cursor_id, result.columns))
2170    }
2171
2172    /// Scroll to a position in a scrollable cursor and fetch rows
2173    ///
2174    /// # Arguments
2175    ///
2176    /// * `cursor` - The scrollable cursor to scroll
2177    /// * `orientation` - The direction/mode of scrolling
2178    /// * `offset` - Position offset (used for Absolute and Relative modes)
2179    ///
2180    /// # Example
2181    ///
2182    /// ```rust,ignore
2183    /// // Go to first row
2184    /// let first = conn.scroll(&mut cursor, FetchOrientation::First, 0).await?;
2185    ///
2186    /// // Go to absolute position 10
2187    /// let row10 = conn.scroll(&mut cursor, FetchOrientation::Absolute, 10).await?;
2188    ///
2189    /// // Move 5 rows forward from current position
2190    /// let plus5 = conn.scroll(&mut cursor, FetchOrientation::Relative, 5).await?;
2191    ///
2192    /// // Move 3 rows backward
2193    /// let minus3 = conn.scroll(&mut cursor, FetchOrientation::Relative, -3).await?;
2194    /// ```
2195    pub async fn scroll(
2196        &self,
2197        cursor: &mut ScrollableCursor,
2198        orientation: FetchOrientation,
2199        offset: i64,
2200    ) -> Result<ScrollResult> {
2201        self.ensure_ready().await?;
2202
2203        if !cursor.is_open() {
2204            return Err(Error::CursorClosed);
2205        }
2206
2207        // Create a statement for the scroll operation (uses the existing cursor)
2208        let mut stmt = Statement::new("");
2209        stmt.set_cursor_id(cursor.cursor_id);
2210        stmt.set_columns(cursor.columns.clone());
2211        stmt.set_executed(true);
2212        stmt.set_statement_type(crate::statement::StatementType::Query);
2213
2214        // Build execute message with scroll_operation=true
2215        let mut options = ExecuteOptions::for_query(1);
2216        options.scrollable = true;
2217        options.scroll_operation = true;
2218        options.fetch_orientation = orientation as u32;
2219        // Calculate fetch_pos based on orientation
2220        options.fetch_pos = match orientation {
2221            FetchOrientation::First => 1,
2222            FetchOrientation::Last => 0, // Server calculates
2223            FetchOrientation::Absolute => offset.max(0) as u32,
2224            FetchOrientation::Relative => (cursor.position + offset).max(0) as u32,
2225            FetchOrientation::Next => (cursor.position + 1).max(0) as u32,
2226            FetchOrientation::Prior => (cursor.position - 1).max(0) as u32,
2227            FetchOrientation::Current => cursor.position.max(0) as u32,
2228        };
2229
2230        let mut execute_msg = ExecuteMessage::new(&stmt, options);
2231
2232        let mut inner = self.inner.lock().await;
2233        let large_sdu = inner.large_sdu;
2234        let seq_num = inner.next_sequence_number();
2235        execute_msg.set_sequence_number(seq_num);
2236        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2237        inner.send(&request).await?;
2238
2239        // Receive and parse response
2240        let response = inner.receive().await?;
2241        if response.len() <= PACKET_HEADER_SIZE {
2242            return Err(Error::Protocol("Empty scroll response".to_string()));
2243        }
2244
2245        // Check for MARKER packet
2246        let packet_type = response[4];
2247        if packet_type == PacketType::Marker as u8 {
2248            let error_response = inner.handle_marker_reset().await?;
2249            let payload = &error_response[PACKET_HEADER_SIZE..];
2250            let _: QueryResult = self.parse_error_response(payload)?;
2251            return Err(Error::Protocol("Scroll operation failed".to_string()));
2252        }
2253
2254        let payload = &response[PACKET_HEADER_SIZE..];
2255        // Use cursor's columns since Oracle doesn't re-send column metadata for scroll operations
2256        let query_result = self.parse_query_response_with_columns(payload, &inner.capabilities, &cursor.columns)?;
2257
2258        // Use position from Oracle's response (rows_affected contains the row position)
2259        // For scrollable cursors, Oracle returns the row number in error_info.rowcount
2260        let new_position = if !query_result.rows.is_empty() {
2261            // Position is the actual row number from Oracle
2262            query_result.rows_affected as i64
2263        } else {
2264            // No rows returned - calculate position based on orientation
2265            match orientation {
2266                FetchOrientation::First => 0, // Before first
2267                FetchOrientation::Last => cursor.row_count.unwrap_or(0) as i64 + 1, // After last
2268                FetchOrientation::Next => cursor.position + 1,
2269                FetchOrientation::Prior => cursor.position - 1,
2270                FetchOrientation::Absolute => offset,
2271                FetchOrientation::Relative => cursor.position + offset,
2272                FetchOrientation::Current => cursor.position,
2273            }
2274        };
2275
2276        cursor.update_position(new_position);
2277
2278        let mut result = ScrollResult::new(query_result.rows, new_position);
2279        result.at_end = !query_result.has_more_rows;
2280        result.at_beginning = new_position <= 1;
2281
2282        Ok(result)
2283    }
2284
2285    /// Close a scrollable cursor
2286    ///
2287    /// # Arguments
2288    ///
2289    /// * `cursor` - The scrollable cursor to close
2290    pub async fn close_cursor(&self, cursor: &mut ScrollableCursor) -> Result<()> {
2291        if !cursor.is_open() {
2292            return Ok(());
2293        }
2294
2295        // Send close cursor message
2296        // For now, just mark it as closed - the cursor will be cleaned up
2297        // when the connection is closed or reused
2298        cursor.mark_closed();
2299        Ok(())
2300    }
2301
2302    /// Get type information for a database object or collection type
2303    ///
2304    /// This method queries Oracle's data dictionary to retrieve type metadata
2305    /// for collections (VARRAY, Nested Table) and user-defined object types.
2306    ///
2307    /// # Arguments
2308    ///
2309    /// * `type_name` - Fully qualified type name (e.g., "SCHEMA.TYPE_NAME" or just "TYPE_NAME")
2310    ///
2311    /// # Returns
2312    ///
2313    /// A `DbObjectType` containing the type metadata, including:
2314    /// - Schema and type name
2315    /// - Whether it's a collection
2316    /// - Collection type (VARRAY, Nested Table, etc.)
2317    /// - Element type for collections
2318    ///
2319    /// # Example
2320    ///
2321    /// ```rust,ignore
2322    /// let number_array = conn.get_type("MY_NUMBER_ARRAY").await?;
2323    /// assert!(number_array.is_collection);
2324    /// ```
2325    pub async fn get_type(&self, type_name: &str) -> Result<crate::dbobject::DbObjectType> {
2326        use crate::dbobject::{CollectionType, DbObjectType};
2327
2328        self.ensure_ready().await?;
2329
2330        // Parse type name into schema and name
2331        let (schema, name) = parse_type_name(type_name, &self.config.username);
2332
2333        // First, query ALL_TYPES to get basic type info
2334        let type_info = self
2335            .query(
2336                "SELECT typecode, type_oid FROM all_types WHERE owner = :1 AND type_name = :2",
2337                &[Value::String(schema.clone()), Value::String(name.clone())],
2338            )
2339            .await?;
2340
2341        if type_info.rows.is_empty() {
2342            return Err(Error::OracleError {
2343                code: 4043, // ORA-04043: object does not exist
2344                message: format!("Type {}.{} not found", schema, name),
2345            });
2346        }
2347
2348        let row = &type_info.rows[0];
2349        let typecode = row.get(0).and_then(|v| v.as_str()).unwrap_or("");
2350        let type_oid = row.get(1).and_then(|v| v.as_bytes()).map(|b| b.to_vec());
2351
2352        // Check if it's a collection
2353        if typecode == "COLLECTION" {
2354            // Query ALL_COLL_TYPES for collection details
2355            let coll_info = self.query(
2356                "SELECT coll_type, elem_type_name, elem_type_owner, upper_bound FROM all_coll_types WHERE owner = :1 AND type_name = :2",
2357                &[Value::String(schema.clone()), Value::String(name.clone())],
2358            ).await?;
2359
2360            if coll_info.rows.is_empty() {
2361                return Err(Error::OracleError {
2362                    code: 4043,
2363                    message: format!("Collection type {}.{} metadata not found", schema, name),
2364                });
2365            }
2366
2367            let coll_row = &coll_info.rows[0];
2368            let coll_type_str = coll_row.get(0).and_then(|v| v.as_str()).unwrap_or("");
2369            let elem_type_name = coll_row.get(1).and_then(|v| v.as_str()).unwrap_or("VARCHAR2");
2370            let _elem_type_owner = coll_row.get(2).and_then(|v| v.as_str());
2371
2372            let collection_type = match coll_type_str {
2373                "VARYING ARRAY" => CollectionType::Varray,
2374                "TABLE" => CollectionType::NestedTable,
2375                _ => CollectionType::Varray, // Default
2376            };
2377
2378            let element_type = oracle_type_from_name(elem_type_name);
2379
2380            let mut obj_type = DbObjectType::collection(schema, name, collection_type, element_type);
2381            obj_type.oid = type_oid;
2382            Ok(obj_type)
2383        } else {
2384            // Regular object type (not yet fully implemented)
2385            let mut obj_type = DbObjectType::new(schema, name);
2386            obj_type.oid = type_oid;
2387            Ok(obj_type)
2388        }
2389    }
2390
2391    /// Internal: Execute a query statement with optional bind parameters
2392    async fn execute_query_with_params(&self, statement: &Statement, params: &[Value]) -> Result<QueryResult> {
2393        let prefetch_rows = 100; // Default prefetch
2394
2395        // For first execution, check if we might have LOBs (no prefetch for safety)
2396        // This can be optimized later with describe-only first
2397        let options = ExecuteOptions::for_query(prefetch_rows);
2398        let mut execute_msg = ExecuteMessage::new(statement, options);
2399
2400        // Set bind values if provided
2401        if !params.is_empty() {
2402            execute_msg.set_bind_values(params.to_vec());
2403        }
2404
2405        let mut inner = self.inner.lock().await;
2406        let large_sdu = inner.large_sdu;
2407        let seq_num = inner.next_sequence_number();
2408        execute_msg.set_sequence_number(seq_num);
2409        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2410        inner.send(&request).await?;
2411
2412        // Receive and parse response
2413        let response = inner.receive().await?;
2414        if response.len() <= PACKET_HEADER_SIZE {
2415            return Err(Error::Protocol("Empty query response".to_string()));
2416        }
2417
2418        // Check for MARKER packet (indicates error - requires reset protocol)
2419        let packet_type = response[4];
2420        if packet_type == PacketType::Marker as u8 {
2421            // Handle marker reset protocol and get the error packet
2422            let error_response = inner.handle_marker_reset().await?;
2423            let payload = &error_response[PACKET_HEADER_SIZE..];
2424            return self.parse_error_response(payload);
2425        }
2426
2427        // Parse the response to extract columns and rows
2428        let payload = &response[PACKET_HEADER_SIZE..];
2429        let mut result = self.parse_query_response(payload, &inner.capabilities)?;
2430
2431        // Check if any columns are LOB types that require defines
2432        let has_lob_columns = result.columns.iter().any(|col| col.is_lob());
2433
2434        if has_lob_columns && !statement.requires_define() {
2435            // We need to re-execute with column defines
2436            // Create a modified statement with the define flag set
2437            let mut stmt_with_define = statement.clone();
2438            stmt_with_define.set_columns(result.columns.clone());
2439            stmt_with_define.set_cursor_id(result.cursor_id);
2440            stmt_with_define.set_requires_define(true);
2441            stmt_with_define.set_no_prefetch(true);
2442            stmt_with_define.set_executed(true);
2443
2444            // Re-execute with defines
2445            let define_options = ExecuteOptions::for_query(prefetch_rows);
2446            let mut define_msg = ExecuteMessage::new(&stmt_with_define, define_options);
2447            let seq_num = inner.next_sequence_number();
2448            define_msg.set_sequence_number(seq_num);
2449
2450            let define_request = define_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2451            inner.send(&define_request).await?;
2452
2453            // Receive the re-execute response
2454            let define_response = inner.receive().await?;
2455            if define_response.len() <= PACKET_HEADER_SIZE {
2456                return Err(Error::Protocol("Empty define response".to_string()));
2457            }
2458
2459            // Check for MARKER packet
2460            let packet_type = define_response[4];
2461            if packet_type == PacketType::Marker as u8 {
2462                let error_response = inner.handle_marker_reset().await?;
2463                let payload = &error_response[PACKET_HEADER_SIZE..];
2464                return self.parse_error_response(payload);
2465            }
2466
2467            // Parse the response with LOB data, using the columns we already know
2468            let payload = &define_response[PACKET_HEADER_SIZE..];
2469            result = self.parse_query_response_with_columns(
2470                payload,
2471                &inner.capabilities,
2472                &stmt_with_define.columns(),
2473            )?;
2474        }
2475
2476        Ok(result)
2477    }
2478
2479    /// Internal: Execute a DML statement with optional bind parameters
2480    async fn execute_dml_with_params(&self, statement: &Statement, params: &[Value]) -> Result<QueryResult> {
2481        let options = ExecuteOptions::for_dml(false); // Don't auto-commit
2482        let mut execute_msg = ExecuteMessage::new(statement, options);
2483
2484        // Set bind values if provided
2485        if !params.is_empty() {
2486            execute_msg.set_bind_values(params.to_vec());
2487        }
2488
2489        let mut inner = self.inner.lock().await;
2490        let large_sdu = inner.large_sdu;
2491        let seq_num = inner.next_sequence_number();
2492        execute_msg.set_sequence_number(seq_num);
2493        let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2494
2495        inner.send(&request).await?;
2496
2497        // Receive response
2498        let mut response = inner.receive().await?;
2499        if response.len() <= PACKET_HEADER_SIZE {
2500            return Err(Error::Protocol("Empty DML response".to_string()));
2501        }
2502
2503        // Check packet type - handle MARKER packets
2504        let packet_type = response[4];
2505        if packet_type == PacketType::Marker as u8 {
2506            // Need to do reset protocol and then get the actual response
2507            // Extract marker type from packet: after header (8 bytes) + data flags (2 bytes) + marker type (1 byte)
2508            let marker_type = if response.len() >= PACKET_HEADER_SIZE + 3 {
2509                response[PACKET_HEADER_SIZE + 2]
2510            } else {
2511                1 // Assume BREAK
2512            };
2513
2514            if marker_type == 1 {
2515                // BREAK marker - send RESET and wait for response
2516                self.send_marker(&mut inner, 2).await?;
2517
2518                // Wait for RESET marker back or DATA packet with error
2519                // The server may send: RESET marker, or BREAK marker followed by DATA, or just close
2520                let mut got_reset = false;
2521                let mut max_attempts = 5;
2522                loop {
2523                    match inner.receive().await {
2524                        Ok(pkt) => {
2525                            if pkt.len() < PACKET_HEADER_SIZE + 1 {
2526                                break;
2527                            }
2528                            let pkt_type = pkt[4];
2529                            if pkt_type == PacketType::Marker as u8 {
2530                                if pkt.len() >= PACKET_HEADER_SIZE + 3 {
2531                                    let mk_type = pkt[PACKET_HEADER_SIZE + 2];
2532                                    if mk_type == 2 {
2533                                        // Got RESET marker, break out
2534                                        got_reset = true;
2535                                        break;
2536                                    }
2537                                    // Got another BREAK marker - server may be acknowledging
2538                                    // Keep trying a few times
2539                                    max_attempts -= 1;
2540                                    if max_attempts == 0 {
2541                                        // Give up and return a generic error
2542                                        return Err(Error::Protocol(
2543                                            "Server rejected operation (multiple BREAK markers)".to_string()
2544                                        ));
2545                                    }
2546                                    continue;
2547                                }
2548                            } else if pkt_type == PacketType::Data as u8 {
2549                                // Got DATA packet - use this as the response (may contain error)
2550                                let payload = &pkt[PACKET_HEADER_SIZE..];
2551                                return self.parse_dml_response(payload);
2552                            } else {
2553                                break;
2554                            }
2555                        }
2556                        Err(e) => {
2557                            // If we get EOF, the server may have closed the connection
2558                            // This can happen with temp LOB binding issues
2559                            inner.state = ConnectionState::Closed;
2560                            return Err(Error::Protocol(format!(
2561                                "Server closed connection during error handling: {}",
2562                                e
2563                            )));
2564                        }
2565                    }
2566                }
2567
2568                // After RESET, try to receive DATA packet with error response
2569                // Note: Some Oracle servers "quit immediately" after reset without sending
2570                // an error packet. In that case, we'll get EOF which is handled below.
2571                if got_reset {
2572                    loop {
2573                        match inner.receive().await {
2574                            Ok(pkt) => {
2575                                if pkt.len() < PACKET_HEADER_SIZE + 1 {
2576                                    // Too short, connection may be closing
2577                                    break;
2578                                }
2579                                let pkt_type = pkt[4];
2580                                if pkt_type == PacketType::Marker as u8 {
2581                                    // More markers, keep reading
2582                                    continue;
2583                                } else if pkt_type == PacketType::Data as u8 {
2584                                    // Got DATA packet, use this as the response
2585                                    response = pkt;
2586                                    break;
2587                                } else {
2588                                    // Unknown packet type, return error
2589                                    return Err(Error::Protocol(format!(
2590                                        "Unexpected packet type {} after reset",
2591                                        pkt_type
2592                                    )));
2593                                }
2594                            }
2595                            Err(_e) => {
2596                                // EOF after reset is normal - server may close connection
2597                                // without sending error details. Return a descriptive error.
2598                                inner.state = ConnectionState::Closed;
2599                                return Err(Error::OracleError {
2600                                    code: 0,
2601                                    message: "Server rejected the operation and closed the connection. \
2602                                              This may happen when binding a temporary LOB to an INSERT statement. \
2603                                              Try using a different approach (e.g., DBMS_LOB procedures).".to_string(),
2604                                });
2605                            }
2606                        }
2607                    }
2608                }
2609            }
2610        }
2611
2612        // Parse the response to extract rows affected (or error)
2613        let payload = &response[PACKET_HEADER_SIZE..];
2614        self.parse_dml_response(payload)
2615    }
2616
2617    /// Parse query response to extract columns and rows
2618    ///
2619    /// Oracle sends multiple messages in a single response:
2620    /// - DescribeInfo (16): column metadata
2621    /// - RowHeader (6): row header info
2622    /// - RowData (7): actual column values
2623    /// - Error (4): completion status (may contain error or success)
2624    fn parse_query_response(&self, payload: &[u8], caps: &Capabilities) -> Result<QueryResult> {
2625        self.parse_query_response_with_columns(payload, caps, &[])
2626    }
2627
2628    /// Parse query response with pre-known columns (for re-execute after define)
2629    fn parse_query_response_with_columns(
2630        &self,
2631        payload: &[u8],
2632        caps: &Capabilities,
2633        known_columns: &[ColumnInfo],
2634    ) -> Result<QueryResult> {
2635        if payload.len() < 3 {
2636            return Err(Error::Protocol("Query response too short".to_string()));
2637        }
2638
2639        let mut buf = ReadBuffer::from_slice(payload);
2640
2641        // Skip data flags
2642        buf.skip(2)?;
2643
2644        // Use known columns if provided, otherwise parse from describe info
2645        let mut columns: Vec<ColumnInfo> = known_columns.to_vec();
2646        let mut rows: Vec<Row> = Vec::new();
2647        let mut cursor_id: u16 = 0;
2648        let mut row_count: u64 = 0;
2649        let mut end_of_response = false;
2650
2651        // Bit vector for duplicate column optimization
2652        // When Some, indicates which columns have actual data (bit=1) vs duplicates (bit=0)
2653        let mut bit_vector: Option<Vec<u8>> = None;
2654        // Previous row values for copying duplicates
2655        let mut previous_row_values: Option<Vec<Value>> = None;
2656
2657        // Process messages until we hit end of response or run out of data
2658        while !end_of_response && buf.remaining() > 0 {
2659            let msg_type = buf.read_u8()?;
2660
2661            match msg_type {
2662                // DescribeInfo (16) - column metadata
2663                x if x == MessageType::DescribeInfo as u8 => {
2664                    // Skip chunked bytes first
2665                    buf.skip_raw_bytes_chunked()?;
2666                    columns = self.parse_describe_info(&mut buf, caps.ttc_field_version)?;
2667                }
2668
2669                // RowHeader (6) - header info for rows
2670                x if x == MessageType::RowHeader as u8 => {
2671                    self.parse_row_header(&mut buf)?;
2672                }
2673
2674                // RowData (7) - actual row values
2675                x if x == MessageType::RowData as u8 => {
2676                    let row = self.parse_row_data_with_bitvector(
2677                        &mut buf,
2678                        &columns,
2679                        caps,
2680                        bit_vector.as_deref(),
2681                        previous_row_values.as_ref(),
2682                    )?;
2683                    // Store this row's values for potential duplicate copying
2684                    previous_row_values = Some(row.values().to_vec());
2685                    // Clear bit vector after using it (it's per-row)
2686                    bit_vector = None;
2687                    rows.push(row);
2688                }
2689
2690                // Error (4) - completion or error
2691                x if x == MessageType::Error as u8 => {
2692                    let (error_code, error_msg, cid, rc) = self.parse_error_info_with_rowcount(&mut buf)?;
2693                    cursor_id = cid;
2694                    row_count = rc;
2695                    if error_code != 0 && error_code != 1403 {
2696                        // 1403 is "no data found" which is not an error for queries
2697                        return Err(Error::OracleError {
2698                            code: error_code,
2699                            message: error_msg.unwrap_or_default(),
2700                        });
2701                    }
2702                    end_of_response = true;
2703                }
2704
2705                // Parameter (8) - return parameters
2706                x if x == MessageType::Parameter as u8 => {
2707                    self.parse_return_parameters(&mut buf)?;
2708                }
2709
2710                // Status (9) - call status
2711                x if x == MessageType::Status as u8 => {
2712                    // Read call status and end-to-end seq number
2713                    let _call_status = buf.read_ub4()?;
2714                    let _end_to_end_seq = buf.read_ub2()?;
2715                    // Note: end_of_response only if supports_end_of_response is false
2716                    // For now, we assume it's not the end
2717                }
2718
2719                // BitVector (21) - column presence bitmap for sparse results
2720                // Bit=1 means actual data is sent, bit=0 means duplicate from previous row
2721                21 => {
2722                    // Read num columns sent
2723                    let _num_columns_sent = buf.read_ub2()?;
2724                    // Read bit vector (1 byte per 8 columns, rounded up)
2725                    let num_bytes = (columns.len() + 7) / 8;
2726                    if num_bytes > 0 {
2727                        let bv = buf.read_bytes_vec(num_bytes)?;
2728                        bit_vector = Some(bv);
2729                    }
2730                }
2731
2732                _ => {
2733                    // Unknown message type - break to avoid parsing errors
2734                    break;
2735                }
2736            }
2737        }
2738
2739        Ok(QueryResult {
2740            columns,
2741            rows,
2742            rows_affected: row_count,
2743            has_more_rows: false,
2744            cursor_id,
2745        })
2746    }
2747
2748    /// Parse a PL/SQL response containing OUT parameter values
2749    ///
2750    /// PL/SQL responses may contain:
2751    /// - IoVector (11): bind directions for each parameter
2752    /// - RowData (7): OUT parameter values
2753    /// - FlushOutBinds (19): signals end of OUT bind data
2754    /// - Error (4): completion status
2755    fn parse_plsql_response(
2756        &self,
2757        payload: &[u8],
2758        caps: &Capabilities,
2759        params: &[BindParam],
2760    ) -> Result<PlsqlResult> {
2761        if payload.len() < 3 {
2762            return Err(Error::Protocol("PL/SQL response too short".to_string()));
2763        }
2764
2765        let mut buf = ReadBuffer::from_slice(payload);
2766
2767        // Skip data flags
2768        buf.skip(2)?;
2769
2770        let mut out_values: Vec<Value> = Vec::new();
2771        let mut _out_indices: Vec<usize> = Vec::new();
2772        let mut row_count: u64 = 0;
2773        let mut cursor_id: Option<u16> = None;
2774        let mut end_of_response = false;
2775        let mut implicit_results = ImplicitResults::new();
2776
2777        // Create column infos for OUT params based on their oracle types
2778        let mut out_columns: Vec<ColumnInfo> = Vec::new();
2779
2780        while !end_of_response && buf.remaining() > 0 {
2781            let msg_type = buf.read_u8()?;
2782
2783            match msg_type {
2784                // IoVector (11) - bind directions from server
2785                x if x == MessageType::IoVector as u8 => {
2786                    let (indices, cols) = self.parse_io_vector(&mut buf, params)?;
2787                    _out_indices = indices;
2788                    out_columns = cols;
2789                }
2790
2791                // RowHeader (6)
2792                x if x == MessageType::RowHeader as u8 => {
2793                    self.parse_row_header(&mut buf)?;
2794                }
2795
2796                // RowData (7) - OUT parameter values
2797                x if x == MessageType::RowData as u8 => {
2798                    if !out_columns.is_empty() {
2799                        let row = self.parse_row_data_single(&mut buf, &out_columns, caps)?;
2800                        // Extract values from the row into out_values
2801                        for (idx, value) in row.into_values().into_iter().enumerate() {
2802                            // Check if this is a cursor
2803                            if let Value::Cursor(cursor) = &value {
2804                                if cursor_id.is_none() && cursor.cursor_id() != 0 {
2805                                    cursor_id = Some(cursor.cursor_id());
2806                                }
2807                            }
2808                            // Map back to original param position if we have indices
2809                            if idx < out_values.len() {
2810                                out_values[idx] = value;
2811                            } else {
2812                                out_values.push(value);
2813                            }
2814                        }
2815                    } else {
2816                        // Skip the row data if we don't have column info
2817                        // This shouldn't normally happen
2818                        break;
2819                    }
2820                }
2821
2822                // DescribeInfo (16) - for REF CURSOR describe
2823                x if x == MessageType::DescribeInfo as u8 => {
2824                    buf.skip_raw_bytes_chunked()?;
2825                    let cursor_columns = self.parse_describe_info(&mut buf, caps.ttc_field_version)?;
2826                    // Store cursor columns if needed
2827                    let _ = cursor_columns; // For now, just skip
2828                }
2829
2830                // FlushOutBinds (19) - signals end of OUT bind data
2831                x if x == MessageType::FlushOutBinds as u8 => {
2832                    // This indicates that OUT bind data is done
2833                    // Just continue to get the error/completion status
2834                }
2835
2836                // Error (4) - completion or error
2837                x if x == MessageType::Error as u8 => {
2838                    let (error_code, error_msg, _cid, rc) = self.parse_error_info_with_rowcount(&mut buf)?;
2839                    row_count = rc;
2840                    if error_code != 0 {
2841                        return Err(Error::OracleError {
2842                            code: error_code,
2843                            message: error_msg.unwrap_or_default(),
2844                        });
2845                    }
2846                    end_of_response = true;
2847                }
2848
2849                // Parameter (8) - return parameters
2850                x if x == MessageType::Parameter as u8 => {
2851                    self.parse_return_parameters(&mut buf)?;
2852                }
2853
2854                // Status (9)
2855                x if x == MessageType::Status as u8 => {
2856                    let _call_status = buf.read_ub4()?;
2857                    let _end_to_end_seq = buf.read_ub2()?;
2858                }
2859
2860                // ImplicitResultset (27) - result sets from DBMS_SQL.RETURN_RESULT
2861                x if x == MessageType::ImplicitResultset as u8 => {
2862                    let parsed_results = self.parse_implicit_results(&mut buf, caps)?;
2863                    implicit_results = parsed_results;
2864                }
2865
2866                _ => {
2867                    // Unknown message type - break to avoid parsing errors
2868                    break;
2869                }
2870            }
2871        }
2872
2873        // If no IoVector was received, all params might be IN-only
2874        // In that case, out_values should be empty
2875        Ok(PlsqlResult {
2876            out_values,
2877            rows_affected: row_count,
2878            cursor_id,
2879            implicit_results,
2880        })
2881    }
2882
2883    /// Parse implicit result sets from DBMS_SQL.RETURN_RESULT
2884    ///
2885    /// Format per Python base.pyx _process_implicit_result:
2886    /// - num_results: ub4 (number of implicit result sets)
2887    /// - For each result:
2888    ///   - num_bytes: ub1 + raw bytes (metadata to skip)
2889    ///   - describe_info: column metadata
2890    ///   - cursor_id: ub2
2891    fn parse_implicit_results(&self, buf: &mut ReadBuffer, caps: &Capabilities) -> Result<ImplicitResults> {
2892        let num_results = buf.read_ub4()?;
2893        let mut results = ImplicitResults::new();
2894
2895        for _ in 0..num_results {
2896            // Skip metadata bytes
2897            let num_bytes = buf.read_u8()?;
2898            if num_bytes > 0 {
2899                buf.skip(num_bytes as usize)?;
2900            }
2901
2902            // Parse column metadata for this result set
2903            let columns = self.parse_describe_info(buf, caps.ttc_field_version)?;
2904
2905            // Read cursor ID
2906            let cursor_id = buf.read_ub2()?;
2907
2908            // Create implicit result with metadata but no rows yet
2909            // Rows will be fetched separately using fetch_implicit_result
2910            let result = ImplicitResult::new(cursor_id, columns, Vec::new());
2911            results.add(result);
2912        }
2913
2914        Ok(results)
2915    }
2916
2917    /// Parse IO Vector message to get bind directions
2918    ///
2919    /// Returns a tuple of:
2920    /// - indices of OUT/INOUT parameters in the params list
2921    /// - column infos for parsing OUT values
2922    fn parse_io_vector(
2923        &self,
2924        buf: &mut ReadBuffer,
2925        params: &[BindParam],
2926    ) -> Result<(Vec<usize>, Vec<ColumnInfo>)> {
2927        // I/O vector format (from Python reference):
2928        // - skip 1 byte (flag)
2929        // - read ub2 (num requests)
2930        // - read ub4 (num iters)
2931        // - num_binds = num_iters * 256 + num_requests
2932        // - skip ub4 (num iters this time)
2933        // - skip ub2 (uac buffer length)
2934        // - read ub2 (num_bytes for bit vector), skip if > 0
2935        // - read ub2 (num_bytes for rowid), skip if > 0
2936        // - for each bind: read ub1 (bind_dir)
2937
2938        buf.skip(1)?; // flag
2939        let num_requests = buf.read_ub2()? as u32;
2940        let num_iters = buf.read_ub4()?;
2941        let num_binds = num_iters * 256 + num_requests;
2942        let _ = buf.read_ub4()?; // num iters this time (discard)
2943        let _ = buf.read_ub2()?; // uac buffer length (discard)
2944
2945        // Bit vector
2946        let num_bytes = buf.read_ub2()? as usize;
2947        if num_bytes > 0 {
2948            buf.skip(num_bytes)?;
2949        }
2950
2951        // Rowid (raw bytes, not length-prefixed here)
2952        let num_bytes = buf.read_ub4()? as usize;
2953        if num_bytes > 0 {
2954            buf.skip_raw_bytes_chunked()?; // Skip rowid bytes using chunked read
2955        }
2956
2957        // Read bind directions
2958        let mut out_indices = Vec::new();
2959        let mut out_columns = Vec::new();
2960
2961        for i in 0..(num_binds as usize).min(params.len()) {
2962            let dir_byte = buf.read_u8()?;
2963            let dir = BindDirection::try_from(dir_byte).unwrap_or(BindDirection::Input);
2964
2965            // If this is not an INPUT-only parameter, it has OUT data
2966            if dir != BindDirection::Input {
2967                out_indices.push(i);
2968
2969                // Create a column info for parsing the OUT value
2970                let param = &params[i];
2971                let mut col = ColumnInfo::new(format!("OUT_{}", i), param.oracle_type);
2972                col.buffer_size = param.buffer_size;
2973                col.data_size = param.buffer_size;
2974                col.nullable = true;
2975
2976                // For collection OUT params, extract element type from the placeholder
2977                if let Some(Value::Collection(ref placeholder)) = param.value {
2978                    if let Some(Value::Integer(elem_type_code)) = placeholder.get("_element_type") {
2979                        col.element_type = crate::constants::OracleType::try_from(*elem_type_code as u8).ok();
2980                    }
2981                }
2982
2983                out_columns.push(col);
2984            }
2985        }
2986
2987        Ok((out_indices, out_columns))
2988    }
2989
2990    /// Parse row header (TNS_MSG_TYPE_ROW_HEADER = 6)
2991    fn parse_row_header(&self, buf: &mut ReadBuffer) -> Result<()> {
2992        buf.skip_ub1()?;  // flags
2993        buf.skip_ub2()?;  // num requests
2994        buf.skip_ub4()?;  // iteration number
2995        buf.skip_ub4()?;  // num iters
2996        buf.skip_ub2()?;  // buffer length
2997        let num_bytes = buf.read_ub4()? as usize;
2998        if num_bytes > 0 {
2999            buf.skip_ub1()?;  // skip repeated length
3000            buf.skip(num_bytes)?;  // bit vector
3001        }
3002        let num_bytes = buf.read_ub4()? as usize;
3003        if num_bytes > 0 {
3004            buf.skip_raw_bytes_chunked()?;  // rxhrid
3005        }
3006        Ok(())
3007    }
3008
3009    /// Parse return parameters (TNS_MSG_TYPE_PARAMETER = 8)
3010    fn parse_return_parameters(&self, buf: &mut ReadBuffer) -> Result<()> {
3011        self.parse_return_parameters_internal(buf, false).map(|_| ())
3012    }
3013
3014    /// Parse return parameters with optional row counts extraction
3015    /// When `want_row_counts` is true, attempts to read arraydmlrowcounts from the response.
3016    fn parse_return_parameters_internal(
3017        &self,
3018        buf: &mut ReadBuffer,
3019        want_row_counts: bool,
3020    ) -> Result<Option<Vec<u64>>> {
3021        // Per Python's _process_return_parameters
3022        let num_params = buf.read_ub2()?;  // al8o4l (ignored)
3023        for _ in 0..num_params {
3024            buf.skip_ub4()?;
3025        }
3026
3027        let al8txl = buf.read_ub2()?;  // al8txl (ignored)
3028        if al8txl > 0 {
3029            buf.skip(al8txl as usize)?;
3030        }
3031
3032        // num key/value pairs - skip for now
3033        let num_pairs = buf.read_ub2()?;
3034        for _ in 0..num_pairs {
3035            buf.read_bytes_with_length()?;  // text value
3036            buf.read_bytes_with_length()?;  // binary value
3037            buf.skip_ub2()?;  // keyword num
3038        }
3039
3040        // registration
3041        let num_bytes = buf.read_ub2()?;
3042        if num_bytes > 0 {
3043            buf.skip(num_bytes as usize)?;
3044        }
3045
3046        // If arraydmlrowcounts was requested, parse the row counts
3047        if want_row_counts && buf.remaining() >= 4 {
3048            let num_rows = buf.read_ub4()? as usize;
3049            let mut row_counts = Vec::with_capacity(num_rows);
3050            for _ in 0..num_rows {
3051                let count = buf.read_ub8()?;
3052                row_counts.push(count);
3053            }
3054            Ok(Some(row_counts))
3055        } else {
3056            Ok(None)
3057        }
3058    }
3059
3060    /// Parse a single row of data
3061    fn parse_row_data_single(
3062        &self,
3063        buf: &mut ReadBuffer,
3064        columns: &[ColumnInfo],
3065        caps: &Capabilities,
3066    ) -> Result<Row> {
3067        let mut values = Vec::with_capacity(columns.len());
3068
3069        for col in columns {
3070            let value = self.parse_column_value(buf, col, caps)?;
3071            values.push(value);
3072        }
3073
3074        Ok(Row::new(values))
3075    }
3076
3077    /// Parse a single row of data with bit vector support for duplicate column optimization
3078    ///
3079    /// Oracle sends a BitVector message before RowData when some columns have the same
3080    /// value as the previous row. Bits that are SET (1) indicate data is sent in the buffer;
3081    /// bits that are CLEAR (0) indicate the value should be copied from the previous row.
3082    fn parse_row_data_with_bitvector(
3083        &self,
3084        buf: &mut ReadBuffer,
3085        columns: &[ColumnInfo],
3086        caps: &Capabilities,
3087        bit_vector: Option<&[u8]>,
3088        previous_values: Option<&Vec<Value>>,
3089    ) -> Result<Row> {
3090        let mut values = Vec::with_capacity(columns.len());
3091
3092        for (col_idx, col) in columns.iter().enumerate() {
3093            // Check if this column is a duplicate (bit=0 means duplicate)
3094            let is_duplicate = if let Some(bv) = bit_vector {
3095                let byte_num = col_idx / 8;
3096                let bit_num = col_idx % 8;
3097                if byte_num < bv.len() {
3098                    // If bit is 0, it's a duplicate
3099                    (bv[byte_num] & (1 << bit_num)) == 0
3100                } else {
3101                    false
3102                }
3103            } else {
3104                false
3105            };
3106
3107            if is_duplicate {
3108                // Copy value from previous row
3109                if let Some(prev) = previous_values {
3110                    if col_idx < prev.len() {
3111                        values.push(prev[col_idx].clone());
3112                    } else {
3113                        // Shouldn't happen, but fallback to null
3114                        values.push(Value::Null);
3115                    }
3116                } else {
3117                    // No previous row (shouldn't happen for duplicate), fallback to null
3118                    values.push(Value::Null);
3119                }
3120            } else {
3121                // Read actual value from buffer
3122                let value = self.parse_column_value(buf, col, caps)?;
3123                values.push(value);
3124            }
3125        }
3126
3127        Ok(Row::new(values))
3128    }
3129
3130    /// Parse a single column value from the buffer
3131    fn parse_column_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo, caps: &Capabilities) -> Result<Value> {
3132        use crate::constants::OracleType;
3133
3134        // Handle LOB columns specially - they have a different format
3135        if col.is_lob() {
3136            return self.parse_lob_value(buf, col);
3137        }
3138
3139        // Handle CURSOR type - REF CURSOR from PL/SQL
3140        if col.oracle_type == OracleType::Cursor {
3141            return self.parse_cursor_value(buf, caps);
3142        }
3143
3144        // Handle Object type - collections (VARRAY, Nested Table) and UDTs
3145        if col.oracle_type == OracleType::Object {
3146            return self.parse_object_value(buf, col);
3147        }
3148
3149        // Read the value based on the column type
3150        // First, check if it's NULL
3151        let data = buf.read_bytes_with_length()?;
3152
3153        match data {
3154            None => Ok(Value::Null),
3155            Some(bytes) if bytes.is_empty() => Ok(Value::Null),
3156            Some(bytes) => {
3157                // Decode based on oracle type
3158                match col.oracle_type {
3159                    OracleType::Number => {
3160                        // Oracle NUMBER format - decode to string
3161                        let num = crate::types::decode_oracle_number(&bytes)?;
3162                        Ok(Value::String(num.value))
3163                    }
3164                    OracleType::Varchar | OracleType::Char | OracleType::Long => {
3165                        let s = String::from_utf8_lossy(&bytes).to_string();
3166                        Ok(Value::String(s))
3167                    }
3168                    OracleType::Raw | OracleType::LongRaw => {
3169                        // RAW/LONG RAW types - return as bytes
3170                        Ok(Value::Bytes(bytes.to_vec()))
3171                    }
3172                    OracleType::Date => {
3173                        // Oracle DATE format - 7 bytes
3174                        let date = crate::types::decode_oracle_date(&bytes)?;
3175                        Ok(Value::Date(date))
3176                    }
3177                    OracleType::Timestamp | OracleType::TimestampLtz => {
3178                        // Oracle TIMESTAMP format - 11 bytes (date + fractional seconds)
3179                        let ts = crate::types::decode_oracle_timestamp(&bytes)?;
3180                        Ok(Value::Timestamp(ts))
3181                    }
3182                    OracleType::TimestampTz => {
3183                        // Oracle TIMESTAMP WITH TIME ZONE - 13 bytes
3184                        let ts = crate::types::decode_oracle_timestamp(&bytes)?;
3185                        Ok(Value::Timestamp(ts))
3186                    }
3187                    _ => {
3188                        // Default: return as raw bytes or string
3189                        let s = String::from_utf8_lossy(&bytes).to_string();
3190                        Ok(Value::String(s))
3191                    }
3192                }
3193            }
3194        }
3195    }
3196
3197    /// Parse a REF CURSOR value from the buffer
3198    ///
3199    /// Per Python base.pyx lines 1038-1046:
3200    /// - Skip 1 byte (length indicator - fixed value)
3201    /// - Read describe info (column metadata for the cursor)
3202    /// - Read cursor_id (UB2)
3203    fn parse_cursor_value(&self, buf: &mut ReadBuffer, caps: &Capabilities) -> Result<Value> {
3204        use crate::types::RefCursor;
3205
3206        // Skip length indicator (fixed value for cursors)
3207        let _length = buf.read_u8()?;
3208
3209        // Read column metadata for this cursor
3210        let cursor_columns = self.parse_describe_info(buf, caps.ttc_field_version)?;
3211
3212        // Read the cursor ID
3213        let cursor_id = buf.read_ub2()?;
3214
3215        // Create RefCursor with the metadata
3216        let ref_cursor = RefCursor::new(cursor_id, cursor_columns);
3217
3218        Ok(Value::Cursor(ref_cursor))
3219    }
3220
3221    /// Parse an Object/Collection value from the buffer
3222    ///
3223    /// Object format from Oracle (per Python packet.pyx read_dbobject):
3224    /// - UB4: type OID length, then type OID bytes if > 0
3225    /// - UB4: OID length, then OID bytes if > 0
3226    /// - UB4: snapshot length, then snapshot bytes if > 0 (discarded)
3227    /// - UB2: version (skip)
3228    /// - UB4: packed data length
3229    /// - UB2: flags (skip)
3230    /// - Bytes: packed data (pickle format)
3231    fn parse_object_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo) -> Result<Value> {
3232        use crate::dbobject::{CollectionType, DbObject, DbObjectType};
3233        use crate::types::decode_collection;
3234
3235        // Read type OID
3236        let toid_len = buf.read_ub4()?;
3237        let _toid = if toid_len > 0 {
3238            Some(buf.read_bytes_vec(toid_len as usize)?)
3239        } else {
3240            None
3241        };
3242
3243        // Read OID
3244        let oid_len = buf.read_ub4()?;
3245        let _oid = if oid_len > 0 {
3246            Some(buf.read_bytes_vec(oid_len as usize)?)
3247        } else {
3248            None
3249        };
3250
3251        // Read and discard snapshot
3252        let snapshot_len = buf.read_ub4()?;
3253        if snapshot_len > 0 {
3254            buf.skip_raw_bytes_chunked()?;
3255        }
3256
3257        // Skip version (length-prefixed UB2)
3258        let _version = buf.read_ub2()?;
3259
3260        // Read packed data length
3261        let data_len = buf.read_ub4()?;
3262
3263        // Skip flags (length-prefixed UB2)
3264        let _flags = buf.read_ub2()?;
3265
3266        if data_len == 0 {
3267            return Ok(Value::Null);
3268        }
3269
3270        // Read packed data (chunked format like other byte sequences)
3271        let packed_data = buf.read_bytes_with_length()?;
3272
3273        match packed_data {
3274            None => Ok(Value::Null),
3275            Some(data) if data.is_empty() => Ok(Value::Null),
3276            Some(data) => {
3277                // Create a placeholder type based on column info
3278                let type_name = col.type_name.clone().unwrap_or_else(|| "UNKNOWN".to_string());
3279
3280                // Try to determine if this is a collection based on the pickle data
3281                // The first byte contains flags - check for IS_COLLECTION (0x08)
3282                let is_collection = !data.is_empty() && (data[0] & 0x08) != 0;
3283
3284                if is_collection {
3285                    // Get element type from column info or default to VARCHAR
3286                    let element_type = col.element_type.unwrap_or(crate::constants::OracleType::Varchar);
3287
3288                    // Determine collection type from pickle flags
3289                    // Collection flags are after header - but we'll default for now
3290                    let collection_type = CollectionType::Varray;
3291
3292                    let obj_type = DbObjectType::collection(
3293                        &col.type_schema.clone().unwrap_or_default(),
3294                        &type_name,
3295                        collection_type,
3296                        element_type,
3297                    );
3298
3299                    match decode_collection(&obj_type, &data) {
3300                        Ok(collection) => Ok(Value::Collection(collection)),
3301                        Err(e) => {
3302                            tracing::warn!("Failed to decode collection: {}, data: {:02x?}", e, &data[..std::cmp::min(20, data.len())]);
3303                            // Return raw bytes as fallback
3304                            Ok(Value::Bytes(data))
3305                        }
3306                    }
3307                } else {
3308                    // Regular object type - not yet fully implemented
3309                    let mut obj = DbObject::new(&type_name);
3310                    // Store raw pickle data for later inspection
3311                    obj.set("_raw_data", Value::Bytes(data));
3312                    Ok(Value::Collection(obj))
3313                }
3314            }
3315        }
3316    }
3317
3318    /// Parse a LOB column value from the buffer
3319    ///
3320    /// LOB format from Oracle (per Python's read_lob_with_length):
3321    /// - UB4: num_bytes (indicator that LOB data follows)
3322    /// - If num_bytes > 0:
3323    ///   - For non-BFILE: UB8 size, UB4 chunk_size
3324    ///   - Bytes: LOB locator (chunked format)
3325    ///
3326    /// The actual LOB content must be fetched separately using LOB operations.
3327    /// For JSON columns, the data is OSON-encoded and decoded directly.
3328    /// For VECTOR columns, the data is decoded from Oracle's vector binary format.
3329    fn parse_lob_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo) -> Result<Value> {
3330        use crate::constants::OracleType;
3331        use crate::types::{decode_vector, OsonDecoder};
3332
3333        // Read length indicator
3334        let num_bytes = buf.read_ub4()?;
3335
3336        if num_bytes == 0 {
3337            // For JSON, null is Value::Json(serde_json::Value::Null)
3338            if col.oracle_type == OracleType::Json || col.is_json {
3339                return Ok(Value::Json(serde_json::Value::Null));
3340            }
3341            // For VECTOR, null is Value::Null
3342            if col.oracle_type == OracleType::Vector {
3343                return Ok(Value::Null);
3344            }
3345            return Ok(Value::Lob(LobValue::Null));
3346        }
3347
3348        // For BFILE, there's no size/chunk_size metadata
3349        let (size, chunk_size) = if col.oracle_type == OracleType::Bfile {
3350            (0u64, 0u32)
3351        } else {
3352            // Read LOB size and chunk size
3353            let size = buf.read_ub8()?;
3354            let chunk_size = buf.read_ub4()?;
3355            (size, chunk_size)
3356        };
3357
3358        // Read LOB data (could be locator or inline data depending on size)
3359        let data_bytes = buf.read_bytes_with_length()?;
3360
3361        // Handle JSON columns - decode OSON format
3362        // JSON is sent as a LOB with prefetched data + a LOB locator that must be consumed
3363        if col.oracle_type == OracleType::Json || col.is_json {
3364            // Read and discard the LOB locator
3365            let _locator = buf.read_bytes_with_length()?;
3366
3367            if let Some(data) = data_bytes {
3368                if !data.is_empty() {
3369                    // Decode OSON to JSON
3370                    match OsonDecoder::decode(bytes::Bytes::from(data)) {
3371                        Ok(json_value) => return Ok(Value::Json(json_value)),
3372                        Err(e) => {
3373                            tracing::warn!("Failed to decode OSON: {}", e);
3374                            return Ok(Value::Json(serde_json::Value::Null));
3375                        }
3376                    }
3377                }
3378            }
3379            return Ok(Value::Json(serde_json::Value::Null));
3380        }
3381
3382        // Handle VECTOR columns - decode vector binary format
3383        if col.oracle_type == OracleType::Vector {
3384            // Read and discard LOB locator (not needed for VECTOR)
3385            let _locator = buf.read_bytes_with_length()?;
3386
3387            if let Some(data) = data_bytes {
3388                if !data.is_empty() {
3389                    match decode_vector(&data) {
3390                        Ok(vector) => return Ok(Value::Vector(vector)),
3391                        Err(e) => {
3392                            tracing::warn!("Failed to decode VECTOR: {}", e);
3393                            return Ok(Value::Null);
3394                        }
3395                    }
3396                }
3397            }
3398            return Ok(Value::Null);
3399        }
3400
3401        // Create a LOB locator for fetching the data later
3402        if let Some(locator_data) = data_bytes {
3403            if !locator_data.is_empty() {
3404                let locator = LobLocator::new(
3405                    bytes::Bytes::from(locator_data),
3406                    size,
3407                    chunk_size,
3408                    col.oracle_type,
3409                    col.csfrm,
3410                );
3411                return Ok(Value::Lob(LobValue::locator(locator)));
3412            }
3413        }
3414
3415        // If we have size but no locator, it might be an empty LOB
3416        if size == 0 {
3417            return Ok(Value::Lob(LobValue::Empty));
3418        }
3419
3420        // Empty LOB (shouldn't normally reach here)
3421        Ok(Value::Lob(LobValue::Empty))
3422    }
3423
3424    /// Parse error info message and extract cursor_id
3425    /// Format per Python's _process_error_info in base.pyx
3426    fn parse_error_info(&self, buf: &mut ReadBuffer) -> Result<(u32, Option<String>, u16)> {
3427        // End of call status
3428        let _call_status = buf.read_ub4()?;
3429        // End to end seq#
3430        buf.skip_ub2()?;
3431        // Current row number
3432        buf.skip_ub4()?;
3433        // Error number (short form)
3434        buf.skip_ub2()?;
3435        // Array elem error
3436        buf.skip_ub2()?;
3437        // Array elem error
3438        buf.skip_ub2()?;
3439        // Cursor ID
3440        let cursor_id = buf.read_ub2()?;
3441        // Error position
3442        let _error_pos = buf.read_sb2()?;
3443        // SQL type (19c and earlier)
3444        buf.skip_ub1()?;
3445        // Fatal?
3446        buf.skip_ub1()?;
3447        // Flags
3448        buf.skip_ub1()?;
3449        // User cursor options
3450        buf.skip_ub1()?;
3451        // UPI parameter
3452        buf.skip_ub1()?;
3453        // Flags (second)
3454        buf.skip_ub1()?;
3455        // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3456        buf.skip_ub4()?; // rba
3457        buf.skip_ub2()?; // partition_id
3458        buf.skip_ub1()?; // skip
3459        buf.skip_ub4()?; // block_num
3460        buf.skip_ub2()?; // slot_num
3461        // OS error
3462        buf.skip_ub4()?;
3463        // Statement number
3464        buf.skip_ub1()?;
3465        // Call number
3466        buf.skip_ub1()?;
3467        // Padding
3468        buf.skip_ub2()?;
3469        // Success iters
3470        buf.skip_ub4()?;
3471        // oerrdd (logical rowid)
3472        let oerrdd_len = buf.read_ub4()?;
3473        if oerrdd_len > 0 {
3474            buf.skip_raw_bytes_chunked()?;
3475        }
3476
3477        // Batch error codes array
3478        let num_batch_errors = buf.read_ub2()?;
3479        if num_batch_errors > 0 {
3480            buf.skip_ub1()?;  // first byte
3481            for _ in 0..num_batch_errors {
3482                buf.skip_ub2()?;  // error code
3483            }
3484        }
3485
3486        // Batch error row offset array
3487        let num_offsets = buf.read_ub4()?;
3488        if num_offsets > 0 {
3489            buf.skip_ub1()?;  // first byte
3490            for _ in 0..num_offsets {
3491                buf.skip_ub4()?;  // offset
3492            }
3493        }
3494
3495        // Batch error messages array
3496        let num_batch_msgs = buf.read_ub2()?;
3497        if num_batch_msgs > 0 {
3498            buf.skip_ub1()?;  // packed size
3499            for _ in 0..num_batch_msgs {
3500                buf.skip_ub2()?;  // chunk length
3501                buf.read_string_with_length()?;  // message
3502                buf.skip(2)?;  // end marker
3503            }
3504        }
3505
3506        // Extended error number (UB4)
3507        let error_code = buf.read_ub4()?;
3508        // Row count (UB8)
3509        let _row_count = buf.read_ub8()?;
3510
3511        // Error message
3512        let error_msg = if error_code != 0 {
3513            buf.read_string_with_length()?.map(|s| s.trim().to_string())
3514        } else {
3515            None
3516        };
3517
3518        Ok((error_code, error_msg, cursor_id))
3519    }
3520
3521    /// Parse error response packet (received after marker reset)
3522    fn parse_error_response(&self, payload: &[u8]) -> Result<QueryResult> {
3523        if payload.len() < 3 {
3524            return Err(Error::Protocol("Error response too short".to_string()));
3525        }
3526
3527        let mut buf = ReadBuffer::from_slice(payload);
3528
3529        // Skip data flags
3530        buf.skip(2)?;
3531
3532        // Read message type
3533        let msg_type = buf.read_u8()?;
3534
3535        // Check for error message type (4)
3536        if msg_type == MessageType::Error as u8 {
3537            // Parse error info per Python's _process_error_info
3538            let _call_status = buf.read_ub4()?;  // end of call status
3539            buf.skip_ub2()?;  // end to end seq#
3540            buf.skip_ub4()?;  // current row number
3541            buf.skip_ub2()?;  // error number (short form)
3542            buf.skip_ub2()?;  // array elem error
3543            buf.skip_ub2()?;  // array elem error
3544            let _cursor_id = buf.read_ub2()?;  // cursor id
3545            let _error_pos = buf.read_sb2()?;  // error position
3546            buf.skip_ub1()?;  // sql type (19c and earlier)
3547            buf.skip_ub1()?;  // fatal?
3548            buf.skip_ub1()?;  // flags
3549            buf.skip_ub1()?;  // user cursor options
3550            buf.skip_ub1()?;  // UPI parameter
3551            buf.skip_ub1()?;  // flags
3552
3553            // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3554            buf.skip_ub4()?; // rba
3555            buf.skip_ub2()?; // partition_id
3556            buf.skip_ub1()?; // skip
3557            buf.skip_ub4()?; // block_num
3558            buf.skip_ub2()?; // slot_num
3559
3560            buf.skip_ub4()?;  // OS error
3561            buf.skip_ub1()?;  // statement number
3562            buf.skip_ub1()?;  // call number
3563            buf.skip_ub2()?;  // padding
3564            buf.skip_ub4()?;  // success iters
3565
3566            // oerrdd (logical rowid)
3567            let oerrdd_len = buf.read_ub4()?;
3568            if oerrdd_len > 0 {
3569                buf.skip_raw_bytes_chunked()?;
3570            }
3571
3572            // batch error codes array
3573            let num_batch_errors = buf.read_ub2()?;
3574            if num_batch_errors > 0 {
3575                // Skip batch error data - we don't process it for now
3576                buf.skip_ub1()?;  // first byte
3577                for _ in 0..num_batch_errors {
3578                    buf.skip_ub2()?;  // error code
3579                }
3580            }
3581
3582            // batch error row offset array
3583            let num_offsets = buf.read_ub4()?;
3584            if num_offsets > 0 {
3585                buf.skip_ub1()?;  // first byte
3586                for _ in 0..num_offsets {
3587                    buf.skip_ub4()?;  // offset
3588                }
3589            }
3590
3591            // batch error messages array
3592            let num_batch_msgs = buf.read_ub2()?;
3593            if num_batch_msgs > 0 {
3594                // Skip batch error messages
3595                buf.skip_ub1()?;  // packed size
3596                for _ in 0..num_batch_msgs {
3597                    buf.skip_ub2()?;  // chunk length
3598                    buf.read_string_with_length()?;  // message
3599                    buf.skip(2)?;  // end marker
3600                }
3601            }
3602
3603            // Extended error number (UB4)
3604            let error_num = buf.read_ub4()?;
3605            let _row_count = buf.read_ub8()?;  // row number (extended)
3606
3607            // Read error message
3608            let error_msg = if error_num != 0 {
3609                buf.read_string_with_length()?.map(|s| s.trim().to_string())
3610            } else {
3611                None
3612            };
3613
3614            return Err(Error::OracleError {
3615                code: error_num,
3616                message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_num)),
3617            });
3618        }
3619
3620        // If not an error message type, return generic error
3621        Err(Error::Protocol(format!(
3622            "Expected error message type 4, got {}",
3623            msg_type
3624        )))
3625    }
3626
3627    /// Parse DML response to extract rows affected
3628    fn parse_dml_response(&self, payload: &[u8]) -> Result<QueryResult> {
3629        if payload.len() < 3 {
3630            return Err(Error::Protocol("DML response too short".to_string()));
3631        }
3632
3633        let mut buf = ReadBuffer::from_slice(payload);
3634
3635        // Skip data flags
3636        buf.skip(2)?;
3637
3638        let mut rows_affected: u64 = 0;
3639        let mut cursor_id: u16 = 0;
3640        let mut end_of_response = false;
3641
3642        // Process messages until end_of_response or out of data
3643        // Note: If supports_end_of_response is true, we must continue until msg type 29
3644        while !end_of_response && buf.remaining() > 0 {
3645            let msg_type = buf.read_u8()?;
3646
3647            match msg_type {
3648                // Error (4) - may contain error or success info
3649                x if x == MessageType::Error as u8 => {
3650                    let (error_code, error_msg, cid, row_count) = self.parse_error_info_with_rowcount(&mut buf)?;
3651                    cursor_id = cid;
3652                    rows_affected = row_count;
3653                    if error_code != 0 && error_code != 1403 {
3654                        return Err(Error::OracleError {
3655                            code: error_code,
3656                            message: error_msg.unwrap_or_default(),
3657                        });
3658                    }
3659                    // Only end if server doesn't support end_of_response
3660                    // Otherwise, continue until we get msg type 29
3661                }
3662
3663                // Parameter (8) - return parameters
3664                x if x == MessageType::Parameter as u8 => {
3665                    self.parse_return_parameters(&mut buf)?;
3666                }
3667
3668                // Status (9) - call status
3669                x if x == MessageType::Status as u8 => {
3670                    let _call_status = buf.read_ub4()?;
3671                    let _end_to_end_seq = buf.read_ub2()?;
3672                }
3673
3674                // BitVector (21)
3675                21 => {
3676                    let _num_columns_sent = buf.read_ub2()?;
3677                    // No columns for DML, but read the byte if present
3678                    if buf.remaining() > 0 {
3679                        let _byte = buf.read_u8()?;
3680                    }
3681                }
3682
3683                // End of Response (29) - explicit end marker
3684                29 => {
3685                    end_of_response = true;
3686                }
3687
3688                _ => {
3689                    // Unknown message type - continue processing
3690                }
3691            }
3692        }
3693
3694        Ok(QueryResult {
3695            columns: Vec::new(),
3696            rows: Vec::new(),
3697            rows_affected,
3698            has_more_rows: false,
3699            cursor_id,
3700        })
3701    }
3702
3703    /// Parse error info and return (error_code, error_msg, cursor_id, row_count)
3704    fn parse_error_info_with_rowcount(&self, buf: &mut ReadBuffer) -> Result<(u32, Option<String>, u16, u64)> {
3705        // End of call status
3706        let _call_status = buf.read_ub4()?;
3707        // End to end seq#
3708        buf.skip_ub2()?;
3709        // Current row number
3710        buf.skip_ub4()?;
3711        // Error number (short form)
3712        buf.skip_ub2()?;
3713        // Array elem error
3714        buf.skip_ub2()?;
3715        // Array elem error
3716        buf.skip_ub2()?;
3717        // Cursor ID
3718        let cursor_id = buf.read_ub2()?;
3719        // Error position
3720        let _error_pos = buf.read_sb2()?;
3721        // SQL type (19c and earlier)
3722        buf.skip_ub1()?;
3723        // Fatal?
3724        buf.skip_ub1()?;
3725        // Flags
3726        buf.skip_ub1()?;
3727        // User cursor options
3728        buf.skip_ub1()?;
3729        // UPI parameter
3730        buf.skip_ub1()?;
3731        // Flags (second)
3732        buf.skip_ub1()?;
3733        // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3734        buf.skip_ub4()?; // rba
3735        buf.skip_ub2()?; // partition_id
3736        buf.skip_ub1()?; // skip
3737        buf.skip_ub4()?; // block_num
3738        buf.skip_ub2()?; // slot_num
3739        // OS error
3740        buf.skip_ub4()?;
3741        // Statement number
3742        buf.skip_ub1()?;
3743        // Call number
3744        buf.skip_ub1()?;
3745        // Padding
3746        buf.skip_ub2()?;
3747        // Success iters
3748        buf.skip_ub4()?;
3749        // oerrdd (logical rowid)
3750        let oerrdd_len = buf.read_ub4()?;
3751        if oerrdd_len > 0 {
3752            buf.skip_raw_bytes_chunked()?;
3753        }
3754
3755        // Batch error codes array
3756        let num_batch_errors = buf.read_ub2()?;
3757        if num_batch_errors > 0 {
3758            buf.skip_ub1()?;  // first byte
3759            for _ in 0..num_batch_errors {
3760                buf.skip_ub2()?;  // error code
3761            }
3762        }
3763
3764        // Batch error row offset array
3765        let num_offsets = buf.read_ub4()?;
3766        if num_offsets > 0 {
3767            buf.skip_ub1()?;  // first byte
3768            for _ in 0..num_offsets {
3769                buf.skip_ub4()?;  // offset
3770            }
3771        }
3772
3773        // Batch error messages array
3774        let num_batch_msgs = buf.read_ub2()?;
3775        if num_batch_msgs > 0 {
3776            buf.skip_ub1()?;  // packed size
3777            for _ in 0..num_batch_msgs {
3778                buf.skip_ub2()?;  // chunk length
3779                buf.read_string_with_length()?;  // message
3780                buf.skip(2)?;  // end marker
3781            }
3782        }
3783
3784        // Extended error number (UB4)
3785        let error_code = buf.read_ub4()?;
3786        // Row count (UB8) - this is the rows affected!
3787        let row_count = buf.read_ub8()?;
3788
3789        // Fields added in Oracle Database 20c (TTC field version >= 16)
3790        // We always skip these since we support Oracle 20c+
3791        buf.skip_ub4()?; // sql_type
3792        buf.skip_ub4()?; // server_checksum
3793
3794        // Error message
3795        let error_msg = if error_code != 0 {
3796            buf.read_string_with_length()?.map(|s| s.trim().to_string())
3797        } else {
3798            None
3799        };
3800
3801        Ok((error_code, error_msg, cursor_id, row_count))
3802    }
3803
3804    /// Parse describe info from response to extract column metadata
3805    ///
3806    /// Per Python's _process_describe_info, the format is:
3807    /// - UB4: max row size (skip)
3808    /// - UB4: number of columns
3809    /// - If num_columns > 0: UB1 (skip one byte)
3810    /// - For each column: metadata fields
3811    /// - After columns: current date, dcb flags, etc.
3812    fn parse_describe_info(&self, buf: &mut ReadBuffer, ttc_field_version: u8) -> Result<Vec<ColumnInfo>> {
3813        use crate::constants::ccap_value;
3814
3815        // Skip max row size
3816        buf.skip_ub4()?;
3817
3818        // Read number of columns
3819        let num_columns = buf.read_ub4()? as usize;
3820        if num_columns == 0 {
3821            return Ok(Vec::new());
3822        }
3823
3824        // Skip one byte if we have columns
3825        buf.skip_ub1()?;
3826
3827        let mut columns = Vec::with_capacity(num_columns);
3828
3829        for _col_idx in 0..num_columns {
3830
3831            // Parse column metadata per Python's _process_metadata
3832            let ora_type_num = buf.read_u8()?;
3833            buf.skip_ub1()?; // flags
3834            let precision = buf.read_u8()?; // precision as SB1
3835            let scale = buf.read_u8()?; // scale as SB1
3836            let buffer_size = buf.read_ub4()?;
3837
3838            buf.skip_ub4()?; // max_num_array_elements
3839            buf.skip_ub8()?; // cont_flags
3840            let _oid = buf.read_bytes_with_length()?; // OID
3841            buf.skip_ub2()?; // version
3842            buf.skip_ub2()?; // charset_id
3843            let _csfrm = buf.read_u8()?; // charset form
3844            let max_size = buf.read_ub4()?;
3845
3846            // For TTC field version >= 12.2 (8), skip oaccolid
3847            if ttc_field_version >= ccap_value::FIELD_VERSION_12_2 {
3848                buf.skip_ub4()?; // oaccolid
3849            }
3850
3851            let _nulls_allowed = buf.read_u8()?;
3852            buf.skip_ub1()?; // v7 length of name
3853            let name = buf.read_string_with_ub4_length()?.unwrap_or_default();
3854            let _schema = buf.read_string_with_ub4_length()?; // schema
3855            let _type_name = buf.read_string_with_ub4_length()?; // type_name
3856            buf.skip_ub2()?; // column position
3857            buf.skip_ub4()?; // uds_flags
3858
3859            // For TTC field version >= 23.1 (17), read domain fields
3860            if ttc_field_version >= ccap_value::FIELD_VERSION_23_1 {
3861                let _domain_schema = buf.read_string_with_ub4_length()?;
3862                let _domain_name = buf.read_string_with_ub4_length()?;
3863            }
3864
3865            // For TTC field version >= 20 (23.1_EXT_3), read annotations
3866            if ttc_field_version >= 20 {
3867                let num_annotations = buf.read_ub4()?;
3868                if num_annotations > 0 {
3869                    buf.skip_ub1()?;
3870                    // Read the actual annotations count (yes, it's read twice in Python)
3871                    let actual_num = buf.read_ub4()?;
3872                    buf.skip_ub1()?;
3873                    for _ in 0..actual_num {
3874                        // Skip annotation key and value (both are string with UB4 length)
3875                        let _key = buf.read_string_with_ub4_length()?;
3876                        let _value = buf.read_string_with_ub4_length()?;
3877                        buf.skip_ub4()?; // flags per annotation
3878                    }
3879                    buf.skip_ub4()?; // final flags
3880                }
3881            }
3882
3883            // For TTC field version >= 24 (23.4), read vector fields
3884            if ttc_field_version >= ccap_value::FIELD_VERSION_23_4 {
3885                buf.skip_ub4()?; // vector_dimensions
3886                buf.skip_ub1()?; // vector_format
3887                buf.skip_ub1()?; // vector_flags
3888            }
3889
3890            // Convert data type to OracleType
3891            let oracle_type = crate::constants::OracleType::try_from(ora_type_num)
3892                .unwrap_or(crate::constants::OracleType::Varchar);
3893
3894            let mut col = ColumnInfo::new(&name, oracle_type);
3895            col.data_size = if max_size > 0 { max_size } else { buffer_size };
3896            col.precision = precision as i16;
3897            col.scale = scale as i16;
3898            columns.push(col);
3899        }
3900
3901        // After columns: skip remaining describe info fields
3902        // Python's _process_describe_info uses:
3903        //   buf.read_ub4(&num_bytes)
3904        //   if num_bytes > 0:
3905        //       buf.skip_raw_bytes_chunked()    # current date
3906        //   buf.skip_ub4()                      # dcbflag
3907        //   buf.skip_ub4()                      # dcbmdbz
3908        //   buf.skip_ub4()                      # dcbmnpr
3909        //   buf.skip_ub4()                      # dcbmxpr
3910        //   buf.read_ub4(&num_bytes)
3911        //   if num_bytes > 0:
3912        //       buf.skip_raw_bytes_chunked()    # dcbqcky
3913
3914        // current_date - read UB4 indicator first, then skip chunked bytes if > 0
3915        let current_date_indicator = buf.read_ub4()?;
3916        if current_date_indicator > 0 {
3917            buf.skip_raw_bytes_chunked()?;
3918        }
3919
3920        // dcb* fields as UB4
3921        buf.skip_ub4()?; // dcbflag
3922        buf.skip_ub4()?; // dcbmdbz
3923        buf.skip_ub4()?; // dcbmnpr
3924        buf.skip_ub4()?; // dcbmxpr
3925
3926        // dcbqcky - read UB4 indicator first, then skip chunked bytes if > 0
3927        let dcbqcky_indicator = buf.read_ub4()?;
3928        if dcbqcky_indicator > 0 {
3929            buf.skip_raw_bytes_chunked()?;
3930        }
3931
3932        // After dcbqcky, the next message (RowHeader) follows directly
3933        // No additional fields to skip here
3934
3935
3936        Ok(columns)
3937    }
3938
3939    /// Commit the current transaction.
3940    ///
3941    /// Makes all changes in the current transaction permanent. After commit,
3942    /// a new transaction begins automatically.
3943    ///
3944    /// # Example
3945    ///
3946    /// ```rust,no_run
3947    /// # use oracle_rs::{Connection, Value};
3948    /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
3949    /// conn.execute("INSERT INTO users (name) VALUES (:1)", &["Alice".into()]).await?;
3950    /// conn.execute("INSERT INTO users (name) VALUES (:1)", &["Bob".into()]).await?;
3951    /// conn.commit().await?; // Both inserts are now permanent
3952    /// # Ok(())
3953    /// # }
3954    /// ```
3955    pub async fn commit(&self) -> Result<()> {
3956        self.ensure_ready().await?;
3957        // Use SQL COMMIT statement instead of simple function
3958        // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
3959        self.execute("COMMIT", &[]).await?;
3960        Ok(())
3961    }
3962
3963    /// Rollback the current transaction.
3964    ///
3965    /// Discards all changes made in the current transaction. After rollback,
3966    /// a new transaction begins automatically.
3967    ///
3968    /// # Example
3969    ///
3970    /// ```rust,no_run
3971    /// # use oracle_rs::{Connection, Value};
3972    /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
3973    /// conn.execute("DELETE FROM users WHERE id = :1", &[1.into()]).await?;
3974    /// // Oops, wrong user!
3975    /// conn.rollback().await?; // Delete is undone
3976    /// # Ok(())
3977    /// # }
3978    /// ```
3979    pub async fn rollback(&self) -> Result<()> {
3980        self.ensure_ready().await?;
3981        // Use SQL ROLLBACK statement instead of simple function
3982        // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
3983        self.execute("ROLLBACK", &[]).await?;
3984        Ok(())
3985    }
3986
3987    /// Create a savepoint within the current transaction
3988    ///
3989    /// Savepoints allow partial rollback of a transaction. You can create multiple
3990    /// savepoints and rollback to any of them without affecting work done before
3991    /// that savepoint.
3992    ///
3993    /// # Arguments
3994    /// * `name` - The savepoint name (must be a valid Oracle identifier)
3995    ///
3996    /// # Example
3997    /// ```rust,ignore
3998    /// conn.execute("INSERT INTO t VALUES (1)", &[]).await?;
3999    /// conn.savepoint("sp1").await?;
4000    /// conn.execute("INSERT INTO t VALUES (2)", &[]).await?;
4001    /// conn.rollback_to_savepoint("sp1").await?; // Undoes the second insert
4002    /// conn.commit().await?; // Commits only the first insert
4003    /// ```
4004    pub async fn savepoint(&self, name: &str) -> Result<()> {
4005        self.ensure_ready().await?;
4006        self.execute(&format!("SAVEPOINT {}", name), &[]).await?;
4007        Ok(())
4008    }
4009
4010    /// Rollback to a previously created savepoint
4011    ///
4012    /// This undoes all changes made after the savepoint was created, but keeps
4013    /// the transaction active. Changes made before the savepoint are preserved.
4014    ///
4015    /// # Arguments
4016    /// * `name` - The savepoint name to rollback to
4017    pub async fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
4018        self.ensure_ready().await?;
4019        self.execute(&format!("ROLLBACK TO SAVEPOINT {}", name), &[]).await?;
4020        Ok(())
4021    }
4022
4023    /// Ping the server to check if the connection is still alive.
4024    ///
4025    /// This executes a lightweight query (`SELECT 1 FROM DUAL`) to verify
4026    /// the connection is responsive. Useful for connection health checks
4027    /// in pooling scenarios.
4028    ///
4029    /// # Example
4030    ///
4031    /// ```rust,no_run
4032    /// # use oracle_rs::Connection;
4033    /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
4034    /// if conn.ping().await.is_ok() {
4035    ///     println!("Connection is alive");
4036    /// } else {
4037    ///     println!("Connection is dead");
4038    /// }
4039    /// # Ok(())
4040    /// # }
4041    /// ```
4042    pub async fn ping(&self) -> Result<()> {
4043        self.ensure_ready().await?;
4044        // Use SELECT FROM DUAL instead of simple function
4045        // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
4046        self.query("SELECT 1 FROM DUAL", &[]).await?;
4047        Ok(())
4048    }
4049
4050    /// Clear the statement cache
4051    ///
4052    /// This should be called when recycling a connection in a pool to ensure
4053    /// that any stale cursor state is cleared. This is useful after errors
4054    /// or when the connection state may be inconsistent.
4055    pub async fn clear_statement_cache(&self) {
4056        let mut inner = self.inner.lock().await;
4057        if let Some(ref mut cache) = inner.statement_cache {
4058            cache.clear();
4059        }
4060    }
4061
4062    /// Read data from a LOB (CLOB or BLOB)
4063    ///
4064    /// # Arguments
4065    /// * `locator` - The LOB locator obtained from a query result
4066    /// * `offset` - Starting position (1-based, in characters for CLOB, bytes for BLOB)
4067    /// * `amount` - Amount to read (0 for entire LOB)
4068    ///
4069    /// # Returns
4070    /// For CLOB: returns the text content as a String
4071    /// For BLOB: returns the binary content as bytes
4072    pub async fn read_lob(&self, locator: &LobLocator) -> Result<LobData> {
4073        self.ensure_ready().await?;
4074
4075        // Read the entire LOB starting at offset 1
4076        let offset = 1u64;
4077        let amount = locator.size();
4078
4079        self.read_lob_internal(locator, offset, amount).await
4080    }
4081
4082    /// Read a portion of a LOB
4083    pub async fn read_lob_range(
4084        &self,
4085        locator: &LobLocator,
4086        offset: u64,
4087        amount: u64,
4088    ) -> Result<LobData> {
4089        self.ensure_ready().await?;
4090        self.read_lob_internal(locator, offset, amount).await
4091    }
4092
4093    /// Read a CLOB and return as String
4094    ///
4095    /// This is a convenience method for reading CLOB data directly as a String.
4096    /// Returns an error if the LOB is not a CLOB.
4097    pub async fn read_clob(&self, locator: &LobLocator) -> Result<String> {
4098        if locator.is_blob() || locator.is_bfile() {
4099            return Err(Error::Protocol(
4100                "Cannot read BLOB/BFILE as string, use read_blob instead".to_string(),
4101            ));
4102        }
4103
4104        let data = self.read_lob(locator).await?;
4105        match data {
4106            LobData::String(s) => Ok(s),
4107            LobData::Bytes(_) => Err(Error::Protocol(
4108                "Unexpected bytes from CLOB read".to_string(),
4109            )),
4110        }
4111    }
4112
4113    /// Read a BLOB and return as bytes
4114    ///
4115    /// This is a convenience method for reading BLOB data directly as bytes.
4116    /// Returns an error if the LOB is a CLOB (use read_clob instead).
4117    pub async fn read_blob(&self, locator: &LobLocator) -> Result<bytes::Bytes> {
4118        if locator.is_clob() {
4119            return Err(Error::Protocol(
4120                "Cannot read CLOB as bytes, use read_clob instead".to_string(),
4121            ));
4122        }
4123
4124        let data = self.read_lob(locator).await?;
4125        match data {
4126            LobData::Bytes(b) => Ok(b),
4127            LobData::String(_) => Err(Error::Protocol(
4128                "Unexpected string from BLOB read".to_string(),
4129            )),
4130        }
4131    }
4132
4133    /// Read a LOB in chunks, calling a callback for each chunk
4134    ///
4135    /// This is useful for processing large LOBs without loading the entire
4136    /// content into memory. The callback receives each chunk as it's read.
4137    ///
4138    /// # Arguments
4139    /// * `locator` - The LOB locator
4140    /// * `chunk_size` - Size of each chunk to read (0 uses the LOB's natural chunk size)
4141    /// * `callback` - Async function called for each chunk
4142    ///
4143    /// # Example
4144    /// ```ignore
4145    /// let mut total_size = 0;
4146    /// conn.read_lob_chunked(&locator, 8192, |chunk| async move {
4147    ///     match chunk {
4148    ///         LobData::Bytes(b) => total_size += b.len(),
4149    ///         LobData::String(s) => total_size += s.len(),
4150    ///     }
4151    ///     Ok(())
4152    /// }).await?;
4153    /// ```
4154    pub async fn read_lob_chunked<F, Fut>(
4155        &self,
4156        locator: &LobLocator,
4157        chunk_size: u64,
4158        mut callback: F,
4159    ) -> Result<()>
4160    where
4161        F: FnMut(LobData) -> Fut,
4162        Fut: std::future::Future<Output = Result<()>>,
4163    {
4164        self.ensure_ready().await?;
4165
4166        let total_size = locator.size();
4167        if total_size == 0 {
4168            return Ok(());
4169        }
4170
4171        // Use LOB's natural chunk size if not specified
4172        let chunk_size = if chunk_size == 0 {
4173            self.lob_chunk_size(locator).await?.max(8192) as u64
4174        } else {
4175            chunk_size
4176        };
4177
4178        let mut offset = 1u64;
4179        while offset <= total_size {
4180            let remaining = total_size - offset + 1;
4181            let amount = std::cmp::min(remaining, chunk_size);
4182
4183            let chunk = self.read_lob_internal(locator, offset, amount).await?;
4184            callback(chunk).await?;
4185
4186            offset += amount;
4187        }
4188
4189        Ok(())
4190    }
4191
4192    /// Get the optimal chunk size for a LOB
4193    ///
4194    /// This returns the chunk size that Oracle recommends for efficient
4195    /// reading and writing to this LOB.
4196    pub async fn lob_chunk_size(&self, locator: &LobLocator) -> Result<u32> {
4197        self.ensure_ready().await?;
4198
4199        let mut inner = self.inner.lock().await;
4200        let large_sdu = inner.large_sdu;
4201
4202        // Create LOB operation message for get chunk size
4203        let mut lob_msg = LobOpMessage::new_get_chunk_size(locator);
4204        let seq_num = inner.next_sequence_number();
4205        lob_msg.set_sequence_number(seq_num);
4206
4207        // Build and send the request
4208        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4209        inner.send(&request).await?;
4210
4211        // Receive and parse response
4212        let response = inner.receive().await?;
4213        if response.len() <= PACKET_HEADER_SIZE {
4214            return Err(Error::Protocol("Empty LOB chunk size response".to_string()));
4215        }
4216
4217        // Check for MARKER packet
4218        let packet_type = response[4];
4219        if packet_type == PacketType::Marker as u8 {
4220            let error_response = inner.handle_marker_reset().await?;
4221            let payload = &error_response[PACKET_HEADER_SIZE..];
4222            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4223            buf.skip(2)?;
4224            return self.parse_lob_error(&mut buf);
4225        }
4226
4227        // Parse the amount response (chunk size is returned as amount)
4228        self.parse_lob_amount_response(&response[PACKET_HEADER_SIZE..], locator)
4229            .map(|v| v as u32)
4230    }
4231
4232    /// Internal LOB read implementation
4233    async fn read_lob_internal(
4234        &self,
4235        locator: &LobLocator,
4236        offset: u64,
4237        amount: u64,
4238    ) -> Result<LobData> {
4239        let mut inner = self.inner.lock().await;
4240        let large_sdu = inner.large_sdu;
4241
4242        // Create LOB operation message for read
4243        let mut lob_msg = LobOpMessage::new_read(locator, offset, amount);
4244        let seq_num = inner.next_sequence_number();
4245        lob_msg.set_sequence_number(seq_num);
4246
4247        // Build and send the request
4248        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4249        inner.send(&request).await?;
4250
4251        // Receive and parse response (may span multiple packets for large LOBs)
4252        let response = inner.receive_response().await?;
4253        if response.len() <= PACKET_HEADER_SIZE {
4254            return Err(Error::Protocol("Empty LOB read response".to_string()));
4255        }
4256
4257        // Check for MARKER packet (indicates error)
4258        let packet_type = response[4];
4259        if packet_type == PacketType::Marker as u8 {
4260            let error_response = inner.handle_marker_reset().await?;
4261            let payload = &error_response[PACKET_HEADER_SIZE..];
4262            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4263            // Skip data flags
4264            buf.skip(2)?;
4265            return self.parse_lob_error(&mut buf);
4266        }
4267
4268        // Parse LOB data response
4269        let payload = &response[PACKET_HEADER_SIZE..];
4270        self.parse_lob_read_response(payload, locator)
4271    }
4272
4273    /// Parse LOB read response
4274    fn parse_lob_read_response(&self, payload: &[u8], locator: &LobLocator) -> Result<LobData> {
4275        use crate::buffer::ReadBuffer;
4276
4277        let mut buf = ReadBuffer::from_slice(payload);
4278
4279        // Skip data flags
4280        buf.skip(2)?;
4281
4282        let mut lob_data: Option<Vec<u8>> = None;
4283
4284        // Process messages until end of response
4285        while buf.remaining() > 0 {
4286            let msg_type = buf.read_u8()?;
4287
4288            match msg_type {
4289                // LobData message (14)
4290                x if x == MessageType::LobData as u8 => {
4291                    // Read LOB data with length
4292                    let data = buf.read_raw_bytes_chunked()?;
4293                    lob_data = Some(data);
4294                }
4295
4296                // Parameter return (8) - contains updated locator and amount
4297                x if x == MessageType::Parameter as u8 => {
4298                    // Skip the updated locator (same length as original)
4299                    let locator_len = locator.locator_bytes().len();
4300                    buf.skip(locator_len)?;
4301
4302                    // Read back the amount (ub8)
4303                    let _returned_amount = buf.read_ub8()?;
4304                }
4305
4306                // Error/Status message (4) - code 0 means success
4307                x if x == MessageType::Error as u8 => {
4308                    // Parse error info - code 0 means success
4309                    if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4310                        if code != 0 {
4311                            let message = msg.unwrap_or_else(|| "LOB error".to_string());
4312                            return Err(Error::OracleError { code, message });
4313                        }
4314                        // code 0 = success, continue processing
4315                    }
4316                }
4317
4318                // End of response (29)
4319                x if x == MessageType::EndOfResponse as u8 => {
4320                    break;
4321                }
4322
4323                // Skip other message types
4324                _ => {
4325                    // Try to skip unknown messages
4326                    continue;
4327                }
4328            }
4329        }
4330
4331        // Convert to appropriate type based on LOB type
4332        match lob_data {
4333            Some(data) => {
4334                if locator.is_blob() || locator.is_bfile() {
4335                    Ok(LobData::Bytes(bytes::Bytes::from(data)))
4336                } else {
4337                    // CLOB - decode based on encoding
4338                    let text = if locator.uses_var_length_charset() {
4339                        // UTF-16 BE encoding
4340                        let chars: Vec<u16> = data
4341                            .chunks_exact(2)
4342                            .map(|c| u16::from_be_bytes([c[0], c[1]]))
4343                            .collect();
4344                        String::from_utf16_lossy(&chars)
4345                    } else {
4346                        // UTF-8 encoding
4347                        String::from_utf8_lossy(&data).to_string()
4348                    };
4349                    Ok(LobData::String(text))
4350                }
4351            }
4352            None => {
4353                // Empty LOB
4354                if locator.is_blob() || locator.is_bfile() {
4355                    Ok(LobData::Bytes(bytes::Bytes::new()))
4356                } else {
4357                    Ok(LobData::String(String::new()))
4358                }
4359            }
4360        }
4361    }
4362
4363    /// Write data to a LOB
4364    ///
4365    /// # Arguments
4366    /// * `locator` - The LOB locator obtained from a query result
4367    /// * `offset` - Starting position (1-based, in characters for CLOB, bytes for BLOB)
4368    /// * `data` - Data to write (bytes for BLOB, UTF-8 encoded bytes for CLOB)
4369    pub async fn write_lob(&self, locator: &LobLocator, offset: u64, data: &[u8]) -> Result<()> {
4370        self.ensure_ready().await?;
4371
4372        let mut inner = self.inner.lock().await;
4373        let large_sdu = inner.large_sdu;
4374        let sdu_size = inner.sdu_size as usize;
4375
4376        // Encode data for CLOB if necessary
4377        let encoded_data: Vec<u8>;
4378        let write_data = if locator.is_clob() && locator.uses_var_length_charset() {
4379            // Convert UTF-8 to UTF-16 BE for CLOB with var length charset
4380            let text = String::from_utf8_lossy(data);
4381            encoded_data = text
4382                .encode_utf16()
4383                .flat_map(|c| c.to_be_bytes())
4384                .collect();
4385            &encoded_data[..]
4386        } else {
4387            data
4388        };
4389
4390        // Create LOB operation message for write
4391        let mut lob_msg = LobOpMessage::new_write(locator, offset, write_data);
4392        let seq_num = inner.next_sequence_number();
4393        lob_msg.set_sequence_number(seq_num);
4394
4395        // Build message content (without packet header or data flags)
4396        let message = lob_msg.build_message_only(&inner.capabilities)?;
4397
4398        // Calculate if this fits in a single packet
4399        // Single packet max payload = SDU - header (8) - data flags (2)
4400        let max_single_packet_payload = sdu_size.saturating_sub(PACKET_HEADER_SIZE + 2);
4401
4402        let is_multi_packet = message.len() > max_single_packet_payload;
4403
4404        if is_multi_packet {
4405            // Needs multiple packets - use multi-packet sender
4406            inner.send_multi_packet(&message, 0).await?;
4407        } else {
4408            // Fits in one packet - use standard send
4409            let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4410            inner.send(&request).await?;
4411        }
4412
4413        // Receive and parse response
4414        // Use receive_response() to accumulate all packets until END_OF_RESPONSE.
4415        // This is necessary because Oracle may send multiple packets for the response,
4416        // and if we only read one packet, leftover data causes close() to hang.
4417        let response = inner.receive_response().await?;
4418        if response.len() <= PACKET_HEADER_SIZE {
4419            return Err(Error::Protocol("Empty LOB write response".to_string()));
4420        }
4421
4422        // Check for MARKER packet (indicates error)
4423        let packet_type = response[4];
4424        if packet_type == PacketType::Marker as u8 {
4425            let error_response = inner.handle_marker_reset().await?;
4426            let payload = &error_response[PACKET_HEADER_SIZE..];
4427            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4428            buf.skip(2)?;
4429            return self.parse_lob_error(&mut buf);
4430        }
4431
4432        // Parse response to check for errors
4433        self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4434    }
4435
4436    /// Write string data to a CLOB
4437    pub async fn write_clob(&self, locator: &LobLocator, offset: u64, text: &str) -> Result<()> {
4438        if locator.is_blob() || locator.is_bfile() {
4439            return Err(Error::Protocol(
4440                "Cannot write string to BLOB/BFILE, use write_blob instead".to_string(),
4441            ));
4442        }
4443        self.write_lob(locator, offset, text.as_bytes()).await
4444    }
4445
4446    /// Write binary data to a BLOB
4447    pub async fn write_blob(&self, locator: &LobLocator, offset: u64, data: &[u8]) -> Result<()> {
4448        if locator.is_clob() {
4449            return Err(Error::Protocol(
4450                "Cannot write bytes to CLOB, use write_clob instead".to_string(),
4451            ));
4452        }
4453        self.write_lob(locator, offset, data).await
4454    }
4455
4456    /// Get the length of a LOB
4457    ///
4458    /// For CLOB: returns length in characters
4459    /// For BLOB: returns length in bytes
4460    pub async fn lob_length(&self, locator: &LobLocator) -> Result<u64> {
4461        self.ensure_ready().await?;
4462
4463        let mut inner = self.inner.lock().await;
4464        let large_sdu = inner.large_sdu;
4465
4466        // Create LOB operation message for get length
4467        let mut lob_msg = LobOpMessage::new_get_length(locator);
4468        let seq_num = inner.next_sequence_number();
4469        lob_msg.set_sequence_number(seq_num);
4470
4471        // Build and send the request
4472        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4473        inner.send(&request).await?;
4474
4475        // Receive and parse response
4476        let response = inner.receive().await?;
4477        if response.len() <= PACKET_HEADER_SIZE {
4478            return Err(Error::Protocol("Empty LOB get_length response".to_string()));
4479        }
4480
4481        // Check for MARKER packet (indicates error)
4482        let packet_type = response[4];
4483        if packet_type == PacketType::Marker as u8 {
4484            let error_response = inner.handle_marker_reset().await?;
4485            let payload = &error_response[PACKET_HEADER_SIZE..];
4486            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4487            buf.skip(2)?;
4488            return self.parse_lob_error(&mut buf);
4489        }
4490
4491        // Parse response to get the length
4492        self.parse_lob_amount_response(&response[PACKET_HEADER_SIZE..], locator)
4493    }
4494
4495    /// Trim a LOB to a specified length
4496    ///
4497    /// # Arguments
4498    /// * `locator` - The LOB locator
4499    /// * `new_size` - The new size (in characters for CLOB, bytes for BLOB)
4500    pub async fn lob_trim(&self, locator: &LobLocator, new_size: u64) -> Result<()> {
4501        self.ensure_ready().await?;
4502
4503        let mut inner = self.inner.lock().await;
4504        let large_sdu = inner.large_sdu;
4505
4506        // Create LOB operation message for trim
4507        let mut lob_msg = LobOpMessage::new_trim(locator, new_size);
4508        let seq_num = inner.next_sequence_number();
4509        lob_msg.set_sequence_number(seq_num);
4510
4511        // Build and send the request
4512        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4513        inner.send(&request).await?;
4514
4515        // Receive and parse response
4516        let response = inner.receive().await?;
4517        if response.len() <= PACKET_HEADER_SIZE {
4518            return Err(Error::Protocol("Empty LOB trim response".to_string()));
4519        }
4520
4521        // Check for MARKER packet (indicates error)
4522        let packet_type = response[4];
4523        if packet_type == PacketType::Marker as u8 {
4524            let error_response = inner.handle_marker_reset().await?;
4525            let payload = &error_response[PACKET_HEADER_SIZE..];
4526            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4527            buf.skip(2)?;
4528            return self.parse_lob_error(&mut buf);
4529        }
4530
4531        // Parse response to check for errors
4532        self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4533    }
4534
4535    /// Create a temporary LOB on the server
4536    ///
4537    /// Creates a temporary LOB of the specified type that lives until the connection
4538    /// is closed or the LOB is explicitly freed.
4539    ///
4540    /// # Arguments
4541    /// * `oracle_type` - The LOB type to create (Clob or Blob)
4542    ///
4543    /// # Returns
4544    /// A `LobLocator` for the newly created temporary LOB
4545    ///
4546    /// # Example
4547    /// ```ignore
4548    /// use oracle_rs::OracleType;
4549    ///
4550    /// let locator = conn.create_temp_lob(OracleType::Clob).await?;
4551    /// conn.write_clob(&locator, 1, "Hello, World!").await?;
4552    /// // Now bind the locator to insert into a CLOB column
4553    /// ```
4554    pub async fn create_temp_lob(&self, oracle_type: OracleType) -> Result<LobLocator> {
4555        use crate::buffer::ReadBuffer;
4556
4557        // Validate oracle_type is a LOB type
4558        match oracle_type {
4559            OracleType::Clob | OracleType::Blob => {}
4560            _ => {
4561                return Err(Error::Protocol(format!(
4562                    "create_temp_lob: invalid type {:?}, must be Clob or Blob",
4563                    oracle_type
4564                )));
4565            }
4566        }
4567
4568        self.ensure_ready().await?;
4569
4570        let mut inner = self.inner.lock().await;
4571        let large_sdu = inner.large_sdu;
4572
4573        // Create the CREATE_TEMP message
4574        let mut lob_msg = LobOpMessage::new_create_temp(oracle_type);
4575        let seq_num = inner.next_sequence_number();
4576        lob_msg.set_sequence_number(seq_num);
4577
4578        // Build and send the request
4579        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4580        inner.send(&request).await?;
4581
4582        // Receive and parse response
4583        let response = inner.receive().await?;
4584        if response.len() <= PACKET_HEADER_SIZE {
4585            return Err(Error::Protocol("Empty CREATE_TEMP LOB response".to_string()));
4586        }
4587
4588        // Check for MARKER packet (indicates error)
4589        let packet_type = response[4];
4590        if packet_type == PacketType::Marker as u8 {
4591            let error_response = inner.handle_marker_reset().await?;
4592            let payload = &error_response[PACKET_HEADER_SIZE..];
4593            let mut buf = ReadBuffer::from_slice(payload);
4594            buf.skip(2)?;
4595            return self.parse_lob_error(&mut buf);
4596        }
4597
4598        // Parse response to extract the locator
4599        let payload = &response[PACKET_HEADER_SIZE..];
4600        let mut buf = ReadBuffer::from_slice(payload);
4601        buf.skip(2)?; // Skip data flags
4602
4603        let mut locator_bytes: Option<Vec<u8>> = None;
4604
4605        while buf.remaining() > 0 {
4606            let msg_type = buf.read_u8()?;
4607
4608            match msg_type {
4609                // Parameter return (8) - contains the populated locator
4610                x if x == MessageType::Parameter as u8 => {
4611                    // Read the 40-byte locator (matches the 40 bytes sent in request)
4612                    let loc_data = buf.read_bytes_vec(40)?;
4613                    locator_bytes = Some(loc_data);
4614                    // Skip charset (variable-length ub2) and trailing flags (raw u8)
4615                    buf.skip_ub2()?;
4616                    buf.skip(1)?;
4617                }
4618
4619                // Error/Status message (4) - code 0 means success
4620                x if x == MessageType::Error as u8 => {
4621                    if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4622                        if code != 0 {
4623                            let message = msg.unwrap_or_else(|| "CREATE_TEMP LOB error".to_string());
4624                            return Err(Error::OracleError { code, message });
4625                        }
4626                    }
4627                }
4628
4629                // End of response (29)
4630                x if x == MessageType::EndOfResponse as u8 => {
4631                    break;
4632                }
4633
4634                _ => continue,
4635            }
4636        }
4637
4638        // Create the LobLocator from the returned bytes
4639        let loc_bytes = locator_bytes.ok_or_else(|| {
4640            Error::Protocol("CREATE_TEMP LOB response did not contain locator".to_string())
4641        })?;
4642
4643        // Create LobLocator with size 0, chunk_size 0 (will be fetched if needed)
4644        let locator = LobLocator::new(
4645            bytes::Bytes::from(loc_bytes),
4646            0,      // size - unknown for new temp LOB
4647            0,      // chunk_size - unknown, can be fetched later
4648            oracle_type,
4649            1,      // csfrm - 1 for CLOB, 0 for BLOB (but we store it on the locator type)
4650        );
4651
4652        Ok(locator)
4653    }
4654
4655    // ==================== BFILE Operations ====================
4656
4657    /// Check if a BFILE exists on the server
4658    ///
4659    /// Returns true if the file referenced by the BFILE locator exists on the server.
4660    pub async fn bfile_exists(&self, locator: &LobLocator) -> Result<bool> {
4661        self.ensure_ready().await?;
4662
4663        if !locator.is_bfile() {
4664            return Err(Error::Protocol("bfile_exists called on non-BFILE locator".to_string()));
4665        }
4666
4667        let mut inner = self.inner.lock().await;
4668        let large_sdu = inner.large_sdu;
4669
4670        let mut lob_msg = LobOpMessage::new_file_exists(locator);
4671        let seq_num = inner.next_sequence_number();
4672        lob_msg.set_sequence_number(seq_num);
4673
4674        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4675        inner.send(&request).await?;
4676
4677        let response = inner.receive().await?;
4678        if response.len() <= PACKET_HEADER_SIZE {
4679            return Err(Error::Protocol("Empty BFILE exists response".to_string()));
4680        }
4681
4682        let packet_type = response[4];
4683        if packet_type == PacketType::Marker as u8 {
4684            let error_response = inner.handle_marker_reset().await?;
4685            let payload = &error_response[PACKET_HEADER_SIZE..];
4686            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4687            buf.skip(2)?;
4688            return self.parse_lob_error(&mut buf);
4689        }
4690
4691        self.parse_lob_bool_response(&response[PACKET_HEADER_SIZE..], locator)
4692    }
4693
4694    /// Open a BFILE for reading
4695    ///
4696    /// The BFILE must be opened before reading. After reading, close it with bfile_close.
4697    pub async fn bfile_open(&self, locator: &LobLocator) -> Result<()> {
4698        self.ensure_ready().await?;
4699
4700        if !locator.is_bfile() {
4701            return Err(Error::Protocol("bfile_open called on non-BFILE locator".to_string()));
4702        }
4703
4704        let mut inner = self.inner.lock().await;
4705        let large_sdu = inner.large_sdu;
4706
4707        let mut lob_msg = LobOpMessage::new_file_open(locator);
4708        let seq_num = inner.next_sequence_number();
4709        lob_msg.set_sequence_number(seq_num);
4710
4711        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4712        inner.send(&request).await?;
4713
4714        let response = inner.receive().await?;
4715        if response.len() <= PACKET_HEADER_SIZE {
4716            return Err(Error::Protocol("Empty BFILE open response".to_string()));
4717        }
4718
4719        let packet_type = response[4];
4720        if packet_type == PacketType::Marker as u8 {
4721            let error_response = inner.handle_marker_reset().await?;
4722            let payload = &error_response[PACKET_HEADER_SIZE..];
4723            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4724            buf.skip(2)?;
4725            return self.parse_lob_error(&mut buf);
4726        }
4727
4728        self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4729    }
4730
4731    /// Close a BFILE after reading
4732    pub async fn bfile_close(&self, locator: &LobLocator) -> Result<()> {
4733        self.ensure_ready().await?;
4734
4735        if !locator.is_bfile() {
4736            return Err(Error::Protocol("bfile_close called on non-BFILE locator".to_string()));
4737        }
4738
4739        let mut inner = self.inner.lock().await;
4740        let large_sdu = inner.large_sdu;
4741
4742        let mut lob_msg = LobOpMessage::new_file_close(locator);
4743        let seq_num = inner.next_sequence_number();
4744        lob_msg.set_sequence_number(seq_num);
4745
4746        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4747        inner.send(&request).await?;
4748
4749        let response = inner.receive().await?;
4750        if response.len() <= PACKET_HEADER_SIZE {
4751            return Err(Error::Protocol("Empty BFILE close response".to_string()));
4752        }
4753
4754        let packet_type = response[4];
4755        if packet_type == PacketType::Marker as u8 {
4756            let error_response = inner.handle_marker_reset().await?;
4757            let payload = &error_response[PACKET_HEADER_SIZE..];
4758            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4759            buf.skip(2)?;
4760            return self.parse_lob_error(&mut buf);
4761        }
4762
4763        self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4764    }
4765
4766    /// Check if a BFILE is currently open
4767    pub async fn bfile_is_open(&self, locator: &LobLocator) -> Result<bool> {
4768        self.ensure_ready().await?;
4769
4770        if !locator.is_bfile() {
4771            return Err(Error::Protocol("bfile_is_open called on non-BFILE locator".to_string()));
4772        }
4773
4774        let mut inner = self.inner.lock().await;
4775        let large_sdu = inner.large_sdu;
4776
4777        let mut lob_msg = LobOpMessage::new_file_is_open(locator);
4778        let seq_num = inner.next_sequence_number();
4779        lob_msg.set_sequence_number(seq_num);
4780
4781        let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4782        inner.send(&request).await?;
4783
4784        let response = inner.receive().await?;
4785        if response.len() <= PACKET_HEADER_SIZE {
4786            return Err(Error::Protocol("Empty BFILE is_open response".to_string()));
4787        }
4788
4789        let packet_type = response[4];
4790        if packet_type == PacketType::Marker as u8 {
4791            let error_response = inner.handle_marker_reset().await?;
4792            let payload = &error_response[PACKET_HEADER_SIZE..];
4793            let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4794            buf.skip(2)?;
4795            return self.parse_lob_error(&mut buf);
4796        }
4797
4798        self.parse_lob_bool_response(&response[PACKET_HEADER_SIZE..], locator)
4799    }
4800
4801    /// Read BFILE data
4802    ///
4803    /// This is a convenience method that opens the BFILE if needed, reads all content,
4804    /// and returns it as bytes. For large BFILEs, consider using read_lob_chunked.
4805    pub async fn read_bfile(&self, locator: &LobLocator) -> Result<bytes::Bytes> {
4806        if !locator.is_bfile() {
4807            return Err(Error::Protocol("read_bfile called on non-BFILE locator".to_string()));
4808        }
4809
4810        // Check if file is open, open if needed
4811        let should_close = if !self.bfile_is_open(locator).await? {
4812            self.bfile_open(locator).await?;
4813            true
4814        } else {
4815            false
4816        };
4817
4818        // Read all data
4819        let result = self.read_blob(locator).await;
4820
4821        // Close if we opened it
4822        if should_close {
4823            let _ = self.bfile_close(locator).await;
4824        }
4825
4826        result
4827    }
4828
4829    /// Parse a LOB operation response that returns a boolean (file_exists, is_open)
4830    fn parse_lob_bool_response(&self, payload: &[u8], locator: &LobLocator) -> Result<bool> {
4831        use crate::buffer::ReadBuffer;
4832
4833        let mut buf = ReadBuffer::from_slice(payload);
4834        buf.skip(2)?; // Skip data flags
4835
4836        let mut bool_result: bool = false;
4837
4838        while buf.remaining() > 0 {
4839            let msg_type = buf.read_u8()?;
4840
4841            match msg_type {
4842                // Parameter return (8) - contains updated locator and bool flag
4843                x if x == MessageType::Parameter as u8 => {
4844                    let locator_len = locator.locator_bytes().len();
4845                    buf.skip(locator_len)?;
4846                    // Boolean flag is a single byte, > 0 means true
4847                    let flag = buf.read_u8()?;
4848                    bool_result = flag > 0;
4849                }
4850
4851                // Error/Status message (4) - code 0 means success
4852                x if x == MessageType::Error as u8 => {
4853                    if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4854                        if code != 0 {
4855                            let message = msg.unwrap_or_else(|| "LOB error".to_string());
4856                            return Err(Error::OracleError { code, message });
4857                        }
4858                    }
4859                }
4860
4861                // End of response (29)
4862                x if x == MessageType::EndOfResponse as u8 => {
4863                    break;
4864                }
4865
4866                _ => continue,
4867            }
4868        }
4869
4870        Ok(bool_result)
4871    }
4872
4873    /// Parse a simple LOB operation response (write, trim)
4874    fn parse_lob_simple_response(&self, payload: &[u8], locator: &LobLocator) -> Result<()> {
4875        use crate::buffer::ReadBuffer;
4876
4877        let mut buf = ReadBuffer::from_slice(payload);
4878        buf.skip(2)?; // Skip data flags
4879
4880        while buf.remaining() > 0 {
4881            let msg_type = buf.read_u8()?;
4882
4883            match msg_type {
4884                // Parameter return (8) - contains updated locator and possibly amount
4885                x if x == MessageType::Parameter as u8 => {
4886                    let locator_len = locator.locator_bytes().len();
4887                    buf.skip(locator_len)?;
4888                    // After locator, there may be amount (ub8) if send_amount was true
4889                    // We just skip any remaining bytes until we hit Error or EndOfResponse
4890                }
4891
4892                // Error/Status message (4) - code 0 means success
4893                x if x == MessageType::Error as u8 => {
4894                    if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4895                        if code != 0 {
4896                            let message = msg.unwrap_or_else(|| "LOB error".to_string());
4897                            return Err(Error::OracleError { code, message });
4898                        }
4899                    }
4900                }
4901
4902                // End of response (29)
4903                x if x == MessageType::EndOfResponse as u8 => {
4904                    break;
4905                }
4906
4907                _ => continue,
4908            }
4909        }
4910
4911        Ok(())
4912    }
4913
4914    /// Parse a LOB operation response that returns an amount (get_length, get_chunk_size)
4915    fn parse_lob_amount_response(&self, payload: &[u8], locator: &LobLocator) -> Result<u64> {
4916        use crate::buffer::ReadBuffer;
4917
4918        let mut buf = ReadBuffer::from_slice(payload);
4919        buf.skip(2)?; // Skip data flags
4920
4921        let mut returned_amount: u64 = 0;
4922
4923        while buf.remaining() > 0 {
4924            let msg_type = buf.read_u8()?;
4925
4926            match msg_type {
4927                // Parameter return (8) - contains updated locator and amount
4928                x if x == MessageType::Parameter as u8 => {
4929                    let locator_len = locator.locator_bytes().len();
4930                    buf.skip(locator_len)?;
4931                    returned_amount = buf.read_ub8()?;
4932                }
4933
4934                // Error/Status message (4) - code 0 means success
4935                x if x == MessageType::Error as u8 => {
4936                    if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4937                        if code != 0 {
4938                            let message = msg.unwrap_or_else(|| "LOB error".to_string());
4939                            return Err(Error::OracleError { code, message });
4940                        }
4941                    }
4942                }
4943
4944                // End of response (29)
4945                x if x == MessageType::EndOfResponse as u8 => {
4946                    break;
4947                }
4948
4949                _ => continue,
4950            }
4951        }
4952
4953        Ok(returned_amount)
4954    }
4955
4956    /// Parse LOB error response
4957    fn parse_lob_error<T>(&self, buf: &mut crate::buffer::ReadBuffer) -> Result<T> {
4958        // Try to extract error info
4959        if let Ok((code, msg, _)) = self.parse_error_info(buf) {
4960            let message = msg.unwrap_or_else(|| "Unknown LOB error".to_string());
4961            Err(Error::OracleError { code, message })
4962        } else {
4963            Err(Error::Protocol("LOB operation failed".to_string()))
4964        }
4965    }
4966
4967    /// Close the connection.
4968    ///
4969    /// Sends a logoff message to the server and closes the underlying TCP
4970    /// connection. After calling close, the connection cannot be reused.
4971    ///
4972    /// If the connection is already closed, this method returns `Ok(())`
4973    /// without doing anything.
4974    ///
4975    /// # Note
4976    ///
4977    /// Any uncommitted transaction is rolled back by the server when the
4978    /// connection is closed.
4979    ///
4980    /// # Example
4981    ///
4982    /// ```rust,no_run
4983    /// # use oracle_rs::{Config, Connection};
4984    /// # async fn example() -> oracle_rs::Result<()> {
4985    /// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password");
4986    /// let conn = Connection::connect_with_config(config).await?;
4987    ///
4988    /// // Do work...
4989    /// conn.commit().await?;
4990    ///
4991    /// // Explicitly close when done
4992    /// conn.close().await?;
4993    /// # Ok(())
4994    /// # }
4995    /// ```
4996    pub async fn close(&self) -> Result<()> {
4997        if self.closed.swap(true, Ordering::Relaxed) {
4998            // Already closed
4999            return Ok(());
5000        }
5001
5002        let mut inner = self.inner.lock().await;
5003
5004        if inner.state == ConnectionState::Ready {
5005            // Send logoff
5006            let _ = self.send_simple_function_inner(&mut inner, FunctionCode::Logoff).await;
5007        }
5008
5009        inner.state = ConnectionState::Closed;
5010
5011        // Close the TCP stream
5012        if let Some(stream) = inner.stream.take() {
5013            drop(stream);
5014        }
5015
5016        Ok(())
5017    }
5018
5019    /// Send a marker packet to the server
5020    /// marker_type: 1=BREAK, 2=RESET, 3=INTERRUPT
5021    async fn send_marker(&self, inner: &mut ConnectionInner, marker_type: u8) -> Result<()> {
5022        let mut packet_buf = WriteBuffer::new();
5023
5024        // Build MARKER packet header
5025        let packet_len = PACKET_HEADER_SIZE + 3; // Header + 3 bytes payload
5026        packet_buf.write_u16_be(packet_len as u16)?;
5027        packet_buf.write_u16_be(0)?; // Checksum
5028        packet_buf.write_u8(PacketType::Marker as u8)?;
5029        packet_buf.write_u8(0)?; // Flags
5030        packet_buf.write_u16_be(0)?; // Header checksum
5031
5032        // Marker payload: [1, 0, marker_type] per Python's _send_marker
5033        packet_buf.write_u8(1)?;
5034        packet_buf.write_u8(0)?;
5035        packet_buf.write_u8(marker_type)?;
5036
5037        inner.send(&packet_buf.freeze()).await
5038    }
5039
5040    async fn send_simple_function_inner(
5041        &self,
5042        inner: &mut ConnectionInner,
5043        function_code: FunctionCode,
5044    ) -> Result<()> {
5045        // Build function message
5046        let mut buf = WriteBuffer::new();
5047
5048        // Get next sequence number (must be done before sending)
5049        let seq_num = inner.next_sequence_number();
5050
5051        // Data flags
5052        buf.write_u16_be(0)?;
5053        // Message type: Function
5054        buf.write_u8(MessageType::Function as u8)?;
5055        // Function code
5056        buf.write_u8(function_code as u8)?;
5057        // Sequence number (tracked per connection)
5058        buf.write_u8(seq_num)?;
5059
5060        // Token number (required for TTC field version >= 18, i.e. Oracle 23ai)
5061        if inner.capabilities.ttc_field_version >= 18 {
5062            buf.write_ub8(0)?;
5063        }
5064
5065        // Build DATA packet
5066        let data_payload = buf.freeze();
5067        let mut packet_buf = WriteBuffer::new();
5068        let packet_len = PACKET_HEADER_SIZE + data_payload.len();
5069        packet_buf.write_u16_be(packet_len as u16)?;
5070        packet_buf.write_u16_be(0)?; // Checksum
5071        packet_buf.write_u8(PacketType::Data as u8)?;
5072        packet_buf.write_u8(0)?; // Flags
5073        packet_buf.write_u16_be(0)?; // Header checksum
5074        packet_buf.write_bytes(&data_payload)?;
5075
5076        let packet_bytes = packet_buf.freeze();
5077        inner.send(&packet_bytes).await?;
5078
5079        // Wait for response
5080        let response = inner.receive().await?;
5081
5082        // Check response
5083        if response.len() <= 4 {
5084            return Err(Error::Protocol("Response too short".to_string()));
5085        }
5086
5087        let packet_type = response[4];
5088
5089        // MARKER packet (type 12) - need to handle reset protocol
5090        if packet_type == PacketType::Marker as u8 {
5091            // Check marker type
5092            if response.len() >= PACKET_HEADER_SIZE + 3 {
5093                let marker_type = response[PACKET_HEADER_SIZE + 2];
5094
5095                // For BREAK marker (1), we need to do the reset protocol
5096                if marker_type == 1 {
5097                    // For Logoff, Oracle may send BREAK to indicate "connection closing"
5098                    // Don't try to do the full reset handshake - just return success
5099                    if function_code == FunctionCode::Logoff {
5100                        inner.state = ConnectionState::Closed;
5101                        return Ok(());
5102                    }
5103
5104                    // The BREAK marker means the server is interrupting/breaking the current operation
5105                    // We MUST complete the reset handshake or the connection will be in a bad state
5106
5107                    // Send RESET marker to server
5108                    if let Err(e) = self.send_marker(inner, 2).await {
5109                        inner.state = ConnectionState::Closed;
5110                        return Err(e);
5111                    }
5112
5113                    // Read and discard packets until we get RESET marker
5114                    // This follows Python's _reset() logic
5115                    let mut current_packet_type: u8;
5116                    loop {
5117                        match inner.receive().await {
5118                            Ok(pkt) => {
5119                                if pkt.len() < PACKET_HEADER_SIZE + 1 {
5120                                    break;
5121                                }
5122                                current_packet_type = pkt[4];
5123
5124                                if current_packet_type == PacketType::Marker as u8 {
5125                                    if pkt.len() >= PACKET_HEADER_SIZE + 3 {
5126                                        let mk_type = pkt[PACKET_HEADER_SIZE + 2];
5127                                        if mk_type == 2 {
5128                                            // Got RESET marker, exit this loop
5129                                            break;
5130                                        }
5131                                    }
5132                                } else {
5133                                    // Non-marker packet - unexpected during reset wait
5134                                    break;
5135                                }
5136                            }
5137                            Err(e) => {
5138                                inner.state = ConnectionState::Closed;
5139                                return Err(e);
5140                            }
5141                        }
5142                    }
5143
5144                    // After RESET, continue reading while we still get MARKER packets
5145                    // Some servers send multiple RESET markers, others send DATA response
5146                    // Python comment: "some quit immediately" - meaning some servers close
5147                    // the connection right after the reset handshake
5148                    loop {
5149                        match inner.receive().await {
5150                            Ok(pkt) => {
5151                                if pkt.len() < PACKET_HEADER_SIZE + 1 {
5152                                    break;
5153                                }
5154                                current_packet_type = pkt[4];
5155
5156                                if current_packet_type == PacketType::Marker as u8 {
5157                                    // Another marker, continue reading
5158                                    continue;
5159                                }
5160
5161                                // Got a non-marker packet (probably DATA with error/status)
5162                                if current_packet_type == PacketType::Data as u8 {
5163                                    if pkt.len() > PACKET_HEADER_SIZE + 2 {
5164                                        let msg_type = pkt[PACKET_HEADER_SIZE + 2];
5165                                        if msg_type == MessageType::Error as u8 {
5166                                            let payload = &pkt[PACKET_HEADER_SIZE..];
5167                                            let mut buf = ReadBuffer::from_slice(payload);
5168                                            buf.skip(2)?; // data flags
5169                                            buf.skip(1)?; // msg_type
5170                                            let (error_code, error_msg, _) = self.parse_error_info(&mut buf)?;
5171                                            if error_code != 0 {
5172                                                return Err(Error::OracleError {
5173                                                    code: error_code,
5174                                                    message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_code)),
5175                                                });
5176                                            }
5177                                        }
5178                                    }
5179                                }
5180                                // Exit after processing non-marker packet
5181                                break;
5182                            }
5183                            Err(_) => {
5184                                // Error reading - connection might be closed
5185                                // Python comment says "some quit immediately" - meaning some
5186                                // servers close the connection after BREAK/RESET handshake.
5187                                // For commit/rollback/logoff, treat this as success since
5188                                // the operation was processed before the close.
5189                                if matches!(function_code,
5190                                    FunctionCode::Logoff |
5191                                    FunctionCode::Commit |
5192                                    FunctionCode::Rollback
5193                                ) {
5194                                    // The operation succeeded, but the server closed the connection
5195                                    // Mark connection as closed for future operations
5196                                    inner.state = ConnectionState::Closed;
5197                                    return Ok(());
5198                                }
5199                                // For other functions, this means connection is broken
5200                                inner.state = ConnectionState::Closed;
5201                                // Don't return error for Ping - treat as success
5202                                if function_code == FunctionCode::Ping {
5203                                    return Ok(());
5204                                }
5205                                return Ok(()); // Conservative approach - treat as success
5206                            }
5207                        }
5208                    }
5209
5210                    return Ok(());
5211                }
5212            }
5213            // For non-BREAK markers, just return success
5214            return Ok(());
5215        }
5216
5217        // DATA packet (type 6)
5218        if packet_type == PacketType::Data as u8 {
5219            // Parse response to check for errors
5220            if response.len() > PACKET_HEADER_SIZE + 2 {
5221                let msg_type = response[PACKET_HEADER_SIZE + 2];
5222                if msg_type == MessageType::Error as u8 {
5223                    // Parse the error info
5224                    let payload = &response[PACKET_HEADER_SIZE..];
5225                    let mut buf = ReadBuffer::from_slice(payload);
5226                    buf.skip(2)?; // data flags
5227                    buf.skip(1)?; // msg_type
5228                    let (error_code, error_msg, _) = self.parse_error_info(&mut buf)?;
5229                    if error_code != 0 {
5230                        return Err(Error::OracleError {
5231                            code: error_code,
5232                            message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_code)),
5233                        });
5234                    }
5235                }
5236            }
5237            return Ok(());
5238        }
5239
5240        Err(Error::Protocol(format!("Unexpected packet type {} for function call", packet_type)))
5241    }
5242
5243    /// Ensure the connection is ready for operations
5244    async fn ensure_ready(&self) -> Result<()> {
5245        if self.is_closed() {
5246            return Err(Error::ConnectionClosed);
5247        }
5248
5249        let inner = self.inner.lock().await;
5250        if inner.state != ConnectionState::Ready {
5251            return Err(Error::ConnectionNotReady);
5252        }
5253
5254        Ok(())
5255    }
5256}
5257
5258impl Drop for Connection {
5259    fn drop(&mut self) {
5260        // Note: Can't do async cleanup in Drop
5261        // Users should call close() explicitly
5262        self.closed.store(true, Ordering::Relaxed);
5263    }
5264}
5265
5266// =============================================================================
5267// Helper functions for get_type()
5268// =============================================================================
5269
/// Split a type name into upper-cased `(schema, name)` components.
///
/// The name is divided at its first dot:
/// - "SCHEMA.TYPE_NAME" -> ("SCHEMA", "TYPE_NAME")
/// - "TYPE_NAME" -> (default_schema, "TYPE_NAME")
/// - "A.B.C" -> ("A", "B.C") (everything after the first dot is the name)
fn parse_type_name(type_name: &str, default_schema: &str) -> (String, String) {
    let (schema, name) = match type_name.split_once('.') {
        Some((schema, rest)) => (schema, rest),
        // No dot at all: fall back to the caller-supplied schema
        None => (default_schema, type_name),
    };
    (schema.to_uppercase(), name.to_uppercase())
}
5286
5287/// Convert an Oracle type name from data dictionary to OracleType enum
5288fn oracle_type_from_name(type_name: &str) -> crate::constants::OracleType {
5289    use crate::constants::OracleType;
5290
5291    match type_name.to_uppercase().as_str() {
5292        "NUMBER" => OracleType::Number,
5293        "INTEGER" | "INT" | "SMALLINT" => OracleType::Number,
5294        "FLOAT" | "REAL" | "DOUBLE PRECISION" => OracleType::BinaryDouble,
5295        "BINARY_FLOAT" => OracleType::BinaryFloat,
5296        "BINARY_DOUBLE" => OracleType::BinaryDouble,
5297        "VARCHAR2" | "VARCHAR" | "NVARCHAR2" => OracleType::Varchar,
5298        "CHAR" | "NCHAR" => OracleType::Char,
5299        "DATE" => OracleType::Date,
5300        "TIMESTAMP" => OracleType::Timestamp,
5301        "TIMESTAMP WITH TIME ZONE" => OracleType::TimestampTz,
5302        "TIMESTAMP WITH LOCAL TIME ZONE" => OracleType::TimestampLtz,
5303        "RAW" => OracleType::Raw,
5304        "BLOB" => OracleType::Blob,
5305        "CLOB" | "NCLOB" => OracleType::Clob,
5306        "BOOLEAN" | "PL/SQL BOOLEAN" => OracleType::Boolean,
5307        "ROWID" | "UROWID" => OracleType::Rowid,
5308        "XMLTYPE" => OracleType::Varchar, // Treat XMLType as string for now
5309        _ => OracleType::Varchar, // Default to VARCHAR for unknown types
5310    }
5311}
5312
#[cfg(test)]
mod tests {
    use super::*;
    use crate::row::Value;

    /// Builds a column-less `QueryResult` holding two single-integer rows.
    fn two_row_result() -> QueryResult {
        QueryResult {
            columns: Vec::new(),
            rows: vec![
                Row::new(vec![Value::Integer(1)]),
                Row::new(vec![Value::Integer(2)]),
            ],
            rows_affected: 0,
            has_more_rows: false,
            cursor_id: 0,
        }
    }

    /// Default query options use 100 for both sizes and leave auto-commit off.
    #[test]
    fn test_query_options_default() {
        let defaults = QueryOptions::default();

        assert_eq!(defaults.prefetch_rows, 100);
        assert_eq!(defaults.array_size, 100);
        assert!(!defaults.auto_commit);
    }

    /// An empty result reports no columns, no rows and no first row.
    #[test]
    fn test_query_result_empty() {
        let empty = QueryResult::empty();

        assert!(empty.is_empty());
        assert_eq!(empty.column_count(), 0);
        assert_eq!(empty.row_count(), 0);
        assert!(empty.first().is_none());
    }

    /// A populated result exposes its row and column through the accessors.
    #[test]
    fn test_query_result_with_rows() {
        let populated = QueryResult {
            columns: vec![ColumnInfo::new("ID", crate::constants::OracleType::Number)],
            rows: vec![Row::new(vec![Value::Integer(1)])],
            rows_affected: 0,
            has_more_rows: false,
            cursor_id: 1,
        };

        assert!(!populated.is_empty());
        assert_eq!(populated.column_count(), 1);
        assert_eq!(populated.row_count(), 1);
        assert!(populated.first().is_some());

        // Column lookup by name is case insensitive.
        assert!(populated.column_by_name("ID").is_some());
        assert!(populated.column_by_name("id").is_some());
        assert_eq!(populated.column_index("ID"), Some(0));
    }

    /// Default server info has an empty version string and a zero session id.
    #[test]
    fn test_server_info_default() {
        let server = ServerInfo::default();

        assert!(server.version.is_empty());
        assert_eq!(server.session_id, 0);
    }

    /// Connection states compare equal to themselves and unequal to others.
    #[test]
    fn test_connection_state_transitions() {
        assert_eq!(ConnectionState::Disconnected, ConnectionState::Disconnected);
        assert_ne!(ConnectionState::Connected, ConnectionState::Ready);
    }

    /// Borrowing iteration visits every row.
    #[test]
    fn test_query_result_iterator() {
        let result = two_row_result();

        let borrowed: Vec<_> = result.iter().collect();
        assert_eq!(borrowed.len(), 2);
    }

    /// Consuming iteration yields every row by value.
    #[test]
    fn test_query_result_into_iterator() {
        let result = two_row_result();

        let owned: Vec<Row> = result.into_iter().collect();
        assert_eq!(owned.len(), 2);
    }
}