Skip to main content

ferrule_sql/backends/
oracle.rs

1use crate::connection::{
2    AsyncConnection, BulkInsert, ConnectOptions, ExecutionSummary, ForeignKey, QueryResult,
3    SchemaInfo, StatementResult,
4};
5use crate::error::SqlError;
6use crate::stream::{BoxRowStream, DEFAULT_CURSOR_CAPACITY, channel_stream};
7use crate::url::DatabaseUrl;
8use crate::value::{ColumnInfo, Row, TypeHint, Value};
9use async_trait::async_trait;
10use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
11use oracle::sql_type::ToSql;
12use secrecy::ExposeSecret;
13use std::sync::Arc;
14
15#[derive(Debug)]
16pub struct OracleConnection {
17    conn: Arc<oracle::Connection>,
18}
19
20#[async_trait]
21impl AsyncConnection for OracleConnection {
22    async fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
23        let sql = sql.to_string();
24        let conn = self.conn.clone();
25        tokio::task::spawn_blocking(move || {
26            let stmt = conn
27                .execute(&sql, &[])
28                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
29            let row_count = stmt
30                .row_count()
31                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
32            Ok(ExecutionSummary {
33                rows_affected: Some(row_count),
34                command_tag: None,
35            })
36        })
37        .await
38        .map_err(|e| SqlError::QueryFailed(e.to_string()))?
39    }
40
41    async fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
42        let sql = sql.to_string();
43        let conn = self.conn.clone();
44        tokio::task::spawn_blocking(move || {
45            let result_set = conn
46                .query(&sql, &[])
47                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
48
49            let col_info: Vec<ColumnInfo> = result_set
50                .column_info()
51                .iter()
52                .map(|c| ColumnInfo {
53                    name: c.name().to_string(),
54                    type_hint: oracle_type_to_hint(c.oracle_type()),
55                    nullable: c.nullable(),
56                })
57                .collect();
58
59            let mut rows = Vec::new();
60            for row_result in result_set {
61                let row = row_result.map_err(|e| SqlError::QueryFailed(e.to_string()))?;
62                let values: Vec<Value> = row
63                    .sql_values()
64                    .iter()
65                    .enumerate()
66                    .map(|(i, sql_val)| {
67                        oracle_to_value(sql_val, row.column_info()[i].oracle_type())
68                    })
69                    .collect();
70                rows.push(values);
71            }
72
73            Ok(QueryResult {
74                columns: col_info,
75                rows,
76            })
77        })
78        .await
79        .map_err(|e| SqlError::QueryFailed(e.to_string()))?
80    }
81
82    /// Stream rows from an Oracle (ODPI-C) result set at bounded memory.
83    ///
84    /// The `oracle` crate is a synchronous C-FFI driver, so the
85    /// row-iteration loop runs on a `spawn_blocking` thread that pushes
86    /// each decoded row through a **bounded** `tokio::sync::mpsc` channel
87    /// (`DEFAULT_CURSOR_CAPACITY` rows). When the channel fills the
88    /// producer blocks on `blocking_send`, so peak memory is `O(cap)`,
89    /// never `O(total rows)`. Column metadata is delivered up front via a
90    /// oneshot so the caller has it before pulling any row.
91    async fn query_stream(
92        &mut self,
93        sql: &str,
94    ) -> Result<(Vec<ColumnInfo>, BoxRowStream<'_>), SqlError> {
95        let sql = sql.to_string();
96        let conn = self.conn.clone();
97        let (col_tx, col_rx) = tokio::sync::oneshot::channel::<Result<Vec<ColumnInfo>, SqlError>>();
98        let (row_tx, row_rx) =
99            tokio::sync::mpsc::channel::<Result<Row, SqlError>>(DEFAULT_CURSOR_CAPACITY);
100
101        tokio::task::spawn_blocking(move || {
102            let result_set = match conn.query(&sql, &[]) {
103                Ok(rs) => rs,
104                Err(e) => {
105                    let _ = col_tx.send(Err(SqlError::QueryFailed(e.to_string())));
106                    return;
107                }
108            };
109            let columns: Vec<ColumnInfo> = result_set
110                .column_info()
111                .iter()
112                .map(|c| ColumnInfo {
113                    name: c.name().to_string(),
114                    type_hint: oracle_type_to_hint(c.oracle_type()),
115                    nullable: c.nullable(),
116                })
117                .collect();
118            if col_tx.send(Ok(columns)).is_err() {
119                return;
120            }
121
122            for row_result in result_set {
123                let msg = match row_result {
124                    Ok(row) => {
125                        let values: Row = row
126                            .sql_values()
127                            .iter()
128                            .enumerate()
129                            .map(|(i, sql_val)| {
130                                oracle_to_value(sql_val, row.column_info()[i].oracle_type())
131                            })
132                            .collect();
133                        Ok(values)
134                    }
135                    Err(e) => Err(SqlError::QueryFailed(e.to_string())),
136                };
137                let is_err = msg.is_err();
138                // blocking_send back-pressures the producer thread.
139                if row_tx.blocking_send(msg).is_err() {
140                    return;
141                }
142                if is_err {
143                    return;
144                }
145            }
146        });
147
148        let columns = col_rx
149            .await
150            .map_err(|_| SqlError::QueryFailed("Oracle cursor producer dropped".to_string()))??;
151        Ok((columns, channel_stream(row_rx)))
152    }
153
154    async fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
155        let statements =
156            split_oracle_statements(sql).map_err(|e| SqlError::QueryFailed(e.to_string()))?;
157        let mut results = Vec::with_capacity(statements.len());
158        for stmt in statements {
159            let stmt = stmt.trim();
160            if stmt.is_empty() {
161                continue;
162            }
163            match self.query(stmt).await {
164                Ok(result) => results.push(StatementResult::Query(result)),
165                Err(SqlError::QueryFailed(_)) => {
166                    let summary = self.execute(stmt).await?;
167                    results.push(StatementResult::Summary(summary));
168                }
169                Err(e) => return Err(e),
170            }
171        }
172        Ok(results)
173    }
174
175    async fn ping(&mut self) -> Result<(), SqlError> {
176        let conn = self.conn.clone();
177        tokio::task::spawn_blocking(move || {
178            conn.ping()
179                .map_err(|e| SqlError::ConnectionFailed(e.to_string()))
180        })
181        .await
182        .map_err(|e| SqlError::ConnectionFailed(e.to_string()))?
183    }
184
185    async fn list_tables(&mut self, schema: Option<&str>) -> Result<Vec<String>, SqlError> {
186        let sql = match schema {
187            Some(s) => format!(
188                "SELECT table_name FROM all_tables WHERE owner = '{}' ORDER BY table_name",
189                escape_oracle_string(s)
190            ),
191            None => "SELECT table_name FROM user_tables ORDER BY table_name".to_string(),
192        };
193        let result = self.query(&sql).await?;
194        let names: Vec<String> = result
195            .rows
196            .into_iter()
197            .filter_map(|row| {
198                row.into_iter().next().and_then(|v| match v {
199                    Value::String(s) => Some(s),
200                    _ => None,
201                })
202            })
203            .collect();
204        Ok(names)
205    }
206
207    async fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError> {
208        // `ALL_USERS` lists every owner/schema visible to the connected
209        // user (the portable choice over `DBA_USERS`, which needs elevated
210        // privileges). `USER` is the connecting schema, flagged default.
211        let sql = "SELECT username, CASE WHEN username = USER THEN 1 ELSE 0 END FROM all_users ORDER BY username";
212        let result = self.query(sql).await?;
213        let schemas: Vec<SchemaInfo> = result
214            .rows
215            .into_iter()
216            .filter_map(|row| {
217                let name = match row.first() {
218                    Some(Value::String(s)) => s.clone(),
219                    _ => return None,
220                };
221                let is_default = crate::connection::is_default_from_value(row.get(1));
222                Some(SchemaInfo { name, is_default })
223            })
224            .collect();
225        Ok(schemas)
226    }
227
228    async fn describe_table(
229        &mut self,
230        schema: Option<&str>,
231        table: &str,
232    ) -> Result<QueryResult, SqlError> {
233        let sql = match schema {
234            Some(s) => format!(
235                "SELECT column_name, data_type, nullable, data_default, data_precision, data_scale \
236                 FROM all_tab_columns \
237                 WHERE owner = UPPER('{}') AND table_name = UPPER('{}') \
238                 ORDER BY column_id",
239                escape_oracle_string(s),
240                escape_oracle_string(table),
241            ),
242            None => format!(
243                "SELECT column_name, data_type, nullable, data_default, data_precision, data_scale \
244                 FROM user_tab_columns \
245                 WHERE table_name = UPPER('{}') \
246                 ORDER BY column_id",
247                escape_oracle_string(table),
248            ),
249        };
250        self.query(&sql).await
251    }
252
253    async fn primary_key(
254        &mut self,
255        schema: Option<&str>,
256        table: &str,
257    ) -> Result<Vec<String>, SqlError> {
258        // `all_constraints.constraint_type = 'P'` is the PK; join
259        // `all_cons_columns` for key positions.
260        let sql = match schema {
261            Some(s) => format!(
262                "SELECT cc.column_name \
263                 FROM all_constraints c \
264                 JOIN all_cons_columns cc \
265                   ON cc.owner = c.owner AND cc.constraint_name = c.constraint_name \
266                 WHERE c.constraint_type = 'P' \
267                   AND c.owner = UPPER('{}') AND c.table_name = UPPER('{}') \
268                 ORDER BY cc.position",
269                escape_oracle_string(s),
270                escape_oracle_string(table),
271            ),
272            None => format!(
273                "SELECT cc.column_name \
274                 FROM user_constraints c \
275                 JOIN user_cons_columns cc ON cc.constraint_name = c.constraint_name \
276                 WHERE c.constraint_type = 'P' AND c.table_name = UPPER('{}') \
277                 ORDER BY cc.position",
278                escape_oracle_string(table),
279            ),
280        };
281        let result = self.query(&sql).await?;
282        Ok(result
283            .rows
284            .into_iter()
285            .filter_map(|row| {
286                row.into_iter().next().and_then(|v| match v {
287                    Value::String(s) => Some(s),
288                    _ => None,
289                })
290            })
291            .collect())
292    }
293
294    async fn list_foreign_keys(
295        &mut self,
296        schema: Option<&str>,
297    ) -> Result<Vec<ForeignKey>, SqlError> {
298        // FK rows live in `all_constraints` with constraint_type 'R'.
299        // The referenced PK is on the parent's constraint, joined
300        // via `r_constraint_name`. `delete_rule` is on the FK itself.
301        let sql = match schema {
302            Some(s) => format!(
303                "SELECT c.constraint_name, c.table_name AS child_table, cc.column_name AS child_col, \
304                        pc.table_name AS parent_table, pcc.column_name AS parent_col, \
305                        c.delete_rule, cc.position \
306                 FROM all_constraints c \
307                 JOIN all_cons_columns cc \
308                   ON cc.owner = c.owner AND cc.constraint_name = c.constraint_name \
309                 JOIN all_constraints pc \
310                   ON pc.owner = c.r_owner AND pc.constraint_name = c.r_constraint_name \
311                 JOIN all_cons_columns pcc \
312                   ON pcc.owner = pc.owner AND pcc.constraint_name = pc.constraint_name \
313                  AND pcc.position = cc.position \
314                 WHERE c.constraint_type = 'R' AND c.owner = UPPER('{}') \
315                 ORDER BY c.constraint_name, cc.position",
316                escape_oracle_string(s),
317            ),
318            None => "SELECT c.constraint_name, c.table_name AS child_table, cc.column_name AS child_col, \
319                        pc.table_name AS parent_table, pcc.column_name AS parent_col, \
320                        c.delete_rule, cc.position \
321                 FROM user_constraints c \
322                 JOIN user_cons_columns cc ON cc.constraint_name = c.constraint_name \
323                 JOIN user_constraints pc ON pc.constraint_name = c.r_constraint_name \
324                 JOIN user_cons_columns pcc \
325                   ON pcc.constraint_name = pc.constraint_name AND pcc.position = cc.position \
326                 WHERE c.constraint_type = 'R' \
327                 ORDER BY c.constraint_name, cc.position".to_string(),
328        };
329        let result = self.query(&sql).await?;
330        let mut map: indexmap::IndexMap<String, ForeignKey> = indexmap::IndexMap::new();
331        for row in result.rows {
332            let mut cols = row.into_iter();
333            let conname = match cols.next() {
334                Some(Value::String(s)) => s,
335                _ => continue,
336            };
337            let child_table = match cols.next() {
338                Some(Value::String(s)) => s,
339                _ => continue,
340            };
341            let child_col = match cols.next() {
342                Some(Value::String(s)) => s,
343                _ => continue,
344            };
345            let parent_table = match cols.next() {
346                Some(Value::String(s)) => s,
347                _ => continue,
348            };
349            let parent_col = match cols.next() {
350                Some(Value::String(s)) => s,
351                _ => continue,
352            };
353            let on_delete = match cols.next() {
354                // Oracle reports "NO ACTION" as a literal absence;
355                // surface it as None for parity with the other backends.
356                Some(Value::String(s)) if !s.is_empty() && s != "NO ACTION" => Some(s),
357                _ => None,
358            };
359            let entry = map.entry(conname).or_insert_with(|| ForeignKey {
360                child_table: child_table.clone(),
361                child_columns: Vec::new(),
362                parent_table: parent_table.clone(),
363                parent_columns: Vec::new(),
364                on_delete,
365            });
366            entry.child_columns.push(child_col);
367            entry.parent_columns.push(parent_col);
368        }
369        Ok(map.into_values().collect())
370    }
371
372    async fn bulk_insert_rows(&mut self, target: BulkInsert<'_>) -> Result<usize, SqlError> {
373        if target.rows.is_empty() {
374            return Ok(0);
375        }
376        let row_count = target.rows.len();
377
378        // Build the parameterized INSERT once: `:1, :2, ...`. Bind
379        // by position, not by name — matches our positional `Row`
380        // shape and avoids per-column name negotiation.
381        let qtable = crate::copy::quote_identifier(target.table, crate::backend::Backend::Oracle);
382        let cols = target
383            .columns
384            .iter()
385            .map(|c| crate::copy::quote_identifier(&c.name, crate::backend::Backend::Oracle))
386            .collect::<Vec<_>>()
387            .join(", ");
388        let placeholders = (1..=target.columns.len())
389            .map(|i| format!(":{i}"))
390            .collect::<Vec<_>>()
391            .join(", ");
392        let sql = format!("INSERT INTO {qtable} ({cols}) VALUES ({placeholders})");
393
394        // Translate every row into owned bindings up front. Keeping
395        // ownership outside the spawn_blocking closure keeps the
396        // closure cheap and the error-mapping legible.
397        let hints: Vec<TypeHint> = target.columns.iter().map(|c| c.type_hint).collect();
398        let mut owned_rows: Vec<Vec<OwnedBind>> = Vec::with_capacity(row_count);
399        for row in target.rows {
400            let mut bound = Vec::with_capacity(row.len());
401            for (idx, v) in row.iter().enumerate() {
402                let hint = hints.get(idx).copied().unwrap_or(TypeHint::Other);
403                bound.push(value_to_oracle_bind(v, hint)?);
404            }
405            owned_rows.push(bound);
406        }
407
408        // ODPI-C / `oracle::Batch` is sync — push the entire batch
409        // build + append + execute loop into spawn_blocking. The
410        // Arc clone goes in; the Batch borrows the Connection only
411        // for the closure's lifetime.
412        let conn_arc = self.conn.clone();
413        tokio::task::spawn_blocking(move || {
414            let mut batch = conn_arc
415                .batch(&sql, row_count)
416                .with_row_counts()
417                .build()
418                .map_err(map_oracle_bulk_error)?;
419            for row in &owned_rows {
420                let binds: Vec<&dyn ToSql> = row.iter().map(|b| b.as_to_sql()).collect();
421                batch.append_row(&binds).map_err(map_oracle_bulk_error)?;
422            }
423            // Always flush at the end. `append_row` auto-executes
424            // when `batch_index == batch_size`, but a leading
425            // execute() is a no-op on an empty batch, so this is
426            // idempotent and safer than relying on auto-execute.
427            batch.execute().map_err(map_oracle_bulk_error)?;
428            Ok::<usize, SqlError>(row_count)
429        })
430        .await
431        .map_err(|e| SqlError::QueryFailed(e.to_string()))?
432    }
433}
434
435/// Classify an `oracle::Error` raised by the `Batch` path.
436///
437/// `BulkUnavailable` is only used when the generic
438/// `INSERT ALL ... SELECT 1 FROM DUAL` path could *actually* succeed
439/// where bulk failed — i.e., when the failure is specific to the
440/// array-DML protocol shape and not to schema/privilege concerns the
441/// generic path would also hit. Today no such Oracle-specific
442/// recoverable error class is known, so every failure surfaces as
443/// `QueryFailed` immediately. (Notably, ORA-01031 "insufficient
444/// privilege" and ORA-00942 "table or view does not exist" were
445/// previously flagged as `BulkUnavailable`, but the generic path
446/// fails identically on both — the suggested fallback was misleading.)
447///
448/// ORA-26026 / ORA-12838 from direct-path inserts are reserved for
449/// the future `--oracle-direct-path` opt-in (#37), where falling
450/// back to safe array-DML is the right behaviour.
451fn map_oracle_bulk_error(e: oracle::Error) -> SqlError {
452    let msg = e.to_string();
453    // dpi_code is typed `Option<i32>`. ODPI-C 1047 is the
454    // Instant-Client-not-loaded structural error (see the connect
455    // path at `map_oracle_error`). In practice this can't fire here
456    // because we'd have failed at connect time, but defend in depth.
457    if let Some(code) = e.dpi_code()
458        && (code == 1047 || msg.contains("libclntsh"))
459    {
460        return SqlError::ConnectionFailed(format!("Oracle Instant Client not loaded: {msg}"));
461    }
462    SqlError::QueryFailed(format!("Oracle bulk: {msg}"))
463}
464
465/// Owned, typed binding for a single column slot in one row of an
466/// `oracle::Batch`. Each variant holds an `Option<T>` because
467/// `Option<T>: ToSql` is how the oracle crate represents typed NULL
468/// — the destination column's [`TypeHint`] selects the variant for
469/// `Value::Null` so the bind metadata stays stable across rows.
470enum OwnedBind {
471    /// `i64` — used for `Value::Int64` and `Value::Bool` (ferrule's
472    /// Oracle DDL maps Bool → `NUMBER(1)`, so 0/1 round-trips
473    /// cleanly through an `i64` bind).
474    I64(Option<i64>),
475    /// `f64` — used for `Value::Float64`. NaN/Inf pass through
476    /// because ferrule's Oracle DDL maps `Float64 → BINARY_DOUBLE`,
477    /// which natively accepts both values via the IEEE-754 bit
478    /// pattern. (Note: this diverges from the MySQL bulk path, which
479    /// substitutes NULL because MySQL `DOUBLE` literals have no
480    /// NaN/Inf representation in LOAD DATA.)
481    F64(Option<f64>),
482    /// Text — used for `Value::String`, `Value::Decimal` (Oracle
483    /// auto-coerces string → NUMBER), `Value::Json`, `Value::Array`,
484    /// and `Value::Time` (rendered as `HH:MM:SS` for CLOB or
485    /// auto-coerce to TIMESTAMP).
486    Text(Option<String>),
487    /// Raw bytes — used for `Value::Bytes` (BLOB) and `Value::Uuid`
488    /// (16-byte RAW after hex parsing).
489    Bytes(Option<Vec<u8>>),
490    /// `NaiveDate` — used for `Value::Date` (DATE column).
491    Date(Option<NaiveDate>),
492    /// `NaiveDateTime` — used for `Value::DateTime` and `Value::Time`
493    /// (the latter paired with an epoch date for TIMESTAMP).
494    DateTime(Option<NaiveDateTime>),
495    /// `DateTime<Utc>` — used for `Value::DateTimeTz` (TIMESTAMP WITH TIME ZONE).
496    DateTimeTz(Option<DateTime<Utc>>),
497}
498
499impl OwnedBind {
500    fn as_to_sql(&self) -> &dyn ToSql {
501        match self {
502            Self::I64(v) => v,
503            Self::F64(v) => v,
504            Self::Text(v) => v,
505            Self::Bytes(v) => v,
506            Self::Date(v) => v,
507            Self::DateTime(v) => v,
508            Self::DateTimeTz(v) => v,
509        }
510    }
511}
512
513/// Translate one `Value` (paired with the destination column's
514/// [`TypeHint`]) into an [`OwnedBind`] ready to feed an
515/// `oracle::Batch`. The hint matters for [`Value::Null`] (it picks
516/// the typed-NULL variant so per-column bind metadata stays stable
517/// across rows) and for [`Value::Time`] (Oracle has no TIME type;
518/// we pair the time with an epoch date for TIMESTAMP coercion).
519fn value_to_oracle_bind(v: &Value, hint: TypeHint) -> Result<OwnedBind, SqlError> {
520    Ok(match v {
521        Value::Null => null_bind_for_hint(hint),
522        Value::Bool(b) => OwnedBind::I64(Some(if *b { 1 } else { 0 })),
523        Value::Int64(n) => OwnedBind::I64(Some(*n)),
524        Value::Float64(f) => {
525            // Oracle BINARY_DOUBLE accepts NaN/Inf but NUMBER does
526            // not, and our DDL maps Float64 → BINARY_DOUBLE — so
527            // the bind succeeds either way. Pass through.
528            OwnedBind::F64(Some(*f))
529        }
530        // Decimal: rely on Oracle's implicit string → NUMBER
531        // conversion. Matches the generic INSERT path's behaviour
532        // (`Value::Decimal(s)` is inlined as the bare numeric
533        // string).
534        Value::Decimal(s) => OwnedBind::Text(Some(s.clone())),
535        Value::String(s) => OwnedBind::Text(Some(s.clone())),
536        Value::Bytes(b) => OwnedBind::Bytes(Some(b.clone())),
537        Value::Date(d) => OwnedBind::Date(Some(*d)),
538        Value::Time(t) => {
539            // Oracle has no TIME-only type; our DDL maps TypeHint::Time
540            // → TIMESTAMP. Pair the time with the Unix epoch date so
541            // the TIMESTAMP receives the time-of-day portion intact.
542            // Same pattern as MySQL: ferrule already returns Time only
543            // for backends with a true TIME type, so the date is a
544            // sentinel the user wouldn't otherwise see.
545            let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
546            OwnedBind::DateTime(Some(NaiveDateTime::new(epoch, *t)))
547        }
548        Value::DateTime(dt) => OwnedBind::DateTime(Some(*dt)),
549        Value::DateTimeTz(dt) => OwnedBind::DateTimeTz(Some(*dt)),
550        Value::Json(j) => {
551            let rendered = serde_json::to_string(j)
552                .map_err(|e| SqlError::QueryFailed(format!("Oracle bulk: JSON serialize: {e}")))?;
553            OwnedBind::Text(Some(rendered))
554        }
555        Value::Uuid(s) => {
556            // Oracle DDL maps Uuid → RAW(16). Parse the canonical
557            // hex form into 16 bytes; reject malformed input as a
558            // hard error (QueryFailed) — same outcome as the
559            // generic INSERT path would have hit.
560            let parsed = parse_uuid_hex(s)
561                .map_err(|e| SqlError::QueryFailed(format!("Oracle bulk: UUID {s:?}: {e}")))?;
562            OwnedBind::Bytes(Some(parsed))
563        }
564        Value::Array(a) => {
565            // Oracle DDL maps Array → CLOB; serialize compact JSON.
566            let rendered = serde_json::to_string(a)
567                .map_err(|e| SqlError::QueryFailed(format!("Oracle bulk: array serialize: {e}")))?;
568            OwnedBind::Text(Some(rendered))
569        }
570    })
571}
572
573/// Typed `None` for a given column hint. Selecting the right
574/// variant ensures every row in the batch binds the same Oracle
575/// type for a given column, even if some rows have `Value::Null`
576/// at that position. Mismatched bind types would surface as
577/// ORA-01036 ("illegal variable name/number") or similar.
578fn null_bind_for_hint(hint: TypeHint) -> OwnedBind {
579    match hint {
580        TypeHint::Bool | TypeHint::Int64 => OwnedBind::I64(None),
581        TypeHint::Float64 => OwnedBind::F64(None),
582        TypeHint::Bytes | TypeHint::Uuid => OwnedBind::Bytes(None),
583        TypeHint::Date => OwnedBind::Date(None),
584        TypeHint::Time | TypeHint::DateTime => OwnedBind::DateTime(None),
585        TypeHint::DateTimeTz => OwnedBind::DateTimeTz(None),
586        // Decimal / String / Json / Array / Other / Null all bind
587        // as text — the DDL translator maps these to NUMBER (for
588        // Decimal) or CLOB / NVARCHAR (for the rest), and Oracle
589        // implicitly converts a NVARCHAR2 bind into either.
590        _ => OwnedBind::Text(None),
591    }
592}
593
594/// Parse a UUID hex string into 16 raw bytes for binding into an
595/// Oracle `RAW(16)` column. Accepts the canonical 8-4-4-4-12 dashed
596/// form, the 32-character undashed form, an optional `urn:uuid:`
597/// prefix (RFC 4122 §3, case-insensitive per the RFC), and Microsoft /
598/// .NET curly-brace form `{...}`. Hex digits are case-insensitive
599/// (`u8::from_str_radix` accepts a-f and A-F). Surrounding whitespace
600/// is tolerated.
601fn parse_uuid_hex(s: &str) -> Result<Vec<u8>, String> {
602    let trimmed = s.trim();
603    // RFC 4122 §3: "The prefix 'urn:uuid:' is case insensitive".
604    // Match any casing — `urn:uuid:`, `URN:UUID:`, `Urn:Uuid:`,
605    // `uRn:UuId:`, etc.
606    const URN_PREFIX: &str = "urn:uuid:";
607    let stripped_urn = if trimmed.len() >= URN_PREFIX.len()
608        && trimmed.is_char_boundary(URN_PREFIX.len())
609        && trimmed[..URN_PREFIX.len()].eq_ignore_ascii_case(URN_PREFIX)
610    {
611        // Per #42 — tolerate whitespace between the URN prefix and
612        // the hex digits. Real-world sources (JDBC `toString` on
613        // some drivers, hand-pasted examples from RFC docs) often
614        // include `urn:uuid:  550e8400-...`. trim_start matches
615        // every other production UUID parser's leniency.
616        trimmed[URN_PREFIX.len()..].trim_start()
617    } else {
618        trimmed
619    };
620    // .NET / Microsoft curly-brace form: `{550e8400-...}`. Both braces
621    // must be present; one without the other is a malformed input.
622    let stripped_braces = if let Some(inner) = stripped_urn.strip_prefix('{') {
623        inner
624            .strip_suffix('}')
625            .ok_or_else(|| "leading `{` without matching `}`".to_string())?
626    } else if stripped_urn.starts_with('}') || stripped_urn.ends_with('}') {
627        return Err("unmatched `}` in UUID".to_string());
628    } else {
629        stripped_urn
630    };
631    let stripped: String = stripped_braces.chars().filter(|c| *c != '-').collect();
632    if stripped.len() != 32 {
633        return Err(format!(
634            "expected 32 hex characters (after stripping dashes / prefix / braces), got {}",
635            stripped.len()
636        ));
637    }
638    let mut out = Vec::with_capacity(16);
639    for chunk in stripped.as_bytes().chunks(2) {
640        let pair = std::str::from_utf8(chunk).map_err(|e| e.to_string())?;
641        out.push(u8::from_str_radix(pair, 16).map_err(|e| e.to_string())?);
642    }
643    Ok(out)
644}
645
646pub(crate) async fn connect(
647    url: &DatabaseUrl,
648    opts: &ConnectOptions,
649) -> Result<OracleConnection, SqlError> {
650    let host = url.host().unwrap_or("localhost").to_string();
651    let port = url.port().unwrap_or(1521);
652    let username = url.username().to_string();
653    // A caller-resolved secret takes precedence over the URL password.
654    let password = opts
655        .effective_password(url)
656        .map(|p| p.expose_secret().to_string())
657        .unwrap_or_default();
658    let service = url.database().to_string();
659
660    let connect_string = if service.is_empty() {
661        format!("{}:{}", host, port)
662    } else {
663        format!("//{}:{}/{}", host, port, service)
664    };
665
666    tokio::task::spawn_blocking(move || {
667        let conn = oracle::Connection::connect(&username, &password, &connect_string)
668            .map_err(map_oracle_error)?;
669        Ok(OracleConnection {
670            conn: Arc::new(conn),
671        })
672    })
673    .await
674    .map_err(|e| SqlError::ConnectionFailed(e.to_string()))?
675}
676
677/// Split a SQL string into individual statements by `;`.
678/// Ignores semicolons inside:
679///   - single-quoted strings (`''` escape)
680///   - `--` line comments and `/* */` block comments
681///   - PL/SQL blocks (`BEGIN … END`, `DECLARE … BEGIN … END`) and
682///     nested control structures (`IF … END IF`, `LOOP … END LOOP`,
683///     `CASE … END CASE` or `CASE … END`).
684fn split_oracle_statements(sql: &str) -> Result<Vec<&str>, String> {
685    let mut statements = Vec::new();
686    let mut start = 0usize;
687    let mut i = 0usize;
688    let bytes = sql.as_bytes();
689    let mut block_depth = 0usize;
690    let mut case_depth = 0usize;
691    let mut loop_depth = 0usize;
692
693    while i < bytes.len() {
694        match bytes[i] {
695            b'\'' => {
696                i += 1;
697                while i < bytes.len() {
698                    if bytes[i] == b'\'' {
699                        if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
700                            i += 2;
701                        } else {
702                            i += 1;
703                            break;
704                        }
705                    } else {
706                        i += 1;
707                    }
708                }
709            }
710            b'-' if i + 1 < bytes.len() && bytes[i + 1] == b'-' => {
711                while i < bytes.len() && bytes[i] != b'\n' {
712                    i += 1;
713                }
714            }
715            b'/' if i + 1 < bytes.len() && bytes[i + 1] == b'*' => {
716                i += 2;
717                while i + 1 < bytes.len() {
718                    if bytes[i] == b'*' && bytes[i + 1] == b'/' {
719                        i += 2;
720                        break;
721                    }
722                    i += 1;
723                }
724            }
725            b';' if block_depth == 0 && case_depth == 0 && loop_depth == 0 => {
726                let candidate = &sql[start..=i];
727                let trimmed = candidate.trim();
728                if !trimmed.is_empty() && trimmed != ";" {
729                    statements.push(trimmed);
730                }
731                i += 1;
732                start = i;
733            }
734            _ => {
735                if matches_keyword(bytes, i, "begin") || matches_keyword(bytes, i, "declare") {
736                    block_depth += 1;
737                    i += if matches_keyword(bytes, i, "begin") {
738                        5
739                    } else {
740                        7
741                    };
742                } else if matches_keyword(bytes, i, "case") {
743                    case_depth += 1;
744                    i += 4;
745                } else if matches_keyword(bytes, i, "loop") {
746                    loop_depth += 1;
747                    i += 4;
748                } else if matches_keyword(bytes, i, "end") {
749                    match end_suffix(bytes, i) {
750                        Some("case") => {
751                            case_depth = case_depth.saturating_sub(1);
752                            i += 3; // skip "END"
753                            while i < bytes.len() && bytes[i].is_ascii_whitespace() {
754                                i += 1;
755                            }
756                            i += keyword_len(bytes, i); // skip "CASE"
757                        }
758                        Some("if") | Some("loop") => {
759                            // END IF / END LOOP do not affect tracked depths
760                            i += 3; // skip "END"
761                            while i < bytes.len() && bytes[i].is_ascii_whitespace() {
762                                i += 1;
763                            }
764                            i += keyword_len(bytes, i); // skip suffix
765                        }
766                        _ => {
767                            if case_depth > 0 {
768                                case_depth -= 1;
769                            } else if loop_depth > 0 {
770                                loop_depth -= 1;
771                            } else {
772                                block_depth = block_depth.saturating_sub(1);
773                            }
774                            i += 3;
775                        }
776                    }
777                } else {
778                    i += 1;
779                }
780            }
781        }
782    }
783
784    if start < sql.len() {
785        let tail = &sql[start..];
786        if !tail.trim().is_empty() {
787            statements.push(tail.trim_end());
788        }
789    }
790
791    Ok(statements)
792}
793
794/// Case-insensitive keyword match with word-boundary guards.
795fn matches_keyword(bytes: &[u8], at: usize, keyword: &str) -> bool {
796    let klen = keyword.len();
797    if at + klen > bytes.len() {
798        return false;
799    }
800    for (i, b) in keyword.bytes().enumerate() {
801        if bytes[at + i].to_ascii_lowercase() != b {
802            return false;
803        }
804    }
805    // Preceding boundary
806    if at > 0 {
807        let prev = bytes[at - 1];
808        if prev.is_ascii_alphanumeric() || prev == b'_' {
809            return false;
810        }
811    }
812    // Following boundary
813    if at + klen < bytes.len() {
814        let next = bytes[at + klen];
815        if next.is_ascii_alphanumeric() || next == b'_' {
816            return false;
817        }
818    }
819    true
820}
821
822/// Length of the word starting at `at`.
823fn keyword_len(bytes: &[u8], at: usize) -> usize {
824    let mut j = at;
825    while j < bytes.len() && (bytes[j].is_ascii_alphanumeric() || bytes[j] == b'_') {
826        j += 1;
827    }
828    j - at
829}
830
831/// After a bare `END`, peek the next non-whitespace token.
832fn end_suffix(bytes: &[u8], end_pos: usize) -> Option<&'static str> {
833    let mut j = end_pos + 3;
834    while j < bytes.len() && bytes[j].is_ascii_whitespace() {
835        j += 1;
836    }
837    ["if", "loop", "case"]
838        .into_iter()
839        .find(|kw| matches_keyword(bytes, j, kw))
840}
841
842fn map_oracle_error(e: oracle::Error) -> SqlError {
843    let msg = e.to_string();
844    if e.dpi_code() == Some(1047) || msg.contains("libclntsh") {
845        SqlError::ConnectionFailed(format!(
846            "Oracle Instant Client not found. Install it from \
847             https://www.oracle.com/database/technologies/instant-client/downloads.html \
848             and ensure it is on your LD_LIBRARY_PATH (Linux), DYLD_LIBRARY_PATH (macOS), \
849             or PATH (Windows). Original error: {}",
850            msg
851        ))
852    } else {
853        SqlError::ConnectionFailed(msg)
854    }
855}
856
857fn oracle_type_to_hint(ora_type: &oracle::sql_type::OracleType) -> TypeHint {
858    use oracle::sql_type::OracleType;
859    match ora_type {
860        OracleType::Number(_, 0) => TypeHint::Int64,
861        OracleType::Number(_, _) | OracleType::Float(_) => TypeHint::Decimal,
862        OracleType::BinaryFloat | OracleType::BinaryDouble => TypeHint::Float64,
863        OracleType::Int64 => TypeHint::Int64,
864        OracleType::Varchar2(_)
865        | OracleType::NVarchar2(_)
866        | OracleType::Char(_)
867        | OracleType::NChar(_)
868        | OracleType::CLOB
869        | OracleType::NCLOB
870        | OracleType::Long
871        | OracleType::Rowid
872        | OracleType::Xml => TypeHint::String,
873        OracleType::BLOB | OracleType::BFILE | OracleType::Raw(_) | OracleType::LongRaw => {
874            TypeHint::Bytes
875        }
876        OracleType::Date | OracleType::Timestamp(_) => TypeHint::DateTime,
877        OracleType::TimestampTZ(_) | OracleType::TimestampLTZ(_) => TypeHint::DateTimeTz,
878        OracleType::Boolean => TypeHint::Bool,
879        OracleType::Json => TypeHint::Json,
880        _ => TypeHint::Other,
881    }
882}
883
884fn oracle_to_value(sql_val: &oracle::SqlValue, ora_type: &oracle::sql_type::OracleType) -> Value {
885    use oracle::sql_type::OracleType;
886    match ora_type {
887        OracleType::Number(_, 0) => {
888            if let Ok(Some(v)) = sql_val.get::<Option<i64>>() {
889                Value::Int64(v)
890            } else if let Ok(Some(v)) = sql_val.get::<Option<String>>() {
891                Value::Decimal(v)
892            } else {
893                Value::Null
894            }
895        }
896        OracleType::Number(_, _) | OracleType::Float(_) => {
897            if let Ok(Some(v)) = sql_val.get::<Option<String>>() {
898                Value::Decimal(v)
899            } else {
900                Value::Null
901            }
902        }
903        OracleType::BinaryFloat | OracleType::BinaryDouble => sql_val
904            .get::<Option<f64>>()
905            .unwrap_or(None)
906            .map(Value::Float64)
907            .unwrap_or(Value::Null),
908        OracleType::Int64 => sql_val
909            .get::<Option<i64>>()
910            .unwrap_or(None)
911            .map(Value::Int64)
912            .unwrap_or(Value::Null),
913        OracleType::Varchar2(_)
914        | OracleType::NVarchar2(_)
915        | OracleType::Char(_)
916        | OracleType::NChar(_)
917        | OracleType::CLOB
918        | OracleType::NCLOB
919        | OracleType::Long
920        | OracleType::Rowid
921        | OracleType::Xml => sql_val
922            .get::<Option<String>>()
923            .unwrap_or(None)
924            .map(Value::String)
925            .unwrap_or(Value::Null),
926        OracleType::BLOB | OracleType::BFILE | OracleType::Raw(_) | OracleType::LongRaw => sql_val
927            .get::<Option<Vec<u8>>>()
928            .unwrap_or(None)
929            .map(Value::Bytes)
930            .unwrap_or(Value::Null),
931        OracleType::Date | OracleType::Timestamp(_) => sql_val
932            .get::<Option<chrono::NaiveDateTime>>()
933            .unwrap_or(None)
934            .map(Value::DateTime)
935            .unwrap_or(Value::Null),
936        OracleType::TimestampTZ(_) | OracleType::TimestampLTZ(_) => sql_val
937            .get::<Option<chrono::DateTime<chrono::Utc>>>()
938            .unwrap_or(None)
939            .map(Value::DateTimeTz)
940            .unwrap_or(Value::Null),
941        OracleType::Boolean => sql_val
942            .get::<Option<bool>>()
943            .unwrap_or(None)
944            .map(Value::Bool)
945            .unwrap_or(Value::Null),
946        OracleType::Json => {
947            if let Ok(Some(s)) = sql_val.get::<Option<String>>() {
948                serde_json::from_str(&s)
949                    .map(Value::Json)
950                    .unwrap_or(Value::String(s))
951            } else {
952                Value::Null
953            }
954        }
955        _ => sql_val
956            .get::<Option<String>>()
957            .unwrap_or(None)
958            .map(Value::String)
959            .unwrap_or(Value::Null),
960    }
961}
962
963fn escape_oracle_string(s: &str) -> String {
964    s.replace("'", "''")
965}
966
967#[cfg(test)]
968mod tests {
969    use super::*;
970    use crate::url::DatabaseUrl;
971    use chrono::NaiveTime;
972
973    fn try_connect() -> Option<Box<dyn crate::Connection>> {
974        let raw = std::env::var("ORACLE_TEST_URL").ok()?;
975        let url = DatabaseUrl::parse(&raw).ok()?;
976        let conn = crate::connect(&url, &ConnectOptions::default(), None).ok()?;
977        Some(conn)
978    }
979
980    #[test]
981    fn test_oracle_connect() {
982        let Some(_conn) = try_connect() else {
983            eprintln!("ORACLE_TEST_URL not set or unreachable; skipping test_oracle_connect");
984            return;
985        };
986        println!("Oracle connection established successfully");
987    }
988
989    #[test]
990    fn test_oracle_ping() {
991        let Some(mut conn) = try_connect() else {
992            eprintln!("ORACLE_TEST_URL not set or unreachable; skipping test_oracle_ping");
993            return;
994        };
995        conn.ping().expect("ping should succeed");
996    }
997
998    #[test]
999    fn test_oracle_query() {
1000        let Some(mut conn) = try_connect() else {
1001            eprintln!("ORACLE_TEST_URL not set or unreachable; skipping test_oracle_query");
1002            return;
1003        };
1004        let result = conn
1005            .query("SELECT * FROM test_users")
1006            .expect("query should succeed");
1007        assert!(!result.columns.is_empty(), "should have columns");
1008        assert!(!result.rows.is_empty(), "should have rows");
1009    }
1010
1011    #[test]
1012    fn test_oracle_execute() {
1013        let Some(mut conn) = try_connect() else {
1014            eprintln!("ORACLE_TEST_URL not set or unreachable; skipping test_oracle_execute");
1015            return;
1016        };
1017        let summary = conn
1018            .execute("INSERT INTO test_users (name, age) VALUES ('TestUser', 99)")
1019            .expect("execute should succeed");
1020        assert!(
1021            summary.rows_affected.is_some_and(|n| n > 0),
1022            "should have affected rows"
1023        );
1024    }
1025
1026    #[test]
1027    fn test_oracle_list_tables() {
1028        let Some(mut conn) = try_connect() else {
1029            eprintln!("ORACLE_TEST_URL not set or unreachable; skipping test_oracle_list_tables");
1030            return;
1031        };
1032        let tables = conn.list_tables(None).expect("list_tables should succeed");
1033        assert!(
1034            tables.iter().any(|t| t.eq_ignore_ascii_case("test_users")),
1035            "should contain test_users (got: {:?})",
1036            tables
1037        );
1038    }
1039
1040    #[test]
1041    fn test_oracle_list_schemas() {
1042        let Some(mut conn) = try_connect() else {
1043            eprintln!("ORACLE_TEST_URL not set or unreachable; skipping test_oracle_list_schemas");
1044            return;
1045        };
1046        let schemas = conn.list_schemas().expect("list_schemas should succeed");
1047        // The connecting user (FERRULE in the seed) is its own schema and
1048        // is flagged the default. Oracle reports owners upper-cased.
1049        let default = schemas
1050            .iter()
1051            .find(|s| s.is_default)
1052            .unwrap_or_else(|| panic!("one schema should be is_default, got: {schemas:?}"));
1053        assert!(
1054            default.name.eq_ignore_ascii_case("ferrule"),
1055            "the default schema should be the connecting user, got: {default:?}"
1056        );
1057    }
1058
1059    #[test]
1060    fn test_oracle_describe_table() {
1061        let Some(mut conn) = try_connect() else {
1062            eprintln!(
1063                "ORACLE_TEST_URL not set or unreachable; skipping test_oracle_describe_table"
1064            );
1065            return;
1066        };
1067        let result = conn
1068            .describe_table(None, "test_users")
1069            .expect("describe_table should succeed");
1070        assert_eq!(result.columns.len(), 6, "should return 6 metadata columns");
1071        // Oracle column names from data dictionary are uppercase by default.
1072        // Just verify the count — exact names depend on Oracle metadata casing.
1073        assert!(!result.columns.is_empty(), "should have describe columns");
1074    }
1075
1076    #[test]
1077    fn test_oracle_type_mapping() {
1078        let Some(mut conn) = try_connect() else {
1079            eprintln!("ORACLE_TEST_URL not set or unreachable; skipping test_oracle_type_mapping");
1080            return;
1081        };
1082        let result = conn
1083            .query("SELECT name, age, score, active, meta FROM test_users WHERE name = 'Alice'")
1084            .expect("query should succeed");
1085        assert_eq!(result.rows.len(), 1);
1086        let row = &result.rows[0];
1087        assert!(matches!(row[0], Value::String(_)), "name should be String");
1088        assert!(matches!(row[1], Value::Int64(_)), "age should be Int64");
1089        assert!(
1090            matches!(row[2], Value::Float64(_) | Value::Decimal(_)),
1091            "score should be Float64 or Decimal"
1092        );
1093        assert!(
1094            matches!(row[3], Value::Int64(_) | Value::Bool(_)),
1095            "active should be Int64 or Bool"
1096        );
1097        assert!(
1098            matches!(row[4], Value::Json(_) | Value::String(_)),
1099            "meta should be Json or String"
1100        );
1101    }
1102
1103    #[test]
1104    fn test_oracle_missing_client_error() {
1105        if std::env::var("ORACLE_TEST_URL").is_ok() {
1106            eprintln!(
1107                "ORACLE_TEST_URL is set; skipping test_oracle_missing_client_error to avoid \
1108                 conflict with live environment"
1109            );
1110            return;
1111        }
1112        // If libclntsh.so is present on the system (even broken/extracted DB-home libs),
1113        // ODPI-C init may segfault instead of returning a clean error. Only attempt this
1114        // test when no Oracle client library is visible to the dynamic linker.
1115        let lib_present = std::process::Command::new("ldconfig")
1116            .args(["-p"])
1117            .output()
1118            .map(|o| String::from_utf8_lossy(&o.stdout).contains("libclntsh.so"))
1119            .unwrap_or(false);
1120        if lib_present {
1121            eprintln!(
1122                "Oracle client library (libclntsh.so) is present on this system; \
1123                 skipping test_oracle_missing_client_error because ODPI-C init may segfault \
1124                 with broken/extracted DB-home libraries."
1125            );
1126            return;
1127        }
1128        let url = DatabaseUrl::parse("oracle://user:pass@127.0.0.1:1521/XEPDB1").unwrap();
1129        let err = match crate::connect(&url, &ConnectOptions::default(), None) {
1130            Ok(_) => panic!("connect should fail when Instant Client is missing"),
1131            Err(e) => e.to_string(),
1132        };
1133        assert!(
1134            err.contains("Oracle Instant Client not found")
1135                || err.contains("DPI-1047")
1136                || err.contains("connection failed"),
1137            "error should mention missing client or connection failure: {err}"
1138        );
1139    }
1140
1141    // ── split_oracle_statements unit tests (no DB required) ─────────────────────
1142
1143    #[test]
1144    fn test_split_begin_end() {
1145        let stmts = split_oracle_statements("BEGIN NULL; END;").unwrap();
1146        assert_eq!(stmts.len(), 1);
1147        assert_eq!(stmts[0], "BEGIN NULL; END;");
1148    }
1149
1150    #[test]
1151    fn test_split_declare_begin_end() {
1152        let stmts = split_oracle_statements("DECLARE x INT; BEGIN NULL; END;").unwrap();
1153        assert_eq!(stmts.len(), 1);
1154        assert_eq!(stmts[0], "DECLARE x INT; BEGIN NULL; END;");
1155    }
1156
1157    #[test]
1158    fn test_split_nested_begin() {
1159        let stmts = split_oracle_statements("BEGIN BEGIN NULL; END; END;").unwrap();
1160        assert_eq!(stmts.len(), 1);
1161        assert_eq!(stmts[0], "BEGIN BEGIN NULL; END; END;");
1162    }
1163
1164    #[test]
1165    fn test_split_end_if_not_block_end() {
1166        let stmts = split_oracle_statements("BEGIN IF TRUE THEN NULL; END IF; END;").unwrap();
1167        assert_eq!(stmts.len(), 1);
1168        assert_eq!(stmts[0], "BEGIN IF TRUE THEN NULL; END IF; END;");
1169    }
1170
1171    #[test]
1172    fn test_split_end_loop_not_block_end() {
1173        let stmts = split_oracle_statements("BEGIN LOOP NULL; END LOOP; END;").unwrap();
1174        assert_eq!(stmts.len(), 1);
1175        assert_eq!(stmts[0], "BEGIN LOOP NULL; END LOOP; END;");
1176    }
1177
1178    #[test]
1179    fn test_split_end_case_not_block_end() {
1180        let stmts =
1181            split_oracle_statements("BEGIN CASE WHEN 1=1 THEN NULL; END CASE; END;").unwrap();
1182        assert_eq!(stmts.len(), 1);
1183        assert_eq!(stmts[0], "BEGIN CASE WHEN 1=1 THEN NULL; END CASE; END;");
1184    }
1185
1186    #[test]
1187    fn test_split_case_expr_bare_end() {
1188        let stmts = split_oracle_statements("BEGIN x := CASE WHEN 1=1 THEN 1 END; END;").unwrap();
1189        assert_eq!(stmts.len(), 1);
1190        assert_eq!(stmts[0], "BEGIN x := CASE WHEN 1=1 THEN 1 END; END;");
1191    }
1192
1193    #[test]
1194    fn test_split_case_insensitive() {
1195        let stmts = split_oracle_statements("begin null; end;").unwrap();
1196        assert_eq!(stmts.len(), 1);
1197        assert_eq!(stmts[0], "begin null; end;");
1198    }
1199
1200    #[test]
1201    fn test_split_string_ignores_keywords() {
1202        let stmts = split_oracle_statements("SELECT 'BEGIN END CASE LOOP' FROM DUAL;").unwrap();
1203        assert_eq!(stmts.len(), 1);
1204        assert_eq!(stmts[0], "SELECT 'BEGIN END CASE LOOP' FROM DUAL;");
1205    }
1206
1207    #[test]
1208    fn test_split_comment_ignores_keywords() {
1209        let stmts = split_oracle_statements("/* BEGIN END CASE */ SELECT 1;").unwrap();
1210        assert_eq!(stmts.len(), 1);
1211        assert_eq!(stmts[0], "/* BEGIN END CASE */ SELECT 1;");
1212    }
1213
1214    #[test]
1215    fn test_split_multiple_statements() {
1216        let stmts = split_oracle_statements("BEGIN NULL; END; SELECT 1;").unwrap();
1217        assert_eq!(stmts.len(), 2);
1218        assert_eq!(stmts[0], "BEGIN NULL; END;");
1219        assert_eq!(stmts[1], "SELECT 1;");
1220    }
1221
1222    #[test]
1223    fn test_split_trailing_no_semicolon() {
1224        // A semicolon is required between statements; without one the tail is
1225        // treated as a continuation of the current statement.
1226        let stmts = split_oracle_statements("BEGIN NULL; END\n SELECT 1").unwrap();
1227        assert_eq!(stmts.len(), 1);
1228        assert_eq!(stmts[0], "BEGIN NULL; END\n SELECT 1");
1229    }
1230
1231    #[test]
1232    fn test_split_empty_and_whitespace() {
1233        let stmts = split_oracle_statements("  ;  ;  BEGIN NULL; END;  ;  ").unwrap();
1234        assert_eq!(stmts.len(), 1);
1235        assert_eq!(stmts[0], "BEGIN NULL; END;");
1236    }
1237
1238    #[test]
1239    fn test_split_deeply_nested_case() {
1240        let sql = "BEGIN CASE WHEN 1=1 THEN CASE WHEN 2=2 THEN 2 END; END CASE; END;";
1241        let stmts = split_oracle_statements(sql).unwrap();
1242        assert_eq!(stmts.len(), 1);
1243        assert_eq!(stmts[0], sql);
1244    }
1245
1246    #[test]
1247    fn test_split_mixed_block_and_dml() {
1248        let stmts =
1249            split_oracle_statements("BEGIN INSERT INTO t VALUES (1); END; COMMIT;").unwrap();
1250        assert_eq!(stmts.len(), 2);
1251        assert_eq!(stmts[0], "BEGIN INSERT INTO t VALUES (1); END;");
1252        assert_eq!(stmts[1], "COMMIT;");
1253    }
1254
1255    // -------- value_to_oracle_bind + parse_uuid_hex unit tests --------
1256
1257    #[test]
1258    fn parse_uuid_with_dashes_round_trips() {
1259        let bytes = parse_uuid_hex("550e8400-e29b-41d4-a716-446655440000").expect("parse");
1260        assert_eq!(
1261            bytes,
1262            vec![
1263                0x55, 0x0e, 0x84, 0x00, 0xe2, 0x9b, 0x41, 0xd4, 0xa7, 0x16, 0x44, 0x66, 0x55, 0x44,
1264                0x00, 0x00
1265            ]
1266        );
1267    }
1268
1269    #[test]
1270    fn parse_uuid_without_dashes_works() {
1271        let bytes = parse_uuid_hex("550e8400e29b41d4a716446655440000").expect("parse");
1272        assert_eq!(bytes.len(), 16);
1273    }
1274
1275    #[test]
1276    fn parse_uuid_rejects_short_or_invalid() {
1277        assert!(parse_uuid_hex("550e8400").is_err());
1278        assert!(parse_uuid_hex("ZZZe8400-e29b-41d4-a716-446655440000").is_err());
1279    }
1280
1281    // -------- L2: parse_uuid_hex accepts more real-world forms --------
1282
1283    #[test]
1284    fn parse_uuid_accepts_urn_prefix() {
1285        // RFC 4122 §3 — UUIDs from URN-aware sources arrive prefixed.
1286        let canonical =
1287            parse_uuid_hex("urn:uuid:550e8400-e29b-41d4-a716-446655440000").expect("parse");
1288        assert_eq!(canonical.len(), 16);
1289        // RFC 4122 §3: "The prefix 'urn:uuid:' is case insensitive".
1290        // Every casing of the prefix MUST decode to the same bytes.
1291        for prefix in &[
1292            "urn:uuid:",
1293            "URN:UUID:",
1294            "Urn:Uuid:",
1295            "uRn:UuId:",
1296            "URN:uuid:",
1297            "urn:UUID:",
1298        ] {
1299            let s = format!("{prefix}550e8400-e29b-41d4-a716-446655440000");
1300            let parsed = parse_uuid_hex(&s)
1301                .unwrap_or_else(|e| panic!("prefix {prefix:?} should parse: {e}"));
1302            assert_eq!(parsed, canonical, "prefix {prefix:?} mismatch");
1303        }
1304    }
1305
1306    #[test]
1307    fn parse_uuid_accepts_curly_brace_form() {
1308        // Microsoft / .NET registry form: `{550e8400-...}`.
1309        let bytes = parse_uuid_hex("{550e8400-e29b-41d4-a716-446655440000}").expect("parse");
1310        assert_eq!(bytes.len(), 16);
1311        // Also accepts braces around the no-dash form.
1312        let bytes2 = parse_uuid_hex("{550e8400e29b41d4a716446655440000}").expect("parse");
1313        assert_eq!(bytes, bytes2);
1314    }
1315
1316    #[test]
1317    fn parse_uuid_accepts_uppercase_hex() {
1318        let bytes = parse_uuid_hex("550E8400-E29B-41D4-A716-446655440000").expect("parse");
1319        let lower = parse_uuid_hex("550e8400-e29b-41d4-a716-446655440000").expect("parse");
1320        assert_eq!(bytes, lower);
1321    }
1322
1323    #[test]
1324    fn parse_uuid_trims_surrounding_whitespace() {
1325        let bytes = parse_uuid_hex("  550e8400-e29b-41d4-a716-446655440000\t\n").expect("parse");
1326        assert_eq!(bytes.len(), 16);
1327    }
1328
1329    #[test]
1330    fn parse_uuid_tolerates_whitespace_after_urn_prefix() {
1331        // Per #42 — real-world sources (some JDBC drivers, hand-pasted
1332        // examples) emit `urn:uuid:  <hex>`. Trim post-prefix
1333        // whitespace so these decode to the same bytes as the
1334        // canonical form.
1335        let canonical =
1336            parse_uuid_hex("urn:uuid:550e8400-e29b-41d4-a716-446655440000").expect("canonical");
1337        for shape in &[
1338            "urn:uuid: 550e8400-e29b-41d4-a716-446655440000",
1339            "urn:uuid:  550e8400-e29b-41d4-a716-446655440000",
1340            "urn:uuid:\t550e8400-e29b-41d4-a716-446655440000",
1341            "urn:uuid: \t 550e8400-e29b-41d4-a716-446655440000",
1342            // Mixed-case prefix + post-prefix whitespace.
1343            "URN:UUID:  550e8400-e29b-41d4-a716-446655440000",
1344            "Urn:Uuid:\t550e8400-e29b-41d4-a716-446655440000",
1345        ] {
1346            let parsed = parse_uuid_hex(shape).unwrap_or_else(|e| panic!("shape {shape:?}: {e}"));
1347            assert_eq!(parsed, canonical, "shape {shape:?} should round-trip");
1348        }
1349    }
1350
1351    #[test]
1352    fn parse_uuid_rejects_unmatched_braces() {
1353        assert!(parse_uuid_hex("{550e8400-e29b-41d4-a716-446655440000").is_err());
1354        assert!(parse_uuid_hex("550e8400-e29b-41d4-a716-446655440000}").is_err());
1355    }
1356
1357    #[test]
1358    fn bind_int_and_bool_share_i64_variant() {
1359        assert!(matches!(
1360            value_to_oracle_bind(&Value::Int64(42), TypeHint::Int64).unwrap(),
1361            OwnedBind::I64(Some(42))
1362        ));
1363        assert!(matches!(
1364            value_to_oracle_bind(&Value::Bool(true), TypeHint::Bool).unwrap(),
1365            OwnedBind::I64(Some(1))
1366        ));
1367        assert!(matches!(
1368            value_to_oracle_bind(&Value::Bool(false), TypeHint::Bool).unwrap(),
1369            OwnedBind::I64(Some(0))
1370        ));
1371    }
1372
1373    #[test]
1374    fn bind_float_passes_through() {
1375        let b = value_to_oracle_bind(&Value::Float64(1.5), TypeHint::Float64).unwrap();
1376        assert!(matches!(b, OwnedBind::F64(Some(v)) if (v - 1.5).abs() < 1e-12));
1377    }
1378
1379    #[test]
1380    fn bind_string_decimal_json_array_all_route_through_text() {
1381        assert!(matches!(
1382            value_to_oracle_bind(&Value::String("hi".into()), TypeHint::String).unwrap(),
1383            OwnedBind::Text(Some(_))
1384        ));
1385        match value_to_oracle_bind(&Value::Decimal("99.5".into()), TypeHint::Decimal).unwrap() {
1386            OwnedBind::Text(Some(s)) => assert_eq!(s, "99.5"),
1387            other => panic!(
1388                "expected Text, got {other:?}",
1389                other = match other {
1390                    OwnedBind::I64(_) => "I64",
1391                    OwnedBind::F64(_) => "F64",
1392                    OwnedBind::Text(_) => "Text",
1393                    OwnedBind::Bytes(_) => "Bytes",
1394                    OwnedBind::Date(_) => "Date",
1395                    OwnedBind::DateTime(_) => "DateTime",
1396                    OwnedBind::DateTimeTz(_) => "DateTimeTz",
1397                }
1398            ),
1399        }
1400        match value_to_oracle_bind(
1401            &Value::Json(serde_json::json!({"role": "admin"})),
1402            TypeHint::Json,
1403        )
1404        .unwrap()
1405        {
1406            OwnedBind::Text(Some(s)) => assert!(s.contains("\"role\":\"admin\"")),
1407            _ => panic!("expected Text for JSON"),
1408        }
1409        match value_to_oracle_bind(
1410            &Value::Array(vec![Value::Int64(1), Value::Int64(2)]),
1411            TypeHint::Array,
1412        )
1413        .unwrap()
1414        {
1415            OwnedBind::Text(Some(s)) => assert_eq!(s, "[1,2]"),
1416            _ => panic!("expected Text for Array"),
1417        }
1418    }
1419
1420    #[test]
1421    fn bind_bytes_passes_through() {
1422        match value_to_oracle_bind(&Value::Bytes(vec![1, 2, 3]), TypeHint::Bytes).unwrap() {
1423            OwnedBind::Bytes(Some(b)) => assert_eq!(b, vec![1, 2, 3]),
1424            _ => panic!("expected Bytes"),
1425        }
1426    }
1427
1428    #[test]
1429    fn bind_uuid_converts_to_raw_16() {
1430        match value_to_oracle_bind(
1431            &Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into()),
1432            TypeHint::Uuid,
1433        )
1434        .unwrap()
1435        {
1436            OwnedBind::Bytes(Some(b)) => assert_eq!(b.len(), 16),
1437            _ => panic!("expected Bytes for Uuid"),
1438        }
1439    }
1440
1441    #[test]
1442    fn bind_time_pairs_with_epoch_date() {
1443        let t = NaiveTime::from_hms_opt(12, 34, 56).unwrap();
1444        match value_to_oracle_bind(&Value::Time(t), TypeHint::Time).unwrap() {
1445            OwnedBind::DateTime(Some(dt)) => {
1446                assert_eq!(dt.date(), NaiveDate::from_ymd_opt(1970, 1, 1).unwrap());
1447                assert_eq!(dt.time(), t);
1448            }
1449            _ => panic!("expected DateTime for Time"),
1450        }
1451    }
1452
1453    #[test]
1454    fn bind_null_uses_typed_none_per_hint() {
1455        // The hint drives the typed-NULL variant — sending the wrong
1456        // typed NULL across rows would surface as a bind-type mismatch.
1457        assert!(matches!(
1458            value_to_oracle_bind(&Value::Null, TypeHint::Int64).unwrap(),
1459            OwnedBind::I64(None)
1460        ));
1461        assert!(matches!(
1462            value_to_oracle_bind(&Value::Null, TypeHint::Bool).unwrap(),
1463            OwnedBind::I64(None)
1464        ));
1465        assert!(matches!(
1466            value_to_oracle_bind(&Value::Null, TypeHint::Float64).unwrap(),
1467            OwnedBind::F64(None)
1468        ));
1469        assert!(matches!(
1470            value_to_oracle_bind(&Value::Null, TypeHint::Bytes).unwrap(),
1471            OwnedBind::Bytes(None)
1472        ));
1473        assert!(matches!(
1474            value_to_oracle_bind(&Value::Null, TypeHint::Uuid).unwrap(),
1475            OwnedBind::Bytes(None)
1476        ));
1477        assert!(matches!(
1478            value_to_oracle_bind(&Value::Null, TypeHint::Date).unwrap(),
1479            OwnedBind::Date(None)
1480        ));
1481        assert!(matches!(
1482            value_to_oracle_bind(&Value::Null, TypeHint::DateTime).unwrap(),
1483            OwnedBind::DateTime(None)
1484        ));
1485        assert!(matches!(
1486            value_to_oracle_bind(&Value::Null, TypeHint::DateTimeTz).unwrap(),
1487            OwnedBind::DateTimeTz(None)
1488        ));
1489        // Decimal / String / Json / Array / Other / Null all go to
1490        // Text — NUMBER and CLOB columns accept string binds via
1491        // implicit conversion.
1492        assert!(matches!(
1493            value_to_oracle_bind(&Value::Null, TypeHint::Decimal).unwrap(),
1494            OwnedBind::Text(None)
1495        ));
1496        assert!(matches!(
1497            value_to_oracle_bind(&Value::Null, TypeHint::Json).unwrap(),
1498            OwnedBind::Text(None)
1499        ));
1500        assert!(matches!(
1501            value_to_oracle_bind(&Value::Null, TypeHint::Other).unwrap(),
1502            OwnedBind::Text(None)
1503        ));
1504    }
1505
1506    // -------- bulk_insert_rows end-to-end (skip on absent container) --------
1507
1508    /// Round-trip a scratch table via the Oracle bulk path. Skips
1509    /// when ORACLE_TEST_URL is not set or unreachable. Uses a
1510    /// per-pid table so concurrent runs do not collide.
1511    #[test]
1512    fn test_oracle_bulk_insert_rows_round_trip() {
1513        let Some(mut conn) = try_connect() else {
1514            eprintln!(
1515                "ORACLE_TEST_URL not set or unreachable; skipping test_oracle_bulk_insert_rows_round_trip"
1516            );
1517            return;
1518        };
1519
1520        let pid = std::process::id();
1521        let table = format!("ferrule_bulk_test_{pid}");
1522        let _ = conn.execute(&format!("DROP TABLE {table}"));
1523        conn.execute(&format!(
1524            "CREATE TABLE {table} (\
1525               id NUMBER(19) NOT NULL, \
1526               name VARCHAR2(255) NULL, \
1527               active NUMBER(1) NULL, \
1528               score NUMBER(10,2) NULL, \
1529               meta CLOB NULL, \
1530               guid RAW(16) NULL\
1531             )"
1532        ))
1533        .expect("CREATE TABLE");
1534
1535        let columns = vec![
1536            ColumnInfo {
1537                name: "id".into(),
1538                type_hint: TypeHint::Int64,
1539                nullable: false,
1540            },
1541            ColumnInfo {
1542                name: "name".into(),
1543                type_hint: TypeHint::String,
1544                nullable: true,
1545            },
1546            ColumnInfo {
1547                name: "active".into(),
1548                type_hint: TypeHint::Bool,
1549                nullable: true,
1550            },
1551            ColumnInfo {
1552                name: "score".into(),
1553                type_hint: TypeHint::Decimal,
1554                nullable: true,
1555            },
1556            ColumnInfo {
1557                name: "meta".into(),
1558                type_hint: TypeHint::Json,
1559                nullable: true,
1560            },
1561            ColumnInfo {
1562                name: "guid".into(),
1563                type_hint: TypeHint::Uuid,
1564                nullable: true,
1565            },
1566        ];
1567
1568        let rows: Vec<crate::value::Row> = vec![
1569            vec![
1570                Value::Int64(1),
1571                Value::String("Alice".into()),
1572                Value::Bool(true),
1573                Value::Decimal("99.50".into()),
1574                Value::Json(serde_json::json!({"role": "admin"})),
1575                Value::Uuid("550e8400-e29b-41d4-a716-446655440000".into()),
1576            ],
1577            vec![
1578                Value::Int64(2),
1579                Value::String("Bob".into()),
1580                Value::Bool(false),
1581                Value::Decimal("-7.25".into()),
1582                Value::Json(serde_json::json!({"role": "user"})),
1583                Value::Null,
1584            ],
1585            vec![
1586                Value::Int64(3),
1587                Value::Null,
1588                Value::Null,
1589                Value::Null,
1590                Value::Null,
1591                Value::Null,
1592            ],
1593        ];
1594
1595        let n = conn
1596            .bulk_insert_rows(BulkInsert {
1597                table: &table,
1598                columns: &columns,
1599                rows: &rows,
1600                copy_format: crate::copy::CopyFormat::Text,
1601            })
1602            .expect("bulk_insert_rows");
1603        assert_eq!(n, 3);
1604
1605        // Oracle: commit before the round-trip read since the
1606        // generic INSERT path doesn't auto-commit either. Inline test
1607        // simulates what copy.rs's outer transaction does for
1608        // --atomic; without it, the rows would not be visible.
1609        conn.execute("COMMIT").expect("COMMIT");
1610
1611        let result = conn
1612            .query(&format!(
1613                "SELECT id, name, active, score, guid FROM {table} ORDER BY id"
1614            ))
1615            .expect("read-back query");
1616        assert_eq!(result.rows.len(), 3);
1617
1618        // Row 1: id=1, name=Alice. score may come back as Decimal
1619        // or Float64 depending on whether the column metadata triggers
1620        // the Decimal path.
1621        if let Value::String(s) = &result.rows[0][1] {
1622            assert_eq!(s, "Alice");
1623        } else {
1624            panic!("row 1 name should be String, got {:?}", result.rows[0][1]);
1625        }
1626
1627        // Row 2 — guid NULL.
1628        assert!(matches!(&result.rows[1][4], Value::Null));
1629
1630        // Row 3 — everything NULL except id.
1631        assert!(matches!(&result.rows[2][1], Value::Null));
1632        assert!(matches!(&result.rows[2][3], Value::Null));
1633        assert!(matches!(&result.rows[2][4], Value::Null));
1634
1635        let _ = conn.execute(&format!("DROP TABLE {table}"));
1636    }
1637
1638    #[test]
1639    fn test_oracle_primary_key() {
1640        let Some(mut conn) = try_connect() else {
1641            eprintln!("Oracle test container not available, skipping test_oracle_primary_key");
1642            return;
1643        };
1644        // The CLAUDE.md seed declares `id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY`.
1645        // Oracle uppercases unquoted identifiers.
1646        let pk = conn.primary_key(None, "test_users").expect("primary_key");
1647        assert_eq!(pk, vec!["ID".to_string()]);
1648    }
1649
1650    #[test]
1651    fn test_oracle_list_foreign_keys() {
1652        let Some(mut conn) = try_connect() else {
1653            eprintln!(
1654                "Oracle test container not available, skipping test_oracle_list_foreign_keys"
1655            );
1656            return;
1657        };
1658        let pid = std::process::id();
1659        let child = format!("ferrule_fk_test_orders_{pid}");
1660        let _ = conn.execute(&format!("DROP TABLE {child}"));
1661        conn.execute(&format!(
1662            "CREATE TABLE {child} (\
1663               id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, \
1664               user_id NUMBER, \
1665               CONSTRAINT {child}_fk FOREIGN KEY (user_id) \
1666                 REFERENCES test_users(id) ON DELETE CASCADE\
1667             )"
1668        ))
1669        .expect("CREATE TABLE");
1670
1671        let fks = conn.list_foreign_keys(None).expect("list_foreign_keys");
1672        let child_upper = child.to_uppercase();
1673        let matching: Vec<_> = fks
1674            .iter()
1675            .filter(|fk| fk.child_table == child_upper)
1676            .collect();
1677        assert_eq!(
1678            matching.len(),
1679            1,
1680            "expected 1 FK from {child_upper}, got {fks:?}"
1681        );
1682        let fk = matching[0];
1683        assert_eq!(fk.child_columns, vec!["USER_ID".to_string()]);
1684        assert_eq!(fk.parent_table, "TEST_USERS");
1685        assert_eq!(fk.parent_columns, vec!["ID".to_string()]);
1686        assert_eq!(fk.on_delete.as_deref(), Some("CASCADE"));
1687
1688        let _ = conn.execute(&format!("DROP TABLE {child}"));
1689    }
1690
1691    /// End-to-end `--if-exists skip` then `upsert` round-trip against
1692    /// Oracle. Exercises the `MERGE … USING (SELECT … FROM dual UNION ALL …)`
1693    /// codegen (Skip = WHEN NOT MATCHED only; Upsert = full MERGE).
1694    #[test]
1695    fn test_oracle_copy_skip_then_upsert() {
1696        use crate::backend::Backend;
1697        use crate::copy::{CopyOptions, CopySource, IfExists, copy_rows};
1698
1699        let (Some(mut src), Some(mut dst)) = (try_connect(), try_connect()) else {
1700            eprintln!(
1701                "Oracle test container not available, skipping test_oracle_copy_skip_then_upsert"
1702            );
1703            return;
1704        };
1705
1706        let pid = std::process::id();
1707        let src_table = format!("ferrule_or_skip_src_{pid}");
1708        let dst_table = format!("ferrule_or_skip_dst_{pid}");
1709        // Oracle has no `DROP TABLE IF EXISTS`; best-effort drop then ignore.
1710        let _ = src.execute(&format!("DROP TABLE {src_table}"));
1711        let _ = dst.execute(&format!("DROP TABLE {dst_table}"));
1712        src.execute(&format!(
1713            "CREATE TABLE {src_table} (id NUMBER PRIMARY KEY, name VARCHAR2(64), val NUMBER)"
1714        ))
1715        .expect("CREATE src");
1716        dst.execute(&format!(
1717            "CREATE TABLE {dst_table} (id NUMBER PRIMARY KEY, name VARCHAR2(64), val NUMBER)"
1718        ))
1719        .expect("CREATE dst");
1720        src.execute(&format!("INSERT INTO {src_table} VALUES (1, 'new-1', 10)"))
1721            .expect("seed src 1");
1722        src.execute(&format!("INSERT INTO {src_table} VALUES (2, 'new-2', 20)"))
1723            .expect("seed src 2");
1724        dst.execute(&format!("INSERT INTO {dst_table} VALUES (1, 'old-1', 99)"))
1725            .expect("seed dst");
1726        // Oracle has no autocommit on `execute`; flush both connections.
1727        src.execute("COMMIT").expect("commit src");
1728        dst.execute("COMMIT").expect("commit dst");
1729
1730        // --- Skip ---------------------------------------------------------
1731        let opts = CopyOptions {
1732            source: CopySource::Query {
1733                sql: format!("SELECT * FROM {src_table} ORDER BY id"),
1734                into: dst_table.clone(),
1735            },
1736            if_exists: IfExists::Skip,
1737            ..Default::default()
1738        };
1739        copy_rows(&mut src, Backend::Oracle, &mut dst, Backend::Oracle, &opts)
1740            .expect("copy_rows skip");
1741
1742        let out = dst
1743            .query(&format!(
1744                "SELECT id, name, val FROM {dst_table} ORDER BY id"
1745            ))
1746            .expect("verify skip");
1747        assert_eq!(out.rows.len(), 2);
1748        assert!(matches!(&out.rows[0][1], Value::String(s) if s == "old-1"));
1749        assert!(matches!(&out.rows[1][1], Value::String(s) if s == "new-2"));
1750
1751        // --- Upsert -------------------------------------------------------
1752        let opts = CopyOptions {
1753            source: CopySource::Query {
1754                sql: format!("SELECT * FROM {src_table} ORDER BY id"),
1755                into: dst_table.clone(),
1756            },
1757            if_exists: IfExists::Upsert,
1758            ..Default::default()
1759        };
1760        copy_rows(&mut src, Backend::Oracle, &mut dst, Backend::Oracle, &opts)
1761            .expect("copy_rows upsert");
1762
1763        let out = dst
1764            .query(&format!(
1765                "SELECT id, name, val FROM {dst_table} ORDER BY id"
1766            ))
1767            .expect("verify upsert");
1768        assert_eq!(out.rows.len(), 2);
1769        assert!(matches!(&out.rows[0][1], Value::String(s) if s == "new-1"));
1770        // val column for id=1 should now be 10 (was 99).
1771        match &out.rows[0][2] {
1772            Value::Int64(n) => assert_eq!(*n, 10),
1773            Value::Decimal(d) => assert_eq!(d, "10"),
1774            other => panic!("unexpected val type: {other:?}"),
1775        }
1776        assert!(matches!(&out.rows[1][1], Value::String(s) if s == "new-2"));
1777
1778        let _ = src.execute(&format!("DROP TABLE {src_table}"));
1779        let _ = dst.execute(&format!("DROP TABLE {dst_table}"));
1780    }
1781}