oracle_rs/connection.rs
1//! Oracle database connection
2//!
3//! This module provides the main `Connection` type for interacting with Oracle databases.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use oracle_rs::{Connection, Config};
9//!
10//! #[tokio::main]
11//! async fn main() -> oracle_rs::Result<()> {
12//! // Create a connection
13//! let conn = Connection::connect("localhost:1521/ORCLPDB1", "user", "password").await?;
14//!
15//! // Execute a query
16//! let rows = conn.query("SELECT * FROM employees WHERE department_id = :1", &[&10]).await?;
17//!
18//! for row in rows {
19//! println!("{:?}", row);
20//! }
21//!
22//! // Commit and close
23//! conn.commit().await?;
24//! conn.close().await?;
25//! Ok(())
26//! }
27//! ```
28
29use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
30use std::sync::Arc;
31use bytes::Bytes;
32use tokio::io::{AsyncReadExt, AsyncWriteExt};
33use tokio::net::TcpStream;
34use tokio::sync::Mutex;
35
36use crate::batch::{BatchBinds, BatchResult};
37use crate::buffer::{ReadBuffer, WriteBuffer};
38use crate::transport::{TlsConfig, TlsOracleStream, connect_tls};
39use crate::capabilities::Capabilities;
40use crate::config::{Config, ServiceMethod};
41use crate::constants::{BindDirection, FetchOrientation, FunctionCode, MessageType, OracleType, PacketType, PACKET_HEADER_SIZE};
42use crate::cursor::{ScrollableCursor, ScrollResult};
43use crate::error::{Error, Result};
44use crate::implicit::{ImplicitResult, ImplicitResults};
45use crate::messages::{AcceptMessage, AuthMessage, AuthPhase, ConnectMessage, ExecuteMessage, ExecuteOptions, FetchMessage, LobOpMessage};
46use crate::packet::Packet;
47use crate::row::{Row, Value};
48use crate::statement::{BindParam, ColumnInfo, Statement, StatementType};
49use crate::types::{LobData, LobLocator, LobValue};
50use crate::statement_cache::StatementCache;
51
52/// Connection state
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum ConnectionState {
55 /// Not connected
56 Disconnected,
57 /// TCP connection established
58 Connected,
59 /// Protocol negotiation complete
60 ProtocolNegotiated,
61 /// Data types negotiated
62 DataTypesNegotiated,
63 /// Fully authenticated and ready
64 Ready,
65 /// Connection is closed
66 Closed,
67}
68
69/// Options for query execution
70#[derive(Debug, Clone)]
71pub struct QueryOptions {
72 /// Number of rows to prefetch
73 pub prefetch_rows: u32,
74 /// Array size for batch operations
75 pub array_size: u32,
76 /// Whether to auto-commit after DML
77 pub auto_commit: bool,
78}
79
80impl Default for QueryOptions {
81 fn default() -> Self {
82 Self {
83 prefetch_rows: 100,
84 array_size: 100,
85 auto_commit: false,
86 }
87 }
88}
89
90/// Result set from a query
91#[derive(Debug)]
92pub struct QueryResult {
93 /// Column information
94 pub columns: Vec<ColumnInfo>,
95 /// Rows returned
96 pub rows: Vec<Row>,
97 /// Number of rows affected (for DML)
98 pub rows_affected: u64,
99 /// Whether there are more rows to fetch
100 pub has_more_rows: bool,
101 /// Cursor ID for subsequent fetches (needed for fetch_more)
102 pub cursor_id: u16,
103}
104
105impl QueryResult {
106 /// Create an empty query result
107 pub fn empty() -> Self {
108 Self {
109 columns: Vec::new(),
110 rows: Vec::new(),
111 rows_affected: 0,
112 has_more_rows: false,
113 cursor_id: 0,
114 }
115 }
116
117 /// Get the number of columns
118 pub fn column_count(&self) -> usize {
119 self.columns.len()
120 }
121
122 /// Get the number of rows
123 pub fn row_count(&self) -> usize {
124 self.rows.len()
125 }
126
127 /// Check if the result is empty
128 pub fn is_empty(&self) -> bool {
129 self.rows.is_empty()
130 }
131
132 /// Get a column by name
133 pub fn column_by_name(&self, name: &str) -> Option<&ColumnInfo> {
134 self.columns.iter().find(|c| c.name.eq_ignore_ascii_case(name))
135 }
136
137 /// Get column index by name
138 pub fn column_index(&self, name: &str) -> Option<usize> {
139 self.columns.iter().position(|c| c.name.eq_ignore_ascii_case(name))
140 }
141
142 /// Iterate over rows
143 pub fn iter(&self) -> impl Iterator<Item = &Row> {
144 self.rows.iter()
145 }
146
147 /// Get a single row (first row)
148 pub fn first(&self) -> Option<&Row> {
149 self.rows.first()
150 }
151}
152
153impl IntoIterator for QueryResult {
154 type Item = Row;
155 type IntoIter = std::vec::IntoIter<Row>;
156
157 fn into_iter(self) -> Self::IntoIter {
158 self.rows.into_iter()
159 }
160}
161
162/// Result from executing a PL/SQL block with OUT parameters
163#[derive(Debug)]
164pub struct PlsqlResult {
165 /// OUT parameter values indexed by position (0-based)
166 pub out_values: Vec<Value>,
167 /// Number of rows affected (if applicable)
168 pub rows_affected: u64,
169 /// Cursor ID (if the result contains a REF CURSOR)
170 pub cursor_id: Option<u16>,
171 /// Implicit result sets returned via DBMS_SQL.RETURN_RESULT
172 pub implicit_results: ImplicitResults,
173}
174
175impl PlsqlResult {
176 /// Create an empty PL/SQL result
177 pub fn empty() -> Self {
178 Self {
179 out_values: Vec::new(),
180 rows_affected: 0,
181 cursor_id: None,
182 implicit_results: ImplicitResults::new(),
183 }
184 }
185
186 /// Get an OUT value by position (0-based)
187 pub fn get(&self, index: usize) -> Option<&Value> {
188 self.out_values.get(index)
189 }
190
191 /// Get a string OUT value by position
192 pub fn get_string(&self, index: usize) -> Option<&str> {
193 self.out_values.get(index).and_then(|v| v.as_str())
194 }
195
196 /// Get an integer OUT value by position
197 pub fn get_integer(&self, index: usize) -> Option<i64> {
198 self.out_values.get(index).and_then(|v| v.as_i64())
199 }
200
201 /// Get a float OUT value by position
202 pub fn get_float(&self, index: usize) -> Option<f64> {
203 self.out_values.get(index).and_then(|v| v.as_f64())
204 }
205
206 /// Get a cursor ID from OUT value by position (for REF CURSOR)
207 pub fn get_cursor_id(&self, index: usize) -> Option<u16> {
208 self.out_values.get(index).and_then(|v| v.as_cursor_id())
209 }
210}
211
212/// Server information obtained during connection
213#[derive(Debug, Clone, Default)]
214pub struct ServerInfo {
215 /// Oracle version string
216 pub version: String,
217 /// Server banner
218 pub banner: String,
219 /// Session ID (SID)
220 pub session_id: u32,
221 /// Serial number
222 pub serial_number: u32,
223 /// Instance name
224 pub instance_name: Option<String>,
225 /// Service name
226 pub service_name: Option<String>,
227 /// Database name
228 pub database_name: Option<String>,
229 /// Negotiated protocol version
230 pub protocol_version: u16,
231 /// Whether server supports OOB (out of band) data
232 pub supports_oob: bool,
233}
234
235/// Stream type that can be either plain TCP or TLS-encrypted
236enum OracleStream {
237 /// Plain TCP connection
238 Plain(TcpStream),
239 /// TLS-encrypted connection
240 Tls(TlsOracleStream),
241}
242
243impl OracleStream {
244 async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
245 match self {
246 OracleStream::Plain(stream) => {
247 AsyncReadExt::read_exact(stream, buf).await?;
248 Ok(())
249 }
250 OracleStream::Tls(stream) => {
251 AsyncReadExt::read_exact(stream, buf).await?;
252 Ok(())
253 }
254 }
255 }
256
257 async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
258 match self {
259 OracleStream::Plain(stream) => stream.write_all(buf).await,
260 OracleStream::Tls(stream) => stream.write_all(buf).await,
261 }
262 }
263
264 async fn flush(&mut self) -> std::io::Result<()> {
265 match self {
266 OracleStream::Plain(stream) => stream.flush().await,
267 OracleStream::Tls(stream) => stream.flush().await,
268 }
269 }
270
271}
272
273/// Internal connection state shared across async operations
274struct ConnectionInner {
275 stream: Option<OracleStream>,
276 capabilities: Capabilities,
277 state: ConnectionState,
278 server_info: ServerInfo,
279 sdu_size: u16,
280 large_sdu: bool,
281 /// Sequence number for TTC messages (increments per message)
282 sequence_number: u8,
283 /// Statement cache for prepared statement reuse
284 statement_cache: Option<StatementCache>,
285}
286
287impl ConnectionInner {
288 fn new_with_cache(cache_size: usize) -> Self {
289 Self {
290 stream: None,
291 capabilities: Capabilities::default(),
292 state: ConnectionState::Disconnected,
293 server_info: ServerInfo::default(),
294 sdu_size: 8192,
295 large_sdu: false,
296 sequence_number: 0,
297 statement_cache: if cache_size > 0 {
298 Some(StatementCache::new(cache_size))
299 } else {
300 None
301 },
302 }
303 }
304
305 /// Get the next sequence number (auto-increments, wraps at 255 to 1)
306 fn next_sequence_number(&mut self) -> u8 {
307 self.sequence_number = self.sequence_number.wrapping_add(1);
308 if self.sequence_number == 0 {
309 self.sequence_number = 1;
310 }
311 self.sequence_number
312 }
313
314 async fn send(&mut self, data: &[u8]) -> Result<()> {
315 if let Some(stream) = &mut self.stream {
316 stream.write_all(data).await?;
317 stream.flush().await?;
318 Ok(())
319 } else {
320 Err(Error::ConnectionClosed)
321 }
322 }
323
324 /// Send a payload that may need to be split across multiple packets.
325 ///
326 /// This is used for large LOB writes and other operations where the payload
327 /// exceeds the SDU size. The payload is split into multiple DATA packets,
328 /// each with proper headers.
329 ///
330 /// # Arguments
331 /// * `payload` - The raw message payload (without packet header or data flags)
332 /// * `data_flags` - The data flags for the first packet (typically 0)
333 async fn send_multi_packet(&mut self, payload: &[u8], data_flags: u16) -> Result<()> {
334 let stream = self.stream.as_mut().ok_or(Error::ConnectionClosed)?;
335
336 // Calculate max payload per packet: SDU - header (8) - data flags (2)
337 let max_payload_per_packet = self.sdu_size as usize - PACKET_HEADER_SIZE - 2;
338
339 let mut offset = 0;
340 let mut is_first = true;
341
342 while offset < payload.len() {
343 let remaining = payload.len() - offset;
344 let chunk_size = std::cmp::min(remaining, max_payload_per_packet);
345 let is_last = offset + chunk_size >= payload.len();
346
347 // Build packet
348 let packet_len = PACKET_HEADER_SIZE + 2 + chunk_size; // header + data flags + payload
349 let mut packet = Vec::with_capacity(packet_len);
350
351 // Header
352 if self.large_sdu {
353 packet.extend_from_slice(&(packet_len as u32).to_be_bytes());
354 } else {
355 packet.extend_from_slice(&(packet_len as u16).to_be_bytes());
356 packet.extend_from_slice(&[0, 0]); // Checksum
357 }
358 packet.push(PacketType::Data as u8);
359 packet.push(0); // Flags
360 packet.extend_from_slice(&[0, 0]); // Header checksum
361
362 // Data flags - only include on first packet
363 if is_first {
364 packet.extend_from_slice(&data_flags.to_be_bytes());
365 is_first = false;
366 } else {
367 // Continuation packets still need data flags position but value is 0
368 packet.extend_from_slice(&0u16.to_be_bytes());
369 }
370
371 // Payload chunk
372 packet.extend_from_slice(&payload[offset..offset + chunk_size]);
373
374 // Send this packet
375 stream.write_all(&packet).await?;
376
377 offset += chunk_size;
378
379 // Don't flush until the last packet to improve performance
380 if is_last {
381 stream.flush().await?;
382 }
383 }
384
385 Ok(())
386 }
387
388 async fn receive(&mut self) -> Result<bytes::Bytes> {
389 if let Some(stream) = &mut self.stream {
390 // Read packet header first (always 8 bytes)
391 // large_sdu only affects how the length field is interpreted, not header size
392 let mut header_buf = vec![0u8; PACKET_HEADER_SIZE];
393 stream.read_exact(&mut header_buf).await?;
394
395 // Parse header to get payload length
396 // In large_sdu mode, first 4 bytes are length; otherwise first 2 bytes
397 let packet_len = if self.large_sdu {
398 u32::from_be_bytes([header_buf[0], header_buf[1], header_buf[2], header_buf[3]])
399 as usize
400 } else {
401 u16::from_be_bytes([header_buf[0], header_buf[1]]) as usize
402 };
403
404 // Read remaining payload
405 let payload_len = packet_len.saturating_sub(PACKET_HEADER_SIZE);
406 let mut payload_buf = vec![0u8; payload_len];
407 if payload_len > 0 {
408 stream.read_exact(&mut payload_buf).await?;
409 }
410
411 // Combine header and payload
412 let mut full_packet = header_buf.clone();
413 full_packet.extend(payload_buf);
414
415 Ok(bytes::Bytes::from(full_packet))
416 } else {
417 Err(Error::ConnectionClosed)
418 }
419 }
420
421 /// Receive a complete response that may span multiple packets
422 ///
423 /// This method accumulates packets until the END_OF_RESPONSE flag is detected
424 /// in the data flags. It's used for operations like LOB reads that may return
425 /// data spanning multiple TNS packets.
426 ///
427 /// Returns the combined payload of all packets (excluding headers).
428 async fn receive_response(&mut self) -> Result<bytes::Bytes> {
429 use crate::constants::{data_flags, MessageType};
430
431 let mut accumulated_payload = Vec::new();
432 let mut is_first_packet = true;
433
434 loop {
435 let packet = self.receive().await?;
436
437 if packet.len() < PACKET_HEADER_SIZE {
438 return Err(Error::Protocol("Packet too small".to_string()));
439 }
440
441 // Check packet type - only DATA packets can be accumulated
442 let packet_type = packet[4];
443 if packet_type != PacketType::Data as u8 {
444 // Non-DATA packet (e.g., MARKER) - return as-is for special handling
445 return Ok(packet);
446 }
447
448 // Get payload (everything after the 8-byte header)
449 let payload = &packet[PACKET_HEADER_SIZE..];
450
451 if payload.len() < 2 {
452 return Err(Error::Protocol("DATA packet payload too small".to_string()));
453 }
454
455 // Read data flags (first 2 bytes of payload)
456 let data_flags_value = u16::from_be_bytes([payload[0], payload[1]]);
457
458 // Check for end of response - Python checks both flags and message type
459 let has_end_flag = (data_flags_value & data_flags::END_OF_RESPONSE) != 0;
460 let has_eof_flag = (data_flags_value & data_flags::EOF) != 0;
461
462 // Also check for EndOfResponse message type (header + 3 bytes with msg type 29)
463 let has_end_message = payload.len() == 3
464 && payload[2] == MessageType::EndOfResponse as u8;
465
466 // Accumulate payload first
467 if is_first_packet {
468 // First packet: include data flags in accumulated payload
469 accumulated_payload.extend_from_slice(payload);
470 is_first_packet = false;
471 } else {
472 // Subsequent packets: skip the data flags, append only the message data
473 accumulated_payload.extend_from_slice(&payload[2..]);
474 }
475
476 // Check for end of response using data flags from this packet
477 let is_end_of_response = has_end_flag || has_eof_flag || has_end_message;
478
479 // If data flags don't indicate end, scan the ACCUMULATED message data
480 // for terminal messages. We scan accumulated data (not just current packet)
481 // because messages can span packet boundaries.
482 let has_terminal_message = if !is_end_of_response && accumulated_payload.len() > 2 {
483 self.scan_for_terminal_message(&accumulated_payload[2..])
484 } else {
485 false
486 };
487
488 // Check if this is the last packet
489 if is_end_of_response || has_terminal_message {
490 break;
491 }
492 }
493
494 // Build a synthetic packet with combined payload
495 let total_len = PACKET_HEADER_SIZE + accumulated_payload.len();
496 let mut result = Vec::with_capacity(total_len);
497
498 // Build header
499 if self.large_sdu {
500 result.extend_from_slice(&(total_len as u32).to_be_bytes());
501 } else {
502 result.extend_from_slice(&(total_len as u16).to_be_bytes());
503 result.extend_from_slice(&[0, 0]); // Checksum
504 }
505 result.push(PacketType::Data as u8);
506 result.push(0); // Flags
507 result.extend_from_slice(&[0, 0]); // Header checksum
508
509 // Add combined payload
510 result.extend_from_slice(&accumulated_payload);
511
512 Ok(bytes::Bytes::from(result))
513 }
514
515 /// Scan message data for terminal message types (ERROR or END_OF_RESPONSE)
516 /// that indicate the response is complete.
517 ///
518 /// This is needed because Oracle doesn't always set the END_OF_RESPONSE flag
519 /// in the data flags for LOB operations. Instead, we must detect the terminal
520 /// message by parsing the message stream.
521 ///
522 /// NOTE: This is conservative - it only returns true if we can definitively
523 /// identify a terminal message. We avoid false positives by not scanning
524 /// raw byte values (which could match message type values by coincidence).
525 fn scan_for_terminal_message(&self, data: &[u8]) -> bool {
526 use crate::buffer::ReadBuffer;
527 use crate::constants::MessageType;
528
529 if data.is_empty() {
530 return false;
531 }
532
533 // Try to parse the message stream and look for ERROR or END_OF_RESPONSE
534 let mut buf = ReadBuffer::from_slice(data);
535
536 while buf.remaining() > 0 {
537 let msg_type = match buf.read_u8() {
538 Ok(t) => t,
539 Err(_) => return false, // Can't read, assume incomplete
540 };
541
542 // END_OF_RESPONSE is a standalone message with no additional data
543 if msg_type == MessageType::EndOfResponse as u8 {
544 return true;
545 }
546
547 // ERROR message indicates end of response for older Oracle
548 if msg_type == MessageType::Error as u8 {
549 // Error message found - this indicates end of response
550 return true;
551 }
552
553 // STATUS message also indicates end of response
554 if msg_type == MessageType::Status as u8 {
555 return true;
556 }
557
558 // LOB_DATA message - skip the data
559 if msg_type == MessageType::LobData as u8 {
560 // Read length-prefixed data and skip it
561 match buf.read_raw_bytes_chunked() {
562 Ok(_) => continue,
563 Err(_) => return false, // Incomplete LOB data, need more packets
564 }
565 }
566
567 // PARAMETER message (8) - this contains the updated locator and amount.
568 // For LOB write responses, PARAMETER is the first message and the response
569 // is relatively small (locator + error info). We can safely scan for
570 // ERROR/END_OF_RESPONSE bytes because the locator doesn't contain arbitrary
571 // binary data that would false-positive.
572 //
573 // For LOB read responses, LobData comes first and contains the actual data,
574 // which might contain bytes that match ERROR (4) or END_OF_RESPONSE (29).
575 // But since we skip LobData content, by the time we reach PARAMETER,
576 // the remaining data is just locator + error info.
577 if msg_type == MessageType::Parameter as u8 {
578 let remaining = buf.remaining_bytes();
579 // Check if ERROR (4) or END_OF_RESPONSE (29) appears in remaining bytes
580 // This is safe because PARAMETER data (locator + amount) doesn't contain
581 // arbitrary binary data that would false-positive.
582 if remaining.contains(&(MessageType::Error as u8))
583 || remaining.contains(&(MessageType::EndOfResponse as u8))
584 {
585 return true;
586 }
587 // If no terminal marker found, response might be incomplete
588 return false;
589 }
590
591 // For other unknown message types, we can't determine the end
592 return false;
593 }
594
595 false
596 }
597
598 /// Send a marker packet with the specified marker type
599 async fn send_marker(&mut self, marker_type: u8) -> Result<()> {
600 let mut buf = WriteBuffer::with_capacity(16);
601
602 // Build marker packet header
603 // Marker packet structure: [length][0x00][0x00][0x00][0x0c][flags][0x00][0x00] + payload
604 // Payload: [0x01][0x00][marker_type]
605 let payload_len = 3; // 0x01, 0x00, marker_type
606 let total_len = (PACKET_HEADER_SIZE + payload_len) as u16;
607
608 // Header
609 buf.write_u16_be(total_len)?;
610 buf.write_u16_be(0)?; // zeros in large_sdu position
611 buf.write_u8(PacketType::Marker as u8)?;
612 buf.write_u8(0)?; // flags
613 buf.write_u16_be(0)?; // reserved
614
615 // Payload
616 buf.write_u8(0x01)?; // constant
617 buf.write_u8(0x00)?; // constant
618 buf.write_u8(marker_type)?;
619
620 self.send(buf.as_slice()).await
621 }
622
623 /// Handle the reset protocol after receiving a MARKER packet
624 /// This sends a reset marker, waits for the reset response, then returns the error packet
625 /// Returns Err if the connection is closed after reset (some Oracle versions do this)
626 async fn handle_marker_reset(&mut self) -> Result<bytes::Bytes> {
627 const MARKER_TYPE_RESET: u8 = 2;
628
629 // Send reset marker
630 self.send_marker(MARKER_TYPE_RESET).await?;
631
632 // Read packets until we get a reset marker back
633 loop {
634 let packet = self.receive().await?;
635 if packet.len() < PACKET_HEADER_SIZE {
636 return Err(Error::Protocol("Invalid packet received".to_string()));
637 }
638
639 let packet_type = packet[4];
640
641 if packet_type == PacketType::Marker as u8 {
642 // Check if it's a reset marker
643 if packet.len() >= PACKET_HEADER_SIZE + 3 {
644 let marker_type = packet[PACKET_HEADER_SIZE + 2];
645 if marker_type == MARKER_TYPE_RESET {
646 break;
647 }
648 }
649 } else {
650 // Non-marker packet received unexpectedly during reset wait
651 return Ok(packet);
652 }
653 }
654
655 // Try to read the error packet (may need to skip additional marker packets first)
656 // Note: Some Oracle versions (like Oracle Free) may close the connection after reset
657 // instead of sending an error packet
658 loop {
659 match self.receive().await {
660 Ok(packet) => {
661 let packet_type = packet[4];
662
663 if packet_type != PacketType::Marker as u8 {
664 // This should be the error data packet
665 return Ok(packet);
666 }
667 // Skip additional marker packets
668 }
669 Err(_) => {
670 // Connection closed after reset - Oracle Free and some versions
671 // close the connection instead of sending the error details.
672 // This typically happens when:
673 // - Table or view doesn't exist
674 // - Insufficient privileges to access the object
675 // - Invalid SQL syntax
676 return Err(Error::ConnectionClosedByServer(
677 "Query failed - Oracle closed the connection without providing error details. \
678 This typically indicates insufficient privileges or the object doesn't exist.".to_string()
679 ));
680 }
681 }
682 }
683 }
684}
685
686/// An Oracle database connection.
687///
688/// This is the main type for interacting with Oracle databases. It provides
689/// methods for executing queries, DML statements, PL/SQL blocks, and managing
690/// transactions.
691///
692/// Connections are created using [`Connection::connect`] or
693/// [`Connection::connect_with_config`]. For connection pooling, use the
694/// `deadpool-oracle` crate.
695///
696/// # Example
697///
698/// ```rust,no_run
699/// use oracle_rs::{Config, Connection, Value};
700///
701/// # async fn example() -> oracle_rs::Result<()> {
702/// // Create a connection
703/// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password");
704/// let conn = Connection::connect_with_config(config).await?;
705///
706/// // Execute a query
707/// let result = conn.query("SELECT * FROM employees WHERE dept_id = :1", &[10.into()]).await?;
708/// for row in &result.rows {
709/// let name = row.get_by_name("name").and_then(|v| v.as_str()).unwrap_or("");
710/// println!("Employee: {}", name);
711/// }
712///
713/// // Execute DML with transaction
714/// conn.execute("INSERT INTO logs (msg) VALUES (:1)", &["Hello".into()]).await?;
715/// conn.commit().await?;
716///
717/// // Close the connection
718/// conn.close().await?;
719/// # Ok(())
720/// # }
721/// ```
722///
723/// # Thread Safety
724///
725/// `Connection` is `Send` and `Sync`, but operations are serialized internally
726/// via a mutex. For parallel query execution, use multiple connections (e.g.,
727/// via a connection pool).
728pub struct Connection {
729 inner: Arc<Mutex<ConnectionInner>>,
730 config: Config,
731 closed: AtomicBool,
732 id: u32,
733}
734
735// Connection ID counter
736static CONNECTION_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
737
738impl Connection {
739 /// Create a new connection to an Oracle database
740 ///
741 /// # Arguments
742 ///
743 /// * `connect_string` - Connection string in EZConnect format (e.g., "host:port/service")
744 /// * `username` - Database username
745 /// * `password` - Database password
746 ///
747 /// # Example
748 ///
749 /// ```rust,ignore
750 /// let conn = Connection::connect("localhost:1521/ORCLPDB1", "scott", "tiger").await?;
751 /// ```
752 pub async fn connect(connect_string: &str, username: &str, password: &str) -> Result<Self> {
753 let mut config: Config = connect_string.parse()?;
754 config.username = username.to_string();
755 config.set_password(password);
756 Self::connect_with_config(config).await
757 }
758
759 /// Create a new connection using a [`Config`].
760 ///
761 /// This is the preferred way to create connections as it gives full control
762 /// over connection parameters including TLS, timeouts, and statement caching.
763 ///
764 /// # Example
765 ///
766 /// ```rust,no_run
767 /// use oracle_rs::{Config, Connection};
768 ///
769 /// # async fn example() -> oracle_rs::Result<()> {
770 /// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password")
771 /// .with_statement_cache_size(50);
772 ///
773 /// let conn = Connection::connect_with_config(config).await?;
774 /// # Ok(())
775 /// # }
776 /// ```
777 pub async fn connect_with_config(config: Config) -> Result<Self> {
778 let id = CONNECTION_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
779
780 // Create TCP connection
781 let addr = config.socket_addr();
782 let tcp_stream = TcpStream::connect(&addr).await?;
783
784 // Set TCP options
785 tcp_stream.set_nodelay(true)?;
786
787 // Wrap with TLS if configured
788 let stream = if config.is_tls_enabled() {
789 let tls_config = config.tls_config.as_ref()
790 .cloned()
791 .unwrap_or_else(TlsConfig::new);
792
793 let tls_stream = connect_tls(tcp_stream, &config.host, &tls_config).await?;
794 OracleStream::Tls(tls_stream)
795 } else {
796 OracleStream::Plain(tcp_stream)
797 };
798
799 let mut inner = ConnectionInner::new_with_cache(config.stmtcachesize);
800 inner.stream = Some(stream);
801 inner.state = ConnectionState::Connected;
802
803 let conn = Connection {
804 inner: Arc::new(Mutex::new(inner)),
805 config,
806 closed: AtomicBool::new(false),
807 id,
808 };
809
810 // Perform connection handshake
811 conn.perform_handshake().await?;
812
813 Ok(conn)
814 }
815
816 /// Get the connection ID
817 pub fn id(&self) -> u32 {
818 self.id
819 }
820
821 /// Check if the connection is closed
822 pub fn is_closed(&self) -> bool {
823 self.closed.load(Ordering::Relaxed)
824 }
825
826 /// Mark the connection as closed
827 ///
828 /// This should be called when the underlying connection is detected as broken.
829 /// Once marked closed, `is_closed()` returns true and operations will fail fast.
830 pub fn mark_closed(&self) {
831 self.closed.store(true, Ordering::Relaxed);
832 }
833
834 /// Helper to mark connection as closed if the result is a connection error
835 fn handle_result<T>(&self, result: Result<T>) -> Result<T> {
836 if let Err(ref e) = result {
837 if e.is_connection_error() {
838 self.mark_closed();
839 }
840 }
841 result
842 }
843
844 /// Get server information
845 pub async fn server_info(&self) -> ServerInfo {
846 let inner = self.inner.lock().await;
847 inner.server_info.clone()
848 }
849
850 /// Get the current connection state
851 pub async fn state(&self) -> ConnectionState {
852 let inner = self.inner.lock().await;
853 inner.state
854 }
855
856 /// Perform the connection handshake
857 async fn perform_handshake(&self) -> Result<()> {
858 // Step 1: Send CONNECT packet and parse ACCEPT response
859 self.send_connect_packet().await?;
860
861 // Step 2: OOB check (required for protocol version >= 318 AND server supports OOB)
862 // Both conditions must be met - server must have indicated OOB support in ACCEPT
863 let needs_oob_check = {
864 let inner = self.inner.lock().await;
865 inner.server_info.protocol_version >= crate::constants::version::MIN_OOB_CHECK
866 && inner.server_info.supports_oob
867 };
868 if needs_oob_check {
869 self.send_oob_check().await?;
870 }
871
872 // Step 3: Protocol negotiation
873 self.negotiate_protocol().await?;
874
875 // Step 4: Data types negotiation
876 self.negotiate_data_types().await?;
877
878 // Step 5: Authentication
879 self.authenticate().await?;
880
881 Ok(())
882 }
883
884 /// Send OOB (Out of Band) check
885 /// Required for protocol version >= 318
886 async fn send_oob_check(&self) -> Result<()> {
887 let mut inner = self.inner.lock().await;
888 let large_sdu = inner.large_sdu;
889
890 // Step 1: Send raw byte "!" (0x21) for OOB check
891 inner.send(&[0x21]).await?;
892
893 // Step 2: Send MARKER packet with Reset
894 let marker_payload = [1u8, 0u8, crate::constants::MarkerType::Reset as u8];
895 let mut packet_buf = WriteBuffer::new();
896
897 if large_sdu {
898 packet_buf.write_u32_be((PACKET_HEADER_SIZE + marker_payload.len()) as u32)?;
899 } else {
900 packet_buf.write_u16_be((PACKET_HEADER_SIZE + marker_payload.len()) as u16)?;
901 packet_buf.write_u16_be(0)?; // Checksum
902 }
903 packet_buf.write_u8(PacketType::Marker as u8)?;
904 packet_buf.write_u8(0)?; // Flags
905 packet_buf.write_u16_be(0)?; // Header checksum
906 packet_buf.write_bytes(&marker_payload)?;
907
908 inner.send(&packet_buf.freeze()).await?;
909
910 // Step 3: Wait for OOB reset response
911 // The server sends back a MARKER packet or reset acknowledgment
912 let response = inner.receive().await?;
913
914 // Validate response - should be a MARKER packet type (12)
915 if response.len() > 4 && response[4] == PacketType::Marker as u8 {
916 Ok(())
917 } else {
918 // Server might just acknowledge without a specific packet
919 // This is acceptable in some Oracle versions
920 Ok(())
921 }
922 }
923
924 /// Send the initial CONNECT packet
925 async fn send_connect_packet(&self) -> Result<()> {
926 let mut inner = self.inner.lock().await;
927
928 // Build connect packet using ConnectMessage for proper packet format
929 let connect_msg = ConnectMessage::from_config(&self.config);
930 let (connect_packet, continuation) = connect_msg.build_with_continuation()?;
931
932 // Send the CONNECT packet
933 inner.send(&connect_packet).await?;
934
935 // If we have a continuation DATA packet (for large connect strings), send it
936 if let Some(data_packet) = continuation {
937 inner.send(&data_packet).await?;
938 }
939
940 // Wait for response
941 let response = inner.receive().await?;
942
943 // Parse response packet type
944 if response.len() < PACKET_HEADER_SIZE {
945 return Err(Error::PacketTooShort {
946 expected: PACKET_HEADER_SIZE,
947 actual: response.len(),
948 });
949 }
950
951 let packet_type = response[4];
952
953 match packet_type {
954 2 => {
955 // ACCEPT - parse the accept message to get protocol version and capabilities
956 let packet = Packet::from_bytes(response)?;
957 let accept = AcceptMessage::parse(&packet)?;
958
959 // Set large_sdu mode if protocol version >= 315
960 inner.large_sdu = accept.uses_large_sdu();
961
962 // Update server info
963 inner.server_info.protocol_version = accept.protocol_version;
964 inner.server_info.supports_oob = accept.supports_oob;
965 inner.sdu_size = accept.sdu.min(65535) as u16;
966
967 inner.state = ConnectionState::Connected;
968 Ok(())
969 }
970 4 => {
971 // REFUSE
972 let mut buf = ReadBuffer::new(response.slice(PACKET_HEADER_SIZE..));
973 let _reason = buf.read_u8()?;
974 let _user_reason = buf.read_u8()?;
975
976 Err(Error::ConnectionRefused {
977 error_code: None,
978 message: Some("Connection refused by server".to_string()),
979 })
980 }
981 5 => {
982 // REDIRECT
983 Err(Error::ConnectionRedirect("redirect not implemented".to_string()))
984 }
985 _ => Err(Error::ProtocolError(format!(
986 "Unexpected packet type during connect: {}",
987 packet_type
988 ))),
989 }
990 }
991
992 /// Negotiate protocol version and capabilities
993 async fn negotiate_protocol(&self) -> Result<()> {
994 use crate::messages::ProtocolMessage;
995
996 let mut inner = self.inner.lock().await;
997 let large_sdu = inner.large_sdu;
998
999
1000 // Build protocol request (includes header)
1001 let protocol_msg = ProtocolMessage::new();
1002 let packet = protocol_msg.build_request(large_sdu)?;
1003 inner.send(&packet).await?;
1004
1005 // Receive response
1006 let response = inner.receive().await?;
1007
1008 // Validate packet type (at offset 4 for both SDU modes)
1009 if response.len() <= 4 || response[4] != PacketType::Data as u8 {
1010 return Err(Error::ProtocolError("Protocol negotiation failed".to_string()));
1011 }
1012
1013 // Parse the Protocol response to extract server capabilities
1014 // The payload starts after the 8-byte header
1015 let payload = &response[PACKET_HEADER_SIZE..];
1016 let mut protocol_msg = ProtocolMessage::new();
1017 protocol_msg.parse_response(payload, &mut inner.capabilities)?;
1018
1019 // Update server info with banner
1020 if let Some(banner) = &protocol_msg.server_banner {
1021 inner.server_info.banner = banner.clone();
1022 }
1023
1024 inner.state = ConnectionState::ProtocolNegotiated;
1025 Ok(())
1026 }
1027
1028 /// Negotiate data types
1029 async fn negotiate_data_types(&self) -> Result<()> {
1030 use crate::messages::DataTypesMessage;
1031
1032 let mut inner = self.inner.lock().await;
1033 let large_sdu = inner.large_sdu;
1034
1035
1036 // Build data types request using DataTypesMessage (includes all ~320 data types)
1037 let data_types_msg = DataTypesMessage::new();
1038 let packet = data_types_msg.build_request(&inner.capabilities, large_sdu)?;
1039 inner.send(&packet).await?;
1040
1041 // Receive response
1042 let response = inner.receive().await?;
1043
1044 // Basic validation - packet type is at offset 4 regardless of large_sdu
1045 if response.len() > 4 && response[4] == PacketType::Data as u8 {
1046 inner.state = ConnectionState::DataTypesNegotiated;
1047 Ok(())
1048 } else {
1049 Err(Error::ProtocolError("Data types negotiation failed".to_string()))
1050 }
1051 }
1052
1053 /// Perform authentication
1054 async fn authenticate(&self) -> Result<()> {
1055
1056 let service_name = match &self.config.service {
1057 ServiceMethod::ServiceName(name) => name.clone(),
1058 ServiceMethod::Sid(sid) => sid.clone(),
1059 };
1060
1061 let mut auth = AuthMessage::new(
1062 &self.config.username,
1063 self.config.password().as_bytes(),
1064 &service_name,
1065 );
1066
1067 // Phase one: send username and session info
1068 {
1069 let mut inner = self.inner.lock().await;
1070 let large_sdu = inner.large_sdu;
1071 let request = auth.build_request(&inner.capabilities, large_sdu)?;
1072 inner.send(&request).await?;
1073
1074 let response = inner.receive().await?;
1075 if response.len() <= PACKET_HEADER_SIZE {
1076 return Err(Error::Protocol("Empty auth response".to_string()));
1077 }
1078
1079 // Check for error message type
1080 if response.len() > PACKET_HEADER_SIZE + 2 {
1081 let msg_type = response[PACKET_HEADER_SIZE + 2];
1082 if msg_type == MessageType::Error as u8 {
1083 return Err(Error::AuthenticationFailed(
1084 "Server rejected authentication phase one".to_string(),
1085 ));
1086 }
1087 }
1088
1089 auth.parse_response(&response[PACKET_HEADER_SIZE..])?;
1090 }
1091
1092 // Phase two: send encrypted password
1093 if auth.phase() == AuthPhase::Two {
1094 let mut inner = self.inner.lock().await;
1095 let large_sdu = inner.large_sdu;
1096 let request = auth.build_request(&inner.capabilities, large_sdu)?;
1097 inner.send(&request).await?;
1098
1099 let response = inner.receive().await?;
1100 if response.len() <= PACKET_HEADER_SIZE {
1101 return Err(Error::Protocol("Empty auth phase two response".to_string()));
1102 }
1103
1104 // Check for error message type or marker
1105 let packet_type = response[4];
1106 if packet_type == 12 {
1107 // Marker - authentication failed
1108 return Err(Error::AuthenticationFailed("Server sent MARKER - authentication rejected".to_string()));
1109 }
1110
1111 if response.len() > PACKET_HEADER_SIZE + 2 {
1112 let msg_type = response[PACKET_HEADER_SIZE + 2];
1113 if msg_type == MessageType::Error as u8 {
1114 return Err(Error::InvalidCredentials);
1115 }
1116 }
1117
1118 auth.parse_response(&response[PACKET_HEADER_SIZE..])?;
1119 }
1120
1121 // Verify authentication completed
1122 if !auth.is_complete() {
1123 return Err(Error::AuthenticationFailed(
1124 "Authentication did not complete".to_string(),
1125 ));
1126 }
1127
1128 // Store combo key for later use (encrypted data)
1129 let mut inner = self.inner.lock().await;
1130 if let Some(combo_key) = auth.combo_key() {
1131 inner.capabilities.combo_key = Some(combo_key.to_vec());
1132 }
1133 // Auth used sequence numbers 1 and 2, set to 2 so next is 3
1134 inner.sequence_number = 2;
1135 inner.state = ConnectionState::Ready;
1136
1137 Ok(())
1138 }
1139
1140 /// Execute a SQL statement and return the result
1141 ///
1142 /// # Arguments
1143 ///
1144 /// * `sql` - SQL statement to execute
1145 /// * `params` - Bind parameters (use `Value::Integer`, `Value::String`, etc.)
1146 ///
1147 /// # Example
1148 ///
1149 /// ```rust,ignore
1150 /// use oracle_rs::Value;
1151 ///
1152 /// // Query with bind parameters
1153 /// let result = conn.execute(
1154 /// "SELECT * FROM employees WHERE department_id = :1",
1155 /// &[Value::Integer(10)]
1156 /// ).await?;
1157 ///
1158 /// // DML with bind parameters
1159 /// let result = conn.execute(
1160 /// "UPDATE employees SET salary = :1 WHERE employee_id = :2",
1161 /// &[Value::Integer(50000), Value::Integer(100)]
1162 /// ).await?;
1163 /// println!("Rows affected: {}", result.rows_affected);
1164 /// ```
1165 pub async fn execute(&self, sql: &str, params: &[Value]) -> Result<QueryResult> {
1166 self.ensure_ready().await?;
1167
1168 // Check statement cache for existing prepared statement
1169 let (statement, from_cache) = {
1170 let mut inner = self.inner.lock().await;
1171 if let Some(ref mut cache) = inner.statement_cache {
1172 if let Some(cached_stmt) = cache.get(sql) {
1173 tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached statement (execute)");
1174 (cached_stmt, true)
1175 } else {
1176 (Statement::new(sql), false)
1177 }
1178 } else {
1179 (Statement::new(sql), false)
1180 }
1181 };
1182
1183 let result = match statement.statement_type() {
1184 StatementType::Query => self.execute_query_with_params(&statement, params).await,
1185 _ => self.execute_dml_with_params(&statement, params).await,
1186 };
1187
1188 // Return statement to cache or cache it for the first time
1189 match &result {
1190 Ok(query_result) => {
1191 let mut inner = self.inner.lock().await;
1192 if let Some(ref mut cache) = inner.statement_cache {
1193 if from_cache {
1194 cache.return_statement(sql);
1195 } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1196 let mut stmt_to_cache = statement.clone();
1197 stmt_to_cache.set_cursor_id(query_result.cursor_id);
1198 stmt_to_cache.set_executed(true);
1199 cache.put(sql.to_string(), stmt_to_cache);
1200 }
1201 }
1202 }
1203 Err(_) => {
1204 if from_cache {
1205 let mut inner = self.inner.lock().await;
1206 if let Some(ref mut cache) = inner.statement_cache {
1207 cache.return_statement(sql);
1208 }
1209 }
1210 }
1211 }
1212
1213 result
1214 }
1215
1216 /// Execute a query and return rows
1217 ///
1218 /// # Example
1219 ///
1220 /// ```rust,ignore
1221 /// use oracle_rs::Value;
1222 ///
1223 /// let result = conn.query(
1224 /// "SELECT * FROM employees WHERE salary > :1",
1225 /// &[Value::Integer(50000)]
1226 /// ).await?;
1227 /// ```
1228 pub async fn query(&self, sql: &str, params: &[Value]) -> Result<QueryResult> {
1229 self.ensure_ready().await?;
1230
1231 // Check statement cache for existing prepared statement
1232 let (statement, from_cache) = {
1233 let mut inner = self.inner.lock().await;
1234 if let Some(ref mut cache) = inner.statement_cache {
1235 if let Some(cached_stmt) = cache.get(sql) {
1236 tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached statement");
1237 (cached_stmt, true)
1238 } else {
1239 (Statement::new(sql), false)
1240 }
1241 } else {
1242 (Statement::new(sql), false)
1243 }
1244 };
1245
1246 // If using cached statement, save the columns (Oracle won't resend on reexecute)
1247 let cached_columns = if from_cache {
1248 Some(statement.columns().to_vec())
1249 } else {
1250 None
1251 };
1252
1253 let mut result = self.execute_query_with_params(&statement, params).await;
1254
1255 // For cached statements, restore columns if Oracle didn't send them
1256 if let (Ok(ref mut query_result), Some(columns)) = (&mut result, cached_columns) {
1257 if query_result.columns.is_empty() && !columns.is_empty() {
1258 query_result.columns = columns;
1259 }
1260 }
1261
1262 // Return statement to cache or cache it for the first time
1263 match &result {
1264 Ok(query_result) => {
1265 let mut inner = self.inner.lock().await;
1266 if let Some(ref mut cache) = inner.statement_cache {
1267 if from_cache {
1268 // Return to cache
1269 cache.return_statement(sql);
1270 } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1271 // Cache the newly prepared statement with cursor_id and columns
1272 let mut stmt_to_cache = statement.clone();
1273 stmt_to_cache.set_cursor_id(query_result.cursor_id);
1274 stmt_to_cache.set_executed(true);
1275 stmt_to_cache.set_columns(query_result.columns.clone());
1276 cache.put(sql.to_string(), stmt_to_cache);
1277 }
1278 }
1279 }
1280 Err(_) => {
1281 // On error, still return statement to cache if it came from there
1282 if from_cache {
1283 let mut inner = self.inner.lock().await;
1284 if let Some(ref mut cache) = inner.statement_cache {
1285 cache.return_statement(sql);
1286 }
1287 }
1288 }
1289 }
1290
1291 self.handle_result(result)
1292 }
1293
1294 /// Execute DML (INSERT, UPDATE, DELETE) and return rows affected
1295 pub async fn execute_dml_sql(&self, sql: &str, params: &[Value]) -> Result<u64> {
1296 self.ensure_ready().await?;
1297
1298 // Check statement cache for existing prepared statement
1299 let (statement, from_cache) = {
1300 let mut inner = self.inner.lock().await;
1301 if let Some(ref mut cache) = inner.statement_cache {
1302 if let Some(cached_stmt) = cache.get(sql) {
1303 tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached DML statement");
1304 (cached_stmt, true)
1305 } else {
1306 (Statement::new(sql), false)
1307 }
1308 } else {
1309 (Statement::new(sql), false)
1310 }
1311 };
1312
1313 let result = self.execute_dml_with_params(&statement, params).await;
1314
1315 // Return statement to cache or cache it for the first time
1316 match &result {
1317 Ok(query_result) => {
1318 let mut inner = self.inner.lock().await;
1319 if let Some(ref mut cache) = inner.statement_cache {
1320 if from_cache {
1321 cache.return_statement(sql);
1322 } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1323 let mut stmt_to_cache = statement.clone();
1324 stmt_to_cache.set_cursor_id(query_result.cursor_id);
1325 stmt_to_cache.set_executed(true);
1326 cache.put(sql.to_string(), stmt_to_cache);
1327 }
1328 }
1329 }
1330 Err(_) => {
1331 if from_cache {
1332 let mut inner = self.inner.lock().await;
1333 if let Some(ref mut cache) = inner.statement_cache {
1334 cache.return_statement(sql);
1335 }
1336 }
1337 }
1338 }
1339
1340 self.handle_result(result).map(|r| r.rows_affected)
1341 }
1342
1343 /// Execute a PL/SQL block with IN/OUT/INOUT parameters
1344 ///
1345 /// This method allows execution of PL/SQL anonymous blocks or procedure calls
1346 /// that have OUT or IN OUT parameters. The `params` slice specifies the direction
1347 /// and type of each bind parameter.
1348 ///
1349 /// # Arguments
1350 ///
1351 /// * `sql` - The PL/SQL block or procedure call
1352 /// * `params` - The bind parameters with direction information
1353 ///
1354 /// # Example
1355 ///
1356 /// ```rust,ignore
1357 /// use oracle_rs::{Connection, BindParam, OracleType, Value};
1358 ///
1359 /// // Call a procedure with IN and OUT parameters
1360 /// let result = conn.execute_plsql(
1361 /// "BEGIN get_employee_name(:1, :2); END;",
1362 /// &[
1363 /// BindParam::input(Value::Integer(100)), // IN: employee_id
1364 /// BindParam::output(OracleType::Varchar, 100), // OUT: employee_name
1365 /// ]
1366 /// ).await?;
1367 ///
1368 /// // Get the OUT parameter value
1369 /// let name = result.get_string(0).unwrap_or("Unknown");
1370 /// println!("Employee name: {}", name);
1371 /// ```
1372 ///
1373 /// # REF CURSOR Example
1374 ///
1375 /// ```rust,ignore
1376 /// use oracle_rs::{Connection, BindParam, Value};
1377 ///
1378 /// // Call a procedure that returns a REF CURSOR
1379 /// let result = conn.execute_plsql(
1380 /// "BEGIN OPEN :1 FOR SELECT * FROM employees; END;",
1381 /// &[BindParam::output_cursor()]
1382 /// ).await?;
1383 ///
1384 /// // Get the cursor ID and fetch rows
1385 /// if let Some(cursor_id) = result.get_cursor_id(0) {
1386 /// let rows = conn.fetch_cursor(cursor_id, 100).await?;
1387 /// for row in rows {
1388 /// println!("{:?}", row);
1389 /// }
1390 /// }
1391 /// ```
1392 pub async fn execute_plsql(&self, sql: &str, params: &[BindParam]) -> Result<PlsqlResult> {
1393 self.ensure_ready().await?;
1394
1395 let statement = Statement::new(sql);
1396
1397 // Build values for bind parameters
1398 // IN params: use provided value or Null
1399 // OUT params: use placeholder value (required for metadata, server ignores actual value)
1400 // INOUT params: use provided value or Null
1401 let bind_values: Vec<Value> = params
1402 .iter()
1403 .map(|p| {
1404 if p.direction == BindDirection::Output {
1405 // OUT params get a placeholder based on their type
1406 // Oracle still needs a value sent in the request (even though it's ignored)
1407 p.placeholder_value()
1408 } else {
1409 // IN and INOUT params use the provided value or Null
1410 p.value.clone().unwrap_or(Value::Null)
1411 }
1412 })
1413 .collect();
1414
1415 // Build bind metadata for proper buffer sizes
1416 // For OUTPUT params, use the user-specified buffer_size
1417 // For INPUT params, derive buffer_size from the actual value
1418 let bind_metadata: Vec<crate::messages::BindMetadata> = params
1419 .iter()
1420 .zip(bind_values.iter())
1421 .map(|(p, v)| {
1422 let buffer_size = if p.buffer_size > 0 {
1423 p.buffer_size
1424 } else {
1425 // Derive from value
1426 match v {
1427 Value::String(s) => std::cmp::max(s.len() as u32, 1),
1428 Value::Bytes(b) => std::cmp::max(b.len() as u32, 1),
1429 Value::Integer(_) | Value::Number(_) => 22, // Oracle NUMBER max size
1430 Value::Float(_) => 8, // BINARY_DOUBLE
1431 Value::Boolean(_) => 1,
1432 Value::Timestamp(_) => 13,
1433 Value::Date(_) => 7,
1434 Value::RowId(_) => 18,
1435 _ => 100, // Default fallback
1436 }
1437 };
1438 crate::messages::BindMetadata {
1439 oracle_type: p.oracle_type,
1440 buffer_size,
1441 }
1442 })
1443 .collect();
1444
1445 // Create execute message with PL/SQL options
1446 let options = ExecuteOptions::for_plsql();
1447 let mut execute_msg = ExecuteMessage::new(&statement, options);
1448 execute_msg.set_bind_values(bind_values);
1449 execute_msg.set_bind_metadata(bind_metadata);
1450
1451 let mut inner = self.inner.lock().await;
1452 let large_sdu = inner.large_sdu;
1453 let seq_num = inner.next_sequence_number();
1454 execute_msg.set_sequence_number(seq_num);
1455 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1456 inner.send(&request).await?;
1457
1458 // Receive response
1459 let response = inner.receive().await?;
1460 if response.len() <= PACKET_HEADER_SIZE {
1461 return Err(Error::Protocol("Empty PL/SQL response".to_string()));
1462 }
1463
1464 // Check for MARKER packet (indicates error - requires reset protocol)
1465 let packet_type = response[4];
1466 if packet_type == PacketType::Marker as u8 {
1467 // Handle marker reset protocol and get the error packet
1468 let error_response = inner.handle_marker_reset().await?;
1469 let payload = &error_response[PACKET_HEADER_SIZE..];
1470 // Parse error response to extract the actual Oracle error
1471 let _: QueryResult = self.parse_error_response(payload)?;
1472 return Err(Error::Protocol("PL/SQL execution failed".to_string()));
1473 }
1474
1475 // Parse the PL/SQL response
1476 let payload = &response[PACKET_HEADER_SIZE..];
1477 let caps = inner.capabilities.clone();
1478 drop(inner); // Release lock before parsing
1479
1480 self.parse_plsql_response(payload, &caps, params)
1481 }
1482
1483 /// Execute a batch of DML statements with multiple rows of bind values
1484 ///
1485 /// This method efficiently executes the same SQL statement multiple times
1486 /// with different bind values (executemany pattern).
1487 ///
1488 /// # Arguments
1489 ///
1490 /// * `batch` - The batch containing SQL and rows of bind values
1491 ///
1492 /// # Example
1493 ///
1494 /// ```rust,ignore
1495 /// use oracle_rs::{Connection, BatchBuilder, Value};
1496 ///
1497 /// let batch = BatchBuilder::new("INSERT INTO users (id, name) VALUES (:1, :2)")
1498 /// .add_row(vec![Value::Integer(1), Value::String("Alice".to_string())])
1499 /// .add_row(vec![Value::Integer(2), Value::String("Bob".to_string())])
1500 /// .with_row_counts()
1501 /// .build();
1502 ///
1503 /// let result = conn.execute_batch(&batch).await?;
1504 /// println!("Total rows affected: {}", result.total_rows_affected);
1505 /// ```
1506 pub async fn execute_batch(&self, batch: &BatchBinds) -> Result<BatchResult> {
1507 self.ensure_ready().await?;
1508
1509 // Validate the batch
1510 batch.validate()?;
1511
1512 if batch.rows.is_empty() {
1513 return Ok(BatchResult::new());
1514 }
1515
1516 // Build execute options for batch DML
1517 let mut options = ExecuteOptions::for_dml(batch.options.auto_commit);
1518 options.num_execs = batch.rows.len() as u32;
1519 options.batch_errors = batch.options.batch_errors;
1520 options.dml_row_counts = batch.options.array_dml_row_counts;
1521
1522 // Create execute message with batch bind values
1523 let mut execute_msg = ExecuteMessage::new(&batch.statement, options);
1524 execute_msg.set_batch_bind_values(batch.rows.clone());
1525
1526 let mut inner = self.inner.lock().await;
1527 let large_sdu = inner.large_sdu;
1528 let seq_num = inner.next_sequence_number();
1529 execute_msg.set_sequence_number(seq_num);
1530 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1531 inner.send(&request).await?;
1532
1533 // Receive response
1534 let mut response = inner.receive().await?;
1535 if response.len() <= PACKET_HEADER_SIZE {
1536 return Err(Error::Protocol("Empty batch response".to_string()));
1537 }
1538
1539 // Check packet type - handle MARKER packets
1540 let packet_type = response[4];
1541 if packet_type == PacketType::Marker as u8 {
1542 // Handle BREAK/RESET protocol same as regular DML
1543 response = self.handle_marker_protocol(&mut inner, response).await?;
1544 }
1545
1546 // Parse the batch response
1547 let payload = &response[PACKET_HEADER_SIZE..];
1548 drop(inner); // Release lock before parsing
1549
1550 self.parse_batch_response(payload, batch.rows.len(), batch.options.array_dml_row_counts)
1551 }
1552
1553 /// Handle MARKER packet protocol (BREAK/RESET)
1554 async fn handle_marker_protocol(
1555 &self,
1556 inner: &mut ConnectionInner,
1557 initial_response: Bytes,
1558 ) -> Result<Bytes> {
1559 // Extract marker type from packet
1560 let marker_type = if initial_response.len() >= PACKET_HEADER_SIZE + 3 {
1561 initial_response[PACKET_HEADER_SIZE + 2]
1562 } else {
1563 1 // Assume BREAK
1564 };
1565
1566 if marker_type == 1 {
1567 // BREAK marker - send RESET and wait for response
1568 self.send_marker(inner, 2).await?;
1569
1570 // Wait for RESET marker back
1571 loop {
1572 match inner.receive().await {
1573 Ok(pkt) => {
1574 if pkt.len() < PACKET_HEADER_SIZE + 1 {
1575 break;
1576 }
1577 let pkt_type = pkt[4];
1578 if pkt_type == PacketType::Marker as u8 {
1579 if pkt.len() >= PACKET_HEADER_SIZE + 3 {
1580 let mk_type = pkt[PACKET_HEADER_SIZE + 2];
1581 if mk_type == 2 {
1582 // Got RESET marker, break out
1583 break;
1584 }
1585 }
1586 } else if pkt_type == PacketType::Data as u8 {
1587 // Got DATA packet - return it as the response
1588 return Ok(pkt);
1589 } else {
1590 break;
1591 }
1592 }
1593 Err(e) => {
1594 inner.state = ConnectionState::Closed;
1595 return Err(e);
1596 }
1597 }
1598 }
1599
1600 // After RESET, continue receiving until we get DATA packet
1601 loop {
1602 match inner.receive().await {
1603 Ok(pkt) => {
1604 let pkt_type = pkt[4];
1605 if pkt_type == PacketType::Marker as u8 {
1606 continue;
1607 } else if pkt_type == PacketType::Data as u8 {
1608 return Ok(pkt);
1609 } else {
1610 return Err(Error::Protocol(format!(
1611 "Unexpected packet type {} after reset",
1612 pkt_type
1613 )));
1614 }
1615 }
1616 Err(e) => {
1617 inner.state = ConnectionState::Closed;
1618 return Err(e);
1619 }
1620 }
1621 }
1622 }
1623
1624 Ok(initial_response)
1625 }
1626
1627 /// Parse batch execution response
1628 fn parse_batch_response(
1629 &self,
1630 payload: &[u8],
1631 batch_size: usize,
1632 want_row_counts: bool,
1633 ) -> Result<BatchResult> {
1634 if payload.len() < 3 {
1635 return Err(Error::Protocol("Batch response too short".to_string()));
1636 }
1637
1638 let mut buf = ReadBuffer::from_slice(payload);
1639
1640 // Skip data flags
1641 buf.skip(2)?;
1642
1643 let mut rows_affected: u64 = 0;
1644 let mut row_counts: Option<Vec<u64>> = None;
1645 let mut end_of_response = false;
1646
1647 // Process messages until end_of_response or out of data
1648 while !end_of_response && buf.remaining() > 0 {
1649 let msg_type = buf.read_u8()?;
1650
1651 match msg_type {
1652 // Error (4) - may contain error or success info
1653 x if x == MessageType::Error as u8 => {
1654 let (error_code, error_msg, _cid, row_count) = self.parse_error_info_with_rowcount(&mut buf)?;
1655 rows_affected = row_count;
1656 if error_code != 0 && error_code != 1403 {
1657 return Err(Error::OracleError {
1658 code: error_code,
1659 message: error_msg.unwrap_or_default(),
1660 });
1661 }
1662 }
1663
1664 // Parameter (8) - return parameters (may contain row counts)
1665 x if x == MessageType::Parameter as u8 => {
1666 if let Some(counts) = self.parse_return_parameters_internal(&mut buf, want_row_counts)? {
1667 row_counts = Some(counts);
1668 }
1669 }
1670
1671 // Status (9) - call status
1672 x if x == MessageType::Status as u8 => {
1673 let _call_status = buf.read_ub4()?;
1674 let _end_to_end_seq = buf.read_ub2()?;
1675 }
1676
1677 // BitVector (21)
1678 21 => {
1679 let _num_columns_sent = buf.read_ub2()?;
1680 if buf.remaining() > 0 {
1681 let _byte = buf.read_u8()?;
1682 }
1683 }
1684
1685 // End of Response (29) - explicit end marker
1686 29 => {
1687 end_of_response = true;
1688 }
1689
1690 _ => {
1691 // Unknown message type - continue processing
1692 }
1693 }
1694 }
1695
1696 let mut result = BatchResult::new();
1697 result.total_rows_affected = rows_affected;
1698 result.success_count = batch_size;
1699 result.row_counts = row_counts;
1700
1701 Ok(result)
1702 }
1703
1704 /// Fetch more rows from an open cursor
1705 ///
1706 /// This method is used when a query result has `has_more_rows == true`
1707 /// to retrieve additional rows from the server.
1708 ///
1709 /// # Arguments
1710 ///
1711 /// * `cursor_id` - The cursor ID from a previous query result
1712 /// * `columns` - Column information from the original query
1713 /// * `fetch_size` - Number of rows to fetch
1714 ///
1715 /// # Example
1716 ///
1717 /// ```rust,ignore
1718 /// let mut result = conn.query("SELECT * FROM large_table", &[]).await?;
1719 /// let mut all_rows = result.rows.clone();
1720 ///
1721 /// while result.has_more_rows {
1722 /// result = conn.fetch_more(result.cursor_id, &result.columns, 100).await?;
1723 /// all_rows.extend(result.rows);
1724 /// }
1725 /// ```
1726 pub async fn fetch_more(
1727 &self,
1728 cursor_id: u16,
1729 columns: &[ColumnInfo],
1730 fetch_size: u32,
1731 ) -> Result<QueryResult> {
1732 self.ensure_ready().await?;
1733
1734 // Build fetch message
1735 let fetch_msg = FetchMessage::new(cursor_id, fetch_size);
1736
1737 let mut inner = self.inner.lock().await;
1738 let request = fetch_msg.build_request(&inner.capabilities)?;
1739 inner.send(&request).await?;
1740
1741 // Receive and parse response
1742 let response = inner.receive().await?;
1743 if response.len() <= PACKET_HEADER_SIZE {
1744 return Err(Error::Protocol("Empty fetch response".to_string()));
1745 }
1746
1747 // Parse row data from response
1748 let payload = &response[PACKET_HEADER_SIZE..];
1749 let caps = inner.capabilities.clone();
1750 drop(inner); // Release lock before parsing
1751 self.parse_fetch_response(payload, columns, &caps)
1752 }
1753
1754 /// Fetch rows from a REF CURSOR
1755 ///
1756 /// This method fetches rows from a REF CURSOR that was returned from a
1757 /// PL/SQL procedure or function. The cursor contains the column metadata
1758 /// and cursor ID needed to fetch the rows.
1759 ///
1760 /// # Arguments
1761 ///
1762 /// * `cursor` - The REF CURSOR returned from PL/SQL
1763 ///
1764 /// # Example
1765 ///
1766 /// ```rust,ignore
1767 /// use oracle_rs::{Connection, BindParam, Value};
1768 ///
1769 /// // Call a procedure that returns a REF CURSOR
1770 /// let result = conn.execute_plsql(
1771 /// "BEGIN OPEN :1 FOR SELECT id, name FROM employees; END;",
1772 /// &[BindParam::output_cursor()]
1773 /// ).await?;
1774 ///
1775 /// // Get the cursor and fetch rows
1776 /// if let Value::Cursor(cursor) = &result.out_values[0] {
1777 /// let rows = conn.fetch_cursor(cursor).await?;
1778 /// println!("Fetched {} rows", rows.row_count());
1779 /// for row in &rows.rows {
1780 /// println!("{:?}", row);
1781 /// }
1782 /// }
1783 /// ```
1784 pub async fn fetch_cursor(&self, cursor: &crate::types::RefCursor) -> Result<QueryResult> {
1785 self.fetch_cursor_with_size(cursor, 100).await
1786 }
1787
1788 /// Fetch rows from a REF CURSOR with a specified fetch size
1789 ///
1790 /// This is the same as `fetch_cursor` but allows specifying how many
1791 /// rows to fetch at once.
1792 ///
1793 /// REF CURSORs use an ExecuteMessage with only the FETCH option because
1794 /// the cursor is already open from the PL/SQL execution. The cursor_id
1795 /// and column metadata were obtained when the REF CURSOR was returned.
1796 ///
1797 /// # Arguments
1798 ///
1799 /// * `cursor` - The REF CURSOR returned from PL/SQL
1800 /// * `fetch_size` - Number of rows to fetch (default is 100)
1801 pub async fn fetch_cursor_with_size(
1802 &self,
1803 cursor: &crate::types::RefCursor,
1804 fetch_size: u32,
1805 ) -> Result<QueryResult> {
1806 use crate::messages::ExecuteMessage;
1807
1808 if cursor.cursor_id() == 0 {
1809 return Err(Error::InvalidCursor("Cursor ID is 0 (not initialized)".to_string()));
1810 }
1811
1812 self.ensure_ready().await?;
1813
1814 // REF CURSOR uses ExecuteMessage with FETCH only (no SQL, no EXECUTE)
1815 // Create a statement with the cursor's metadata
1816 let mut stmt = Statement::new(""); // No SQL for REF CURSOR
1817 stmt.set_cursor_id(cursor.cursor_id());
1818 stmt.set_columns(cursor.columns().to_vec());
1819 stmt.set_executed(true); // Already executed by Oracle
1820 stmt.set_statement_type(crate::statement::StatementType::Query); // This is a query cursor
1821
1822 // Build execute message with only FETCH option
1823 let options = crate::messages::ExecuteOptions::for_ref_cursor(fetch_size);
1824 let mut execute_msg = ExecuteMessage::new(&stmt, options);
1825
1826 let mut inner = self.inner.lock().await;
1827 let large_sdu = inner.large_sdu;
1828 let seq_num = inner.next_sequence_number();
1829 execute_msg.set_sequence_number(seq_num);
1830
1831 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1832 inner.send(&request).await?;
1833
1834 // Receive and parse response
1835 let response = inner.receive().await?;
1836 if response.len() <= PACKET_HEADER_SIZE {
1837 return Err(Error::Protocol("Empty cursor response".to_string()));
1838 }
1839
1840 // Check for MARKER packet (indicates error)
1841 let packet_type = response[4];
1842 if packet_type == PacketType::Marker as u8 {
1843 let error_response = inner.handle_marker_reset().await?;
1844 let payload = &error_response[PACKET_HEADER_SIZE..];
1845 return self.parse_error_response(payload);
1846 }
1847
1848 // Parse query response - use cursor's columns since they're already defined
1849 let payload = &response[PACKET_HEADER_SIZE..];
1850 let caps = inner.capabilities.clone();
1851 drop(inner); // Release lock before parsing
1852 self.parse_fetch_response(payload, cursor.columns(), &caps)
1853 }
1854
1855 /// Fetch rows from an implicit result set
1856 ///
1857 /// Implicit results are returned via `DBMS_SQL.RETURN_RESULT` from PL/SQL.
1858 /// They contain cursor metadata but no rows until fetched.
1859 ///
1860 /// # Arguments
1861 ///
1862 /// * `result` - The implicit result from PL/SQL execution
1863 ///
1864 /// # Example
1865 ///
1866 /// ```rust,ignore
1867 /// let plsql_result = conn.execute_plsql(r#"
1868 /// declare
1869 /// c sys_refcursor;
1870 /// begin
1871 /// open c for select * from employees;
1872 /// dbms_sql.return_result(c);
1873 /// end;
1874 /// "#, &[]).await?;
1875 ///
1876 /// for implicit in plsql_result.implicit_results.iter() {
1877 /// let rows = conn.fetch_implicit_result(implicit).await?;
1878 /// println!("Fetched {} rows", rows.row_count());
1879 /// }
1880 /// ```
1881 pub async fn fetch_implicit_result(&self, result: &ImplicitResult) -> Result<QueryResult> {
1882 self.fetch_implicit_result_with_size(result, 100).await
1883 }
1884
1885 /// Fetch rows from an implicit result set with a specified fetch size
1886 pub async fn fetch_implicit_result_with_size(
1887 &self,
1888 result: &ImplicitResult,
1889 fetch_size: u32,
1890 ) -> Result<QueryResult> {
1891 // Convert implicit result to RefCursor and use fetch_cursor mechanism
1892 let cursor = crate::types::RefCursor::new(result.cursor_id, result.columns.clone());
1893 self.fetch_cursor_with_size(&cursor, fetch_size).await
1894 }
1895
1896 /// Parse fetch response to extract additional rows
1897 ///
1898 /// REF CURSOR fetch responses contain a series of messages:
1899 /// - RowHeader (6): Contains metadata about the following row data
1900 /// - RowData (7): Contains the actual row values
1901 /// - Error (4): Contains error info with cursor_id and row counts
1902 fn parse_fetch_response(&self, payload: &[u8], columns: &[ColumnInfo], caps: &Capabilities) -> Result<QueryResult> {
1903 if payload.len() < 3 {
1904 return Err(Error::Protocol("Fetch response too short".to_string()));
1905 }
1906
1907 let mut buf = ReadBuffer::from_slice(payload);
1908 let mut rows = Vec::new();
1909 let mut has_more_rows = false;
1910
1911 // Bit vector for duplicate column optimization
1912 let mut bit_vector: Option<Vec<u8>> = None;
1913 let mut previous_row_values: Option<Vec<Value>> = None;
1914
1915 // Skip data flags
1916 buf.skip(2)?;
1917
1918 // Process multiple messages in the response
1919 while buf.remaining() >= 1 {
1920 let msg_type = buf.read_u8()?;
1921
1922 match msg_type {
1923 x if x == MessageType::RowHeader as u8 => {
1924 // Skip row header metadata (per Python's _process_row_header)
1925 buf.skip(1)?; // flags
1926 buf.skip_ub2()?; // num requests
1927 buf.skip_ub4()?; // iteration number
1928 buf.skip_ub4()?; // num iters
1929 buf.skip_ub2()?; // buffer length
1930 let num_bytes = buf.read_ub4()?;
1931 if num_bytes > 0 {
1932 buf.skip(1)?; // skip repeated length
1933 // This bit vector in row header is for the following row data
1934 let bv = buf.read_bytes_vec(num_bytes as usize)?;
1935 bit_vector = Some(bv);
1936 }
1937 let rxhrid_bytes = buf.read_ub4()?;
1938 if rxhrid_bytes > 0 {
1939 buf.skip_raw_bytes_chunked()?;
1940 }
1941 }
1942 x if x == MessageType::RowData as u8 => {
1943 // Parse actual row data with bit vector support
1944 let row = self.parse_row_data_with_bitvector(
1945 &mut buf,
1946 columns,
1947 caps,
1948 bit_vector.as_deref(),
1949 previous_row_values.as_ref(),
1950 )?;
1951 previous_row_values = Some(row.values().to_vec());
1952 bit_vector = None;
1953 rows.push(row);
1954 }
1955 x if x == MessageType::BitVector as u8 => {
1956 // BitVector indicates which columns have actual data vs duplicates
1957 let _num_columns_sent = buf.read_ub2()?;
1958 let num_bytes = (columns.len() + 7) / 8; // Round up
1959 if num_bytes > 0 {
1960 let bv = buf.read_bytes_vec(num_bytes)?;
1961 bit_vector = Some(bv);
1962 }
1963 // Continue processing - RowData follows
1964 }
1965 x if x == MessageType::Error as u8 => {
1966 // Error message contains row count and cursor info
1967 let (error_code, error_msg, more_rows) = self.parse_error_message_info(&mut buf)?;
1968 has_more_rows = more_rows;
1969 if error_code != 0 && error_code != 1403 { // 1403 = no data found
1970 return Err(Error::OracleError {
1971 code: error_code,
1972 message: error_msg,
1973 });
1974 }
1975 break; // Error message marks end of response
1976 }
1977 x if x == MessageType::Status as u8 => {
1978 // Status message - usually marks end
1979 break;
1980 }
1981 x if x == MessageType::EndOfResponse as u8 => {
1982 break;
1983 }
1984 _ => {
1985 // Unknown message type - stop processing
1986 break;
1987 }
1988 }
1989 }
1990
1991 Ok(QueryResult {
1992 columns: columns.to_vec(),
1993 rows,
1994 rows_affected: 0,
1995 has_more_rows,
1996 cursor_id: 0,
1997 })
1998 }
1999
2000 /// Parse error message info including cursor_id and row counts
2001 fn parse_error_message_info(&self, buf: &mut ReadBuffer) -> Result<(u32, String, bool)> {
2002 let _call_status = buf.read_ub4()?; // end of call status
2003 buf.skip_ub2()?; // end to end seq#
2004 buf.skip_ub4()?; // current row number
2005 buf.skip_ub2()?; // error number
2006 buf.skip_ub2()?; // array elem error
2007 buf.skip_ub2()?; // array elem error
2008 let _cursor_id = buf.read_ub2()?; // cursor id
2009 let _error_pos = buf.read_sb2()?; // error position
2010 buf.skip(1)?; // sql type
2011 buf.skip(1)?; // fatal?
2012 buf.skip(1)?; // flags
2013 buf.skip(1)?; // user cursor options
2014 buf.skip(1)?; // UPI parameter
2015 let flags = buf.read_u8()?; // flags
2016 // Skip rowid - fixed 10 bytes in Oracle format
2017 buf.skip(10)?; // rowid is 10 bytes
2018 buf.skip_ub4()?; // OS error
2019 buf.skip(1)?; // statement number
2020 buf.skip(1)?; // call number
2021 buf.skip_ub2()?; // padding
2022 buf.skip_ub4()?; // success iters
2023 let num_bytes = buf.read_ub4()?; // oerrdd
2024 if num_bytes > 0 {
2025 buf.skip_raw_bytes_chunked()?;
2026 }
2027
2028 // Skip batch error codes
2029 let num_errors = buf.read_ub2()?;
2030 if num_errors > 0 {
2031 buf.skip_raw_bytes_chunked()?;
2032 }
2033
2034 // Skip batch error offsets
2035 let num_offsets = buf.read_ub4()?;
2036 if num_offsets > 0 {
2037 buf.skip_raw_bytes_chunked()?;
2038 }
2039
2040 // Skip batch error messages
2041 let temp16 = buf.read_ub2()?;
2042 if temp16 > 0 {
2043 buf.skip_raw_bytes_chunked()?;
2044 }
2045
2046 // Read extended error info
2047 let error_num = buf.read_ub4()?;
2048 let row_count = buf.read_ub8()?;
2049 let more_rows = row_count > 0 || (flags & 0x20) != 0;
2050
2051 // Read error message if present
2052 let error_msg = if error_num != 0 {
2053 buf.read_string_with_length()?.unwrap_or_default()
2054 } else {
2055 String::new()
2056 };
2057
2058 Ok((error_num, error_msg, more_rows))
2059 }
2060
2061 /// Open a scrollable cursor for bidirectional navigation
2062 ///
2063 /// Scrollable cursors allow moving forward and backward through result sets,
2064 /// jumping to specific positions, and fetching from various locations.
2065 ///
2066 /// # Arguments
2067 ///
2068 /// * `sql` - SQL query to execute
2069 ///
2070 /// # Example
2071 ///
2072 /// ```rust,ignore
2073 /// let mut cursor = conn.open_scrollable_cursor("SELECT * FROM employees").await?;
2074 ///
2075 /// // Move to different positions
2076 /// let first = conn.scroll(&mut cursor, FetchOrientation::First, 0).await?;
2077 /// let last = conn.scroll(&mut cursor, FetchOrientation::Last, 0).await?;
2078 /// let row5 = conn.scroll(&mut cursor, FetchOrientation::Absolute, 5).await?;
2079 ///
2080 /// conn.close_cursor(&mut cursor).await?;
2081 /// ```
2082 pub async fn open_scrollable_cursor(&self, sql: &str) -> Result<ScrollableCursor> {
2083 self.ensure_ready().await?;
2084
2085 let statement = Statement::new(sql);
2086
2087 // For scrollable cursors, execute with scrollable flag and prefetch 1 row
2088 // to get column metadata. The scroll() method will fetch actual rows at
2089 // specific positions.
2090 let mut options = ExecuteOptions::for_query(1); // prefetch 1 row for column info
2091
2092 options.scrollable = true;
2093
2094 let mut execute_msg = ExecuteMessage::new(&statement, options);
2095
2096 let mut inner = self.inner.lock().await;
2097 let large_sdu = inner.large_sdu;
2098 let seq_num = inner.next_sequence_number();
2099 execute_msg.set_sequence_number(seq_num);
2100 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2101 inner.send(&request).await?;
2102
2103 // Receive and parse response
2104 let response = inner.receive().await?;
2105
2106 if response.len() <= PACKET_HEADER_SIZE {
2107 return Err(Error::Protocol("Empty scrollable cursor response".to_string()));
2108 }
2109
2110 // Check for MARKER packet (indicates error - requires reset protocol)
2111 let packet_type = response[4];
2112 if packet_type == PacketType::Marker as u8 {
2113 // Handle marker reset protocol and get the error packet
2114 let error_response = inner.handle_marker_reset().await?;
2115 let payload = &error_response[PACKET_HEADER_SIZE..];
2116 // Parse error response to extract the actual Oracle error
2117 let _: QueryResult = self.parse_error_response(payload)?;
2118 // If we get here without error, something unexpected happened
2119 return Err(Error::Protocol("Unexpected successful response after MARKER".to_string()));
2120 }
2121
2122 // Parse describe info to get columns
2123 let payload = &response[PACKET_HEADER_SIZE..];
2124 let result = self.parse_query_response(payload, &inner.capabilities)?;
2125
2126 Ok(ScrollableCursor::new(result.cursor_id, result.columns))
2127 }
2128
2129 /// Scroll to a position in a scrollable cursor and fetch rows
2130 ///
2131 /// # Arguments
2132 ///
2133 /// * `cursor` - The scrollable cursor to scroll
2134 /// * `orientation` - The direction/mode of scrolling
2135 /// * `offset` - Position offset (used for Absolute and Relative modes)
2136 ///
2137 /// # Example
2138 ///
2139 /// ```rust,ignore
2140 /// // Go to first row
2141 /// let first = conn.scroll(&mut cursor, FetchOrientation::First, 0).await?;
2142 ///
2143 /// // Go to absolute position 10
2144 /// let row10 = conn.scroll(&mut cursor, FetchOrientation::Absolute, 10).await?;
2145 ///
2146 /// // Move 5 rows forward from current position
2147 /// let plus5 = conn.scroll(&mut cursor, FetchOrientation::Relative, 5).await?;
2148 ///
2149 /// // Move 3 rows backward
2150 /// let minus3 = conn.scroll(&mut cursor, FetchOrientation::Relative, -3).await?;
2151 /// ```
2152 pub async fn scroll(
2153 &self,
2154 cursor: &mut ScrollableCursor,
2155 orientation: FetchOrientation,
2156 offset: i64,
2157 ) -> Result<ScrollResult> {
2158 self.ensure_ready().await?;
2159
2160 if !cursor.is_open() {
2161 return Err(Error::CursorClosed);
2162 }
2163
2164 // Create a statement for the scroll operation (uses the existing cursor)
2165 let mut stmt = Statement::new("");
2166 stmt.set_cursor_id(cursor.cursor_id);
2167 stmt.set_columns(cursor.columns.clone());
2168 stmt.set_executed(true);
2169 stmt.set_statement_type(crate::statement::StatementType::Query);
2170
2171 // Build execute message with scroll_operation=true
2172 let mut options = ExecuteOptions::for_query(1);
2173 options.scrollable = true;
2174 options.scroll_operation = true;
2175 options.fetch_orientation = orientation as u32;
2176 // Calculate fetch_pos based on orientation
2177 options.fetch_pos = match orientation {
2178 FetchOrientation::First => 1,
2179 FetchOrientation::Last => 0, // Server calculates
2180 FetchOrientation::Absolute => offset.max(0) as u32,
2181 FetchOrientation::Relative => (cursor.position + offset).max(0) as u32,
2182 FetchOrientation::Next => (cursor.position + 1).max(0) as u32,
2183 FetchOrientation::Prior => (cursor.position - 1).max(0) as u32,
2184 FetchOrientation::Current => cursor.position.max(0) as u32,
2185 };
2186
2187 let mut execute_msg = ExecuteMessage::new(&stmt, options);
2188
2189 let mut inner = self.inner.lock().await;
2190 let large_sdu = inner.large_sdu;
2191 let seq_num = inner.next_sequence_number();
2192 execute_msg.set_sequence_number(seq_num);
2193 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2194 inner.send(&request).await?;
2195
2196 // Receive and parse response
2197 let response = inner.receive().await?;
2198 if response.len() <= PACKET_HEADER_SIZE {
2199 return Err(Error::Protocol("Empty scroll response".to_string()));
2200 }
2201
2202 // Check for MARKER packet
2203 let packet_type = response[4];
2204 if packet_type == PacketType::Marker as u8 {
2205 let error_response = inner.handle_marker_reset().await?;
2206 let payload = &error_response[PACKET_HEADER_SIZE..];
2207 let _: QueryResult = self.parse_error_response(payload)?;
2208 return Err(Error::Protocol("Scroll operation failed".to_string()));
2209 }
2210
2211 let payload = &response[PACKET_HEADER_SIZE..];
2212 // Use cursor's columns since Oracle doesn't re-send column metadata for scroll operations
2213 let query_result = self.parse_query_response_with_columns(payload, &inner.capabilities, &cursor.columns)?;
2214
2215 // Use position from Oracle's response (rows_affected contains the row position)
2216 // For scrollable cursors, Oracle returns the row number in error_info.rowcount
2217 let new_position = if !query_result.rows.is_empty() {
2218 // Position is the actual row number from Oracle
2219 query_result.rows_affected as i64
2220 } else {
2221 // No rows returned - calculate position based on orientation
2222 match orientation {
2223 FetchOrientation::First => 0, // Before first
2224 FetchOrientation::Last => cursor.row_count.unwrap_or(0) as i64 + 1, // After last
2225 FetchOrientation::Next => cursor.position + 1,
2226 FetchOrientation::Prior => cursor.position - 1,
2227 FetchOrientation::Absolute => offset,
2228 FetchOrientation::Relative => cursor.position + offset,
2229 FetchOrientation::Current => cursor.position,
2230 }
2231 };
2232
2233 cursor.update_position(new_position);
2234
2235 let mut result = ScrollResult::new(query_result.rows, new_position);
2236 result.at_end = !query_result.has_more_rows;
2237 result.at_beginning = new_position <= 1;
2238
2239 Ok(result)
2240 }
2241
2242 /// Close a scrollable cursor
2243 ///
2244 /// # Arguments
2245 ///
2246 /// * `cursor` - The scrollable cursor to close
2247 pub async fn close_cursor(&self, cursor: &mut ScrollableCursor) -> Result<()> {
2248 if !cursor.is_open() {
2249 return Ok(());
2250 }
2251
2252 // Send close cursor message
2253 // For now, just mark it as closed - the cursor will be cleaned up
2254 // when the connection is closed or reused
2255 cursor.mark_closed();
2256 Ok(())
2257 }
2258
2259 /// Get type information for a database object or collection type
2260 ///
2261 /// This method queries Oracle's data dictionary to retrieve type metadata
2262 /// for collections (VARRAY, Nested Table) and user-defined object types.
2263 ///
2264 /// # Arguments
2265 ///
2266 /// * `type_name` - Fully qualified type name (e.g., "SCHEMA.TYPE_NAME" or just "TYPE_NAME")
2267 ///
2268 /// # Returns
2269 ///
2270 /// A `DbObjectType` containing the type metadata, including:
2271 /// - Schema and type name
2272 /// - Whether it's a collection
2273 /// - Collection type (VARRAY, Nested Table, etc.)
2274 /// - Element type for collections
2275 ///
2276 /// # Example
2277 ///
2278 /// ```rust,ignore
2279 /// let number_array = conn.get_type("MY_NUMBER_ARRAY").await?;
2280 /// assert!(number_array.is_collection);
2281 /// ```
2282 pub async fn get_type(&self, type_name: &str) -> Result<crate::dbobject::DbObjectType> {
2283 use crate::dbobject::{CollectionType, DbObjectType};
2284
2285 self.ensure_ready().await?;
2286
2287 // Parse type name into schema and name
2288 let (schema, name) = parse_type_name(type_name, &self.config.username);
2289
2290 // First, query ALL_TYPES to get basic type info
2291 let type_info = self
2292 .query(
2293 "SELECT typecode, type_oid FROM all_types WHERE owner = :1 AND type_name = :2",
2294 &[Value::String(schema.clone()), Value::String(name.clone())],
2295 )
2296 .await?;
2297
2298 if type_info.rows.is_empty() {
2299 return Err(Error::OracleError {
2300 code: 4043, // ORA-04043: object does not exist
2301 message: format!("Type {}.{} not found", schema, name),
2302 });
2303 }
2304
2305 let row = &type_info.rows[0];
2306 let typecode = row.get(0).and_then(|v| v.as_str()).unwrap_or("");
2307 let type_oid = row.get(1).and_then(|v| v.as_bytes()).map(|b| b.to_vec());
2308
2309 // Check if it's a collection
2310 if typecode == "COLLECTION" {
2311 // Query ALL_COLL_TYPES for collection details
2312 let coll_info = self.query(
2313 "SELECT coll_type, elem_type_name, elem_type_owner, upper_bound FROM all_coll_types WHERE owner = :1 AND type_name = :2",
2314 &[Value::String(schema.clone()), Value::String(name.clone())],
2315 ).await?;
2316
2317 if coll_info.rows.is_empty() {
2318 return Err(Error::OracleError {
2319 code: 4043,
2320 message: format!("Collection type {}.{} metadata not found", schema, name),
2321 });
2322 }
2323
2324 let coll_row = &coll_info.rows[0];
2325 let coll_type_str = coll_row.get(0).and_then(|v| v.as_str()).unwrap_or("");
2326 let elem_type_name = coll_row.get(1).and_then(|v| v.as_str()).unwrap_or("VARCHAR2");
2327 let _elem_type_owner = coll_row.get(2).and_then(|v| v.as_str());
2328
2329 let collection_type = match coll_type_str {
2330 "VARYING ARRAY" => CollectionType::Varray,
2331 "TABLE" => CollectionType::NestedTable,
2332 _ => CollectionType::Varray, // Default
2333 };
2334
2335 let element_type = oracle_type_from_name(elem_type_name);
2336
2337 let mut obj_type = DbObjectType::collection(schema, name, collection_type, element_type);
2338 obj_type.oid = type_oid;
2339 Ok(obj_type)
2340 } else {
2341 // Regular object type (not yet fully implemented)
2342 let mut obj_type = DbObjectType::new(schema, name);
2343 obj_type.oid = type_oid;
2344 Ok(obj_type)
2345 }
2346 }
2347
2348 /// Internal: Execute a query statement with optional bind parameters
2349 async fn execute_query_with_params(&self, statement: &Statement, params: &[Value]) -> Result<QueryResult> {
2350 let prefetch_rows = 100; // Default prefetch
2351
2352 // For first execution, check if we might have LOBs (no prefetch for safety)
2353 // This can be optimized later with describe-only first
2354 let options = ExecuteOptions::for_query(prefetch_rows);
2355 let mut execute_msg = ExecuteMessage::new(statement, options);
2356
2357 // Set bind values if provided
2358 if !params.is_empty() {
2359 execute_msg.set_bind_values(params.to_vec());
2360 }
2361
2362 let mut inner = self.inner.lock().await;
2363 let large_sdu = inner.large_sdu;
2364 let seq_num = inner.next_sequence_number();
2365 execute_msg.set_sequence_number(seq_num);
2366 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2367 inner.send(&request).await?;
2368
2369 // Receive and parse response
2370 let response = inner.receive().await?;
2371 if response.len() <= PACKET_HEADER_SIZE {
2372 return Err(Error::Protocol("Empty query response".to_string()));
2373 }
2374
2375 // Check for MARKER packet (indicates error - requires reset protocol)
2376 let packet_type = response[4];
2377 if packet_type == PacketType::Marker as u8 {
2378 // Handle marker reset protocol and get the error packet
2379 let error_response = inner.handle_marker_reset().await?;
2380 let payload = &error_response[PACKET_HEADER_SIZE..];
2381 return self.parse_error_response(payload);
2382 }
2383
2384 // Parse the response to extract columns and rows
2385 let payload = &response[PACKET_HEADER_SIZE..];
2386 let mut result = self.parse_query_response(payload, &inner.capabilities)?;
2387
2388 // Check if any columns are LOB types that require defines
2389 let has_lob_columns = result.columns.iter().any(|col| col.is_lob());
2390
2391 if has_lob_columns && !statement.requires_define() {
2392 // We need to re-execute with column defines
2393 // Create a modified statement with the define flag set
2394 let mut stmt_with_define = statement.clone();
2395 stmt_with_define.set_columns(result.columns.clone());
2396 stmt_with_define.set_cursor_id(result.cursor_id);
2397 stmt_with_define.set_requires_define(true);
2398 stmt_with_define.set_no_prefetch(true);
2399 stmt_with_define.set_executed(true);
2400
2401 // Re-execute with defines
2402 let define_options = ExecuteOptions::for_query(prefetch_rows);
2403 let mut define_msg = ExecuteMessage::new(&stmt_with_define, define_options);
2404 let seq_num = inner.next_sequence_number();
2405 define_msg.set_sequence_number(seq_num);
2406
2407 let define_request = define_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2408 inner.send(&define_request).await?;
2409
2410 // Receive the re-execute response
2411 let define_response = inner.receive().await?;
2412 if define_response.len() <= PACKET_HEADER_SIZE {
2413 return Err(Error::Protocol("Empty define response".to_string()));
2414 }
2415
2416 // Check for MARKER packet
2417 let packet_type = define_response[4];
2418 if packet_type == PacketType::Marker as u8 {
2419 let error_response = inner.handle_marker_reset().await?;
2420 let payload = &error_response[PACKET_HEADER_SIZE..];
2421 return self.parse_error_response(payload);
2422 }
2423
2424 // Parse the response with LOB data, using the columns we already know
2425 let payload = &define_response[PACKET_HEADER_SIZE..];
2426 result = self.parse_query_response_with_columns(
2427 payload,
2428 &inner.capabilities,
2429 &stmt_with_define.columns(),
2430 )?;
2431 }
2432
2433 Ok(result)
2434 }
2435
2436 /// Internal: Execute a DML statement with optional bind parameters
2437 async fn execute_dml_with_params(&self, statement: &Statement, params: &[Value]) -> Result<QueryResult> {
2438 let options = ExecuteOptions::for_dml(false); // Don't auto-commit
2439 let mut execute_msg = ExecuteMessage::new(statement, options);
2440
2441 // Set bind values if provided
2442 if !params.is_empty() {
2443 execute_msg.set_bind_values(params.to_vec());
2444 }
2445
2446 let mut inner = self.inner.lock().await;
2447 let large_sdu = inner.large_sdu;
2448 let seq_num = inner.next_sequence_number();
2449 execute_msg.set_sequence_number(seq_num);
2450 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2451
2452 inner.send(&request).await?;
2453
2454 // Receive response
2455 let mut response = inner.receive().await?;
2456 if response.len() <= PACKET_HEADER_SIZE {
2457 return Err(Error::Protocol("Empty DML response".to_string()));
2458 }
2459
2460 // Check packet type - handle MARKER packets
2461 let packet_type = response[4];
2462 if packet_type == PacketType::Marker as u8 {
2463 // Need to do reset protocol and then get the actual response
2464 // Extract marker type from packet: after header (8 bytes) + data flags (2 bytes) + marker type (1 byte)
2465 let marker_type = if response.len() >= PACKET_HEADER_SIZE + 3 {
2466 response[PACKET_HEADER_SIZE + 2]
2467 } else {
2468 1 // Assume BREAK
2469 };
2470
2471 if marker_type == 1 {
2472 // BREAK marker - send RESET and wait for response
2473 self.send_marker(&mut inner, 2).await?;
2474
2475 // Wait for RESET marker back or DATA packet with error
2476 // The server may send: RESET marker, or BREAK marker followed by DATA, or just close
2477 let mut got_reset = false;
2478 let mut max_attempts = 5;
2479 loop {
2480 match inner.receive().await {
2481 Ok(pkt) => {
2482 if pkt.len() < PACKET_HEADER_SIZE + 1 {
2483 break;
2484 }
2485 let pkt_type = pkt[4];
2486 if pkt_type == PacketType::Marker as u8 {
2487 if pkt.len() >= PACKET_HEADER_SIZE + 3 {
2488 let mk_type = pkt[PACKET_HEADER_SIZE + 2];
2489 if mk_type == 2 {
2490 // Got RESET marker, break out
2491 got_reset = true;
2492 break;
2493 }
2494 // Got another BREAK marker - server may be acknowledging
2495 // Keep trying a few times
2496 max_attempts -= 1;
2497 if max_attempts == 0 {
2498 // Give up and return a generic error
2499 return Err(Error::Protocol(
2500 "Server rejected operation (multiple BREAK markers)".to_string()
2501 ));
2502 }
2503 continue;
2504 }
2505 } else if pkt_type == PacketType::Data as u8 {
2506 // Got DATA packet - use this as the response (may contain error)
2507 let payload = &pkt[PACKET_HEADER_SIZE..];
2508 return self.parse_dml_response(payload);
2509 } else {
2510 break;
2511 }
2512 }
2513 Err(e) => {
2514 // If we get EOF, the server may have closed the connection
2515 // This can happen with temp LOB binding issues
2516 inner.state = ConnectionState::Closed;
2517 return Err(Error::Protocol(format!(
2518 "Server closed connection during error handling: {}",
2519 e
2520 )));
2521 }
2522 }
2523 }
2524
2525 // After RESET, try to receive DATA packet with error response
2526 // Note: Some Oracle servers "quit immediately" after reset without sending
2527 // an error packet. In that case, we'll get EOF which is handled below.
2528 if got_reset {
2529 loop {
2530 match inner.receive().await {
2531 Ok(pkt) => {
2532 if pkt.len() < PACKET_HEADER_SIZE + 1 {
2533 // Too short, connection may be closing
2534 break;
2535 }
2536 let pkt_type = pkt[4];
2537 if pkt_type == PacketType::Marker as u8 {
2538 // More markers, keep reading
2539 continue;
2540 } else if pkt_type == PacketType::Data as u8 {
2541 // Got DATA packet, use this as the response
2542 response = pkt;
2543 break;
2544 } else {
2545 // Unknown packet type, return error
2546 return Err(Error::Protocol(format!(
2547 "Unexpected packet type {} after reset",
2548 pkt_type
2549 )));
2550 }
2551 }
2552 Err(_e) => {
2553 // EOF after reset is normal - server may close connection
2554 // without sending error details. Return a descriptive error.
2555 inner.state = ConnectionState::Closed;
2556 return Err(Error::OracleError {
2557 code: 0,
2558 message: "Server rejected the operation and closed the connection. \
2559 This may happen when binding a temporary LOB to an INSERT statement. \
2560 Try using a different approach (e.g., DBMS_LOB procedures).".to_string(),
2561 });
2562 }
2563 }
2564 }
2565 }
2566 }
2567 }
2568
2569 // Parse the response to extract rows affected (or error)
2570 let payload = &response[PACKET_HEADER_SIZE..];
2571 self.parse_dml_response(payload)
2572 }
2573
2574 /// Parse query response to extract columns and rows
2575 ///
2576 /// Oracle sends multiple messages in a single response:
2577 /// - DescribeInfo (16): column metadata
2578 /// - RowHeader (6): row header info
2579 /// - RowData (7): actual column values
2580 /// - Error (4): completion status (may contain error or success)
2581 fn parse_query_response(&self, payload: &[u8], caps: &Capabilities) -> Result<QueryResult> {
2582 self.parse_query_response_with_columns(payload, caps, &[])
2583 }
2584
2585 /// Parse query response with pre-known columns (for re-execute after define)
2586 fn parse_query_response_with_columns(
2587 &self,
2588 payload: &[u8],
2589 caps: &Capabilities,
2590 known_columns: &[ColumnInfo],
2591 ) -> Result<QueryResult> {
2592 if payload.len() < 3 {
2593 return Err(Error::Protocol("Query response too short".to_string()));
2594 }
2595
2596 let mut buf = ReadBuffer::from_slice(payload);
2597
2598 // Skip data flags
2599 buf.skip(2)?;
2600
2601 // Use known columns if provided, otherwise parse from describe info
2602 let mut columns: Vec<ColumnInfo> = known_columns.to_vec();
2603 let mut rows: Vec<Row> = Vec::new();
2604 let mut cursor_id: u16 = 0;
2605 let mut row_count: u64 = 0;
2606 let mut end_of_response = false;
2607
2608 // Bit vector for duplicate column optimization
2609 // When Some, indicates which columns have actual data (bit=1) vs duplicates (bit=0)
2610 let mut bit_vector: Option<Vec<u8>> = None;
2611 // Previous row values for copying duplicates
2612 let mut previous_row_values: Option<Vec<Value>> = None;
2613
2614 // Process messages until we hit end of response or run out of data
2615 while !end_of_response && buf.remaining() > 0 {
2616 let msg_type = buf.read_u8()?;
2617
2618 match msg_type {
2619 // DescribeInfo (16) - column metadata
2620 x if x == MessageType::DescribeInfo as u8 => {
2621 // Skip chunked bytes first
2622 buf.skip_raw_bytes_chunked()?;
2623 columns = self.parse_describe_info(&mut buf, caps.ttc_field_version)?;
2624 }
2625
2626 // RowHeader (6) - header info for rows
2627 x if x == MessageType::RowHeader as u8 => {
2628 self.parse_row_header(&mut buf)?;
2629 }
2630
2631 // RowData (7) - actual row values
2632 x if x == MessageType::RowData as u8 => {
2633 let row = self.parse_row_data_with_bitvector(
2634 &mut buf,
2635 &columns,
2636 caps,
2637 bit_vector.as_deref(),
2638 previous_row_values.as_ref(),
2639 )?;
2640 // Store this row's values for potential duplicate copying
2641 previous_row_values = Some(row.values().to_vec());
2642 // Clear bit vector after using it (it's per-row)
2643 bit_vector = None;
2644 rows.push(row);
2645 }
2646
2647 // Error (4) - completion or error
2648 x if x == MessageType::Error as u8 => {
2649 let (error_code, error_msg, cid, rc) = self.parse_error_info_with_rowcount(&mut buf)?;
2650 cursor_id = cid;
2651 row_count = rc;
2652 if error_code != 0 && error_code != 1403 {
2653 // 1403 is "no data found" which is not an error for queries
2654 return Err(Error::OracleError {
2655 code: error_code,
2656 message: error_msg.unwrap_or_default(),
2657 });
2658 }
2659 end_of_response = true;
2660 }
2661
2662 // Parameter (8) - return parameters
2663 x if x == MessageType::Parameter as u8 => {
2664 self.parse_return_parameters(&mut buf)?;
2665 }
2666
2667 // Status (9) - call status
2668 x if x == MessageType::Status as u8 => {
2669 // Read call status and end-to-end seq number
2670 let _call_status = buf.read_ub4()?;
2671 let _end_to_end_seq = buf.read_ub2()?;
2672 // Note: end_of_response only if supports_end_of_response is false
2673 // For now, we assume it's not the end
2674 }
2675
2676 // BitVector (21) - column presence bitmap for sparse results
2677 // Bit=1 means actual data is sent, bit=0 means duplicate from previous row
2678 21 => {
2679 // Read num columns sent
2680 let _num_columns_sent = buf.read_ub2()?;
2681 // Read bit vector (1 byte per 8 columns, rounded up)
2682 let num_bytes = (columns.len() + 7) / 8;
2683 if num_bytes > 0 {
2684 let bv = buf.read_bytes_vec(num_bytes)?;
2685 bit_vector = Some(bv);
2686 }
2687 }
2688
2689 _ => {
2690 // Unknown message type - break to avoid parsing errors
2691 break;
2692 }
2693 }
2694 }
2695
2696 Ok(QueryResult {
2697 columns,
2698 rows,
2699 rows_affected: row_count,
2700 has_more_rows: false,
2701 cursor_id,
2702 })
2703 }
2704
2705 /// Parse a PL/SQL response containing OUT parameter values
2706 ///
2707 /// PL/SQL responses may contain:
2708 /// - IoVector (11): bind directions for each parameter
2709 /// - RowData (7): OUT parameter values
2710 /// - FlushOutBinds (19): signals end of OUT bind data
2711 /// - Error (4): completion status
2712 fn parse_plsql_response(
2713 &self,
2714 payload: &[u8],
2715 caps: &Capabilities,
2716 params: &[BindParam],
2717 ) -> Result<PlsqlResult> {
2718 if payload.len() < 3 {
2719 return Err(Error::Protocol("PL/SQL response too short".to_string()));
2720 }
2721
2722 let mut buf = ReadBuffer::from_slice(payload);
2723
2724 // Skip data flags
2725 buf.skip(2)?;
2726
2727 let mut out_values: Vec<Value> = Vec::new();
2728 let mut _out_indices: Vec<usize> = Vec::new();
2729 let mut row_count: u64 = 0;
2730 let mut cursor_id: Option<u16> = None;
2731 let mut end_of_response = false;
2732 let mut implicit_results = ImplicitResults::new();
2733
2734 // Create column infos for OUT params based on their oracle types
2735 let mut out_columns: Vec<ColumnInfo> = Vec::new();
2736
2737 while !end_of_response && buf.remaining() > 0 {
2738 let msg_type = buf.read_u8()?;
2739
2740 match msg_type {
2741 // IoVector (11) - bind directions from server
2742 x if x == MessageType::IoVector as u8 => {
2743 let (indices, cols) = self.parse_io_vector(&mut buf, params)?;
2744 _out_indices = indices;
2745 out_columns = cols;
2746 }
2747
2748 // RowHeader (6)
2749 x if x == MessageType::RowHeader as u8 => {
2750 self.parse_row_header(&mut buf)?;
2751 }
2752
2753 // RowData (7) - OUT parameter values
2754 x if x == MessageType::RowData as u8 => {
2755 if !out_columns.is_empty() {
2756 let row = self.parse_row_data_single(&mut buf, &out_columns, caps)?;
2757 // Extract values from the row into out_values
2758 for (idx, value) in row.into_values().into_iter().enumerate() {
2759 // Check if this is a cursor
2760 if let Value::Cursor(cursor) = &value {
2761 if cursor_id.is_none() && cursor.cursor_id() != 0 {
2762 cursor_id = Some(cursor.cursor_id());
2763 }
2764 }
2765 // Map back to original param position if we have indices
2766 if idx < out_values.len() {
2767 out_values[idx] = value;
2768 } else {
2769 out_values.push(value);
2770 }
2771 }
2772 } else {
2773 // Skip the row data if we don't have column info
2774 // This shouldn't normally happen
2775 break;
2776 }
2777 }
2778
2779 // DescribeInfo (16) - for REF CURSOR describe
2780 x if x == MessageType::DescribeInfo as u8 => {
2781 buf.skip_raw_bytes_chunked()?;
2782 let cursor_columns = self.parse_describe_info(&mut buf, caps.ttc_field_version)?;
2783 // Store cursor columns if needed
2784 let _ = cursor_columns; // For now, just skip
2785 }
2786
2787 // FlushOutBinds (19) - signals end of OUT bind data
2788 x if x == MessageType::FlushOutBinds as u8 => {
2789 // This indicates that OUT bind data is done
2790 // Just continue to get the error/completion status
2791 }
2792
2793 // Error (4) - completion or error
2794 x if x == MessageType::Error as u8 => {
2795 let (error_code, error_msg, _cid, rc) = self.parse_error_info_with_rowcount(&mut buf)?;
2796 row_count = rc;
2797 if error_code != 0 {
2798 return Err(Error::OracleError {
2799 code: error_code,
2800 message: error_msg.unwrap_or_default(),
2801 });
2802 }
2803 end_of_response = true;
2804 }
2805
2806 // Parameter (8) - return parameters
2807 x if x == MessageType::Parameter as u8 => {
2808 self.parse_return_parameters(&mut buf)?;
2809 }
2810
2811 // Status (9)
2812 x if x == MessageType::Status as u8 => {
2813 let _call_status = buf.read_ub4()?;
2814 let _end_to_end_seq = buf.read_ub2()?;
2815 }
2816
2817 // ImplicitResultset (27) - result sets from DBMS_SQL.RETURN_RESULT
2818 x if x == MessageType::ImplicitResultset as u8 => {
2819 let parsed_results = self.parse_implicit_results(&mut buf, caps)?;
2820 implicit_results = parsed_results;
2821 }
2822
2823 _ => {
2824 // Unknown message type - break to avoid parsing errors
2825 break;
2826 }
2827 }
2828 }
2829
2830 // If no IoVector was received, all params might be IN-only
2831 // In that case, out_values should be empty
2832 Ok(PlsqlResult {
2833 out_values,
2834 rows_affected: row_count,
2835 cursor_id,
2836 implicit_results,
2837 })
2838 }
2839
2840 /// Parse implicit result sets from DBMS_SQL.RETURN_RESULT
2841 ///
2842 /// Format per Python base.pyx _process_implicit_result:
2843 /// - num_results: ub4 (number of implicit result sets)
2844 /// - For each result:
2845 /// - num_bytes: ub1 + raw bytes (metadata to skip)
2846 /// - describe_info: column metadata
2847 /// - cursor_id: ub2
2848 fn parse_implicit_results(&self, buf: &mut ReadBuffer, caps: &Capabilities) -> Result<ImplicitResults> {
2849 let num_results = buf.read_ub4()?;
2850 let mut results = ImplicitResults::new();
2851
2852 for _ in 0..num_results {
2853 // Skip metadata bytes
2854 let num_bytes = buf.read_u8()?;
2855 if num_bytes > 0 {
2856 buf.skip(num_bytes as usize)?;
2857 }
2858
2859 // Parse column metadata for this result set
2860 let columns = self.parse_describe_info(buf, caps.ttc_field_version)?;
2861
2862 // Read cursor ID
2863 let cursor_id = buf.read_ub2()?;
2864
2865 // Create implicit result with metadata but no rows yet
2866 // Rows will be fetched separately using fetch_implicit_result
2867 let result = ImplicitResult::new(cursor_id, columns, Vec::new());
2868 results.add(result);
2869 }
2870
2871 Ok(results)
2872 }
2873
2874 /// Parse IO Vector message to get bind directions
2875 ///
2876 /// Returns a tuple of:
2877 /// - indices of OUT/INOUT parameters in the params list
2878 /// - column infos for parsing OUT values
2879 fn parse_io_vector(
2880 &self,
2881 buf: &mut ReadBuffer,
2882 params: &[BindParam],
2883 ) -> Result<(Vec<usize>, Vec<ColumnInfo>)> {
2884 // I/O vector format (from Python reference):
2885 // - skip 1 byte (flag)
2886 // - read ub2 (num requests)
2887 // - read ub4 (num iters)
2888 // - num_binds = num_iters * 256 + num_requests
2889 // - skip ub4 (num iters this time)
2890 // - skip ub2 (uac buffer length)
2891 // - read ub2 (num_bytes for bit vector), skip if > 0
2892 // - read ub2 (num_bytes for rowid), skip if > 0
2893 // - for each bind: read ub1 (bind_dir)
2894
2895 buf.skip(1)?; // flag
2896 let num_requests = buf.read_ub2()? as u32;
2897 let num_iters = buf.read_ub4()?;
2898 let num_binds = num_iters * 256 + num_requests;
2899 let _ = buf.read_ub4()?; // num iters this time (discard)
2900 let _ = buf.read_ub2()?; // uac buffer length (discard)
2901
2902 // Bit vector
2903 let num_bytes = buf.read_ub2()? as usize;
2904 if num_bytes > 0 {
2905 buf.skip(num_bytes)?;
2906 }
2907
2908 // Rowid (raw bytes, not length-prefixed here)
2909 let num_bytes = buf.read_ub4()? as usize;
2910 if num_bytes > 0 {
2911 buf.skip_raw_bytes_chunked()?; // Skip rowid bytes using chunked read
2912 }
2913
2914 // Read bind directions
2915 let mut out_indices = Vec::new();
2916 let mut out_columns = Vec::new();
2917
2918 for i in 0..(num_binds as usize).min(params.len()) {
2919 let dir_byte = buf.read_u8()?;
2920 let dir = BindDirection::try_from(dir_byte).unwrap_or(BindDirection::Input);
2921
2922 // If this is not an INPUT-only parameter, it has OUT data
2923 if dir != BindDirection::Input {
2924 out_indices.push(i);
2925
2926 // Create a column info for parsing the OUT value
2927 let param = ¶ms[i];
2928 let mut col = ColumnInfo::new(format!("OUT_{}", i), param.oracle_type);
2929 col.buffer_size = param.buffer_size;
2930 col.data_size = param.buffer_size;
2931 col.nullable = true;
2932
2933 // For collection OUT params, extract element type from the placeholder
2934 if let Some(Value::Collection(ref placeholder)) = param.value {
2935 if let Some(Value::Integer(elem_type_code)) = placeholder.get("_element_type") {
2936 col.element_type = crate::constants::OracleType::try_from(*elem_type_code as u8).ok();
2937 }
2938 }
2939
2940 out_columns.push(col);
2941 }
2942 }
2943
2944 Ok((out_indices, out_columns))
2945 }
2946
2947 /// Parse row header (TNS_MSG_TYPE_ROW_HEADER = 6)
2948 fn parse_row_header(&self, buf: &mut ReadBuffer) -> Result<()> {
2949 buf.skip_ub1()?; // flags
2950 buf.skip_ub2()?; // num requests
2951 buf.skip_ub4()?; // iteration number
2952 buf.skip_ub4()?; // num iters
2953 buf.skip_ub2()?; // buffer length
2954 let num_bytes = buf.read_ub4()? as usize;
2955 if num_bytes > 0 {
2956 buf.skip_ub1()?; // skip repeated length
2957 buf.skip(num_bytes)?; // bit vector
2958 }
2959 let num_bytes = buf.read_ub4()? as usize;
2960 if num_bytes > 0 {
2961 buf.skip_raw_bytes_chunked()?; // rxhrid
2962 }
2963 Ok(())
2964 }
2965
2966 /// Parse return parameters (TNS_MSG_TYPE_PARAMETER = 8)
2967 fn parse_return_parameters(&self, buf: &mut ReadBuffer) -> Result<()> {
2968 self.parse_return_parameters_internal(buf, false).map(|_| ())
2969 }
2970
2971 /// Parse return parameters with optional row counts extraction
2972 /// When `want_row_counts` is true, attempts to read arraydmlrowcounts from the response.
2973 fn parse_return_parameters_internal(
2974 &self,
2975 buf: &mut ReadBuffer,
2976 want_row_counts: bool,
2977 ) -> Result<Option<Vec<u64>>> {
2978 // Per Python's _process_return_parameters
2979 let num_params = buf.read_ub2()?; // al8o4l (ignored)
2980 for _ in 0..num_params {
2981 buf.skip_ub4()?;
2982 }
2983
2984 let al8txl = buf.read_ub2()?; // al8txl (ignored)
2985 if al8txl > 0 {
2986 buf.skip(al8txl as usize)?;
2987 }
2988
2989 // num key/value pairs - skip for now
2990 let num_pairs = buf.read_ub2()?;
2991 for _ in 0..num_pairs {
2992 buf.read_bytes_with_length()?; // text value
2993 buf.read_bytes_with_length()?; // binary value
2994 buf.skip_ub2()?; // keyword num
2995 }
2996
2997 // registration
2998 let num_bytes = buf.read_ub2()?;
2999 if num_bytes > 0 {
3000 buf.skip(num_bytes as usize)?;
3001 }
3002
3003 // If arraydmlrowcounts was requested, parse the row counts
3004 if want_row_counts && buf.remaining() >= 4 {
3005 let num_rows = buf.read_ub4()? as usize;
3006 let mut row_counts = Vec::with_capacity(num_rows);
3007 for _ in 0..num_rows {
3008 let count = buf.read_ub8()?;
3009 row_counts.push(count);
3010 }
3011 Ok(Some(row_counts))
3012 } else {
3013 Ok(None)
3014 }
3015 }
3016
3017 /// Parse a single row of data
3018 fn parse_row_data_single(
3019 &self,
3020 buf: &mut ReadBuffer,
3021 columns: &[ColumnInfo],
3022 caps: &Capabilities,
3023 ) -> Result<Row> {
3024 let mut values = Vec::with_capacity(columns.len());
3025
3026 for col in columns {
3027 let value = self.parse_column_value(buf, col, caps)?;
3028 values.push(value);
3029 }
3030
3031 Ok(Row::new(values))
3032 }
3033
3034 /// Parse a single row of data with bit vector support for duplicate column optimization
3035 ///
3036 /// Oracle sends a BitVector message before RowData when some columns have the same
3037 /// value as the previous row. Bits that are SET (1) indicate data is sent in the buffer;
3038 /// bits that are CLEAR (0) indicate the value should be copied from the previous row.
3039 fn parse_row_data_with_bitvector(
3040 &self,
3041 buf: &mut ReadBuffer,
3042 columns: &[ColumnInfo],
3043 caps: &Capabilities,
3044 bit_vector: Option<&[u8]>,
3045 previous_values: Option<&Vec<Value>>,
3046 ) -> Result<Row> {
3047 let mut values = Vec::with_capacity(columns.len());
3048
3049 for (col_idx, col) in columns.iter().enumerate() {
3050 // Check if this column is a duplicate (bit=0 means duplicate)
3051 let is_duplicate = if let Some(bv) = bit_vector {
3052 let byte_num = col_idx / 8;
3053 let bit_num = col_idx % 8;
3054 if byte_num < bv.len() {
3055 // If bit is 0, it's a duplicate
3056 (bv[byte_num] & (1 << bit_num)) == 0
3057 } else {
3058 false
3059 }
3060 } else {
3061 false
3062 };
3063
3064 if is_duplicate {
3065 // Copy value from previous row
3066 if let Some(prev) = previous_values {
3067 if col_idx < prev.len() {
3068 values.push(prev[col_idx].clone());
3069 } else {
3070 // Shouldn't happen, but fallback to null
3071 values.push(Value::Null);
3072 }
3073 } else {
3074 // No previous row (shouldn't happen for duplicate), fallback to null
3075 values.push(Value::Null);
3076 }
3077 } else {
3078 // Read actual value from buffer
3079 let value = self.parse_column_value(buf, col, caps)?;
3080 values.push(value);
3081 }
3082 }
3083
3084 Ok(Row::new(values))
3085 }
3086
3087 /// Parse a single column value from the buffer
3088 fn parse_column_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo, caps: &Capabilities) -> Result<Value> {
3089 use crate::constants::OracleType;
3090
3091 // Handle LOB columns specially - they have a different format
3092 if col.is_lob() {
3093 return self.parse_lob_value(buf, col);
3094 }
3095
3096 // Handle CURSOR type - REF CURSOR from PL/SQL
3097 if col.oracle_type == OracleType::Cursor {
3098 return self.parse_cursor_value(buf, caps);
3099 }
3100
3101 // Handle Object type - collections (VARRAY, Nested Table) and UDTs
3102 if col.oracle_type == OracleType::Object {
3103 return self.parse_object_value(buf, col);
3104 }
3105
3106 // Read the value based on the column type
3107 // First, check if it's NULL
3108 let data = buf.read_bytes_with_length()?;
3109
3110 match data {
3111 None => Ok(Value::Null),
3112 Some(bytes) if bytes.is_empty() => Ok(Value::Null),
3113 Some(bytes) => {
3114 // Decode based on oracle type
3115 match col.oracle_type {
3116 OracleType::Number => {
3117 // Oracle NUMBER format - decode to string
3118 let num = crate::types::decode_oracle_number(&bytes)?;
3119 Ok(Value::String(num.value))
3120 }
3121 OracleType::Varchar | OracleType::Char | OracleType::Long => {
3122 let s = String::from_utf8_lossy(&bytes).to_string();
3123 Ok(Value::String(s))
3124 }
3125 OracleType::Raw | OracleType::LongRaw => {
3126 // RAW/LONG RAW types - return as bytes
3127 Ok(Value::Bytes(bytes.to_vec()))
3128 }
3129 OracleType::Date => {
3130 // Oracle DATE format - 7 bytes
3131 let date = crate::types::decode_oracle_date(&bytes)?;
3132 Ok(Value::Date(date))
3133 }
3134 OracleType::Timestamp | OracleType::TimestampLtz => {
3135 // Oracle TIMESTAMP format - 11 bytes (date + fractional seconds)
3136 let ts = crate::types::decode_oracle_timestamp(&bytes)?;
3137 Ok(Value::Timestamp(ts))
3138 }
3139 OracleType::TimestampTz => {
3140 // Oracle TIMESTAMP WITH TIME ZONE - 13 bytes
3141 let ts = crate::types::decode_oracle_timestamp(&bytes)?;
3142 Ok(Value::Timestamp(ts))
3143 }
3144 _ => {
3145 // Default: return as raw bytes or string
3146 let s = String::from_utf8_lossy(&bytes).to_string();
3147 Ok(Value::String(s))
3148 }
3149 }
3150 }
3151 }
3152 }
3153
3154 /// Parse a REF CURSOR value from the buffer
3155 ///
3156 /// Per Python base.pyx lines 1038-1046:
3157 /// - Skip 1 byte (length indicator - fixed value)
3158 /// - Read describe info (column metadata for the cursor)
3159 /// - Read cursor_id (UB2)
3160 fn parse_cursor_value(&self, buf: &mut ReadBuffer, caps: &Capabilities) -> Result<Value> {
3161 use crate::types::RefCursor;
3162
3163 // Skip length indicator (fixed value for cursors)
3164 let _length = buf.read_u8()?;
3165
3166 // Read column metadata for this cursor
3167 let cursor_columns = self.parse_describe_info(buf, caps.ttc_field_version)?;
3168
3169 // Read the cursor ID
3170 let cursor_id = buf.read_ub2()?;
3171
3172 // Create RefCursor with the metadata
3173 let ref_cursor = RefCursor::new(cursor_id, cursor_columns);
3174
3175 Ok(Value::Cursor(ref_cursor))
3176 }
3177
3178 /// Parse an Object/Collection value from the buffer
3179 ///
3180 /// Object format from Oracle (per Python packet.pyx read_dbobject):
3181 /// - UB4: type OID length, then type OID bytes if > 0
3182 /// - UB4: OID length, then OID bytes if > 0
3183 /// - UB4: snapshot length, then snapshot bytes if > 0 (discarded)
3184 /// - UB2: version (skip)
3185 /// - UB4: packed data length
3186 /// - UB2: flags (skip)
3187 /// - Bytes: packed data (pickle format)
3188 fn parse_object_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo) -> Result<Value> {
3189 use crate::dbobject::{CollectionType, DbObject, DbObjectType};
3190 use crate::types::decode_collection;
3191
3192 // Read type OID
3193 let toid_len = buf.read_ub4()?;
3194 let _toid = if toid_len > 0 {
3195 Some(buf.read_bytes_vec(toid_len as usize)?)
3196 } else {
3197 None
3198 };
3199
3200 // Read OID
3201 let oid_len = buf.read_ub4()?;
3202 let _oid = if oid_len > 0 {
3203 Some(buf.read_bytes_vec(oid_len as usize)?)
3204 } else {
3205 None
3206 };
3207
3208 // Read and discard snapshot
3209 let snapshot_len = buf.read_ub4()?;
3210 if snapshot_len > 0 {
3211 buf.skip_raw_bytes_chunked()?;
3212 }
3213
3214 // Skip version (length-prefixed UB2)
3215 let _version = buf.read_ub2()?;
3216
3217 // Read packed data length
3218 let data_len = buf.read_ub4()?;
3219
3220 // Skip flags (length-prefixed UB2)
3221 let _flags = buf.read_ub2()?;
3222
3223 if data_len == 0 {
3224 return Ok(Value::Null);
3225 }
3226
3227 // Read packed data (chunked format like other byte sequences)
3228 let packed_data = buf.read_bytes_with_length()?;
3229
3230 match packed_data {
3231 None => Ok(Value::Null),
3232 Some(data) if data.is_empty() => Ok(Value::Null),
3233 Some(data) => {
3234 // Create a placeholder type based on column info
3235 let type_name = col.type_name.clone().unwrap_or_else(|| "UNKNOWN".to_string());
3236
3237 // Try to determine if this is a collection based on the pickle data
3238 // The first byte contains flags - check for IS_COLLECTION (0x08)
3239 let is_collection = !data.is_empty() && (data[0] & 0x08) != 0;
3240
3241 if is_collection {
3242 // Get element type from column info or default to VARCHAR
3243 let element_type = col.element_type.unwrap_or(crate::constants::OracleType::Varchar);
3244
3245 // Determine collection type from pickle flags
3246 // Collection flags are after header - but we'll default for now
3247 let collection_type = CollectionType::Varray;
3248
3249 let obj_type = DbObjectType::collection(
3250 &col.type_schema.clone().unwrap_or_default(),
3251 &type_name,
3252 collection_type,
3253 element_type,
3254 );
3255
3256 match decode_collection(&obj_type, &data) {
3257 Ok(collection) => Ok(Value::Collection(collection)),
3258 Err(e) => {
3259 tracing::warn!("Failed to decode collection: {}, data: {:02x?}", e, &data[..std::cmp::min(20, data.len())]);
3260 // Return raw bytes as fallback
3261 Ok(Value::Bytes(data))
3262 }
3263 }
3264 } else {
3265 // Regular object type - not yet fully implemented
3266 let mut obj = DbObject::new(&type_name);
3267 // Store raw pickle data for later inspection
3268 obj.set("_raw_data", Value::Bytes(data));
3269 Ok(Value::Collection(obj))
3270 }
3271 }
3272 }
3273 }
3274
3275 /// Parse a LOB column value from the buffer
3276 ///
3277 /// LOB format from Oracle (per Python's read_lob_with_length):
3278 /// - UB4: num_bytes (indicator that LOB data follows)
3279 /// - If num_bytes > 0:
3280 /// - For non-BFILE: UB8 size, UB4 chunk_size
3281 /// - Bytes: LOB locator (chunked format)
3282 ///
3283 /// The actual LOB content must be fetched separately using LOB operations.
3284 /// For JSON columns, the data is OSON-encoded and decoded directly.
3285 /// For VECTOR columns, the data is decoded from Oracle's vector binary format.
3286 fn parse_lob_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo) -> Result<Value> {
3287 use crate::constants::OracleType;
3288 use crate::types::{decode_vector, OsonDecoder};
3289
3290 // Read length indicator
3291 let num_bytes = buf.read_ub4()?;
3292
3293 if num_bytes == 0 {
3294 // For JSON, null is Value::Json(serde_json::Value::Null)
3295 if col.oracle_type == OracleType::Json || col.is_json {
3296 return Ok(Value::Json(serde_json::Value::Null));
3297 }
3298 // For VECTOR, null is Value::Null
3299 if col.oracle_type == OracleType::Vector {
3300 return Ok(Value::Null);
3301 }
3302 return Ok(Value::Lob(LobValue::Null));
3303 }
3304
3305 // For BFILE, there's no size/chunk_size metadata
3306 let (size, chunk_size) = if col.oracle_type == OracleType::Bfile {
3307 (0u64, 0u32)
3308 } else {
3309 // Read LOB size and chunk size
3310 let size = buf.read_ub8()?;
3311 let chunk_size = buf.read_ub4()?;
3312 (size, chunk_size)
3313 };
3314
3315 // Read LOB data (could be locator or inline data depending on size)
3316 let data_bytes = buf.read_bytes_with_length()?;
3317
3318 // Handle JSON columns - decode OSON format
3319 if col.oracle_type == OracleType::Json || col.is_json {
3320 if let Some(data) = data_bytes {
3321 if !data.is_empty() {
3322 // Decode OSON to JSON
3323 match OsonDecoder::decode(bytes::Bytes::from(data)) {
3324 Ok(json_value) => return Ok(Value::Json(json_value)),
3325 Err(e) => {
3326 tracing::warn!("Failed to decode OSON: {}", e);
3327 return Ok(Value::Json(serde_json::Value::Null));
3328 }
3329 }
3330 }
3331 }
3332 return Ok(Value::Json(serde_json::Value::Null));
3333 }
3334
3335 // Handle VECTOR columns - decode vector binary format
3336 if col.oracle_type == OracleType::Vector {
3337 // Read and discard LOB locator (not needed for VECTOR)
3338 let _locator = buf.read_bytes_with_length()?;
3339
3340 if let Some(data) = data_bytes {
3341 if !data.is_empty() {
3342 match decode_vector(&data) {
3343 Ok(vector) => return Ok(Value::Vector(vector)),
3344 Err(e) => {
3345 tracing::warn!("Failed to decode VECTOR: {}", e);
3346 return Ok(Value::Null);
3347 }
3348 }
3349 }
3350 }
3351 return Ok(Value::Null);
3352 }
3353
3354 // Create a LOB locator for fetching the data later
3355 if let Some(locator_data) = data_bytes {
3356 if !locator_data.is_empty() {
3357 let locator = LobLocator::new(
3358 bytes::Bytes::from(locator_data),
3359 size,
3360 chunk_size,
3361 col.oracle_type,
3362 col.csfrm,
3363 );
3364 return Ok(Value::Lob(LobValue::locator(locator)));
3365 }
3366 }
3367
3368 // If we have size but no locator, it might be an empty LOB
3369 if size == 0 {
3370 return Ok(Value::Lob(LobValue::Empty));
3371 }
3372
3373 // Empty LOB (shouldn't normally reach here)
3374 Ok(Value::Lob(LobValue::Empty))
3375 }
3376
3377 /// Parse error info message and extract cursor_id
3378 /// Format per Python's _process_error_info in base.pyx
3379 fn parse_error_info(&self, buf: &mut ReadBuffer) -> Result<(u32, Option<String>, u16)> {
3380 // End of call status
3381 let _call_status = buf.read_ub4()?;
3382 // End to end seq#
3383 buf.skip_ub2()?;
3384 // Current row number
3385 buf.skip_ub4()?;
3386 // Error number (short form)
3387 buf.skip_ub2()?;
3388 // Array elem error
3389 buf.skip_ub2()?;
3390 // Array elem error
3391 buf.skip_ub2()?;
3392 // Cursor ID
3393 let cursor_id = buf.read_ub2()?;
3394 // Error position
3395 let _error_pos = buf.read_sb2()?;
3396 // SQL type (19c and earlier)
3397 buf.skip_ub1()?;
3398 // Fatal?
3399 buf.skip_ub1()?;
3400 // Flags
3401 buf.skip_ub1()?;
3402 // User cursor options
3403 buf.skip_ub1()?;
3404 // UPI parameter
3405 buf.skip_ub1()?;
3406 // Flags (second)
3407 buf.skip_ub1()?;
3408 // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3409 buf.skip_ub4()?; // rba
3410 buf.skip_ub2()?; // partition_id
3411 buf.skip_ub1()?; // skip
3412 buf.skip_ub4()?; // block_num
3413 buf.skip_ub2()?; // slot_num
3414 // OS error
3415 buf.skip_ub4()?;
3416 // Statement number
3417 buf.skip_ub1()?;
3418 // Call number
3419 buf.skip_ub1()?;
3420 // Padding
3421 buf.skip_ub2()?;
3422 // Success iters
3423 buf.skip_ub4()?;
3424 // oerrdd (logical rowid)
3425 let oerrdd_len = buf.read_ub4()?;
3426 if oerrdd_len > 0 {
3427 buf.skip_raw_bytes_chunked()?;
3428 }
3429
3430 // Batch error codes array
3431 let num_batch_errors = buf.read_ub2()?;
3432 if num_batch_errors > 0 {
3433 buf.skip_ub1()?; // first byte
3434 for _ in 0..num_batch_errors {
3435 buf.skip_ub2()?; // error code
3436 }
3437 }
3438
3439 // Batch error row offset array
3440 let num_offsets = buf.read_ub4()?;
3441 if num_offsets > 0 {
3442 buf.skip_ub1()?; // first byte
3443 for _ in 0..num_offsets {
3444 buf.skip_ub4()?; // offset
3445 }
3446 }
3447
3448 // Batch error messages array
3449 let num_batch_msgs = buf.read_ub2()?;
3450 if num_batch_msgs > 0 {
3451 buf.skip_ub1()?; // packed size
3452 for _ in 0..num_batch_msgs {
3453 buf.skip_ub2()?; // chunk length
3454 buf.read_string_with_length()?; // message
3455 buf.skip(2)?; // end marker
3456 }
3457 }
3458
3459 // Extended error number (UB4)
3460 let error_code = buf.read_ub4()?;
3461 // Row count (UB8)
3462 let _row_count = buf.read_ub8()?;
3463
3464 // Error message
3465 let error_msg = if error_code != 0 {
3466 buf.read_string_with_length()?.map(|s| s.trim().to_string())
3467 } else {
3468 None
3469 };
3470
3471 Ok((error_code, error_msg, cursor_id))
3472 }
3473
3474 /// Parse error response packet (received after marker reset)
3475 fn parse_error_response(&self, payload: &[u8]) -> Result<QueryResult> {
3476 if payload.len() < 3 {
3477 return Err(Error::Protocol("Error response too short".to_string()));
3478 }
3479
3480 let mut buf = ReadBuffer::from_slice(payload);
3481
3482 // Skip data flags
3483 buf.skip(2)?;
3484
3485 // Read message type
3486 let msg_type = buf.read_u8()?;
3487
3488 // Check for error message type (4)
3489 if msg_type == MessageType::Error as u8 {
3490 // Parse error info per Python's _process_error_info
3491 let _call_status = buf.read_ub4()?; // end of call status
3492 buf.skip_ub2()?; // end to end seq#
3493 buf.skip_ub4()?; // current row number
3494 buf.skip_ub2()?; // error number (short form)
3495 buf.skip_ub2()?; // array elem error
3496 buf.skip_ub2()?; // array elem error
3497 let _cursor_id = buf.read_ub2()?; // cursor id
3498 let _error_pos = buf.read_sb2()?; // error position
3499 buf.skip_ub1()?; // sql type (19c and earlier)
3500 buf.skip_ub1()?; // fatal?
3501 buf.skip_ub1()?; // flags
3502 buf.skip_ub1()?; // user cursor options
3503 buf.skip_ub1()?; // UPI parameter
3504 buf.skip_ub1()?; // flags
3505
3506 // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3507 buf.skip_ub4()?; // rba
3508 buf.skip_ub2()?; // partition_id
3509 buf.skip_ub1()?; // skip
3510 buf.skip_ub4()?; // block_num
3511 buf.skip_ub2()?; // slot_num
3512
3513 buf.skip_ub4()?; // OS error
3514 buf.skip_ub1()?; // statement number
3515 buf.skip_ub1()?; // call number
3516 buf.skip_ub2()?; // padding
3517 buf.skip_ub4()?; // success iters
3518
3519 // oerrdd (logical rowid)
3520 let oerrdd_len = buf.read_ub4()?;
3521 if oerrdd_len > 0 {
3522 buf.skip_raw_bytes_chunked()?;
3523 }
3524
3525 // batch error codes array
3526 let num_batch_errors = buf.read_ub2()?;
3527 if num_batch_errors > 0 {
3528 // Skip batch error data - we don't process it for now
3529 buf.skip_ub1()?; // first byte
3530 for _ in 0..num_batch_errors {
3531 buf.skip_ub2()?; // error code
3532 }
3533 }
3534
3535 // batch error row offset array
3536 let num_offsets = buf.read_ub4()?;
3537 if num_offsets > 0 {
3538 buf.skip_ub1()?; // first byte
3539 for _ in 0..num_offsets {
3540 buf.skip_ub4()?; // offset
3541 }
3542 }
3543
3544 // batch error messages array
3545 let num_batch_msgs = buf.read_ub2()?;
3546 if num_batch_msgs > 0 {
3547 // Skip batch error messages
3548 buf.skip_ub1()?; // packed size
3549 for _ in 0..num_batch_msgs {
3550 buf.skip_ub2()?; // chunk length
3551 buf.read_string_with_length()?; // message
3552 buf.skip(2)?; // end marker
3553 }
3554 }
3555
3556 // Extended error number (UB4)
3557 let error_num = buf.read_ub4()?;
3558 let _row_count = buf.read_ub8()?; // row number (extended)
3559
3560 // Read error message
3561 let error_msg = if error_num != 0 {
3562 buf.read_string_with_length()?.map(|s| s.trim().to_string())
3563 } else {
3564 None
3565 };
3566
3567 return Err(Error::OracleError {
3568 code: error_num,
3569 message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_num)),
3570 });
3571 }
3572
3573 // If not an error message type, return generic error
3574 Err(Error::Protocol(format!(
3575 "Expected error message type 4, got {}",
3576 msg_type
3577 )))
3578 }
3579
3580 /// Parse DML response to extract rows affected
3581 fn parse_dml_response(&self, payload: &[u8]) -> Result<QueryResult> {
3582 if payload.len() < 3 {
3583 return Err(Error::Protocol("DML response too short".to_string()));
3584 }
3585
3586 let mut buf = ReadBuffer::from_slice(payload);
3587
3588 // Skip data flags
3589 buf.skip(2)?;
3590
3591 let mut rows_affected: u64 = 0;
3592 let mut cursor_id: u16 = 0;
3593 let mut end_of_response = false;
3594
3595 // Process messages until end_of_response or out of data
3596 // Note: If supports_end_of_response is true, we must continue until msg type 29
3597 while !end_of_response && buf.remaining() > 0 {
3598 let msg_type = buf.read_u8()?;
3599
3600 match msg_type {
3601 // Error (4) - may contain error or success info
3602 x if x == MessageType::Error as u8 => {
3603 let (error_code, error_msg, cid, row_count) = self.parse_error_info_with_rowcount(&mut buf)?;
3604 cursor_id = cid;
3605 rows_affected = row_count;
3606 if error_code != 0 && error_code != 1403 {
3607 return Err(Error::OracleError {
3608 code: error_code,
3609 message: error_msg.unwrap_or_default(),
3610 });
3611 }
3612 // Only end if server doesn't support end_of_response
3613 // Otherwise, continue until we get msg type 29
3614 }
3615
3616 // Parameter (8) - return parameters
3617 x if x == MessageType::Parameter as u8 => {
3618 self.parse_return_parameters(&mut buf)?;
3619 }
3620
3621 // Status (9) - call status
3622 x if x == MessageType::Status as u8 => {
3623 let _call_status = buf.read_ub4()?;
3624 let _end_to_end_seq = buf.read_ub2()?;
3625 }
3626
3627 // BitVector (21)
3628 21 => {
3629 let _num_columns_sent = buf.read_ub2()?;
3630 // No columns for DML, but read the byte if present
3631 if buf.remaining() > 0 {
3632 let _byte = buf.read_u8()?;
3633 }
3634 }
3635
3636 // End of Response (29) - explicit end marker
3637 29 => {
3638 end_of_response = true;
3639 }
3640
3641 _ => {
3642 // Unknown message type - continue processing
3643 }
3644 }
3645 }
3646
3647 Ok(QueryResult {
3648 columns: Vec::new(),
3649 rows: Vec::new(),
3650 rows_affected,
3651 has_more_rows: false,
3652 cursor_id,
3653 })
3654 }
3655
3656 /// Parse error info and return (error_code, error_msg, cursor_id, row_count)
3657 fn parse_error_info_with_rowcount(&self, buf: &mut ReadBuffer) -> Result<(u32, Option<String>, u16, u64)> {
3658 // End of call status
3659 let _call_status = buf.read_ub4()?;
3660 // End to end seq#
3661 buf.skip_ub2()?;
3662 // Current row number
3663 buf.skip_ub4()?;
3664 // Error number (short form)
3665 buf.skip_ub2()?;
3666 // Array elem error
3667 buf.skip_ub2()?;
3668 // Array elem error
3669 buf.skip_ub2()?;
3670 // Cursor ID
3671 let cursor_id = buf.read_ub2()?;
3672 // Error position
3673 let _error_pos = buf.read_sb2()?;
3674 // SQL type (19c and earlier)
3675 buf.skip_ub1()?;
3676 // Fatal?
3677 buf.skip_ub1()?;
3678 // Flags
3679 buf.skip_ub1()?;
3680 // User cursor options
3681 buf.skip_ub1()?;
3682 // UPI parameter
3683 buf.skip_ub1()?;
3684 // Flags (second)
3685 buf.skip_ub1()?;
3686 // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3687 buf.skip_ub4()?; // rba
3688 buf.skip_ub2()?; // partition_id
3689 buf.skip_ub1()?; // skip
3690 buf.skip_ub4()?; // block_num
3691 buf.skip_ub2()?; // slot_num
3692 // OS error
3693 buf.skip_ub4()?;
3694 // Statement number
3695 buf.skip_ub1()?;
3696 // Call number
3697 buf.skip_ub1()?;
3698 // Padding
3699 buf.skip_ub2()?;
3700 // Success iters
3701 buf.skip_ub4()?;
3702 // oerrdd (logical rowid)
3703 let oerrdd_len = buf.read_ub4()?;
3704 if oerrdd_len > 0 {
3705 buf.skip_raw_bytes_chunked()?;
3706 }
3707
3708 // Batch error codes array
3709 let num_batch_errors = buf.read_ub2()?;
3710 if num_batch_errors > 0 {
3711 buf.skip_ub1()?; // first byte
3712 for _ in 0..num_batch_errors {
3713 buf.skip_ub2()?; // error code
3714 }
3715 }
3716
3717 // Batch error row offset array
3718 let num_offsets = buf.read_ub4()?;
3719 if num_offsets > 0 {
3720 buf.skip_ub1()?; // first byte
3721 for _ in 0..num_offsets {
3722 buf.skip_ub4()?; // offset
3723 }
3724 }
3725
3726 // Batch error messages array
3727 let num_batch_msgs = buf.read_ub2()?;
3728 if num_batch_msgs > 0 {
3729 buf.skip_ub1()?; // packed size
3730 for _ in 0..num_batch_msgs {
3731 buf.skip_ub2()?; // chunk length
3732 buf.read_string_with_length()?; // message
3733 buf.skip(2)?; // end marker
3734 }
3735 }
3736
3737 // Extended error number (UB4)
3738 let error_code = buf.read_ub4()?;
3739 // Row count (UB8) - this is the rows affected!
3740 let row_count = buf.read_ub8()?;
3741
3742 // Fields added in Oracle Database 20c (TTC field version >= 16)
3743 // We always skip these since we support Oracle 20c+
3744 buf.skip_ub4()?; // sql_type
3745 buf.skip_ub4()?; // server_checksum
3746
3747 // Error message
3748 let error_msg = if error_code != 0 {
3749 buf.read_string_with_length()?.map(|s| s.trim().to_string())
3750 } else {
3751 None
3752 };
3753
3754 Ok((error_code, error_msg, cursor_id, row_count))
3755 }
3756
3757 /// Parse describe info from response to extract column metadata
3758 ///
3759 /// Per Python's _process_describe_info, the format is:
3760 /// - UB4: max row size (skip)
3761 /// - UB4: number of columns
3762 /// - If num_columns > 0: UB1 (skip one byte)
3763 /// - For each column: metadata fields
3764 /// - After columns: current date, dcb flags, etc.
3765 fn parse_describe_info(&self, buf: &mut ReadBuffer, ttc_field_version: u8) -> Result<Vec<ColumnInfo>> {
3766 use crate::constants::ccap_value;
3767
3768 // Skip max row size
3769 buf.skip_ub4()?;
3770
3771 // Read number of columns
3772 let num_columns = buf.read_ub4()? as usize;
3773 if num_columns == 0 {
3774 return Ok(Vec::new());
3775 }
3776
3777 // Skip one byte if we have columns
3778 buf.skip_ub1()?;
3779
3780 let mut columns = Vec::with_capacity(num_columns);
3781
3782 for _col_idx in 0..num_columns {
3783
3784 // Parse column metadata per Python's _process_metadata
3785 let ora_type_num = buf.read_u8()?;
3786 buf.skip_ub1()?; // flags
3787 let precision = buf.read_u8()?; // precision as SB1
3788 let scale = buf.read_u8()?; // scale as SB1
3789 let buffer_size = buf.read_ub4()?;
3790
3791 buf.skip_ub4()?; // max_num_array_elements
3792 buf.skip_ub8()?; // cont_flags
3793 let _oid = buf.read_bytes_with_length()?; // OID
3794 buf.skip_ub2()?; // version
3795 buf.skip_ub2()?; // charset_id
3796 let _csfrm = buf.read_u8()?; // charset form
3797 let max_size = buf.read_ub4()?;
3798
3799 // For TTC field version >= 12.2 (8), skip oaccolid
3800 if ttc_field_version >= ccap_value::FIELD_VERSION_12_2 {
3801 buf.skip_ub4()?; // oaccolid
3802 }
3803
3804 let _nulls_allowed = buf.read_u8()?;
3805 buf.skip_ub1()?; // v7 length of name
3806 let name = buf.read_string_with_ub4_length()?.unwrap_or_default();
3807 let _schema = buf.read_string_with_ub4_length()?; // schema
3808 let _type_name = buf.read_string_with_ub4_length()?; // type_name
3809 buf.skip_ub2()?; // column position
3810 buf.skip_ub4()?; // uds_flags
3811
3812 // For TTC field version >= 23.1 (17), read domain fields
3813 if ttc_field_version >= ccap_value::FIELD_VERSION_23_1 {
3814 let _domain_schema = buf.read_string_with_ub4_length()?;
3815 let _domain_name = buf.read_string_with_ub4_length()?;
3816 }
3817
3818 // For TTC field version >= 20 (23.1_EXT_3), read annotations
3819 if ttc_field_version >= 20 {
3820 let num_annotations = buf.read_ub4()?;
3821 if num_annotations > 0 {
3822 buf.skip_ub1()?;
3823 // Read the actual annotations count (yes, it's read twice in Python)
3824 let actual_num = buf.read_ub4()?;
3825 buf.skip_ub1()?;
3826 for _ in 0..actual_num {
3827 // Skip annotation key and value (both are string with UB4 length)
3828 let _key = buf.read_string_with_ub4_length()?;
3829 let _value = buf.read_string_with_ub4_length()?;
3830 buf.skip_ub4()?; // flags per annotation
3831 }
3832 buf.skip_ub4()?; // final flags
3833 }
3834 }
3835
3836 // For TTC field version >= 24 (23.4), read vector fields
3837 if ttc_field_version >= ccap_value::FIELD_VERSION_23_4 {
3838 buf.skip_ub4()?; // vector_dimensions
3839 buf.skip_ub1()?; // vector_format
3840 buf.skip_ub1()?; // vector_flags
3841 }
3842
3843 // Convert data type to OracleType
3844 let oracle_type = crate::constants::OracleType::try_from(ora_type_num)
3845 .unwrap_or(crate::constants::OracleType::Varchar);
3846
3847 let mut col = ColumnInfo::new(&name, oracle_type);
3848 col.data_size = if max_size > 0 { max_size } else { buffer_size };
3849 col.precision = precision as i16;
3850 col.scale = scale as i16;
3851 columns.push(col);
3852 }
3853
3854 // After columns: skip remaining describe info fields
3855 // Python's _process_describe_info uses:
3856 // buf.read_ub4(&num_bytes)
3857 // if num_bytes > 0:
3858 // buf.skip_raw_bytes_chunked() # current date
3859 // buf.skip_ub4() # dcbflag
3860 // buf.skip_ub4() # dcbmdbz
3861 // buf.skip_ub4() # dcbmnpr
3862 // buf.skip_ub4() # dcbmxpr
3863 // buf.read_ub4(&num_bytes)
3864 // if num_bytes > 0:
3865 // buf.skip_raw_bytes_chunked() # dcbqcky
3866
3867 // current_date - read UB4 indicator first, then skip chunked bytes if > 0
3868 let current_date_indicator = buf.read_ub4()?;
3869 if current_date_indicator > 0 {
3870 buf.skip_raw_bytes_chunked()?;
3871 }
3872
3873 // dcb* fields as UB4
3874 buf.skip_ub4()?; // dcbflag
3875 buf.skip_ub4()?; // dcbmdbz
3876 buf.skip_ub4()?; // dcbmnpr
3877 buf.skip_ub4()?; // dcbmxpr
3878
3879 // dcbqcky - read UB4 indicator first, then skip chunked bytes if > 0
3880 let dcbqcky_indicator = buf.read_ub4()?;
3881 if dcbqcky_indicator > 0 {
3882 buf.skip_raw_bytes_chunked()?;
3883 }
3884
3885 // After dcbqcky, the next message (RowHeader) follows directly
3886 // No additional fields to skip here
3887
3888
3889 Ok(columns)
3890 }
3891
3892 /// Commit the current transaction.
3893 ///
3894 /// Makes all changes in the current transaction permanent. After commit,
3895 /// a new transaction begins automatically.
3896 ///
3897 /// # Example
3898 ///
3899 /// ```rust,no_run
3900 /// # use oracle_rs::{Connection, Value};
3901 /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
3902 /// conn.execute("INSERT INTO users (name) VALUES (:1)", &["Alice".into()]).await?;
3903 /// conn.execute("INSERT INTO users (name) VALUES (:1)", &["Bob".into()]).await?;
3904 /// conn.commit().await?; // Both inserts are now permanent
3905 /// # Ok(())
3906 /// # }
3907 /// ```
3908 pub async fn commit(&self) -> Result<()> {
3909 self.ensure_ready().await?;
3910 // Use SQL COMMIT statement instead of simple function
3911 // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
3912 self.execute("COMMIT", &[]).await?;
3913 Ok(())
3914 }
3915
3916 /// Rollback the current transaction.
3917 ///
3918 /// Discards all changes made in the current transaction. After rollback,
3919 /// a new transaction begins automatically.
3920 ///
3921 /// # Example
3922 ///
3923 /// ```rust,no_run
3924 /// # use oracle_rs::{Connection, Value};
3925 /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
3926 /// conn.execute("DELETE FROM users WHERE id = :1", &[1.into()]).await?;
3927 /// // Oops, wrong user!
3928 /// conn.rollback().await?; // Delete is undone
3929 /// # Ok(())
3930 /// # }
3931 /// ```
3932 pub async fn rollback(&self) -> Result<()> {
3933 self.ensure_ready().await?;
3934 // Use SQL ROLLBACK statement instead of simple function
3935 // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
3936 self.execute("ROLLBACK", &[]).await?;
3937 Ok(())
3938 }
3939
3940 /// Create a savepoint within the current transaction
3941 ///
3942 /// Savepoints allow partial rollback of a transaction. You can create multiple
3943 /// savepoints and rollback to any of them without affecting work done before
3944 /// that savepoint.
3945 ///
3946 /// # Arguments
3947 /// * `name` - The savepoint name (must be a valid Oracle identifier)
3948 ///
3949 /// # Example
3950 /// ```rust,ignore
3951 /// conn.execute("INSERT INTO t VALUES (1)", &[]).await?;
3952 /// conn.savepoint("sp1").await?;
3953 /// conn.execute("INSERT INTO t VALUES (2)", &[]).await?;
3954 /// conn.rollback_to_savepoint("sp1").await?; // Undoes the second insert
3955 /// conn.commit().await?; // Commits only the first insert
3956 /// ```
3957 pub async fn savepoint(&self, name: &str) -> Result<()> {
3958 self.ensure_ready().await?;
3959 self.execute(&format!("SAVEPOINT {}", name), &[]).await?;
3960 Ok(())
3961 }
3962
3963 /// Rollback to a previously created savepoint
3964 ///
3965 /// This undoes all changes made after the savepoint was created, but keeps
3966 /// the transaction active. Changes made before the savepoint are preserved.
3967 ///
3968 /// # Arguments
3969 /// * `name` - The savepoint name to rollback to
3970 pub async fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
3971 self.ensure_ready().await?;
3972 self.execute(&format!("ROLLBACK TO SAVEPOINT {}", name), &[]).await?;
3973 Ok(())
3974 }
3975
3976 /// Ping the server to check if the connection is still alive.
3977 ///
3978 /// This executes a lightweight query (`SELECT 1 FROM DUAL`) to verify
3979 /// the connection is responsive. Useful for connection health checks
3980 /// in pooling scenarios.
3981 ///
3982 /// # Example
3983 ///
3984 /// ```rust,no_run
3985 /// # use oracle_rs::Connection;
3986 /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
3987 /// if conn.ping().await.is_ok() {
3988 /// println!("Connection is alive");
3989 /// } else {
3990 /// println!("Connection is dead");
3991 /// }
3992 /// # Ok(())
3993 /// # }
3994 /// ```
3995 pub async fn ping(&self) -> Result<()> {
3996 self.ensure_ready().await?;
3997 // Use SELECT FROM DUAL instead of simple function
3998 // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
3999 self.query("SELECT 1 FROM DUAL", &[]).await?;
4000 Ok(())
4001 }
4002
4003 /// Clear the statement cache
4004 ///
4005 /// This should be called when recycling a connection in a pool to ensure
4006 /// that any stale cursor state is cleared. This is useful after errors
4007 /// or when the connection state may be inconsistent.
4008 pub async fn clear_statement_cache(&self) {
4009 let mut inner = self.inner.lock().await;
4010 if let Some(ref mut cache) = inner.statement_cache {
4011 cache.clear();
4012 }
4013 }
4014
4015 /// Read data from a LOB (CLOB or BLOB)
4016 ///
4017 /// # Arguments
4018 /// * `locator` - The LOB locator obtained from a query result
4019 /// * `offset` - Starting position (1-based, in characters for CLOB, bytes for BLOB)
4020 /// * `amount` - Amount to read (0 for entire LOB)
4021 ///
4022 /// # Returns
4023 /// For CLOB: returns the text content as a String
4024 /// For BLOB: returns the binary content as bytes
4025 pub async fn read_lob(&self, locator: &LobLocator) -> Result<LobData> {
4026 self.ensure_ready().await?;
4027
4028 // Read the entire LOB starting at offset 1
4029 let offset = 1u64;
4030 let amount = locator.size();
4031
4032 self.read_lob_internal(locator, offset, amount).await
4033 }
4034
4035 /// Read a portion of a LOB
4036 pub async fn read_lob_range(
4037 &self,
4038 locator: &LobLocator,
4039 offset: u64,
4040 amount: u64,
4041 ) -> Result<LobData> {
4042 self.ensure_ready().await?;
4043 self.read_lob_internal(locator, offset, amount).await
4044 }
4045
4046 /// Read a CLOB and return as String
4047 ///
4048 /// This is a convenience method for reading CLOB data directly as a String.
4049 /// Returns an error if the LOB is not a CLOB.
4050 pub async fn read_clob(&self, locator: &LobLocator) -> Result<String> {
4051 if locator.is_blob() || locator.is_bfile() {
4052 return Err(Error::Protocol(
4053 "Cannot read BLOB/BFILE as string, use read_blob instead".to_string(),
4054 ));
4055 }
4056
4057 let data = self.read_lob(locator).await?;
4058 match data {
4059 LobData::String(s) => Ok(s),
4060 LobData::Bytes(_) => Err(Error::Protocol(
4061 "Unexpected bytes from CLOB read".to_string(),
4062 )),
4063 }
4064 }
4065
4066 /// Read a BLOB and return as bytes
4067 ///
4068 /// This is a convenience method for reading BLOB data directly as bytes.
4069 /// Returns an error if the LOB is a CLOB (use read_clob instead).
4070 pub async fn read_blob(&self, locator: &LobLocator) -> Result<bytes::Bytes> {
4071 if locator.is_clob() {
4072 return Err(Error::Protocol(
4073 "Cannot read CLOB as bytes, use read_clob instead".to_string(),
4074 ));
4075 }
4076
4077 let data = self.read_lob(locator).await?;
4078 match data {
4079 LobData::Bytes(b) => Ok(b),
4080 LobData::String(_) => Err(Error::Protocol(
4081 "Unexpected string from BLOB read".to_string(),
4082 )),
4083 }
4084 }
4085
4086 /// Read a LOB in chunks, calling a callback for each chunk
4087 ///
4088 /// This is useful for processing large LOBs without loading the entire
4089 /// content into memory. The callback receives each chunk as it's read.
4090 ///
4091 /// # Arguments
4092 /// * `locator` - The LOB locator
4093 /// * `chunk_size` - Size of each chunk to read (0 uses the LOB's natural chunk size)
4094 /// * `callback` - Async function called for each chunk
4095 ///
4096 /// # Example
4097 /// ```ignore
4098 /// let mut total_size = 0;
4099 /// conn.read_lob_chunked(&locator, 8192, |chunk| async move {
4100 /// match chunk {
4101 /// LobData::Bytes(b) => total_size += b.len(),
4102 /// LobData::String(s) => total_size += s.len(),
4103 /// }
4104 /// Ok(())
4105 /// }).await?;
4106 /// ```
4107 pub async fn read_lob_chunked<F, Fut>(
4108 &self,
4109 locator: &LobLocator,
4110 chunk_size: u64,
4111 mut callback: F,
4112 ) -> Result<()>
4113 where
4114 F: FnMut(LobData) -> Fut,
4115 Fut: std::future::Future<Output = Result<()>>,
4116 {
4117 self.ensure_ready().await?;
4118
4119 let total_size = locator.size();
4120 if total_size == 0 {
4121 return Ok(());
4122 }
4123
4124 // Use LOB's natural chunk size if not specified
4125 let chunk_size = if chunk_size == 0 {
4126 self.lob_chunk_size(locator).await?.max(8192) as u64
4127 } else {
4128 chunk_size
4129 };
4130
4131 let mut offset = 1u64;
4132 while offset <= total_size {
4133 let remaining = total_size - offset + 1;
4134 let amount = std::cmp::min(remaining, chunk_size);
4135
4136 let chunk = self.read_lob_internal(locator, offset, amount).await?;
4137 callback(chunk).await?;
4138
4139 offset += amount;
4140 }
4141
4142 Ok(())
4143 }
4144
4145 /// Get the optimal chunk size for a LOB
4146 ///
4147 /// This returns the chunk size that Oracle recommends for efficient
4148 /// reading and writing to this LOB.
4149 pub async fn lob_chunk_size(&self, locator: &LobLocator) -> Result<u32> {
4150 self.ensure_ready().await?;
4151
4152 let mut inner = self.inner.lock().await;
4153 let large_sdu = inner.large_sdu;
4154
4155 // Create LOB operation message for get chunk size
4156 let mut lob_msg = LobOpMessage::new_get_chunk_size(locator);
4157 let seq_num = inner.next_sequence_number();
4158 lob_msg.set_sequence_number(seq_num);
4159
4160 // Build and send the request
4161 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4162 inner.send(&request).await?;
4163
4164 // Receive and parse response
4165 let response = inner.receive().await?;
4166 if response.len() <= PACKET_HEADER_SIZE {
4167 return Err(Error::Protocol("Empty LOB chunk size response".to_string()));
4168 }
4169
4170 // Check for MARKER packet
4171 let packet_type = response[4];
4172 if packet_type == PacketType::Marker as u8 {
4173 let error_response = inner.handle_marker_reset().await?;
4174 let payload = &error_response[PACKET_HEADER_SIZE..];
4175 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4176 buf.skip(2)?;
4177 return self.parse_lob_error(&mut buf);
4178 }
4179
4180 // Parse the amount response (chunk size is returned as amount)
4181 self.parse_lob_amount_response(&response[PACKET_HEADER_SIZE..], locator)
4182 .map(|v| v as u32)
4183 }
4184
4185 /// Internal LOB read implementation
4186 async fn read_lob_internal(
4187 &self,
4188 locator: &LobLocator,
4189 offset: u64,
4190 amount: u64,
4191 ) -> Result<LobData> {
4192 let mut inner = self.inner.lock().await;
4193 let large_sdu = inner.large_sdu;
4194
4195 // Create LOB operation message for read
4196 let mut lob_msg = LobOpMessage::new_read(locator, offset, amount);
4197 let seq_num = inner.next_sequence_number();
4198 lob_msg.set_sequence_number(seq_num);
4199
4200 // Build and send the request
4201 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4202 inner.send(&request).await?;
4203
4204 // Receive and parse response (may span multiple packets for large LOBs)
4205 let response = inner.receive_response().await?;
4206 if response.len() <= PACKET_HEADER_SIZE {
4207 return Err(Error::Protocol("Empty LOB read response".to_string()));
4208 }
4209
4210 // Check for MARKER packet (indicates error)
4211 let packet_type = response[4];
4212 if packet_type == PacketType::Marker as u8 {
4213 let error_response = inner.handle_marker_reset().await?;
4214 let payload = &error_response[PACKET_HEADER_SIZE..];
4215 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4216 // Skip data flags
4217 buf.skip(2)?;
4218 return self.parse_lob_error(&mut buf);
4219 }
4220
4221 // Parse LOB data response
4222 let payload = &response[PACKET_HEADER_SIZE..];
4223 self.parse_lob_read_response(payload, locator)
4224 }
4225
4226 /// Parse LOB read response
4227 fn parse_lob_read_response(&self, payload: &[u8], locator: &LobLocator) -> Result<LobData> {
4228 use crate::buffer::ReadBuffer;
4229
4230 let mut buf = ReadBuffer::from_slice(payload);
4231
4232 // Skip data flags
4233 buf.skip(2)?;
4234
4235 let mut lob_data: Option<Vec<u8>> = None;
4236
4237 // Process messages until end of response
4238 while buf.remaining() > 0 {
4239 let msg_type = buf.read_u8()?;
4240
4241 match msg_type {
4242 // LobData message (14)
4243 x if x == MessageType::LobData as u8 => {
4244 // Read LOB data with length
4245 let data = buf.read_raw_bytes_chunked()?;
4246 lob_data = Some(data);
4247 }
4248
4249 // Parameter return (8) - contains updated locator and amount
4250 x if x == MessageType::Parameter as u8 => {
4251 // Skip the updated locator (same length as original)
4252 let locator_len = locator.locator_bytes().len();
4253 buf.skip(locator_len)?;
4254
4255 // Read back the amount (ub8)
4256 let _returned_amount = buf.read_ub8()?;
4257 }
4258
4259 // Error/Status message (4) - code 0 means success
4260 x if x == MessageType::Error as u8 => {
4261 // Parse error info - code 0 means success
4262 if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4263 if code != 0 {
4264 let message = msg.unwrap_or_else(|| "LOB error".to_string());
4265 return Err(Error::OracleError { code, message });
4266 }
4267 // code 0 = success, continue processing
4268 }
4269 }
4270
4271 // End of response (29)
4272 x if x == MessageType::EndOfResponse as u8 => {
4273 break;
4274 }
4275
4276 // Skip other message types
4277 _ => {
4278 // Try to skip unknown messages
4279 continue;
4280 }
4281 }
4282 }
4283
4284 // Convert to appropriate type based on LOB type
4285 match lob_data {
4286 Some(data) => {
4287 if locator.is_blob() || locator.is_bfile() {
4288 Ok(LobData::Bytes(bytes::Bytes::from(data)))
4289 } else {
4290 // CLOB - decode based on encoding
4291 let text = if locator.uses_var_length_charset() {
4292 // UTF-16 BE encoding
4293 let chars: Vec<u16> = data
4294 .chunks_exact(2)
4295 .map(|c| u16::from_be_bytes([c[0], c[1]]))
4296 .collect();
4297 String::from_utf16_lossy(&chars)
4298 } else {
4299 // UTF-8 encoding
4300 String::from_utf8_lossy(&data).to_string()
4301 };
4302 Ok(LobData::String(text))
4303 }
4304 }
4305 None => {
4306 // Empty LOB
4307 if locator.is_blob() || locator.is_bfile() {
4308 Ok(LobData::Bytes(bytes::Bytes::new()))
4309 } else {
4310 Ok(LobData::String(String::new()))
4311 }
4312 }
4313 }
4314 }
4315
4316 /// Write data to a LOB
4317 ///
4318 /// # Arguments
4319 /// * `locator` - The LOB locator obtained from a query result
4320 /// * `offset` - Starting position (1-based, in characters for CLOB, bytes for BLOB)
4321 /// * `data` - Data to write (bytes for BLOB, UTF-8 encoded bytes for CLOB)
4322 pub async fn write_lob(&self, locator: &LobLocator, offset: u64, data: &[u8]) -> Result<()> {
4323 self.ensure_ready().await?;
4324
4325 let mut inner = self.inner.lock().await;
4326 let large_sdu = inner.large_sdu;
4327 let sdu_size = inner.sdu_size as usize;
4328
4329 // Encode data for CLOB if necessary
4330 let encoded_data: Vec<u8>;
4331 let write_data = if locator.is_clob() && locator.uses_var_length_charset() {
4332 // Convert UTF-8 to UTF-16 BE for CLOB with var length charset
4333 let text = String::from_utf8_lossy(data);
4334 encoded_data = text
4335 .encode_utf16()
4336 .flat_map(|c| c.to_be_bytes())
4337 .collect();
4338 &encoded_data[..]
4339 } else {
4340 data
4341 };
4342
4343 // Create LOB operation message for write
4344 let mut lob_msg = LobOpMessage::new_write(locator, offset, write_data);
4345 let seq_num = inner.next_sequence_number();
4346 lob_msg.set_sequence_number(seq_num);
4347
4348 // Build message content (without packet header or data flags)
4349 let message = lob_msg.build_message_only(&inner.capabilities)?;
4350
4351 // Calculate if this fits in a single packet
4352 // Single packet max payload = SDU - header (8) - data flags (2)
4353 let max_single_packet_payload = sdu_size.saturating_sub(PACKET_HEADER_SIZE + 2);
4354
4355 let is_multi_packet = message.len() > max_single_packet_payload;
4356
4357 if is_multi_packet {
4358 // Needs multiple packets - use multi-packet sender
4359 inner.send_multi_packet(&message, 0).await?;
4360 } else {
4361 // Fits in one packet - use standard send
4362 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4363 inner.send(&request).await?;
4364 }
4365
4366 // Receive and parse response
4367 // Use receive_response() to accumulate all packets until END_OF_RESPONSE.
4368 // This is necessary because Oracle may send multiple packets for the response,
4369 // and if we only read one packet, leftover data causes close() to hang.
4370 let response = inner.receive_response().await?;
4371 if response.len() <= PACKET_HEADER_SIZE {
4372 return Err(Error::Protocol("Empty LOB write response".to_string()));
4373 }
4374
4375 // Check for MARKER packet (indicates error)
4376 let packet_type = response[4];
4377 if packet_type == PacketType::Marker as u8 {
4378 let error_response = inner.handle_marker_reset().await?;
4379 let payload = &error_response[PACKET_HEADER_SIZE..];
4380 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4381 buf.skip(2)?;
4382 return self.parse_lob_error(&mut buf);
4383 }
4384
4385 // Parse response to check for errors
4386 self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4387 }
4388
4389 /// Write string data to a CLOB
4390 pub async fn write_clob(&self, locator: &LobLocator, offset: u64, text: &str) -> Result<()> {
4391 if locator.is_blob() || locator.is_bfile() {
4392 return Err(Error::Protocol(
4393 "Cannot write string to BLOB/BFILE, use write_blob instead".to_string(),
4394 ));
4395 }
4396 self.write_lob(locator, offset, text.as_bytes()).await
4397 }
4398
4399 /// Write binary data to a BLOB
4400 pub async fn write_blob(&self, locator: &LobLocator, offset: u64, data: &[u8]) -> Result<()> {
4401 if locator.is_clob() {
4402 return Err(Error::Protocol(
4403 "Cannot write bytes to CLOB, use write_clob instead".to_string(),
4404 ));
4405 }
4406 self.write_lob(locator, offset, data).await
4407 }
4408
4409 /// Get the length of a LOB
4410 ///
4411 /// For CLOB: returns length in characters
4412 /// For BLOB: returns length in bytes
4413 pub async fn lob_length(&self, locator: &LobLocator) -> Result<u64> {
4414 self.ensure_ready().await?;
4415
4416 let mut inner = self.inner.lock().await;
4417 let large_sdu = inner.large_sdu;
4418
4419 // Create LOB operation message for get length
4420 let mut lob_msg = LobOpMessage::new_get_length(locator);
4421 let seq_num = inner.next_sequence_number();
4422 lob_msg.set_sequence_number(seq_num);
4423
4424 // Build and send the request
4425 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4426 inner.send(&request).await?;
4427
4428 // Receive and parse response
4429 let response = inner.receive().await?;
4430 if response.len() <= PACKET_HEADER_SIZE {
4431 return Err(Error::Protocol("Empty LOB get_length response".to_string()));
4432 }
4433
4434 // Check for MARKER packet (indicates error)
4435 let packet_type = response[4];
4436 if packet_type == PacketType::Marker as u8 {
4437 let error_response = inner.handle_marker_reset().await?;
4438 let payload = &error_response[PACKET_HEADER_SIZE..];
4439 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4440 buf.skip(2)?;
4441 return self.parse_lob_error(&mut buf);
4442 }
4443
4444 // Parse response to get the length
4445 self.parse_lob_amount_response(&response[PACKET_HEADER_SIZE..], locator)
4446 }
4447
4448 /// Trim a LOB to a specified length
4449 ///
4450 /// # Arguments
4451 /// * `locator` - The LOB locator
4452 /// * `new_size` - The new size (in characters for CLOB, bytes for BLOB)
4453 pub async fn lob_trim(&self, locator: &LobLocator, new_size: u64) -> Result<()> {
4454 self.ensure_ready().await?;
4455
4456 let mut inner = self.inner.lock().await;
4457 let large_sdu = inner.large_sdu;
4458
4459 // Create LOB operation message for trim
4460 let mut lob_msg = LobOpMessage::new_trim(locator, new_size);
4461 let seq_num = inner.next_sequence_number();
4462 lob_msg.set_sequence_number(seq_num);
4463
4464 // Build and send the request
4465 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4466 inner.send(&request).await?;
4467
4468 // Receive and parse response
4469 let response = inner.receive().await?;
4470 if response.len() <= PACKET_HEADER_SIZE {
4471 return Err(Error::Protocol("Empty LOB trim response".to_string()));
4472 }
4473
4474 // Check for MARKER packet (indicates error)
4475 let packet_type = response[4];
4476 if packet_type == PacketType::Marker as u8 {
4477 let error_response = inner.handle_marker_reset().await?;
4478 let payload = &error_response[PACKET_HEADER_SIZE..];
4479 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4480 buf.skip(2)?;
4481 return self.parse_lob_error(&mut buf);
4482 }
4483
4484 // Parse response to check for errors
4485 self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4486 }
4487
4488 /// Create a temporary LOB on the server
4489 ///
4490 /// Creates a temporary LOB of the specified type that lives until the connection
4491 /// is closed or the LOB is explicitly freed.
4492 ///
4493 /// # Arguments
4494 /// * `oracle_type` - The LOB type to create (Clob or Blob)
4495 ///
4496 /// # Returns
4497 /// A `LobLocator` for the newly created temporary LOB
4498 ///
4499 /// # Example
4500 /// ```ignore
4501 /// use oracle_rs::OracleType;
4502 ///
4503 /// let locator = conn.create_temp_lob(OracleType::Clob).await?;
4504 /// conn.write_clob(&locator, 1, "Hello, World!").await?;
4505 /// // Now bind the locator to insert into a CLOB column
4506 /// ```
4507 pub async fn create_temp_lob(&self, oracle_type: OracleType) -> Result<LobLocator> {
4508 use crate::buffer::ReadBuffer;
4509
4510 // Validate oracle_type is a LOB type
4511 match oracle_type {
4512 OracleType::Clob | OracleType::Blob => {}
4513 _ => {
4514 return Err(Error::Protocol(format!(
4515 "create_temp_lob: invalid type {:?}, must be Clob or Blob",
4516 oracle_type
4517 )));
4518 }
4519 }
4520
4521 self.ensure_ready().await?;
4522
4523 let mut inner = self.inner.lock().await;
4524 let large_sdu = inner.large_sdu;
4525
4526 // Create the CREATE_TEMP message
4527 let mut lob_msg = LobOpMessage::new_create_temp(oracle_type);
4528 let seq_num = inner.next_sequence_number();
4529 lob_msg.set_sequence_number(seq_num);
4530
4531 // Build and send the request
4532 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4533 inner.send(&request).await?;
4534
4535 // Receive and parse response
4536 let response = inner.receive().await?;
4537 if response.len() <= PACKET_HEADER_SIZE {
4538 return Err(Error::Protocol("Empty CREATE_TEMP LOB response".to_string()));
4539 }
4540
4541 // Check for MARKER packet (indicates error)
4542 let packet_type = response[4];
4543 if packet_type == PacketType::Marker as u8 {
4544 let error_response = inner.handle_marker_reset().await?;
4545 let payload = &error_response[PACKET_HEADER_SIZE..];
4546 let mut buf = ReadBuffer::from_slice(payload);
4547 buf.skip(2)?;
4548 return self.parse_lob_error(&mut buf);
4549 }
4550
4551 // Parse response to extract the locator
4552 let payload = &response[PACKET_HEADER_SIZE..];
4553 let mut buf = ReadBuffer::from_slice(payload);
4554 buf.skip(2)?; // Skip data flags
4555
4556 let mut locator_bytes: Option<Vec<u8>> = None;
4557
4558 while buf.remaining() > 0 {
4559 let msg_type = buf.read_u8()?;
4560
4561 match msg_type {
4562 // Parameter return (8) - contains the populated locator
4563 x if x == MessageType::Parameter as u8 => {
4564 // Read the 40-byte locator
4565 let loc_data = buf.read_bytes_vec(40)?;
4566 locator_bytes = Some(loc_data);
4567 // Skip charset (ub2) and trailing flags (ub1)
4568 buf.skip(2)?; // charset
4569 buf.skip(1)?; // flags
4570 }
4571
4572 // Error/Status message (4) - code 0 means success
4573 x if x == MessageType::Error as u8 => {
4574 if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4575 if code != 0 {
4576 let message = msg.unwrap_or_else(|| "CREATE_TEMP LOB error".to_string());
4577 return Err(Error::OracleError { code, message });
4578 }
4579 }
4580 }
4581
4582 // End of response (29)
4583 x if x == MessageType::EndOfResponse as u8 => {
4584 break;
4585 }
4586
4587 _ => continue,
4588 }
4589 }
4590
4591 // Create the LobLocator from the returned bytes
4592 let loc_bytes = locator_bytes.ok_or_else(|| {
4593 Error::Protocol("CREATE_TEMP LOB response did not contain locator".to_string())
4594 })?;
4595
4596 // Create LobLocator with size 0, chunk_size 0 (will be fetched if needed)
4597 let locator = LobLocator::new(
4598 bytes::Bytes::from(loc_bytes),
4599 0, // size - unknown for new temp LOB
4600 0, // chunk_size - unknown, can be fetched later
4601 oracle_type,
4602 1, // csfrm - 1 for CLOB, 0 for BLOB (but we store it on the locator type)
4603 );
4604
4605 Ok(locator)
4606 }
4607
4608 // ==================== BFILE Operations ====================
4609
4610 /// Check if a BFILE exists on the server
4611 ///
4612 /// Returns true if the file referenced by the BFILE locator exists on the server.
4613 pub async fn bfile_exists(&self, locator: &LobLocator) -> Result<bool> {
4614 self.ensure_ready().await?;
4615
4616 if !locator.is_bfile() {
4617 return Err(Error::Protocol("bfile_exists called on non-BFILE locator".to_string()));
4618 }
4619
4620 let mut inner = self.inner.lock().await;
4621 let large_sdu = inner.large_sdu;
4622
4623 let mut lob_msg = LobOpMessage::new_file_exists(locator);
4624 let seq_num = inner.next_sequence_number();
4625 lob_msg.set_sequence_number(seq_num);
4626
4627 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4628 inner.send(&request).await?;
4629
4630 let response = inner.receive().await?;
4631 if response.len() <= PACKET_HEADER_SIZE {
4632 return Err(Error::Protocol("Empty BFILE exists response".to_string()));
4633 }
4634
4635 let packet_type = response[4];
4636 if packet_type == PacketType::Marker as u8 {
4637 let error_response = inner.handle_marker_reset().await?;
4638 let payload = &error_response[PACKET_HEADER_SIZE..];
4639 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4640 buf.skip(2)?;
4641 return self.parse_lob_error(&mut buf);
4642 }
4643
4644 self.parse_lob_bool_response(&response[PACKET_HEADER_SIZE..], locator)
4645 }
4646
4647 /// Open a BFILE for reading
4648 ///
4649 /// The BFILE must be opened before reading. After reading, close it with bfile_close.
4650 pub async fn bfile_open(&self, locator: &LobLocator) -> Result<()> {
4651 self.ensure_ready().await?;
4652
4653 if !locator.is_bfile() {
4654 return Err(Error::Protocol("bfile_open called on non-BFILE locator".to_string()));
4655 }
4656
4657 let mut inner = self.inner.lock().await;
4658 let large_sdu = inner.large_sdu;
4659
4660 let mut lob_msg = LobOpMessage::new_file_open(locator);
4661 let seq_num = inner.next_sequence_number();
4662 lob_msg.set_sequence_number(seq_num);
4663
4664 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4665 inner.send(&request).await?;
4666
4667 let response = inner.receive().await?;
4668 if response.len() <= PACKET_HEADER_SIZE {
4669 return Err(Error::Protocol("Empty BFILE open response".to_string()));
4670 }
4671
4672 let packet_type = response[4];
4673 if packet_type == PacketType::Marker as u8 {
4674 let error_response = inner.handle_marker_reset().await?;
4675 let payload = &error_response[PACKET_HEADER_SIZE..];
4676 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4677 buf.skip(2)?;
4678 return self.parse_lob_error(&mut buf);
4679 }
4680
4681 self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4682 }
4683
4684 /// Close a BFILE after reading
4685 pub async fn bfile_close(&self, locator: &LobLocator) -> Result<()> {
4686 self.ensure_ready().await?;
4687
4688 if !locator.is_bfile() {
4689 return Err(Error::Protocol("bfile_close called on non-BFILE locator".to_string()));
4690 }
4691
4692 let mut inner = self.inner.lock().await;
4693 let large_sdu = inner.large_sdu;
4694
4695 let mut lob_msg = LobOpMessage::new_file_close(locator);
4696 let seq_num = inner.next_sequence_number();
4697 lob_msg.set_sequence_number(seq_num);
4698
4699 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4700 inner.send(&request).await?;
4701
4702 let response = inner.receive().await?;
4703 if response.len() <= PACKET_HEADER_SIZE {
4704 return Err(Error::Protocol("Empty BFILE close response".to_string()));
4705 }
4706
4707 let packet_type = response[4];
4708 if packet_type == PacketType::Marker as u8 {
4709 let error_response = inner.handle_marker_reset().await?;
4710 let payload = &error_response[PACKET_HEADER_SIZE..];
4711 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4712 buf.skip(2)?;
4713 return self.parse_lob_error(&mut buf);
4714 }
4715
4716 self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4717 }
4718
4719 /// Check if a BFILE is currently open
4720 pub async fn bfile_is_open(&self, locator: &LobLocator) -> Result<bool> {
4721 self.ensure_ready().await?;
4722
4723 if !locator.is_bfile() {
4724 return Err(Error::Protocol("bfile_is_open called on non-BFILE locator".to_string()));
4725 }
4726
4727 let mut inner = self.inner.lock().await;
4728 let large_sdu = inner.large_sdu;
4729
4730 let mut lob_msg = LobOpMessage::new_file_is_open(locator);
4731 let seq_num = inner.next_sequence_number();
4732 lob_msg.set_sequence_number(seq_num);
4733
4734 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4735 inner.send(&request).await?;
4736
4737 let response = inner.receive().await?;
4738 if response.len() <= PACKET_HEADER_SIZE {
4739 return Err(Error::Protocol("Empty BFILE is_open response".to_string()));
4740 }
4741
4742 let packet_type = response[4];
4743 if packet_type == PacketType::Marker as u8 {
4744 let error_response = inner.handle_marker_reset().await?;
4745 let payload = &error_response[PACKET_HEADER_SIZE..];
4746 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4747 buf.skip(2)?;
4748 return self.parse_lob_error(&mut buf);
4749 }
4750
4751 self.parse_lob_bool_response(&response[PACKET_HEADER_SIZE..], locator)
4752 }
4753
4754 /// Read BFILE data
4755 ///
4756 /// This is a convenience method that opens the BFILE if needed, reads all content,
4757 /// and returns it as bytes. For large BFILEs, consider using read_lob_chunked.
4758 pub async fn read_bfile(&self, locator: &LobLocator) -> Result<bytes::Bytes> {
4759 if !locator.is_bfile() {
4760 return Err(Error::Protocol("read_bfile called on non-BFILE locator".to_string()));
4761 }
4762
4763 // Check if file is open, open if needed
4764 let should_close = if !self.bfile_is_open(locator).await? {
4765 self.bfile_open(locator).await?;
4766 true
4767 } else {
4768 false
4769 };
4770
4771 // Read all data
4772 let result = self.read_blob(locator).await;
4773
4774 // Close if we opened it
4775 if should_close {
4776 let _ = self.bfile_close(locator).await;
4777 }
4778
4779 result
4780 }
4781
4782 /// Parse a LOB operation response that returns a boolean (file_exists, is_open)
4783 fn parse_lob_bool_response(&self, payload: &[u8], locator: &LobLocator) -> Result<bool> {
4784 use crate::buffer::ReadBuffer;
4785
4786 let mut buf = ReadBuffer::from_slice(payload);
4787 buf.skip(2)?; // Skip data flags
4788
4789 let mut bool_result: bool = false;
4790
4791 while buf.remaining() > 0 {
4792 let msg_type = buf.read_u8()?;
4793
4794 match msg_type {
4795 // Parameter return (8) - contains updated locator and bool flag
4796 x if x == MessageType::Parameter as u8 => {
4797 let locator_len = locator.locator_bytes().len();
4798 buf.skip(locator_len)?;
4799 // Boolean flag is a single byte, > 0 means true
4800 let flag = buf.read_u8()?;
4801 bool_result = flag > 0;
4802 }
4803
4804 // Error/Status message (4) - code 0 means success
4805 x if x == MessageType::Error as u8 => {
4806 if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4807 if code != 0 {
4808 let message = msg.unwrap_or_else(|| "LOB error".to_string());
4809 return Err(Error::OracleError { code, message });
4810 }
4811 }
4812 }
4813
4814 // End of response (29)
4815 x if x == MessageType::EndOfResponse as u8 => {
4816 break;
4817 }
4818
4819 _ => continue,
4820 }
4821 }
4822
4823 Ok(bool_result)
4824 }
4825
4826 /// Parse a simple LOB operation response (write, trim)
4827 fn parse_lob_simple_response(&self, payload: &[u8], locator: &LobLocator) -> Result<()> {
4828 use crate::buffer::ReadBuffer;
4829
4830 let mut buf = ReadBuffer::from_slice(payload);
4831 buf.skip(2)?; // Skip data flags
4832
4833 while buf.remaining() > 0 {
4834 let msg_type = buf.read_u8()?;
4835
4836 match msg_type {
4837 // Parameter return (8) - contains updated locator and possibly amount
4838 x if x == MessageType::Parameter as u8 => {
4839 let locator_len = locator.locator_bytes().len();
4840 buf.skip(locator_len)?;
4841 // After locator, there may be amount (ub8) if send_amount was true
4842 // We just skip any remaining bytes until we hit Error or EndOfResponse
4843 }
4844
4845 // Error/Status message (4) - code 0 means success
4846 x if x == MessageType::Error as u8 => {
4847 if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4848 if code != 0 {
4849 let message = msg.unwrap_or_else(|| "LOB error".to_string());
4850 return Err(Error::OracleError { code, message });
4851 }
4852 }
4853 }
4854
4855 // End of response (29)
4856 x if x == MessageType::EndOfResponse as u8 => {
4857 break;
4858 }
4859
4860 _ => continue,
4861 }
4862 }
4863
4864 Ok(())
4865 }
4866
4867 /// Parse a LOB operation response that returns an amount (get_length, get_chunk_size)
4868 fn parse_lob_amount_response(&self, payload: &[u8], locator: &LobLocator) -> Result<u64> {
4869 use crate::buffer::ReadBuffer;
4870
4871 let mut buf = ReadBuffer::from_slice(payload);
4872 buf.skip(2)?; // Skip data flags
4873
4874 let mut returned_amount: u64 = 0;
4875
4876 while buf.remaining() > 0 {
4877 let msg_type = buf.read_u8()?;
4878
4879 match msg_type {
4880 // Parameter return (8) - contains updated locator and amount
4881 x if x == MessageType::Parameter as u8 => {
4882 let locator_len = locator.locator_bytes().len();
4883 buf.skip(locator_len)?;
4884 returned_amount = buf.read_ub8()?;
4885 }
4886
4887 // Error/Status message (4) - code 0 means success
4888 x if x == MessageType::Error as u8 => {
4889 if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4890 if code != 0 {
4891 let message = msg.unwrap_or_else(|| "LOB error".to_string());
4892 return Err(Error::OracleError { code, message });
4893 }
4894 }
4895 }
4896
4897 // End of response (29)
4898 x if x == MessageType::EndOfResponse as u8 => {
4899 break;
4900 }
4901
4902 _ => continue,
4903 }
4904 }
4905
4906 Ok(returned_amount)
4907 }
4908
4909 /// Parse LOB error response
4910 fn parse_lob_error<T>(&self, buf: &mut crate::buffer::ReadBuffer) -> Result<T> {
4911 // Try to extract error info
4912 if let Ok((code, msg, _)) = self.parse_error_info(buf) {
4913 let message = msg.unwrap_or_else(|| "Unknown LOB error".to_string());
4914 Err(Error::OracleError { code, message })
4915 } else {
4916 Err(Error::Protocol("LOB operation failed".to_string()))
4917 }
4918 }
4919
4920 /// Close the connection.
4921 ///
4922 /// Sends a logoff message to the server and closes the underlying TCP
4923 /// connection. After calling close, the connection cannot be reused.
4924 ///
4925 /// If the connection is already closed, this method returns `Ok(())`
4926 /// without doing anything.
4927 ///
4928 /// # Note
4929 ///
4930 /// Any uncommitted transaction is rolled back by the server when the
4931 /// connection is closed.
4932 ///
4933 /// # Example
4934 ///
4935 /// ```rust,no_run
4936 /// # use oracle_rs::{Config, Connection};
4937 /// # async fn example() -> oracle_rs::Result<()> {
4938 /// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password");
4939 /// let conn = Connection::connect_with_config(config).await?;
4940 ///
4941 /// // Do work...
4942 /// conn.commit().await?;
4943 ///
4944 /// // Explicitly close when done
4945 /// conn.close().await?;
4946 /// # Ok(())
4947 /// # }
4948 /// ```
4949 pub async fn close(&self) -> Result<()> {
4950 if self.closed.swap(true, Ordering::Relaxed) {
4951 // Already closed
4952 return Ok(());
4953 }
4954
4955 let mut inner = self.inner.lock().await;
4956
4957 if inner.state == ConnectionState::Ready {
4958 // Send logoff
4959 let _ = self.send_simple_function_inner(&mut inner, FunctionCode::Logoff).await;
4960 }
4961
4962 inner.state = ConnectionState::Closed;
4963
4964 // Close the TCP stream
4965 if let Some(stream) = inner.stream.take() {
4966 drop(stream);
4967 }
4968
4969 Ok(())
4970 }
4971
4972 /// Send a marker packet to the server
4973 /// marker_type: 1=BREAK, 2=RESET, 3=INTERRUPT
4974 async fn send_marker(&self, inner: &mut ConnectionInner, marker_type: u8) -> Result<()> {
4975 let mut packet_buf = WriteBuffer::new();
4976
4977 // Build MARKER packet header
4978 let packet_len = PACKET_HEADER_SIZE + 3; // Header + 3 bytes payload
4979 packet_buf.write_u16_be(packet_len as u16)?;
4980 packet_buf.write_u16_be(0)?; // Checksum
4981 packet_buf.write_u8(PacketType::Marker as u8)?;
4982 packet_buf.write_u8(0)?; // Flags
4983 packet_buf.write_u16_be(0)?; // Header checksum
4984
4985 // Marker payload: [1, 0, marker_type] per Python's _send_marker
4986 packet_buf.write_u8(1)?;
4987 packet_buf.write_u8(0)?;
4988 packet_buf.write_u8(marker_type)?;
4989
4990 inner.send(&packet_buf.freeze()).await
4991 }
4992
4993 async fn send_simple_function_inner(
4994 &self,
4995 inner: &mut ConnectionInner,
4996 function_code: FunctionCode,
4997 ) -> Result<()> {
4998 // Build function message
4999 let mut buf = WriteBuffer::new();
5000
5001 // Get next sequence number (must be done before sending)
5002 let seq_num = inner.next_sequence_number();
5003
5004 // Data flags
5005 buf.write_u16_be(0)?;
5006 // Message type: Function
5007 buf.write_u8(MessageType::Function as u8)?;
5008 // Function code
5009 buf.write_u8(function_code as u8)?;
5010 // Sequence number (tracked per connection)
5011 buf.write_u8(seq_num)?;
5012
5013 // Token number (required for TTC field version >= 18, i.e. Oracle 23ai)
5014 if inner.capabilities.ttc_field_version >= 18 {
5015 buf.write_ub8(0)?;
5016 }
5017
5018 // Build DATA packet
5019 let data_payload = buf.freeze();
5020 let mut packet_buf = WriteBuffer::new();
5021 let packet_len = PACKET_HEADER_SIZE + data_payload.len();
5022 packet_buf.write_u16_be(packet_len as u16)?;
5023 packet_buf.write_u16_be(0)?; // Checksum
5024 packet_buf.write_u8(PacketType::Data as u8)?;
5025 packet_buf.write_u8(0)?; // Flags
5026 packet_buf.write_u16_be(0)?; // Header checksum
5027 packet_buf.write_bytes(&data_payload)?;
5028
5029 let packet_bytes = packet_buf.freeze();
5030 inner.send(&packet_bytes).await?;
5031
5032 // Wait for response
5033 let response = inner.receive().await?;
5034
5035 // Check response
5036 if response.len() <= 4 {
5037 return Err(Error::Protocol("Response too short".to_string()));
5038 }
5039
5040 let packet_type = response[4];
5041
5042 // MARKER packet (type 12) - need to handle reset protocol
5043 if packet_type == PacketType::Marker as u8 {
5044 // Check marker type
5045 if response.len() >= PACKET_HEADER_SIZE + 3 {
5046 let marker_type = response[PACKET_HEADER_SIZE + 2];
5047
5048 // For BREAK marker (1), we need to do the reset protocol
5049 if marker_type == 1 {
5050 // For Logoff, Oracle may send BREAK to indicate "connection closing"
5051 // Don't try to do the full reset handshake - just return success
5052 if function_code == FunctionCode::Logoff {
5053 inner.state = ConnectionState::Closed;
5054 return Ok(());
5055 }
5056
5057 // The BREAK marker means the server is interrupting/breaking the current operation
5058 // We MUST complete the reset handshake or the connection will be in a bad state
5059
5060 // Send RESET marker to server
5061 if let Err(e) = self.send_marker(inner, 2).await {
5062 inner.state = ConnectionState::Closed;
5063 return Err(e);
5064 }
5065
5066 // Read and discard packets until we get RESET marker
5067 // This follows Python's _reset() logic
5068 let mut current_packet_type: u8;
5069 loop {
5070 match inner.receive().await {
5071 Ok(pkt) => {
5072 if pkt.len() < PACKET_HEADER_SIZE + 1 {
5073 break;
5074 }
5075 current_packet_type = pkt[4];
5076
5077 if current_packet_type == PacketType::Marker as u8 {
5078 if pkt.len() >= PACKET_HEADER_SIZE + 3 {
5079 let mk_type = pkt[PACKET_HEADER_SIZE + 2];
5080 if mk_type == 2 {
5081 // Got RESET marker, exit this loop
5082 break;
5083 }
5084 }
5085 } else {
5086 // Non-marker packet - unexpected during reset wait
5087 break;
5088 }
5089 }
5090 Err(e) => {
5091 inner.state = ConnectionState::Closed;
5092 return Err(e);
5093 }
5094 }
5095 }
5096
5097 // After RESET, continue reading while we still get MARKER packets
5098 // Some servers send multiple RESET markers, others send DATA response
5099 // Python comment: "some quit immediately" - meaning some servers close
5100 // the connection right after the reset handshake
5101 loop {
5102 match inner.receive().await {
5103 Ok(pkt) => {
5104 if pkt.len() < PACKET_HEADER_SIZE + 1 {
5105 break;
5106 }
5107 current_packet_type = pkt[4];
5108
5109 if current_packet_type == PacketType::Marker as u8 {
5110 // Another marker, continue reading
5111 continue;
5112 }
5113
5114 // Got a non-marker packet (probably DATA with error/status)
5115 if current_packet_type == PacketType::Data as u8 {
5116 if pkt.len() > PACKET_HEADER_SIZE + 2 {
5117 let msg_type = pkt[PACKET_HEADER_SIZE + 2];
5118 if msg_type == MessageType::Error as u8 {
5119 let payload = &pkt[PACKET_HEADER_SIZE..];
5120 let mut buf = ReadBuffer::from_slice(payload);
5121 buf.skip(2)?; // data flags
5122 buf.skip(1)?; // msg_type
5123 let (error_code, error_msg, _) = self.parse_error_info(&mut buf)?;
5124 if error_code != 0 {
5125 return Err(Error::OracleError {
5126 code: error_code,
5127 message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_code)),
5128 });
5129 }
5130 }
5131 }
5132 }
5133 // Exit after processing non-marker packet
5134 break;
5135 }
5136 Err(_) => {
5137 // Error reading - connection might be closed
5138 // Python comment says "some quit immediately" - meaning some
5139 // servers close the connection after BREAK/RESET handshake.
5140 // For commit/rollback/logoff, treat this as success since
5141 // the operation was processed before the close.
5142 if matches!(function_code,
5143 FunctionCode::Logoff |
5144 FunctionCode::Commit |
5145 FunctionCode::Rollback
5146 ) {
5147 // The operation succeeded, but the server closed the connection
5148 // Mark connection as closed for future operations
5149 inner.state = ConnectionState::Closed;
5150 return Ok(());
5151 }
5152 // For other functions, this means connection is broken
5153 inner.state = ConnectionState::Closed;
5154 // Don't return error for Ping - treat as success
5155 if function_code == FunctionCode::Ping {
5156 return Ok(());
5157 }
5158 return Ok(()); // Conservative approach - treat as success
5159 }
5160 }
5161 }
5162
5163 return Ok(());
5164 }
5165 }
5166 // For non-BREAK markers, just return success
5167 return Ok(());
5168 }
5169
5170 // DATA packet (type 6)
5171 if packet_type == PacketType::Data as u8 {
5172 // Parse response to check for errors
5173 if response.len() > PACKET_HEADER_SIZE + 2 {
5174 let msg_type = response[PACKET_HEADER_SIZE + 2];
5175 if msg_type == MessageType::Error as u8 {
5176 // Parse the error info
5177 let payload = &response[PACKET_HEADER_SIZE..];
5178 let mut buf = ReadBuffer::from_slice(payload);
5179 buf.skip(2)?; // data flags
5180 buf.skip(1)?; // msg_type
5181 let (error_code, error_msg, _) = self.parse_error_info(&mut buf)?;
5182 if error_code != 0 {
5183 return Err(Error::OracleError {
5184 code: error_code,
5185 message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_code)),
5186 });
5187 }
5188 }
5189 }
5190 return Ok(());
5191 }
5192
5193 Err(Error::Protocol(format!("Unexpected packet type {} for function call", packet_type)))
5194 }
5195
5196 /// Ensure the connection is ready for operations
5197 async fn ensure_ready(&self) -> Result<()> {
5198 if self.is_closed() {
5199 return Err(Error::ConnectionClosed);
5200 }
5201
5202 let inner = self.inner.lock().await;
5203 if inner.state != ConnectionState::Ready {
5204 return Err(Error::ConnectionNotReady);
5205 }
5206
5207 Ok(())
5208 }
5209}
5210
5211impl Drop for Connection {
5212 fn drop(&mut self) {
5213 // Note: Can't do async cleanup in Drop
5214 // Users should call close() explicitly
5215 self.closed.store(true, Ordering::Relaxed);
5216 }
5217}
5218
5219// =============================================================================
5220// Helper functions for get_type()
5221// =============================================================================
5222
5223/// Parse a type name into (schema, name) components
5224///
5225/// Handles formats like:
5226/// - "SCHEMA.TYPE_NAME" -> ("SCHEMA", "TYPE_NAME")
5227/// - "TYPE_NAME" -> (default_schema, "TYPE_NAME")
5228fn parse_type_name(type_name: &str, default_schema: &str) -> (String, String) {
5229 let parts: Vec<&str> = type_name.split('.').collect();
5230 match parts.len() {
5231 1 => (default_schema.to_uppercase(), parts[0].to_uppercase()),
5232 2 => (parts[0].to_uppercase(), parts[1].to_uppercase()),
5233 _ => {
5234 // Multiple dots - take first as schema, rest as name
5235 (parts[0].to_uppercase(), parts[1..].join(".").to_uppercase())
5236 }
5237 }
5238}
5239
5240/// Convert an Oracle type name from data dictionary to OracleType enum
5241fn oracle_type_from_name(type_name: &str) -> crate::constants::OracleType {
5242 use crate::constants::OracleType;
5243
5244 match type_name.to_uppercase().as_str() {
5245 "NUMBER" => OracleType::Number,
5246 "INTEGER" | "INT" | "SMALLINT" => OracleType::Number,
5247 "FLOAT" | "REAL" | "DOUBLE PRECISION" => OracleType::BinaryDouble,
5248 "BINARY_FLOAT" => OracleType::BinaryFloat,
5249 "BINARY_DOUBLE" => OracleType::BinaryDouble,
5250 "VARCHAR2" | "VARCHAR" | "NVARCHAR2" => OracleType::Varchar,
5251 "CHAR" | "NCHAR" => OracleType::Char,
5252 "DATE" => OracleType::Date,
5253 "TIMESTAMP" => OracleType::Timestamp,
5254 "TIMESTAMP WITH TIME ZONE" => OracleType::TimestampTz,
5255 "TIMESTAMP WITH LOCAL TIME ZONE" => OracleType::TimestampLtz,
5256 "RAW" => OracleType::Raw,
5257 "BLOB" => OracleType::Blob,
5258 "CLOB" | "NCLOB" => OracleType::Clob,
5259 "BOOLEAN" | "PL/SQL BOOLEAN" => OracleType::Boolean,
5260 "ROWID" | "UROWID" => OracleType::Rowid,
5261 "XMLTYPE" => OracleType::Varchar, // Treat XMLType as string for now
5262 _ => OracleType::Varchar, // Default to VARCHAR for unknown types
5263 }
5264}
5265
5266#[cfg(test)]
5267mod tests {
5268 use super::*;
5269 use crate::row::Value;
5270
5271 #[test]
5272 fn test_query_options_default() {
5273 let opts = QueryOptions::default();
5274 assert_eq!(opts.prefetch_rows, 100);
5275 assert_eq!(opts.array_size, 100);
5276 assert!(!opts.auto_commit);
5277 }
5278
5279 #[test]
5280 fn test_query_result_empty() {
5281 let result = QueryResult::empty();
5282 assert!(result.is_empty());
5283 assert_eq!(result.column_count(), 0);
5284 assert_eq!(result.row_count(), 0);
5285 assert!(result.first().is_none());
5286 }
5287
5288 #[test]
5289 fn test_query_result_with_rows() {
5290 let columns = vec![ColumnInfo::new("ID", crate::constants::OracleType::Number)];
5291 let rows = vec![Row::new(vec![Value::Integer(1)])];
5292
5293 let result = QueryResult {
5294 columns,
5295 rows,
5296 rows_affected: 0,
5297 has_more_rows: false,
5298 cursor_id: 1,
5299 };
5300
5301 assert!(!result.is_empty());
5302 assert_eq!(result.column_count(), 1);
5303 assert_eq!(result.row_count(), 1);
5304 assert!(result.first().is_some());
5305 assert!(result.column_by_name("ID").is_some());
5306 assert!(result.column_by_name("id").is_some()); // Case insensitive
5307 assert_eq!(result.column_index("ID"), Some(0));
5308 }
5309
5310 #[test]
5311 fn test_server_info_default() {
5312 let info = ServerInfo::default();
5313 assert!(info.version.is_empty());
5314 assert_eq!(info.session_id, 0);
5315 }
5316
5317 #[test]
5318 fn test_connection_state_transitions() {
5319 assert_eq!(ConnectionState::Disconnected, ConnectionState::Disconnected);
5320 assert_ne!(ConnectionState::Connected, ConnectionState::Ready);
5321 }
5322
5323 #[test]
5324 fn test_query_result_iterator() {
5325 let rows = vec![
5326 Row::new(vec![Value::Integer(1)]),
5327 Row::new(vec![Value::Integer(2)]),
5328 ];
5329 let result = QueryResult {
5330 columns: vec![],
5331 rows,
5332 rows_affected: 0,
5333 has_more_rows: false,
5334 cursor_id: 0,
5335 };
5336
5337 let collected: Vec<_> = result.iter().collect();
5338 assert_eq!(collected.len(), 2);
5339 }
5340
5341 #[test]
5342 fn test_query_result_into_iterator() {
5343 let rows = vec![
5344 Row::new(vec![Value::Integer(1)]),
5345 Row::new(vec![Value::Integer(2)]),
5346 ];
5347 let result = QueryResult {
5348 columns: vec![],
5349 rows,
5350 rows_affected: 0,
5351 has_more_rows: false,
5352 cursor_id: 0,
5353 };
5354
5355 let collected: Vec<Row> = result.into_iter().collect();
5356 assert_eq!(collected.len(), 2);
5357 }
5358}