oracle_rs/connection.rs
1//! Oracle database connection
2//!
3//! This module provides the main `Connection` type for interacting with Oracle databases.
4//!
5//! # Example
6//!
7//! ```rust,ignore
8//! use oracle_rs::{Connection, Config};
9//!
10//! #[tokio::main]
11//! async fn main() -> oracle_rs::Result<()> {
12//! // Create a connection
13//! let conn = Connection::connect("localhost:1521/ORCLPDB1", "user", "password").await?;
14//!
15//! // Execute a query
//!     let rows = conn.query("SELECT * FROM employees WHERE department_id = :1", &[10.into()]).await?;
17//!
18//! for row in rows {
19//! println!("{:?}", row);
20//! }
21//!
22//! // Commit and close
23//! conn.commit().await?;
24//! conn.close().await?;
25//! Ok(())
26//! }
27//! ```
28
29use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
30use std::sync::Arc;
31use bytes::Bytes;
32use tokio::io::{AsyncReadExt, AsyncWriteExt};
33use tokio::net::TcpStream;
34use tokio::sync::Mutex;
35
36use crate::batch::{BatchBinds, BatchResult};
37use crate::buffer::{ReadBuffer, WriteBuffer};
38use crate::transport::{TlsConfig, TlsOracleStream, connect_tls};
39use crate::capabilities::Capabilities;
40use crate::config::{Config, ServiceMethod};
41use crate::constants::{BindDirection, FetchOrientation, FunctionCode, MessageType, OracleType, PacketType, PACKET_HEADER_SIZE};
42use crate::cursor::{ScrollableCursor, ScrollResult};
43use crate::error::{Error, Result};
44use crate::implicit::{ImplicitResult, ImplicitResults};
45use crate::messages::{AcceptMessage, AuthMessage, AuthPhase, ConnectMessage, ExecuteMessage, ExecuteOptions, FetchMessage, LobOpMessage};
46use crate::packet::Packet;
47use crate::row::{Row, Value};
48use crate::statement::{BindParam, ColumnInfo, Statement, StatementType};
49use crate::types::{LobData, LobLocator, LobValue};
50use crate::statement_cache::StatementCache;
51
/// Connection state
///
/// Tracks how far a connection has progressed through the handshake. The
/// variants are declared in the order the handshake steps are performed
/// (connect, protocol negotiation, data type negotiation, authentication).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Not connected
    Disconnected,
    /// TCP connection established
    Connected,
    /// Protocol negotiation complete
    ProtocolNegotiated,
    /// Data types negotiated
    DataTypesNegotiated,
    /// Fully authenticated and ready
    Ready,
    /// Connection is closed
    Closed,
}
68
/// Tunables applied when executing a statement.
#[derive(Debug, Clone)]
pub struct QueryOptions {
    /// How many rows to prefetch
    pub prefetch_rows: u32,
    /// Array size used for batch operations
    pub array_size: u32,
    /// Commit automatically after DML when `true`
    pub auto_commit: bool,
}

impl Default for QueryOptions {
    /// Defaults: 100 prefetched rows, array size 100, auto-commit off.
    fn default() -> Self {
        // Prefetch and array size share the same default value.
        const DEFAULT_ROW_BATCH: u32 = 100;

        QueryOptions {
            auto_commit: false,
            prefetch_rows: DEFAULT_ROW_BATCH,
            array_size: DEFAULT_ROW_BATCH,
        }
    }
}
89
/// Result set from a query
///
/// Holds the column metadata together with the rows that have been fetched
/// so far. Consuming the value with `into_iter()` yields the owned rows.
#[derive(Debug)]
pub struct QueryResult {
    /// Column information
    pub columns: Vec<ColumnInfo>,
    /// Rows returned
    pub rows: Vec<Row>,
    /// Number of rows affected (for DML)
    pub rows_affected: u64,
    /// Whether there are more rows to fetch
    pub has_more_rows: bool,
    /// Cursor ID for subsequent fetches (needed for fetch_more)
    pub cursor_id: u16,
}
104
105impl QueryResult {
106 /// Create an empty query result
107 pub fn empty() -> Self {
108 Self {
109 columns: Vec::new(),
110 rows: Vec::new(),
111 rows_affected: 0,
112 has_more_rows: false,
113 cursor_id: 0,
114 }
115 }
116
117 /// Get the number of columns
118 pub fn column_count(&self) -> usize {
119 self.columns.len()
120 }
121
122 /// Get the number of rows
123 pub fn row_count(&self) -> usize {
124 self.rows.len()
125 }
126
127 /// Check if the result is empty
128 pub fn is_empty(&self) -> bool {
129 self.rows.is_empty()
130 }
131
132 /// Get a column by name
133 pub fn column_by_name(&self, name: &str) -> Option<&ColumnInfo> {
134 self.columns.iter().find(|c| c.name.eq_ignore_ascii_case(name))
135 }
136
137 /// Get column index by name
138 pub fn column_index(&self, name: &str) -> Option<usize> {
139 self.columns.iter().position(|c| c.name.eq_ignore_ascii_case(name))
140 }
141
142 /// Iterate over rows
143 pub fn iter(&self) -> impl Iterator<Item = &Row> {
144 self.rows.iter()
145 }
146
147 /// Get a single row (first row)
148 pub fn first(&self) -> Option<&Row> {
149 self.rows.first()
150 }
151}
152
153impl IntoIterator for QueryResult {
154 type Item = Row;
155 type IntoIter = std::vec::IntoIter<Row>;
156
157 fn into_iter(self) -> Self::IntoIter {
158 self.rows.into_iter()
159 }
160}
161
/// Result from executing a PL/SQL block with OUT parameters
///
/// The typed accessors on this type (`get_string`, `get_integer`, ...) index
/// into `out_values` and return `None` when the position is out of range or
/// the value cannot be viewed as the requested type.
#[derive(Debug)]
pub struct PlsqlResult {
    /// OUT parameter values indexed by position (0-based)
    pub out_values: Vec<Value>,
    /// Number of rows affected (if applicable)
    pub rows_affected: u64,
    /// Cursor ID (if the result contains a REF CURSOR)
    pub cursor_id: Option<u16>,
    /// Implicit result sets returned via DBMS_SQL.RETURN_RESULT
    pub implicit_results: ImplicitResults,
}
174
175impl PlsqlResult {
176 /// Create an empty PL/SQL result
177 pub fn empty() -> Self {
178 Self {
179 out_values: Vec::new(),
180 rows_affected: 0,
181 cursor_id: None,
182 implicit_results: ImplicitResults::new(),
183 }
184 }
185
186 /// Get an OUT value by position (0-based)
187 pub fn get(&self, index: usize) -> Option<&Value> {
188 self.out_values.get(index)
189 }
190
191 /// Get a string OUT value by position
192 pub fn get_string(&self, index: usize) -> Option<&str> {
193 self.out_values.get(index).and_then(|v| v.as_str())
194 }
195
196 /// Get an integer OUT value by position
197 pub fn get_integer(&self, index: usize) -> Option<i64> {
198 self.out_values.get(index).and_then(|v| v.as_i64())
199 }
200
201 /// Get a float OUT value by position
202 pub fn get_float(&self, index: usize) -> Option<f64> {
203 self.out_values.get(index).and_then(|v| v.as_f64())
204 }
205
206 /// Get a cursor ID from OUT value by position (for REF CURSOR)
207 pub fn get_cursor_id(&self, index: usize) -> Option<u16> {
208 self.out_values.get(index).and_then(|v| v.as_cursor_id())
209 }
210}
211
/// Server information obtained during connection
///
/// All fields start at their `Default` values and are filled in as the
/// handshake progresses (for example `protocol_version` and `supports_oob`
/// come from the ACCEPT packet, `banner` from protocol negotiation).
#[derive(Debug, Clone, Default)]
pub struct ServerInfo {
    /// Oracle version string
    pub version: String,
    /// Server banner
    pub banner: String,
    /// Session ID (SID)
    pub session_id: u32,
    /// Serial number
    pub serial_number: u32,
    /// Instance name
    pub instance_name: Option<String>,
    /// Service name
    pub service_name: Option<String>,
    /// Database name
    pub database_name: Option<String>,
    /// Negotiated protocol version
    pub protocol_version: u16,
    /// Whether server supports OOB (out of band) data
    pub supports_oob: bool,
}
234
/// Stream type that can be either plain TCP or TLS-encrypted
///
/// The inherent `read_exact` / `write_all` / `flush` methods dispatch to the
/// wrapped stream so callers do not need to care which transport is in use.
enum OracleStream {
    /// Plain TCP connection
    Plain(TcpStream),
    /// TLS-encrypted connection
    Tls(TlsOracleStream),
}
242
243impl OracleStream {
244 async fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
245 match self {
246 OracleStream::Plain(stream) => {
247 AsyncReadExt::read_exact(stream, buf).await?;
248 Ok(())
249 }
250 OracleStream::Tls(stream) => {
251 AsyncReadExt::read_exact(stream, buf).await?;
252 Ok(())
253 }
254 }
255 }
256
257 async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
258 match self {
259 OracleStream::Plain(stream) => stream.write_all(buf).await,
260 OracleStream::Tls(stream) => stream.write_all(buf).await,
261 }
262 }
263
264 async fn flush(&mut self) -> std::io::Result<()> {
265 match self {
266 OracleStream::Plain(stream) => stream.flush().await,
267 OracleStream::Tls(stream) => stream.flush().await,
268 }
269 }
270
271}
272
/// Internal connection state shared across async operations
///
/// Owned behind the `Mutex` held by `Connection`; every network operation
/// goes through this struct.
struct ConnectionInner {
    /// Underlying transport; `None` makes send/receive fail with
    /// `Error::ConnectionClosed`
    stream: Option<OracleStream>,
    /// Capabilities, updated while parsing the protocol negotiation response
    capabilities: Capabilities,
    /// Current handshake / lifecycle state
    state: ConnectionState,
    /// Information about the server collected during the handshake
    server_info: ServerInfo,
    /// Session data unit size; starts at 8192 and is replaced by the value
    /// from the ACCEPT packet
    sdu_size: u16,
    /// When set, the packet length occupies the first 4 header bytes instead
    /// of the first 2
    large_sdu: bool,
    /// Sequence number for TTC messages (increments per message)
    sequence_number: u8,
    /// Statement cache for prepared statement reuse
    statement_cache: Option<StatementCache>,
}
286
287impl ConnectionInner {
288 fn new_with_cache(cache_size: usize) -> Self {
289 Self {
290 stream: None,
291 capabilities: Capabilities::default(),
292 state: ConnectionState::Disconnected,
293 server_info: ServerInfo::default(),
294 sdu_size: 8192,
295 large_sdu: false,
296 sequence_number: 0,
297 statement_cache: if cache_size > 0 {
298 Some(StatementCache::new(cache_size))
299 } else {
300 None
301 },
302 }
303 }
304
305 /// Get the next sequence number (auto-increments, wraps at 255 to 1)
306 fn next_sequence_number(&mut self) -> u8 {
307 self.sequence_number = self.sequence_number.wrapping_add(1);
308 if self.sequence_number == 0 {
309 self.sequence_number = 1;
310 }
311 self.sequence_number
312 }
313
314 async fn send(&mut self, data: &[u8]) -> Result<()> {
315 if let Some(stream) = &mut self.stream {
316 stream.write_all(data).await?;
317 stream.flush().await?;
318 Ok(())
319 } else {
320 Err(Error::ConnectionClosed)
321 }
322 }
323
324 /// Send a payload that may need to be split across multiple packets.
325 ///
326 /// This is used for large LOB writes and other operations where the payload
327 /// exceeds the SDU size. The payload is split into multiple DATA packets,
328 /// each with proper headers.
329 ///
330 /// # Arguments
331 /// * `payload` - The raw message payload (without packet header or data flags)
332 /// * `data_flags` - The data flags for the first packet (typically 0)
333 async fn send_multi_packet(&mut self, payload: &[u8], data_flags: u16) -> Result<()> {
334 let stream = self.stream.as_mut().ok_or(Error::ConnectionClosed)?;
335
336 // Calculate max payload per packet: SDU - header (8) - data flags (2)
337 let max_payload_per_packet = self.sdu_size as usize - PACKET_HEADER_SIZE - 2;
338
339 let mut offset = 0;
340 let mut is_first = true;
341
342 while offset < payload.len() {
343 let remaining = payload.len() - offset;
344 let chunk_size = std::cmp::min(remaining, max_payload_per_packet);
345 let is_last = offset + chunk_size >= payload.len();
346
347 // Build packet
348 let packet_len = PACKET_HEADER_SIZE + 2 + chunk_size; // header + data flags + payload
349 let mut packet = Vec::with_capacity(packet_len);
350
351 // Header
352 if self.large_sdu {
353 packet.extend_from_slice(&(packet_len as u32).to_be_bytes());
354 } else {
355 packet.extend_from_slice(&(packet_len as u16).to_be_bytes());
356 packet.extend_from_slice(&[0, 0]); // Checksum
357 }
358 packet.push(PacketType::Data as u8);
359 packet.push(0); // Flags
360 packet.extend_from_slice(&[0, 0]); // Header checksum
361
362 // Data flags - only include on first packet
363 if is_first {
364 packet.extend_from_slice(&data_flags.to_be_bytes());
365 is_first = false;
366 } else {
367 // Continuation packets still need data flags position but value is 0
368 packet.extend_from_slice(&0u16.to_be_bytes());
369 }
370
371 // Payload chunk
372 packet.extend_from_slice(&payload[offset..offset + chunk_size]);
373
374 // Send this packet
375 stream.write_all(&packet).await?;
376
377 offset += chunk_size;
378
379 // Don't flush until the last packet to improve performance
380 if is_last {
381 stream.flush().await?;
382 }
383 }
384
385 Ok(())
386 }
387
388 async fn receive(&mut self) -> Result<bytes::Bytes> {
389 if let Some(stream) = &mut self.stream {
390 // Read packet header first (always 8 bytes)
391 // large_sdu only affects how the length field is interpreted, not header size
392 let mut header_buf = vec![0u8; PACKET_HEADER_SIZE];
393 stream.read_exact(&mut header_buf).await?;
394
395 // Parse header to get payload length
396 // In large_sdu mode, first 4 bytes are length; otherwise first 2 bytes
397 let packet_len = if self.large_sdu {
398 u32::from_be_bytes([header_buf[0], header_buf[1], header_buf[2], header_buf[3]])
399 as usize
400 } else {
401 u16::from_be_bytes([header_buf[0], header_buf[1]]) as usize
402 };
403
404 // Read remaining payload
405 let payload_len = packet_len.saturating_sub(PACKET_HEADER_SIZE);
406 let mut payload_buf = vec![0u8; payload_len];
407 if payload_len > 0 {
408 stream.read_exact(&mut payload_buf).await?;
409 }
410
411 // Combine header and payload
412 let mut full_packet = header_buf.clone();
413 full_packet.extend(payload_buf);
414
415 Ok(bytes::Bytes::from(full_packet))
416 } else {
417 Err(Error::ConnectionClosed)
418 }
419 }
420
    /// Receive a complete response that may span multiple packets
    ///
    /// This method accumulates DATA packets until the end of the response is
    /// detected, either from the data flags (END_OF_RESPONSE / EOF), from a
    /// 3-byte payload carrying the EndOfResponse message type, or by scanning
    /// the accumulated message data for a terminal message. It's used for
    /// operations like LOB reads that may return data spanning multiple TNS
    /// packets.
    ///
    /// Returns a synthetic DATA packet: a freshly built header followed by the
    /// data flags of the first packet and the message data of all packets.
    /// If a non-DATA packet (e.g., MARKER) arrives, that packet is returned
    /// unchanged and anything accumulated so far is discarded.
    ///
    /// NOTE(review): when `large_sdu` is false the synthetic header stores the
    /// total length as `u16`, so the length field wraps once the accumulated
    /// response exceeds 65535 bytes — confirm consumers do not rely on it.
    async fn receive_response(&mut self) -> Result<bytes::Bytes> {
        use crate::constants::{data_flags, MessageType};

        let mut accumulated_payload = Vec::new();
        let mut is_first_packet = true;

        loop {
            let packet = self.receive().await?;

            if packet.len() < PACKET_HEADER_SIZE {
                return Err(Error::Protocol("Packet too small".to_string()));
            }

            // Check packet type - only DATA packets can be accumulated
            let packet_type = packet[4];
            if packet_type != PacketType::Data as u8 {
                // Non-DATA packet (e.g., MARKER) - return as-is for special handling
                return Ok(packet);
            }

            // Get payload (everything after the 8-byte header)
            let payload = &packet[PACKET_HEADER_SIZE..];

            if payload.len() < 2 {
                return Err(Error::Protocol("DATA packet payload too small".to_string()));
            }

            // Read data flags (first 2 bytes of payload)
            let data_flags_value = u16::from_be_bytes([payload[0], payload[1]]);

            // Check for end of response - Python checks both flags and message type
            let has_end_flag = (data_flags_value & data_flags::END_OF_RESPONSE) != 0;
            let has_eof_flag = (data_flags_value & data_flags::EOF) != 0;

            // Also check for EndOfResponse message type (data flags + 1 byte with msg type 29)
            let has_end_message = payload.len() == 3
                && payload[2] == MessageType::EndOfResponse as u8;

            // Accumulate payload first
            if is_first_packet {
                // First packet: include data flags in accumulated payload
                accumulated_payload.extend_from_slice(payload);
                is_first_packet = false;
            } else {
                // Subsequent packets: skip the data flags, append only the message data
                accumulated_payload.extend_from_slice(&payload[2..]);
            }

            // Check for end of response using data flags from this packet
            let is_end_of_response = has_end_flag || has_eof_flag || has_end_message;

            // If data flags don't indicate end, scan the ACCUMULATED message data
            // for terminal messages. We scan accumulated data (not just current packet)
            // because messages can span packet boundaries.
            // The first two accumulated bytes are the data flags, hence the `[2..]`.
            let has_terminal_message = if !is_end_of_response && accumulated_payload.len() > 2 {
                self.scan_for_terminal_message(&accumulated_payload[2..])
            } else {
                false
            };

            // Check if this is the last packet
            if is_end_of_response || has_terminal_message {
                break;
            }
        }

        // Build a synthetic packet with combined payload
        let total_len = PACKET_HEADER_SIZE + accumulated_payload.len();
        let mut result = Vec::with_capacity(total_len);

        // Build header (same layout that `receive` parses)
        if self.large_sdu {
            result.extend_from_slice(&(total_len as u32).to_be_bytes());
        } else {
            // NOTE(review): truncating cast, see the method documentation
            result.extend_from_slice(&(total_len as u16).to_be_bytes());
            result.extend_from_slice(&[0, 0]); // Checksum
        }
        result.push(PacketType::Data as u8);
        result.push(0); // Flags
        result.extend_from_slice(&[0, 0]); // Header checksum

        // Add combined payload
        result.extend_from_slice(&accumulated_payload);

        Ok(bytes::Bytes::from(result))
    }
514
    /// Scan message data for terminal message types (ERROR, STATUS or
    /// END_OF_RESPONSE) that indicate the response is complete.
    ///
    /// This is needed because Oracle doesn't always set the END_OF_RESPONSE flag
    /// in the data flags for LOB operations. Instead, we must detect the terminal
    /// message by parsing the message stream.
    ///
    /// `data` is the message stream without the leading 2-byte data flags.
    /// Returns `false` for empty input, for incomplete LOB data, and as soon as
    /// a message type is encountered that this function does not understand.
    ///
    /// NOTE: Message types are only inspected at message boundaries, and
    /// LOB_DATA content is skipped rather than searched. The one exception is
    /// the PARAMETER branch, which does search the remaining raw bytes for the
    /// ERROR / END_OF_RESPONSE values (see the comment there for the rationale).
    fn scan_for_terminal_message(&self, data: &[u8]) -> bool {
        use crate::buffer::ReadBuffer;
        use crate::constants::MessageType;

        if data.is_empty() {
            return false;
        }

        // Try to parse the message stream and look for ERROR or END_OF_RESPONSE
        let mut buf = ReadBuffer::from_slice(data);

        while buf.remaining() > 0 {
            let msg_type = match buf.read_u8() {
                Ok(t) => t,
                Err(_) => return false, // Can't read, assume incomplete
            };

            // END_OF_RESPONSE is a standalone message with no additional data
            if msg_type == MessageType::EndOfResponse as u8 {
                return true;
            }

            // ERROR message indicates end of response for older Oracle
            if msg_type == MessageType::Error as u8 {
                // Error message found - this indicates end of response
                return true;
            }

            // STATUS message also indicates end of response
            if msg_type == MessageType::Status as u8 {
                return true;
            }

            // LOB_DATA message - skip the data
            if msg_type == MessageType::LobData as u8 {
                // Read length-prefixed data and skip it
                match buf.read_raw_bytes_chunked() {
                    Ok(_) => continue,
                    Err(_) => return false, // Incomplete LOB data, need more packets
                }
            }

            // PARAMETER message (8) - this contains the updated locator and amount.
            // For LOB write responses, PARAMETER is the first message and the response
            // is relatively small (locator + error info). The remaining bytes are
            // searched for the ERROR/END_OF_RESPONSE values on the assumption that the
            // locator doesn't contain arbitrary binary data that would false-positive.
            // NOTE(review): this is a heuristic; a locator or amount byte equal to
            // either value would end the response early — confirm against the protocol.
            //
            // For LOB read responses, LobData comes first and contains the actual data,
            // which might contain bytes that match ERROR (4) or END_OF_RESPONSE (29).
            // But since we skip LobData content, by the time we reach PARAMETER,
            // the remaining data is just locator + error info.
            if msg_type == MessageType::Parameter as u8 {
                let remaining = buf.remaining_bytes();
                // Check if ERROR (4) or END_OF_RESPONSE (29) appears in remaining bytes
                if remaining.contains(&(MessageType::Error as u8))
                    || remaining.contains(&(MessageType::EndOfResponse as u8))
                {
                    return true;
                }
                // If no terminal marker found, response might be incomplete
                return false;
            }

            // For other unknown message types, we can't determine the end
            return false;
        }

        // Only LOB_DATA messages were seen; no terminal message yet
        false
    }
597
598 /// Send a marker packet with the specified marker type
599 async fn send_marker(&mut self, marker_type: u8) -> Result<()> {
600 let mut buf = WriteBuffer::with_capacity(16);
601
602 // Build marker packet header
603 // Marker packet structure: [length][0x00][0x00][0x00][0x0c][flags][0x00][0x00] + payload
604 // Payload: [0x01][0x00][marker_type]
605 let payload_len = 3; // 0x01, 0x00, marker_type
606 let total_len = (PACKET_HEADER_SIZE + payload_len) as u16;
607
608 // Header
609 buf.write_u16_be(total_len)?;
610 buf.write_u16_be(0)?; // zeros in large_sdu position
611 buf.write_u8(PacketType::Marker as u8)?;
612 buf.write_u8(0)?; // flags
613 buf.write_u16_be(0)?; // reserved
614
615 // Payload
616 buf.write_u8(0x01)?; // constant
617 buf.write_u8(0x00)?; // constant
618 buf.write_u8(marker_type)?;
619
620 self.send(buf.as_slice()).await
621 }
622
623 /// Handle the reset protocol after receiving a MARKER packet
624 /// This sends a reset marker, waits for the reset response, then returns the error packet
625 /// Returns Err if the connection is closed after reset (some Oracle versions do this)
626 async fn handle_marker_reset(&mut self) -> Result<bytes::Bytes> {
627 const MARKER_TYPE_RESET: u8 = 2;
628
629 // Send reset marker
630 self.send_marker(MARKER_TYPE_RESET).await?;
631
632 // Read packets until we get a reset marker back
633 loop {
634 let packet = self.receive().await?;
635 if packet.len() < PACKET_HEADER_SIZE {
636 return Err(Error::Protocol("Invalid packet received".to_string()));
637 }
638
639 let packet_type = packet[4];
640
641 if packet_type == PacketType::Marker as u8 {
642 // Check if it's a reset marker
643 if packet.len() >= PACKET_HEADER_SIZE + 3 {
644 let marker_type = packet[PACKET_HEADER_SIZE + 2];
645 if marker_type == MARKER_TYPE_RESET {
646 break;
647 }
648 }
649 } else {
650 // Non-marker packet received unexpectedly during reset wait
651 return Ok(packet);
652 }
653 }
654
655 // Try to read the error packet (may need to skip additional marker packets first)
656 // Note: Some Oracle versions (like Oracle Free) may close the connection after reset
657 // instead of sending an error packet
658 loop {
659 match self.receive().await {
660 Ok(packet) => {
661 let packet_type = packet[4];
662
663 if packet_type != PacketType::Marker as u8 {
664 // This should be the error data packet
665 return Ok(packet);
666 }
667 // Skip additional marker packets
668 }
669 Err(_) => {
670 // Connection closed after reset - Oracle Free and some versions
671 // close the connection instead of sending the error details.
672 // This typically happens when:
673 // - Table or view doesn't exist
674 // - Insufficient privileges to access the object
675 // - Invalid SQL syntax
676 return Err(Error::ConnectionClosedByServer(
677 "Query failed - Oracle closed the connection without providing error details. \
678 This typically indicates insufficient privileges or the object doesn't exist.".to_string()
679 ));
680 }
681 }
682 }
683 }
684}
685
/// An Oracle database connection.
///
/// This is the main type for interacting with Oracle databases. It provides
/// methods for executing queries, DML statements, PL/SQL blocks, and managing
/// transactions.
///
/// Connections are created using [`Connection::connect`] or
/// [`Connection::connect_with_config`]. For connection pooling, use the
/// `deadpool-oracle` crate.
///
/// # Example
///
/// ```rust,no_run
/// use oracle_rs::{Config, Connection, Value};
///
/// # async fn example() -> oracle_rs::Result<()> {
/// // Create a connection
/// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password");
/// let conn = Connection::connect_with_config(config).await?;
///
/// // Execute a query
/// let result = conn.query("SELECT * FROM employees WHERE dept_id = :1", &[10.into()]).await?;
/// for row in &result.rows {
///     let name = row.get_by_name("name").and_then(|v| v.as_str()).unwrap_or("");
///     println!("Employee: {}", name);
/// }
///
/// // Execute DML with transaction
/// conn.execute("INSERT INTO logs (msg) VALUES (:1)", &["Hello".into()]).await?;
/// conn.commit().await?;
///
/// // Close the connection
/// conn.close().await?;
/// # Ok(())
/// # }
/// ```
///
/// # Thread Safety
///
/// `Connection` is `Send` and `Sync`, but operations are serialized internally
/// via a mutex. For parallel query execution, use multiple connections (e.g.,
/// via a connection pool).
pub struct Connection {
    /// Stream and protocol state; locked for the duration of each operation
    inner: Arc<Mutex<ConnectionInner>>,
    /// Configuration this connection was created with
    config: Config,
    /// Set by `mark_closed`; read by `is_closed` without taking the mutex
    closed: AtomicBool,
    /// Identifier taken from `CONNECTION_ID_COUNTER` when connecting
    id: u32,
}
734
// Connection ID counter: starts at 1 and is advanced by one for every
// connection attempt made through `Connection::connect_with_config`.
static CONNECTION_ID_COUNTER: AtomicU32 = AtomicU32::new(1);
737
738impl Connection {
739 /// Create a new connection to an Oracle database
740 ///
741 /// # Arguments
742 ///
743 /// * `connect_string` - Connection string in EZConnect format (e.g., "host:port/service")
744 /// * `username` - Database username
745 /// * `password` - Database password
746 ///
747 /// # Example
748 ///
749 /// ```rust,ignore
750 /// let conn = Connection::connect("localhost:1521/ORCLPDB1", "scott", "tiger").await?;
751 /// ```
752 pub async fn connect(connect_string: &str, username: &str, password: &str) -> Result<Self> {
753 let mut config: Config = connect_string.parse()?;
754 config.username = username.to_string();
755 config.set_password(password);
756 Self::connect_with_config(config).await
757 }
758
759 /// Create a new connection using a [`Config`].
760 ///
761 /// This is the preferred way to create connections as it gives full control
762 /// over connection parameters including TLS, timeouts, and statement caching.
763 ///
764 /// # Example
765 ///
766 /// ```rust,no_run
767 /// use oracle_rs::{Config, Connection};
768 ///
769 /// # async fn example() -> oracle_rs::Result<()> {
770 /// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password")
771 /// .with_statement_cache_size(50);
772 ///
773 /// let conn = Connection::connect_with_config(config).await?;
774 /// # Ok(())
775 /// # }
776 /// ```
777 pub async fn connect_with_config(config: Config) -> Result<Self> {
778 let id = CONNECTION_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
779
780 // Create TCP connection
781 let addr = config.socket_addr();
782 let tcp_stream = TcpStream::connect(&addr).await?;
783
784 // Set TCP options
785 tcp_stream.set_nodelay(true)?;
786
787 // Wrap with TLS if configured
788 let stream = if config.is_tls_enabled() {
789 let tls_config = config.tls_config.as_ref()
790 .cloned()
791 .unwrap_or_else(TlsConfig::new);
792
793 let tls_stream = connect_tls(tcp_stream, &config.host, &tls_config).await?;
794 OracleStream::Tls(tls_stream)
795 } else {
796 OracleStream::Plain(tcp_stream)
797 };
798
799 let mut inner = ConnectionInner::new_with_cache(config.stmtcachesize);
800 inner.stream = Some(stream);
801 inner.state = ConnectionState::Connected;
802
803 let conn = Connection {
804 inner: Arc::new(Mutex::new(inner)),
805 config,
806 closed: AtomicBool::new(false),
807 id,
808 };
809
810 // Perform connection handshake
811 conn.perform_handshake().await?;
812
813 Ok(conn)
814 }
815
816 /// Get the connection ID
817 pub fn id(&self) -> u32 {
818 self.id
819 }
820
821 /// Check if the connection is closed
822 pub fn is_closed(&self) -> bool {
823 self.closed.load(Ordering::Relaxed)
824 }
825
826 /// Mark the connection as closed
827 ///
828 /// This should be called when the underlying connection is detected as broken.
829 /// Once marked closed, `is_closed()` returns true and operations will fail fast.
830 pub fn mark_closed(&self) {
831 self.closed.store(true, Ordering::Relaxed);
832 }
833
834 /// Helper to mark connection as closed if the result is a connection error
835 fn handle_result<T>(&self, result: Result<T>) -> Result<T> {
836 if let Err(ref e) = result {
837 if e.is_connection_error() {
838 self.mark_closed();
839 }
840 }
841 result
842 }
843
844 /// Get server information
845 pub async fn server_info(&self) -> ServerInfo {
846 let inner = self.inner.lock().await;
847 inner.server_info.clone()
848 }
849
850 /// Get the current connection state
851 pub async fn state(&self) -> ConnectionState {
852 let inner = self.inner.lock().await;
853 inner.state
854 }
855
856 /// Perform the connection handshake
857 async fn perform_handshake(&self) -> Result<()> {
858 // Step 1: Send CONNECT packet and parse ACCEPT response
859 self.send_connect_packet().await?;
860
861 // Step 2: OOB check (required for protocol version >= 318 AND server supports OOB)
862 // Both conditions must be met - server must have indicated OOB support in ACCEPT
863 let needs_oob_check = {
864 let inner = self.inner.lock().await;
865 inner.server_info.protocol_version >= crate::constants::version::MIN_OOB_CHECK
866 && inner.server_info.supports_oob
867 };
868 if needs_oob_check {
869 self.send_oob_check().await?;
870 }
871
872 // Step 3: Protocol negotiation
873 self.negotiate_protocol().await?;
874
875 // Step 4: Data types negotiation
876 self.negotiate_data_types().await?;
877
878 // Step 5: Authentication
879 self.authenticate().await?;
880
881 Ok(())
882 }
883
884 /// Send OOB (Out of Band) check
885 /// Required for protocol version >= 318
886 async fn send_oob_check(&self) -> Result<()> {
887 let mut inner = self.inner.lock().await;
888 let large_sdu = inner.large_sdu;
889
890 // Step 1: Send raw byte "!" (0x21) for OOB check
891 inner.send(&[0x21]).await?;
892
893 // Step 2: Send MARKER packet with Reset
894 let marker_payload = [1u8, 0u8, crate::constants::MarkerType::Reset as u8];
895 let mut packet_buf = WriteBuffer::new();
896
897 if large_sdu {
898 packet_buf.write_u32_be((PACKET_HEADER_SIZE + marker_payload.len()) as u32)?;
899 } else {
900 packet_buf.write_u16_be((PACKET_HEADER_SIZE + marker_payload.len()) as u16)?;
901 packet_buf.write_u16_be(0)?; // Checksum
902 }
903 packet_buf.write_u8(PacketType::Marker as u8)?;
904 packet_buf.write_u8(0)?; // Flags
905 packet_buf.write_u16_be(0)?; // Header checksum
906 packet_buf.write_bytes(&marker_payload)?;
907
908 inner.send(&packet_buf.freeze()).await?;
909
910 // Step 3: Wait for OOB reset response
911 // The server sends back a MARKER packet or reset acknowledgment
912 let response = inner.receive().await?;
913
914 // Validate response - should be a MARKER packet type (12)
915 if response.len() > 4 && response[4] == PacketType::Marker as u8 {
916 Ok(())
917 } else {
918 // Server might just acknowledge without a specific packet
919 // This is acceptable in some Oracle versions
920 Ok(())
921 }
922 }
923
    /// Send the initial CONNECT packet
    ///
    /// Sends the CONNECT packet (plus a continuation DATA packet when the
    /// connect message produced one) and then reads responses until the
    /// exchange is resolved:
    ///
    /// * ACCEPT (2): records SDU mode, protocol version, OOB support and SDU
    ///   size, then returns `Ok(())`
    /// * REFUSE (4): returns `Error::ConnectionRefused`
    /// * REDIRECT (5): returns `Error::ConnectionRedirect` (not implemented)
    /// * RESEND (11): retransmits the connect packet(s), at most
    ///   `MAX_RESENDS` times
    /// * anything else: returns `Error::ProtocolError`
    ///
    /// The connection mutex is held for the whole exchange.
    async fn send_connect_packet(&self) -> Result<()> {
        let mut inner = self.inner.lock().await;

        // Build connect packet using ConnectMessage for proper packet format
        let connect_msg = ConnectMessage::from_config(&self.config);
        let (connect_packet, continuation) = connect_msg.build_with_continuation()?;

        // Send the CONNECT packet
        inner.send(&connect_packet).await?;

        // If we have a continuation DATA packet (for large connect strings), send it
        if let Some(ref data_packet) = continuation {
            inner.send(data_packet).await?;
        }

        // Upper bound on RESEND requests honoured before giving up
        const MAX_RESENDS: u8 = 3;
        let mut resend_count: u8 = 0;

        loop {
            // Wait for response
            let response = inner.receive().await?;

            // Parse response packet type
            if response.len() < PACKET_HEADER_SIZE {
                return Err(Error::PacketTooShort {
                    expected: PACKET_HEADER_SIZE,
                    actual: response.len(),
                });
            }

            // Packet type lives at header offset 4
            let packet_type = response[4];

            match packet_type {
                2 => {
                    // ACCEPT - parse the accept message to get protocol version and capabilities
                    let packet = Packet::from_bytes(response)?;
                    let accept = AcceptMessage::parse(&packet)?;

                    // Switch to large_sdu mode when the accept message says so;
                    // this changes how every later packet length is encoded
                    inner.large_sdu = accept.uses_large_sdu();

                    // Update server info
                    inner.server_info.protocol_version = accept.protocol_version;
                    inner.server_info.supports_oob = accept.supports_oob;
                    // Clamp the negotiated SDU so it fits the u16 field
                    inner.sdu_size = accept.sdu.min(65535) as u16;

                    inner.state = ConnectionState::Connected;
                    return Ok(());
                }
                4 => {
                    // REFUSE
                    // The two reason bytes are read but not reported;
                    // a payload shorter than two bytes surfaces the read error instead
                    let mut buf = ReadBuffer::new(response.slice(PACKET_HEADER_SIZE..));
                    let _reason = buf.read_u8()?;
                    let _user_reason = buf.read_u8()?;

                    return Err(Error::ConnectionRefused {
                        error_code: None,
                        message: Some("Connection refused by server".to_string()),
                    });
                }
                5 => {
                    // REDIRECT
                    return Err(Error::ConnectionRedirect(
                        "redirect not implemented".to_string(),
                    ));
                }
                11 => {
                    // RESEND - server requests retransmission of the connect packet
                    resend_count += 1;
                    if resend_count > MAX_RESENDS {
                        return Err(Error::ProtocolError(
                            "Server requested too many resends during connect".to_string(),
                        ));
                    }
                    inner.send(&connect_packet).await?;
                    if let Some(ref data_packet) = continuation {
                        inner.send(data_packet).await?;
                    }
                }
                _ => {
                    return Err(Error::ProtocolError(format!(
                        "Unexpected packet type during connect: {}",
                        packet_type,
                    )));
                }
            }
        }
    }
1013
1014 /// Negotiate protocol version and capabilities
1015 async fn negotiate_protocol(&self) -> Result<()> {
1016 use crate::messages::ProtocolMessage;
1017
1018 let mut inner = self.inner.lock().await;
1019 let large_sdu = inner.large_sdu;
1020
1021
1022 // Build protocol request (includes header)
1023 let protocol_msg = ProtocolMessage::new();
1024 let packet = protocol_msg.build_request(large_sdu)?;
1025 inner.send(&packet).await?;
1026
1027 // Receive response
1028 let response = inner.receive().await?;
1029
1030 // Validate packet type (at offset 4 for both SDU modes)
1031 if response.len() <= 4 || response[4] != PacketType::Data as u8 {
1032 return Err(Error::ProtocolError("Protocol negotiation failed".to_string()));
1033 }
1034
1035 // Parse the Protocol response to extract server capabilities
1036 // The payload starts after the 8-byte header
1037 let payload = &response[PACKET_HEADER_SIZE..];
1038 let mut protocol_msg = ProtocolMessage::new();
1039 protocol_msg.parse_response(payload, &mut inner.capabilities)?;
1040
1041 // Update server info with banner
1042 if let Some(banner) = &protocol_msg.server_banner {
1043 inner.server_info.banner = banner.clone();
1044 }
1045
1046 inner.state = ConnectionState::ProtocolNegotiated;
1047 Ok(())
1048 }
1049
1050 /// Negotiate data types
1051 async fn negotiate_data_types(&self) -> Result<()> {
1052 use crate::messages::DataTypesMessage;
1053
1054 let mut inner = self.inner.lock().await;
1055 let large_sdu = inner.large_sdu;
1056
1057
1058 // Build data types request using DataTypesMessage (includes all ~320 data types)
1059 let data_types_msg = DataTypesMessage::new();
1060 let packet = data_types_msg.build_request(&inner.capabilities, large_sdu)?;
1061 inner.send(&packet).await?;
1062
1063 // Receive response
1064 let response = inner.receive().await?;
1065
1066 // Basic validation - packet type is at offset 4 regardless of large_sdu
1067 if response.len() > 4 && response[4] == PacketType::Data as u8 {
1068 inner.state = ConnectionState::DataTypesNegotiated;
1069 Ok(())
1070 } else {
1071 Err(Error::ProtocolError("Data types negotiation failed".to_string()))
1072 }
1073 }
1074
1075 /// Perform authentication
1076 async fn authenticate(&self) -> Result<()> {
1077
1078 let service_name = match &self.config.service {
1079 ServiceMethod::ServiceName(name) => name.clone(),
1080 ServiceMethod::Sid(sid) => sid.clone(),
1081 };
1082
1083 let mut auth = AuthMessage::new(
1084 &self.config.username,
1085 self.config.password().as_bytes(),
1086 &service_name,
1087 );
1088
1089 // Phase one: send username and session info
1090 {
1091 let mut inner = self.inner.lock().await;
1092 let large_sdu = inner.large_sdu;
1093 let request = auth.build_request(&inner.capabilities, large_sdu)?;
1094 inner.send(&request).await?;
1095
1096 let response = inner.receive().await?;
1097 if response.len() <= PACKET_HEADER_SIZE {
1098 return Err(Error::Protocol("Empty auth response".to_string()));
1099 }
1100
1101 // Check for error message type
1102 if response.len() > PACKET_HEADER_SIZE + 2 {
1103 let msg_type = response[PACKET_HEADER_SIZE + 2];
1104 if msg_type == MessageType::Error as u8 {
1105 return Err(Error::AuthenticationFailed(
1106 "Server rejected authentication phase one".to_string(),
1107 ));
1108 }
1109 }
1110
1111 auth.parse_response(&response[PACKET_HEADER_SIZE..])?;
1112 }
1113
1114 // Phase two: send encrypted password
1115 if auth.phase() == AuthPhase::Two {
1116 let mut inner = self.inner.lock().await;
1117 let large_sdu = inner.large_sdu;
1118 let request = auth.build_request(&inner.capabilities, large_sdu)?;
1119 inner.send(&request).await?;
1120
1121 let response = inner.receive().await?;
1122 if response.len() <= PACKET_HEADER_SIZE {
1123 return Err(Error::Protocol("Empty auth phase two response".to_string()));
1124 }
1125
1126 // Check for error message type or marker
1127 let packet_type = response[4];
1128 if packet_type == 12 {
1129 // Marker - authentication failed
1130 return Err(Error::AuthenticationFailed("Server sent MARKER - authentication rejected".to_string()));
1131 }
1132
1133 if response.len() > PACKET_HEADER_SIZE + 2 {
1134 let msg_type = response[PACKET_HEADER_SIZE + 2];
1135 if msg_type == MessageType::Error as u8 {
1136 return Err(Error::InvalidCredentials);
1137 }
1138 }
1139
1140 auth.parse_response(&response[PACKET_HEADER_SIZE..])?;
1141 }
1142
1143 // Verify authentication completed
1144 if !auth.is_complete() {
1145 return Err(Error::AuthenticationFailed(
1146 "Authentication did not complete".to_string(),
1147 ));
1148 }
1149
1150 // Store combo key for later use (encrypted data)
1151 let mut inner = self.inner.lock().await;
1152 if let Some(combo_key) = auth.combo_key() {
1153 inner.capabilities.combo_key = Some(combo_key.to_vec());
1154 }
1155 // Auth used sequence numbers 1 and 2, set to 2 so next is 3
1156 inner.sequence_number = 2;
1157 inner.state = ConnectionState::Ready;
1158
1159 Ok(())
1160 }
1161
1162 /// Execute a SQL statement and return the result
1163 ///
1164 /// # Arguments
1165 ///
1166 /// * `sql` - SQL statement to execute
1167 /// * `params` - Bind parameters (use `Value::Integer`, `Value::String`, etc.)
1168 ///
1169 /// # Example
1170 ///
1171 /// ```rust,ignore
1172 /// use oracle_rs::Value;
1173 ///
1174 /// // Query with bind parameters
1175 /// let result = conn.execute(
1176 /// "SELECT * FROM employees WHERE department_id = :1",
1177 /// &[Value::Integer(10)]
1178 /// ).await?;
1179 ///
1180 /// // DML with bind parameters
1181 /// let result = conn.execute(
1182 /// "UPDATE employees SET salary = :1 WHERE employee_id = :2",
1183 /// &[Value::Integer(50000), Value::Integer(100)]
1184 /// ).await?;
1185 /// println!("Rows affected: {}", result.rows_affected);
1186 /// ```
1187 pub async fn execute(&self, sql: &str, params: &[Value]) -> Result<QueryResult> {
1188 self.ensure_ready().await?;
1189
1190 // Check statement cache for existing prepared statement
1191 let (statement, from_cache) = {
1192 let mut inner = self.inner.lock().await;
1193 if let Some(ref mut cache) = inner.statement_cache {
1194 if let Some(cached_stmt) = cache.get(sql) {
1195 tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached statement (execute)");
1196 (cached_stmt, true)
1197 } else {
1198 (Statement::new(sql), false)
1199 }
1200 } else {
1201 (Statement::new(sql), false)
1202 }
1203 };
1204
1205 let result = match statement.statement_type() {
1206 StatementType::Query => self.execute_query_with_params(&statement, params).await,
1207 _ => self.execute_dml_with_params(&statement, params).await,
1208 };
1209
1210 // Return statement to cache or cache it for the first time
1211 match &result {
1212 Ok(query_result) => {
1213 let mut inner = self.inner.lock().await;
1214 if let Some(ref mut cache) = inner.statement_cache {
1215 let should_close_cursor = if statement.statement_type() == StatementType::Query {
1216 !query_result.has_more_rows
1217 } else {
1218 true // DML/DDL/PL-SQL: always close
1219 };
1220
1221 if from_cache {
1222 cache.return_statement(sql);
1223 if should_close_cursor {
1224 cache.mark_cursor_closed(sql);
1225 }
1226 } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1227 let mut stmt_to_cache = statement.clone();
1228 stmt_to_cache.set_cursor_id(query_result.cursor_id);
1229 stmt_to_cache.set_executed(true);
1230 cache.put(sql.to_string(), stmt_to_cache);
1231 if should_close_cursor {
1232 cache.mark_cursor_closed(sql);
1233 }
1234 }
1235 }
1236 }
1237 Err(_) => {
1238 if from_cache {
1239 let mut inner = self.inner.lock().await;
1240 if let Some(ref mut cache) = inner.statement_cache {
1241 cache.return_statement(sql);
1242 cache.mark_cursor_closed(sql);
1243 }
1244 }
1245 }
1246 }
1247
1248 result
1249 }
1250
1251 /// Execute a query and return rows
1252 ///
1253 /// # Example
1254 ///
1255 /// ```rust,ignore
1256 /// use oracle_rs::Value;
1257 ///
1258 /// let result = conn.query(
1259 /// "SELECT * FROM employees WHERE salary > :1",
1260 /// &[Value::Integer(50000)]
1261 /// ).await?;
1262 /// ```
1263 pub async fn query(&self, sql: &str, params: &[Value]) -> Result<QueryResult> {
1264 self.ensure_ready().await?;
1265
1266 // Check statement cache for existing prepared statement
1267 let (statement, from_cache) = {
1268 let mut inner = self.inner.lock().await;
1269 if let Some(ref mut cache) = inner.statement_cache {
1270 if let Some(cached_stmt) = cache.get(sql) {
1271 tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached statement");
1272 (cached_stmt, true)
1273 } else {
1274 (Statement::new(sql), false)
1275 }
1276 } else {
1277 (Statement::new(sql), false)
1278 }
1279 };
1280
1281 // If using cached statement, save the columns (Oracle won't resend on reexecute)
1282 let cached_columns = if from_cache {
1283 Some(statement.columns().to_vec())
1284 } else {
1285 None
1286 };
1287
1288 let mut result = self.execute_query_with_params(&statement, params).await;
1289
1290 // For cached statements, restore columns if Oracle didn't send them
1291 if let (Ok(ref mut query_result), Some(columns)) = (&mut result, cached_columns) {
1292 if query_result.columns.is_empty() && !columns.is_empty() {
1293 query_result.columns = columns;
1294 }
1295 }
1296
1297 // Return statement to cache or cache it for the first time
1298 match &result {
1299 Ok(query_result) => {
1300 let mut inner = self.inner.lock().await;
1301 if let Some(ref mut cache) = inner.statement_cache {
1302 if from_cache {
1303 cache.return_statement(sql);
1304 if !query_result.has_more_rows {
1305 cache.mark_cursor_closed(sql);
1306 }
1307 } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1308 let mut stmt_to_cache = statement.clone();
1309 stmt_to_cache.set_cursor_id(query_result.cursor_id);
1310 stmt_to_cache.set_executed(true);
1311 stmt_to_cache.set_columns(query_result.columns.clone());
1312 cache.put(sql.to_string(), stmt_to_cache);
1313 if !query_result.has_more_rows {
1314 cache.mark_cursor_closed(sql);
1315 }
1316 }
1317 }
1318 }
1319 Err(_) => {
1320 if from_cache {
1321 let mut inner = self.inner.lock().await;
1322 if let Some(ref mut cache) = inner.statement_cache {
1323 cache.return_statement(sql);
1324 cache.mark_cursor_closed(sql);
1325 }
1326 }
1327 }
1328 }
1329
1330 self.handle_result(result)
1331 }
1332
1333 /// Execute DML (INSERT, UPDATE, DELETE) and return rows affected
1334 pub async fn execute_dml_sql(&self, sql: &str, params: &[Value]) -> Result<u64> {
1335 self.ensure_ready().await?;
1336
1337 // Check statement cache for existing prepared statement
1338 let (statement, from_cache) = {
1339 let mut inner = self.inner.lock().await;
1340 if let Some(ref mut cache) = inner.statement_cache {
1341 if let Some(cached_stmt) = cache.get(sql) {
1342 tracing::trace!(sql = sql, cursor_id = cached_stmt.cursor_id(), "Using cached DML statement");
1343 (cached_stmt, true)
1344 } else {
1345 (Statement::new(sql), false)
1346 }
1347 } else {
1348 (Statement::new(sql), false)
1349 }
1350 };
1351
1352 let result = self.execute_dml_with_params(&statement, params).await;
1353
1354 // Return statement to cache or cache it for the first time
1355 // DML cursors are always closed after execution (no fetch phase)
1356 match &result {
1357 Ok(query_result) => {
1358 let mut inner = self.inner.lock().await;
1359 if let Some(ref mut cache) = inner.statement_cache {
1360 if from_cache {
1361 cache.return_statement(sql);
1362 cache.mark_cursor_closed(sql);
1363 } else if query_result.cursor_id > 0 && !statement.is_ddl() {
1364 let mut stmt_to_cache = statement.clone();
1365 stmt_to_cache.set_cursor_id(query_result.cursor_id);
1366 stmt_to_cache.set_executed(true);
1367 cache.put(sql.to_string(), stmt_to_cache);
1368 cache.mark_cursor_closed(sql);
1369 }
1370 }
1371 }
1372 Err(_) => {
1373 if from_cache {
1374 let mut inner = self.inner.lock().await;
1375 if let Some(ref mut cache) = inner.statement_cache {
1376 cache.return_statement(sql);
1377 cache.mark_cursor_closed(sql);
1378 }
1379 }
1380 }
1381 }
1382
1383 self.handle_result(result).map(|r| r.rows_affected)
1384 }
1385
1386 /// Execute a PL/SQL block with IN/OUT/INOUT parameters
1387 ///
1388 /// This method allows execution of PL/SQL anonymous blocks or procedure calls
1389 /// that have OUT or IN OUT parameters. The `params` slice specifies the direction
1390 /// and type of each bind parameter.
1391 ///
1392 /// # Arguments
1393 ///
1394 /// * `sql` - The PL/SQL block or procedure call
1395 /// * `params` - The bind parameters with direction information
1396 ///
1397 /// # Example
1398 ///
1399 /// ```rust,ignore
1400 /// use oracle_rs::{Connection, BindParam, OracleType, Value};
1401 ///
1402 /// // Call a procedure with IN and OUT parameters
1403 /// let result = conn.execute_plsql(
1404 /// "BEGIN get_employee_name(:1, :2); END;",
1405 /// &[
1406 /// BindParam::input(Value::Integer(100)), // IN: employee_id
1407 /// BindParam::output(OracleType::Varchar, 100), // OUT: employee_name
1408 /// ]
1409 /// ).await?;
1410 ///
1411 /// // Get the OUT parameter value
1412 /// let name = result.get_string(0).unwrap_or("Unknown");
1413 /// println!("Employee name: {}", name);
1414 /// ```
1415 ///
1416 /// # REF CURSOR Example
1417 ///
1418 /// ```rust,ignore
1419 /// use oracle_rs::{Connection, BindParam, Value};
1420 ///
1421 /// // Call a procedure that returns a REF CURSOR
1422 /// let result = conn.execute_plsql(
1423 /// "BEGIN OPEN :1 FOR SELECT * FROM employees; END;",
1424 /// &[BindParam::output_cursor()]
1425 /// ).await?;
1426 ///
1427 /// // Get the cursor ID and fetch rows
1428 /// if let Some(cursor_id) = result.get_cursor_id(0) {
1429 /// let rows = conn.fetch_cursor(cursor_id, 100).await?;
1430 /// for row in rows {
1431 /// println!("{:?}", row);
1432 /// }
1433 /// }
1434 /// ```
1435 pub async fn execute_plsql(&self, sql: &str, params: &[BindParam]) -> Result<PlsqlResult> {
1436 self.ensure_ready().await?;
1437
1438 let statement = Statement::new(sql);
1439
1440 // Build values for bind parameters
1441 // IN params: use provided value or Null
1442 // OUT params: use placeholder value (required for metadata, server ignores actual value)
1443 // INOUT params: use provided value or Null
1444 let bind_values: Vec<Value> = params
1445 .iter()
1446 .map(|p| {
1447 if p.direction == BindDirection::Output {
1448 // OUT params get a placeholder based on their type
1449 // Oracle still needs a value sent in the request (even though it's ignored)
1450 p.placeholder_value()
1451 } else {
1452 // IN and INOUT params use the provided value or Null
1453 p.value.clone().unwrap_or(Value::Null)
1454 }
1455 })
1456 .collect();
1457
1458 // Build bind metadata for proper buffer sizes
1459 // For OUTPUT params, use the user-specified buffer_size
1460 // For INPUT params, derive buffer_size from the actual value
1461 let bind_metadata: Vec<crate::messages::BindMetadata> = params
1462 .iter()
1463 .zip(bind_values.iter())
1464 .map(|(p, v)| {
1465 let buffer_size = if p.buffer_size > 0 {
1466 p.buffer_size
1467 } else {
1468 // Derive from value
1469 match v {
1470 Value::String(s) => std::cmp::max(s.len() as u32, 1),
1471 Value::Bytes(b) => std::cmp::max(b.len() as u32, 1),
1472 Value::Integer(_) | Value::Number(_) => 22, // Oracle NUMBER max size
1473 Value::Float(_) => 8, // BINARY_DOUBLE
1474 Value::Boolean(_) => 1,
1475 Value::Timestamp(_) => 13,
1476 Value::Date(_) => 7,
1477 Value::RowId(_) => 18,
1478 _ => 100, // Default fallback
1479 }
1480 };
1481 crate::messages::BindMetadata {
1482 oracle_type: p.oracle_type,
1483 buffer_size,
1484 }
1485 })
1486 .collect();
1487
1488 // Create execute message with PL/SQL options
1489 let options = ExecuteOptions::for_plsql();
1490 let mut execute_msg = ExecuteMessage::new(&statement, options);
1491 execute_msg.set_bind_values(bind_values);
1492 execute_msg.set_bind_metadata(bind_metadata);
1493
1494 let mut inner = self.inner.lock().await;
1495 let large_sdu = inner.large_sdu;
1496 let seq_num = inner.next_sequence_number();
1497 execute_msg.set_sequence_number(seq_num);
1498 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1499 inner.send(&request).await?;
1500
1501 // Receive response
1502 let response = inner.receive().await?;
1503 if response.len() <= PACKET_HEADER_SIZE {
1504 return Err(Error::Protocol("Empty PL/SQL response".to_string()));
1505 }
1506
1507 // Check for MARKER packet (indicates error - requires reset protocol)
1508 let packet_type = response[4];
1509 if packet_type == PacketType::Marker as u8 {
1510 // Handle marker reset protocol and get the error packet
1511 let error_response = inner.handle_marker_reset().await?;
1512 let payload = &error_response[PACKET_HEADER_SIZE..];
1513 // Parse error response to extract the actual Oracle error
1514 let _: QueryResult = self.parse_error_response(payload)?;
1515 return Err(Error::Protocol("PL/SQL execution failed".to_string()));
1516 }
1517
1518 // Parse the PL/SQL response
1519 let payload = &response[PACKET_HEADER_SIZE..];
1520 let caps = inner.capabilities.clone();
1521 drop(inner); // Release lock before parsing
1522
1523 self.parse_plsql_response(payload, &caps, params)
1524 }
1525
1526 /// Execute a batch of DML statements with multiple rows of bind values
1527 ///
1528 /// This method efficiently executes the same SQL statement multiple times
1529 /// with different bind values (executemany pattern).
1530 ///
1531 /// # Arguments
1532 ///
1533 /// * `batch` - The batch containing SQL and rows of bind values
1534 ///
1535 /// # Example
1536 ///
1537 /// ```rust,ignore
1538 /// use oracle_rs::{Connection, BatchBuilder, Value};
1539 ///
1540 /// let batch = BatchBuilder::new("INSERT INTO users (id, name) VALUES (:1, :2)")
1541 /// .add_row(vec![Value::Integer(1), Value::String("Alice".to_string())])
1542 /// .add_row(vec![Value::Integer(2), Value::String("Bob".to_string())])
1543 /// .with_row_counts()
1544 /// .build();
1545 ///
1546 /// let result = conn.execute_batch(&batch).await?;
1547 /// println!("Total rows affected: {}", result.total_rows_affected);
1548 /// ```
1549 pub async fn execute_batch(&self, batch: &BatchBinds) -> Result<BatchResult> {
1550 self.ensure_ready().await?;
1551
1552 // Validate the batch
1553 batch.validate()?;
1554
1555 if batch.rows.is_empty() {
1556 return Ok(BatchResult::new());
1557 }
1558
1559 // Build execute options for batch DML
1560 let mut options = ExecuteOptions::for_dml(batch.options.auto_commit);
1561 options.num_execs = batch.rows.len() as u32;
1562 options.batch_errors = batch.options.batch_errors;
1563 options.dml_row_counts = batch.options.array_dml_row_counts;
1564
1565 // Create execute message with batch bind values
1566 let mut execute_msg = ExecuteMessage::new(&batch.statement, options);
1567 execute_msg.set_batch_bind_values(batch.rows.clone());
1568
1569 let mut inner = self.inner.lock().await;
1570 let large_sdu = inner.large_sdu;
1571 let seq_num = inner.next_sequence_number();
1572 execute_msg.set_sequence_number(seq_num);
1573 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1574 inner.send(&request).await?;
1575
1576 // Receive response
1577 let mut response = inner.receive().await?;
1578 if response.len() <= PACKET_HEADER_SIZE {
1579 return Err(Error::Protocol("Empty batch response".to_string()));
1580 }
1581
1582 // Check packet type - handle MARKER packets
1583 let packet_type = response[4];
1584 if packet_type == PacketType::Marker as u8 {
1585 // Handle BREAK/RESET protocol same as regular DML
1586 response = self.handle_marker_protocol(&mut inner, response).await?;
1587 }
1588
1589 // Parse the batch response
1590 let payload = &response[PACKET_HEADER_SIZE..];
1591 drop(inner); // Release lock before parsing
1592
1593 self.parse_batch_response(payload, batch.rows.len(), batch.options.array_dml_row_counts)
1594 }
1595
1596 /// Handle MARKER packet protocol (BREAK/RESET)
1597 async fn handle_marker_protocol(
1598 &self,
1599 inner: &mut ConnectionInner,
1600 initial_response: Bytes,
1601 ) -> Result<Bytes> {
1602 // Extract marker type from packet
1603 let marker_type = if initial_response.len() >= PACKET_HEADER_SIZE + 3 {
1604 initial_response[PACKET_HEADER_SIZE + 2]
1605 } else {
1606 1 // Assume BREAK
1607 };
1608
1609 if marker_type == 1 {
1610 // BREAK marker - send RESET and wait for response
1611 self.send_marker(inner, 2).await?;
1612
1613 // Wait for RESET marker back
1614 loop {
1615 match inner.receive().await {
1616 Ok(pkt) => {
1617 if pkt.len() < PACKET_HEADER_SIZE + 1 {
1618 break;
1619 }
1620 let pkt_type = pkt[4];
1621 if pkt_type == PacketType::Marker as u8 {
1622 if pkt.len() >= PACKET_HEADER_SIZE + 3 {
1623 let mk_type = pkt[PACKET_HEADER_SIZE + 2];
1624 if mk_type == 2 {
1625 // Got RESET marker, break out
1626 break;
1627 }
1628 }
1629 } else if pkt_type == PacketType::Data as u8 {
1630 // Got DATA packet - return it as the response
1631 return Ok(pkt);
1632 } else {
1633 break;
1634 }
1635 }
1636 Err(e) => {
1637 inner.state = ConnectionState::Closed;
1638 return Err(e);
1639 }
1640 }
1641 }
1642
1643 // After RESET, continue receiving until we get DATA packet
1644 loop {
1645 match inner.receive().await {
1646 Ok(pkt) => {
1647 let pkt_type = pkt[4];
1648 if pkt_type == PacketType::Marker as u8 {
1649 continue;
1650 } else if pkt_type == PacketType::Data as u8 {
1651 return Ok(pkt);
1652 } else {
1653 return Err(Error::Protocol(format!(
1654 "Unexpected packet type {} after reset",
1655 pkt_type
1656 )));
1657 }
1658 }
1659 Err(e) => {
1660 inner.state = ConnectionState::Closed;
1661 return Err(e);
1662 }
1663 }
1664 }
1665 }
1666
1667 Ok(initial_response)
1668 }
1669
1670 /// Parse batch execution response
1671 fn parse_batch_response(
1672 &self,
1673 payload: &[u8],
1674 batch_size: usize,
1675 want_row_counts: bool,
1676 ) -> Result<BatchResult> {
1677 if payload.len() < 3 {
1678 return Err(Error::Protocol("Batch response too short".to_string()));
1679 }
1680
1681 let mut buf = ReadBuffer::from_slice(payload);
1682
1683 // Skip data flags
1684 buf.skip(2)?;
1685
1686 let mut rows_affected: u64 = 0;
1687 let mut row_counts: Option<Vec<u64>> = None;
1688 let mut end_of_response = false;
1689
1690 // Process messages until end_of_response or out of data
1691 while !end_of_response && buf.remaining() > 0 {
1692 let msg_type = buf.read_u8()?;
1693
1694 match msg_type {
1695 // Error (4) - may contain error or success info
1696 x if x == MessageType::Error as u8 => {
1697 let (error_code, error_msg, _cid, row_count) = self.parse_error_info_with_rowcount(&mut buf)?;
1698 rows_affected = row_count;
1699 if error_code != 0 && error_code != 1403 {
1700 return Err(Error::OracleError {
1701 code: error_code,
1702 message: error_msg.unwrap_or_default(),
1703 });
1704 }
1705 }
1706
1707 // Parameter (8) - return parameters (may contain row counts)
1708 x if x == MessageType::Parameter as u8 => {
1709 if let Some(counts) = self.parse_return_parameters_internal(&mut buf, want_row_counts)? {
1710 row_counts = Some(counts);
1711 }
1712 }
1713
1714 // Status (9) - call status
1715 x if x == MessageType::Status as u8 => {
1716 let _call_status = buf.read_ub4()?;
1717 let _end_to_end_seq = buf.read_ub2()?;
1718 }
1719
1720 // BitVector (21)
1721 21 => {
1722 let _num_columns_sent = buf.read_ub2()?;
1723 if buf.remaining() > 0 {
1724 let _byte = buf.read_u8()?;
1725 }
1726 }
1727
1728 // End of Response (29) - explicit end marker
1729 29 => {
1730 end_of_response = true;
1731 }
1732
1733 _ => {
1734 // Unknown message type - continue processing
1735 }
1736 }
1737 }
1738
1739 let mut result = BatchResult::new();
1740 result.total_rows_affected = rows_affected;
1741 result.success_count = batch_size;
1742 result.row_counts = row_counts;
1743
1744 Ok(result)
1745 }
1746
1747 /// Fetch more rows from an open cursor
1748 ///
1749 /// This method is used when a query result has `has_more_rows == true`
1750 /// to retrieve additional rows from the server.
1751 ///
1752 /// # Arguments
1753 ///
1754 /// * `cursor_id` - The cursor ID from a previous query result
1755 /// * `columns` - Column information from the original query
1756 /// * `fetch_size` - Number of rows to fetch
1757 ///
1758 /// # Example
1759 ///
1760 /// ```rust,ignore
1761 /// let mut result = conn.query("SELECT * FROM large_table", &[]).await?;
1762 /// let mut all_rows = result.rows.clone();
1763 ///
1764 /// while result.has_more_rows {
1765 /// result = conn.fetch_more(result.cursor_id, &result.columns, 100).await?;
1766 /// all_rows.extend(result.rows);
1767 /// }
1768 /// ```
1769 pub async fn fetch_more(
1770 &self,
1771 cursor_id: u16,
1772 columns: &[ColumnInfo],
1773 fetch_size: u32,
1774 ) -> Result<QueryResult> {
1775 self.ensure_ready().await?;
1776
1777 // Build fetch message
1778 let fetch_msg = FetchMessage::new(cursor_id, fetch_size);
1779
1780 let mut inner = self.inner.lock().await;
1781 let request = fetch_msg.build_request(&inner.capabilities)?;
1782 inner.send(&request).await?;
1783
1784 // Receive and parse response
1785 let response = inner.receive().await?;
1786 if response.len() <= PACKET_HEADER_SIZE {
1787 return Err(Error::Protocol("Empty fetch response".to_string()));
1788 }
1789
1790 // Parse row data from response
1791 let payload = &response[PACKET_HEADER_SIZE..];
1792 let caps = inner.capabilities.clone();
1793 drop(inner); // Release lock before parsing
1794 self.parse_fetch_response(payload, columns, &caps)
1795 }
1796
1797 /// Fetch rows from a REF CURSOR
1798 ///
1799 /// This method fetches rows from a REF CURSOR that was returned from a
1800 /// PL/SQL procedure or function. The cursor contains the column metadata
1801 /// and cursor ID needed to fetch the rows.
1802 ///
1803 /// # Arguments
1804 ///
1805 /// * `cursor` - The REF CURSOR returned from PL/SQL
1806 ///
1807 /// # Example
1808 ///
1809 /// ```rust,ignore
1810 /// use oracle_rs::{Connection, BindParam, Value};
1811 ///
1812 /// // Call a procedure that returns a REF CURSOR
1813 /// let result = conn.execute_plsql(
1814 /// "BEGIN OPEN :1 FOR SELECT id, name FROM employees; END;",
1815 /// &[BindParam::output_cursor()]
1816 /// ).await?;
1817 ///
1818 /// // Get the cursor and fetch rows
1819 /// if let Value::Cursor(cursor) = &result.out_values[0] {
1820 /// let rows = conn.fetch_cursor(cursor).await?;
1821 /// println!("Fetched {} rows", rows.row_count());
1822 /// for row in &rows.rows {
1823 /// println!("{:?}", row);
1824 /// }
1825 /// }
1826 /// ```
1827 pub async fn fetch_cursor(&self, cursor: &crate::types::RefCursor) -> Result<QueryResult> {
1828 self.fetch_cursor_with_size(cursor, 100).await
1829 }
1830
1831 /// Fetch rows from a REF CURSOR with a specified fetch size
1832 ///
1833 /// This is the same as `fetch_cursor` but allows specifying how many
1834 /// rows to fetch at once.
1835 ///
1836 /// REF CURSORs use an ExecuteMessage with only the FETCH option because
1837 /// the cursor is already open from the PL/SQL execution. The cursor_id
1838 /// and column metadata were obtained when the REF CURSOR was returned.
1839 ///
1840 /// # Arguments
1841 ///
1842 /// * `cursor` - The REF CURSOR returned from PL/SQL
1843 /// * `fetch_size` - Number of rows to fetch (default is 100)
1844 pub async fn fetch_cursor_with_size(
1845 &self,
1846 cursor: &crate::types::RefCursor,
1847 fetch_size: u32,
1848 ) -> Result<QueryResult> {
1849 use crate::messages::ExecuteMessage;
1850
1851 if cursor.cursor_id() == 0 {
1852 return Err(Error::InvalidCursor("Cursor ID is 0 (not initialized)".to_string()));
1853 }
1854
1855 self.ensure_ready().await?;
1856
1857 // REF CURSOR uses ExecuteMessage with FETCH only (no SQL, no EXECUTE)
1858 // Create a statement with the cursor's metadata
1859 let mut stmt = Statement::new(""); // No SQL for REF CURSOR
1860 stmt.set_cursor_id(cursor.cursor_id());
1861 stmt.set_columns(cursor.columns().to_vec());
1862 stmt.set_executed(true); // Already executed by Oracle
1863 stmt.set_statement_type(crate::statement::StatementType::Query); // This is a query cursor
1864
1865 // Build execute message with only FETCH option
1866 let options = crate::messages::ExecuteOptions::for_ref_cursor(fetch_size);
1867 let mut execute_msg = ExecuteMessage::new(&stmt, options);
1868
1869 let mut inner = self.inner.lock().await;
1870 let large_sdu = inner.large_sdu;
1871 let seq_num = inner.next_sequence_number();
1872 execute_msg.set_sequence_number(seq_num);
1873
1874 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
1875 inner.send(&request).await?;
1876
1877 // Receive and parse response
1878 let response = inner.receive().await?;
1879 if response.len() <= PACKET_HEADER_SIZE {
1880 return Err(Error::Protocol("Empty cursor response".to_string()));
1881 }
1882
1883 // Check for MARKER packet (indicates error)
1884 let packet_type = response[4];
1885 if packet_type == PacketType::Marker as u8 {
1886 let error_response = inner.handle_marker_reset().await?;
1887 let payload = &error_response[PACKET_HEADER_SIZE..];
1888 return self.parse_error_response(payload);
1889 }
1890
1891 // Parse query response - use cursor's columns since they're already defined
1892 let payload = &response[PACKET_HEADER_SIZE..];
1893 let caps = inner.capabilities.clone();
1894 drop(inner); // Release lock before parsing
1895 self.parse_fetch_response(payload, cursor.columns(), &caps)
1896 }
1897
1898 /// Fetch rows from an implicit result set
1899 ///
1900 /// Implicit results are returned via `DBMS_SQL.RETURN_RESULT` from PL/SQL.
1901 /// They contain cursor metadata but no rows until fetched.
1902 ///
1903 /// # Arguments
1904 ///
1905 /// * `result` - The implicit result from PL/SQL execution
1906 ///
1907 /// # Example
1908 ///
1909 /// ```rust,ignore
1910 /// let plsql_result = conn.execute_plsql(r#"
1911 /// declare
1912 /// c sys_refcursor;
1913 /// begin
1914 /// open c for select * from employees;
1915 /// dbms_sql.return_result(c);
1916 /// end;
1917 /// "#, &[]).await?;
1918 ///
1919 /// for implicit in plsql_result.implicit_results.iter() {
1920 /// let rows = conn.fetch_implicit_result(implicit).await?;
1921 /// println!("Fetched {} rows", rows.row_count());
1922 /// }
1923 /// ```
1924 pub async fn fetch_implicit_result(&self, result: &ImplicitResult) -> Result<QueryResult> {
1925 self.fetch_implicit_result_with_size(result, 100).await
1926 }
1927
1928 /// Fetch rows from an implicit result set with a specified fetch size
1929 pub async fn fetch_implicit_result_with_size(
1930 &self,
1931 result: &ImplicitResult,
1932 fetch_size: u32,
1933 ) -> Result<QueryResult> {
1934 // Convert implicit result to RefCursor and use fetch_cursor mechanism
1935 let cursor = crate::types::RefCursor::new(result.cursor_id, result.columns.clone());
1936 self.fetch_cursor_with_size(&cursor, fetch_size).await
1937 }
1938
1939 /// Parse fetch response to extract additional rows
1940 ///
1941 /// REF CURSOR fetch responses contain a series of messages:
1942 /// - RowHeader (6): Contains metadata about the following row data
1943 /// - RowData (7): Contains the actual row values
1944 /// - Error (4): Contains error info with cursor_id and row counts
1945 fn parse_fetch_response(&self, payload: &[u8], columns: &[ColumnInfo], caps: &Capabilities) -> Result<QueryResult> {
1946 if payload.len() < 3 {
1947 return Err(Error::Protocol("Fetch response too short".to_string()));
1948 }
1949
1950 let mut buf = ReadBuffer::from_slice(payload);
1951 let mut rows = Vec::new();
1952 let mut has_more_rows = false;
1953
1954 // Bit vector for duplicate column optimization
1955 let mut bit_vector: Option<Vec<u8>> = None;
1956 let mut previous_row_values: Option<Vec<Value>> = None;
1957
1958 // Skip data flags
1959 buf.skip(2)?;
1960
1961 // Process multiple messages in the response
1962 while buf.remaining() >= 1 {
1963 let msg_type = buf.read_u8()?;
1964
1965 match msg_type {
1966 x if x == MessageType::RowHeader as u8 => {
1967 // Skip row header metadata (per Python's _process_row_header)
1968 buf.skip(1)?; // flags
1969 buf.skip_ub2()?; // num requests
1970 buf.skip_ub4()?; // iteration number
1971 buf.skip_ub4()?; // num iters
1972 buf.skip_ub2()?; // buffer length
1973 let num_bytes = buf.read_ub4()?;
1974 if num_bytes > 0 {
1975 buf.skip(1)?; // skip repeated length
1976 // This bit vector in row header is for the following row data
1977 let bv = buf.read_bytes_vec(num_bytes as usize)?;
1978 bit_vector = Some(bv);
1979 }
1980 let rxhrid_bytes = buf.read_ub4()?;
1981 if rxhrid_bytes > 0 {
1982 buf.skip_raw_bytes_chunked()?;
1983 }
1984 }
1985 x if x == MessageType::RowData as u8 => {
1986 // Parse actual row data with bit vector support
1987 let row = self.parse_row_data_with_bitvector(
1988 &mut buf,
1989 columns,
1990 caps,
1991 bit_vector.as_deref(),
1992 previous_row_values.as_ref(),
1993 )?;
1994 previous_row_values = Some(row.values().to_vec());
1995 bit_vector = None;
1996 rows.push(row);
1997 }
1998 x if x == MessageType::BitVector as u8 => {
1999 // BitVector indicates which columns have actual data vs duplicates
2000 let _num_columns_sent = buf.read_ub2()?;
2001 let num_bytes = (columns.len() + 7) / 8; // Round up
2002 if num_bytes > 0 {
2003 let bv = buf.read_bytes_vec(num_bytes)?;
2004 bit_vector = Some(bv);
2005 }
2006 // Continue processing - RowData follows
2007 }
2008 x if x == MessageType::Error as u8 => {
2009 // Error message contains row count and cursor info
2010 let (error_code, error_msg, more_rows) = self.parse_error_message_info(&mut buf)?;
2011 has_more_rows = more_rows;
2012 if error_code != 0 && error_code != 1403 { // 1403 = no data found
2013 return Err(Error::OracleError {
2014 code: error_code,
2015 message: error_msg,
2016 });
2017 }
2018 break; // Error message marks end of response
2019 }
2020 x if x == MessageType::Status as u8 => {
2021 // Status message - usually marks end
2022 break;
2023 }
2024 x if x == MessageType::EndOfResponse as u8 => {
2025 break;
2026 }
2027 _ => {
2028 // Unknown message type - stop processing
2029 break;
2030 }
2031 }
2032 }
2033
2034 Ok(QueryResult {
2035 columns: columns.to_vec(),
2036 rows,
2037 rows_affected: 0,
2038 has_more_rows,
2039 cursor_id: 0,
2040 })
2041 }
2042
2043 /// Parse error message info including cursor_id and row counts
2044 fn parse_error_message_info(&self, buf: &mut ReadBuffer) -> Result<(u32, String, bool)> {
2045 let _call_status = buf.read_ub4()?; // end of call status
2046 buf.skip_ub2()?; // end to end seq#
2047 buf.skip_ub4()?; // current row number
2048 buf.skip_ub2()?; // error number
2049 buf.skip_ub2()?; // array elem error
2050 buf.skip_ub2()?; // array elem error
2051 let _cursor_id = buf.read_ub2()?; // cursor id
2052 let _error_pos = buf.read_sb2()?; // error position
2053 buf.skip(1)?; // sql type
2054 buf.skip(1)?; // fatal?
2055 buf.skip(1)?; // flags
2056 buf.skip(1)?; // user cursor options
2057 buf.skip(1)?; // UPI parameter
2058 let flags = buf.read_u8()?; // flags
2059 // Skip rowid - fixed 10 bytes in Oracle format
2060 buf.skip(10)?; // rowid is 10 bytes
2061 buf.skip_ub4()?; // OS error
2062 buf.skip(1)?; // statement number
2063 buf.skip(1)?; // call number
2064 buf.skip_ub2()?; // padding
2065 buf.skip_ub4()?; // success iters
2066 let num_bytes = buf.read_ub4()?; // oerrdd
2067 if num_bytes > 0 {
2068 buf.skip_raw_bytes_chunked()?;
2069 }
2070
2071 // Skip batch error codes
2072 let num_errors = buf.read_ub2()?;
2073 if num_errors > 0 {
2074 buf.skip_raw_bytes_chunked()?;
2075 }
2076
2077 // Skip batch error offsets
2078 let num_offsets = buf.read_ub4()?;
2079 if num_offsets > 0 {
2080 buf.skip_raw_bytes_chunked()?;
2081 }
2082
2083 // Skip batch error messages
2084 let temp16 = buf.read_ub2()?;
2085 if temp16 > 0 {
2086 buf.skip_raw_bytes_chunked()?;
2087 }
2088
2089 // Read extended error info
2090 let error_num = buf.read_ub4()?;
2091 let row_count = buf.read_ub8()?;
2092 let more_rows = row_count > 0 || (flags & 0x20) != 0;
2093
2094 // Read error message if present
2095 let error_msg = if error_num != 0 {
2096 buf.read_string_with_length()?.unwrap_or_default()
2097 } else {
2098 String::new()
2099 };
2100
2101 Ok((error_num, error_msg, more_rows))
2102 }
2103
2104 /// Open a scrollable cursor for bidirectional navigation
2105 ///
2106 /// Scrollable cursors allow moving forward and backward through result sets,
2107 /// jumping to specific positions, and fetching from various locations.
2108 ///
2109 /// # Arguments
2110 ///
2111 /// * `sql` - SQL query to execute
2112 ///
2113 /// # Example
2114 ///
2115 /// ```rust,ignore
2116 /// let mut cursor = conn.open_scrollable_cursor("SELECT * FROM employees").await?;
2117 ///
2118 /// // Move to different positions
2119 /// let first = conn.scroll(&mut cursor, FetchOrientation::First, 0).await?;
2120 /// let last = conn.scroll(&mut cursor, FetchOrientation::Last, 0).await?;
2121 /// let row5 = conn.scroll(&mut cursor, FetchOrientation::Absolute, 5).await?;
2122 ///
2123 /// conn.close_cursor(&mut cursor).await?;
2124 /// ```
2125 pub async fn open_scrollable_cursor(&self, sql: &str) -> Result<ScrollableCursor> {
2126 self.ensure_ready().await?;
2127
2128 let statement = Statement::new(sql);
2129
2130 // For scrollable cursors, execute with scrollable flag and prefetch 1 row
2131 // to get column metadata. The scroll() method will fetch actual rows at
2132 // specific positions.
2133 let mut options = ExecuteOptions::for_query(1); // prefetch 1 row for column info
2134
2135 options.scrollable = true;
2136
2137 let mut execute_msg = ExecuteMessage::new(&statement, options);
2138
2139 let mut inner = self.inner.lock().await;
2140 let large_sdu = inner.large_sdu;
2141 let seq_num = inner.next_sequence_number();
2142 execute_msg.set_sequence_number(seq_num);
2143 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2144 inner.send(&request).await?;
2145
2146 // Receive and parse response
2147 let response = inner.receive().await?;
2148
2149 if response.len() <= PACKET_HEADER_SIZE {
2150 return Err(Error::Protocol("Empty scrollable cursor response".to_string()));
2151 }
2152
2153 // Check for MARKER packet (indicates error - requires reset protocol)
2154 let packet_type = response[4];
2155 if packet_type == PacketType::Marker as u8 {
2156 // Handle marker reset protocol and get the error packet
2157 let error_response = inner.handle_marker_reset().await?;
2158 let payload = &error_response[PACKET_HEADER_SIZE..];
2159 // Parse error response to extract the actual Oracle error
2160 let _: QueryResult = self.parse_error_response(payload)?;
2161 // If we get here without error, something unexpected happened
2162 return Err(Error::Protocol("Unexpected successful response after MARKER".to_string()));
2163 }
2164
2165 // Parse describe info to get columns
2166 let payload = &response[PACKET_HEADER_SIZE..];
2167 let result = self.parse_query_response(payload, &inner.capabilities)?;
2168
2169 Ok(ScrollableCursor::new(result.cursor_id, result.columns))
2170 }
2171
2172 /// Scroll to a position in a scrollable cursor and fetch rows
2173 ///
2174 /// # Arguments
2175 ///
2176 /// * `cursor` - The scrollable cursor to scroll
2177 /// * `orientation` - The direction/mode of scrolling
2178 /// * `offset` - Position offset (used for Absolute and Relative modes)
2179 ///
2180 /// # Example
2181 ///
2182 /// ```rust,ignore
2183 /// // Go to first row
2184 /// let first = conn.scroll(&mut cursor, FetchOrientation::First, 0).await?;
2185 ///
2186 /// // Go to absolute position 10
2187 /// let row10 = conn.scroll(&mut cursor, FetchOrientation::Absolute, 10).await?;
2188 ///
2189 /// // Move 5 rows forward from current position
2190 /// let plus5 = conn.scroll(&mut cursor, FetchOrientation::Relative, 5).await?;
2191 ///
2192 /// // Move 3 rows backward
2193 /// let minus3 = conn.scroll(&mut cursor, FetchOrientation::Relative, -3).await?;
2194 /// ```
2195 pub async fn scroll(
2196 &self,
2197 cursor: &mut ScrollableCursor,
2198 orientation: FetchOrientation,
2199 offset: i64,
2200 ) -> Result<ScrollResult> {
2201 self.ensure_ready().await?;
2202
2203 if !cursor.is_open() {
2204 return Err(Error::CursorClosed);
2205 }
2206
2207 // Create a statement for the scroll operation (uses the existing cursor)
2208 let mut stmt = Statement::new("");
2209 stmt.set_cursor_id(cursor.cursor_id);
2210 stmt.set_columns(cursor.columns.clone());
2211 stmt.set_executed(true);
2212 stmt.set_statement_type(crate::statement::StatementType::Query);
2213
2214 // Build execute message with scroll_operation=true
2215 let mut options = ExecuteOptions::for_query(1);
2216 options.scrollable = true;
2217 options.scroll_operation = true;
2218 options.fetch_orientation = orientation as u32;
2219 // Calculate fetch_pos based on orientation
2220 options.fetch_pos = match orientation {
2221 FetchOrientation::First => 1,
2222 FetchOrientation::Last => 0, // Server calculates
2223 FetchOrientation::Absolute => offset.max(0) as u32,
2224 FetchOrientation::Relative => (cursor.position + offset).max(0) as u32,
2225 FetchOrientation::Next => (cursor.position + 1).max(0) as u32,
2226 FetchOrientation::Prior => (cursor.position - 1).max(0) as u32,
2227 FetchOrientation::Current => cursor.position.max(0) as u32,
2228 };
2229
2230 let mut execute_msg = ExecuteMessage::new(&stmt, options);
2231
2232 let mut inner = self.inner.lock().await;
2233 let large_sdu = inner.large_sdu;
2234 let seq_num = inner.next_sequence_number();
2235 execute_msg.set_sequence_number(seq_num);
2236 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2237 inner.send(&request).await?;
2238
2239 // Receive and parse response
2240 let response = inner.receive().await?;
2241 if response.len() <= PACKET_HEADER_SIZE {
2242 return Err(Error::Protocol("Empty scroll response".to_string()));
2243 }
2244
2245 // Check for MARKER packet
2246 let packet_type = response[4];
2247 if packet_type == PacketType::Marker as u8 {
2248 let error_response = inner.handle_marker_reset().await?;
2249 let payload = &error_response[PACKET_HEADER_SIZE..];
2250 let _: QueryResult = self.parse_error_response(payload)?;
2251 return Err(Error::Protocol("Scroll operation failed".to_string()));
2252 }
2253
2254 let payload = &response[PACKET_HEADER_SIZE..];
2255 // Use cursor's columns since Oracle doesn't re-send column metadata for scroll operations
2256 let query_result = self.parse_query_response_with_columns(payload, &inner.capabilities, &cursor.columns)?;
2257
2258 // Use position from Oracle's response (rows_affected contains the row position)
2259 // For scrollable cursors, Oracle returns the row number in error_info.rowcount
2260 let new_position = if !query_result.rows.is_empty() {
2261 // Position is the actual row number from Oracle
2262 query_result.rows_affected as i64
2263 } else {
2264 // No rows returned - calculate position based on orientation
2265 match orientation {
2266 FetchOrientation::First => 0, // Before first
2267 FetchOrientation::Last => cursor.row_count.unwrap_or(0) as i64 + 1, // After last
2268 FetchOrientation::Next => cursor.position + 1,
2269 FetchOrientation::Prior => cursor.position - 1,
2270 FetchOrientation::Absolute => offset,
2271 FetchOrientation::Relative => cursor.position + offset,
2272 FetchOrientation::Current => cursor.position,
2273 }
2274 };
2275
2276 cursor.update_position(new_position);
2277
2278 let mut result = ScrollResult::new(query_result.rows, new_position);
2279 result.at_end = !query_result.has_more_rows;
2280 result.at_beginning = new_position <= 1;
2281
2282 Ok(result)
2283 }
2284
2285 /// Close a scrollable cursor
2286 ///
2287 /// # Arguments
2288 ///
2289 /// * `cursor` - The scrollable cursor to close
2290 pub async fn close_cursor(&self, cursor: &mut ScrollableCursor) -> Result<()> {
2291 if !cursor.is_open() {
2292 return Ok(());
2293 }
2294
2295 // Send close cursor message
2296 // For now, just mark it as closed - the cursor will be cleaned up
2297 // when the connection is closed or reused
2298 cursor.mark_closed();
2299 Ok(())
2300 }
2301
2302 /// Get type information for a database object or collection type
2303 ///
2304 /// This method queries Oracle's data dictionary to retrieve type metadata
2305 /// for collections (VARRAY, Nested Table) and user-defined object types.
2306 ///
2307 /// # Arguments
2308 ///
2309 /// * `type_name` - Fully qualified type name (e.g., "SCHEMA.TYPE_NAME" or just "TYPE_NAME")
2310 ///
2311 /// # Returns
2312 ///
2313 /// A `DbObjectType` containing the type metadata, including:
2314 /// - Schema and type name
2315 /// - Whether it's a collection
2316 /// - Collection type (VARRAY, Nested Table, etc.)
2317 /// - Element type for collections
2318 ///
2319 /// # Example
2320 ///
2321 /// ```rust,ignore
2322 /// let number_array = conn.get_type("MY_NUMBER_ARRAY").await?;
2323 /// assert!(number_array.is_collection);
2324 /// ```
2325 pub async fn get_type(&self, type_name: &str) -> Result<crate::dbobject::DbObjectType> {
2326 use crate::dbobject::{CollectionType, DbObjectType};
2327
2328 self.ensure_ready().await?;
2329
2330 // Parse type name into schema and name
2331 let (schema, name) = parse_type_name(type_name, &self.config.username);
2332
2333 // First, query ALL_TYPES to get basic type info
2334 let type_info = self
2335 .query(
2336 "SELECT typecode, type_oid FROM all_types WHERE owner = :1 AND type_name = :2",
2337 &[Value::String(schema.clone()), Value::String(name.clone())],
2338 )
2339 .await?;
2340
2341 if type_info.rows.is_empty() {
2342 return Err(Error::OracleError {
2343 code: 4043, // ORA-04043: object does not exist
2344 message: format!("Type {}.{} not found", schema, name),
2345 });
2346 }
2347
2348 let row = &type_info.rows[0];
2349 let typecode = row.get(0).and_then(|v| v.as_str()).unwrap_or("");
2350 let type_oid = row.get(1).and_then(|v| v.as_bytes()).map(|b| b.to_vec());
2351
2352 // Check if it's a collection
2353 if typecode == "COLLECTION" {
2354 // Query ALL_COLL_TYPES for collection details
2355 let coll_info = self.query(
2356 "SELECT coll_type, elem_type_name, elem_type_owner, upper_bound FROM all_coll_types WHERE owner = :1 AND type_name = :2",
2357 &[Value::String(schema.clone()), Value::String(name.clone())],
2358 ).await?;
2359
2360 if coll_info.rows.is_empty() {
2361 return Err(Error::OracleError {
2362 code: 4043,
2363 message: format!("Collection type {}.{} metadata not found", schema, name),
2364 });
2365 }
2366
2367 let coll_row = &coll_info.rows[0];
2368 let coll_type_str = coll_row.get(0).and_then(|v| v.as_str()).unwrap_or("");
2369 let elem_type_name = coll_row.get(1).and_then(|v| v.as_str()).unwrap_or("VARCHAR2");
2370 let _elem_type_owner = coll_row.get(2).and_then(|v| v.as_str());
2371
2372 let collection_type = match coll_type_str {
2373 "VARYING ARRAY" => CollectionType::Varray,
2374 "TABLE" => CollectionType::NestedTable,
2375 _ => CollectionType::Varray, // Default
2376 };
2377
2378 let element_type = oracle_type_from_name(elem_type_name);
2379
2380 let mut obj_type = DbObjectType::collection(schema, name, collection_type, element_type);
2381 obj_type.oid = type_oid;
2382 Ok(obj_type)
2383 } else {
2384 // Regular object type (not yet fully implemented)
2385 let mut obj_type = DbObjectType::new(schema, name);
2386 obj_type.oid = type_oid;
2387 Ok(obj_type)
2388 }
2389 }
2390
2391 /// Internal: Execute a query statement with optional bind parameters
2392 async fn execute_query_with_params(&self, statement: &Statement, params: &[Value]) -> Result<QueryResult> {
2393 let prefetch_rows = 100; // Default prefetch
2394
2395 // For first execution, check if we might have LOBs (no prefetch for safety)
2396 // This can be optimized later with describe-only first
2397 let options = ExecuteOptions::for_query(prefetch_rows);
2398 let mut execute_msg = ExecuteMessage::new(statement, options);
2399
2400 // Set bind values if provided
2401 if !params.is_empty() {
2402 execute_msg.set_bind_values(params.to_vec());
2403 }
2404
2405 let mut inner = self.inner.lock().await;
2406 let large_sdu = inner.large_sdu;
2407 let seq_num = inner.next_sequence_number();
2408 execute_msg.set_sequence_number(seq_num);
2409 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2410 inner.send(&request).await?;
2411
2412 // Receive and parse response
2413 let response = inner.receive().await?;
2414 if response.len() <= PACKET_HEADER_SIZE {
2415 return Err(Error::Protocol("Empty query response".to_string()));
2416 }
2417
2418 // Check for MARKER packet (indicates error - requires reset protocol)
2419 let packet_type = response[4];
2420 if packet_type == PacketType::Marker as u8 {
2421 // Handle marker reset protocol and get the error packet
2422 let error_response = inner.handle_marker_reset().await?;
2423 let payload = &error_response[PACKET_HEADER_SIZE..];
2424 return self.parse_error_response(payload);
2425 }
2426
2427 // Parse the response to extract columns and rows
2428 let payload = &response[PACKET_HEADER_SIZE..];
2429 let mut result = self.parse_query_response(payload, &inner.capabilities)?;
2430
2431 // Check if any columns are LOB types that require defines
2432 let has_lob_columns = result.columns.iter().any(|col| col.is_lob());
2433
2434 if has_lob_columns && !statement.requires_define() {
2435 // We need to re-execute with column defines
2436 // Create a modified statement with the define flag set
2437 let mut stmt_with_define = statement.clone();
2438 stmt_with_define.set_columns(result.columns.clone());
2439 stmt_with_define.set_cursor_id(result.cursor_id);
2440 stmt_with_define.set_requires_define(true);
2441 stmt_with_define.set_no_prefetch(true);
2442 stmt_with_define.set_executed(true);
2443
2444 // Re-execute with defines
2445 let define_options = ExecuteOptions::for_query(prefetch_rows);
2446 let mut define_msg = ExecuteMessage::new(&stmt_with_define, define_options);
2447 let seq_num = inner.next_sequence_number();
2448 define_msg.set_sequence_number(seq_num);
2449
2450 let define_request = define_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2451 inner.send(&define_request).await?;
2452
2453 // Receive the re-execute response
2454 let define_response = inner.receive().await?;
2455 if define_response.len() <= PACKET_HEADER_SIZE {
2456 return Err(Error::Protocol("Empty define response".to_string()));
2457 }
2458
2459 // Check for MARKER packet
2460 let packet_type = define_response[4];
2461 if packet_type == PacketType::Marker as u8 {
2462 let error_response = inner.handle_marker_reset().await?;
2463 let payload = &error_response[PACKET_HEADER_SIZE..];
2464 return self.parse_error_response(payload);
2465 }
2466
2467 // Parse the response with LOB data, using the columns we already know
2468 let payload = &define_response[PACKET_HEADER_SIZE..];
2469 result = self.parse_query_response_with_columns(
2470 payload,
2471 &inner.capabilities,
2472 &stmt_with_define.columns(),
2473 )?;
2474 }
2475
2476 Ok(result)
2477 }
2478
2479 /// Internal: Execute a DML statement with optional bind parameters
2480 async fn execute_dml_with_params(&self, statement: &Statement, params: &[Value]) -> Result<QueryResult> {
2481 let options = ExecuteOptions::for_dml(false); // Don't auto-commit
2482 let mut execute_msg = ExecuteMessage::new(statement, options);
2483
2484 // Set bind values if provided
2485 if !params.is_empty() {
2486 execute_msg.set_bind_values(params.to_vec());
2487 }
2488
2489 let mut inner = self.inner.lock().await;
2490 let large_sdu = inner.large_sdu;
2491 let seq_num = inner.next_sequence_number();
2492 execute_msg.set_sequence_number(seq_num);
2493 let request = execute_msg.build_request_with_sdu(&inner.capabilities, large_sdu)?;
2494
2495 inner.send(&request).await?;
2496
2497 // Receive response
2498 let mut response = inner.receive().await?;
2499 if response.len() <= PACKET_HEADER_SIZE {
2500 return Err(Error::Protocol("Empty DML response".to_string()));
2501 }
2502
2503 // Check packet type - handle MARKER packets
2504 let packet_type = response[4];
2505 if packet_type == PacketType::Marker as u8 {
2506 // Need to do reset protocol and then get the actual response
2507 // Extract marker type from packet: after header (8 bytes) + data flags (2 bytes) + marker type (1 byte)
2508 let marker_type = if response.len() >= PACKET_HEADER_SIZE + 3 {
2509 response[PACKET_HEADER_SIZE + 2]
2510 } else {
2511 1 // Assume BREAK
2512 };
2513
2514 if marker_type == 1 {
2515 // BREAK marker - send RESET and wait for response
2516 self.send_marker(&mut inner, 2).await?;
2517
2518 // Wait for RESET marker back or DATA packet with error
2519 // The server may send: RESET marker, or BREAK marker followed by DATA, or just close
2520 let mut got_reset = false;
2521 let mut max_attempts = 5;
2522 loop {
2523 match inner.receive().await {
2524 Ok(pkt) => {
2525 if pkt.len() < PACKET_HEADER_SIZE + 1 {
2526 break;
2527 }
2528 let pkt_type = pkt[4];
2529 if pkt_type == PacketType::Marker as u8 {
2530 if pkt.len() >= PACKET_HEADER_SIZE + 3 {
2531 let mk_type = pkt[PACKET_HEADER_SIZE + 2];
2532 if mk_type == 2 {
2533 // Got RESET marker, break out
2534 got_reset = true;
2535 break;
2536 }
2537 // Got another BREAK marker - server may be acknowledging
2538 // Keep trying a few times
2539 max_attempts -= 1;
2540 if max_attempts == 0 {
2541 // Give up and return a generic error
2542 return Err(Error::Protocol(
2543 "Server rejected operation (multiple BREAK markers)".to_string()
2544 ));
2545 }
2546 continue;
2547 }
2548 } else if pkt_type == PacketType::Data as u8 {
2549 // Got DATA packet - use this as the response (may contain error)
2550 let payload = &pkt[PACKET_HEADER_SIZE..];
2551 return self.parse_dml_response(payload);
2552 } else {
2553 break;
2554 }
2555 }
2556 Err(e) => {
2557 // If we get EOF, the server may have closed the connection
2558 // This can happen with temp LOB binding issues
2559 inner.state = ConnectionState::Closed;
2560 return Err(Error::Protocol(format!(
2561 "Server closed connection during error handling: {}",
2562 e
2563 )));
2564 }
2565 }
2566 }
2567
2568 // After RESET, try to receive DATA packet with error response
2569 // Note: Some Oracle servers "quit immediately" after reset without sending
2570 // an error packet. In that case, we'll get EOF which is handled below.
2571 if got_reset {
2572 loop {
2573 match inner.receive().await {
2574 Ok(pkt) => {
2575 if pkt.len() < PACKET_HEADER_SIZE + 1 {
2576 // Too short, connection may be closing
2577 break;
2578 }
2579 let pkt_type = pkt[4];
2580 if pkt_type == PacketType::Marker as u8 {
2581 // More markers, keep reading
2582 continue;
2583 } else if pkt_type == PacketType::Data as u8 {
2584 // Got DATA packet, use this as the response
2585 response = pkt;
2586 break;
2587 } else {
2588 // Unknown packet type, return error
2589 return Err(Error::Protocol(format!(
2590 "Unexpected packet type {} after reset",
2591 pkt_type
2592 )));
2593 }
2594 }
2595 Err(_e) => {
2596 // EOF after reset is normal - server may close connection
2597 // without sending error details. Return a descriptive error.
2598 inner.state = ConnectionState::Closed;
2599 return Err(Error::OracleError {
2600 code: 0,
2601 message: "Server rejected the operation and closed the connection. \
2602 This may happen when binding a temporary LOB to an INSERT statement. \
2603 Try using a different approach (e.g., DBMS_LOB procedures).".to_string(),
2604 });
2605 }
2606 }
2607 }
2608 }
2609 }
2610 }
2611
2612 // Parse the response to extract rows affected (or error)
2613 let payload = &response[PACKET_HEADER_SIZE..];
2614 self.parse_dml_response(payload)
2615 }
2616
2617 /// Parse query response to extract columns and rows
2618 ///
2619 /// Oracle sends multiple messages in a single response:
2620 /// - DescribeInfo (16): column metadata
2621 /// - RowHeader (6): row header info
2622 /// - RowData (7): actual column values
2623 /// - Error (4): completion status (may contain error or success)
2624 fn parse_query_response(&self, payload: &[u8], caps: &Capabilities) -> Result<QueryResult> {
2625 self.parse_query_response_with_columns(payload, caps, &[])
2626 }
2627
2628 /// Parse query response with pre-known columns (for re-execute after define)
2629 fn parse_query_response_with_columns(
2630 &self,
2631 payload: &[u8],
2632 caps: &Capabilities,
2633 known_columns: &[ColumnInfo],
2634 ) -> Result<QueryResult> {
2635 if payload.len() < 3 {
2636 return Err(Error::Protocol("Query response too short".to_string()));
2637 }
2638
2639 let mut buf = ReadBuffer::from_slice(payload);
2640
2641 // Skip data flags
2642 buf.skip(2)?;
2643
2644 // Use known columns if provided, otherwise parse from describe info
2645 let mut columns: Vec<ColumnInfo> = known_columns.to_vec();
2646 let mut rows: Vec<Row> = Vec::new();
2647 let mut cursor_id: u16 = 0;
2648 let mut row_count: u64 = 0;
2649 let mut end_of_response = false;
2650
2651 // Bit vector for duplicate column optimization
2652 // When Some, indicates which columns have actual data (bit=1) vs duplicates (bit=0)
2653 let mut bit_vector: Option<Vec<u8>> = None;
2654 // Previous row values for copying duplicates
2655 let mut previous_row_values: Option<Vec<Value>> = None;
2656
2657 // Process messages until we hit end of response or run out of data
2658 while !end_of_response && buf.remaining() > 0 {
2659 let msg_type = buf.read_u8()?;
2660
2661 match msg_type {
2662 // DescribeInfo (16) - column metadata
2663 x if x == MessageType::DescribeInfo as u8 => {
2664 // Skip chunked bytes first
2665 buf.skip_raw_bytes_chunked()?;
2666 columns = self.parse_describe_info(&mut buf, caps.ttc_field_version)?;
2667 }
2668
2669 // RowHeader (6) - header info for rows
2670 x if x == MessageType::RowHeader as u8 => {
2671 self.parse_row_header(&mut buf)?;
2672 }
2673
2674 // RowData (7) - actual row values
2675 x if x == MessageType::RowData as u8 => {
2676 let row = self.parse_row_data_with_bitvector(
2677 &mut buf,
2678 &columns,
2679 caps,
2680 bit_vector.as_deref(),
2681 previous_row_values.as_ref(),
2682 )?;
2683 // Store this row's values for potential duplicate copying
2684 previous_row_values = Some(row.values().to_vec());
2685 // Clear bit vector after using it (it's per-row)
2686 bit_vector = None;
2687 rows.push(row);
2688 }
2689
2690 // Error (4) - completion or error
2691 x if x == MessageType::Error as u8 => {
2692 let (error_code, error_msg, cid, rc) = self.parse_error_info_with_rowcount(&mut buf)?;
2693 cursor_id = cid;
2694 row_count = rc;
2695 if error_code != 0 && error_code != 1403 {
2696 // 1403 is "no data found" which is not an error for queries
2697 return Err(Error::OracleError {
2698 code: error_code,
2699 message: error_msg.unwrap_or_default(),
2700 });
2701 }
2702 end_of_response = true;
2703 }
2704
2705 // Parameter (8) - return parameters
2706 x if x == MessageType::Parameter as u8 => {
2707 self.parse_return_parameters(&mut buf)?;
2708 }
2709
2710 // Status (9) - call status
2711 x if x == MessageType::Status as u8 => {
2712 // Read call status and end-to-end seq number
2713 let _call_status = buf.read_ub4()?;
2714 let _end_to_end_seq = buf.read_ub2()?;
2715 // Note: end_of_response only if supports_end_of_response is false
2716 // For now, we assume it's not the end
2717 }
2718
2719 // BitVector (21) - column presence bitmap for sparse results
2720 // Bit=1 means actual data is sent, bit=0 means duplicate from previous row
2721 21 => {
2722 // Read num columns sent
2723 let _num_columns_sent = buf.read_ub2()?;
2724 // Read bit vector (1 byte per 8 columns, rounded up)
2725 let num_bytes = (columns.len() + 7) / 8;
2726 if num_bytes > 0 {
2727 let bv = buf.read_bytes_vec(num_bytes)?;
2728 bit_vector = Some(bv);
2729 }
2730 }
2731
2732 _ => {
2733 // Unknown message type - break to avoid parsing errors
2734 break;
2735 }
2736 }
2737 }
2738
2739 Ok(QueryResult {
2740 columns,
2741 rows,
2742 rows_affected: row_count,
2743 has_more_rows: false,
2744 cursor_id,
2745 })
2746 }
2747
2748 /// Parse a PL/SQL response containing OUT parameter values
2749 ///
2750 /// PL/SQL responses may contain:
2751 /// - IoVector (11): bind directions for each parameter
2752 /// - RowData (7): OUT parameter values
2753 /// - FlushOutBinds (19): signals end of OUT bind data
2754 /// - Error (4): completion status
2755 fn parse_plsql_response(
2756 &self,
2757 payload: &[u8],
2758 caps: &Capabilities,
2759 params: &[BindParam],
2760 ) -> Result<PlsqlResult> {
2761 if payload.len() < 3 {
2762 return Err(Error::Protocol("PL/SQL response too short".to_string()));
2763 }
2764
2765 let mut buf = ReadBuffer::from_slice(payload);
2766
2767 // Skip data flags
2768 buf.skip(2)?;
2769
2770 let mut out_values: Vec<Value> = Vec::new();
2771 let mut _out_indices: Vec<usize> = Vec::new();
2772 let mut row_count: u64 = 0;
2773 let mut cursor_id: Option<u16> = None;
2774 let mut end_of_response = false;
2775 let mut implicit_results = ImplicitResults::new();
2776
2777 // Create column infos for OUT params based on their oracle types
2778 let mut out_columns: Vec<ColumnInfo> = Vec::new();
2779
2780 while !end_of_response && buf.remaining() > 0 {
2781 let msg_type = buf.read_u8()?;
2782
2783 match msg_type {
2784 // IoVector (11) - bind directions from server
2785 x if x == MessageType::IoVector as u8 => {
2786 let (indices, cols) = self.parse_io_vector(&mut buf, params)?;
2787 _out_indices = indices;
2788 out_columns = cols;
2789 }
2790
2791 // RowHeader (6)
2792 x if x == MessageType::RowHeader as u8 => {
2793 self.parse_row_header(&mut buf)?;
2794 }
2795
2796 // RowData (7) - OUT parameter values
2797 x if x == MessageType::RowData as u8 => {
2798 if !out_columns.is_empty() {
2799 let row = self.parse_row_data_single(&mut buf, &out_columns, caps)?;
2800 // Extract values from the row into out_values
2801 for (idx, value) in row.into_values().into_iter().enumerate() {
2802 // Check if this is a cursor
2803 if let Value::Cursor(cursor) = &value {
2804 if cursor_id.is_none() && cursor.cursor_id() != 0 {
2805 cursor_id = Some(cursor.cursor_id());
2806 }
2807 }
2808 // Map back to original param position if we have indices
2809 if idx < out_values.len() {
2810 out_values[idx] = value;
2811 } else {
2812 out_values.push(value);
2813 }
2814 }
2815 } else {
2816 // Skip the row data if we don't have column info
2817 // This shouldn't normally happen
2818 break;
2819 }
2820 }
2821
2822 // DescribeInfo (16) - for REF CURSOR describe
2823 x if x == MessageType::DescribeInfo as u8 => {
2824 buf.skip_raw_bytes_chunked()?;
2825 let cursor_columns = self.parse_describe_info(&mut buf, caps.ttc_field_version)?;
2826 // Store cursor columns if needed
2827 let _ = cursor_columns; // For now, just skip
2828 }
2829
2830 // FlushOutBinds (19) - signals end of OUT bind data
2831 x if x == MessageType::FlushOutBinds as u8 => {
2832 // This indicates that OUT bind data is done
2833 // Just continue to get the error/completion status
2834 }
2835
2836 // Error (4) - completion or error
2837 x if x == MessageType::Error as u8 => {
2838 let (error_code, error_msg, _cid, rc) = self.parse_error_info_with_rowcount(&mut buf)?;
2839 row_count = rc;
2840 if error_code != 0 {
2841 return Err(Error::OracleError {
2842 code: error_code,
2843 message: error_msg.unwrap_or_default(),
2844 });
2845 }
2846 end_of_response = true;
2847 }
2848
2849 // Parameter (8) - return parameters
2850 x if x == MessageType::Parameter as u8 => {
2851 self.parse_return_parameters(&mut buf)?;
2852 }
2853
2854 // Status (9)
2855 x if x == MessageType::Status as u8 => {
2856 let _call_status = buf.read_ub4()?;
2857 let _end_to_end_seq = buf.read_ub2()?;
2858 }
2859
2860 // ImplicitResultset (27) - result sets from DBMS_SQL.RETURN_RESULT
2861 x if x == MessageType::ImplicitResultset as u8 => {
2862 let parsed_results = self.parse_implicit_results(&mut buf, caps)?;
2863 implicit_results = parsed_results;
2864 }
2865
2866 _ => {
2867 // Unknown message type - break to avoid parsing errors
2868 break;
2869 }
2870 }
2871 }
2872
2873 // If no IoVector was received, all params might be IN-only
2874 // In that case, out_values should be empty
2875 Ok(PlsqlResult {
2876 out_values,
2877 rows_affected: row_count,
2878 cursor_id,
2879 implicit_results,
2880 })
2881 }
2882
2883 /// Parse implicit result sets from DBMS_SQL.RETURN_RESULT
2884 ///
2885 /// Format per Python base.pyx _process_implicit_result:
2886 /// - num_results: ub4 (number of implicit result sets)
2887 /// - For each result:
2888 /// - num_bytes: ub1 + raw bytes (metadata to skip)
2889 /// - describe_info: column metadata
2890 /// - cursor_id: ub2
2891 fn parse_implicit_results(&self, buf: &mut ReadBuffer, caps: &Capabilities) -> Result<ImplicitResults> {
2892 let num_results = buf.read_ub4()?;
2893 let mut results = ImplicitResults::new();
2894
2895 for _ in 0..num_results {
2896 // Skip metadata bytes
2897 let num_bytes = buf.read_u8()?;
2898 if num_bytes > 0 {
2899 buf.skip(num_bytes as usize)?;
2900 }
2901
2902 // Parse column metadata for this result set
2903 let columns = self.parse_describe_info(buf, caps.ttc_field_version)?;
2904
2905 // Read cursor ID
2906 let cursor_id = buf.read_ub2()?;
2907
2908 // Create implicit result with metadata but no rows yet
2909 // Rows will be fetched separately using fetch_implicit_result
2910 let result = ImplicitResult::new(cursor_id, columns, Vec::new());
2911 results.add(result);
2912 }
2913
2914 Ok(results)
2915 }
2916
2917 /// Parse IO Vector message to get bind directions
2918 ///
2919 /// Returns a tuple of:
2920 /// - indices of OUT/INOUT parameters in the params list
2921 /// - column infos for parsing OUT values
2922 fn parse_io_vector(
2923 &self,
2924 buf: &mut ReadBuffer,
2925 params: &[BindParam],
2926 ) -> Result<(Vec<usize>, Vec<ColumnInfo>)> {
2927 // I/O vector format (from Python reference):
2928 // - skip 1 byte (flag)
2929 // - read ub2 (num requests)
2930 // - read ub4 (num iters)
2931 // - num_binds = num_iters * 256 + num_requests
2932 // - skip ub4 (num iters this time)
2933 // - skip ub2 (uac buffer length)
2934 // - read ub2 (num_bytes for bit vector), skip if > 0
2935 // - read ub2 (num_bytes for rowid), skip if > 0
2936 // - for each bind: read ub1 (bind_dir)
2937
2938 buf.skip(1)?; // flag
2939 let num_requests = buf.read_ub2()? as u32;
2940 let num_iters = buf.read_ub4()?;
2941 let num_binds = num_iters * 256 + num_requests;
2942 let _ = buf.read_ub4()?; // num iters this time (discard)
2943 let _ = buf.read_ub2()?; // uac buffer length (discard)
2944
2945 // Bit vector
2946 let num_bytes = buf.read_ub2()? as usize;
2947 if num_bytes > 0 {
2948 buf.skip(num_bytes)?;
2949 }
2950
2951 // Rowid (raw bytes, not length-prefixed here)
2952 let num_bytes = buf.read_ub4()? as usize;
2953 if num_bytes > 0 {
2954 buf.skip_raw_bytes_chunked()?; // Skip rowid bytes using chunked read
2955 }
2956
2957 // Read bind directions
2958 let mut out_indices = Vec::new();
2959 let mut out_columns = Vec::new();
2960
2961 for i in 0..(num_binds as usize).min(params.len()) {
2962 let dir_byte = buf.read_u8()?;
2963 let dir = BindDirection::try_from(dir_byte).unwrap_or(BindDirection::Input);
2964
2965 // If this is not an INPUT-only parameter, it has OUT data
2966 if dir != BindDirection::Input {
2967 out_indices.push(i);
2968
2969 // Create a column info for parsing the OUT value
2970 let param = ¶ms[i];
2971 let mut col = ColumnInfo::new(format!("OUT_{}", i), param.oracle_type);
2972 col.buffer_size = param.buffer_size;
2973 col.data_size = param.buffer_size;
2974 col.nullable = true;
2975
2976 // For collection OUT params, extract element type from the placeholder
2977 if let Some(Value::Collection(ref placeholder)) = param.value {
2978 if let Some(Value::Integer(elem_type_code)) = placeholder.get("_element_type") {
2979 col.element_type = crate::constants::OracleType::try_from(*elem_type_code as u8).ok();
2980 }
2981 }
2982
2983 out_columns.push(col);
2984 }
2985 }
2986
2987 Ok((out_indices, out_columns))
2988 }
2989
2990 /// Parse row header (TNS_MSG_TYPE_ROW_HEADER = 6)
2991 fn parse_row_header(&self, buf: &mut ReadBuffer) -> Result<()> {
2992 buf.skip_ub1()?; // flags
2993 buf.skip_ub2()?; // num requests
2994 buf.skip_ub4()?; // iteration number
2995 buf.skip_ub4()?; // num iters
2996 buf.skip_ub2()?; // buffer length
2997 let num_bytes = buf.read_ub4()? as usize;
2998 if num_bytes > 0 {
2999 buf.skip_ub1()?; // skip repeated length
3000 buf.skip(num_bytes)?; // bit vector
3001 }
3002 let num_bytes = buf.read_ub4()? as usize;
3003 if num_bytes > 0 {
3004 buf.skip_raw_bytes_chunked()?; // rxhrid
3005 }
3006 Ok(())
3007 }
3008
3009 /// Parse return parameters (TNS_MSG_TYPE_PARAMETER = 8)
3010 fn parse_return_parameters(&self, buf: &mut ReadBuffer) -> Result<()> {
3011 self.parse_return_parameters_internal(buf, false).map(|_| ())
3012 }
3013
3014 /// Parse return parameters with optional row counts extraction
3015 /// When `want_row_counts` is true, attempts to read arraydmlrowcounts from the response.
3016 fn parse_return_parameters_internal(
3017 &self,
3018 buf: &mut ReadBuffer,
3019 want_row_counts: bool,
3020 ) -> Result<Option<Vec<u64>>> {
3021 // Per Python's _process_return_parameters
3022 let num_params = buf.read_ub2()?; // al8o4l (ignored)
3023 for _ in 0..num_params {
3024 buf.skip_ub4()?;
3025 }
3026
3027 let al8txl = buf.read_ub2()?; // al8txl (ignored)
3028 if al8txl > 0 {
3029 buf.skip(al8txl as usize)?;
3030 }
3031
3032 // num key/value pairs - skip for now
3033 let num_pairs = buf.read_ub2()?;
3034 for _ in 0..num_pairs {
3035 buf.read_bytes_with_length()?; // text value
3036 buf.read_bytes_with_length()?; // binary value
3037 buf.skip_ub2()?; // keyword num
3038 }
3039
3040 // registration
3041 let num_bytes = buf.read_ub2()?;
3042 if num_bytes > 0 {
3043 buf.skip(num_bytes as usize)?;
3044 }
3045
3046 // If arraydmlrowcounts was requested, parse the row counts
3047 if want_row_counts && buf.remaining() >= 4 {
3048 let num_rows = buf.read_ub4()? as usize;
3049 let mut row_counts = Vec::with_capacity(num_rows);
3050 for _ in 0..num_rows {
3051 let count = buf.read_ub8()?;
3052 row_counts.push(count);
3053 }
3054 Ok(Some(row_counts))
3055 } else {
3056 Ok(None)
3057 }
3058 }
3059
3060 /// Parse a single row of data
3061 fn parse_row_data_single(
3062 &self,
3063 buf: &mut ReadBuffer,
3064 columns: &[ColumnInfo],
3065 caps: &Capabilities,
3066 ) -> Result<Row> {
3067 let mut values = Vec::with_capacity(columns.len());
3068
3069 for col in columns {
3070 let value = self.parse_column_value(buf, col, caps)?;
3071 values.push(value);
3072 }
3073
3074 Ok(Row::new(values))
3075 }
3076
3077 /// Parse a single row of data with bit vector support for duplicate column optimization
3078 ///
3079 /// Oracle sends a BitVector message before RowData when some columns have the same
3080 /// value as the previous row. Bits that are SET (1) indicate data is sent in the buffer;
3081 /// bits that are CLEAR (0) indicate the value should be copied from the previous row.
3082 fn parse_row_data_with_bitvector(
3083 &self,
3084 buf: &mut ReadBuffer,
3085 columns: &[ColumnInfo],
3086 caps: &Capabilities,
3087 bit_vector: Option<&[u8]>,
3088 previous_values: Option<&Vec<Value>>,
3089 ) -> Result<Row> {
3090 let mut values = Vec::with_capacity(columns.len());
3091
3092 for (col_idx, col) in columns.iter().enumerate() {
3093 // Check if this column is a duplicate (bit=0 means duplicate)
3094 let is_duplicate = if let Some(bv) = bit_vector {
3095 let byte_num = col_idx / 8;
3096 let bit_num = col_idx % 8;
3097 if byte_num < bv.len() {
3098 // If bit is 0, it's a duplicate
3099 (bv[byte_num] & (1 << bit_num)) == 0
3100 } else {
3101 false
3102 }
3103 } else {
3104 false
3105 };
3106
3107 if is_duplicate {
3108 // Copy value from previous row
3109 if let Some(prev) = previous_values {
3110 if col_idx < prev.len() {
3111 values.push(prev[col_idx].clone());
3112 } else {
3113 // Shouldn't happen, but fallback to null
3114 values.push(Value::Null);
3115 }
3116 } else {
3117 // No previous row (shouldn't happen for duplicate), fallback to null
3118 values.push(Value::Null);
3119 }
3120 } else {
3121 // Read actual value from buffer
3122 let value = self.parse_column_value(buf, col, caps)?;
3123 values.push(value);
3124 }
3125 }
3126
3127 Ok(Row::new(values))
3128 }
3129
3130 /// Parse a single column value from the buffer
3131 fn parse_column_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo, caps: &Capabilities) -> Result<Value> {
3132 use crate::constants::OracleType;
3133
3134 // Handle LOB columns specially - they have a different format
3135 if col.is_lob() {
3136 return self.parse_lob_value(buf, col);
3137 }
3138
3139 // Handle CURSOR type - REF CURSOR from PL/SQL
3140 if col.oracle_type == OracleType::Cursor {
3141 return self.parse_cursor_value(buf, caps);
3142 }
3143
3144 // Handle Object type - collections (VARRAY, Nested Table) and UDTs
3145 if col.oracle_type == OracleType::Object {
3146 return self.parse_object_value(buf, col);
3147 }
3148
3149 // Read the value based on the column type
3150 // First, check if it's NULL
3151 let data = buf.read_bytes_with_length()?;
3152
3153 match data {
3154 None => Ok(Value::Null),
3155 Some(bytes) if bytes.is_empty() => Ok(Value::Null),
3156 Some(bytes) => {
3157 // Decode based on oracle type
3158 match col.oracle_type {
3159 OracleType::Number => {
3160 // Oracle NUMBER format - decode to string
3161 let num = crate::types::decode_oracle_number(&bytes)?;
3162 Ok(Value::String(num.value))
3163 }
3164 OracleType::Varchar | OracleType::Char | OracleType::Long => {
3165 let s = String::from_utf8_lossy(&bytes).to_string();
3166 Ok(Value::String(s))
3167 }
3168 OracleType::Raw | OracleType::LongRaw => {
3169 // RAW/LONG RAW types - return as bytes
3170 Ok(Value::Bytes(bytes.to_vec()))
3171 }
3172 OracleType::Date => {
3173 // Oracle DATE format - 7 bytes
3174 let date = crate::types::decode_oracle_date(&bytes)?;
3175 Ok(Value::Date(date))
3176 }
3177 OracleType::Timestamp | OracleType::TimestampLtz => {
3178 // Oracle TIMESTAMP format - 11 bytes (date + fractional seconds)
3179 let ts = crate::types::decode_oracle_timestamp(&bytes)?;
3180 Ok(Value::Timestamp(ts))
3181 }
3182 OracleType::TimestampTz => {
3183 // Oracle TIMESTAMP WITH TIME ZONE - 13 bytes
3184 let ts = crate::types::decode_oracle_timestamp(&bytes)?;
3185 Ok(Value::Timestamp(ts))
3186 }
3187 _ => {
3188 // Default: return as raw bytes or string
3189 let s = String::from_utf8_lossy(&bytes).to_string();
3190 Ok(Value::String(s))
3191 }
3192 }
3193 }
3194 }
3195 }
3196
3197 /// Parse a REF CURSOR value from the buffer
3198 ///
3199 /// Per Python base.pyx lines 1038-1046:
3200 /// - Skip 1 byte (length indicator - fixed value)
3201 /// - Read describe info (column metadata for the cursor)
3202 /// - Read cursor_id (UB2)
3203 fn parse_cursor_value(&self, buf: &mut ReadBuffer, caps: &Capabilities) -> Result<Value> {
3204 use crate::types::RefCursor;
3205
3206 // Skip length indicator (fixed value for cursors)
3207 let _length = buf.read_u8()?;
3208
3209 // Read column metadata for this cursor
3210 let cursor_columns = self.parse_describe_info(buf, caps.ttc_field_version)?;
3211
3212 // Read the cursor ID
3213 let cursor_id = buf.read_ub2()?;
3214
3215 // Create RefCursor with the metadata
3216 let ref_cursor = RefCursor::new(cursor_id, cursor_columns);
3217
3218 Ok(Value::Cursor(ref_cursor))
3219 }
3220
3221 /// Parse an Object/Collection value from the buffer
3222 ///
3223 /// Object format from Oracle (per Python packet.pyx read_dbobject):
3224 /// - UB4: type OID length, then type OID bytes if > 0
3225 /// - UB4: OID length, then OID bytes if > 0
3226 /// - UB4: snapshot length, then snapshot bytes if > 0 (discarded)
3227 /// - UB2: version (skip)
3228 /// - UB4: packed data length
3229 /// - UB2: flags (skip)
3230 /// - Bytes: packed data (pickle format)
3231 fn parse_object_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo) -> Result<Value> {
3232 use crate::dbobject::{CollectionType, DbObject, DbObjectType};
3233 use crate::types::decode_collection;
3234
3235 // Read type OID
3236 let toid_len = buf.read_ub4()?;
3237 let _toid = if toid_len > 0 {
3238 Some(buf.read_bytes_vec(toid_len as usize)?)
3239 } else {
3240 None
3241 };
3242
3243 // Read OID
3244 let oid_len = buf.read_ub4()?;
3245 let _oid = if oid_len > 0 {
3246 Some(buf.read_bytes_vec(oid_len as usize)?)
3247 } else {
3248 None
3249 };
3250
3251 // Read and discard snapshot
3252 let snapshot_len = buf.read_ub4()?;
3253 if snapshot_len > 0 {
3254 buf.skip_raw_bytes_chunked()?;
3255 }
3256
3257 // Skip version (length-prefixed UB2)
3258 let _version = buf.read_ub2()?;
3259
3260 // Read packed data length
3261 let data_len = buf.read_ub4()?;
3262
3263 // Skip flags (length-prefixed UB2)
3264 let _flags = buf.read_ub2()?;
3265
3266 if data_len == 0 {
3267 return Ok(Value::Null);
3268 }
3269
3270 // Read packed data (chunked format like other byte sequences)
3271 let packed_data = buf.read_bytes_with_length()?;
3272
3273 match packed_data {
3274 None => Ok(Value::Null),
3275 Some(data) if data.is_empty() => Ok(Value::Null),
3276 Some(data) => {
3277 // Create a placeholder type based on column info
3278 let type_name = col.type_name.clone().unwrap_or_else(|| "UNKNOWN".to_string());
3279
3280 // Try to determine if this is a collection based on the pickle data
3281 // The first byte contains flags - check for IS_COLLECTION (0x08)
3282 let is_collection = !data.is_empty() && (data[0] & 0x08) != 0;
3283
3284 if is_collection {
3285 // Get element type from column info or default to VARCHAR
3286 let element_type = col.element_type.unwrap_or(crate::constants::OracleType::Varchar);
3287
3288 // Determine collection type from pickle flags
3289 // Collection flags are after header - but we'll default for now
3290 let collection_type = CollectionType::Varray;
3291
3292 let obj_type = DbObjectType::collection(
3293 &col.type_schema.clone().unwrap_or_default(),
3294 &type_name,
3295 collection_type,
3296 element_type,
3297 );
3298
3299 match decode_collection(&obj_type, &data) {
3300 Ok(collection) => Ok(Value::Collection(collection)),
3301 Err(e) => {
3302 tracing::warn!("Failed to decode collection: {}, data: {:02x?}", e, &data[..std::cmp::min(20, data.len())]);
3303 // Return raw bytes as fallback
3304 Ok(Value::Bytes(data))
3305 }
3306 }
3307 } else {
3308 // Regular object type - not yet fully implemented
3309 let mut obj = DbObject::new(&type_name);
3310 // Store raw pickle data for later inspection
3311 obj.set("_raw_data", Value::Bytes(data));
3312 Ok(Value::Collection(obj))
3313 }
3314 }
3315 }
3316 }
3317
3318 /// Parse a LOB column value from the buffer
3319 ///
3320 /// LOB format from Oracle (per Python's read_lob_with_length):
3321 /// - UB4: num_bytes (indicator that LOB data follows)
3322 /// - If num_bytes > 0:
3323 /// - For non-BFILE: UB8 size, UB4 chunk_size
3324 /// - Bytes: LOB locator (chunked format)
3325 ///
3326 /// The actual LOB content must be fetched separately using LOB operations.
3327 /// For JSON columns, the data is OSON-encoded and decoded directly.
3328 /// For VECTOR columns, the data is decoded from Oracle's vector binary format.
3329 fn parse_lob_value(&self, buf: &mut ReadBuffer, col: &ColumnInfo) -> Result<Value> {
3330 use crate::constants::OracleType;
3331 use crate::types::{decode_vector, OsonDecoder};
3332
3333 // Read length indicator
3334 let num_bytes = buf.read_ub4()?;
3335
3336 if num_bytes == 0 {
3337 // For JSON, null is Value::Json(serde_json::Value::Null)
3338 if col.oracle_type == OracleType::Json || col.is_json {
3339 return Ok(Value::Json(serde_json::Value::Null));
3340 }
3341 // For VECTOR, null is Value::Null
3342 if col.oracle_type == OracleType::Vector {
3343 return Ok(Value::Null);
3344 }
3345 return Ok(Value::Lob(LobValue::Null));
3346 }
3347
3348 // For BFILE, there's no size/chunk_size metadata
3349 let (size, chunk_size) = if col.oracle_type == OracleType::Bfile {
3350 (0u64, 0u32)
3351 } else {
3352 // Read LOB size and chunk size
3353 let size = buf.read_ub8()?;
3354 let chunk_size = buf.read_ub4()?;
3355 (size, chunk_size)
3356 };
3357
3358 // Read LOB data (could be locator or inline data depending on size)
3359 let data_bytes = buf.read_bytes_with_length()?;
3360
3361 // Handle JSON columns - decode OSON format
3362 // JSON is sent as a LOB with prefetched data + a LOB locator that must be consumed
3363 if col.oracle_type == OracleType::Json || col.is_json {
3364 // Read and discard the LOB locator
3365 let _locator = buf.read_bytes_with_length()?;
3366
3367 if let Some(data) = data_bytes {
3368 if !data.is_empty() {
3369 // Decode OSON to JSON
3370 match OsonDecoder::decode(bytes::Bytes::from(data)) {
3371 Ok(json_value) => return Ok(Value::Json(json_value)),
3372 Err(e) => {
3373 tracing::warn!("Failed to decode OSON: {}", e);
3374 return Ok(Value::Json(serde_json::Value::Null));
3375 }
3376 }
3377 }
3378 }
3379 return Ok(Value::Json(serde_json::Value::Null));
3380 }
3381
3382 // Handle VECTOR columns - decode vector binary format
3383 if col.oracle_type == OracleType::Vector {
3384 // Read and discard LOB locator (not needed for VECTOR)
3385 let _locator = buf.read_bytes_with_length()?;
3386
3387 if let Some(data) = data_bytes {
3388 if !data.is_empty() {
3389 match decode_vector(&data) {
3390 Ok(vector) => return Ok(Value::Vector(vector)),
3391 Err(e) => {
3392 tracing::warn!("Failed to decode VECTOR: {}", e);
3393 return Ok(Value::Null);
3394 }
3395 }
3396 }
3397 }
3398 return Ok(Value::Null);
3399 }
3400
3401 // Create a LOB locator for fetching the data later
3402 if let Some(locator_data) = data_bytes {
3403 if !locator_data.is_empty() {
3404 let locator = LobLocator::new(
3405 bytes::Bytes::from(locator_data),
3406 size,
3407 chunk_size,
3408 col.oracle_type,
3409 col.csfrm,
3410 );
3411 return Ok(Value::Lob(LobValue::locator(locator)));
3412 }
3413 }
3414
3415 // If we have size but no locator, it might be an empty LOB
3416 if size == 0 {
3417 return Ok(Value::Lob(LobValue::Empty));
3418 }
3419
3420 // Empty LOB (shouldn't normally reach here)
3421 Ok(Value::Lob(LobValue::Empty))
3422 }
3423
3424 /// Parse error info message and extract cursor_id
3425 /// Format per Python's _process_error_info in base.pyx
3426 fn parse_error_info(&self, buf: &mut ReadBuffer) -> Result<(u32, Option<String>, u16)> {
3427 // End of call status
3428 let _call_status = buf.read_ub4()?;
3429 // End to end seq#
3430 buf.skip_ub2()?;
3431 // Current row number
3432 buf.skip_ub4()?;
3433 // Error number (short form)
3434 buf.skip_ub2()?;
3435 // Array elem error
3436 buf.skip_ub2()?;
3437 // Array elem error
3438 buf.skip_ub2()?;
3439 // Cursor ID
3440 let cursor_id = buf.read_ub2()?;
3441 // Error position
3442 let _error_pos = buf.read_sb2()?;
3443 // SQL type (19c and earlier)
3444 buf.skip_ub1()?;
3445 // Fatal?
3446 buf.skip_ub1()?;
3447 // Flags
3448 buf.skip_ub1()?;
3449 // User cursor options
3450 buf.skip_ub1()?;
3451 // UPI parameter
3452 buf.skip_ub1()?;
3453 // Flags (second)
3454 buf.skip_ub1()?;
3455 // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3456 buf.skip_ub4()?; // rba
3457 buf.skip_ub2()?; // partition_id
3458 buf.skip_ub1()?; // skip
3459 buf.skip_ub4()?; // block_num
3460 buf.skip_ub2()?; // slot_num
3461 // OS error
3462 buf.skip_ub4()?;
3463 // Statement number
3464 buf.skip_ub1()?;
3465 // Call number
3466 buf.skip_ub1()?;
3467 // Padding
3468 buf.skip_ub2()?;
3469 // Success iters
3470 buf.skip_ub4()?;
3471 // oerrdd (logical rowid)
3472 let oerrdd_len = buf.read_ub4()?;
3473 if oerrdd_len > 0 {
3474 buf.skip_raw_bytes_chunked()?;
3475 }
3476
3477 // Batch error codes array
3478 let num_batch_errors = buf.read_ub2()?;
3479 if num_batch_errors > 0 {
3480 buf.skip_ub1()?; // first byte
3481 for _ in 0..num_batch_errors {
3482 buf.skip_ub2()?; // error code
3483 }
3484 }
3485
3486 // Batch error row offset array
3487 let num_offsets = buf.read_ub4()?;
3488 if num_offsets > 0 {
3489 buf.skip_ub1()?; // first byte
3490 for _ in 0..num_offsets {
3491 buf.skip_ub4()?; // offset
3492 }
3493 }
3494
3495 // Batch error messages array
3496 let num_batch_msgs = buf.read_ub2()?;
3497 if num_batch_msgs > 0 {
3498 buf.skip_ub1()?; // packed size
3499 for _ in 0..num_batch_msgs {
3500 buf.skip_ub2()?; // chunk length
3501 buf.read_string_with_length()?; // message
3502 buf.skip(2)?; // end marker
3503 }
3504 }
3505
3506 // Extended error number (UB4)
3507 let error_code = buf.read_ub4()?;
3508 // Row count (UB8)
3509 let _row_count = buf.read_ub8()?;
3510
3511 // Error message
3512 let error_msg = if error_code != 0 {
3513 buf.read_string_with_length()?.map(|s| s.trim().to_string())
3514 } else {
3515 None
3516 };
3517
3518 Ok((error_code, error_msg, cursor_id))
3519 }
3520
3521 /// Parse error response packet (received after marker reset)
3522 fn parse_error_response(&self, payload: &[u8]) -> Result<QueryResult> {
3523 if payload.len() < 3 {
3524 return Err(Error::Protocol("Error response too short".to_string()));
3525 }
3526
3527 let mut buf = ReadBuffer::from_slice(payload);
3528
3529 // Skip data flags
3530 buf.skip(2)?;
3531
3532 // Read message type
3533 let msg_type = buf.read_u8()?;
3534
3535 // Check for error message type (4)
3536 if msg_type == MessageType::Error as u8 {
3537 // Parse error info per Python's _process_error_info
3538 let _call_status = buf.read_ub4()?; // end of call status
3539 buf.skip_ub2()?; // end to end seq#
3540 buf.skip_ub4()?; // current row number
3541 buf.skip_ub2()?; // error number (short form)
3542 buf.skip_ub2()?; // array elem error
3543 buf.skip_ub2()?; // array elem error
3544 let _cursor_id = buf.read_ub2()?; // cursor id
3545 let _error_pos = buf.read_sb2()?; // error position
3546 buf.skip_ub1()?; // sql type (19c and earlier)
3547 buf.skip_ub1()?; // fatal?
3548 buf.skip_ub1()?; // flags
3549 buf.skip_ub1()?; // user cursor options
3550 buf.skip_ub1()?; // UPI parameter
3551 buf.skip_ub1()?; // flags
3552
3553 // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3554 buf.skip_ub4()?; // rba
3555 buf.skip_ub2()?; // partition_id
3556 buf.skip_ub1()?; // skip
3557 buf.skip_ub4()?; // block_num
3558 buf.skip_ub2()?; // slot_num
3559
3560 buf.skip_ub4()?; // OS error
3561 buf.skip_ub1()?; // statement number
3562 buf.skip_ub1()?; // call number
3563 buf.skip_ub2()?; // padding
3564 buf.skip_ub4()?; // success iters
3565
3566 // oerrdd (logical rowid)
3567 let oerrdd_len = buf.read_ub4()?;
3568 if oerrdd_len > 0 {
3569 buf.skip_raw_bytes_chunked()?;
3570 }
3571
3572 // batch error codes array
3573 let num_batch_errors = buf.read_ub2()?;
3574 if num_batch_errors > 0 {
3575 // Skip batch error data - we don't process it for now
3576 buf.skip_ub1()?; // first byte
3577 for _ in 0..num_batch_errors {
3578 buf.skip_ub2()?; // error code
3579 }
3580 }
3581
3582 // batch error row offset array
3583 let num_offsets = buf.read_ub4()?;
3584 if num_offsets > 0 {
3585 buf.skip_ub1()?; // first byte
3586 for _ in 0..num_offsets {
3587 buf.skip_ub4()?; // offset
3588 }
3589 }
3590
3591 // batch error messages array
3592 let num_batch_msgs = buf.read_ub2()?;
3593 if num_batch_msgs > 0 {
3594 // Skip batch error messages
3595 buf.skip_ub1()?; // packed size
3596 for _ in 0..num_batch_msgs {
3597 buf.skip_ub2()?; // chunk length
3598 buf.read_string_with_length()?; // message
3599 buf.skip(2)?; // end marker
3600 }
3601 }
3602
3603 // Extended error number (UB4)
3604 let error_num = buf.read_ub4()?;
3605 let _row_count = buf.read_ub8()?; // row number (extended)
3606
3607 // Read error message
3608 let error_msg = if error_num != 0 {
3609 buf.read_string_with_length()?.map(|s| s.trim().to_string())
3610 } else {
3611 None
3612 };
3613
3614 return Err(Error::OracleError {
3615 code: error_num,
3616 message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_num)),
3617 });
3618 }
3619
3620 // If not an error message type, return generic error
3621 Err(Error::Protocol(format!(
3622 "Expected error message type 4, got {}",
3623 msg_type
3624 )))
3625 }
3626
3627 /// Parse DML response to extract rows affected
3628 fn parse_dml_response(&self, payload: &[u8]) -> Result<QueryResult> {
3629 if payload.len() < 3 {
3630 return Err(Error::Protocol("DML response too short".to_string()));
3631 }
3632
3633 let mut buf = ReadBuffer::from_slice(payload);
3634
3635 // Skip data flags
3636 buf.skip(2)?;
3637
3638 let mut rows_affected: u64 = 0;
3639 let mut cursor_id: u16 = 0;
3640 let mut end_of_response = false;
3641
3642 // Process messages until end_of_response or out of data
3643 // Note: If supports_end_of_response is true, we must continue until msg type 29
3644 while !end_of_response && buf.remaining() > 0 {
3645 let msg_type = buf.read_u8()?;
3646
3647 match msg_type {
3648 // Error (4) - may contain error or success info
3649 x if x == MessageType::Error as u8 => {
3650 let (error_code, error_msg, cid, row_count) = self.parse_error_info_with_rowcount(&mut buf)?;
3651 cursor_id = cid;
3652 rows_affected = row_count;
3653 if error_code != 0 && error_code != 1403 {
3654 return Err(Error::OracleError {
3655 code: error_code,
3656 message: error_msg.unwrap_or_default(),
3657 });
3658 }
3659 // Only end if server doesn't support end_of_response
3660 // Otherwise, continue until we get msg type 29
3661 }
3662
3663 // Parameter (8) - return parameters
3664 x if x == MessageType::Parameter as u8 => {
3665 self.parse_return_parameters(&mut buf)?;
3666 }
3667
3668 // Status (9) - call status
3669 x if x == MessageType::Status as u8 => {
3670 let _call_status = buf.read_ub4()?;
3671 let _end_to_end_seq = buf.read_ub2()?;
3672 }
3673
3674 // BitVector (21)
3675 21 => {
3676 let _num_columns_sent = buf.read_ub2()?;
3677 // No columns for DML, but read the byte if present
3678 if buf.remaining() > 0 {
3679 let _byte = buf.read_u8()?;
3680 }
3681 }
3682
3683 // End of Response (29) - explicit end marker
3684 29 => {
3685 end_of_response = true;
3686 }
3687
3688 _ => {
3689 // Unknown message type - continue processing
3690 }
3691 }
3692 }
3693
3694 Ok(QueryResult {
3695 columns: Vec::new(),
3696 rows: Vec::new(),
3697 rows_affected,
3698 has_more_rows: false,
3699 cursor_id,
3700 })
3701 }
3702
3703 /// Parse error info and return (error_code, error_msg, cursor_id, row_count)
3704 fn parse_error_info_with_rowcount(&self, buf: &mut ReadBuffer) -> Result<(u32, Option<String>, u16, u64)> {
3705 // End of call status
3706 let _call_status = buf.read_ub4()?;
3707 // End to end seq#
3708 buf.skip_ub2()?;
3709 // Current row number
3710 buf.skip_ub4()?;
3711 // Error number (short form)
3712 buf.skip_ub2()?;
3713 // Array elem error
3714 buf.skip_ub2()?;
3715 // Array elem error
3716 buf.skip_ub2()?;
3717 // Cursor ID
3718 let cursor_id = buf.read_ub2()?;
3719 // Error position
3720 let _error_pos = buf.read_sb2()?;
3721 // SQL type (19c and earlier)
3722 buf.skip_ub1()?;
3723 // Fatal?
3724 buf.skip_ub1()?;
3725 // Flags
3726 buf.skip_ub1()?;
3727 // User cursor options
3728 buf.skip_ub1()?;
3729 // UPI parameter
3730 buf.skip_ub1()?;
3731 // Flags (second)
3732 buf.skip_ub1()?;
3733 // Rowid (rba, partition_id, skip 1, block_num, slot_num)
3734 buf.skip_ub4()?; // rba
3735 buf.skip_ub2()?; // partition_id
3736 buf.skip_ub1()?; // skip
3737 buf.skip_ub4()?; // block_num
3738 buf.skip_ub2()?; // slot_num
3739 // OS error
3740 buf.skip_ub4()?;
3741 // Statement number
3742 buf.skip_ub1()?;
3743 // Call number
3744 buf.skip_ub1()?;
3745 // Padding
3746 buf.skip_ub2()?;
3747 // Success iters
3748 buf.skip_ub4()?;
3749 // oerrdd (logical rowid)
3750 let oerrdd_len = buf.read_ub4()?;
3751 if oerrdd_len > 0 {
3752 buf.skip_raw_bytes_chunked()?;
3753 }
3754
3755 // Batch error codes array
3756 let num_batch_errors = buf.read_ub2()?;
3757 if num_batch_errors > 0 {
3758 buf.skip_ub1()?; // first byte
3759 for _ in 0..num_batch_errors {
3760 buf.skip_ub2()?; // error code
3761 }
3762 }
3763
3764 // Batch error row offset array
3765 let num_offsets = buf.read_ub4()?;
3766 if num_offsets > 0 {
3767 buf.skip_ub1()?; // first byte
3768 for _ in 0..num_offsets {
3769 buf.skip_ub4()?; // offset
3770 }
3771 }
3772
3773 // Batch error messages array
3774 let num_batch_msgs = buf.read_ub2()?;
3775 if num_batch_msgs > 0 {
3776 buf.skip_ub1()?; // packed size
3777 for _ in 0..num_batch_msgs {
3778 buf.skip_ub2()?; // chunk length
3779 buf.read_string_with_length()?; // message
3780 buf.skip(2)?; // end marker
3781 }
3782 }
3783
3784 // Extended error number (UB4)
3785 let error_code = buf.read_ub4()?;
3786 // Row count (UB8) - this is the rows affected!
3787 let row_count = buf.read_ub8()?;
3788
3789 // Fields added in Oracle Database 20c (TTC field version >= 16)
3790 // We always skip these since we support Oracle 20c+
3791 buf.skip_ub4()?; // sql_type
3792 buf.skip_ub4()?; // server_checksum
3793
3794 // Error message
3795 let error_msg = if error_code != 0 {
3796 buf.read_string_with_length()?.map(|s| s.trim().to_string())
3797 } else {
3798 None
3799 };
3800
3801 Ok((error_code, error_msg, cursor_id, row_count))
3802 }
3803
3804 /// Parse describe info from response to extract column metadata
3805 ///
3806 /// Per Python's _process_describe_info, the format is:
3807 /// - UB4: max row size (skip)
3808 /// - UB4: number of columns
3809 /// - If num_columns > 0: UB1 (skip one byte)
3810 /// - For each column: metadata fields
3811 /// - After columns: current date, dcb flags, etc.
3812 fn parse_describe_info(&self, buf: &mut ReadBuffer, ttc_field_version: u8) -> Result<Vec<ColumnInfo>> {
3813 use crate::constants::ccap_value;
3814
3815 // Skip max row size
3816 buf.skip_ub4()?;
3817
3818 // Read number of columns
3819 let num_columns = buf.read_ub4()? as usize;
3820 if num_columns == 0 {
3821 return Ok(Vec::new());
3822 }
3823
3824 // Skip one byte if we have columns
3825 buf.skip_ub1()?;
3826
3827 let mut columns = Vec::with_capacity(num_columns);
3828
3829 for _col_idx in 0..num_columns {
3830
3831 // Parse column metadata per Python's _process_metadata
3832 let ora_type_num = buf.read_u8()?;
3833 buf.skip_ub1()?; // flags
3834 let precision = buf.read_u8()?; // precision as SB1
3835 let scale = buf.read_u8()?; // scale as SB1
3836 let buffer_size = buf.read_ub4()?;
3837
3838 buf.skip_ub4()?; // max_num_array_elements
3839 buf.skip_ub8()?; // cont_flags
3840 let _oid = buf.read_bytes_with_length()?; // OID
3841 buf.skip_ub2()?; // version
3842 buf.skip_ub2()?; // charset_id
3843 let _csfrm = buf.read_u8()?; // charset form
3844 let max_size = buf.read_ub4()?;
3845
3846 // For TTC field version >= 12.2 (8), skip oaccolid
3847 if ttc_field_version >= ccap_value::FIELD_VERSION_12_2 {
3848 buf.skip_ub4()?; // oaccolid
3849 }
3850
3851 let _nulls_allowed = buf.read_u8()?;
3852 buf.skip_ub1()?; // v7 length of name
3853 let name = buf.read_string_with_ub4_length()?.unwrap_or_default();
3854 let _schema = buf.read_string_with_ub4_length()?; // schema
3855 let _type_name = buf.read_string_with_ub4_length()?; // type_name
3856 buf.skip_ub2()?; // column position
3857 buf.skip_ub4()?; // uds_flags
3858
3859 // For TTC field version >= 23.1 (17), read domain fields
3860 if ttc_field_version >= ccap_value::FIELD_VERSION_23_1 {
3861 let _domain_schema = buf.read_string_with_ub4_length()?;
3862 let _domain_name = buf.read_string_with_ub4_length()?;
3863 }
3864
3865 // For TTC field version >= 20 (23.1_EXT_3), read annotations
3866 if ttc_field_version >= 20 {
3867 let num_annotations = buf.read_ub4()?;
3868 if num_annotations > 0 {
3869 buf.skip_ub1()?;
3870 // Read the actual annotations count (yes, it's read twice in Python)
3871 let actual_num = buf.read_ub4()?;
3872 buf.skip_ub1()?;
3873 for _ in 0..actual_num {
3874 // Skip annotation key and value (both are string with UB4 length)
3875 let _key = buf.read_string_with_ub4_length()?;
3876 let _value = buf.read_string_with_ub4_length()?;
3877 buf.skip_ub4()?; // flags per annotation
3878 }
3879 buf.skip_ub4()?; // final flags
3880 }
3881 }
3882
3883 // For TTC field version >= 24 (23.4), read vector fields
3884 if ttc_field_version >= ccap_value::FIELD_VERSION_23_4 {
3885 buf.skip_ub4()?; // vector_dimensions
3886 buf.skip_ub1()?; // vector_format
3887 buf.skip_ub1()?; // vector_flags
3888 }
3889
3890 // Convert data type to OracleType
3891 let oracle_type = crate::constants::OracleType::try_from(ora_type_num)
3892 .unwrap_or(crate::constants::OracleType::Varchar);
3893
3894 let mut col = ColumnInfo::new(&name, oracle_type);
3895 col.data_size = if max_size > 0 { max_size } else { buffer_size };
3896 col.precision = precision as i16;
3897 col.scale = scale as i16;
3898 columns.push(col);
3899 }
3900
3901 // After columns: skip remaining describe info fields
3902 // Python's _process_describe_info uses:
3903 // buf.read_ub4(&num_bytes)
3904 // if num_bytes > 0:
3905 // buf.skip_raw_bytes_chunked() # current date
3906 // buf.skip_ub4() # dcbflag
3907 // buf.skip_ub4() # dcbmdbz
3908 // buf.skip_ub4() # dcbmnpr
3909 // buf.skip_ub4() # dcbmxpr
3910 // buf.read_ub4(&num_bytes)
3911 // if num_bytes > 0:
3912 // buf.skip_raw_bytes_chunked() # dcbqcky
3913
3914 // current_date - read UB4 indicator first, then skip chunked bytes if > 0
3915 let current_date_indicator = buf.read_ub4()?;
3916 if current_date_indicator > 0 {
3917 buf.skip_raw_bytes_chunked()?;
3918 }
3919
3920 // dcb* fields as UB4
3921 buf.skip_ub4()?; // dcbflag
3922 buf.skip_ub4()?; // dcbmdbz
3923 buf.skip_ub4()?; // dcbmnpr
3924 buf.skip_ub4()?; // dcbmxpr
3925
3926 // dcbqcky - read UB4 indicator first, then skip chunked bytes if > 0
3927 let dcbqcky_indicator = buf.read_ub4()?;
3928 if dcbqcky_indicator > 0 {
3929 buf.skip_raw_bytes_chunked()?;
3930 }
3931
3932 // After dcbqcky, the next message (RowHeader) follows directly
3933 // No additional fields to skip here
3934
3935
3936 Ok(columns)
3937 }
3938
3939 /// Commit the current transaction.
3940 ///
3941 /// Makes all changes in the current transaction permanent. After commit,
3942 /// a new transaction begins automatically.
3943 ///
3944 /// # Example
3945 ///
3946 /// ```rust,no_run
3947 /// # use oracle_rs::{Connection, Value};
3948 /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
3949 /// conn.execute("INSERT INTO users (name) VALUES (:1)", &["Alice".into()]).await?;
3950 /// conn.execute("INSERT INTO users (name) VALUES (:1)", &["Bob".into()]).await?;
3951 /// conn.commit().await?; // Both inserts are now permanent
3952 /// # Ok(())
3953 /// # }
3954 /// ```
3955 pub async fn commit(&self) -> Result<()> {
3956 self.ensure_ready().await?;
3957 // Use SQL COMMIT statement instead of simple function
3958 // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
3959 self.execute("COMMIT", &[]).await?;
3960 Ok(())
3961 }
3962
3963 /// Rollback the current transaction.
3964 ///
3965 /// Discards all changes made in the current transaction. After rollback,
3966 /// a new transaction begins automatically.
3967 ///
3968 /// # Example
3969 ///
3970 /// ```rust,no_run
3971 /// # use oracle_rs::{Connection, Value};
3972 /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
3973 /// conn.execute("DELETE FROM users WHERE id = :1", &[1.into()]).await?;
3974 /// // Oops, wrong user!
3975 /// conn.rollback().await?; // Delete is undone
3976 /// # Ok(())
3977 /// # }
3978 /// ```
3979 pub async fn rollback(&self) -> Result<()> {
3980 self.ensure_ready().await?;
3981 // Use SQL ROLLBACK statement instead of simple function
3982 // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
3983 self.execute("ROLLBACK", &[]).await?;
3984 Ok(())
3985 }
3986
3987 /// Create a savepoint within the current transaction
3988 ///
3989 /// Savepoints allow partial rollback of a transaction. You can create multiple
3990 /// savepoints and rollback to any of them without affecting work done before
3991 /// that savepoint.
3992 ///
3993 /// # Arguments
3994 /// * `name` - The savepoint name (must be a valid Oracle identifier)
3995 ///
3996 /// # Example
3997 /// ```rust,ignore
3998 /// conn.execute("INSERT INTO t VALUES (1)", &[]).await?;
3999 /// conn.savepoint("sp1").await?;
4000 /// conn.execute("INSERT INTO t VALUES (2)", &[]).await?;
4001 /// conn.rollback_to_savepoint("sp1").await?; // Undoes the second insert
4002 /// conn.commit().await?; // Commits only the first insert
4003 /// ```
4004 pub async fn savepoint(&self, name: &str) -> Result<()> {
4005 self.ensure_ready().await?;
4006 self.execute(&format!("SAVEPOINT {}", name), &[]).await?;
4007 Ok(())
4008 }
4009
4010 /// Rollback to a previously created savepoint
4011 ///
4012 /// This undoes all changes made after the savepoint was created, but keeps
4013 /// the transaction active. Changes made before the savepoint are preserved.
4014 ///
4015 /// # Arguments
4016 /// * `name` - The savepoint name to rollback to
4017 pub async fn rollback_to_savepoint(&self, name: &str) -> Result<()> {
4018 self.ensure_ready().await?;
4019 self.execute(&format!("ROLLBACK TO SAVEPOINT {}", name), &[]).await?;
4020 Ok(())
4021 }
4022
4023 /// Ping the server to check if the connection is still alive.
4024 ///
4025 /// This executes a lightweight query (`SELECT 1 FROM DUAL`) to verify
4026 /// the connection is responsive. Useful for connection health checks
4027 /// in pooling scenarios.
4028 ///
4029 /// # Example
4030 ///
4031 /// ```rust,no_run
4032 /// # use oracle_rs::Connection;
4033 /// # async fn example(conn: Connection) -> oracle_rs::Result<()> {
4034 /// if conn.ping().await.is_ok() {
4035 /// println!("Connection is alive");
4036 /// } else {
4037 /// println!("Connection is dead");
4038 /// }
4039 /// # Ok(())
4040 /// # }
4041 /// ```
4042 pub async fn ping(&self) -> Result<()> {
4043 self.ensure_ready().await?;
4044 // Use SELECT FROM DUAL instead of simple function
4045 // The simple function protocol triggers BREAK marker + connection close on Oracle Free 23ai
4046 self.query("SELECT 1 FROM DUAL", &[]).await?;
4047 Ok(())
4048 }
4049
4050 /// Clear the statement cache
4051 ///
4052 /// This should be called when recycling a connection in a pool to ensure
4053 /// that any stale cursor state is cleared. This is useful after errors
4054 /// or when the connection state may be inconsistent.
4055 pub async fn clear_statement_cache(&self) {
4056 let mut inner = self.inner.lock().await;
4057 if let Some(ref mut cache) = inner.statement_cache {
4058 cache.clear();
4059 }
4060 }
4061
4062 /// Read data from a LOB (CLOB or BLOB)
4063 ///
4064 /// # Arguments
4065 /// * `locator` - The LOB locator obtained from a query result
4066 /// * `offset` - Starting position (1-based, in characters for CLOB, bytes for BLOB)
4067 /// * `amount` - Amount to read (0 for entire LOB)
4068 ///
4069 /// # Returns
4070 /// For CLOB: returns the text content as a String
4071 /// For BLOB: returns the binary content as bytes
4072 pub async fn read_lob(&self, locator: &LobLocator) -> Result<LobData> {
4073 self.ensure_ready().await?;
4074
4075 // Read the entire LOB starting at offset 1
4076 let offset = 1u64;
4077 let amount = locator.size();
4078
4079 self.read_lob_internal(locator, offset, amount).await
4080 }
4081
4082 /// Read a portion of a LOB
4083 pub async fn read_lob_range(
4084 &self,
4085 locator: &LobLocator,
4086 offset: u64,
4087 amount: u64,
4088 ) -> Result<LobData> {
4089 self.ensure_ready().await?;
4090 self.read_lob_internal(locator, offset, amount).await
4091 }
4092
4093 /// Read a CLOB and return as String
4094 ///
4095 /// This is a convenience method for reading CLOB data directly as a String.
4096 /// Returns an error if the LOB is not a CLOB.
4097 pub async fn read_clob(&self, locator: &LobLocator) -> Result<String> {
4098 if locator.is_blob() || locator.is_bfile() {
4099 return Err(Error::Protocol(
4100 "Cannot read BLOB/BFILE as string, use read_blob instead".to_string(),
4101 ));
4102 }
4103
4104 let data = self.read_lob(locator).await?;
4105 match data {
4106 LobData::String(s) => Ok(s),
4107 LobData::Bytes(_) => Err(Error::Protocol(
4108 "Unexpected bytes from CLOB read".to_string(),
4109 )),
4110 }
4111 }
4112
4113 /// Read a BLOB and return as bytes
4114 ///
4115 /// This is a convenience method for reading BLOB data directly as bytes.
4116 /// Returns an error if the LOB is a CLOB (use read_clob instead).
4117 pub async fn read_blob(&self, locator: &LobLocator) -> Result<bytes::Bytes> {
4118 if locator.is_clob() {
4119 return Err(Error::Protocol(
4120 "Cannot read CLOB as bytes, use read_clob instead".to_string(),
4121 ));
4122 }
4123
4124 let data = self.read_lob(locator).await?;
4125 match data {
4126 LobData::Bytes(b) => Ok(b),
4127 LobData::String(_) => Err(Error::Protocol(
4128 "Unexpected string from BLOB read".to_string(),
4129 )),
4130 }
4131 }
4132
4133 /// Read a LOB in chunks, calling a callback for each chunk
4134 ///
4135 /// This is useful for processing large LOBs without loading the entire
4136 /// content into memory. The callback receives each chunk as it's read.
4137 ///
4138 /// # Arguments
4139 /// * `locator` - The LOB locator
4140 /// * `chunk_size` - Size of each chunk to read (0 uses the LOB's natural chunk size)
4141 /// * `callback` - Async function called for each chunk
4142 ///
4143 /// # Example
4144 /// ```ignore
4145 /// let mut total_size = 0;
4146 /// conn.read_lob_chunked(&locator, 8192, |chunk| async move {
4147 /// match chunk {
4148 /// LobData::Bytes(b) => total_size += b.len(),
4149 /// LobData::String(s) => total_size += s.len(),
4150 /// }
4151 /// Ok(())
4152 /// }).await?;
4153 /// ```
4154 pub async fn read_lob_chunked<F, Fut>(
4155 &self,
4156 locator: &LobLocator,
4157 chunk_size: u64,
4158 mut callback: F,
4159 ) -> Result<()>
4160 where
4161 F: FnMut(LobData) -> Fut,
4162 Fut: std::future::Future<Output = Result<()>>,
4163 {
4164 self.ensure_ready().await?;
4165
4166 let total_size = locator.size();
4167 if total_size == 0 {
4168 return Ok(());
4169 }
4170
4171 // Use LOB's natural chunk size if not specified
4172 let chunk_size = if chunk_size == 0 {
4173 self.lob_chunk_size(locator).await?.max(8192) as u64
4174 } else {
4175 chunk_size
4176 };
4177
4178 let mut offset = 1u64;
4179 while offset <= total_size {
4180 let remaining = total_size - offset + 1;
4181 let amount = std::cmp::min(remaining, chunk_size);
4182
4183 let chunk = self.read_lob_internal(locator, offset, amount).await?;
4184 callback(chunk).await?;
4185
4186 offset += amount;
4187 }
4188
4189 Ok(())
4190 }
4191
4192 /// Get the optimal chunk size for a LOB
4193 ///
4194 /// This returns the chunk size that Oracle recommends for efficient
4195 /// reading and writing to this LOB.
4196 pub async fn lob_chunk_size(&self, locator: &LobLocator) -> Result<u32> {
4197 self.ensure_ready().await?;
4198
4199 let mut inner = self.inner.lock().await;
4200 let large_sdu = inner.large_sdu;
4201
4202 // Create LOB operation message for get chunk size
4203 let mut lob_msg = LobOpMessage::new_get_chunk_size(locator);
4204 let seq_num = inner.next_sequence_number();
4205 lob_msg.set_sequence_number(seq_num);
4206
4207 // Build and send the request
4208 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4209 inner.send(&request).await?;
4210
4211 // Receive and parse response
4212 let response = inner.receive().await?;
4213 if response.len() <= PACKET_HEADER_SIZE {
4214 return Err(Error::Protocol("Empty LOB chunk size response".to_string()));
4215 }
4216
4217 // Check for MARKER packet
4218 let packet_type = response[4];
4219 if packet_type == PacketType::Marker as u8 {
4220 let error_response = inner.handle_marker_reset().await?;
4221 let payload = &error_response[PACKET_HEADER_SIZE..];
4222 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4223 buf.skip(2)?;
4224 return self.parse_lob_error(&mut buf);
4225 }
4226
4227 // Parse the amount response (chunk size is returned as amount)
4228 self.parse_lob_amount_response(&response[PACKET_HEADER_SIZE..], locator)
4229 .map(|v| v as u32)
4230 }
4231
4232 /// Internal LOB read implementation
4233 async fn read_lob_internal(
4234 &self,
4235 locator: &LobLocator,
4236 offset: u64,
4237 amount: u64,
4238 ) -> Result<LobData> {
4239 let mut inner = self.inner.lock().await;
4240 let large_sdu = inner.large_sdu;
4241
4242 // Create LOB operation message for read
4243 let mut lob_msg = LobOpMessage::new_read(locator, offset, amount);
4244 let seq_num = inner.next_sequence_number();
4245 lob_msg.set_sequence_number(seq_num);
4246
4247 // Build and send the request
4248 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4249 inner.send(&request).await?;
4250
4251 // Receive and parse response (may span multiple packets for large LOBs)
4252 let response = inner.receive_response().await?;
4253 if response.len() <= PACKET_HEADER_SIZE {
4254 return Err(Error::Protocol("Empty LOB read response".to_string()));
4255 }
4256
4257 // Check for MARKER packet (indicates error)
4258 let packet_type = response[4];
4259 if packet_type == PacketType::Marker as u8 {
4260 let error_response = inner.handle_marker_reset().await?;
4261 let payload = &error_response[PACKET_HEADER_SIZE..];
4262 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4263 // Skip data flags
4264 buf.skip(2)?;
4265 return self.parse_lob_error(&mut buf);
4266 }
4267
4268 // Parse LOB data response
4269 let payload = &response[PACKET_HEADER_SIZE..];
4270 self.parse_lob_read_response(payload, locator)
4271 }
4272
4273 /// Parse LOB read response
4274 fn parse_lob_read_response(&self, payload: &[u8], locator: &LobLocator) -> Result<LobData> {
4275 use crate::buffer::ReadBuffer;
4276
4277 let mut buf = ReadBuffer::from_slice(payload);
4278
4279 // Skip data flags
4280 buf.skip(2)?;
4281
4282 let mut lob_data: Option<Vec<u8>> = None;
4283
4284 // Process messages until end of response
4285 while buf.remaining() > 0 {
4286 let msg_type = buf.read_u8()?;
4287
4288 match msg_type {
4289 // LobData message (14)
4290 x if x == MessageType::LobData as u8 => {
4291 // Read LOB data with length
4292 let data = buf.read_raw_bytes_chunked()?;
4293 lob_data = Some(data);
4294 }
4295
4296 // Parameter return (8) - contains updated locator and amount
4297 x if x == MessageType::Parameter as u8 => {
4298 // Skip the updated locator (same length as original)
4299 let locator_len = locator.locator_bytes().len();
4300 buf.skip(locator_len)?;
4301
4302 // Read back the amount (ub8)
4303 let _returned_amount = buf.read_ub8()?;
4304 }
4305
4306 // Error/Status message (4) - code 0 means success
4307 x if x == MessageType::Error as u8 => {
4308 // Parse error info - code 0 means success
4309 if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4310 if code != 0 {
4311 let message = msg.unwrap_or_else(|| "LOB error".to_string());
4312 return Err(Error::OracleError { code, message });
4313 }
4314 // code 0 = success, continue processing
4315 }
4316 }
4317
4318 // End of response (29)
4319 x if x == MessageType::EndOfResponse as u8 => {
4320 break;
4321 }
4322
4323 // Skip other message types
4324 _ => {
4325 // Try to skip unknown messages
4326 continue;
4327 }
4328 }
4329 }
4330
4331 // Convert to appropriate type based on LOB type
4332 match lob_data {
4333 Some(data) => {
4334 if locator.is_blob() || locator.is_bfile() {
4335 Ok(LobData::Bytes(bytes::Bytes::from(data)))
4336 } else {
4337 // CLOB - decode based on encoding
4338 let text = if locator.uses_var_length_charset() {
4339 // UTF-16 BE encoding
4340 let chars: Vec<u16> = data
4341 .chunks_exact(2)
4342 .map(|c| u16::from_be_bytes([c[0], c[1]]))
4343 .collect();
4344 String::from_utf16_lossy(&chars)
4345 } else {
4346 // UTF-8 encoding
4347 String::from_utf8_lossy(&data).to_string()
4348 };
4349 Ok(LobData::String(text))
4350 }
4351 }
4352 None => {
4353 // Empty LOB
4354 if locator.is_blob() || locator.is_bfile() {
4355 Ok(LobData::Bytes(bytes::Bytes::new()))
4356 } else {
4357 Ok(LobData::String(String::new()))
4358 }
4359 }
4360 }
4361 }
4362
4363 /// Write data to a LOB
4364 ///
4365 /// # Arguments
4366 /// * `locator` - The LOB locator obtained from a query result
4367 /// * `offset` - Starting position (1-based, in characters for CLOB, bytes for BLOB)
4368 /// * `data` - Data to write (bytes for BLOB, UTF-8 encoded bytes for CLOB)
4369 pub async fn write_lob(&self, locator: &LobLocator, offset: u64, data: &[u8]) -> Result<()> {
4370 self.ensure_ready().await?;
4371
4372 let mut inner = self.inner.lock().await;
4373 let large_sdu = inner.large_sdu;
4374 let sdu_size = inner.sdu_size as usize;
4375
4376 // Encode data for CLOB if necessary
4377 let encoded_data: Vec<u8>;
4378 let write_data = if locator.is_clob() && locator.uses_var_length_charset() {
4379 // Convert UTF-8 to UTF-16 BE for CLOB with var length charset
4380 let text = String::from_utf8_lossy(data);
4381 encoded_data = text
4382 .encode_utf16()
4383 .flat_map(|c| c.to_be_bytes())
4384 .collect();
4385 &encoded_data[..]
4386 } else {
4387 data
4388 };
4389
4390 // Create LOB operation message for write
4391 let mut lob_msg = LobOpMessage::new_write(locator, offset, write_data);
4392 let seq_num = inner.next_sequence_number();
4393 lob_msg.set_sequence_number(seq_num);
4394
4395 // Build message content (without packet header or data flags)
4396 let message = lob_msg.build_message_only(&inner.capabilities)?;
4397
4398 // Calculate if this fits in a single packet
4399 // Single packet max payload = SDU - header (8) - data flags (2)
4400 let max_single_packet_payload = sdu_size.saturating_sub(PACKET_HEADER_SIZE + 2);
4401
4402 let is_multi_packet = message.len() > max_single_packet_payload;
4403
4404 if is_multi_packet {
4405 // Needs multiple packets - use multi-packet sender
4406 inner.send_multi_packet(&message, 0).await?;
4407 } else {
4408 // Fits in one packet - use standard send
4409 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4410 inner.send(&request).await?;
4411 }
4412
4413 // Receive and parse response
4414 // Use receive_response() to accumulate all packets until END_OF_RESPONSE.
4415 // This is necessary because Oracle may send multiple packets for the response,
4416 // and if we only read one packet, leftover data causes close() to hang.
4417 let response = inner.receive_response().await?;
4418 if response.len() <= PACKET_HEADER_SIZE {
4419 return Err(Error::Protocol("Empty LOB write response".to_string()));
4420 }
4421
4422 // Check for MARKER packet (indicates error)
4423 let packet_type = response[4];
4424 if packet_type == PacketType::Marker as u8 {
4425 let error_response = inner.handle_marker_reset().await?;
4426 let payload = &error_response[PACKET_HEADER_SIZE..];
4427 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4428 buf.skip(2)?;
4429 return self.parse_lob_error(&mut buf);
4430 }
4431
4432 // Parse response to check for errors
4433 self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4434 }
4435
4436 /// Write string data to a CLOB
4437 pub async fn write_clob(&self, locator: &LobLocator, offset: u64, text: &str) -> Result<()> {
4438 if locator.is_blob() || locator.is_bfile() {
4439 return Err(Error::Protocol(
4440 "Cannot write string to BLOB/BFILE, use write_blob instead".to_string(),
4441 ));
4442 }
4443 self.write_lob(locator, offset, text.as_bytes()).await
4444 }
4445
4446 /// Write binary data to a BLOB
4447 pub async fn write_blob(&self, locator: &LobLocator, offset: u64, data: &[u8]) -> Result<()> {
4448 if locator.is_clob() {
4449 return Err(Error::Protocol(
4450 "Cannot write bytes to CLOB, use write_clob instead".to_string(),
4451 ));
4452 }
4453 self.write_lob(locator, offset, data).await
4454 }
4455
4456 /// Get the length of a LOB
4457 ///
4458 /// For CLOB: returns length in characters
4459 /// For BLOB: returns length in bytes
4460 pub async fn lob_length(&self, locator: &LobLocator) -> Result<u64> {
4461 self.ensure_ready().await?;
4462
4463 let mut inner = self.inner.lock().await;
4464 let large_sdu = inner.large_sdu;
4465
4466 // Create LOB operation message for get length
4467 let mut lob_msg = LobOpMessage::new_get_length(locator);
4468 let seq_num = inner.next_sequence_number();
4469 lob_msg.set_sequence_number(seq_num);
4470
4471 // Build and send the request
4472 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4473 inner.send(&request).await?;
4474
4475 // Receive and parse response
4476 let response = inner.receive().await?;
4477 if response.len() <= PACKET_HEADER_SIZE {
4478 return Err(Error::Protocol("Empty LOB get_length response".to_string()));
4479 }
4480
4481 // Check for MARKER packet (indicates error)
4482 let packet_type = response[4];
4483 if packet_type == PacketType::Marker as u8 {
4484 let error_response = inner.handle_marker_reset().await?;
4485 let payload = &error_response[PACKET_HEADER_SIZE..];
4486 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4487 buf.skip(2)?;
4488 return self.parse_lob_error(&mut buf);
4489 }
4490
4491 // Parse response to get the length
4492 self.parse_lob_amount_response(&response[PACKET_HEADER_SIZE..], locator)
4493 }
4494
4495 /// Trim a LOB to a specified length
4496 ///
4497 /// # Arguments
4498 /// * `locator` - The LOB locator
4499 /// * `new_size` - The new size (in characters for CLOB, bytes for BLOB)
4500 pub async fn lob_trim(&self, locator: &LobLocator, new_size: u64) -> Result<()> {
4501 self.ensure_ready().await?;
4502
4503 let mut inner = self.inner.lock().await;
4504 let large_sdu = inner.large_sdu;
4505
4506 // Create LOB operation message for trim
4507 let mut lob_msg = LobOpMessage::new_trim(locator, new_size);
4508 let seq_num = inner.next_sequence_number();
4509 lob_msg.set_sequence_number(seq_num);
4510
4511 // Build and send the request
4512 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4513 inner.send(&request).await?;
4514
4515 // Receive and parse response
4516 let response = inner.receive().await?;
4517 if response.len() <= PACKET_HEADER_SIZE {
4518 return Err(Error::Protocol("Empty LOB trim response".to_string()));
4519 }
4520
4521 // Check for MARKER packet (indicates error)
4522 let packet_type = response[4];
4523 if packet_type == PacketType::Marker as u8 {
4524 let error_response = inner.handle_marker_reset().await?;
4525 let payload = &error_response[PACKET_HEADER_SIZE..];
4526 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4527 buf.skip(2)?;
4528 return self.parse_lob_error(&mut buf);
4529 }
4530
4531 // Parse response to check for errors
4532 self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4533 }
4534
4535 /// Create a temporary LOB on the server
4536 ///
4537 /// Creates a temporary LOB of the specified type that lives until the connection
4538 /// is closed or the LOB is explicitly freed.
4539 ///
4540 /// # Arguments
4541 /// * `oracle_type` - The LOB type to create (Clob or Blob)
4542 ///
4543 /// # Returns
4544 /// A `LobLocator` for the newly created temporary LOB
4545 ///
4546 /// # Example
4547 /// ```ignore
4548 /// use oracle_rs::OracleType;
4549 ///
4550 /// let locator = conn.create_temp_lob(OracleType::Clob).await?;
4551 /// conn.write_clob(&locator, 1, "Hello, World!").await?;
4552 /// // Now bind the locator to insert into a CLOB column
4553 /// ```
4554 pub async fn create_temp_lob(&self, oracle_type: OracleType) -> Result<LobLocator> {
4555 use crate::buffer::ReadBuffer;
4556
4557 // Validate oracle_type is a LOB type
4558 match oracle_type {
4559 OracleType::Clob | OracleType::Blob => {}
4560 _ => {
4561 return Err(Error::Protocol(format!(
4562 "create_temp_lob: invalid type {:?}, must be Clob or Blob",
4563 oracle_type
4564 )));
4565 }
4566 }
4567
4568 self.ensure_ready().await?;
4569
4570 let mut inner = self.inner.lock().await;
4571 let large_sdu = inner.large_sdu;
4572
4573 // Create the CREATE_TEMP message
4574 let mut lob_msg = LobOpMessage::new_create_temp(oracle_type);
4575 let seq_num = inner.next_sequence_number();
4576 lob_msg.set_sequence_number(seq_num);
4577
4578 // Build and send the request
4579 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4580 inner.send(&request).await?;
4581
4582 // Receive and parse response
4583 let response = inner.receive().await?;
4584 if response.len() <= PACKET_HEADER_SIZE {
4585 return Err(Error::Protocol("Empty CREATE_TEMP LOB response".to_string()));
4586 }
4587
4588 // Check for MARKER packet (indicates error)
4589 let packet_type = response[4];
4590 if packet_type == PacketType::Marker as u8 {
4591 let error_response = inner.handle_marker_reset().await?;
4592 let payload = &error_response[PACKET_HEADER_SIZE..];
4593 let mut buf = ReadBuffer::from_slice(payload);
4594 buf.skip(2)?;
4595 return self.parse_lob_error(&mut buf);
4596 }
4597
4598 // Parse response to extract the locator
4599 let payload = &response[PACKET_HEADER_SIZE..];
4600 let mut buf = ReadBuffer::from_slice(payload);
4601 buf.skip(2)?; // Skip data flags
4602
4603 let mut locator_bytes: Option<Vec<u8>> = None;
4604
4605 while buf.remaining() > 0 {
4606 let msg_type = buf.read_u8()?;
4607
4608 match msg_type {
4609 // Parameter return (8) - contains the populated locator
4610 x if x == MessageType::Parameter as u8 => {
4611 // Read the 40-byte locator (matches the 40 bytes sent in request)
4612 let loc_data = buf.read_bytes_vec(40)?;
4613 locator_bytes = Some(loc_data);
4614 // Skip charset (variable-length ub2) and trailing flags (raw u8)
4615 buf.skip_ub2()?;
4616 buf.skip(1)?;
4617 }
4618
4619 // Error/Status message (4) - code 0 means success
4620 x if x == MessageType::Error as u8 => {
4621 if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4622 if code != 0 {
4623 let message = msg.unwrap_or_else(|| "CREATE_TEMP LOB error".to_string());
4624 return Err(Error::OracleError { code, message });
4625 }
4626 }
4627 }
4628
4629 // End of response (29)
4630 x if x == MessageType::EndOfResponse as u8 => {
4631 break;
4632 }
4633
4634 _ => continue,
4635 }
4636 }
4637
4638 // Create the LobLocator from the returned bytes
4639 let loc_bytes = locator_bytes.ok_or_else(|| {
4640 Error::Protocol("CREATE_TEMP LOB response did not contain locator".to_string())
4641 })?;
4642
4643 // Create LobLocator with size 0, chunk_size 0 (will be fetched if needed)
4644 let locator = LobLocator::new(
4645 bytes::Bytes::from(loc_bytes),
4646 0, // size - unknown for new temp LOB
4647 0, // chunk_size - unknown, can be fetched later
4648 oracle_type,
4649 1, // csfrm - 1 for CLOB, 0 for BLOB (but we store it on the locator type)
4650 );
4651
4652 Ok(locator)
4653 }
4654
4655 // ==================== BFILE Operations ====================
4656
4657 /// Check if a BFILE exists on the server
4658 ///
4659 /// Returns true if the file referenced by the BFILE locator exists on the server.
4660 pub async fn bfile_exists(&self, locator: &LobLocator) -> Result<bool> {
4661 self.ensure_ready().await?;
4662
4663 if !locator.is_bfile() {
4664 return Err(Error::Protocol("bfile_exists called on non-BFILE locator".to_string()));
4665 }
4666
4667 let mut inner = self.inner.lock().await;
4668 let large_sdu = inner.large_sdu;
4669
4670 let mut lob_msg = LobOpMessage::new_file_exists(locator);
4671 let seq_num = inner.next_sequence_number();
4672 lob_msg.set_sequence_number(seq_num);
4673
4674 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4675 inner.send(&request).await?;
4676
4677 let response = inner.receive().await?;
4678 if response.len() <= PACKET_HEADER_SIZE {
4679 return Err(Error::Protocol("Empty BFILE exists response".to_string()));
4680 }
4681
4682 let packet_type = response[4];
4683 if packet_type == PacketType::Marker as u8 {
4684 let error_response = inner.handle_marker_reset().await?;
4685 let payload = &error_response[PACKET_HEADER_SIZE..];
4686 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4687 buf.skip(2)?;
4688 return self.parse_lob_error(&mut buf);
4689 }
4690
4691 self.parse_lob_bool_response(&response[PACKET_HEADER_SIZE..], locator)
4692 }
4693
4694 /// Open a BFILE for reading
4695 ///
4696 /// The BFILE must be opened before reading. After reading, close it with bfile_close.
4697 pub async fn bfile_open(&self, locator: &LobLocator) -> Result<()> {
4698 self.ensure_ready().await?;
4699
4700 if !locator.is_bfile() {
4701 return Err(Error::Protocol("bfile_open called on non-BFILE locator".to_string()));
4702 }
4703
4704 let mut inner = self.inner.lock().await;
4705 let large_sdu = inner.large_sdu;
4706
4707 let mut lob_msg = LobOpMessage::new_file_open(locator);
4708 let seq_num = inner.next_sequence_number();
4709 lob_msg.set_sequence_number(seq_num);
4710
4711 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4712 inner.send(&request).await?;
4713
4714 let response = inner.receive().await?;
4715 if response.len() <= PACKET_HEADER_SIZE {
4716 return Err(Error::Protocol("Empty BFILE open response".to_string()));
4717 }
4718
4719 let packet_type = response[4];
4720 if packet_type == PacketType::Marker as u8 {
4721 let error_response = inner.handle_marker_reset().await?;
4722 let payload = &error_response[PACKET_HEADER_SIZE..];
4723 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4724 buf.skip(2)?;
4725 return self.parse_lob_error(&mut buf);
4726 }
4727
4728 self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4729 }
4730
4731 /// Close a BFILE after reading
4732 pub async fn bfile_close(&self, locator: &LobLocator) -> Result<()> {
4733 self.ensure_ready().await?;
4734
4735 if !locator.is_bfile() {
4736 return Err(Error::Protocol("bfile_close called on non-BFILE locator".to_string()));
4737 }
4738
4739 let mut inner = self.inner.lock().await;
4740 let large_sdu = inner.large_sdu;
4741
4742 let mut lob_msg = LobOpMessage::new_file_close(locator);
4743 let seq_num = inner.next_sequence_number();
4744 lob_msg.set_sequence_number(seq_num);
4745
4746 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4747 inner.send(&request).await?;
4748
4749 let response = inner.receive().await?;
4750 if response.len() <= PACKET_HEADER_SIZE {
4751 return Err(Error::Protocol("Empty BFILE close response".to_string()));
4752 }
4753
4754 let packet_type = response[4];
4755 if packet_type == PacketType::Marker as u8 {
4756 let error_response = inner.handle_marker_reset().await?;
4757 let payload = &error_response[PACKET_HEADER_SIZE..];
4758 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4759 buf.skip(2)?;
4760 return self.parse_lob_error(&mut buf);
4761 }
4762
4763 self.parse_lob_simple_response(&response[PACKET_HEADER_SIZE..], locator)
4764 }
4765
4766 /// Check if a BFILE is currently open
4767 pub async fn bfile_is_open(&self, locator: &LobLocator) -> Result<bool> {
4768 self.ensure_ready().await?;
4769
4770 if !locator.is_bfile() {
4771 return Err(Error::Protocol("bfile_is_open called on non-BFILE locator".to_string()));
4772 }
4773
4774 let mut inner = self.inner.lock().await;
4775 let large_sdu = inner.large_sdu;
4776
4777 let mut lob_msg = LobOpMessage::new_file_is_open(locator);
4778 let seq_num = inner.next_sequence_number();
4779 lob_msg.set_sequence_number(seq_num);
4780
4781 let request = lob_msg.build_request(&inner.capabilities, large_sdu)?;
4782 inner.send(&request).await?;
4783
4784 let response = inner.receive().await?;
4785 if response.len() <= PACKET_HEADER_SIZE {
4786 return Err(Error::Protocol("Empty BFILE is_open response".to_string()));
4787 }
4788
4789 let packet_type = response[4];
4790 if packet_type == PacketType::Marker as u8 {
4791 let error_response = inner.handle_marker_reset().await?;
4792 let payload = &error_response[PACKET_HEADER_SIZE..];
4793 let mut buf = crate::buffer::ReadBuffer::from_slice(payload);
4794 buf.skip(2)?;
4795 return self.parse_lob_error(&mut buf);
4796 }
4797
4798 self.parse_lob_bool_response(&response[PACKET_HEADER_SIZE..], locator)
4799 }
4800
4801 /// Read BFILE data
4802 ///
4803 /// This is a convenience method that opens the BFILE if needed, reads all content,
4804 /// and returns it as bytes. For large BFILEs, consider using read_lob_chunked.
4805 pub async fn read_bfile(&self, locator: &LobLocator) -> Result<bytes::Bytes> {
4806 if !locator.is_bfile() {
4807 return Err(Error::Protocol("read_bfile called on non-BFILE locator".to_string()));
4808 }
4809
4810 // Check if file is open, open if needed
4811 let should_close = if !self.bfile_is_open(locator).await? {
4812 self.bfile_open(locator).await?;
4813 true
4814 } else {
4815 false
4816 };
4817
4818 // Read all data
4819 let result = self.read_blob(locator).await;
4820
4821 // Close if we opened it
4822 if should_close {
4823 let _ = self.bfile_close(locator).await;
4824 }
4825
4826 result
4827 }
4828
4829 /// Parse a LOB operation response that returns a boolean (file_exists, is_open)
4830 fn parse_lob_bool_response(&self, payload: &[u8], locator: &LobLocator) -> Result<bool> {
4831 use crate::buffer::ReadBuffer;
4832
4833 let mut buf = ReadBuffer::from_slice(payload);
4834 buf.skip(2)?; // Skip data flags
4835
4836 let mut bool_result: bool = false;
4837
4838 while buf.remaining() > 0 {
4839 let msg_type = buf.read_u8()?;
4840
4841 match msg_type {
4842 // Parameter return (8) - contains updated locator and bool flag
4843 x if x == MessageType::Parameter as u8 => {
4844 let locator_len = locator.locator_bytes().len();
4845 buf.skip(locator_len)?;
4846 // Boolean flag is a single byte, > 0 means true
4847 let flag = buf.read_u8()?;
4848 bool_result = flag > 0;
4849 }
4850
4851 // Error/Status message (4) - code 0 means success
4852 x if x == MessageType::Error as u8 => {
4853 if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4854 if code != 0 {
4855 let message = msg.unwrap_or_else(|| "LOB error".to_string());
4856 return Err(Error::OracleError { code, message });
4857 }
4858 }
4859 }
4860
4861 // End of response (29)
4862 x if x == MessageType::EndOfResponse as u8 => {
4863 break;
4864 }
4865
4866 _ => continue,
4867 }
4868 }
4869
4870 Ok(bool_result)
4871 }
4872
4873 /// Parse a simple LOB operation response (write, trim)
4874 fn parse_lob_simple_response(&self, payload: &[u8], locator: &LobLocator) -> Result<()> {
4875 use crate::buffer::ReadBuffer;
4876
4877 let mut buf = ReadBuffer::from_slice(payload);
4878 buf.skip(2)?; // Skip data flags
4879
4880 while buf.remaining() > 0 {
4881 let msg_type = buf.read_u8()?;
4882
4883 match msg_type {
4884 // Parameter return (8) - contains updated locator and possibly amount
4885 x if x == MessageType::Parameter as u8 => {
4886 let locator_len = locator.locator_bytes().len();
4887 buf.skip(locator_len)?;
4888 // After locator, there may be amount (ub8) if send_amount was true
4889 // We just skip any remaining bytes until we hit Error or EndOfResponse
4890 }
4891
4892 // Error/Status message (4) - code 0 means success
4893 x if x == MessageType::Error as u8 => {
4894 if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4895 if code != 0 {
4896 let message = msg.unwrap_or_else(|| "LOB error".to_string());
4897 return Err(Error::OracleError { code, message });
4898 }
4899 }
4900 }
4901
4902 // End of response (29)
4903 x if x == MessageType::EndOfResponse as u8 => {
4904 break;
4905 }
4906
4907 _ => continue,
4908 }
4909 }
4910
4911 Ok(())
4912 }
4913
4914 /// Parse a LOB operation response that returns an amount (get_length, get_chunk_size)
4915 fn parse_lob_amount_response(&self, payload: &[u8], locator: &LobLocator) -> Result<u64> {
4916 use crate::buffer::ReadBuffer;
4917
4918 let mut buf = ReadBuffer::from_slice(payload);
4919 buf.skip(2)?; // Skip data flags
4920
4921 let mut returned_amount: u64 = 0;
4922
4923 while buf.remaining() > 0 {
4924 let msg_type = buf.read_u8()?;
4925
4926 match msg_type {
4927 // Parameter return (8) - contains updated locator and amount
4928 x if x == MessageType::Parameter as u8 => {
4929 let locator_len = locator.locator_bytes().len();
4930 buf.skip(locator_len)?;
4931 returned_amount = buf.read_ub8()?;
4932 }
4933
4934 // Error/Status message (4) - code 0 means success
4935 x if x == MessageType::Error as u8 => {
4936 if let Ok((code, msg, _)) = self.parse_error_info(&mut buf) {
4937 if code != 0 {
4938 let message = msg.unwrap_or_else(|| "LOB error".to_string());
4939 return Err(Error::OracleError { code, message });
4940 }
4941 }
4942 }
4943
4944 // End of response (29)
4945 x if x == MessageType::EndOfResponse as u8 => {
4946 break;
4947 }
4948
4949 _ => continue,
4950 }
4951 }
4952
4953 Ok(returned_amount)
4954 }
4955
4956 /// Parse LOB error response
4957 fn parse_lob_error<T>(&self, buf: &mut crate::buffer::ReadBuffer) -> Result<T> {
4958 // Try to extract error info
4959 if let Ok((code, msg, _)) = self.parse_error_info(buf) {
4960 let message = msg.unwrap_or_else(|| "Unknown LOB error".to_string());
4961 Err(Error::OracleError { code, message })
4962 } else {
4963 Err(Error::Protocol("LOB operation failed".to_string()))
4964 }
4965 }
4966
4967 /// Close the connection.
4968 ///
4969 /// Sends a logoff message to the server and closes the underlying TCP
4970 /// connection. After calling close, the connection cannot be reused.
4971 ///
4972 /// If the connection is already closed, this method returns `Ok(())`
4973 /// without doing anything.
4974 ///
4975 /// # Note
4976 ///
4977 /// Any uncommitted transaction is rolled back by the server when the
4978 /// connection is closed.
4979 ///
4980 /// # Example
4981 ///
4982 /// ```rust,no_run
4983 /// # use oracle_rs::{Config, Connection};
4984 /// # async fn example() -> oracle_rs::Result<()> {
4985 /// let config = Config::new("localhost", 1521, "FREEPDB1", "user", "password");
4986 /// let conn = Connection::connect_with_config(config).await?;
4987 ///
4988 /// // Do work...
4989 /// conn.commit().await?;
4990 ///
4991 /// // Explicitly close when done
4992 /// conn.close().await?;
4993 /// # Ok(())
4994 /// # }
4995 /// ```
4996 pub async fn close(&self) -> Result<()> {
4997 if self.closed.swap(true, Ordering::Relaxed) {
4998 // Already closed
4999 return Ok(());
5000 }
5001
5002 let mut inner = self.inner.lock().await;
5003
5004 if inner.state == ConnectionState::Ready {
5005 // Send logoff
5006 let _ = self.send_simple_function_inner(&mut inner, FunctionCode::Logoff).await;
5007 }
5008
5009 inner.state = ConnectionState::Closed;
5010
5011 // Close the TCP stream
5012 if let Some(stream) = inner.stream.take() {
5013 drop(stream);
5014 }
5015
5016 Ok(())
5017 }
5018
5019 /// Send a marker packet to the server
5020 /// marker_type: 1=BREAK, 2=RESET, 3=INTERRUPT
5021 async fn send_marker(&self, inner: &mut ConnectionInner, marker_type: u8) -> Result<()> {
5022 let mut packet_buf = WriteBuffer::new();
5023
5024 // Build MARKER packet header
5025 let packet_len = PACKET_HEADER_SIZE + 3; // Header + 3 bytes payload
5026 packet_buf.write_u16_be(packet_len as u16)?;
5027 packet_buf.write_u16_be(0)?; // Checksum
5028 packet_buf.write_u8(PacketType::Marker as u8)?;
5029 packet_buf.write_u8(0)?; // Flags
5030 packet_buf.write_u16_be(0)?; // Header checksum
5031
5032 // Marker payload: [1, 0, marker_type] per Python's _send_marker
5033 packet_buf.write_u8(1)?;
5034 packet_buf.write_u8(0)?;
5035 packet_buf.write_u8(marker_type)?;
5036
5037 inner.send(&packet_buf.freeze()).await
5038 }
5039
5040 async fn send_simple_function_inner(
5041 &self,
5042 inner: &mut ConnectionInner,
5043 function_code: FunctionCode,
5044 ) -> Result<()> {
5045 // Build function message
5046 let mut buf = WriteBuffer::new();
5047
5048 // Get next sequence number (must be done before sending)
5049 let seq_num = inner.next_sequence_number();
5050
5051 // Data flags
5052 buf.write_u16_be(0)?;
5053 // Message type: Function
5054 buf.write_u8(MessageType::Function as u8)?;
5055 // Function code
5056 buf.write_u8(function_code as u8)?;
5057 // Sequence number (tracked per connection)
5058 buf.write_u8(seq_num)?;
5059
5060 // Token number (required for TTC field version >= 18, i.e. Oracle 23ai)
5061 if inner.capabilities.ttc_field_version >= 18 {
5062 buf.write_ub8(0)?;
5063 }
5064
5065 // Build DATA packet
5066 let data_payload = buf.freeze();
5067 let mut packet_buf = WriteBuffer::new();
5068 let packet_len = PACKET_HEADER_SIZE + data_payload.len();
5069 packet_buf.write_u16_be(packet_len as u16)?;
5070 packet_buf.write_u16_be(0)?; // Checksum
5071 packet_buf.write_u8(PacketType::Data as u8)?;
5072 packet_buf.write_u8(0)?; // Flags
5073 packet_buf.write_u16_be(0)?; // Header checksum
5074 packet_buf.write_bytes(&data_payload)?;
5075
5076 let packet_bytes = packet_buf.freeze();
5077 inner.send(&packet_bytes).await?;
5078
5079 // Wait for response
5080 let response = inner.receive().await?;
5081
5082 // Check response
5083 if response.len() <= 4 {
5084 return Err(Error::Protocol("Response too short".to_string()));
5085 }
5086
5087 let packet_type = response[4];
5088
5089 // MARKER packet (type 12) - need to handle reset protocol
5090 if packet_type == PacketType::Marker as u8 {
5091 // Check marker type
5092 if response.len() >= PACKET_HEADER_SIZE + 3 {
5093 let marker_type = response[PACKET_HEADER_SIZE + 2];
5094
5095 // For BREAK marker (1), we need to do the reset protocol
5096 if marker_type == 1 {
5097 // For Logoff, Oracle may send BREAK to indicate "connection closing"
5098 // Don't try to do the full reset handshake - just return success
5099 if function_code == FunctionCode::Logoff {
5100 inner.state = ConnectionState::Closed;
5101 return Ok(());
5102 }
5103
5104 // The BREAK marker means the server is interrupting/breaking the current operation
5105 // We MUST complete the reset handshake or the connection will be in a bad state
5106
5107 // Send RESET marker to server
5108 if let Err(e) = self.send_marker(inner, 2).await {
5109 inner.state = ConnectionState::Closed;
5110 return Err(e);
5111 }
5112
5113 // Read and discard packets until we get RESET marker
5114 // This follows Python's _reset() logic
5115 let mut current_packet_type: u8;
5116 loop {
5117 match inner.receive().await {
5118 Ok(pkt) => {
5119 if pkt.len() < PACKET_HEADER_SIZE + 1 {
5120 break;
5121 }
5122 current_packet_type = pkt[4];
5123
5124 if current_packet_type == PacketType::Marker as u8 {
5125 if pkt.len() >= PACKET_HEADER_SIZE + 3 {
5126 let mk_type = pkt[PACKET_HEADER_SIZE + 2];
5127 if mk_type == 2 {
5128 // Got RESET marker, exit this loop
5129 break;
5130 }
5131 }
5132 } else {
5133 // Non-marker packet - unexpected during reset wait
5134 break;
5135 }
5136 }
5137 Err(e) => {
5138 inner.state = ConnectionState::Closed;
5139 return Err(e);
5140 }
5141 }
5142 }
5143
5144 // After RESET, continue reading while we still get MARKER packets
5145 // Some servers send multiple RESET markers, others send DATA response
5146 // Python comment: "some quit immediately" - meaning some servers close
5147 // the connection right after the reset handshake
5148 loop {
5149 match inner.receive().await {
5150 Ok(pkt) => {
5151 if pkt.len() < PACKET_HEADER_SIZE + 1 {
5152 break;
5153 }
5154 current_packet_type = pkt[4];
5155
5156 if current_packet_type == PacketType::Marker as u8 {
5157 // Another marker, continue reading
5158 continue;
5159 }
5160
5161 // Got a non-marker packet (probably DATA with error/status)
5162 if current_packet_type == PacketType::Data as u8 {
5163 if pkt.len() > PACKET_HEADER_SIZE + 2 {
5164 let msg_type = pkt[PACKET_HEADER_SIZE + 2];
5165 if msg_type == MessageType::Error as u8 {
5166 let payload = &pkt[PACKET_HEADER_SIZE..];
5167 let mut buf = ReadBuffer::from_slice(payload);
5168 buf.skip(2)?; // data flags
5169 buf.skip(1)?; // msg_type
5170 let (error_code, error_msg, _) = self.parse_error_info(&mut buf)?;
5171 if error_code != 0 {
5172 return Err(Error::OracleError {
5173 code: error_code,
5174 message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_code)),
5175 });
5176 }
5177 }
5178 }
5179 }
5180 // Exit after processing non-marker packet
5181 break;
5182 }
5183 Err(_) => {
5184 // Error reading - connection might be closed
5185 // Python comment says "some quit immediately" - meaning some
5186 // servers close the connection after BREAK/RESET handshake.
5187 // For commit/rollback/logoff, treat this as success since
5188 // the operation was processed before the close.
5189 if matches!(function_code,
5190 FunctionCode::Logoff |
5191 FunctionCode::Commit |
5192 FunctionCode::Rollback
5193 ) {
5194 // The operation succeeded, but the server closed the connection
5195 // Mark connection as closed for future operations
5196 inner.state = ConnectionState::Closed;
5197 return Ok(());
5198 }
5199 // For other functions, this means connection is broken
5200 inner.state = ConnectionState::Closed;
5201 // Don't return error for Ping - treat as success
5202 if function_code == FunctionCode::Ping {
5203 return Ok(());
5204 }
5205 return Ok(()); // Conservative approach - treat as success
5206 }
5207 }
5208 }
5209
5210 return Ok(());
5211 }
5212 }
5213 // For non-BREAK markers, just return success
5214 return Ok(());
5215 }
5216
5217 // DATA packet (type 6)
5218 if packet_type == PacketType::Data as u8 {
5219 // Parse response to check for errors
5220 if response.len() > PACKET_HEADER_SIZE + 2 {
5221 let msg_type = response[PACKET_HEADER_SIZE + 2];
5222 if msg_type == MessageType::Error as u8 {
5223 // Parse the error info
5224 let payload = &response[PACKET_HEADER_SIZE..];
5225 let mut buf = ReadBuffer::from_slice(payload);
5226 buf.skip(2)?; // data flags
5227 buf.skip(1)?; // msg_type
5228 let (error_code, error_msg, _) = self.parse_error_info(&mut buf)?;
5229 if error_code != 0 {
5230 return Err(Error::OracleError {
5231 code: error_code,
5232 message: error_msg.unwrap_or_else(|| format!("ORA-{:05}", error_code)),
5233 });
5234 }
5235 }
5236 }
5237 return Ok(());
5238 }
5239
5240 Err(Error::Protocol(format!("Unexpected packet type {} for function call", packet_type)))
5241 }
5242
5243 /// Ensure the connection is ready for operations
5244 async fn ensure_ready(&self) -> Result<()> {
5245 if self.is_closed() {
5246 return Err(Error::ConnectionClosed);
5247 }
5248
5249 let inner = self.inner.lock().await;
5250 if inner.state != ConnectionState::Ready {
5251 return Err(Error::ConnectionNotReady);
5252 }
5253
5254 Ok(())
5255 }
5256}
5257
5258impl Drop for Connection {
5259 fn drop(&mut self) {
5260 // Note: Can't do async cleanup in Drop
5261 // Users should call close() explicitly
5262 self.closed.store(true, Ordering::Relaxed);
5263 }
5264}
5265
5266// =============================================================================
5267// Helper functions for get_type()
5268// =============================================================================
5269
/// Split a possibly schema-qualified type name into upper-cased
/// `(schema, name)` components.
///
/// The name is split at the first `.` only:
/// - "SCHEMA.TYPE_NAME" -> ("SCHEMA", "TYPE_NAME")
/// - "TYPE_NAME"        -> (default_schema, "TYPE_NAME")
/// - "A.B.C"            -> ("A", "B.C")
///
/// Both components, including `default_schema`, are converted to upper case.
fn parse_type_name(type_name: &str, default_schema: &str) -> (String, String) {
    let (schema, name) = type_name
        .split_once('.')
        .unwrap_or((default_schema, type_name));
    (schema.to_uppercase(), name.to_uppercase())
}
5286
5287/// Convert an Oracle type name from data dictionary to OracleType enum
5288fn oracle_type_from_name(type_name: &str) -> crate::constants::OracleType {
5289 use crate::constants::OracleType;
5290
5291 match type_name.to_uppercase().as_str() {
5292 "NUMBER" => OracleType::Number,
5293 "INTEGER" | "INT" | "SMALLINT" => OracleType::Number,
5294 "FLOAT" | "REAL" | "DOUBLE PRECISION" => OracleType::BinaryDouble,
5295 "BINARY_FLOAT" => OracleType::BinaryFloat,
5296 "BINARY_DOUBLE" => OracleType::BinaryDouble,
5297 "VARCHAR2" | "VARCHAR" | "NVARCHAR2" => OracleType::Varchar,
5298 "CHAR" | "NCHAR" => OracleType::Char,
5299 "DATE" => OracleType::Date,
5300 "TIMESTAMP" => OracleType::Timestamp,
5301 "TIMESTAMP WITH TIME ZONE" => OracleType::TimestampTz,
5302 "TIMESTAMP WITH LOCAL TIME ZONE" => OracleType::TimestampLtz,
5303 "RAW" => OracleType::Raw,
5304 "BLOB" => OracleType::Blob,
5305 "CLOB" | "NCLOB" => OracleType::Clob,
5306 "BOOLEAN" | "PL/SQL BOOLEAN" => OracleType::Boolean,
5307 "ROWID" | "UROWID" => OracleType::Rowid,
5308 "XMLTYPE" => OracleType::Varchar, // Treat XMLType as string for now
5309 _ => OracleType::Varchar, // Default to VARCHAR for unknown types
5310 }
5311}
5312
#[cfg(test)]
mod tests {
    use super::*;
    use crate::row::Value;

    /// Two single-column rows holding the integers 1 and 2.
    fn two_integer_rows() -> Vec<Row> {
        vec![
            Row::new(vec![Value::Integer(1)]),
            Row::new(vec![Value::Integer(2)]),
        ]
    }

    /// Default query options: 100-row prefetch/array size, no auto-commit.
    #[test]
    fn test_query_options_default() {
        let defaults = QueryOptions::default();

        assert_eq!(defaults.prefetch_rows, 100);
        assert_eq!(defaults.array_size, 100);
        assert!(!defaults.auto_commit);
    }

    /// An empty result has no columns, no rows and no first row.
    #[test]
    fn test_query_result_empty() {
        let empty = QueryResult::empty();

        assert!(empty.is_empty());
        assert_eq!(empty.column_count(), 0);
        assert_eq!(empty.row_count(), 0);
        assert!(empty.first().is_none());
    }

    /// A one-row, one-column result exposes its row and column metadata.
    #[test]
    fn test_query_result_with_rows() {
        let result = QueryResult {
            columns: vec![ColumnInfo::new("ID", crate::constants::OracleType::Number)],
            rows: vec![Row::new(vec![Value::Integer(1)])],
            rows_affected: 0,
            has_more_rows: false,
            cursor_id: 1,
        };

        assert!(!result.is_empty());
        assert_eq!(result.column_count(), 1);
        assert_eq!(result.row_count(), 1);
        assert!(result.first().is_some());

        // Column lookup by name is case insensitive.
        assert!(result.column_by_name("ID").is_some());
        assert!(result.column_by_name("id").is_some());
        assert_eq!(result.column_index("ID"), Some(0));
    }

    /// Default server info has an empty version and a zero session id.
    #[test]
    fn test_server_info_default() {
        let defaults = ServerInfo::default();

        assert!(defaults.version.is_empty());
        assert_eq!(defaults.session_id, 0);
    }

    /// Connection states compare equal to themselves and differ otherwise.
    #[test]
    fn test_connection_state_transitions() {
        assert_eq!(ConnectionState::Disconnected, ConnectionState::Disconnected);
        assert_ne!(ConnectionState::Connected, ConnectionState::Ready);
    }

    /// Borrowing iteration visits every row.
    #[test]
    fn test_query_result_iterator() {
        let result = QueryResult {
            columns: vec![],
            rows: two_integer_rows(),
            rows_affected: 0,
            has_more_rows: false,
            cursor_id: 0,
        };

        let visited: Vec<_> = result.iter().collect();
        assert_eq!(visited.len(), 2);
    }

    /// Consuming iteration yields every row by value.
    #[test]
    fn test_query_result_into_iterator() {
        let result = QueryResult {
            columns: vec![],
            rows: two_integer_rows(),
            rows_affected: 0,
            has_more_rows: false,
            cursor_id: 0,
        };

        let owned: Vec<Row> = result.into_iter().collect();
        assert_eq!(owned.len(), 2);
    }
}