// plsql_catalog/lib.rs

1#![forbid(unsafe_code)]
2pub mod synthetic;
3
4#[cfg(feature = "oraclemcp-db")]
5mod live;
6
7#[cfg(feature = "oraclemcp-db")]
8pub use live::{
9    OracleConnection, OraclemcpDbConnection, fetch_dbms_metadata_ddl,
10    load_snapshot_from_connection, negotiate_capabilities, populate_dbms_metadata_ddl,
11};
12
13use std::collections::{BTreeMap, HashMap, HashSet};
14use std::fs;
15use std::path::PathBuf;
16
17use chrono::{DateTime, Utc};
18#[cfg(feature = "oraclemcp-db")]
19use plsql_core::OracleVersion;
20use plsql_core::{
21    AnalysisProfile, ColumnName, EditionName, MemberName, ObjectName, RoleName, SchemaName,
22    SymbolId, SymbolInterner, UserName,
23};
24use serde::{Deserialize, Serialize};
25use thiserror::Error;
26use tracing::instrument;
27
28use plsql_output::SchemaVersion;
29
30macro_rules! catalog_name {
31    ($name:ident) => {
32        #[derive(
33            Clone,
34            Copy,
35            Debug,
36            Default,
37            Eq,
38            PartialEq,
39            Ord,
40            PartialOrd,
41            Hash,
42            Serialize,
43            Deserialize,
44        )]
45        #[serde(transparent)]
46        pub struct $name(SymbolId);
47
48        impl $name {
49            #[must_use]
50            #[instrument(level = "trace")]
51            pub fn new(symbol: SymbolId) -> Self {
52                Self(symbol)
53            }
54
55            #[must_use]
56            #[instrument(level = "trace", skip(self))]
57            pub fn symbol(self) -> SymbolId {
58                self.0
59            }
60        }
61
62        impl From<SymbolId> for $name {
63            fn from(value: SymbolId) -> Self {
64                Self::new(value)
65            }
66        }
67    };
68}
69
70catalog_name!(SynonymName);
71catalog_name!(IndexName);
72catalog_name!(ConstraintName);
73catalog_name!(TriggerName);
74
75#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
76#[serde(transparent)]
77pub struct Hash(String);
78
79impl Hash {
80    #[must_use]
81    #[instrument(level = "trace", skip(value))]
82    pub fn new(value: impl Into<String>) -> Self {
83        Self(value.into())
84    }
85
86    #[must_use]
87    #[instrument(level = "trace", skip(self))]
88    pub fn as_str(&self) -> &str {
89        self.0.as_str()
90    }
91}
92
93#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
94pub struct DbmsMetadataDdl {
95    pub ddl_text: String,
96    pub normalized_ddl: Option<String>,
97    pub xml_text: Option<String>,
98}
99
100#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
101pub enum CatalogSourceKind {
102    #[default]
103    JsonSnapshot,
104    LiveConnection,
105    DbmsMetadataFiles,
106    SyntheticTestCatalog,
107}
108
109#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
110pub struct CatalogSource {
111    pub kind: CatalogSourceKind,
112    pub path: Option<PathBuf>,
113    pub description: Option<String>,
114}
115
116#[derive(
117    Clone, Copy, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize,
118)]
119pub enum ObjectType {
120    Table,
121    View,
122    MaterializedView,
123    Sequence,
124    Type,
125    Package,
126    Procedure,
127    Function,
128    Trigger,
129    SchedulerJob,
130    EditioningView,
131    Synonym,
132    Index,
133    Constraint,
134    #[default]
135    Unknown,
136}
137
138#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
139pub enum ObjectStatus {
140    Valid,
141    Invalid,
142    #[default]
143    NotApplicable,
144}
145
146#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
147pub struct ObjectCommon {
148    pub owner: SchemaName,
149    pub name: ObjectName,
150    pub object_type: ObjectType,
151    pub status: ObjectStatus,
152    pub edition_name: Option<EditionName>,
153    pub editionable: Option<bool>,
154    pub last_ddl_time: Option<DateTime<Utc>>,
155    pub source_hash: Option<Hash>,
156    pub ddl: Option<DbmsMetadataDdl>,
157}
158
159#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
160pub struct CatalogCapabilities {
161    pub can_query_dba_views: bool,
162    pub can_query_all_views: bool,
163    pub can_use_dbms_metadata: bool,
164    pub can_read_source: bool,
165    pub plscope_enabled: bool,
166    pub can_query_scheduler: bool,
167    pub can_query_roles_and_grants: bool,
168    pub warnings: Vec<CapabilityWarning>,
169}
170
171#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
172pub struct CapabilityWarning {
173    pub code: String,
174    pub message: String,
175    pub remediation: Option<String>,
176}
177
178#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
179pub struct CatalogSnapshot {
180    pub schemas: HashMap<SchemaName, SchemaCatalog>,
181    pub profile: AnalysisProfile,
182    pub capabilities: CatalogCapabilities,
183    pub generated_at: DateTime<Utc>,
184    pub source: CatalogSource,
185    pub interner: SymbolInterner,
186    /// Database-wide edition tree from `ALL_EDITIONS`. Empty when EBR
187    /// is not in use.
188    #[serde(default)]
189    pub editions: Vec<Edition>,
190    /// Set of database usernames observed from `ALL_USERS`, used during
191    /// live extraction to discriminate an object-privilege grantee
192    /// (`ALL_TAB_PRIVS.GRANTEE`) between a real user and a database role —
193    /// Oracle's `ALL_TAB_PRIVS` carries no user/role discriminator column.
194    ///
195    /// `None` means the username set was never loaded (the `ALL_USERS`
196    /// probe was not run or failed); in that state grantee classification
197    /// is *undetermined* and, honoring R13, the extractor must NOT
198    /// silently assume a direct (high-confidence) user grant. `Some(_)`
199    /// (even when empty) means the set was loaded and is authoritative for
200    /// the schemas under analysis.
201    ///
202    /// This is transient extraction state: it is never serialized, because
203    /// the resulting `Grantee` discrimination is already baked into each
204    /// persisted `Grant`. JSON snapshots therefore round-trip unchanged.
205    #[serde(default, skip)]
206    pub known_users: Option<HashSet<UserName>>,
207}
208
209impl CatalogSnapshot {
210    #[must_use]
211    #[instrument(level = "trace", skip(profile, capabilities, source))]
212    pub fn new(
213        profile: AnalysisProfile,
214        capabilities: CatalogCapabilities,
215        source: CatalogSource,
216        generated_at: DateTime<Utc>,
217    ) -> Self {
218        Self {
219            schemas: HashMap::new(),
220            profile,
221            capabilities,
222            generated_at,
223            source,
224            interner: SymbolInterner::new(),
225            editions: Vec::new(),
226            known_users: None,
227        }
228    }
229
230    /// Intern `text` as a [`UserName`] without changing classification
231    /// state. Mirrors [`SymbolInterner::intern_user_name`] but routes
232    /// through this snapshot's interner.
233    #[must_use]
234    #[instrument(level = "trace", skip(self, text))]
235    pub fn intern_user_name(&mut self, text: impl Into<String>) -> Option<UserName> {
236        self.interner.intern_user_name(text)
237    }
238
239    /// Intern `text` as a [`RoleName`] through this snapshot's interner.
240    #[must_use]
241    #[instrument(level = "trace", skip(self, text))]
242    pub fn intern_role_name(&mut self, text: impl Into<String>) -> Option<RoleName> {
243        self.interner.intern_role_name(text)
244    }
245
246    #[must_use]
247    #[instrument(level = "trace", skip(self, text))]
248    pub fn intern_schema_name(&mut self, text: impl Into<String>) -> Option<SchemaName> {
249        self.interner.intern_schema_name(text)
250    }
251
252    #[must_use]
253    #[instrument(level = "trace", skip(self, text))]
254    pub fn intern_object_name(&mut self, text: impl Into<String>) -> Option<ObjectName> {
255        self.interner.intern(text).map(ObjectName::from)
256    }
257
258    #[must_use]
259    #[instrument(level = "trace", skip(self, text))]
260    pub fn intern_column_name(&mut self, text: impl Into<String>) -> Option<ColumnName> {
261        self.interner.intern(text).map(ColumnName::from)
262    }
263
264    #[must_use]
265    #[instrument(level = "trace", skip(self, text))]
266    pub fn intern_member_name(&mut self, text: impl Into<String>) -> Option<MemberName> {
267        self.interner.intern(text).map(MemberName::from)
268    }
269
270    #[must_use]
271    #[instrument(level = "trace", skip(self, text))]
272    pub fn intern_synonym_name(&mut self, text: impl Into<String>) -> Option<SynonymName> {
273        self.interner.intern(text).map(SynonymName::from)
274    }
275
276    #[must_use]
277    #[instrument(level = "trace", skip(self, text))]
278    pub fn intern_index_name(&mut self, text: impl Into<String>) -> Option<IndexName> {
279        self.interner.intern(text).map(IndexName::from)
280    }
281
282    #[must_use]
283    #[instrument(level = "trace", skip(self, text))]
284    pub fn intern_constraint_name(&mut self, text: impl Into<String>) -> Option<ConstraintName> {
285        self.interner.intern(text).map(ConstraintName::from)
286    }
287
288    #[must_use]
289    #[instrument(level = "trace", skip(self, text))]
290    pub fn intern_trigger_name(&mut self, text: impl Into<String>) -> Option<TriggerName> {
291        self.interner.intern(text).map(TriggerName::from)
292    }
293}
294
/// Schema identifier stamped on every [`CatalogSnapshotDocument`].
pub const CATALOG_SNAPSHOT_SCHEMA_ID: &str = "plsql.catalog.snapshot";
/// Schema version stamped on every [`CatalogSnapshotDocument`].
pub const CATALOG_SNAPSHOT_SCHEMA_VERSION: SchemaVersion = SchemaVersion::new(1, 1, 0);

/// Schema identifier for the catalog doctor report.
/// NOTE(review): presumably used when the report is wrapped in a versioned
/// envelope — confirm at the call sites.
pub const CATALOG_DOCTOR_SCHEMA_ID: &str = "plsql.catalog.doctor";
/// Schema version paired with [`CATALOG_DOCTOR_SCHEMA_ID`].
pub const CATALOG_DOCTOR_SCHEMA_VERSION: SchemaVersion = SchemaVersion::new(1, 0, 0);
300
301/// Per-`ObjectType` count tile shown in the doctor report.
302#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
303pub struct DoctorObjectCount {
304    pub object_type: ObjectType,
305    pub total: usize,
306    pub valid: usize,
307    pub invalid: usize,
308    pub other: usize,
309}
310
311/// Summary of how many catalog rows landed per family and how many
312/// schema-scoped buckets are populated.
313#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
314pub struct DoctorExtractionTotals {
315    pub schemas_observed: usize,
316    pub objects_total: usize,
317    pub columns_total: usize,
318    pub indexes_total: usize,
319    pub constraints_total: usize,
320    pub triggers_total: usize,
321    pub synonyms_total: usize,
322    pub grants_total: usize,
323    pub dependencies_total: usize,
324}
325
326/// Doctor-flagged missing privilege: the `plsql-catalog` driver could not
327/// observe an Oracle dictionary view that some upstream features require.
328#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
329pub struct MissingPermissionReport {
330    pub view_name: String,
331    pub required_for: Vec<String>,
332    pub suggested_grant: String,
333}
334
335/// Structured doctor report for a `CatalogSnapshot`.
336///
337/// Consumers (`plsql catalog doctor --robot-json`, `plsql-mcp` foundation
338/// tools, and the planned `plsql doctor` umbrella surface) can render the
339/// report directly or wrap it in a `RobotJsonEnvelope` for stable,
340/// versioned output.
341#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
342pub struct CatalogDoctorReport {
343    /// Identifier of the snapshot's origin (`live extraction via ...` or the
344    /// JSON snapshot path).
345    pub source_description: String,
346    pub source_kind: CatalogSourceKind,
347    pub generated_at: Option<DateTime<Utc>>,
348    pub totals: DoctorExtractionTotals,
349    pub object_counts: Vec<DoctorObjectCount>,
350    pub capability_warnings: Vec<CapabilityWarning>,
351    pub missing_permissions: Vec<MissingPermissionReport>,
352    /// Per-schema PL/Scope availability. Empty when the snapshot has no
353    /// PL/Scope detection wired.
354    pub plscope_availability_per_schema: Vec<PlScopeAvailabilityRow>,
355    /// Capability-bit copy for downstream consumers that don't want to read
356    /// the full `CatalogSnapshot` to learn whether a query family worked.
357    pub can_query_dba_views: bool,
358    pub can_query_all_views: bool,
359    pub can_use_dbms_metadata: bool,
360    pub can_read_source: bool,
361    pub plscope_enabled: bool,
362    pub can_query_scheduler: bool,
363    pub can_query_roles_and_grants: bool,
364}
365
366/// One row of the doctor report's per-schema PL/Scope availability summary.
367/// The `schema_name` is rendered through the snapshot's `SymbolInterner` so
368/// the report is stable across JSON snapshots and live extractions.
369#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
370pub struct PlScopeAvailabilityRow {
371    pub schema_name: String,
372    pub availability: PlScopeAvailability,
373}
374
375impl CatalogSnapshot {
376    /// Build the doctor report directly from this snapshot.
377    ///
378    /// The doctor is read-only — it never queries the DB itself; it
379    /// summarizes what was already extracted into the snapshot plus any
380    /// `CapabilityWarning`s the loader recorded. Missing-permission diagnoses
381    /// are inferred from per-family capability bits so the report is
382    /// equally useful for live-extracted and JSON-loaded snapshots.
383    #[must_use]
384    #[instrument(level = "trace", skip(self))]
385    pub fn doctor_report(&self) -> CatalogDoctorReport {
386        let mut counts: BTreeMap<ObjectType, DoctorObjectCount> = BTreeMap::new();
387        let mut columns_total = 0usize;
388        let mut indexes_total = 0usize;
389        let mut constraints_total = 0usize;
390        let mut triggers_total = 0usize;
391        let mut synonyms_total = 0usize;
392        let mut grants_total = 0usize;
393        let mut dependencies_total = 0usize;
394        let mut objects_total = 0usize;
395
396        for schema_catalog in self.schemas.values() {
397            for object in schema_catalog.objects.values() {
398                let common = catalog_object_common(object);
399                let tile = counts
400                    .entry(common.object_type)
401                    .or_insert(DoctorObjectCount {
402                        object_type: common.object_type,
403                        ..DoctorObjectCount::default()
404                    });
405                tile.total = tile.total.saturating_add(1);
406                match common.status {
407                    ObjectStatus::Valid => {
408                        tile.valid = tile.valid.saturating_add(1);
409                    }
410                    ObjectStatus::Invalid => {
411                        tile.invalid = tile.invalid.saturating_add(1);
412                    }
413                    ObjectStatus::NotApplicable => {
414                        tile.other = tile.other.saturating_add(1);
415                    }
416                }
417                objects_total = objects_total.saturating_add(1);
418
419                columns_total = columns_total.saturating_add(catalog_object_column_count(object));
420            }
421            indexes_total = indexes_total.saturating_add(schema_catalog.indexes.len());
422            constraints_total = constraints_total.saturating_add(schema_catalog.constraints.len());
423            triggers_total = triggers_total.saturating_add(schema_catalog.triggers.len());
424            synonyms_total = synonyms_total.saturating_add(schema_catalog.synonyms.len());
425            grants_total = grants_total.saturating_add(schema_catalog.grants.len());
426            dependencies_total =
427                dependencies_total.saturating_add(schema_catalog.dependencies.len());
428        }
429
430        let totals = DoctorExtractionTotals {
431            schemas_observed: self.schemas.len(),
432            objects_total,
433            columns_total,
434            indexes_total,
435            constraints_total,
436            triggers_total,
437            synonyms_total,
438            grants_total,
439            dependencies_total,
440        };
441
442        let mut object_counts: Vec<DoctorObjectCount> = counts.into_values().collect();
443        object_counts.sort_by_key(|tile| std::cmp::Reverse(tile.total));
444
445        let missing_permissions =
446            derive_missing_permission_reports(&self.capabilities, &self.source);
447
448        let mut plscope_availability_per_schema: Vec<PlScopeAvailabilityRow> = self
449            .schemas
450            .iter()
451            .filter_map(|(owner, schema_catalog)| {
452                let availability = schema_catalog.plscope.as_ref()?.availability;
453                let schema_name = self.interner.resolve(owner.symbol())?.to_string();
454                Some(PlScopeAvailabilityRow {
455                    schema_name,
456                    availability,
457                })
458            })
459            .collect();
460        plscope_availability_per_schema.sort_by(|a, b| a.schema_name.cmp(&b.schema_name));
461
462        CatalogDoctorReport {
463            source_description: self.source.description.clone().unwrap_or_default(),
464            source_kind: self.source.kind,
465            generated_at: Some(self.generated_at),
466            totals,
467            object_counts,
468            capability_warnings: self.capabilities.warnings.clone(),
469            missing_permissions,
470            plscope_availability_per_schema,
471            can_query_dba_views: self.capabilities.can_query_dba_views,
472            can_query_all_views: self.capabilities.can_query_all_views,
473            can_use_dbms_metadata: self.capabilities.can_use_dbms_metadata,
474            can_read_source: self.capabilities.can_read_source,
475            plscope_enabled: self.capabilities.plscope_enabled,
476            can_query_scheduler: self.capabilities.can_query_scheduler,
477            can_query_roles_and_grants: self.capabilities.can_query_roles_and_grants,
478        }
479    }
480}
481
482fn catalog_object_common(object: &CatalogObject) -> &ObjectCommon {
483    match object {
484        CatalogObject::Table(metadata) => &metadata.common,
485        CatalogObject::View(metadata) => &metadata.common,
486        CatalogObject::MaterializedView(metadata) => &metadata.common,
487        CatalogObject::Sequence(metadata) => &metadata.common,
488        CatalogObject::Type(metadata) => &metadata.common,
489        CatalogObject::Package(metadata) => &metadata.common,
490        CatalogObject::Procedure(metadata) => &metadata.common,
491        CatalogObject::Function(metadata) => &metadata.common,
492        CatalogObject::Trigger(metadata) => &metadata.common,
493        CatalogObject::SchedulerJob(metadata) => &metadata.common,
494        CatalogObject::EditioningView(metadata) => &metadata.common,
495    }
496}
497
498fn catalog_object_column_count(object: &CatalogObject) -> usize {
499    match object {
500        CatalogObject::Table(metadata) => metadata.columns.len(),
501        CatalogObject::View(metadata) => metadata.columns.len(),
502        CatalogObject::MaterializedView(metadata) => metadata.columns.len(),
503        CatalogObject::EditioningView(metadata) => metadata.columns.len(),
504        CatalogObject::Sequence(_)
505        | CatalogObject::Type(_)
506        | CatalogObject::Package(_)
507        | CatalogObject::Procedure(_)
508        | CatalogObject::Function(_)
509        | CatalogObject::Trigger(_)
510        | CatalogObject::SchedulerJob(_) => 0,
511    }
512}
513
514fn derive_missing_permission_reports(
515    capabilities: &CatalogCapabilities,
516    source: &CatalogSource,
517) -> Vec<MissingPermissionReport> {
518    // Missing-permission diagnoses only make sense for live extractions. A
519    // JSON snapshot was already produced once — its capability bits reflect
520    // the original extraction; we surface them verbatim instead of inventing
521    // new grant suggestions.
522    if !matches!(source.kind, CatalogSourceKind::LiveConnection) {
523        return Vec::new();
524    }
525
526    let mut reports = Vec::new();
527    if !capabilities.can_query_dba_views {
528        reports.push(MissingPermissionReport {
529            view_name: String::from("DBA_OBJECTS / DBA_TAB_COLUMNS / DBA_DEPENDENCIES"),
530            required_for: vec![
531                String::from("cross-schema extraction beyond ALL_*"),
532                String::from("PLSQL-CAT-014 dependency reachability over schemas"),
533            ],
534            suggested_grant: String::from(
535                "grant select_catalog_role to <user>; -- or individual grants on DBA_* views",
536            ),
537        });
538    }
539    if !capabilities.can_use_dbms_metadata {
540        reports.push(MissingPermissionReport {
541            view_name: String::from("DBMS_METADATA"),
542            required_for: vec![
543                String::from("PLSQL-CAT-015 DBMS_METADATA.GET_DDL extraction"),
544                String::from("normalized DDL hashes for `what-breaks`"),
545            ],
546            suggested_grant: String::from("grant execute on DBMS_METADATA to <user>;"),
547        });
548    }
549    if !capabilities.can_read_source {
550        reports.push(MissingPermissionReport {
551            view_name: String::from("ALL_SOURCE / DBA_SOURCE"),
552            required_for: vec![
553                String::from("packaged routine body inspection"),
554                String::from("get_object_source MCP tool"),
555            ],
556            suggested_grant: String::from(
557                "grant select on ALL_SOURCE to <user>; -- ALL_SOURCE itself is normally readable; ensure no DROP/REVOKE narrowed it",
558            ),
559        });
560    }
561    if !capabilities.plscope_enabled {
562        reports.push(MissingPermissionReport {
563            view_name: String::from("PLSCOPE_SETTINGS / ALL_IDENTIFIERS"),
564            required_for: vec![
565                String::from("PLSQL-CAT-010 PL/Scope availability detection"),
566                String::from("PLSQL-CAT-011 identifier extraction"),
567            ],
568            suggested_grant: String::from(
569                "alter session set plscope_settings = 'identifiers:all'; -- and recompile target objects",
570            ),
571        });
572    }
573    if !capabilities.can_query_scheduler {
574        reports.push(MissingPermissionReport {
575            view_name: String::from("ALL_SCHEDULER_JOBS / ALL_SCHEDULER_PROGRAMS"),
576            required_for: vec![String::from("scheduler job lineage edges")],
577            suggested_grant: String::from(
578                "grant select on ALL_SCHEDULER_JOBS to <user>; grant select on ALL_SCHEDULER_PROGRAMS to <user>;",
579            ),
580        });
581    }
582    if !capabilities.can_query_roles_and_grants {
583        reports.push(MissingPermissionReport {
584            view_name: String::from("DBA_ROLE_PRIVS / DBA_SYS_PRIVS / DBA_TAB_PRIVS"),
585            required_for: vec![
586                String::from("definer-rights privilege chain analysis"),
587                String::from("role-mediated execution evidence (PRIVILEGES-* beads)"),
588            ],
589            suggested_grant: String::from(
590                "grant select_catalog_role to <user>; -- enables DBA_*_PRIVS reads",
591            ),
592        });
593    }
594    reports
595}
596
597#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
598pub struct CatalogSnapshotDocument {
599    pub schema_id: String,
600    pub schema_version: SchemaVersion,
601    pub snapshot: CatalogSnapshot,
602}
603
604impl CatalogSnapshotDocument {
605    #[must_use]
606    #[instrument(level = "trace", skip(snapshot))]
607    pub fn new(snapshot: CatalogSnapshot) -> Self {
608        Self {
609            schema_id: String::from(CATALOG_SNAPSHOT_SCHEMA_ID),
610            schema_version: CATALOG_SNAPSHOT_SCHEMA_VERSION,
611            snapshot,
612        }
613    }
614}
615
616#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
617pub struct CatalogLoadRequest {
618    pub schema_filters: Vec<CatalogSchemaFilter>,
619}
620
621impl CatalogLoadRequest {
622    #[must_use]
623    #[instrument(level = "trace")]
624    pub fn for_current_schema() -> Self {
625        Self {
626            schema_filters: vec![CatalogSchemaFilter::CurrentSchema],
627        }
628    }
629
630    #[must_use]
631    #[instrument(level = "trace", skip(schema_names))]
632    pub fn for_named_schemas<I, S>(schema_names: I) -> Self
633    where
634        I: IntoIterator<Item = S>,
635        S: Into<String>,
636    {
637        Self {
638            schema_filters: schema_names
639                .into_iter()
640                .map(CatalogSchemaFilter::named)
641                .collect(),
642        }
643    }
644}
645
646impl Default for CatalogLoadRequest {
647    fn default() -> Self {
648        Self::for_current_schema()
649    }
650}
651
652#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
653pub enum CatalogSchemaFilter {
654    CurrentSchema,
655    Named(String),
656}
657
658impl CatalogSchemaFilter {
659    #[must_use]
660    #[instrument(level = "trace")]
661    pub fn current_schema() -> Self {
662        Self::CurrentSchema
663    }
664
665    #[must_use]
666    #[instrument(level = "trace", skip(schema_name))]
667    pub fn named(schema_name: impl Into<String>) -> Self {
668        Self::Named(schema_name.into())
669    }
670}
671
/// Errors surfaced by catalog loading, snapshot parsing, and row decoding.
#[derive(Debug, Error)]
pub enum CatalogError {
    /// Wrapped I/O failure (converted automatically via `From`).
    #[error("i/o error: {0}")]
    Io(#[from] std::io::Error),
    /// Wrapped JSON (de)serialization failure (converted automatically via `From`).
    #[error("json error: {0}")]
    Json(#[from] serde_json::Error),
    /// The requested Oracle backend is not part of this build; `feature`
    /// names what to use instead.
    #[error("oracle backend `{backend}` is unavailable in this build; use `{feature}`")]
    OracleBackendNotCompiled {
        backend: OracleBackend,
        feature: &'static str,
    },
    /// The Oracle backend reported a failure, carried as text.
    #[error("oracle backend `{backend}` error: {message}")]
    OracleBackendError {
        backend: OracleBackend,
        message: String,
    },
    /// A query returned a different number of rows than expected.
    #[error("expected {expected} row(s) but received {actual}")]
    UnexpectedRowCount { expected: String, actual: usize },
    /// A required column was absent from the query result.
    #[error("required column `{column}` was missing from the query result")]
    MissingColumn { column: String },
    /// A column was present but held no value.
    #[error("column `{column}` was null")]
    NullColumnValue { column: String },
    /// A column's text could not be parsed as the `expected` type.
    #[error("column `{column}` could not be parsed as {expected}: `{value}`")]
    InvalidColumnValue {
        column: String,
        expected: &'static str,
        value: String,
    },
    /// A snapshot document declared a schema version that is not supported.
    #[error("unsupported catalog snapshot schema {found} for {schema_id}; expected {expected}")]
    UnsupportedSchemaVersion {
        schema_id: String,
        found: SchemaVersion,
        expected: SchemaVersion,
    },
    /// A snapshot document declared an unexpected schema id.
    #[error("unexpected catalog snapshot schema id `{0}`")]
    UnexpectedSchemaId(String),
    /// The current schema could not be resolved from the Oracle connection.
    #[error("catalog load request could not resolve the current schema from the Oracle connection")]
    CurrentSchemaUnavailable,
    /// A schema filter carried a blank schema name.
    #[error("schema filter `{schema_name}` is invalid: schema names must not be blank")]
    InvalidSchemaFilter { schema_name: String },
}
713
714#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
715pub enum OracleBackend {
716    RustOracle,
717    OracleRs,
718}
719
720impl OracleBackend {
721    #[must_use]
722    #[instrument(level = "trace", skip(self))]
723    pub fn as_str(self) -> &'static str {
724        match self {
725            Self::RustOracle => "oracle",
726            Self::OracleRs => "oracle-rs",
727        }
728    }
729}
730
731impl std::fmt::Display for OracleBackend {
732    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
733        f.write_str(self.as_str())
734    }
735}
736
737#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
738pub struct OracleConnectOptions {
739    pub username: String,
740    pub password: String,
741    pub connect_string: String,
742    pub current_schema: Option<String>,
743    pub module: Option<String>,
744    pub action: Option<String>,
745    pub client_info: Option<String>,
746    pub client_identifier: Option<String>,
747}
748
749impl OracleConnectOptions {
750    #[must_use]
751    pub fn new(
752        username: impl Into<String>,
753        password: impl Into<String>,
754        connect_string: impl Into<String>,
755    ) -> Self {
756        Self {
757            username: username.into(),
758            password: password.into(),
759            connect_string: connect_string.into(),
760            current_schema: None,
761            module: None,
762            action: None,
763            client_info: None,
764            client_identifier: None,
765        }
766    }
767
768    #[must_use]
769    pub fn with_current_schema(mut self, current_schema: impl Into<String>) -> Self {
770        self.current_schema = Some(current_schema.into());
771        self
772    }
773
774    #[must_use]
775    pub fn with_module(mut self, module: impl Into<String>) -> Self {
776        self.module = Some(module.into());
777        self
778    }
779
780    #[must_use]
781    pub fn with_action(mut self, action: impl Into<String>) -> Self {
782        self.action = Some(action.into());
783        self
784    }
785
786    #[must_use]
787    pub fn with_client_info(mut self, client_info: impl Into<String>) -> Self {
788        self.client_info = Some(client_info.into());
789        self
790    }
791
792    #[must_use]
793    pub fn with_client_identifier(mut self, client_identifier: impl Into<String>) -> Self {
794        self.client_identifier = Some(client_identifier.into());
795        self
796    }
797}
798
799#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
800pub enum OracleBind {
801    String(String),
802    I64(i64),
803    U64(u64),
804    Bool(bool),
805}
806
807impl From<&str> for OracleBind {
808    fn from(value: &str) -> Self {
809        Self::String(String::from(value))
810    }
811}
812
813impl From<String> for OracleBind {
814    fn from(value: String) -> Self {
815        Self::String(value)
816    }
817}
818
819impl From<i32> for OracleBind {
820    fn from(value: i32) -> Self {
821        Self::I64(i64::from(value))
822    }
823}
824
825impl From<i64> for OracleBind {
826    fn from(value: i64) -> Self {
827        Self::I64(value)
828    }
829}
830
831impl From<u32> for OracleBind {
832    fn from(value: u32) -> Self {
833        Self::U64(u64::from(value))
834    }
835}
836
837impl From<u64> for OracleBind {
838    fn from(value: u64) -> Self {
839        Self::U64(value)
840    }
841}
842
843impl From<bool> for OracleBind {
844    fn from(value: bool) -> Self {
845        Self::Bool(value)
846    }
847}
848
849#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
850pub struct OracleCell {
851    pub oracle_type: String,
852    pub value: Option<String>,
853}
854
855impl OracleCell {
856    #[must_use]
857    #[instrument(level = "trace", skip(oracle_type, value))]
858    pub fn new(oracle_type: impl Into<String>, value: Option<String>) -> Self {
859        Self {
860            oracle_type: oracle_type.into(),
861            value,
862        }
863    }
864}
865
866#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
867pub struct OracleRow {
868    pub columns: BTreeMap<String, OracleCell>,
869}
870
871impl OracleRow {
872    pub fn insert(
873        &mut self,
874        name: impl Into<String>,
875        oracle_type: impl Into<String>,
876        value: Option<String>,
877    ) {
878        self.columns.insert(
879            name.into().to_ascii_uppercase(),
880            OracleCell::new(oracle_type, value),
881        );
882    }
883
884    #[must_use]
885    #[instrument(level = "trace", skip(self))]
886    pub fn cell(&self, name: &str) -> Option<&OracleCell> {
887        self.columns.get(&name.to_ascii_uppercase())
888    }
889
890    #[must_use]
891    #[instrument(level = "trace", skip(self))]
892    pub fn text(&self, name: &str) -> Option<&str> {
893        self.cell(name).and_then(|cell| cell.value.as_deref())
894    }
895
896    #[instrument(level = "trace", skip(self))]
897    pub fn require_text(&self, name: &str) -> Result<&str, CatalogError> {
898        let Some(cell) = self.cell(name) else {
899            return Err(CatalogError::MissingColumn {
900                column: name.to_ascii_uppercase(),
901            });
902        };
903        cell.value
904            .as_deref()
905            .ok_or_else(|| CatalogError::NullColumnValue {
906                column: name.to_ascii_uppercase(),
907            })
908    }
909
910    #[instrument(level = "trace", skip(self))]
911    pub fn parse_i64(&self, name: &str) -> Result<i64, CatalogError> {
912        let text = self.require_text(name)?;
913        text.parse::<i64>()
914            .map_err(|_| CatalogError::InvalidColumnValue {
915                column: name.to_ascii_uppercase(),
916                expected: "i64",
917                value: String::from(text),
918            })
919    }
920
921    #[instrument(level = "trace", skip(self))]
922    pub fn parse_u64(&self, name: &str) -> Result<u64, CatalogError> {
923        let text = self.require_text(name)?;
924        text.parse::<u64>()
925            .map_err(|_| CatalogError::InvalidColumnValue {
926                column: name.to_ascii_uppercase(),
927                expected: "u64",
928                value: String::from(text),
929            })
930    }
931
932    #[instrument(level = "trace", skip(self))]
933    pub fn parse_bool(&self, name: &str) -> Result<bool, CatalogError> {
934        let text = self.require_text(name)?;
935        let normalized = text.trim().to_ascii_uppercase();
936        match normalized.as_str() {
937            "Y" | "YES" | "TRUE" | "1" => Ok(true),
938            "N" | "NO" | "FALSE" | "0" => Ok(false),
939            _ => Err(CatalogError::InvalidColumnValue {
940                column: name.to_ascii_uppercase(),
941                expected: "bool",
942                value: String::from(text),
943            }),
944        }
945    }
946}
947
/// Descriptive facts about one Oracle connection.
///
/// NOTE(review): the code that populates this struct is not visible in
/// this part of the file; the per-field notes below restate the field
/// names and the one use visible here — confirm against the live loader.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct OracleConnectionInfo {
    /// Which backend produced this connection.
    pub backend: OracleBackend,
    /// Connect string for the connection.
    pub connect_string: String,
    /// Current schema, when known. `resolve_schema_filters` fails with
    /// `CatalogError::CurrentSchemaUnavailable` if a `CurrentSchema`
    /// filter is requested while this is `None`.
    pub current_schema: Option<String>,
    /// Server version string.
    pub server_version: String,
    /// Database name.
    pub db_name: String,
    /// Database domain.
    pub db_domain: String,
    /// Service name.
    pub service_name: String,
    /// Instance name.
    pub instance_name: String,
    /// Server type.
    pub server_type: String,
    /// Maximum identifier length — presumably in bytes; TODO confirm.
    pub max_identifier_length: u32,
    /// Maximum number of open cursors.
    pub max_open_cursors: u32,
}
962
963#[instrument(level = "trace")]
964pub fn load_snapshot_from_json(path: &std::path::Path) -> Result<CatalogSnapshot, CatalogError> {
965    let raw = fs::read_to_string(path)?;
966    let document: CatalogSnapshotDocument = serde_json::from_str(&raw)?;
967
968    if !document.schema_id.as_str().eq(CATALOG_SNAPSHOT_SCHEMA_ID) {
969        return Err(CatalogError::UnexpectedSchemaId(document.schema_id));
970    }
971
972    if !matches!(
973        document
974            .schema_version
975            .cmp(&CATALOG_SNAPSHOT_SCHEMA_VERSION),
976        std::cmp::Ordering::Equal
977    ) {
978        return Err(CatalogError::UnsupportedSchemaVersion {
979            schema_id: String::from(CATALOG_SNAPSHOT_SCHEMA_ID),
980            found: document.schema_version,
981            expected: CATALOG_SNAPSHOT_SCHEMA_VERSION,
982        });
983    }
984
985    Ok(document.snapshot)
986}
987
988#[instrument(level = "trace", skip(snapshot))]
989pub fn export_snapshot_to_json(
990    snapshot: &CatalogSnapshot,
991    path: &std::path::Path,
992) -> Result<(), CatalogError> {
993    let document = CatalogSnapshotDocument::new(snapshot.clone());
994    let rendered = serde_json::to_string_pretty(&document)?;
995    fs::write(path, rendered)?;
996    Ok(())
997}
998
999/// Load a catalog snapshot from a directory of DBMS_METADATA-exported .sql files.
1000///
1001/// Load a `CatalogSnapshot` by classifying every `.sql` file under `dir` as a
1002/// single top-level CREATE DDL statement (the shape `DBMS_METADATA.GET_DDL`
1003/// emits when written per-object to disk).
1004///
1005/// For each file:
1006///
1007/// * The object kind is read from the leading `CREATE …` keyword
1008///   (`TABLE` / `VIEW` / `PACKAGE` / `PROCEDURE` / `FUNCTION` /
1009///   `SEQUENCE` / `TRIGGER` / `TYPE`); statements whose keyword does
1010///   not match a known kind are skipped (graceful degradation per
1011///   R13).
1012/// * The owner schema is read from the optional `OWNER.OBJECT` prefix
1013///   on the CREATE target. Unqualified statements (no `OWNER.`
1014///   prefix) are filed under a stable `PUBLIC` schema interned through
1015///   the regular interner — never `SymbolId::new(0)`, which would
1016///   collide with whatever the first object name happens to be.
1017/// * The raw file bytes are stored verbatim on
1018///   [`ObjectCommon::ddl`] as a [`DbmsMetadataDdl`] so downstream
1019///   consumers (doc generation, lineage, the doctor's
1020///   ddl-extraction ratio) can inspect the exact source the catalog
1021///   was derived from.
1022///
1023/// This classifier is keyword-shaped and does not parse arbitrary
1024/// PL/SQL bodies — column definitions, parameter signatures, view
1025/// projections and constraint details are *not* populated. When the
1026/// full parser (Layer 1) lands, callers that need column- or
1027/// signature-level fidelity should switch to that path; the
1028/// `DbmsMetadataDdl` stored here is sufficient seed for re-parsing
1029/// on demand without re-reading the disk.
1030#[instrument(level = "info", skip_all, fields(dir = %dir.display()))]
1031pub fn load_from_dbms_metadata_dir(dir: &std::path::Path) -> Result<CatalogSnapshot, CatalogError> {
1032    if !dir.is_dir() {
1033        return Err(CatalogError::Io(std::io::Error::new(
1034            std::io::ErrorKind::NotFound,
1035            format!("not a directory: {}", dir.display()),
1036        )));
1037    }
1038
1039    let mut interner = SymbolInterner::default();
1040    let mut schemas: HashMap<SchemaName, SchemaCatalog> = HashMap::new();
1041    let mut file_count = 0usize;
1042    let mut classified_count = 0usize;
1043
1044    // Collect + sort entries so the resulting snapshot (and its
1045    // interner symbol ids) are deterministic across runs and
1046    // platforms — `read_dir` ordering is unspecified.
1047    let mut paths: Vec<std::path::PathBuf> = fs::read_dir(dir)?
1048        .filter_map(|e| e.ok().map(|e| e.path()))
1049        .filter(|p| {
1050            p.extension()
1051                .and_then(|e| e.to_str())
1052                .is_some_and(|ext| ext.eq("sql"))
1053        })
1054        .collect();
1055    paths.sort();
1056
1057    for path in paths {
1058        file_count += 1;
1059        let ddl_text = match fs::read_to_string(&path) {
1060            Ok(text) => text,
1061            Err(_) => continue,
1062        };
1063
1064        if let Some((schema, obj_name, obj)) = classify_dbms_metadata_ddl(&ddl_text, &mut interner)
1065        {
1066            let schema_catalog = schemas.entry(schema).or_default();
1067            schema_catalog.objects.insert(obj_name, obj);
1068            classified_count += 1;
1069        }
1070    }
1071
1072    tracing::info!(
1073        files = file_count,
1074        classified = classified_count,
1075        "loaded DBMS_METADATA directory"
1076    );
1077
1078    Ok(CatalogSnapshot {
1079        schemas,
1080        profile: AnalysisProfile::default(),
1081        capabilities: CatalogCapabilities {
1082            can_query_all_views: false,
1083            can_query_dba_views: false,
1084            can_use_dbms_metadata: true,
1085            can_read_source: true,
1086            plscope_enabled: false,
1087            can_query_scheduler: false,
1088            can_query_roles_and_grants: false,
1089            warnings: vec![],
1090        },
1091        generated_at: Utc::now(),
1092        source: CatalogSource {
1093            kind: CatalogSourceKind::DbmsMetadataFiles,
1094            description: Some(format!("loaded from {}", dir.display())),
1095            ..CatalogSource::default()
1096        },
1097        interner,
1098        editions: Vec::new(),
1099        // DBMS_METADATA directory loads do not query ALL_USERS; grantee
1100        // classification is not exercised on this path.
1101        known_users: None,
1102    })
1103}
1104
/// Default schema name used when a CREATE statement has no `OWNER.`
/// prefix. Interned through the regular interner so the resulting
/// `SchemaName` has a real, resolvable text — never a collision with
/// `SymbolId::new(0)`.
///
/// Applied by `classify_dbms_metadata_ddl` whenever the parsed header
/// yields an object name without an owner segment.
const UNQUALIFIED_DDL_SCHEMA: &str = "PUBLIC";
1110
1111/// Classify a single per-file DDL statement into a `CatalogObject`.
1112///
1113/// Returns `None` for whitespace-only / comment-only files and for
1114/// CREATE statements whose object kind keyword is not in the
1115/// known set. The DDL bytes are preserved verbatim on
1116/// [`ObjectCommon::ddl`] so downstream code can re-parse them as
1117/// fidelity improves.
1118fn classify_dbms_metadata_ddl(
1119    ddl_text: &str,
1120    interner: &mut SymbolInterner,
1121) -> Option<(SchemaName, ObjectName, CatalogObject)> {
1122    // Parse the DDL HEADER as a real token stream — never substring-match
1123    // the whole DDL. Body / comment text that mentions `TABLE` etc. used
1124    // to silently re-classify VIEWs and PROCEDUREs as tables.
1125    let header = parse_create_header(ddl_text)?;
1126
1127    // `PACKAGE BODY` / `TYPE BODY` are bodies — the spec's catalog row
1128    // is the source of truth. Honest uncertainty: return None.
1129    if matches!(
1130        header.kind,
1131        DdlKind::PackageBody | DdlKind::TypeBody | DdlKind::Unknown
1132    ) {
1133        return None;
1134    }
1135
1136    let (owner_text, object_text) = extract_owner_and_name(&header.after_kind)?;
1137
1138    let owner_text = owner_text.unwrap_or_else(|| UNQUALIFIED_DDL_SCHEMA.to_string());
1139    let owner = interner.intern_schema_name(owner_text)?;
1140    let name_sid = interner.intern(&object_text)?;
1141    let obj_name = ObjectName::new(name_sid);
1142
1143    let ddl = DbmsMetadataDdl {
1144        ddl_text: ddl_text.to_string(),
1145        normalized_ddl: Some(normalize_dbms_metadata_ddl(ddl_text)),
1146        xml_text: None,
1147    };
1148
1149    let common = ObjectCommon {
1150        owner,
1151        name: obj_name,
1152        object_type: header.kind.object_type(),
1153        ddl: Some(ddl),
1154        ..ObjectCommon::default()
1155    };
1156
1157    let object = match header.kind {
1158        DdlKind::Table => CatalogObject::Table(TableMetadata {
1159            common,
1160            ..TableMetadata::default()
1161        }),
1162        DdlKind::View => CatalogObject::View(ViewMetadata {
1163            common,
1164            ..ViewMetadata::default()
1165        }),
1166        DdlKind::MaterializedView => CatalogObject::MaterializedView(MViewMetadata {
1167            common,
1168            ..MViewMetadata::default()
1169        }),
1170        DdlKind::Package => CatalogObject::Package(PackageMetadata {
1171            common,
1172            ..PackageMetadata::default()
1173        }),
1174        DdlKind::Procedure => CatalogObject::Procedure(ProcedureMetadata {
1175            common,
1176            signature: RoutineSignature {
1177                routine_name: obj_name,
1178                ..RoutineSignature::default()
1179            },
1180        }),
1181        DdlKind::Function => CatalogObject::Function(FunctionMetadata {
1182            common,
1183            signature: RoutineSignature {
1184                routine_name: obj_name,
1185                ..RoutineSignature::default()
1186            },
1187            ..FunctionMetadata::default()
1188        }),
1189        DdlKind::Sequence => CatalogObject::Sequence(SequenceMetadata {
1190            common,
1191            ..SequenceMetadata::default()
1192        }),
1193        DdlKind::Trigger => CatalogObject::Trigger(TriggerMetadata {
1194            common,
1195            ..TriggerMetadata::default()
1196        }),
1197        DdlKind::Type => CatalogObject::Type(TypeMetadata {
1198            common,
1199            ..TypeMetadata::default()
1200        }),
1201        // Filtered above — the match is exhaustive only because we
1202        // handle every concrete kind.
1203        DdlKind::PackageBody | DdlKind::TypeBody | DdlKind::Unknown => return None,
1204    };
1205
1206    Some((owner, obj_name, object))
1207}
1208
1209/// Object kinds the per-file DDL classifier recognizes. `Unknown`
1210/// represents honest uncertainty (R13) — the header didn't tokenize
1211/// into a kind we model. `PackageBody` / `TypeBody` are recognized
1212/// separately so the classifier can skip them without confusing them
1213/// with their specs.
1214#[derive(Clone, Copy, Debug, Eq, PartialEq)]
1215enum DdlKind {
1216    Table,
1217    View,
1218    MaterializedView,
1219    Package,
1220    PackageBody,
1221    Procedure,
1222    Function,
1223    Sequence,
1224    Trigger,
1225    Type,
1226    TypeBody,
1227    Unknown,
1228}
1229
1230impl DdlKind {
1231    fn object_type(self) -> ObjectType {
1232        match self {
1233            DdlKind::Table => ObjectType::Table,
1234            DdlKind::View => ObjectType::View,
1235            DdlKind::MaterializedView => ObjectType::MaterializedView,
1236            DdlKind::Package | DdlKind::PackageBody => ObjectType::Package,
1237            DdlKind::Procedure => ObjectType::Procedure,
1238            DdlKind::Function => ObjectType::Function,
1239            DdlKind::Sequence => ObjectType::Sequence,
1240            DdlKind::Trigger => ObjectType::Trigger,
1241            DdlKind::Type | DdlKind::TypeBody => ObjectType::Type,
1242            DdlKind::Unknown => ObjectType::Unknown,
1243        }
1244    }
1245}
1246
1247/// Parsed CREATE header: the typed `DdlKind` plus the upper-cased
1248/// remainder of the DDL starting immediately after the kind tokens.
1249/// Callers use `after_kind` to locate the `[OWNER.]NAME` — it has
1250/// already been stripped of leading comments / whitespace / `CREATE`
1251/// modifiers / kind tokens so a substring match in there cannot be
1252/// fooled by body content.
1253#[derive(Clone, Debug)]
1254struct ParsedCreateHeader {
1255    kind: DdlKind,
1256    after_kind: String,
1257}
1258
1259/// Parse the CREATE header of a raw DDL string.
1260///
1261/// Skips leading whitespace, `--` line comments, and `/* … */` block
1262/// comments. Consumes `CREATE` then optional `OR REPLACE`, optional
1263/// `FORCE` / `EDITIONABLE` / `NONEDITIONABLE` (in any order), then
1264/// reads one or two tokens to form a [`DdlKind`] (multi-word kinds
1265/// `MATERIALIZED VIEW`, `PACKAGE BODY`, `TYPE BODY` handled). Returns
1266/// `None` only when the input has no `CREATE` token at all; an
1267/// unrecognized kind word produces `DdlKind::Unknown` so callers can
1268/// represent honest uncertainty (R13).
1269fn parse_create_header(ddl: &str) -> Option<ParsedCreateHeader> {
1270    let mut cursor = Cursor::new(ddl);
1271    cursor.skip_ws_and_comments();
1272
1273    // Must start with `CREATE`.
1274    if !cursor.consume_keyword("CREATE") {
1275        return None;
1276    }
1277    cursor.skip_ws_and_comments();
1278
1279    // Optional `OR REPLACE`.
1280    if cursor.consume_keyword("OR") {
1281        cursor.skip_ws_and_comments();
1282        // `OR` without `REPLACE` is malformed; let it fall through to
1283        // kind parsing — the kind word won't match and we'll honestly
1284        // return `Unknown`.
1285        let _ = cursor.consume_keyword("REPLACE");
1286        cursor.skip_ws_and_comments();
1287    }
1288
1289    // Optional `FORCE` / `EDITIONABLE` / `NONEDITIONABLE` modifiers,
1290    // any order, any subset.
1291    loop {
1292        if cursor.consume_keyword("FORCE")
1293            || cursor.consume_keyword("NONEDITIONABLE")
1294            || cursor.consume_keyword("EDITIONABLE")
1295            || cursor.consume_keyword("NO")
1296        {
1297            cursor.skip_ws_and_comments();
1298            continue;
1299        }
1300        break;
1301    }
1302
1303    // Read the kind word (one token, possibly extended to two for
1304    // `MATERIALIZED VIEW` / `PACKAGE BODY` / `TYPE BODY`).
1305    let first = match cursor.consume_identifier() {
1306        Some(tok) => tok,
1307        None => {
1308            return Some(ParsedCreateHeader {
1309                kind: DdlKind::Unknown,
1310                after_kind: cursor.upper_remainder(),
1311            });
1312        }
1313    };
1314    cursor.skip_ws_and_comments();
1315
1316    // Speculatively look at the second token without committing — only
1317    // commit if it forms a known two-word kind.
1318    let kind = match first.as_str() {
1319        "MATERIALIZED" => {
1320            if cursor.peek_keyword("VIEW") {
1321                cursor.consume_keyword("VIEW");
1322                cursor.skip_ws_and_comments();
1323                DdlKind::MaterializedView
1324            } else {
1325                DdlKind::Unknown
1326            }
1327        }
1328        "PACKAGE" => {
1329            if cursor.peek_keyword("BODY") {
1330                cursor.consume_keyword("BODY");
1331                cursor.skip_ws_and_comments();
1332                DdlKind::PackageBody
1333            } else {
1334                DdlKind::Package
1335            }
1336        }
1337        "TYPE" => {
1338            if cursor.peek_keyword("BODY") {
1339                cursor.consume_keyword("BODY");
1340                cursor.skip_ws_and_comments();
1341                DdlKind::TypeBody
1342            } else {
1343                DdlKind::Type
1344            }
1345        }
1346        "TABLE" => DdlKind::Table,
1347        "VIEW" => DdlKind::View,
1348        "PROCEDURE" => DdlKind::Procedure,
1349        "FUNCTION" => DdlKind::Function,
1350        "SEQUENCE" => DdlKind::Sequence,
1351        "TRIGGER" => DdlKind::Trigger,
1352        _ => DdlKind::Unknown,
1353    };
1354
1355    Some(ParsedCreateHeader {
1356        kind,
1357        after_kind: cursor.upper_remainder(),
1358    })
1359}
1360
/// Minimal byte cursor backing the CREATE header tokenizer.
///
/// It knows just enough SQL to skip whitespace, `--` line comments and
/// `/* … */` block comments, and to read bare ASCII keywords without
/// regard to case. It deliberately does **not** parse the whole DDL:
/// everything past the kind word is handed to [`extract_owner_and_name`]
/// as an upper-cased remainder.
struct Cursor<'a> {
    bytes: &'a [u8],
    pos: usize,
}

impl<'a> Cursor<'a> {
    /// Starts a cursor at the first byte of `text`.
    fn new(text: &'a str) -> Self {
        Self {
            bytes: text.as_bytes(),
            pos: 0,
        }
    }

    /// Advances past any mix of ASCII whitespace, `--` line comments and
    /// `/* … */` block comments. An unterminated block comment swallows
    /// the rest of the input.
    fn skip_ws_and_comments(&mut self) {
        let bytes = self.bytes;
        loop {
            let blanks = bytes[self.pos..]
                .iter()
                .take_while(|byte| byte.is_ascii_whitespace())
                .count();
            self.pos += blanks;

            let rest = &bytes[self.pos..];
            if rest.starts_with(b"--") {
                // Stop *on* the newline; the next pass skips it as whitespace.
                let comment = &rest[2..];
                let length = comment
                    .iter()
                    .position(|&byte| byte.eq(&b'\n'))
                    .unwrap_or(comment.len());
                self.pos += 2 + length;
            } else if rest.starts_with(b"/*") {
                let comment = &rest[2..];
                let closer = comment
                    .windows(2)
                    .position(|pair| matches!(pair, [b'*', b'/']));
                match closer {
                    // Opening `/*` + comment text + closing `*/`.
                    Some(offset) => self.pos += 2 + offset + 2,
                    // Unterminated — treat as end-of-input.
                    None => self.pos = bytes.len(),
                }
            } else {
                break;
            }
        }
    }

    /// Returns true if the input at the cursor spells `kw`
    /// (ASCII-case-insensitively) and is followed by a non-identifier
    /// character or end-of-input. Does not advance the cursor.
    fn peek_keyword(&self, kw: &str) -> bool {
        let rest = &self.bytes[self.pos..];
        let Some(candidate) = rest.get(..kw.len()) else {
            return false;
        };
        if !candidate.eq_ignore_ascii_case(kw.as_bytes()) {
            return false;
        }
        // Word boundary check — `CREATEDOC` must not match `CREATE`.
        let runs_on = rest
            .get(kw.len())
            .is_some_and(|&next| next.eq(&b'_') || next.is_ascii_alphanumeric());
        !runs_on
    }

    /// Advances past `kw` when [`Cursor::peek_keyword`] matches it and
    /// reports whether it did.
    fn consume_keyword(&mut self, kw: &str) -> bool {
        let matched = self.peek_keyword(kw);
        if matched {
            self.pos += kw.len();
        }
        matched
    }

    /// Consumes the next bare ASCII identifier (a letter followed by
    /// letters / digits / underscores) and returns it upper-cased.
    /// Returns `None`, without moving, when the cursor is not on a letter —
    /// e.g. at end of input, a quoted identifier or punctuation. Quoted
    /// identifiers are not legal in the kind-word position, so they are not
    /// handled here.
    fn consume_identifier(&mut self) -> Option<String> {
        let bytes = self.bytes;
        let rest = &bytes[self.pos..];
        if !rest.first()?.is_ascii_alphabetic() {
            return None;
        }
        let length = rest
            .iter()
            .take_while(|&&byte| byte.is_ascii_alphanumeric() || byte.eq(&b'_'))
            .count();
        self.pos += length;
        std::str::from_utf8(&rest[..length])
            .ok()
            .map(str::to_ascii_uppercase)
    }

    /// Returns everything from the cursor to the end of input,
    /// ASCII-upper-cased (empty if that slice is not valid UTF-8). Used to
    /// hand off to [`extract_owner_and_name`].
    fn upper_remainder(&self) -> String {
        match std::str::from_utf8(&self.bytes[self.pos..]) {
            Ok(rest) => rest.to_ascii_uppercase(),
            Err(_) => String::new(),
        }
    }
}
1486
/// Extract the optional `OWNER` and the bare `OBJECT` name from the
/// upper-cased remainder that follows the parsed `CREATE <KIND>` header.
///
/// The `[OWNER.]NAME` token is read from the very start of `after_kind`
/// (leading whitespace ignored) and nothing further along is inspected,
/// so a later mention of e.g. `TABLE` in the body cannot affect the
/// result. Quotes are stripped, so `"HR"."EMP"` yields `HR` / `EMP`.
///
/// * A double-quoted segment runs to its closing `"` and may contain any
///   characters, including whitespace and dots.
/// * An unquoted segment consists of ASCII letters, digits, `_`, `$` and
///   `#` — the characters Oracle permits in nonquoted identifiers — and
///   ends at the first other byte (whitespace, `(`, `;`, …).
/// * Segments are separated by a `.` that directly follows a segment.
///
/// Returns `None` for malformed headers: no name at all, an unterminated
/// quote, an empty quoted segment, a `.` that is not followed by another
/// segment, or more than two segments.
fn extract_owner_and_name(after_kind: &str) -> Option<(Option<String>, String)> {
    /// Bytes allowed inside an unquoted Oracle identifier.
    fn is_unquoted_identifier_byte(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || matches!(byte, b'_' | b'$' | b'#')
    }

    let after = after_kind.trim_start();
    let bytes = after.as_bytes();
    let mut segments: Vec<Segment> = Vec::with_capacity(2);
    let mut i = 0usize;

    loop {
        if bytes.get(i).is_some_and(|&byte| byte.eq(&b'"')) {
            // Quoted segment: everything up to the closing quote. A missing
            // closing quote means the header is malformed, so give up.
            let content_start = i + 1;
            let content_len = bytes[content_start..]
                .iter()
                .position(|&byte| byte.eq(&b'"'))?;
            let content_end = content_start + content_len;
            segments.push(Segment {
                text: after[content_start..content_end].to_string(),
                quoted: true,
            });
            i = content_end + 1; // step over the closing quote
        } else {
            // Unquoted run of identifier bytes. An empty run means there is
            // no segment where one is required — either no name at all or a
            // dangling `OWNER.` — which is malformed.
            let run_len = bytes[i..]
                .iter()
                .take_while(|&&byte| is_unquoted_identifier_byte(byte))
                .count();
            if run_len.eq(&0) {
                return None;
            }
            segments.push(Segment {
                text: after[i..i + run_len].to_string(),
                quoted: false,
            });
            i += run_len;
        }

        // A `.` right after a segment demands another segment; anything
        // else ends the `[OWNER.]NAME` token.
        if bytes.get(i).is_some_and(|&byte| byte.eq(&b'.')) {
            i += 1;
        } else {
            break;
        }
    }

    // Quoted segments accept any non-empty content; unquoted segments must
    // consist solely of identifier bytes.
    let valid = |segment: &Segment| -> bool {
        !segment.text.is_empty()
            && (segment.quoted || segment.text.bytes().all(is_unquoted_identifier_byte))
    };
    if !segments.iter().all(valid) {
        return None;
    }

    let mut parts = segments.into_iter().map(|segment| segment.text);
    match (parts.next(), parts.next(), parts.next()) {
        (Some(name), None, _) => Some((None, name)),
        (Some(owner), Some(name), None) => Some((Some(owner), name)),
        // Zero segments, or three and more (`A.B.C`), are not `[OWNER.]NAME`.
        _ => None,
    }
}

/// One dot-delimited segment of a `[OWNER.]NAME` token, tracking whether
/// it originated from a double-quoted Oracle identifier (which may hold
/// whitespace and bypasses the unquoted identifier-char validity rule).
struct Segment {
    text: String,
    quoted: bool,
}
1579
/// Whether a routine is a procedure or a function. Only compiled with the
/// `oraclemcp-db` feature.
#[cfg(feature = "oraclemcp-db")]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
enum RoutineKind {
    Procedure,
    Function,
}
1586
/// Hashable key identifying one routine. Only compiled with the
/// `oraclemcp-db` feature.
///
/// NOTE(review): presumably used as a map key while the live loader groups
/// per-routine rows — the code using it is not visible here; confirm.
#[cfg(feature = "oraclemcp-db")]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
struct RoutineLocator {
    /// Schema that owns the routine.
    owner: SchemaName,
    /// Enclosing package, if any; `None` presumably means a standalone
    /// routine — TODO confirm.
    package_name: Option<ObjectName>,
    /// Name of the routine itself.
    routine_name: ObjectName,
    /// Optional subprogram id.
    subprogram_id: Option<u32>,
    /// Optional overload number.
    overload: Option<u32>,
}
1596
/// Mutable scratch state for one routine; every field starts at its
/// `Default` value. Only compiled with the `oraclemcp-db` feature.
///
/// NOTE(review): presumably filled in incrementally by the live loader —
/// the code using it is not visible here; confirm.
#[cfg(feature = "oraclemcp-db")]
#[derive(Clone, Debug, Default)]
struct RoutineAccumulator {
    /// Signature collected so far, if any.
    signature: Option<RoutineSignature>,
    /// Procedure-vs-function hint, if one has been seen.
    kind_hint: Option<RoutineKind>,
    /// Whether the routine is flagged deterministic.
    deterministic: bool,
    /// Whether the routine is flagged pipelined.
    pipelined: bool,
}
1605
/// Everything the catalog holds for a single schema.
///
/// The fields marked `#[serde(default)]` fall back to an empty list when
/// they are missing from a deserialized document.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct SchemaCatalog {
    /// Catalog objects in this schema, keyed by object name.
    pub objects: HashMap<ObjectName, CatalogObject>,
    /// Synonyms keyed by synonym name, each mapped to its target.
    pub synonyms: HashMap<SynonymName, SynonymTarget>,
    /// Grants recorded for this schema.
    pub grants: Vec<Grant>,
    /// Index metadata keyed by index name.
    pub indexes: HashMap<IndexName, IndexMetadata>,
    /// Constraint metadata keyed by constraint name.
    pub constraints: HashMap<ConstraintName, ConstraintMetadata>,
    /// Trigger metadata keyed by trigger name.
    pub triggers: HashMap<TriggerName, TriggerMetadata>,
    /// Dependency records for this schema.
    pub dependencies: Vec<CatalogDependency>,
    /// PL/Scope snapshot, when one is available.
    pub plscope: Option<PlScopeSnapshot>,
    /// Database links owned by this schema. Public links live in the
    /// synthetic `PUBLIC` schema. Sourced from `ALL_DB_LINKS`.
    #[serde(default)]
    pub db_links: Vec<DatabaseLink>,
    /// Per-object COMMENT ON TABLE / VIEW text. Sourced from
    /// `ALL_TAB_COMMENTS`.
    #[serde(default)]
    pub table_comments: Vec<TableComment>,
    /// Per-column COMMENT ON COLUMN text. Sourced from
    /// `ALL_COL_COMMENTS`.
    #[serde(default)]
    pub column_comments: Vec<ColumnComment>,
    /// Editioning views owned by this schema (the views that mask the
    /// underlying base table in an EBR shop). Sourced from
    /// `ALL_EDITIONING_VIEWS`.
    #[serde(default)]
    pub editioning_views: Vec<EditioningView>,
    /// VPD/RLS policies attached to objects in this schema. Sourced
    /// from `ALL_POLICIES`.
    #[serde(default)]
    pub vpd_policies: Vec<VpdPolicy>,
}
1638
/// Turn the request's schema filters into a list of concrete schema names.
///
/// Names keep the order in which the filters first produce them and
/// duplicates (compared exactly, case-sensitively) are dropped. Named
/// filters are trimmed of surrounding whitespace.
///
/// # Errors
///
/// * [`CatalogError::CurrentSchemaUnavailable`] when a `CurrentSchema`
///   filter is present but the connection reports no current schema, or
///   when the filters resolve to nothing at all.
/// * [`CatalogError::InvalidSchemaFilter`] when a named filter is empty or
///   whitespace-only; the error carries the untrimmed name.
#[cfg(feature = "oraclemcp-db")]
fn resolve_schema_filters(
    connection_info: &OracleConnectionInfo,
    request: &CatalogLoadRequest,
) -> Result<Vec<String>, CatalogError> {
    let mut resolved: Vec<String> = Vec::new();

    for filter in &request.schema_filters {
        let schema_name = match filter {
            CatalogSchemaFilter::CurrentSchema => match &connection_info.current_schema {
                Some(current) => current.clone(),
                None => return Err(CatalogError::CurrentSchemaUnavailable),
            },
            CatalogSchemaFilter::Named(raw) => match raw.trim() {
                "" => {
                    return Err(CatalogError::InvalidSchemaFilter {
                        schema_name: raw.clone(),
                    });
                }
                trimmed => trimmed.to_owned(),
            },
        };

        // Linear scan is fine: filter lists are short and order matters.
        if !resolved.contains(&schema_name) {
            resolved.push(schema_name);
        }
    }

    if resolved.is_empty() {
        Err(CatalogError::CurrentSchemaUnavailable)
    } else {
        Ok(resolved)
    }
}
1674
/// Normalize DDL text emitted by `DBMS_METADATA.GET_DDL` so equality checks
/// across runs ignore cosmetic differences:
///
/// - Trim leading + trailing whitespace.
/// - Strip the trailing `/` SQL*Plus terminator if present, together with
///   the whitespace in front of it. A `/` that closes a block comment
///   (`… */`) is part of the DDL and is left alone.
/// - Collapse every run of spaces and tabs to a single space. Newlines are
///   preserved as-is so the result remains readable.
///
/// Returns the normalized text as a new `String`; the input is not
/// modified.
#[must_use]
pub fn normalize_dbms_metadata_ddl(text: &str) -> String {
    let trimmed = text.trim();

    // Only drop the slash when it is a terminator. Blindly stripping it
    // would turn a trailing `/* note */` into the unterminated `/* note *`.
    let body = match trimmed.strip_suffix('/') {
        Some(rest) if !rest.ends_with('*') => rest.trim_end(),
        _ => trimmed,
    };

    let mut normalized = String::with_capacity(body.len());
    let mut prev_blank = false;
    for c in body.chars() {
        let blank = matches!(c, ' ' | '\t');
        if !blank {
            normalized.push(c);
        } else if !prev_blank {
            // First blank of a run: emit exactly one space (tabs included).
            normalized.push(' ');
        }
        prev_blank = blank;
    }
    normalized
}
1701
1702/// Map an `ObjectType` to the string the `DBMS_METADATA.GET_DDL` /
1703/// `GET_XML` overloads expect as their first parameter. Returns `None` for
1704/// types that have no DBMS_METADATA representation (e.g.
1705/// `ObjectType::Unknown`, `ObjectType::Constraint`).
1706#[must_use]
1707pub fn object_type_to_dbms_metadata_value(object_type: ObjectType) -> Option<&'static str> {
1708    match object_type {
1709        ObjectType::Table => Some("TABLE"),
1710        ObjectType::View => Some("VIEW"),
1711        ObjectType::MaterializedView => Some("MATERIALIZED_VIEW"),
1712        ObjectType::Sequence => Some("SEQUENCE"),
1713        ObjectType::Type => Some("TYPE"),
1714        ObjectType::Package => Some("PACKAGE"),
1715        ObjectType::Procedure => Some("PROCEDURE"),
1716        ObjectType::Function => Some("FUNCTION"),
1717        ObjectType::Trigger => Some("TRIGGER"),
1718        ObjectType::EditioningView => Some("VIEW"),
1719        ObjectType::SchedulerJob => Some("PROCOBJ"),
1720        ObjectType::Synonym => Some("SYNONYM"),
1721        ObjectType::Index => Some("INDEX"),
1722        ObjectType::Constraint | ObjectType::Unknown => None,
1723    }
1724}
1725
/// Derive the `OracleVersion` from a dotted server version string.
///
/// Only the text before the first `.` is inspected; it is trimmed and parsed
/// as an unsigned integer. Majors 11, 12, 19, 21, 23 and 26 map to their
/// `OracleVersion` variant with no warning. Anything else (including text
/// that is not a number) falls back to `OracleVersion::Oracle19c` and is
/// accompanied by a `catalog-version-parse-fallback` warning.
#[cfg(feature = "oraclemcp-db")]
fn oracle_version_from_server_version(
    server_version: &str,
) -> (OracleVersion, Option<CapabilityWarning>) {
    let major_text = server_version
        .split_once('.')
        .map_or(server_version, |(major, _rest)| major)
        .trim();

    let recognized = match major_text.parse::<u32>() {
        Ok(11) => Some(OracleVersion::Oracle11g),
        Ok(12) => Some(OracleVersion::Oracle12c),
        Ok(19) => Some(OracleVersion::Oracle19c),
        Ok(21) => Some(OracleVersion::Oracle21c),
        Ok(23) => Some(OracleVersion::Oracle23ai),
        Ok(26) => Some(OracleVersion::Oracle26ai),
        _ => None,
    };
    if let Some(version) = recognized {
        return (version, None);
    }

    // Unknown or unparseable major: default to 19c and tell the caller why.
    let warning = CapabilityWarning {
        code: String::from("catalog-version-parse-fallback"),
        message: format!(
            "server version `{server_version}` did not map cleanly to a supported OracleVersion; defaulted AnalysisProfile.oracle_version to Oracle19c"
        ),
        remediation: Some(String::from(
            "Set the workspace AnalysisProfile explicitly if this estate targets a newer or older Oracle release.",
        )),
    };
    (OracleVersion::Oracle19c, Some(warning))
}
1759
1760#[cfg(feature = "oraclemcp-db")]
/// Render `count` positional bind placeholders (`:N`) separated by `", "`,
/// numbered consecutively starting at `start_index`.
///
/// Returns an empty string when `count` is zero.
fn oracle_bind_placeholders(count: usize, start_index: usize) -> String {
    let mut rendered = String::new();
    for offset in 0..count {
        if offset > 0 {
            rendered.push_str(", ");
        }
        rendered.push(':');
        rendered.push_str(&(start_index + offset).to_string());
    }
    rendered
}
1767
/// Compute the SHA-256 digest of `text` and wrap it in a `Hash` rendered as
/// `sha256:` followed by the lowercase hexadecimal digest.
#[cfg(feature = "oraclemcp-db")]
fn hash_text(text: &str) -> Hash {
    use sha2::{Digest as _, Sha256};

    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let digest = Sha256::digest(text.as_bytes());
    // The digest is rendered one byte at a time: from sha2 0.11 on the
    // digest output type no longer implements `LowerHex` directly (this
    // matches the plsql-store pattern and keeps the bump non-breaking for
    // callers).
    let mut rendered = String::with_capacity("sha256:".len() + digest.len() * 2);
    rendered.push_str("sha256:");
    for byte in digest {
        // High nibble first, then low nibble: two lowercase hex digits.
        rendered.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        rendered.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    Hash::new(rendered)
}
1785
/// Turn each schema name into an owned `OracleBind`, preserving order.
#[cfg(feature = "oraclemcp-db")]
fn schema_filter_params(schema_names: &[String]) -> Vec<OracleBind> {
    let mut params = Vec::with_capacity(schema_names.len());
    for schema_name in schema_names {
        params.push(OracleBind::from(schema_name.clone()));
    }
    params
}
1794
/// Register one object row in the snapshot as a blank catalog object.
///
/// Reads `OWNER`, `OBJECT_NAME` and `OBJECT_TYPE` (all required) plus the
/// optional `LAST_DDL_TIME_ISO`, `EDITIONABLE`, `EDITION_NAME` and `STATUS`
/// columns. Rows whose `OBJECT_TYPE` is not recognized by
/// `object_type_from_dictionary_value` are skipped before anything is
/// interned; rows for which `blank_catalog_object` yields nothing are
/// skipped as well.
///
/// # Errors
///
/// Propagates failures from `require_text` / `optional_bool`, and returns
/// `CatalogError::InvalidColumnValue` when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_object_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    // Error raised when the interner rejects the text of `column`.
    fn not_interned(column: &str, expected: &'static str, value: &str) -> CatalogError {
        CatalogError::InvalidColumnValue {
            column: String::from(column),
            expected,
            value: String::from(value),
        }
    }

    let owner_text = row.require_text("OWNER")?;
    let object_name_text = row.require_text("OBJECT_NAME")?;
    let object_type_text = row.require_text("OBJECT_TYPE")?;

    // Unsupported object types are dropped before any name is interned.
    let Some(object_type) = object_type_from_dictionary_value(object_type_text) else {
        return Ok(());
    };

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| not_interned("OWNER", "interned schema name", owner_text))?;
    let object_name = snapshot
        .intern_object_name(object_name_text)
        .ok_or_else(|| not_interned("OBJECT_NAME", "interned object name", object_name_text))?;

    let last_ddl_time =
        optional_nonblank_text(row, "LAST_DDL_TIME_ISO").and_then(parse_dictionary_timestamp);
    let editionable = optional_bool(row, "EDITIONABLE")?;
    let edition_name = match optional_nonblank_text(row, "EDITION_NAME") {
        Some(value) => {
            let symbol = snapshot.interner.intern(value).ok_or_else(|| {
                not_interned("EDITION_NAME", "interned edition name", value)
            })?;
            Some(EditionName::from(symbol))
        }
        None => None,
    };
    let status = row
        .text("STATUS")
        .map(object_status_from_dictionary_value)
        .unwrap_or_default();

    let common = ObjectCommon {
        owner,
        name: object_name,
        object_type,
        status,
        edition_name,
        editionable,
        last_ddl_time,
        ..ObjectCommon::default()
    };

    if let Some(catalog_object) = blank_catalog_object(common) {
        let schema_catalog = snapshot.schemas.entry(owner).or_default();
        schema_catalog.objects.insert(object_name, catalog_object);
    }

    Ok(())
}
1863
/// Append one dependency row to the dependency list of the owning schema.
///
/// `OWNER`, `NAME`, `REFERENCED_OWNER` and `REFERENCED_NAME` are required.
/// `TYPE` and `DEPENDENCY_TYPE` fall back to their type's default when
/// blank or unrecognized; `REFERENCED_TYPE` stays `None` in that case.
/// `via_db_link` is always recorded as `None`.
///
/// # Errors
///
/// Propagates `require_text` failures and returns
/// `CatalogError::InvalidColumnValue` when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_dependency_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<(), CatalogError> {
    // Error raised when the interner rejects the text of `column`.
    fn not_interned(column: &str, expected: &'static str, value: &str) -> CatalogError {
        CatalogError::InvalidColumnValue {
            column: String::from(column),
            expected,
            value: String::from(value),
        }
    }

    let owner_text = row.require_text("OWNER")?;
    let name_text = row.require_text("NAME")?;
    let referenced_owner_text = row.require_text("REFERENCED_OWNER")?;
    let referenced_name_text = row.require_text("REFERENCED_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| not_interned("OWNER", "interned schema name", owner_text))?;
    let object_name = snapshot
        .intern_object_name(name_text)
        .ok_or_else(|| not_interned("NAME", "interned object name", name_text))?;
    let referenced_owner = snapshot
        .intern_schema_name(referenced_owner_text)
        .ok_or_else(|| {
            not_interned(
                "REFERENCED_OWNER",
                "interned schema name",
                referenced_owner_text,
            )
        })?;
    let referenced_name = snapshot
        .intern_object_name(referenced_name_text)
        .ok_or_else(|| {
            not_interned(
                "REFERENCED_NAME",
                "interned object name",
                referenced_name_text,
            )
        })?;

    let object_type = optional_nonblank_text(row, "TYPE")
        .and_then(object_type_from_dictionary_value)
        .unwrap_or_default();
    let referenced_type =
        optional_nonblank_text(row, "REFERENCED_TYPE").and_then(object_type_from_dictionary_value);
    let dependency_kind = optional_nonblank_text(row, "DEPENDENCY_TYPE")
        .map(catalog_dependency_kind_from_dictionary_value)
        .unwrap_or_default();

    let schema_catalog = snapshot.schemas.entry(owner).or_default();
    schema_catalog.dependencies.push(CatalogDependency {
        owner,
        name: object_name,
        object_type,
        referenced_owner: Some(referenced_owner),
        referenced_name,
        referenced_type,
        dependency_kind,
        via_db_link: None,
    });

    Ok(())
}
1931
/// Parse a timestamp of the form `YYYY-MM-DDTHH:MM:SS` and interpret it as
/// UTC. Returns `None` when the text does not match that format.
#[cfg(feature = "oraclemcp-db")]
fn parse_dictionary_timestamp(text: &str) -> Option<DateTime<Utc>> {
    // Expected shape from the loader query: `YYYY-MM-DD"T"HH24:MI:SS`.
    let naive = chrono::NaiveDateTime::parse_from_str(text, "%Y-%m-%dT%H:%M:%S").ok()?;
    Some(DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
}
1939
/// Map a dependency-type string to a `CatalogDependencyKind`.
///
/// The comparison ignores ASCII case. `HARD`, `REF` and `EXTENDED` are
/// recognized; every other value yields the default kind.
#[cfg(feature = "oraclemcp-db")]
fn catalog_dependency_kind_from_dictionary_value(text: &str) -> CatalogDependencyKind {
    if text.eq_ignore_ascii_case("HARD") {
        CatalogDependencyKind::Hard
    } else if text.eq_ignore_ascii_case("REF") {
        CatalogDependencyKind::Reference
    } else if text.eq_ignore_ascii_case("EXTENDED") {
        CatalogDependencyKind::Extended
    } else {
        CatalogDependencyKind::default()
    }
}
1949
/// Attach one column row to the table-like object it belongs to.
///
/// `OWNER`, `TABLE_NAME` and `COLUMN_NAME` are required and interned, and
/// the data type is resolved through `data_type_ref_from_row`. If the owner
/// schema or the object is not present in the snapshot the row is ignored.
/// Otherwise the column is inserted, keyed by its name, into tables, views,
/// materialized views and editioning views; every other object kind leaves
/// the snapshot unchanged.
///
/// For a virtual column the `DATA_DEFAULT_VC` text is stored as the
/// generated expression, otherwise as the default expression.
///
/// # Errors
///
/// Propagates failures from `require_text`, `data_type_ref_from_row`,
/// `optional_bool` and `required_u32`, and returns
/// `CatalogError::InvalidColumnValue` when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_column_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    // Error raised when the interner rejects the text of `column`.
    fn not_interned(column: &str, expected: &'static str, value: &str) -> CatalogError {
        CatalogError::InvalidColumnValue {
            column: String::from(column),
            expected,
            value: String::from(value),
        }
    }

    let owner_text = row.require_text("OWNER")?;
    let table_name_text = row.require_text("TABLE_NAME")?;
    let column_name_text = row.require_text("COLUMN_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| not_interned("OWNER", "interned schema name", owner_text))?;
    let table_name = snapshot
        .intern_object_name(table_name_text)
        .ok_or_else(|| not_interned("TABLE_NAME", "interned object name", table_name_text))?;
    let column_name = snapshot
        .intern_column_name(column_name_text)
        .ok_or_else(|| not_interned("COLUMN_NAME", "interned column name", column_name_text))?;
    let data_type = data_type_ref_from_row(snapshot, row)?;

    // Columns of objects that are not in the snapshot are silently ignored;
    // the remaining row values are only validated past this point.
    let Some(catalog_object) = snapshot
        .schemas
        .get_mut(&owner)
        .and_then(|schema_catalog| schema_catalog.objects.get_mut(&table_name))
    else {
        return Ok(());
    };

    let expression = row
        .text("DATA_DEFAULT_VC")
        .filter(|value| !value.trim().is_empty())
        .map(String::from);
    let is_virtual = optional_bool(row, "VIRTUAL_COLUMN")?.unwrap_or(false);
    let position = required_u32(row, "COLUMN_POSITION")?;
    let nullable = optional_bool(row, "NULLABLE")?.unwrap_or(false);
    let hidden = optional_bool(row, "HIDDEN_COLUMN")?.unwrap_or(false);

    // The same dictionary text is either a default or a generated expression.
    let (default_expression, generated_expression) = if is_virtual {
        (None, expression)
    } else {
        (expression, None)
    };

    let column = ColumnMetadata {
        name: column_name,
        position,
        data_type,
        nullable,
        default_expression,
        generated_expression,
        hidden,
    };

    match catalog_object {
        CatalogObject::Table(table) => {
            table.columns.insert(column.name, column);
        }
        CatalogObject::View(view) => {
            view.columns.insert(column.name, column);
        }
        CatalogObject::MaterializedView(mview) => {
            mview.columns.insert(column.name, column);
        }
        CatalogObject::EditioningView(editioning_view) => {
            editioning_view.columns.insert(column.name, column);
        }
        // Object kinds without a column list.
        CatalogObject::Sequence(_)
        | CatalogObject::Type(_)
        | CatalogObject::Package(_)
        | CatalogObject::Procedure(_)
        | CatalogObject::Function(_)
        | CatalogObject::Trigger(_)
        | CatalogObject::SchedulerJob(_) => {}
    }

    Ok(())
}
2033
/// Merge one constraint row into the constraint map of the owning schema.
///
/// A constraint may be described by several rows (one per column). The
/// first row creates the `ConstraintMetadata`; every row refreshes the
/// table name, constraint type, referenced table, search condition and
/// deferrability flags, and appends its `COLUMN_NAME` /
/// `REFERENCED_COLUMN_NAME` (when present) through `push_unique_column`.
///
/// All row values are read and validated before the snapshot's constraint
/// map is touched, so a row with an invalid `IS_DEFERRABLE` / `IS_DEFERRED`
/// value no longer leaves a half-populated constraint entry behind.
///
/// # Errors
///
/// Propagates failures from `require_text` and `optional_bool`, and returns
/// `CatalogError::InvalidColumnValue` when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_constraint_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<(), CatalogError> {
    // Error raised when the interner rejects the text of `column`.
    fn not_interned(column: &str, expected: &'static str, value: &str) -> CatalogError {
        CatalogError::InvalidColumnValue {
            column: String::from(column),
            expected,
            value: String::from(value),
        }
    }

    let owner_text = row.require_text("OWNER")?;
    let constraint_name_text = row.require_text("CONSTRAINT_NAME")?;
    let table_name_text = row.require_text("TABLE_NAME")?;
    let search_condition = optional_nonblank_text(row, "SEARCH_CONDITION_VC").map(String::from);

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| not_interned("OWNER", "interned schema name", owner_text))?;
    let constraint_name = snapshot
        .intern_constraint_name(constraint_name_text)
        .ok_or_else(|| {
            not_interned(
                "CONSTRAINT_NAME",
                "interned constraint name",
                constraint_name_text,
            )
        })?;
    let table_name = snapshot
        .intern_object_name(table_name_text)
        .ok_or_else(|| not_interned("TABLE_NAME", "interned object name", table_name_text))?;
    let referenced_table_owner = optional_nonblank_text(row, "REFERENCED_TABLE_OWNER")
        .map(|value| {
            snapshot.intern_schema_name(value).ok_or_else(|| {
                not_interned("REFERENCED_TABLE_OWNER", "interned schema name", value)
            })
        })
        .transpose()?;
    let referenced_table_name = optional_nonblank_text(row, "REFERENCED_TABLE_NAME")
        .map(|value| {
            snapshot.intern_object_name(value).ok_or_else(|| {
                not_interned("REFERENCED_TABLE_NAME", "interned object name", value)
            })
        })
        .transpose()?;
    let child_column = optional_nonblank_text(row, "COLUMN_NAME")
        .map(|value| {
            snapshot
                .intern_column_name(value)
                .ok_or_else(|| not_interned("COLUMN_NAME", "interned column name", value))
        })
        .transpose()?;
    let referenced_column = optional_nonblank_text(row, "REFERENCED_COLUMN_NAME")
        .map(|value| {
            snapshot.intern_column_name(value).ok_or_else(|| {
                not_interned("REFERENCED_COLUMN_NAME", "interned column name", value)
            })
        })
        .transpose()?;

    let constraint_type = constraint_type_from_dictionary_value(
        row.require_text("CONSTRAINT_TYPE")?,
        search_condition.as_deref(),
        child_column.is_some(),
    );

    // Validate the flags once, up front: nothing below this point can fail,
    // so the snapshot is only mutated for rows that are fully valid.
    let deferrable = optional_bool(row, "IS_DEFERRABLE")?;
    let initially_deferred = optional_bool(row, "IS_DEFERRED")?;

    let metadata = snapshot
        .schemas
        .entry(owner)
        .or_default()
        .constraints
        .entry(constraint_name)
        .or_insert_with(|| ConstraintMetadata {
            name: constraint_name,
            table_owner: owner,
            table_name,
            constraint_type,
            columns: Vec::new(),
            referenced_table_owner,
            referenced_table_name,
            referenced_columns: Vec::new(),
            // Assigned right below together with the other per-row fields.
            search_condition: None,
            deferrable,
            initially_deferred,
        });

    // Refresh the per-row fields; for a freshly inserted entry this only
    // fills in `search_condition`, for an existing one the latest row wins.
    metadata.table_name = table_name;
    metadata.constraint_type = constraint_type;
    metadata.referenced_table_owner = referenced_table_owner;
    metadata.referenced_table_name = referenced_table_name;
    metadata.search_condition = search_condition;
    metadata.deferrable = deferrable;
    metadata.initially_deferred = initially_deferred;

    if let Some(column_name) = child_column {
        push_unique_column(&mut metadata.columns, column_name);
    }
    if let Some(column_name) = referenced_column {
        push_unique_column(&mut metadata.referenced_columns, column_name);
    }

    Ok(())
}
2153
/// Merge one index row into the index map of the owning schema.
///
/// An index may be described by several rows (one per column). The first
/// row creates the `IndexMetadata`; every row refreshes the table owner,
/// table name, uniqueness flag, index type and status, and appends its
/// `COLUMN_NAME` (when present) through `push_unique_column`.
///
/// All row values are read and validated before the snapshot's index map is
/// touched, so a row with an invalid `IS_UNIQUE` value no longer leaves a
/// partially updated index entry behind.
///
/// # Errors
///
/// Propagates failures from `require_text` and `optional_bool`, and returns
/// `CatalogError::InvalidColumnValue` when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_index_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    // Error raised when the interner rejects the text of `column`.
    fn not_interned(column: &str, expected: &'static str, value: &str) -> CatalogError {
        CatalogError::InvalidColumnValue {
            column: String::from(column),
            expected,
            value: String::from(value),
        }
    }

    let owner_text = row.require_text("OWNER")?;
    let index_name_text = row.require_text("INDEX_NAME")?;
    let table_owner_text = row.require_text("TABLE_OWNER")?;
    let table_name_text = row.require_text("TABLE_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| not_interned("OWNER", "interned schema name", owner_text))?;
    let index_name = snapshot
        .intern_index_name(index_name_text)
        .ok_or_else(|| not_interned("INDEX_NAME", "interned index name", index_name_text))?;
    let table_owner = snapshot
        .intern_schema_name(table_owner_text)
        .ok_or_else(|| not_interned("TABLE_OWNER", "interned schema name", table_owner_text))?;
    let table_name = snapshot
        .intern_object_name(table_name_text)
        .ok_or_else(|| not_interned("TABLE_NAME", "interned object name", table_name_text))?;
    let index_column = optional_nonblank_text(row, "COLUMN_NAME")
        .map(|value| {
            snapshot
                .intern_column_name(value)
                .ok_or_else(|| not_interned("COLUMN_NAME", "interned column name", value))
        })
        .transpose()?;

    // Read every remaining value once, before mutating the snapshot: the
    // only fallible one is `IS_UNIQUE`, and nothing below can fail.
    let unique = optional_bool(row, "IS_UNIQUE")?.unwrap_or(false);
    let index_type = String::from(row.text("INDEX_TYPE").unwrap_or_default());
    let status = row
        .text("STATUS")
        .map(object_status_from_dictionary_value)
        .unwrap_or_default();

    let metadata = snapshot
        .schemas
        .entry(owner)
        .or_default()
        .indexes
        .entry(index_name)
        .or_insert_with(|| IndexMetadata {
            name: index_name,
            table_owner,
            table_name,
            unique,
            columns: Vec::new(),
            // Placeholders: both are assigned right below for every row.
            index_type: String::new(),
            status: Default::default(),
        });

    // Refresh the per-row fields; for an existing entry the latest row wins.
    metadata.table_owner = table_owner;
    metadata.table_name = table_name;
    metadata.unique = unique;
    metadata.index_type = index_type;
    metadata.status = status;

    if let Some(column_name) = index_column {
        push_unique_column(&mut metadata.columns, column_name);
    }

    Ok(())
}
2238
/// Record one trigger row, both in the schema's trigger map and as a
/// `CatalogObject::Trigger` in its object map.
///
/// The trigger name is interned twice: once as a trigger name (key of the
/// trigger map) and once as an object name (key of the object map). If the
/// object map already holds a trigger under that name its `common` block is
/// reused; otherwise a default `ObjectCommon` of type `Trigger` is built.
/// `body_hash` is always stored as `None`.
///
/// NOTE(review): `TABLE_OWNER` and `TABLE_NAME` are required here, so a row
/// without them is rejected by `require_text` — confirm the loader query
/// never yields such rows.
///
/// # Errors
///
/// Propagates `require_text` failures and returns
/// `CatalogError::InvalidColumnValue` when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_trigger_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    // Error raised when the interner rejects the text of `column`.
    fn not_interned(column: &str, expected: &'static str, value: &str) -> CatalogError {
        CatalogError::InvalidColumnValue {
            column: String::from(column),
            expected,
            value: String::from(value),
        }
    }

    let owner_text = row.require_text("OWNER")?;
    let trigger_name_text = row.require_text("TRIGGER_NAME")?;
    let table_owner_text = row.require_text("TABLE_OWNER")?;
    let table_name_text = row.require_text("TABLE_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| not_interned("OWNER", "interned schema name", owner_text))?;
    let trigger_name = snapshot
        .intern_trigger_name(trigger_name_text)
        .ok_or_else(|| {
            not_interned("TRIGGER_NAME", "interned trigger name", trigger_name_text)
        })?;
    let object_name = snapshot
        .intern_object_name(trigger_name_text)
        .ok_or_else(|| {
            not_interned("TRIGGER_NAME", "interned object name", trigger_name_text)
        })?;
    let target_owner = snapshot
        .intern_schema_name(table_owner_text)
        .ok_or_else(|| not_interned("TABLE_OWNER", "interned schema name", table_owner_text))?;
    let target_name = snapshot
        .intern_object_name(table_name_text)
        .ok_or_else(|| not_interned("TABLE_NAME", "interned object name", table_name_text))?;

    let schema_catalog = snapshot.schemas.entry(owner).or_default();

    // Keep the common attributes of an already-known trigger object.
    let common = match schema_catalog.objects.get(&object_name) {
        Some(CatalogObject::Trigger(existing)) => existing.common.clone(),
        _ => ObjectCommon {
            owner,
            name: object_name,
            object_type: ObjectType::Trigger,
            ..ObjectCommon::default()
        },
    };

    // `TRIGGER_TYPE` feeds both the timing and the level.
    let trigger_type = row.text("TRIGGER_TYPE").unwrap_or_default();
    let triggering_event = row.text("TRIGGERING_EVENT").unwrap_or_default();

    let metadata = TriggerMetadata {
        common,
        target_owner,
        target_name,
        timing: trigger_timing_from_dictionary_value(trigger_type),
        level: trigger_level_from_dictionary_value(trigger_type),
        events: trigger_events_from_dictionary_value(triggering_event),
        when_clause: optional_nonblank_text(row, "WHEN_CLAUSE").map(String::from),
        body_hash: None,
    };

    schema_catalog
        .triggers
        .insert(trigger_name, metadata.clone());
    schema_catalog
        .objects
        .insert(object_name, CatalogObject::Trigger(metadata));

    Ok(())
}
2322
/// Record one synonym row in the synonym map of the owning schema.
///
/// `OWNER`, `SYNONYM_NAME` and `TABLE_NAME` (the target name) are required;
/// `TABLE_OWNER` and `DB_LINK` are optional. The synonym is flagged as
/// public when its owner equals `PUBLIC` (ASCII case-insensitive), and
/// `target_type` is always stored as `None`.
///
/// # Errors
///
/// Propagates `require_text` failures and returns
/// `CatalogError::InvalidColumnValue` when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_synonym_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    // Error raised when the interner rejects the text of `column`.
    fn not_interned(column: &str, expected: &'static str, value: &str) -> CatalogError {
        CatalogError::InvalidColumnValue {
            column: String::from(column),
            expected,
            value: String::from(value),
        }
    }

    let owner_text = row.require_text("OWNER")?;
    let synonym_name_text = row.require_text("SYNONYM_NAME")?;
    let target_name_text = row.require_text("TABLE_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| not_interned("OWNER", "interned schema name", owner_text))?;
    let synonym_name = snapshot
        .intern_synonym_name(synonym_name_text)
        .ok_or_else(|| {
            not_interned("SYNONYM_NAME", "interned synonym name", synonym_name_text)
        })?;
    let target_name = snapshot
        .intern_object_name(target_name_text)
        .ok_or_else(|| not_interned("TABLE_NAME", "interned object name", target_name_text))?;
    let target_owner = match optional_nonblank_text(row, "TABLE_OWNER") {
        Some(value) => Some(
            snapshot
                .intern_schema_name(value)
                .ok_or_else(|| not_interned("TABLE_OWNER", "interned schema name", value))?,
        ),
        None => None,
    };

    let synonym = SynonymTarget {
        target_owner,
        target_name,
        target_type: None,
        db_link: optional_nonblank_text(row, "DB_LINK").map(String::from),
        public_synonym: owner_text.eq_ignore_ascii_case("PUBLIC"),
    };

    let schema_catalog = snapshot.schemas.entry(owner).or_default();
    schema_catalog.synonyms.insert(synonym_name, synonym);

    Ok(())
}
2375
/// Enrich an already-registered view with its query hash and read-only flag.
///
/// `OWNER` and `VIEW_NAME` are required and interned. The hash is computed
/// with `hash_text` from the non-blank `TEXT_VC` column. The row is ignored
/// when the schema or object is missing from the snapshot, or when the
/// object found under that name is not a `CatalogObject::View`.
///
/// # Errors
///
/// Propagates failures from `require_text` and `optional_bool`, and returns
/// `CatalogError::InvalidColumnValue` when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_view_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    // Error raised when the interner rejects the text of `column`.
    fn not_interned(column: &str, expected: &'static str, value: &str) -> CatalogError {
        CatalogError::InvalidColumnValue {
            column: String::from(column),
            expected,
            value: String::from(value),
        }
    }

    let owner_text = row.require_text("OWNER")?;
    let view_name_text = row.require_text("VIEW_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| not_interned("OWNER", "interned schema name", owner_text))?;
    let view_name = snapshot
        .intern_object_name(view_name_text)
        .ok_or_else(|| not_interned("VIEW_NAME", "interned object name", view_name_text))?;

    let query_hash = optional_nonblank_text(row, "TEXT_VC").map(hash_text);
    let read_only = optional_bool(row, "READ_ONLY")?;

    // Unknown schema, unknown object, or a non-view object: nothing to do.
    let target = snapshot
        .schemas
        .get_mut(&owner)
        .and_then(|schema_catalog| schema_catalog.objects.get_mut(&view_name));
    if let Some(CatalogObject::View(view)) = target {
        view.query_hash = query_hash;
        view.read_only = read_only;
    }

    Ok(())
}
2413
/// Enrich an already-registered materialized view with its refresh mode,
/// refresh method and query hash.
///
/// `OWNER` and `MVIEW_NAME` are required and interned. The hash is computed
/// with `hash_text` from the non-blank `QUERY` column. The row is ignored
/// when the schema or object is missing from the snapshot, or when the
/// object found under that name is not a `CatalogObject::MaterializedView`.
///
/// # Errors
///
/// Propagates `require_text` failures and returns
/// `CatalogError::InvalidColumnValue` when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_mview_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    // Error raised when the interner rejects the text of `column`.
    fn not_interned(column: &str, expected: &'static str, value: &str) -> CatalogError {
        CatalogError::InvalidColumnValue {
            column: String::from(column),
            expected,
            value: String::from(value),
        }
    }

    let owner_text = row.require_text("OWNER")?;
    let mview_name_text = row.require_text("MVIEW_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| not_interned("OWNER", "interned schema name", owner_text))?;
    let mview_name = snapshot
        .intern_object_name(mview_name_text)
        .ok_or_else(|| not_interned("MVIEW_NAME", "interned object name", mview_name_text))?;

    let refresh_mode = optional_nonblank_text(row, "REFRESH_MODE").map(String::from);
    let refresh_method = optional_nonblank_text(row, "REFRESH_METHOD").map(String::from);
    let query_hash = optional_nonblank_text(row, "QUERY").map(hash_text);

    // Unknown schema, unknown object, or a different object kind: no-op.
    let target = snapshot
        .schemas
        .get_mut(&owner)
        .and_then(|schema_catalog| schema_catalog.objects.get_mut(&mview_name));
    if let Some(CatalogObject::MaterializedView(mview)) = target {
        mview.refresh_mode = refresh_mode;
        mview.refresh_method = refresh_method;
        mview.query_hash = query_hash;
    }

    Ok(())
}
2453
/// Enrich an already-registered sequence with its numeric settings.
///
/// `SEQUENCE_OWNER` and `SEQUENCE_NAME` are required and interned. The
/// numeric columns are best-effort: `INCREMENT_BY` falls back to `1`, and
/// `MIN_VALUE`, `MAX_VALUE` and `CACHE_SIZE` become `None` when they cannot
/// be parsed. `CYCLE_FLAG` / `ORDER_FLAG` count as set only when they equal
/// `Y` (ASCII case-insensitive). The row is ignored when the schema or
/// object is missing from the snapshot, or when the object found under that
/// name is not a `CatalogObject::Sequence`.
///
/// # Errors
///
/// Propagates `require_text` failures and returns
/// `CatalogError::InvalidColumnValue` when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_sequence_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    // Error raised when the interner rejects the text of `column`.
    fn not_interned(column: &str, expected: &'static str, value: &str) -> CatalogError {
        CatalogError::InvalidColumnValue {
            column: String::from(column),
            expected,
            value: String::from(value),
        }
    }

    let owner_text = row.require_text("SEQUENCE_OWNER")?;
    let sequence_name_text = row.require_text("SEQUENCE_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| not_interned("SEQUENCE_OWNER", "interned schema name", owner_text))?;
    let sequence_name = snapshot
        .intern_object_name(sequence_name_text)
        .ok_or_else(|| {
            not_interned("SEQUENCE_NAME", "interned object name", sequence_name_text)
        })?;

    let increment_by = row.parse_i64("INCREMENT_BY").unwrap_or(1);
    let min_value = row.parse_i64("MIN_VALUE").ok();
    let max_value = row.parse_i64("MAX_VALUE").ok();
    let cycle = row
        .text("CYCLE_FLAG")
        .is_some_and(|flag| flag.eq_ignore_ascii_case("Y"));
    let ordered = row
        .text("ORDER_FLAG")
        .is_some_and(|flag| flag.eq_ignore_ascii_case("Y"));
    let cache_size = row.parse_u64("CACHE_SIZE").ok();

    // Unknown schema, unknown object, or a non-sequence object: no-op.
    let target = snapshot
        .schemas
        .get_mut(&owner)
        .and_then(|schema_catalog| schema_catalog.objects.get_mut(&sequence_name));
    if let Some(CatalogObject::Sequence(sequence)) = target {
        sequence.increment_by = increment_by;
        sequence.min_value = min_value;
        sequence.max_value = max_value;
        sequence.cycle = cycle;
        sequence.ordered = ordered;
        sequence.cache_size = cache_size;
    }

    Ok(())
}
2505
/// Merge one type-attribute dictionary row into an already-recorded type.
///
/// The attribute is built from `ATTR_NAME`, `ATTR_NO`, `ATTR_TYPE_OWNER`,
/// `ATTR_TYPE_NAME`, `LENGTH`, `PRECISION` and `SCALE`. It replaces any
/// existing attribute with the same position, otherwise it is appended; the
/// attribute list is kept sorted by position. Rows whose schema or object is
/// unknown, or whose object is not a [`CatalogObject::Type`], are ignored
/// after the row itself has been validated.
///
/// # Errors
///
/// Propagates errors from the required/optional column readers and returns
/// [`CatalogError::InvalidColumnValue`] when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_type_attr_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<(), CatalogError> {
    let owner_text = row.require_text("OWNER")?;
    let type_name_text = row.require_text("TYPE_NAME")?;
    let attr_name_text = row.require_text("ATTR_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("OWNER"),
            expected: "interned schema name",
            value: String::from(owner_text),
        })?;
    let type_name = snapshot
        .intern_object_name(type_name_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("TYPE_NAME"),
            expected: "interned object name",
            value: String::from(type_name_text),
        })?;
    let attr_name = snapshot
        .intern_member_name(attr_name_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("ATTR_NAME"),
            expected: "interned member name",
            value: String::from(attr_name_text),
        })?;

    // The attribute's type owner is optional; a blank value means "none".
    let attr_type_owner = match optional_nonblank_text(row, "ATTR_TYPE_OWNER") {
        Some(value) => Some(snapshot.intern_schema_name(value).ok_or_else(|| {
            CatalogError::InvalidColumnValue {
                column: String::from("ATTR_TYPE_OWNER"),
                expected: "interned schema name",
                value: String::from(value),
            }
        })?),
        None => None,
    };
    let attr_type_name = row
        .text("ATTR_TYPE_NAME")
        .map_or_else(String::new, String::from);

    // Numeric columns are read in a fixed order so the first malformed one
    // is the one reported.
    let position = required_u32(row, "ATTR_NO")?;
    let length = optional_u32(row, "LENGTH")?;
    let precision = optional_u32(row, "PRECISION")?;
    let scale = optional_i32(row, "SCALE")?;

    let attribute = TypeAttribute {
        name: attr_name,
        position,
        data_type: DataTypeRef {
            owner: attr_type_owner,
            name: attr_type_name,
            length,
            precision,
            scale,
            char_semantics: None,
        },
    };

    let target = snapshot
        .schemas
        .get_mut(&owner)
        .and_then(|schema_catalog| schema_catalog.objects.get_mut(&type_name));

    if let Some(CatalogObject::Type(metadata)) = target {
        if let Some(slot) = metadata
            .attributes
            .iter_mut()
            .find(|existing| existing.position == attribute.position)
        {
            *slot = attribute;
        } else {
            metadata.attributes.push(attribute);
        }
        metadata.attributes.sort_by_key(|entry| entry.position);
    }

    Ok(())
}
2589
/// Record one `ALL_DB_LINKS` row in the snapshot.
///
/// The owning schema entry is created on demand, so a `PUBLIC` link is
/// stored even when nothing else has been recorded for that synthetic
/// schema. A link counts as public when its owner is `PUBLIC` (any case).
///
/// # Errors
///
/// Propagates the error from `require_text` for `OWNER` / `DB_LINK` and
/// returns [`CatalogError::InvalidColumnValue`] when the owner cannot be
/// interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_db_link_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    let owner_text = row.require_text("OWNER")?;
    let link_name_text = row.require_text("DB_LINK")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("OWNER"),
            expected: "interned schema name",
            value: String::from(owner_text),
        })?;

    let link = DatabaseLink {
        owner,
        name: String::from(link_name_text),
        host: optional_nonblank_text(row, "HOST").map(String::from),
        public_link: owner_text.eq_ignore_ascii_case("PUBLIC"),
    };

    snapshot
        .schemas
        .entry(owner)
        .or_default()
        .db_links
        .push(link);

    Ok(())
}
2620
/// Record one `ALL_POLICIES` row in the snapshot.
///
/// The policy is stored under the schema named by `OBJECT_OWNER`, which is
/// created on demand. The `SEL`, `INS`, `UPD`, `DEL` and `ENABLE` columns
/// are read as booleans: `Y` or `YES` (any case) is true, anything else —
/// including an absent value — is false.
///
/// # Errors
///
/// Propagates the error from `require_text` for the required columns and
/// returns [`CatalogError::InvalidColumnValue`] when a name cannot be
/// interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_vpd_policy_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<(), CatalogError> {
    let object_owner_text = row.require_text("OBJECT_OWNER")?;
    let object_name_text = row.require_text("OBJECT_NAME")?;
    let policy_name = String::from(row.require_text("POLICY_NAME")?);
    let function_owner_text = row.require_text("PF_OWNER")?;
    let function_name = String::from(row.require_text("FUNCTION")?);

    let object_owner = snapshot
        .intern_schema_name(object_owner_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("OBJECT_OWNER"),
            expected: "interned schema name",
            value: String::from(object_owner_text),
        })?;
    let object_name = snapshot
        .intern_object_name(object_name_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("OBJECT_NAME"),
            expected: "interned object name",
            value: String::from(object_name_text),
        })?;
    let function_owner = snapshot
        .intern_schema_name(function_owner_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("PF_OWNER"),
            expected: "interned schema name",
            value: String::from(function_owner_text),
        })?;

    let is_yes = |column: &str| {
        row.text(column).is_some_and(|value| {
            value.eq_ignore_ascii_case("Y") || value.eq_ignore_ascii_case("YES")
        })
    };

    let policy = VpdPolicy {
        object_owner,
        object_name,
        policy_group: optional_nonblank_text(row, "POLICY_GROUP").map(String::from),
        policy_name,
        function_owner,
        function_package: optional_nonblank_text(row, "PACKAGE").map(String::from),
        function_name,
        on_select: is_yes("SEL"),
        on_insert: is_yes("INS"),
        on_update: is_yes("UPD"),
        on_delete: is_yes("DEL"),
        enabled: is_yes("ENABLE"),
    };

    snapshot
        .schemas
        .entry(object_owner)
        .or_default()
        .vpd_policies
        .push(policy);
    Ok(())
}
2681
/// Apply a single `ALL_EDITIONS` row.
///
/// `USABLE` is documented by Oracle as `YES` / `NO`, so both `YES` and the
/// short `Y` spelling (any case, surrounding whitespace ignored) mark the
/// edition as usable. An absent `USABLE` value defaults to usable.
///
/// # Errors
///
/// Propagates the error from `require_text` for `EDITION_NAME`.
#[cfg(feature = "oraclemcp-db")]
fn apply_edition_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    let edition_name = row.require_text("EDITION_NAME")?.to_string();
    let parent_edition_name = optional_nonblank_text(row, "PARENT_EDITION_NAME").map(String::from);
    let usable = row
        .text("USABLE")
        .map(|value| {
            let value = value.trim();
            // Comparing against `Y` alone would classify the documented
            // `YES` value as "not usable".
            value.eq_ignore_ascii_case("Y") || value.eq_ignore_ascii_case("YES")
        })
        .unwrap_or(true);
    snapshot.editions.push(Edition {
        edition_name,
        parent_edition_name,
        usable,
    });
    Ok(())
}
2698
/// Record one `ALL_EDITIONING_VIEWS` row in the snapshot.
///
/// The view/table pair is stored under the schema named by `OWNER`, which
/// is created on demand.
///
/// # Errors
///
/// Propagates the error from `require_text` for the required columns and
/// returns [`CatalogError::InvalidColumnValue`] when a name cannot be
/// interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_editioning_view_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<(), CatalogError> {
    let owner_text = row.require_text("OWNER")?;
    let view_name_text = row.require_text("VIEW_NAME")?;
    let table_name_text = row.require_text("TABLE_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("OWNER"),
            expected: "interned schema name",
            value: String::from(owner_text),
        })?;
    let view_name = snapshot
        .intern_object_name(view_name_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("VIEW_NAME"),
            expected: "interned object name",
            value: String::from(view_name_text),
        })?;
    let table_name = snapshot
        .intern_object_name(table_name_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("TABLE_NAME"),
            expected: "interned object name",
            value: String::from(table_name_text),
        })?;

    let editioning_view = EditioningView {
        owner,
        view_name,
        table_name,
    };
    snapshot
        .schemas
        .entry(owner)
        .or_default()
        .editioning_views
        .push(editioning_view);
    Ok(())
}
2739
/// Record one `ALL_TAB_COMMENTS` row in the snapshot.
///
/// The comment is stored under the schema named by `OWNER`, which is
/// created on demand. `TABLE_TYPE` is optional and defaults to an empty
/// string; `COMMENTS` is required.
///
/// # Errors
///
/// Propagates the error from `require_text` for the required columns and
/// returns [`CatalogError::InvalidColumnValue`] when a name cannot be
/// interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_table_comment_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<(), CatalogError> {
    let owner_text = row.require_text("OWNER")?;
    let table_name_text = row.require_text("TABLE_NAME")?;
    let table_type = row
        .text("TABLE_TYPE")
        .map_or_else(String::new, String::from);
    let comments = String::from(row.require_text("COMMENTS")?);

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("OWNER"),
            expected: "interned schema name",
            value: String::from(owner_text),
        })?;
    let table_name = snapshot
        .intern_object_name(table_name_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("TABLE_NAME"),
            expected: "interned object name",
            value: String::from(table_name_text),
        })?;

    let table_comment = TableComment {
        owner,
        table_name,
        table_type,
        comments,
    };
    snapshot
        .schemas
        .entry(owner)
        .or_default()
        .table_comments
        .push(table_comment);
    Ok(())
}
2775
/// Record one `ALL_COL_COMMENTS` row in the snapshot.
///
/// The comment is stored under the schema named by `OWNER`, which is
/// created on demand.
///
/// # Errors
///
/// Propagates the error from `require_text` for the required columns and
/// returns [`CatalogError::InvalidColumnValue`] when a name cannot be
/// interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_column_comment_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<(), CatalogError> {
    let owner_text = row.require_text("OWNER")?;
    let table_name_text = row.require_text("TABLE_NAME")?;
    let column_name_text = row.require_text("COLUMN_NAME")?;
    let comments = String::from(row.require_text("COMMENTS")?);

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("OWNER"),
            expected: "interned schema name",
            value: String::from(owner_text),
        })?;
    let table_name = snapshot
        .intern_object_name(table_name_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("TABLE_NAME"),
            expected: "interned object name",
            value: String::from(table_name_text),
        })?;
    let column_name = snapshot
        .intern_column_name(column_name_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("COLUMN_NAME"),
            expected: "interned column name",
            value: String::from(column_name_text),
        })?;

    let column_comment = ColumnComment {
        owner,
        table_name,
        column_name,
        comments,
    };
    snapshot
        .schemas
        .entry(owner)
        .or_default()
        .column_comments
        .push(column_comment);
    Ok(())
}
2818
/// Record one object-privilege dictionary row as a [`Grant`].
///
/// The grant is stored under the schema named by `TABLE_SCHEMA`, which is
/// created on demand, and is skipped when an identical grant is already
/// present. `GRANTABLE` and `HIERARCHY` are true only for `YES` (any case).
/// `via_role` is always left as `None` here.
///
/// # Errors
///
/// Propagates the error from `require_text` for the required columns, from
/// grantee classification, and returns
/// [`CatalogError::InvalidColumnValue`] when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_grant_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
    let owner_text = row.require_text("TABLE_SCHEMA")?;
    let object_name_text = row.require_text("TABLE_NAME")?;
    let grantee_text = row.require_text("GRANTEE")?;
    let privilege_text = row.require_text("PRIVILEGE")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("TABLE_SCHEMA"),
            expected: "interned schema name",
            value: String::from(owner_text),
        })?;
    let object_name = snapshot
        .intern_object_name(object_name_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("TABLE_NAME"),
            expected: "interned object name",
            value: String::from(object_name_text),
        })?;

    let grantee = grantee_from_dictionary_value(snapshot, grantee_text)?;
    let privilege = grant_privilege_from_dictionary_value(privilege_text);

    let is_yes = |column: &str| {
        row.text(column)
            .is_some_and(|value| value.eq_ignore_ascii_case("YES"))
    };

    let grant = Grant {
        object_owner: owner,
        object_name,
        privilege,
        grantee,
        grantable: is_yes("GRANTABLE"),
        via_role: None,
        with_hierarchy: is_yes("HIERARCHY"),
    };

    // Keep the grant list free of exact duplicates.
    let grants = &mut snapshot.schemas.entry(owner).or_default().grants;
    if !grants.contains(&grant) {
        grants.push(grant);
    }

    Ok(())
}
2873
/// Turn an `ALL_TAB_PRIVS.GRANTEE` value into a [`Grantee`].
///
/// `ALL_TAB_PRIVS` has no column that distinguishes users from roles, so
/// the grantee universe — `{ user, role, PUBLIC }` — is resolved against
/// the `ALL_USERS`-derived [`CatalogSnapshot::known_users`] set:
///
/// * `PUBLIC` (any case) -> [`Grantee::Public`].
/// * a name found in the loaded user set -> [`Grantee::User`], i.e. a
///   statically certain direct grant.
/// * the user set is loaded but the name is absent -> [`Grantee::Role`],
///   the only class left. The resolver then caps it at Low confidence and
///   emits a `RuntimeGrantOrRole` ambiguity, because a role grant only
///   applies when the role is enabled in `SESSION_ROLES` at runtime.
/// * the user set is NOT loaded (`known_users` is `None`) ->
///   [`Grantee::Role`] too. This is the R13 fail-toward-restrictive choice:
///   when the grantee class is genuinely undetermined we must NOT default
///   to a high-confidence direct user grant (a fail-toward-permissive
///   result in a privilege/SAST product). Treating it as a role routes it
///   through the runtime-ambiguity downgrade instead of over-claiming
///   certainty.
///
/// # Errors
///
/// Returns [`CatalogError::InvalidColumnValue`] when the grantee name
/// cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn grantee_from_dictionary_value(
    snapshot: &mut CatalogSnapshot,
    text: &str,
) -> Result<Grantee, CatalogError> {
    if text.eq_ignore_ascii_case("PUBLIC") {
        return Ok(Grantee::Public);
    }

    let symbol = snapshot
        .interner
        .intern(text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("GRANTEE"),
            expected: "interned grantee name",
            value: String::from(text),
        })?;

    let candidate = UserName::from(symbol);
    let grantee = match snapshot.known_users.as_ref() {
        Some(users) if users.contains(&candidate) => Grantee::User(candidate),
        // Unknown name, or no user set at all: fall back to the
        // restrictive role classification.
        _ => Grantee::Role(RoleName::from(symbol)),
    };
    Ok(grantee)
}
2917
/// Map a dictionary `PRIVILEGE` value onto a [`GrantPrivilege`].
///
/// Matching ignores ASCII case and surrounding whitespace, in line with the
/// other dictionary-value classifiers in this file. Any privilege without a
/// dedicated variant is reported as [`GrantPrivilege::Other`].
#[cfg(feature = "oraclemcp-db")]
fn grant_privilege_from_dictionary_value(text: &str) -> GrantPrivilege {
    // Trim first so a padded value such as "SELECT " is not misfiled as
    // `Other`.
    match text.trim().to_ascii_uppercase().as_str() {
        "SELECT" => GrantPrivilege::Select,
        "INSERT" => GrantPrivilege::Insert,
        "UPDATE" => GrantPrivilege::Update,
        "DELETE" => GrantPrivilege::Delete,
        "EXECUTE" => GrantPrivilege::Execute,
        "ALTER" => GrantPrivilege::Alter,
        "INDEX" => GrantPrivilege::Index,
        "REFERENCES" => GrantPrivilege::References,
        "DEBUG" => GrantPrivilege::Debug,
        _ => GrantPrivilege::Other,
    }
}
2933
/// Fold one procedure dictionary row into the per-routine accumulator map.
///
/// Ensures the accumulator for the row's locator has a signature (seeded
/// with the routine name and overload), records the `DETERMINISTIC` and
/// `PIPELINED` flags, and updates the kind hint when `OBJECT_TYPE` names a
/// function or procedure. An existing hint is kept when the row provides
/// none.
///
/// # Errors
///
/// Propagates errors from locator construction and from reading the two
/// boolean columns.
#[cfg(feature = "oraclemcp-db")]
fn apply_routine_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
    routines: &mut HashMap<RoutineLocator, RoutineAccumulator>,
) -> Result<(), CatalogError> {
    let locator = routine_locator_from_procedure_row(snapshot, row)?;
    let deterministic = optional_bool(row, "DETERMINISTIC")?.unwrap_or(false);
    let pipelined = optional_bool(row, "PIPELINED")?.unwrap_or(false);
    let kind_hint = routine_kind_from_dictionary_value(optional_nonblank_text(row, "OBJECT_TYPE"));

    let accumulator = routines.entry(locator).or_default();
    if accumulator.signature.is_none() {
        accumulator.signature = Some(RoutineSignature {
            routine_name: locator.routine_name,
            overload: locator.overload,
            ..RoutineSignature::default()
        });
    }
    // Only overwrite the hint when this row actually carries one.
    if kind_hint.is_some() {
        accumulator.kind_hint = kind_hint;
    }
    accumulator.deterministic = deterministic;
    accumulator.pipelined = pipelined;

    Ok(())
}
2959
/// Fold one argument dictionary row into the per-routine accumulator map.
///
/// A row with `POSITION` 0 describes the return value: it sets the
/// signature's return type and marks the routine as a function. Any other
/// position is appended to the signature's argument list.
///
/// # Errors
///
/// Propagates errors from locator and data-type construction and from the
/// `POSITION` / `DEFAULTED` readers, and returns
/// [`CatalogError::InvalidColumnValue`] when `ARGUMENT_NAME` cannot be
/// interned.
#[cfg(feature = "oraclemcp-db")]
fn apply_argument_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
    routines: &mut HashMap<RoutineLocator, RoutineAccumulator>,
) -> Result<(), CatalogError> {
    let locator = routine_locator_from_argument_row(snapshot, row)?;
    let data_type = data_type_ref_from_argument_row(snapshot, row)?;

    // The accumulator (and its signature) is created before `POSITION` is
    // validated, so a routine is registered even if this row is rejected.
    let accumulator = routines.entry(locator).or_default();
    let signature = accumulator
        .signature
        .get_or_insert_with(|| RoutineSignature {
            routine_name: locator.routine_name,
            overload: locator.overload,
            ..RoutineSignature::default()
        });

    match required_u32(row, "POSITION")? {
        0 => {
            signature.return_type = Some(data_type);
            accumulator.kind_hint = Some(RoutineKind::Function);
        }
        position => {
            let name = match optional_nonblank_text(row, "ARGUMENT_NAME") {
                Some(value) => Some(snapshot.intern_member_name(value).ok_or_else(|| {
                    CatalogError::InvalidColumnValue {
                        column: String::from("ARGUMENT_NAME"),
                        expected: "interned member name",
                        value: String::from(value),
                    }
                })?),
                None => None,
            };
            let mode = parameter_mode_from_dictionary_value(row.text("IN_OUT"));
            let defaulted = optional_bool(row, "DEFAULTED")?.unwrap_or(false);

            signature.arguments.push(ArgumentMetadata {
                position,
                name,
                mode,
                data_type,
                defaulted,
            });
        }
    }

    Ok(())
}
3004
/// Move every accumulated routine into the snapshot.
///
/// Accumulators without a signature are dropped. The routine kind is the
/// accumulated hint when present; otherwise a routine with a return type is
/// a function and anything else a procedure. Packaged routines are added to
/// their package, all others become top-level catalog objects.
///
/// # Errors
///
/// Propagates any error from the two upsert helpers.
#[cfg(feature = "oraclemcp-db")]
fn finalize_routines(
    snapshot: &mut CatalogSnapshot,
    routines: HashMap<RoutineLocator, RoutineAccumulator>,
) -> Result<(), CatalogError> {
    for (locator, accumulator) in routines {
        let Some(signature) = accumulator.signature else {
            continue;
        };

        let inferred_kind = if signature.return_type.is_some() {
            RoutineKind::Function
        } else {
            RoutineKind::Procedure
        };
        let kind = accumulator.kind_hint.unwrap_or(inferred_kind);

        match locator.package_name {
            Some(package_name) => {
                upsert_packaged_routine(snapshot, locator.owner, package_name, kind, signature)?;
            }
            None => {
                upsert_top_level_routine(
                    snapshot,
                    locator.owner,
                    locator.routine_name,
                    kind,
                    signature,
                    accumulator.deterministic,
                    accumulator.pipelined,
                )?;
            }
        }
    }

    Ok(())
}
3042
/// Map a dictionary object-type string onto an [`ObjectType`].
///
/// Matching ignores ASCII case and surrounding whitespace. Returns `None`
/// for any type string without a corresponding variant here.
#[cfg(feature = "oraclemcp-db")]
fn object_type_from_dictionary_value(text: &str) -> Option<ObjectType> {
    let normalized = text.trim().to_ascii_uppercase();
    let object_type = match normalized.as_str() {
        "TABLE" => ObjectType::Table,
        "VIEW" => ObjectType::View,
        "MATERIALIZED VIEW" => ObjectType::MaterializedView,
        "SEQUENCE" => ObjectType::Sequence,
        "TYPE" => ObjectType::Type,
        "PACKAGE" => ObjectType::Package,
        "PROCEDURE" => ObjectType::Procedure,
        "FUNCTION" => ObjectType::Function,
        "TRIGGER" => ObjectType::Trigger,
        "EDITIONING VIEW" => ObjectType::EditioningView,
        _ => return None,
    };
    Some(object_type)
}
3059
/// Map a dictionary status string onto an [`ObjectStatus`].
///
/// Matching ignores ASCII case and surrounding whitespace. `VALID` and
/// `ENABLED` are valid; `INVALID`, `UNUSABLE` and `DISABLED` are invalid;
/// everything else is [`ObjectStatus::NotApplicable`].
#[cfg(feature = "oraclemcp-db")]
fn object_status_from_dictionary_value(text: &str) -> ObjectStatus {
    let normalized = text.trim().to_ascii_uppercase();
    match normalized.as_str() {
        "VALID" | "ENABLED" => ObjectStatus::Valid,
        "INVALID" | "UNUSABLE" | "DISABLED" => ObjectStatus::Invalid,
        _ => ObjectStatus::NotApplicable,
    }
}
3070
/// Map an optional dictionary object-type string onto a [`RoutineKind`].
///
/// Matching ignores ASCII case and surrounding whitespace. Returns `None`
/// when no text is given or when it is neither `FUNCTION` nor `PROCEDURE`.
#[cfg(feature = "oraclemcp-db")]
fn routine_kind_from_dictionary_value(text: Option<&str>) -> Option<RoutineKind> {
    let normalized = text?.trim().to_ascii_uppercase();
    match normalized.as_str() {
        "FUNCTION" => Some(RoutineKind::Function),
        "PROCEDURE" => Some(RoutineKind::Procedure),
        _ => None,
    }
}
3079
/// Map a dictionary constraint-type code onto a [`ConstraintType`].
///
/// `P`, `R`, `U` and `F` map directly. A `C` constraint is reported as
/// [`ConstraintType::NotNull`] when it has columns and its search condition
/// contains `IS NOT NULL` (case-insensitive); otherwise it is a plain
/// [`ConstraintType::Check`]. Any other code is [`ConstraintType::Other`].
#[cfg(feature = "oraclemcp-db")]
fn constraint_type_from_dictionary_value(
    text: &str,
    search_condition: Option<&str>,
    has_columns: bool,
) -> ConstraintType {
    // Evaluated lazily: only `C` constraints need the condition inspected.
    let looks_like_not_null = || {
        has_columns
            && search_condition.is_some_and(|condition| {
                condition
                    .trim()
                    .to_ascii_uppercase()
                    .contains("IS NOT NULL")
            })
    };

    let code = text.trim().to_ascii_uppercase();
    match code.as_str() {
        "P" => ConstraintType::PrimaryKey,
        "R" => ConstraintType::ForeignKey,
        "U" => ConstraintType::Unique,
        "F" => ConstraintType::Ref,
        "C" if looks_like_not_null() => ConstraintType::NotNull,
        "C" => ConstraintType::Check,
        _ => ConstraintType::Other,
    }
}
3110
/// Derive a [`TriggerTiming`] from a dictionary trigger-type string.
///
/// The text is upper-cased and searched for `INSTEAD OF`, `BEFORE` and
/// `AFTER`, in that order; the first keyword found wins. No match yields
/// [`TriggerTiming::Unknown`].
#[cfg(feature = "oraclemcp-db")]
fn trigger_timing_from_dictionary_value(text: &str) -> TriggerTiming {
    let upper = text.trim().to_ascii_uppercase();

    for (keyword, timing) in [
        ("INSTEAD OF", TriggerTiming::InsteadOf),
        ("BEFORE", TriggerTiming::Before),
        ("AFTER", TriggerTiming::After),
    ] {
        if upper.contains(keyword) {
            return timing;
        }
    }

    TriggerTiming::Unknown
}
3124
/// Derive a [`TriggerLevel`] from a dictionary trigger-type string.
///
/// The text is upper-cased and searched for `EACH ROW` first, then
/// `STATEMENT`. No match yields [`TriggerLevel::Unknown`].
#[cfg(feature = "oraclemcp-db")]
fn trigger_level_from_dictionary_value(text: &str) -> TriggerLevel {
    let upper = text.trim().to_ascii_uppercase();

    for (keyword, level) in [
        ("EACH ROW", TriggerLevel::Row),
        ("STATEMENT", TriggerLevel::Statement),
    ] {
        if upper.contains(keyword) {
            return level;
        }
    }

    TriggerLevel::Unknown
}
3136
/// Derive the list of [`TriggerEvent`]s from a dictionary triggering-event
/// string.
///
/// The text is upper-cased and every keyword it contains contributes one
/// event, always in the order insert, update, delete, logon, logoff, DDL.
/// When no keyword matches, the result is a single [`TriggerEvent::Other`].
#[cfg(feature = "oraclemcp-db")]
fn trigger_events_from_dictionary_value(text: &str) -> Vec<TriggerEvent> {
    let upper = text.trim().to_ascii_uppercase();
    let mut events = Vec::<TriggerEvent>::new();

    for (keyword, event) in [
        ("INSERT", TriggerEvent::Insert),
        ("UPDATE", TriggerEvent::Update),
        ("DELETE", TriggerEvent::Delete),
        ("LOGON", TriggerEvent::Logon),
        ("LOGOFF", TriggerEvent::Logoff),
        ("DDL", TriggerEvent::Ddl),
    ] {
        if upper.contains(keyword) {
            events.push(event);
        }
    }

    if events.is_empty() {
        events.push(TriggerEvent::Other);
    }

    events
}
3167
/// Append `column_name` to `columns` unless an equal entry already exists.
#[cfg(feature = "oraclemcp-db")]
fn push_unique_column(columns: &mut Vec<ColumnName>, column_name: ColumnName) {
    let already_present = columns.iter().any(|existing| *existing == column_name);
    if already_present {
        return;
    }
    columns.push(column_name);
}
3174
/// Build the [`RoutineLocator`] for a procedure dictionary row.
///
/// A non-blank `PROCEDURE_NAME` marks a packaged subprogram: `OBJECT_NAME`
/// is then the package and `PROCEDURE_NAME` the routine. When
/// `PROCEDURE_NAME` is absent or blank the row describes a top-level
/// routine, so `OBJECT_NAME` is used as the routine name and `package_name`
/// is `None`.
///
/// # Errors
///
/// Propagates the error from `require_text` for `OWNER` / `OBJECT_NAME` and
/// from the `SUBPROGRAM_ID` / `OVERLOAD` readers, and returns
/// [`CatalogError::InvalidColumnValue`] when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn routine_locator_from_procedure_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<RoutineLocator, CatalogError> {
    let owner_text = row.require_text("OWNER")?;
    let container_name_text = row.require_text("OBJECT_NAME")?;

    // Use one non-blank check for both the routine name and the
    // packaged/top-level decision, so a blank `PROCEDURE_NAME` cannot give
    // an empty routine name paired with `package_name: None`.
    let procedure_name_text = optional_nonblank_text(row, "PROCEDURE_NAME");
    let routine_name_text = procedure_name_text.unwrap_or(container_name_text).trim();

    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
        return Err(CatalogError::InvalidColumnValue {
            column: String::from("OWNER"),
            expected: "interned schema name",
            value: String::from(owner_text),
        });
    };
    let Some(container_name) = snapshot.intern_object_name(container_name_text) else {
        return Err(CatalogError::InvalidColumnValue {
            column: String::from("OBJECT_NAME"),
            expected: "interned object name",
            value: String::from(container_name_text),
        });
    };
    let Some(routine_name) = snapshot.intern_object_name(routine_name_text) else {
        return Err(CatalogError::InvalidColumnValue {
            column: String::from("PROCEDURE_NAME"),
            expected: "interned object name",
            value: String::from(routine_name_text),
        });
    };

    Ok(RoutineLocator {
        owner,
        package_name: procedure_name_text.map(|_| container_name),
        routine_name,
        subprogram_id: optional_u32(row, "SUBPROGRAM_ID")?,
        overload: optional_u32(row, "OVERLOAD")?,
    })
}
3221
/// Build the [`RoutineLocator`] for an argument dictionary row.
///
/// `OBJECT_NAME` is the routine name and a non-blank `PACKAGE_NAME`, when
/// present, is the enclosing package.
///
/// # Errors
///
/// Propagates the error from `require_text` for `OWNER` / `OBJECT_NAME` and
/// from the `SUBPROGRAM_ID` / `OVERLOAD` readers, and returns
/// [`CatalogError::InvalidColumnValue`] when a name cannot be interned.
#[cfg(feature = "oraclemcp-db")]
fn routine_locator_from_argument_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<RoutineLocator, CatalogError> {
    let owner_text = row.require_text("OWNER")?;
    let routine_name_text = row.require_text("OBJECT_NAME")?;

    let owner = snapshot
        .intern_schema_name(owner_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("OWNER"),
            expected: "interned schema name",
            value: String::from(owner_text),
        })?;

    let package_name = match optional_nonblank_text(row, "PACKAGE_NAME") {
        Some(value) => Some(snapshot.intern_object_name(value).ok_or_else(|| {
            CatalogError::InvalidColumnValue {
                column: String::from("PACKAGE_NAME"),
                expected: "interned object name",
                value: String::from(value),
            }
        })?),
        None => None,
    };

    let routine_name = snapshot
        .intern_object_name(routine_name_text)
        .ok_or_else(|| CatalogError::InvalidColumnValue {
            column: String::from("OBJECT_NAME"),
            expected: "interned object name",
            value: String::from(routine_name_text),
        })?;

    let subprogram_id = optional_u32(row, "SUBPROGRAM_ID")?;
    let overload = optional_u32(row, "OVERLOAD")?;

    Ok(RoutineLocator {
        owner,
        package_name,
        routine_name,
        subprogram_id,
        overload,
    })
}
3264
/// Insert or replace a routine signature inside a package.
///
/// The owning schema and a default package entry are created on demand.
/// When the object already stored under `package_name` is not a package,
/// the signature is dropped and `Ok(())` is returned.
#[cfg(feature = "oraclemcp-db")]
fn upsert_packaged_routine(
    snapshot: &mut CatalogSnapshot,
    owner: SchemaName,
    package_name: ObjectName,
    kind: RoutineKind,
    signature: RoutineSignature,
) -> Result<(), CatalogError> {
    let slot = snapshot
        .schemas
        .entry(owner)
        .or_default()
        .objects
        .entry(package_name)
        .or_insert_with(|| {
            CatalogObject::Package(PackageMetadata {
                common: ObjectCommon {
                    owner,
                    name: package_name,
                    object_type: ObjectType::Package,
                    ..ObjectCommon::default()
                },
                ..PackageMetadata::default()
            })
        });

    if let CatalogObject::Package(metadata) = slot {
        let bucket = match kind {
            RoutineKind::Procedure => &mut metadata.procedures,
            RoutineKind::Function => &mut metadata.functions,
        };
        upsert_signature(bucket, signature);
    }

    Ok(())
}
3302
/// Insert or replace a standalone procedure or function in the snapshot.
///
/// The owning schema is created on demand. If a procedure or function is
/// already stored under `routine_name`, its common metadata is reused;
/// otherwise fresh common metadata is built from the owner, name and kind.
/// The resulting object replaces whatever was stored under that name.
/// `deterministic` and `pipelined` are only recorded for functions.
#[cfg(feature = "oraclemcp-db")]
fn upsert_top_level_routine(
    snapshot: &mut CatalogSnapshot,
    owner: SchemaName,
    routine_name: ObjectName,
    kind: RoutineKind,
    signature: RoutineSignature,
    deterministic: bool,
    pipelined: bool,
) -> Result<(), CatalogError> {
    let schema_catalog = snapshot.schemas.entry(owner).or_default();

    let existing_common = match schema_catalog.objects.get(&routine_name) {
        Some(CatalogObject::Procedure(metadata)) => Some(metadata.common.clone()),
        Some(CatalogObject::Function(metadata)) => Some(metadata.common.clone()),
        _ => None,
    };
    let common = existing_common.unwrap_or_else(|| {
        let object_type = match kind {
            RoutineKind::Procedure => ObjectType::Procedure,
            RoutineKind::Function => ObjectType::Function,
        };
        ObjectCommon {
            owner,
            name: routine_name,
            object_type,
            ..ObjectCommon::default()
        }
    });

    let replacement = match kind {
        RoutineKind::Procedure => CatalogObject::Procedure(ProcedureMetadata { common, signature }),
        RoutineKind::Function => CatalogObject::Function(FunctionMetadata {
            common,
            signature,
            deterministic,
            pipelined,
        }),
    };
    schema_catalog.objects.insert(routine_name, replacement);

    Ok(())
}
3345
/// Inserts `signature` into `signatures`, replacing the first entry that has
/// the same `routine_name` and `overload`; appends when no entry matches.
#[cfg(feature = "oraclemcp-db")]
fn upsert_signature(signatures: &mut Vec<RoutineSignature>, signature: RoutineSignature) {
    let same_routine = |candidate: &RoutineSignature| {
        candidate.routine_name.eq(&signature.routine_name)
            && candidate.overload.eq(&signature.overload)
    };

    match signatures.iter().position(same_routine) {
        Some(index) => signatures[index] = signature,
        None => signatures.push(signature),
    }
}
3357
/// Builds the [`DataTypeRef`] for a routine-argument dictionary row.
///
/// Columns read from `row`:
/// * `TYPE_OWNER`: interned as the type owner when present and non-blank.
/// * `TYPE_NAME`, falling back to `DATA_TYPE`: the type name. When both are
///   missing or blank the name is left as an empty string.
/// * `DATA_LENGTH`, `DATA_PRECISION`, `DATA_SCALE`: optional numeric facets.
///
/// `char_semantics` is always `None` for argument rows.
///
/// # Errors
///
/// Returns `CatalogError::InvalidColumnValue` when `TYPE_OWNER` cannot be
/// interned as a schema name, and propagates any error from parsing the
/// numeric columns.
#[cfg(feature = "oraclemcp-db")]
fn data_type_ref_from_argument_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<DataTypeRef, CatalogError> {
    let owner = match optional_nonblank_text(row, "TYPE_OWNER") {
        Some(raw_owner) => {
            // Build the error lazily: the success path should not pay for the
            // two `String` allocations inside it.
            let interned = snapshot.intern_schema_name(raw_owner).ok_or_else(|| {
                CatalogError::InvalidColumnValue {
                    column: String::from("TYPE_OWNER"),
                    expected: "interned schema name",
                    value: String::from(raw_owner),
                }
            })?;
            Some(interned)
        }
        None => None,
    };

    let type_name = optional_nonblank_text(row, "TYPE_NAME")
        .or_else(|| optional_nonblank_text(row, "DATA_TYPE"))
        .unwrap_or_default();

    Ok(DataTypeRef {
        owner,
        name: String::from(type_name),
        length: optional_u32(row, "DATA_LENGTH")?,
        precision: optional_u32(row, "DATA_PRECISION")?,
        scale: optional_i32(row, "DATA_SCALE")?,
        char_semantics: None,
    })
}
3387
3388#[cfg(feature = "oraclemcp-db")]
3389fn parameter_mode_from_dictionary_value(text: Option<&str>) -> ParameterMode {
3390    match text.map(|value| value.trim().to_ascii_uppercase()) {
3391        Some(value) if value.eq("OUT") => ParameterMode::Out,
3392        Some(value) if value.eq("IN/OUT") => ParameterMode::InOut,
3393        _ => ParameterMode::In,
3394    }
3395}
3396
/// Builds an otherwise-default [`CatalogObject`] whose variant matches
/// `common.object_type`, with `common` stored as its shared header.
///
/// Returns `None` for `ObjectType::Synonym`, `ObjectType::Index`,
/// `ObjectType::Constraint`, and `ObjectType::Unknown`, which have no
/// `CatalogObject` variant.
#[cfg(feature = "oraclemcp-db")]
fn blank_catalog_object(common: ObjectCommon) -> Option<CatalogObject> {
    let object = match common.object_type {
        ObjectType::Synonym | ObjectType::Index | ObjectType::Constraint | ObjectType::Unknown => {
            return None;
        }
        ObjectType::Table => CatalogObject::Table(TableMetadata {
            common,
            ..TableMetadata::default()
        }),
        ObjectType::View => CatalogObject::View(ViewMetadata {
            common,
            ..ViewMetadata::default()
        }),
        ObjectType::MaterializedView => CatalogObject::MaterializedView(MViewMetadata {
            common,
            ..MViewMetadata::default()
        }),
        ObjectType::Sequence => CatalogObject::Sequence(SequenceMetadata {
            common,
            ..SequenceMetadata::default()
        }),
        ObjectType::Type => CatalogObject::Type(TypeMetadata {
            common,
            ..TypeMetadata::default()
        }),
        ObjectType::Package => CatalogObject::Package(PackageMetadata {
            common,
            ..PackageMetadata::default()
        }),
        ObjectType::Procedure => CatalogObject::Procedure(ProcedureMetadata {
            common,
            ..ProcedureMetadata::default()
        }),
        ObjectType::Function => CatalogObject::Function(FunctionMetadata {
            common,
            ..FunctionMetadata::default()
        }),
        ObjectType::Trigger => CatalogObject::Trigger(TriggerMetadata {
            common,
            ..TriggerMetadata::default()
        }),
        ObjectType::SchedulerJob => CatalogObject::SchedulerJob(SchedulerJobMetadata {
            common,
            ..SchedulerJobMetadata::default()
        }),
        ObjectType::EditioningView => CatalogObject::EditioningView(EditioningViewMetadata {
            common,
            ..EditioningViewMetadata::default()
        }),
    };

    Some(object)
}
3449
/// Builds the [`DataTypeRef`] for a column-style dictionary row.
///
/// Columns read from `row`:
/// * `DATA_TYPE_OWNER`: interned as the type owner when present and non-blank.
/// * `DATA_TYPE`: required type name.
/// * `DATA_LENGTH`, `DATA_PRECISION`, `DATA_SCALE`: optional numeric facets.
/// * `CHAR_USED`: copied verbatim (untrimmed) into `char_semantics`.
///
/// # Errors
///
/// Returns `CatalogError::InvalidColumnValue` when `DATA_TYPE_OWNER` cannot
/// be interned as a schema name, and propagates errors from the required
/// `DATA_TYPE` lookup and from parsing the numeric columns.
#[cfg(feature = "oraclemcp-db")]
fn data_type_ref_from_row(
    snapshot: &mut CatalogSnapshot,
    row: &OracleRow,
) -> Result<DataTypeRef, CatalogError> {
    // `optional_nonblank_text` is the shared trim-and-filter helper; the error
    // is built lazily so the success path allocates nothing for it.
    let owner = optional_nonblank_text(row, "DATA_TYPE_OWNER")
        .map(|value| {
            snapshot
                .intern_schema_name(value)
                .ok_or_else(|| CatalogError::InvalidColumnValue {
                    column: String::from("DATA_TYPE_OWNER"),
                    expected: "interned schema name",
                    value: String::from(value),
                })
        })
        .transpose()?;

    Ok(DataTypeRef {
        owner,
        name: String::from(row.require_text("DATA_TYPE")?),
        length: optional_u32(row, "DATA_LENGTH")?,
        precision: optional_u32(row, "DATA_PRECISION")?,
        scale: optional_i32(row, "DATA_SCALE")?,
        char_semantics: row.text("CHAR_USED").map(String::from),
    })
}
3479
/// Parses `column` as a boolean when the row has text for it.
///
/// Returns `Ok(None)` when `row.text(column)` is `None`; otherwise the
/// result of `row.parse_bool(column)` wrapped in `Some`.
///
/// # Errors
///
/// Propagates the error from `row.parse_bool`.
#[cfg(feature = "oraclemcp-db")]
fn optional_bool(row: &OracleRow, column: &str) -> Result<Option<bool>, CatalogError> {
    if row.text(column).is_none() {
        return Ok(None);
    }

    let flag = row.parse_bool(column)?;
    Ok(Some(flag))
}
3487
/// Returns the trimmed text of `column`, or `None` when the column has no
/// text or only whitespace.
#[cfg(feature = "oraclemcp-db")]
fn optional_nonblank_text<'a>(row: &'a OracleRow, column: &str) -> Option<&'a str> {
    let trimmed = row.text(column)?.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed)
    }
}
3494
/// Parses `column` as a `u32` when the row has text for it.
///
/// Returns `Ok(None)` when `row.text(column)` is `None`.
///
/// # Errors
///
/// Propagates the error from `row.parse_u64`, and returns
/// `CatalogError::InvalidColumnValue` (expected `"u32"`) when the parsed
/// value does not fit in a `u32`.
#[cfg(feature = "oraclemcp-db")]
fn optional_u32(row: &OracleRow, column: &str) -> Result<Option<u32>, CatalogError> {
    if row.text(column).is_none() {
        return Ok(None);
    }

    // Same parse-and-narrow steps as the required variant.
    required_u32(row, column).map(Some)
}
3511
/// Parses `column` as a `u32`.
///
/// # Errors
///
/// Propagates the error from `row.parse_u64`, and returns
/// `CatalogError::InvalidColumnValue` (column name upper-cased, expected
/// `"u32"`) when the parsed value does not fit in a `u32`.
#[cfg(feature = "oraclemcp-db")]
fn required_u32(row: &OracleRow, column: &str) -> Result<u32, CatalogError> {
    let wide = row.parse_u64(column)?;

    match u32::try_from(wide) {
        Ok(narrow) => Ok(narrow),
        Err(_) => Err(CatalogError::InvalidColumnValue {
            column: column.to_ascii_uppercase(),
            expected: "u32",
            value: wide.to_string(),
        }),
    }
}
3521
/// Parses `column` as an `i32` when the row has text for it.
///
/// Returns `Ok(None)` when `row.text(column)` is `None`.
///
/// # Errors
///
/// Propagates the error from `row.parse_i64`, and returns
/// `CatalogError::InvalidColumnValue` (column name upper-cased, expected
/// `"i32"`) when the parsed value does not fit in an `i32`.
#[cfg(feature = "oraclemcp-db")]
fn optional_i32(row: &OracleRow, column: &str) -> Result<Option<i32>, CatalogError> {
    if row.text(column).is_none() {
        return Ok(None);
    }

    let wide = row.parse_i64(column)?;
    match i32::try_from(wide) {
        Ok(narrow) => Ok(Some(narrow)),
        Err(_) => Err(CatalogError::InvalidColumnValue {
            column: column.to_ascii_uppercase(),
            expected: "i32",
            value: wide.to_string(),
        }),
    }
}
3538
/// Object a synonym refers to.
///
/// NOTE(review): field descriptions are derived from the field names and
/// types only; confirm them against the synonym loader.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct SynonymTarget {
    /// Schema owning the target object, when one is recorded.
    pub target_owner: Option<SchemaName>,
    /// Name of the target object.
    pub target_name: ObjectName,
    /// Object type of the target, when known.
    pub target_type: Option<ObjectType>,
    /// Database link name; presumably set when the target is remote (verify).
    pub db_link: Option<String>,
    /// Presumably `true` for `PUBLIC` synonyms (verify against the loader).
    pub public_synonym: bool,
}
3547
/// Virtual Private Database (VPD / RLS) policy entry from
/// `ALL_POLICIES`. Each row describes one policy attached to an object;
/// the policy function (PF_OWNER.PACKAGE.FUNCTION) is the predicate
/// generator that Oracle invokes at parse time to inject a WHERE clause
/// into reads (and, optionally, into INSERT/UPDATE/DELETE).
///
/// Lineage currently flags VPD-protected objects by reusing
/// `UnknownReason::DbLinkRemoteObject` as a marker, pending a
/// dedicated `UnknownReason::VpdPolicyApplied` variant.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct VpdPolicy {
    /// Owning schema (the object's owner).
    pub object_owner: SchemaName,
    /// Protected object.
    pub object_name: ObjectName,
    /// Optional policy-group name (Oracle policy-groups; usually NULL).
    pub policy_group: Option<String>,
    /// Policy name within the group.
    pub policy_name: String,
    /// Owner of the policy function.
    pub function_owner: SchemaName,
    /// Package containing the policy function (NULL for standalone).
    pub function_package: Option<String>,
    /// Function name that produces the WHERE-clause predicate.
    pub function_name: String,
    /// Statement-type bit: `true` means the policy applies to `SELECT`.
    pub on_select: bool,
    /// Statement-type bit: `true` means the policy applies to `INSERT`.
    pub on_insert: bool,
    /// Statement-type bit: `true` means the policy applies to `UPDATE`.
    pub on_update: bool,
    /// Statement-type bit: `true` means the policy applies to `DELETE`.
    pub on_delete: bool,
    /// Whether the policy is currently enabled.
    pub enabled: bool,
}
3581
/// Edition entry from `ALL_EDITIONS` — one node of the per-database
/// edition tree used by Oracle Edition-Based Redefinition (EBR). Linked
/// into [`CatalogSnapshot::editions`].
///
/// The derived `Default` is an empty name, no parent, and `usable = false`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Edition {
    /// Edition name (Oracle identifiers are case-preserving but
    /// case-insensitive when unquoted; stored as the dictionary value).
    pub edition_name: String,
    /// Parent edition, if any. `None` for the root edition (typically
    /// `ORA$BASE`).
    pub parent_edition_name: Option<String>,
    /// Whether the edition is currently usable (`USABLE = 'Y'`).
    pub usable: bool,
}
3596
/// Editioning view from `ALL_EDITIONING_VIEWS` — a view that masks an
/// editioned table during EBR cutovers. Linked into
/// [`SchemaCatalog::editioning_views`].
///
/// This is the lightweight view-to-table link; the `CatalogObject` payload
/// for an editioning view is the separate [`EditioningViewMetadata`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct EditioningView {
    /// Owning schema.
    pub owner: SchemaName,
    /// Editioning view name.
    pub view_name: ObjectName,
    /// Base table the editioning view masks.
    pub table_name: ObjectName,
}
3609
/// Documentation comment attached to a table, view, or materialized
/// view via `COMMENT ON TABLE owner.name IS '...'`. Sourced from
/// `ALL_TAB_COMMENTS`.
///
/// `plsql-docgen` consumes these to render description text alongside
/// object docs; dependency analysis does not interact with them.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TableComment {
    /// Owning schema.
    pub owner: SchemaName,
    /// Name of the commented table, view, or materialized view.
    pub table_name: ObjectName,
    /// `TABLE` / `VIEW` / `MATERIALIZED VIEW` — preserved verbatim
    /// from `ALL_TAB_COMMENTS.TABLE_TYPE` so docgen can pick a
    /// per-kind template.
    pub table_type: String,
    /// Comment text. Always present: rows with a NULL comment are
    /// filtered out server-side to keep the snapshot compact.
    pub comments: String,
}
3630
/// Documentation comment attached to a column via
/// `COMMENT ON COLUMN owner.table.column IS '...'`. Sourced from
/// `ALL_COL_COMMENTS`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ColumnComment {
    /// Owning schema.
    pub owner: SchemaName,
    /// Name of the object that contains the commented column.
    pub table_name: ObjectName,
    /// Name of the commented column.
    pub column_name: ColumnName,
    /// Comment text.
    pub comments: String,
}
3645
/// Database link metadata sourced from `ALL_DB_LINKS`.
///
/// PL/SQL code that references `remote_object@my_link` resolves through
/// one of these entries. Public links have `owner = PUBLIC`. Lineage uses
/// the `host` field to classify the remote endpoint (a TNS alias, a full
/// EZCONNECT string, or the legacy `(DESCRIPTION=...)` form).
///
/// The shape intentionally omits `username`: `ALL_DB_LINKS.USERNAME`
/// is the *connect user*, not a privilege grant, and most consumers
/// don't need it. If a future product surface needs the connect user, it
/// can be added behind `#[serde(default)]` without breaking older
/// snapshots.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct DatabaseLink {
    /// Owning schema. `PUBLIC` for public links.
    pub owner: SchemaName,
    /// Link name, e.g. `REPORTING.WORLD`.
    pub name: String,
    /// Connect-target host string. Can be a TNS alias, an EZCONNECT
    /// string, or a full `(DESCRIPTION=...)` block.
    pub host: Option<String>,
    /// `true` when `owner` is `PUBLIC`. Stored redundantly so downstream
    /// lineage can filter quickly without re-resolving the schema interner.
    pub public_link: bool,
}
3671
/// Privilege carried by a [`Grant`].
///
/// Defaults to [`GrantPrivilege::Other`].
///
/// NOTE(review): `Other` presumably also absorbs dictionary privileges that
/// have no dedicated variant — confirm against the grant loader.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum GrantPrivilege {
    Select,
    Insert,
    Update,
    Delete,
    Execute,
    Alter,
    Index,
    References,
    Debug,
    /// Default variant.
    #[default]
    Other,
}
3686
/// Recipient of a [`Grant`]: a named user, a named role, or `PUBLIC`.
///
/// Defaults to [`Grantee::Public`]. Per the test-helper documentation in
/// this crate, the live loader classifies a grantee as `User` when its name
/// appears in `ALL_USERS` and as `Role` otherwise.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum Grantee {
    /// A database user.
    User(UserName),
    /// A database role.
    Role(RoleName),
    /// Default variant.
    #[default]
    Public,
}
3694
/// One privilege granted on one object to one grantee.
///
/// NOTE(review): the per-field notes marked "presumably" are inferred from
/// names; confirm against the grant loader.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct Grant {
    /// Schema owning the object the grant applies to.
    pub object_owner: SchemaName,
    /// Object the grant applies to.
    pub object_name: ObjectName,
    /// Privilege being granted.
    pub privilege: GrantPrivilege,
    /// Recipient of the privilege.
    pub grantee: Grantee,
    /// Presumably `true` when the grantee may re-grant the privilege.
    pub grantable: bool,
    /// Presumably the role through which the privilege is obtained, if any.
    pub via_role: Option<RoleName>,
    /// Presumably mirrors the dictionary's hierarchy flag for the grant.
    pub with_hierarchy: bool,
}
3705
/// Description of one index and the table it is defined on.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct IndexMetadata {
    /// Index name.
    pub name: IndexName,
    /// Schema owning the indexed table.
    pub table_owner: SchemaName,
    /// Indexed table.
    pub table_name: ObjectName,
    /// Whether the index is unique.
    pub unique: bool,
    /// Indexed columns — presumably in index key order (verify).
    pub columns: Vec<ColumnName>,
    /// Index type as free-form text; the set of values is not fixed here.
    pub index_type: String,
    /// Status of the index.
    pub status: ObjectStatus,
}
3716
/// Kind of a table constraint, as used by [`ConstraintMetadata`].
///
/// Defaults to [`ConstraintType::Other`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum ConstraintType {
    PrimaryKey,
    ForeignKey,
    Unique,
    Check,
    NotNull,
    Ref,
    /// Default variant.
    #[default]
    Other,
}
3728
/// Description of one constraint on a table.
///
/// NOTE(review): which fields are populated for which [`ConstraintType`] is
/// not visible here; the "presumably" notes below need confirming.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ConstraintMetadata {
    /// Constraint name.
    pub name: ConstraintName,
    /// Schema owning the constrained table.
    pub table_owner: SchemaName,
    /// Constrained table.
    pub table_name: ObjectName,
    /// Kind of constraint.
    pub constraint_type: ConstraintType,
    /// Columns the constraint covers.
    pub columns: Vec<ColumnName>,
    /// Owner of the referenced table — presumably only set for foreign keys.
    pub referenced_table_owner: Option<SchemaName>,
    /// Referenced table — presumably only set for foreign keys.
    pub referenced_table_name: Option<ObjectName>,
    /// Referenced columns — presumably empty unless this is a foreign key.
    pub referenced_columns: Vec<ColumnName>,
    /// Condition text — presumably only set for check constraints.
    pub search_condition: Option<String>,
    /// Deferrability flag, when known.
    pub deferrable: Option<bool>,
    /// Initially-deferred flag, when known.
    pub initially_deferred: Option<bool>,
}
3743
/// Classification of a [`CatalogDependency`] edge.
///
/// Defaults to [`CatalogDependencyKind::Other`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum CatalogDependencyKind {
    Hard,
    Reference,
    Extended,
    /// Default variant.
    #[default]
    Other,
}
3752
/// One dependency edge: the object `owner.name` depends on the object
/// described by the `referenced_*` fields.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct CatalogDependency {
    /// Schema owning the dependent object.
    pub owner: SchemaName,
    /// Dependent object.
    pub name: ObjectName,
    /// Type of the dependent object.
    pub object_type: ObjectType,
    /// Schema owning the referenced object, when recorded.
    pub referenced_owner: Option<SchemaName>,
    /// Referenced object.
    pub referenced_name: ObjectName,
    /// Type of the referenced object, when known.
    pub referenced_type: Option<ObjectType>,
    /// Classification of the edge.
    pub dependency_kind: CatalogDependencyKind,
    /// Database link name — presumably set when the referenced object is
    /// reached through a link (verify against the loader).
    pub via_db_link: Option<String>,
}
3764
/// Level of PL/Scope data available for a [`PlScopeSnapshot`].
///
/// Defaults to [`PlScopeAvailability::NotAvailable`].
///
/// NOTE(review): the exact conditions that select each variant are decided
/// by the loader, which is not visible here.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum PlScopeAvailability {
    /// Default variant.
    #[default]
    NotAvailable,
    AvailableButStale,
    IdentifiersOnly,
    IdentifiersAndStatements,
}
3773
/// One identifier occurrence reported for a stored object; collected in
/// [`PlScopeSnapshot::identifiers`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct CompilerIdentifier {
    /// Schema owning the object that contains the identifier.
    pub owner: SchemaName,
    /// Object that contains the identifier.
    pub object_name: ObjectName,
    /// The identifier's name.
    pub identifier_name: MemberName,
    /// Identifier type as free-form text.
    pub identifier_type: String,
    /// Usage kind as free-form text.
    pub usage: String,
    /// Source line — presumably 1-based as in the dictionary (verify).
    pub line: u32,
    /// Source column — presumably 1-based as in the dictionary (verify).
    pub column: u32,
}
3784
/// One reference from a position inside a stored object to a target
/// identifier; collected in [`PlScopeSnapshot::references`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct CompilerReference {
    /// Schema owning the referencing object.
    pub owner: SchemaName,
    /// Referencing object.
    pub object_name: ObjectName,
    /// Line of the usage inside the referencing object.
    pub usage_line: u32,
    /// Column of the usage inside the referencing object.
    pub usage_column: u32,
    /// Schema owning the target, when resolved.
    pub target_owner: Option<SchemaName>,
    /// Target object, when resolved.
    pub target_object_name: Option<ObjectName>,
    /// Target identifier inside the target object, when resolved.
    pub target_identifier_name: Option<MemberName>,
}
3795
/// One statement occurrence reported for a stored object; collected in
/// [`PlScopeSnapshot::statements`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct CompilerStatementUsage {
    /// Schema owning the object that contains the statement.
    pub owner: SchemaName,
    /// Object that contains the statement.
    pub object_name: ObjectName,
    /// Statement kind as free-form text.
    pub statement_kind: String,
    /// Source line of the statement.
    pub line: u32,
    /// Source column of the statement.
    pub column: u32,
}
3804
/// Compiler-reported (PL/Scope) facts bundled with their availability level.
///
/// The derived `Default` is `NotAvailable` with empty lists and no
/// timestamp or hash.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PlScopeSnapshot {
    /// How much PL/Scope data is available.
    pub availability: PlScopeAvailability,
    /// Identifier occurrences.
    pub identifiers: Vec<CompilerIdentifier>,
    /// Cross-references between usages and their targets.
    pub references: Vec<CompilerReference>,
    /// Statement occurrences.
    pub statements: Vec<CompilerStatementUsage>,
    /// When the data was collected (UTC), if recorded.
    pub collected_at: Option<DateTime<Utc>>,
    /// Hash associated with the analysed source, if recorded. `Hash` is this
    /// crate's own value type, not the `std::hash::Hash` trait.
    pub source_hash: Option<Hash>,
    /// Capability warnings attached to this snapshot.
    pub warnings: Vec<CapabilityWarning>,
}
3815
/// Reference to a data type, with optional size facets.
///
/// With the `oraclemcp-db` feature, values are built from dictionary rows by
/// `data_type_ref_from_row` (column-style rows) and
/// `data_type_ref_from_argument_row` (routine-argument rows).
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct DataTypeRef {
    /// Schema owning the type. Filled from `DATA_TYPE_OWNER` (column rows)
    /// or `TYPE_OWNER` (argument rows) when that column is non-blank.
    pub owner: Option<SchemaName>,
    /// Type name. `DATA_TYPE` for column rows; for argument rows `TYPE_NAME`,
    /// then `DATA_TYPE`, then an empty string when both are blank.
    pub name: String,
    /// `DATA_LENGTH`, when present.
    pub length: Option<u32>,
    /// `DATA_PRECISION`, when present.
    pub precision: Option<u32>,
    /// `DATA_SCALE`, when present. Signed, unlike length and precision.
    pub scale: Option<i32>,
    /// Raw `CHAR_USED` value for column rows; always `None` for argument rows.
    pub char_semantics: Option<String>,
}
3825
/// Description of one column of a table, view, materialized view, or
/// editioning view (the metadata types that hold a `columns` map).
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ColumnMetadata {
    /// Column name.
    pub name: ColumnName,
    /// Column position — presumably the dictionary's column ordinal (verify).
    pub position: u32,
    /// Declared data type.
    pub data_type: DataTypeRef,
    /// Whether the column accepts NULL.
    pub nullable: bool,
    /// Default-value expression text, if any.
    pub default_expression: Option<String>,
    /// Generation expression text — presumably for virtual columns (verify).
    pub generated_expression: Option<String>,
    /// Whether the column is hidden.
    pub hidden: bool,
}
3836
/// Duration of a temporary table, as stored in
/// [`TableMetadata::temporary_duration`].
///
/// Defaults to [`TemporaryTableDuration::Transaction`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum TemporaryTableDuration {
    /// Default variant.
    #[default]
    Transaction,
    Session,
}
3843
/// Payload of [`CatalogObject::Table`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TableMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// Columns keyed by name. A `HashMap` has no inherent order; use
    /// `ColumnMetadata::position` when order matters.
    pub columns: HashMap<ColumnName, ColumnMetadata>,
    /// Whether the table is a temporary table.
    pub temporary: bool,
    /// Duration of the temporary table — presumably `None` for regular
    /// tables (verify against the loader).
    pub temporary_duration: Option<TemporaryTableDuration>,
    /// Whether the table is index-organized.
    pub index_organized: bool,
}
3852
/// Payload of [`CatalogObject::View`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ViewMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// Columns keyed by name (unordered; see `ColumnMetadata::position`).
    pub columns: HashMap<ColumnName, ColumnMetadata>,
    /// Hash of the view's query, if recorded. `Hash` is this crate's own
    /// value type, not the `std::hash::Hash` trait.
    pub query_hash: Option<Hash>,
    /// Read-only flag, when known.
    pub read_only: Option<bool>,
    /// Check-option text, when present.
    pub check_option: Option<String>,
}
3861
/// Payload of [`CatalogObject::MaterializedView`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct MViewMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// Columns keyed by name (unordered; see `ColumnMetadata::position`).
    pub columns: HashMap<ColumnName, ColumnMetadata>,
    /// Refresh mode as free-form text, when present.
    pub refresh_mode: Option<String>,
    /// Refresh method as free-form text, when present.
    pub refresh_method: Option<String>,
    /// Hash of the defining query, if recorded (crate `Hash` value type).
    pub query_hash: Option<Hash>,
}
3870
/// Payload of [`CatalogObject::Sequence`].
///
/// Note that the derived `Default` sets `increment_by` to `0`.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct SequenceMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// Step between consecutive values (signed).
    pub increment_by: i64,
    /// Lower bound, when recorded.
    pub min_value: Option<i64>,
    /// Upper bound, when recorded.
    pub max_value: Option<i64>,
    /// Whether the sequence cycles.
    pub cycle: bool,
    /// Whether the sequence is ordered.
    pub ordered: bool,
    /// Cache size, when recorded.
    pub cache_size: Option<u64>,
}
3881
/// Direction of a routine argument.
///
/// Defaults to [`ParameterMode::In`]. With the `oraclemcp-db` feature,
/// `parameter_mode_from_dictionary_value` maps the dictionary text `OUT` to
/// `Out`, `IN/OUT` to `InOut`, and anything else (including a missing
/// value) to `In`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum ParameterMode {
    /// Default variant.
    #[default]
    In,
    Out,
    InOut,
}
3889
/// One argument of a [`RoutineSignature`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ArgumentMetadata {
    /// Argument position within the signature.
    pub position: u32,
    /// Argument name, when one is recorded.
    pub name: Option<MemberName>,
    /// Direction of the argument.
    pub mode: ParameterMode,
    /// Declared data type.
    pub data_type: DataTypeRef,
    /// Whether the argument has a default value.
    pub defaulted: bool,
}
3898
/// One entry of an `accessible_by` list on [`RoutineSignature`] or
/// [`PackageMetadata`].
///
/// NOTE(review): presumably models one unit named in a PL/SQL
/// `ACCESSIBLE BY` clause — confirm against the loader.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct AccessibleByTarget {
    /// Schema of the named unit, when one is given.
    pub owner: Option<SchemaName>,
    /// Name of the unit.
    pub object_name: ObjectName,
}
3904
/// Signature of one procedure or function (standalone, packaged, or a type
/// method).
///
/// Within a package's signature list the pair (`routine_name`, `overload`)
/// acts as the identity: `upsert_signature` replaces an existing entry when
/// both fields match and appends otherwise.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct RoutineSignature {
    /// Routine name.
    pub routine_name: ObjectName,
    /// Overload discriminator, when one is recorded.
    pub overload: Option<u32>,
    /// Arguments of the routine.
    pub arguments: Vec<ArgumentMetadata>,
    /// Return type — presumably `None` for procedures (verify).
    pub return_type: Option<DataTypeRef>,
    /// Invoker-rights flag, when known.
    pub authid_current_user: Option<bool>,
    /// Units listed as allowed callers, if any.
    pub accessible_by: Vec<AccessibleByTarget>,
}
3914
/// Finality of an object type, as stored in [`TypeMetadata::finality`].
///
/// Defaults to [`TypeFinality::Unknown`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum TypeFinality {
    Final,
    NotFinal,
    /// Default variant.
    #[default]
    Unknown,
}
3922
/// Instantiability of an object type, as stored in
/// [`TypeMetadata::instantiable`].
///
/// Defaults to [`TypeInstantiable::Unknown`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum TypeInstantiable {
    Instantiable,
    NotInstantiable,
    /// Default variant.
    #[default]
    Unknown,
}
3930
/// One attribute of an object type; collected in
/// [`TypeMetadata::attributes`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TypeAttribute {
    /// Attribute name.
    pub name: MemberName,
    /// Attribute position within the type.
    pub position: u32,
    /// Declared data type.
    pub data_type: DataTypeRef,
}
3937
/// Payload of [`CatalogObject::Type`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TypeMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// Attributes of the type.
    pub attributes: Vec<TypeAttribute>,
    /// Method signatures of the type.
    pub methods: Vec<RoutineSignature>,
    /// Schema owning the supertype, when there is one.
    pub supertype_owner: Option<SchemaName>,
    /// Supertype name, when there is one.
    pub supertype_name: Option<ObjectName>,
    /// Finality of the type (`Unknown` by default).
    pub finality: TypeFinality,
    /// Instantiability of the type (`Unknown` by default).
    pub instantiable: TypeInstantiable,
}
3948
/// Payload of [`CatalogObject::Package`].
///
/// With the `oraclemcp-db` feature, `upsert_packaged_routine` fills
/// `procedures` and `functions`, keeping at most one entry per
/// (routine name, overload) pair in each list.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct PackageMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// Signatures of the package's procedures.
    pub procedures: Vec<RoutineSignature>,
    /// Signatures of the package's functions.
    pub functions: Vec<RoutineSignature>,
    /// Whether the package holds state, when known.
    pub package_stateful: Option<bool>,
    /// Invoker-rights flag, when known.
    pub authid_current_user: Option<bool>,
    /// Units listed as allowed callers, if any.
    pub accessible_by: Vec<AccessibleByTarget>,
}
3958
/// Payload of [`CatalogObject::Procedure`]: a standalone procedure.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct ProcedureMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// The procedure's signature.
    pub signature: RoutineSignature,
}
3964
/// Payload of [`CatalogObject::Function`]: a standalone function.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct FunctionMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// The function's signature.
    pub signature: RoutineSignature,
    /// Whether the function is declared deterministic.
    pub deterministic: bool,
    /// Whether the function is declared pipelined.
    pub pipelined: bool,
}
3972
/// When a trigger fires relative to its event, as stored in
/// [`TriggerMetadata::timing`].
///
/// Defaults to [`TriggerTiming::Unknown`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum TriggerTiming {
    Before,
    After,
    InsteadOf,
    /// Default variant.
    #[default]
    Unknown,
}
3981
/// Granularity at which a trigger fires, as stored in
/// [`TriggerMetadata::level`].
///
/// Defaults to [`TriggerLevel::Unknown`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum TriggerLevel {
    Statement,
    Row,
    /// Default variant.
    #[default]
    Unknown,
}
3989
/// Event that fires a trigger; a trigger may list several in
/// [`TriggerMetadata::events`].
///
/// Defaults to [`TriggerEvent::Other`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub enum TriggerEvent {
    Insert,
    Update,
    Delete,
    Logon,
    Logoff,
    Ddl,
    /// Default variant.
    #[default]
    Other,
}
4001
/// Payload of [`CatalogObject::Trigger`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct TriggerMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// Schema owning the object the trigger is attached to.
    pub target_owner: SchemaName,
    /// Object the trigger is attached to.
    pub target_name: ObjectName,
    /// Timing relative to the event (`Unknown` by default).
    pub timing: TriggerTiming,
    /// Statement- or row-level firing (`Unknown` by default).
    pub level: TriggerLevel,
    /// Events that fire the trigger.
    pub events: Vec<TriggerEvent>,
    /// `WHEN` condition text, if any.
    pub when_clause: Option<String>,
    /// Hash of the trigger body, if recorded (crate `Hash` value type, not
    /// the `std::hash::Hash` trait).
    pub body_hash: Option<Hash>,
}
4013
/// Payload of [`CatalogObject::SchedulerJob`].
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct SchedulerJobMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// Whether the job is enabled.
    pub enabled: bool,
    /// Job type as free-form text.
    pub job_type: String,
    /// Program the job runs, when it references one.
    pub program_name: Option<ObjectName>,
    /// Schedule the job follows, when it references one.
    pub schedule_name: Option<ObjectName>,
    /// Inline job action text, when present.
    pub job_action: Option<String>,
}
4023
/// Payload of [`CatalogObject::EditioningView`].
///
/// Distinct from [`EditioningView`], which only records the view-to-table
/// link.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct EditioningViewMetadata {
    /// Header shared by all catalog objects.
    pub common: ObjectCommon,
    /// Schema owning the base table.
    pub base_table_owner: SchemaName,
    /// Base table behind the editioning view.
    pub base_table_name: ObjectName,
    /// Columns keyed by name (unordered; see `ColumnMetadata::position`).
    pub columns: HashMap<ColumnName, ColumnMetadata>,
}
4031
/// A catalog object together with its kind-specific metadata.
///
/// Every payload type starts with an `ObjectCommon` header. With the
/// `oraclemcp-db` feature, `blank_catalog_object` builds the variant that
/// matches an `ObjectType`; `ObjectType::Synonym`, `Index`, `Constraint`,
/// and `Unknown` have no variant here.
///
/// Unlike its payload types, this enum does not derive `Default`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum CatalogObject {
    Table(TableMetadata),
    View(ViewMetadata),
    MaterializedView(MViewMetadata),
    Sequence(SequenceMetadata),
    Type(TypeMetadata),
    Package(PackageMetadata),
    /// Standalone procedure.
    Procedure(ProcedureMetadata),
    /// Standalone function.
    Function(FunctionMetadata),
    Trigger(TriggerMetadata),
    SchedulerJob(SchedulerJobMetadata),
    EditioningView(EditioningViewMetadata),
}
4046
4047#[cfg(all(test, feature = "oraclemcp-db"))]
4048mod tests {
4049    use std::collections::{HashMap, HashSet};
4050    use std::future::Future;
4051
4052    use chrono::{DateTime, Utc};
4053    use plsql_core::{AnalysisProfile, ColumnName, MemberName, ObjectName, SchemaName, SymbolId};
4054    use tempfile::tempdir;
4055
4056    use crate::live::{LiveContext, RuntimeBuilder, load_catalog_users};
4057    use crate::{
4058        AccessibleByTarget, CATALOG_SNAPSHOT_SCHEMA_ID, CATALOG_SNAPSHOT_SCHEMA_VERSION,
4059        CapabilityWarning, CatalogCapabilities, CatalogDependencyKind, CatalogError,
4060        CatalogLoadRequest, CatalogObject, CatalogSchemaFilter, CatalogSnapshot,
4061        CatalogSnapshotDocument, CatalogSource, CatalogSourceKind, CompilerIdentifier,
4062        ConstraintType, DataTypeRef, Hash, ObjectCommon, ObjectStatus, ObjectType, OracleBackend,
4063        OracleBind, OracleConnectOptions, OracleConnection, OracleConnectionInfo, OracleRow,
4064        PackageMetadata, PlScopeAvailability, PlScopeSnapshot, RoutineSignature, SchemaCatalog,
4065        SynonymName, SynonymTarget, TableMetadata, TriggerEvent, TriggerLevel, TriggerName,
4066        TriggerTiming, TypeFinality, TypeInstantiable, export_snapshot_to_json,
4067        grantee_from_dictionary_value, load_from_dbms_metadata_dir, load_snapshot_from_connection,
4068        load_snapshot_from_json, negotiate_capabilities, populate_dbms_metadata_ddl,
4069    };
4070
    /// Canned response for `StaticConnection::query_rows`.
    ///
    /// An expectation matches a query when the SQL text contains
    /// `sql_contains` and the bind parameters equal `params`; the first
    /// matching expectation's `rows` are returned.
    #[derive(Clone, Debug, Eq, PartialEq)]
    struct QueryExpectation {
        /// Substring that must occur in the SQL text.
        sql_contains: String,
        /// Bind parameters the query must be issued with (exact match).
        params: Vec<OracleBind>,
        /// Rows handed back when the expectation matches.
        rows: Vec<OracleRow>,
    }
4077
    /// In-memory `OracleConnection` test double that never talks to a
    /// database.
    #[derive(Clone, Debug)]
    struct StaticConnection {
        /// Rows returned by every `query_rows` call while
        /// `expected_queries` is empty.
        rows: Vec<OracleRow>,
        /// Value returned by `execute`.
        row_count: u64,
        /// When non-empty, `query_rows` answers only queries matching one
        /// of these and fails on anything else.
        expected_queries: Vec<QueryExpectation>,
        /// Value returned by `describe`.
        connection_info: OracleConnectionInfo,
    }
4085
4086    impl Default for StaticConnection {
4087        fn default() -> Self {
4088            Self {
4089                rows: Vec::new(),
4090                row_count: 0,
4091                expected_queries: Vec::new(),
4092                connection_info: OracleConnectionInfo {
4093                    backend: OracleBackend::RustOracle,
4094                    connect_string: String::from("//localhost/XE"),
4095                    current_schema: Some(String::from("BILLING")),
4096                    server_version: String::from("23.0.0.0.0"),
4097                    db_name: String::from("XE"),
4098                    db_domain: String::new(),
4099                    service_name: String::from("XE"),
4100                    instance_name: String::from("xe"),
4101                    server_type: String::from("Dedicated"),
4102                    max_identifier_length: 128,
4103                    max_open_cursors: 500,
4104                },
4105            }
4106        }
4107    }
4108
4109    #[async_trait::async_trait(?Send)]
4110    impl OracleConnection for StaticConnection {
4111        fn backend(&self) -> OracleBackend {
4112            OracleBackend::RustOracle
4113        }
4114
4115        async fn ping(&self, cx: &LiveContext) -> Result<(), CatalogError> {
4116            let _ = cx;
4117            Ok(())
4118        }
4119
4120        async fn describe(&self, cx: &LiveContext) -> Result<OracleConnectionInfo, CatalogError> {
4121            let _ = cx;
4122            Ok(self.connection_info.clone())
4123        }
4124
4125        async fn query_rows(
4126            &self,
4127            cx: &LiveContext,
4128            sql: &str,
4129            params: &[OracleBind],
4130        ) -> Result<Vec<OracleRow>, CatalogError> {
4131            let _ = cx;
4132            if !self.expected_queries.is_empty() {
4133                if let Some(expectation) = self.expected_queries.iter().find(|expectation| {
4134                    sql.contains(expectation.sql_contains.as_str())
4135                        && params.eq(expectation.params.as_slice())
4136                }) {
4137                    return Ok(expectation.rows.clone());
4138                }
4139
4140                return Err(CatalogError::OracleBackendError {
4141                    backend: OracleBackend::RustOracle,
4142                    message: format!("unexpected query `{sql}` with params {params:?}"),
4143                });
4144            }
4145
4146            Ok(self.rows.clone())
4147        }
4148
4149        async fn execute(
4150            &self,
4151            cx: &LiveContext,
4152            _sql: &str,
4153            _params: &[OracleBind],
4154        ) -> Result<u64, CatalogError> {
4155            let _ = cx;
4156            Ok(self.row_count)
4157        }
4158    }
4159
    /// Drives `future` to completion on a freshly built current-thread
    /// runtime and returns its output.
    ///
    /// # Panics
    ///
    /// Panics with "test live runtime" when the runtime cannot be built.
    fn run_catalog_future<F: Future>(future: F) -> F::Output {
        RuntimeBuilder::current_thread()
            .build()
            .expect("test live runtime")
            .block_on(future)
    }
4166
    /// Returns the current `LiveContext`.
    ///
    /// # Panics
    ///
    /// Panics when `LiveContext::current()` yields nothing. The helpers in
    /// this module call it from inside `run_catalog_future`, where the expect
    /// message says the runtime installs a request context.
    fn test_cx() -> LiveContext {
        LiveContext::current().expect("test runtime installs a request context")
    }
4170
4171    fn load_snapshot_for_test<C: OracleConnection>(
4172        connection: &C,
4173        request: &CatalogLoadRequest,
4174    ) -> Result<CatalogSnapshot, CatalogError> {
4175        run_catalog_future(async {
4176            let cx = test_cx();
4177            load_snapshot_from_connection(&cx, connection, request).await
4178        })
4179    }
4180
4181    fn load_catalog_users_for_test<C: OracleConnection>(
4182        connection: &C,
4183        snapshot: &mut CatalogSnapshot,
4184    ) -> Result<(), CatalogError> {
4185        run_catalog_future(async {
4186            let cx = test_cx();
4187            load_catalog_users(&cx, connection, snapshot).await
4188        })
4189    }
4190
4191    fn populate_dbms_metadata_ddl_for_test<C: OracleConnection>(
4192        connection: &C,
4193        snapshot: &mut CatalogSnapshot,
4194    ) -> Result<(), CatalogError> {
4195        run_catalog_future(async {
4196            let cx = test_cx();
4197            populate_dbms_metadata_ddl(&cx, connection, snapshot).await
4198        })
4199    }
4200
4201    fn negotiate_capabilities_for_test<C: OracleConnection>(connection: &C) -> CatalogCapabilities {
4202        run_catalog_future(async {
4203            let cx = test_cx();
4204            negotiate_capabilities(&cx, connection).await
4205        })
4206    }
4207
4208    fn oracle_row(columns: &[(&str, &str, Option<&str>)]) -> OracleRow {
4209        let mut row = OracleRow::default();
4210        for (name, oracle_type, value) in columns {
4211            row.insert(*name, *oracle_type, value.map(String::from));
4212        }
4213        row
4214    }
4215
4216    /// Probe queries issued by `negotiate_capabilities`. Every mock test
4217    /// that wants the live-extraction loader to behave normally should
4218    /// prepend these to its `expected_queries` so each probe succeeds.
4219    fn capability_probe_expectations() -> Vec<QueryExpectation> {
4220        [
4221            "from all_objects where rownum = 0",
4222            "from dba_objects where rownum = 0",
4223            "from all_source where rownum = 0",
4224            "from all_scheduler_jobs where rownum = 0",
4225            "from all_tab_privs where rownum = 0",
4226            "from all_plsql_object_settings where rownum = 0",
4227        ]
4228        .into_iter()
4229        .map(|fragment| QueryExpectation {
4230            sql_contains: String::from(fragment),
4231            params: vec![],
4232            rows: vec![],
4233        })
4234        .collect()
4235    }
4236
4237    /// `ALL_USERS` extraction expectation. The live loader queries this
4238    /// (database-wide, no schema bind) to learn which grantees are users so
4239    /// `grantee_from_dictionary_value` can tell users from roles. Tests pass
4240    /// the set of usernames they want to be classified as `Grantee::User`;
4241    /// any grantee absent from this set is classified as `Grantee::Role`.
4242    fn all_users_expectation(usernames: &[&str]) -> QueryExpectation {
4243        QueryExpectation {
4244            sql_contains: String::from("from all_users"),
4245            params: vec![],
4246            rows: usernames
4247                .iter()
4248                .map(|name| oracle_row(&[("USERNAME", "VARCHAR2(128)", Some(name))]))
4249                .collect(),
4250        }
4251    }
4252
4253    #[test]
4254    fn oracle_row_helpers_are_case_insensitive_and_typed() {
4255        let mut row = OracleRow::default();
4256        row.insert(
4257            "current_schema",
4258            "VARCHAR2(128)",
4259            Some(String::from("billing")),
4260        );
4261        row.insert("object_id", "NUMBER(10)", Some(String::from("42")));
4262        row.insert("editionable", "VARCHAR2(1)", Some(String::from("Y")));
4263        row.insert("ddl_text", "CLOB", None);
4264
4265        assert_eq!(row.text("CURRENT_SCHEMA"), Some("billing"));
4266        assert_eq!(row.parse_u64("object_id").ok(), Some(42));
4267        assert_eq!(row.parse_bool("EDITIONABLE").ok(), Some(true));
4268        assert!(matches!(
4269            row.require_text("ddl_text"),
4270            Err(CatalogError::NullColumnValue { column }) if column.eq("DDL_TEXT")
4271        ));
4272    }
4273
4274    #[test]
4275    fn parse_bool_honors_oracle_conventions_and_errors() {
4276        // The whole catalog's boolean flags (status/editionable/
4277        // deterministic/…) flow through parse_bool. Lock the Oracle
4278        // convention: Y/YES/TRUE/1 -> true, N/NO/FALSE/0 -> false,
4279        // case-insensitive, whitespace-trimmed; anything else is an
4280        // explicit InvalidColumnValue (R13 — never a silent default).
4281        for t in ["Y", "y", "YES", " yes ", "TRUE", "true", "1"] {
4282            let row = oracle_row(&[("FLAG", "VARCHAR2(3)", Some(t))]);
4283            assert_eq!(row.parse_bool("flag").ok(), Some(true), "{t:?} -> true");
4284        }
4285        for f in ["N", "n", "NO", " no ", "FALSE", "false", "0"] {
4286            let row = oracle_row(&[("FLAG", "VARCHAR2(3)", Some(f))]);
4287            assert_eq!(row.parse_bool("flag").ok(), Some(false), "{f:?} -> false");
4288        }
4289        // Unrecognized value -> explicit error, not a silent bool.
4290        let bad = oracle_row(&[("FLAG", "VARCHAR2(5)", Some("MAYBE"))]);
4291        assert!(matches!(
4292            bad.parse_bool("FLAG"),
4293            Err(CatalogError::InvalidColumnValue {
4294                column,
4295                expected: "bool",
4296                ..
4297            }) if column.eq("FLAG")
4298        ));
4299        // Missing column and NULL value are distinct typed errors.
4300        let empty = oracle_row(&[]);
4301        assert!(matches!(
4302            empty.parse_bool("FLAG"),
4303            Err(CatalogError::MissingColumn { column }) if column.eq("FLAG")
4304        ));
4305        let nullv = oracle_row(&[("FLAG", "VARCHAR2(1)", None)]);
4306        assert!(matches!(
4307            nullv.parse_bool("FLAG"),
4308            Err(CatalogError::NullColumnValue { column }) if column.eq("FLAG")
4309        ));
4310
4311        // parse_u64 must reject a negative value (Oracle NUMBER can
4312        // be signed) rather than wrap/panic.
4313        let neg = oracle_row(&[("N", "NUMBER", Some("-1"))]);
4314        assert!(matches!(
4315            neg.parse_u64("N"),
4316            Err(CatalogError::InvalidColumnValue {
4317                expected: "u64",
4318                ..
4319            })
4320        ));
4321        let i = oracle_row(&[("N", "NUMBER", Some("-42"))]);
4322        assert_eq!(i.parse_i64("N").ok(), Some(-42));
4323    }
4324
4325    #[test]
4326    fn catalog_load_request_defaults_to_current_schema() {
4327        let request = CatalogLoadRequest::default();
4328        assert_eq!(
4329            request.schema_filters,
4330            vec![CatalogSchemaFilter::CurrentSchema]
4331        );
4332    }
4333
4334    #[test]
4335    fn load_snapshot_from_connection_extracts_structural_metadata() {
4336        let object_rows = vec![
4337            oracle_row(&[
4338                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4339                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_API")),
4340                ("OBJECT_TYPE", "VARCHAR2(30)", Some("PACKAGE")),
4341                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4342            ]),
4343            oracle_row(&[
4344                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4345                ("OBJECT_NAME", "VARCHAR2(128)", Some("CALCULATE_TAX")),
4346                ("OBJECT_TYPE", "VARCHAR2(30)", Some("FUNCTION")),
4347                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4348            ]),
4349            oracle_row(&[
4350                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4351                ("OBJECT_NAME", "VARCHAR2(128)", Some("CUSTOMERS")),
4352                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TABLE")),
4353                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4354            ]),
4355            oracle_row(&[
4356                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4357                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES")),
4358                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TABLE")),
4359                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4360            ]),
4361            oracle_row(&[
4362                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4363                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICE_SUMMARY")),
4364                ("OBJECT_TYPE", "VARCHAR2(30)", Some("VIEW")),
4365                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4366            ]),
4367            oracle_row(&[
4368                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4369                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES_BIU")),
4370                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TRIGGER")),
4371                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4372            ]),
4373        ];
4374        let column_rows = vec![
4375            oracle_row(&[
4376                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4377                ("TABLE_NAME", "VARCHAR2(128)", Some("CUSTOMERS")),
4378                ("COLUMN_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
4379                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4380                ("DATA_TYPE_OWNER", "VARCHAR2(128)", None),
4381                ("DATA_TYPE", "VARCHAR2(128)", Some("NUMBER")),
4382                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4383                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
4384                ("DATA_SCALE", "NUMBER(10)", Some("0")),
4385                ("CHAR_USED", "VARCHAR2(1)", None),
4386                ("NULLABLE", "VARCHAR2(1)", Some("N")),
4387                ("DATA_DEFAULT_VC", "VARCHAR2(4000)", None),
4388                ("VIRTUAL_COLUMN", "VARCHAR2(3)", Some("NO")),
4389                ("HIDDEN_COLUMN", "VARCHAR2(3)", Some("NO")),
4390            ]),
4391            oracle_row(&[
4392                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4393                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4394                ("COLUMN_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4395                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4396                ("DATA_TYPE_OWNER", "VARCHAR2(128)", None),
4397                ("DATA_TYPE", "VARCHAR2(128)", Some("NUMBER")),
4398                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4399                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
4400                ("DATA_SCALE", "NUMBER(10)", Some("0")),
4401                ("CHAR_USED", "VARCHAR2(1)", None),
4402                ("NULLABLE", "VARCHAR2(1)", Some("N")),
4403                ("DATA_DEFAULT_VC", "VARCHAR2(4000)", None),
4404                ("VIRTUAL_COLUMN", "VARCHAR2(3)", Some("NO")),
4405                ("HIDDEN_COLUMN", "VARCHAR2(3)", Some("NO")),
4406            ]),
4407            oracle_row(&[
4408                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4409                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4410                ("COLUMN_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
4411                ("COLUMN_POSITION", "NUMBER(10)", Some("2")),
4412                ("DATA_TYPE_OWNER", "VARCHAR2(128)", None),
4413                ("DATA_TYPE", "VARCHAR2(128)", Some("NUMBER")),
4414                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4415                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
4416                ("DATA_SCALE", "NUMBER(10)", Some("0")),
4417                ("CHAR_USED", "VARCHAR2(1)", None),
4418                ("NULLABLE", "VARCHAR2(1)", Some("N")),
4419                ("DATA_DEFAULT_VC", "VARCHAR2(4000)", None),
4420                ("VIRTUAL_COLUMN", "VARCHAR2(3)", Some("NO")),
4421                ("HIDDEN_COLUMN", "VARCHAR2(3)", Some("NO")),
4422            ]),
4423            oracle_row(&[
4424                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4425                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICE_SUMMARY")),
4426                ("COLUMN_NAME", "VARCHAR2(128)", Some("TOTAL_DUE")),
4427                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4428                ("DATA_TYPE_OWNER", "VARCHAR2(128)", None),
4429                ("DATA_TYPE", "VARCHAR2(128)", Some("NUMBER")),
4430                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4431                ("DATA_PRECISION", "NUMBER(10)", Some("12")),
4432                ("DATA_SCALE", "NUMBER(10)", Some("2")),
4433                ("CHAR_USED", "VARCHAR2(1)", None),
4434                ("NULLABLE", "VARCHAR2(1)", Some("Y")),
4435                ("DATA_DEFAULT_VC", "VARCHAR2(4000)", Some("0")),
4436                ("VIRTUAL_COLUMN", "VARCHAR2(3)", Some("YES")),
4437                ("HIDDEN_COLUMN", "VARCHAR2(3)", Some("NO")),
4438            ]),
4439        ];
4440        let constraint_rows = vec![
4441            oracle_row(&[
4442                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4443                (
4444                    "CONSTRAINT_NAME",
4445                    "VARCHAR2(128)",
4446                    Some("INVOICES_CUSTOMER_FK"),
4447                ),
4448                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4449                ("CONSTRAINT_TYPE", "VARCHAR2(1)", Some("R")),
4450                ("REFERENCED_TABLE_OWNER", "VARCHAR2(128)", Some("BILLING")),
4451                ("REFERENCED_TABLE_NAME", "VARCHAR2(128)", Some("CUSTOMERS")),
4452                ("SEARCH_CONDITION_VC", "VARCHAR2(4000)", None),
4453                ("IS_DEFERRABLE", "VARCHAR2(1)", Some("N")),
4454                ("IS_DEFERRED", "VARCHAR2(1)", Some("N")),
4455                ("COLUMN_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
4456                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4457                (
4458                    "REFERENCED_COLUMN_NAME",
4459                    "VARCHAR2(128)",
4460                    Some("CUSTOMER_ID"),
4461                ),
4462            ]),
4463            oracle_row(&[
4464                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4465                ("CONSTRAINT_NAME", "VARCHAR2(128)", Some("INVOICES_PK")),
4466                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4467                ("CONSTRAINT_TYPE", "VARCHAR2(1)", Some("P")),
4468                ("REFERENCED_TABLE_OWNER", "VARCHAR2(128)", None),
4469                ("REFERENCED_TABLE_NAME", "VARCHAR2(128)", None),
4470                ("SEARCH_CONDITION_VC", "VARCHAR2(4000)", None),
4471                ("IS_DEFERRABLE", "VARCHAR2(1)", Some("N")),
4472                ("IS_DEFERRED", "VARCHAR2(1)", Some("N")),
4473                ("COLUMN_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4474                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4475                ("REFERENCED_COLUMN_NAME", "VARCHAR2(128)", None),
4476            ]),
4477            oracle_row(&[
4478                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4479                (
4480                    "CONSTRAINT_NAME",
4481                    "VARCHAR2(128)",
4482                    Some("INVOICES_CUSTOMER_NN"),
4483                ),
4484                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4485                ("CONSTRAINT_TYPE", "VARCHAR2(1)", Some("C")),
4486                ("REFERENCED_TABLE_OWNER", "VARCHAR2(128)", None),
4487                ("REFERENCED_TABLE_NAME", "VARCHAR2(128)", None),
4488                (
4489                    "SEARCH_CONDITION_VC",
4490                    "VARCHAR2(4000)",
4491                    Some("\"CUSTOMER_ID\" IS NOT NULL"),
4492                ),
4493                ("IS_DEFERRABLE", "VARCHAR2(1)", Some("N")),
4494                ("IS_DEFERRED", "VARCHAR2(1)", Some("N")),
4495                ("COLUMN_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
4496                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4497                ("REFERENCED_COLUMN_NAME", "VARCHAR2(128)", None),
4498            ]),
4499        ];
4500        let index_rows = vec![oracle_row(&[
4501            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4502            ("INDEX_NAME", "VARCHAR2(128)", Some("INVOICES_PK_IDX")),
4503            ("TABLE_OWNER", "VARCHAR2(128)", Some("BILLING")),
4504            ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4505            ("IS_UNIQUE", "VARCHAR2(1)", Some("Y")),
4506            ("INDEX_TYPE", "VARCHAR2(27)", Some("NORMAL")),
4507            ("STATUS", "VARCHAR2(8)", Some("VALID")),
4508            ("COLUMN_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4509            ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4510        ])];
4511        let trigger_rows = vec![oracle_row(&[
4512            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4513            ("TRIGGER_NAME", "VARCHAR2(128)", Some("INVOICES_BIU")),
4514            ("TABLE_OWNER", "VARCHAR2(128)", Some("BILLING")),
4515            ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4516            ("TRIGGER_TYPE", "VARCHAR2(80)", Some("BEFORE EACH ROW")),
4517            (
4518                "TRIGGERING_EVENT",
4519                "VARCHAR2(246)",
4520                Some("INSERT OR UPDATE"),
4521            ),
4522            ("WHEN_CLAUSE", "VARCHAR2(4000)", Some(":new.total_due >= 0")),
4523        ])];
4524        let synonym_rows = vec![
4525            oracle_row(&[
4526                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4527                ("SYNONYM_NAME", "VARCHAR2(128)", Some("LATEST_INVOICE")),
4528                ("TABLE_OWNER", "VARCHAR2(128)", Some("BILLING")),
4529                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4530                ("DB_LINK", "VARCHAR2(128)", None),
4531            ]),
4532            oracle_row(&[
4533                ("OWNER", "VARCHAR2(128)", Some("PUBLIC")),
4534                ("SYNONYM_NAME", "VARCHAR2(128)", Some("INVOICE_PUBLIC")),
4535                ("TABLE_OWNER", "VARCHAR2(128)", Some("BILLING")),
4536                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4537                ("DB_LINK", "VARCHAR2(128)", None),
4538            ]),
4539        ];
4540        let procedure_rows = vec![
4541            oracle_row(&[
4542                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4543                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_API")),
4544                ("PROCEDURE_NAME", "VARCHAR2(128)", Some("CREATE_INVOICE")),
4545                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
4546                ("OVERLOAD", "VARCHAR2(40)", None),
4547                ("OBJECT_TYPE", "VARCHAR2(13)", Some("PROCEDURE")),
4548                ("DETERMINISTIC", "VARCHAR2(3)", Some("NO")),
4549                ("PIPELINED", "VARCHAR2(3)", Some("NO")),
4550            ]),
4551            oracle_row(&[
4552                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4553                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_API")),
4554                (
4555                    "PROCEDURE_NAME",
4556                    "VARCHAR2(128)",
4557                    Some("TOTAL_FOR_CUSTOMER"),
4558                ),
4559                ("SUBPROGRAM_ID", "NUMBER(10)", Some("2")),
4560                ("OVERLOAD", "VARCHAR2(40)", Some("1")),
4561                ("OBJECT_TYPE", "VARCHAR2(13)", Some("FUNCTION")),
4562                ("DETERMINISTIC", "VARCHAR2(3)", Some("NO")),
4563                ("PIPELINED", "VARCHAR2(3)", Some("NO")),
4564            ]),
4565            oracle_row(&[
4566                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4567                ("OBJECT_NAME", "VARCHAR2(128)", Some("CALCULATE_TAX")),
4568                ("PROCEDURE_NAME", "VARCHAR2(128)", None),
4569                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
4570                ("OVERLOAD", "VARCHAR2(40)", None),
4571                ("OBJECT_TYPE", "VARCHAR2(13)", Some("FUNCTION")),
4572                ("DETERMINISTIC", "VARCHAR2(3)", Some("YES")),
4573                ("PIPELINED", "VARCHAR2(3)", Some("NO")),
4574            ]),
4575        ];
4576        let argument_rows = vec![
4577            oracle_row(&[
4578                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4579                ("PACKAGE_NAME", "VARCHAR2(128)", Some("BILLING_API")),
4580                ("OBJECT_NAME", "VARCHAR2(128)", Some("CREATE_INVOICE")),
4581                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
4582                ("OVERLOAD", "VARCHAR2(40)", None),
4583                ("ARGUMENT_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
4584                ("POSITION", "NUMBER(10)", Some("1")),
4585                ("SEQUENCE", "NUMBER(10)", Some("1")),
4586                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
4587                ("TYPE_OWNER", "VARCHAR2(128)", None),
4588                ("TYPE_NAME", "VARCHAR2(128)", None),
4589                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4590                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
4591                ("DATA_SCALE", "NUMBER(10)", Some("0")),
4592                ("IN_OUT", "VARCHAR2(9)", Some("IN")),
4593                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
4594            ]),
4595            oracle_row(&[
4596                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4597                ("PACKAGE_NAME", "VARCHAR2(128)", Some("BILLING_API")),
4598                ("OBJECT_NAME", "VARCHAR2(128)", Some("CREATE_INVOICE")),
4599                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
4600                ("OVERLOAD", "VARCHAR2(40)", None),
4601                ("ARGUMENT_NAME", "VARCHAR2(128)", Some("TOTAL_DUE")),
4602                ("POSITION", "NUMBER(10)", Some("2")),
4603                ("SEQUENCE", "NUMBER(10)", Some("2")),
4604                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
4605                ("TYPE_OWNER", "VARCHAR2(128)", None),
4606                ("TYPE_NAME", "VARCHAR2(128)", None),
4607                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4608                ("DATA_PRECISION", "NUMBER(10)", Some("12")),
4609                ("DATA_SCALE", "NUMBER(10)", Some("2")),
4610                ("IN_OUT", "VARCHAR2(9)", Some("IN")),
4611                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
4612            ]),
4613            oracle_row(&[
4614                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4615                ("PACKAGE_NAME", "VARCHAR2(128)", Some("BILLING_API")),
4616                ("OBJECT_NAME", "VARCHAR2(128)", Some("TOTAL_FOR_CUSTOMER")),
4617                ("SUBPROGRAM_ID", "NUMBER(10)", Some("2")),
4618                ("OVERLOAD", "VARCHAR2(40)", Some("1")),
4619                ("ARGUMENT_NAME", "VARCHAR2(128)", None),
4620                ("POSITION", "NUMBER(10)", Some("0")),
4621                ("SEQUENCE", "NUMBER(10)", Some("1")),
4622                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
4623                ("TYPE_OWNER", "VARCHAR2(128)", None),
4624                ("TYPE_NAME", "VARCHAR2(128)", None),
4625                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4626                ("DATA_PRECISION", "NUMBER(10)", Some("12")),
4627                ("DATA_SCALE", "NUMBER(10)", Some("2")),
4628                ("IN_OUT", "VARCHAR2(9)", Some("OUT")),
4629                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
4630            ]),
4631            oracle_row(&[
4632                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4633                ("PACKAGE_NAME", "VARCHAR2(128)", Some("BILLING_API")),
4634                ("OBJECT_NAME", "VARCHAR2(128)", Some("TOTAL_FOR_CUSTOMER")),
4635                ("SUBPROGRAM_ID", "NUMBER(10)", Some("2")),
4636                ("OVERLOAD", "VARCHAR2(40)", Some("1")),
4637                ("ARGUMENT_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
4638                ("POSITION", "NUMBER(10)", Some("1")),
4639                ("SEQUENCE", "NUMBER(10)", Some("2")),
4640                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
4641                ("TYPE_OWNER", "VARCHAR2(128)", None),
4642                ("TYPE_NAME", "VARCHAR2(128)", None),
4643                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4644                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
4645                ("DATA_SCALE", "NUMBER(10)", Some("0")),
4646                ("IN_OUT", "VARCHAR2(9)", Some("IN")),
4647                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
4648            ]),
4649            oracle_row(&[
4650                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4651                ("PACKAGE_NAME", "VARCHAR2(128)", None),
4652                ("OBJECT_NAME", "VARCHAR2(128)", Some("CALCULATE_TAX")),
4653                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
4654                ("OVERLOAD", "VARCHAR2(40)", None),
4655                ("ARGUMENT_NAME", "VARCHAR2(128)", None),
4656                ("POSITION", "NUMBER(10)", Some("0")),
4657                ("SEQUENCE", "NUMBER(10)", Some("1")),
4658                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
4659                ("TYPE_OWNER", "VARCHAR2(128)", None),
4660                ("TYPE_NAME", "VARCHAR2(128)", None),
4661                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4662                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
4663                ("DATA_SCALE", "NUMBER(10)", Some("2")),
4664                ("IN_OUT", "VARCHAR2(9)", Some("OUT")),
4665                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
4666            ]),
4667            oracle_row(&[
4668                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4669                ("PACKAGE_NAME", "VARCHAR2(128)", None),
4670                ("OBJECT_NAME", "VARCHAR2(128)", Some("CALCULATE_TAX")),
4671                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
4672                ("OVERLOAD", "VARCHAR2(40)", None),
4673                ("ARGUMENT_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4674                ("POSITION", "NUMBER(10)", Some("1")),
4675                ("SEQUENCE", "NUMBER(10)", Some("2")),
4676                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
4677                ("TYPE_OWNER", "VARCHAR2(128)", None),
4678                ("TYPE_NAME", "VARCHAR2(128)", None),
4679                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4680                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
4681                ("DATA_SCALE", "NUMBER(10)", Some("0")),
4682                ("IN_OUT", "VARCHAR2(9)", Some("IN")),
4683                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
4684            ]),
4685        ];
4686        let mut expected_queries = capability_probe_expectations();
4687        // REPORTING is a database user, so its object grant stays a direct
4688        // (high-confidence) Grantee::User after the role-classification fix.
4689        expected_queries.push(all_users_expectation(&["BILLING", "REPORTING"]));
4690        expected_queries.extend(vec![
4691                QueryExpectation {
4692                    sql_contains: String::from("from all_objects"),
4693                    params: vec![OracleBind::from("BILLING")],
4694                    rows: object_rows,
4695                },
4696                QueryExpectation {
4697                    sql_contains: String::from("from all_tab_cols"),
4698                    params: vec![OracleBind::from("BILLING")],
4699                    rows: column_rows,
4700                },
4701                QueryExpectation {
4702                    sql_contains: String::from("from all_constraints c"),
4703                    params: vec![OracleBind::from("BILLING")],
4704                    rows: constraint_rows,
4705                },
4706                QueryExpectation {
4707                    sql_contains: String::from("from all_indexes i"),
4708                    params: vec![OracleBind::from("BILLING")],
4709                    rows: index_rows,
4710                },
4711                QueryExpectation {
4712                    sql_contains: String::from("from all_triggers"),
4713                    params: vec![OracleBind::from("BILLING")],
4714                    rows: trigger_rows,
4715                },
4716                QueryExpectation {
4717                    sql_contains: String::from("from all_synonyms"),
4718                    params: vec![OracleBind::from("BILLING")],
4719                    rows: synonym_rows,
4720                },
4721                QueryExpectation {
4722                    sql_contains: String::from("from all_procedures"),
4723                    params: vec![OracleBind::from("BILLING")],
4724                    rows: procedure_rows,
4725                },
4726                QueryExpectation {
4727                    sql_contains: String::from("from all_arguments"),
4728                    params: vec![OracleBind::from("BILLING")],
4729                    rows: argument_rows,
4730                },
4731                QueryExpectation {
4732                    sql_contains: String::from("from all_views"),
4733                    params: vec![OracleBind::from("BILLING")],
4734                    rows: vec![oracle_row(&[
4735                        ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4736                        ("VIEW_NAME", "VARCHAR2(128)", Some("INVOICE_SUMMARY")),
4737                        (
4738                            "TEXT_VC",
4739                            "VARCHAR2(4000)",
4740                            Some(
4741                                "select invoice_id, sum(amount) total_due from invoices group by invoice_id",
4742                            ),
4743                        ),
4744                        ("READ_ONLY", "VARCHAR2(1)", Some("N")),
4745                    ])],
4746                },
4747                QueryExpectation {
4748                    sql_contains: String::from("from all_mviews"),
4749                    params: vec![OracleBind::from("BILLING")],
4750                    rows: vec![],
4751                },
4752                QueryExpectation {
4753                    sql_contains: String::from("from all_sequences"),
4754                    params: vec![OracleBind::from("BILLING")],
4755                    rows: vec![],
4756                },
4757                QueryExpectation {
4758                    sql_contains: String::from("from all_type_attrs"),
4759                    params: vec![OracleBind::from("BILLING")],
4760                    rows: vec![],
4761                },
4762                QueryExpectation {
4763                    sql_contains: String::from("from all_tab_privs"),
4764                    params: vec![OracleBind::from("BILLING")],
4765                    rows: vec![oracle_row(&[
4766                        ("TABLE_SCHEMA", "VARCHAR2(128)", Some("BILLING")),
4767                        ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4768                        ("GRANTEE", "VARCHAR2(128)", Some("REPORTING")),
4769                        ("PRIVILEGE", "VARCHAR2(40)", Some("SELECT")),
4770                        ("GRANTABLE", "VARCHAR2(3)", Some("NO")),
4771                        ("HIERARCHY", "VARCHAR2(3)", Some("NO")),
4772                    ])],
4773                },
4774                QueryExpectation {
4775                    sql_contains: String::from("from all_db_links"),
4776                    params: vec![OracleBind::from("BILLING")],
4777                    rows: vec![],
4778                },
4779                QueryExpectation {
4780                    sql_contains: String::from("from all_tab_comments"),
4781                    params: vec![OracleBind::from("BILLING")],
4782                    rows: vec![],
4783                },
4784                QueryExpectation {
4785                    sql_contains: String::from("from all_col_comments"),
4786                    params: vec![OracleBind::from("BILLING")],
4787                    rows: vec![],
4788                },
4789                QueryExpectation {
4790                    sql_contains: String::from("from all_editions"),
4791                    params: vec![],
4792                    rows: vec![],
4793                },
4794                QueryExpectation {
4795                    sql_contains: String::from("from all_editioning_views"),
4796                    params: vec![OracleBind::from("BILLING")],
4797                    rows: vec![],
4798                },
4799                QueryExpectation {
4800                    sql_contains: String::from("from all_policies"),
4801                    params: vec![OracleBind::from("BILLING")],
4802                    rows: vec![],
4803                },
4804                QueryExpectation {
4805                    sql_contains: String::from("from all_dependencies"),
4806                    params: vec![OracleBind::from("BILLING")],
4807                    rows: vec![],
4808                },
4809        ]);
4810        let connection = StaticConnection {
4811            expected_queries,
4812            ..StaticConnection::default()
4813        };
4814
4815        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
4816
4817        let current_schema = snapshot
4818            .profile
4819            .current_schema
4820            .and_then(|name| snapshot.interner.resolve(name.symbol()));
4821        assert_eq!(current_schema, Some("BILLING"));
4822        assert_eq!(
4823            snapshot.profile.oracle_version,
4824            plsql_core::OracleVersion::Oracle23ai
4825        );
4826        assert!(snapshot.capabilities.can_query_all_views);
4827
4828        let billing_schema = snapshot
4829            .profile
4830            .current_schema
4831            .expect("current schema should be interned");
4832        let schema_catalog = snapshot
4833            .schemas
4834            .get(&billing_schema)
4835            .expect("billing schema should exist");
4836
4837        let invoices_name = schema_catalog
4838            .objects
4839            .keys()
4840            .find(|name| {
4841                snapshot
4842                    .interner
4843                    .resolve(name.symbol())
4844                    .is_some_and(|label| label.eq("INVOICES"))
4845            })
4846            .copied()
4847            .expect("object name should intern");
4848        let invoice_summary_name = schema_catalog
4849            .objects
4850            .keys()
4851            .find(|name| {
4852                snapshot
4853                    .interner
4854                    .resolve(name.symbol())
4855                    .is_some_and(|label| label.eq("INVOICE_SUMMARY"))
4856            })
4857            .copied()
4858            .expect("object name should intern");
4859        let package_object_name = schema_catalog
4860            .objects
4861            .keys()
4862            .find(|name| {
4863                snapshot
4864                    .interner
4865                    .resolve(name.symbol())
4866                    .is_some_and(|label| label.eq("BILLING_API"))
4867            })
4868            .copied()
4869            .expect("package object name should intern");
4870        let tax_function_name = schema_catalog
4871            .objects
4872            .keys()
4873            .find(|name| {
4874                snapshot
4875                    .interner
4876                    .resolve(name.symbol())
4877                    .is_some_and(|label| label.eq("CALCULATE_TAX"))
4878            })
4879            .copied()
4880            .expect("function object name should intern");
4881        let trigger_object_name = schema_catalog
4882            .objects
4883            .keys()
4884            .find(|name| {
4885                snapshot
4886                    .interner
4887                    .resolve(name.symbol())
4888                    .is_some_and(|label| label.eq("INVOICES_BIU"))
4889            })
4890            .copied()
4891            .expect("trigger object name should intern");
4892
4893        let invoices_table = schema_catalog
4894            .objects
4895            .get(&invoices_name)
4896            .and_then(|object| {
4897                if let CatalogObject::Table(table) = object {
4898                    Some(table)
4899                } else {
4900                    None
4901                }
4902            });
4903        assert!(invoices_table.is_some());
4904        let invoice_id = invoices_table
4905            .and_then(|table| {
4906                table.columns.values().find(|column| {
4907                    snapshot
4908                        .interner
4909                        .resolve(column.name.symbol())
4910                        .is_some_and(|label| label.eq("INVOICE_ID"))
4911                })
4912            })
4913            .expect("table column should exist");
4914        assert_eq!(invoice_id.position, 1);
4915        assert_eq!(invoice_id.data_type.name, "NUMBER");
4916        assert_eq!(invoice_id.data_type.precision, Some(10));
4917        assert!(!invoice_id.nullable);
4918        assert!(!invoice_id.hidden);
4919        assert!(invoice_id.generated_expression.is_none());
4920
4921        let invoice_summary_view =
4922            schema_catalog
4923                .objects
4924                .get(&invoice_summary_name)
4925                .and_then(|object| {
4926                    if let CatalogObject::View(view) = object {
4927                        Some(view)
4928                    } else {
4929                        None
4930                    }
4931                });
4932        assert!(invoice_summary_view.is_some());
4933        let total_due = invoice_summary_view
4934            .and_then(|view| {
4935                view.columns.values().find(|column| {
4936                    snapshot
4937                        .interner
4938                        .resolve(column.name.symbol())
4939                        .is_some_and(|label| label.eq("TOTAL_DUE"))
4940                })
4941            })
4942            .expect("view column should exist");
4943        assert_eq!(total_due.position, 1);
4944        assert!(total_due.nullable);
4945        assert_eq!(total_due.generated_expression.as_deref(), Some("0"));
4946        assert!(total_due.default_expression.is_none());
4947
4948        let invoices_pk_index = schema_catalog
4949            .indexes
4950            .values()
4951            .find(|index| {
4952                snapshot
4953                    .interner
4954                    .resolve(index.name.symbol())
4955                    .is_some_and(|label| label.eq("INVOICES_PK_IDX"))
4956            })
4957            .expect("index metadata should exist");
4958        assert!(invoices_pk_index.unique);
4959        assert_eq!(invoices_pk_index.index_type, "NORMAL");
4960        assert_eq!(invoices_pk_index.status, ObjectStatus::Valid);
4961        assert_eq!(
4962            invoices_pk_index
4963                .columns
4964                .first()
4965                .and_then(|column| snapshot.interner.resolve(column.symbol())),
4966            Some("INVOICE_ID")
4967        );
4968
4969        let customer_fk = schema_catalog
4970            .constraints
4971            .values()
4972            .find(|constraint| {
4973                snapshot
4974                    .interner
4975                    .resolve(constraint.name.symbol())
4976                    .is_some_and(|label| label.eq("INVOICES_CUSTOMER_FK"))
4977            })
4978            .expect("foreign key should exist");
4979        assert_eq!(customer_fk.constraint_type, ConstraintType::ForeignKey);
4980        assert_eq!(
4981            customer_fk
4982                .columns
4983                .first()
4984                .and_then(|column| snapshot.interner.resolve(column.symbol())),
4985            Some("CUSTOMER_ID")
4986        );
4987        assert_eq!(
4988            customer_fk
4989                .referenced_table_name
4990                .and_then(|name| snapshot.interner.resolve(name.symbol())),
4991            Some("CUSTOMERS")
4992        );
4993        assert_eq!(
4994            customer_fk
4995                .referenced_columns
4996                .first()
4997                .and_then(|column| snapshot.interner.resolve(column.symbol())),
4998            Some("CUSTOMER_ID")
4999        );
5000
5001        let customer_not_null = schema_catalog
5002            .constraints
5003            .values()
5004            .find(|constraint| {
5005                snapshot
5006                    .interner
5007                    .resolve(constraint.name.symbol())
5008                    .is_some_and(|label| label.eq("INVOICES_CUSTOMER_NN"))
5009            })
5010            .expect("not-null constraint should exist");
5011        assert_eq!(customer_not_null.constraint_type, ConstraintType::NotNull);
5012
5013        let trigger_metadata = schema_catalog
5014            .triggers
5015            .values()
5016            .find(|trigger| {
5017                snapshot
5018                    .interner
5019                    .resolve(trigger.common.name.symbol())
5020                    .is_some_and(|label| label.eq("INVOICES_BIU"))
5021            })
5022            .expect("trigger metadata should exist");
5023        assert_eq!(trigger_metadata.timing, TriggerTiming::Before);
5024        assert_eq!(trigger_metadata.level, TriggerLevel::Row);
5025        assert_eq!(
5026            trigger_metadata.events.as_slice(),
5027            &[TriggerEvent::Insert, TriggerEvent::Update]
5028        );
5029        assert_eq!(
5030            trigger_metadata.target_name.symbol().get(),
5031            invoices_name.symbol().get()
5032        );
5033        assert_eq!(
5034            trigger_metadata.when_clause.as_deref(),
5035            Some(":new.total_due >= 0")
5036        );
5037        assert!(matches!(
5038            schema_catalog.objects.get(&trigger_object_name),
5039            Some(CatalogObject::Trigger(_))
5040        ));
5041
5042        let latest_invoice_synonym = schema_catalog
5043            .synonyms
5044            .values()
5045            .find(|synonym| {
5046                snapshot
5047                    .interner
5048                    .resolve(synonym.target_name.symbol())
5049                    .is_some_and(|label| label.eq("INVOICES"))
5050            })
5051            .expect("private synonym should exist");
5052        assert!(!latest_invoice_synonym.public_synonym);
5053
5054        let public_schema_name = snapshot
5055            .schemas
5056            .keys()
5057            .find(|name| {
5058                snapshot
5059                    .interner
5060                    .resolve(name.symbol())
5061                    .is_some_and(|label| label.eq("PUBLIC"))
5062            })
5063            .copied()
5064            .expect("public schema should exist");
5065        let public_schema = snapshot
5066            .schemas
5067            .get(&public_schema_name)
5068            .expect("public schema catalog should exist");
5069        let public_synonym = public_schema
5070            .synonyms
5071            .values()
5072            .find(|synonym| {
5073                snapshot
5074                    .interner
5075                    .resolve(synonym.target_name.symbol())
5076                    .is_some_and(|label| label.eq("INVOICES"))
5077            })
5078            .expect("public synonym should exist");
5079        assert!(public_synonym.public_synonym);
5080
5081        let package_metadata = schema_catalog
5082            .objects
5083            .get(&package_object_name)
5084            .and_then(|object| {
5085                if let CatalogObject::Package(metadata) = object {
5086                    Some(metadata)
5087                } else {
5088                    None
5089                }
5090            })
5091            .expect("package metadata should exist");
5092        let create_invoice = package_metadata
5093            .procedures
5094            .iter()
5095            .find(|signature| {
5096                snapshot
5097                    .interner
5098                    .resolve(signature.routine_name.symbol())
5099                    .is_some_and(|label| label.eq("CREATE_INVOICE"))
5100            })
5101            .expect("package procedure should exist");
5102        assert_eq!(create_invoice.arguments.len(), 2);
5103        let total_for_customer = package_metadata
5104            .functions
5105            .iter()
5106            .find(|signature| {
5107                snapshot
5108                    .interner
5109                    .resolve(signature.routine_name.symbol())
5110                    .is_some_and(|label| label.eq("TOTAL_FOR_CUSTOMER"))
5111            })
5112            .expect("package function should exist");
5113        assert_eq!(total_for_customer.overload, Some(1));
5114        assert_eq!(
5115            total_for_customer
5116                .return_type
5117                .as_ref()
5118                .map(|data_type| data_type.name.as_str()),
5119            Some("NUMBER")
5120        );
5121        assert_eq!(total_for_customer.arguments.len(), 1);
5122
5123        let tax_function = schema_catalog
5124            .objects
5125            .get(&tax_function_name)
5126            .and_then(|object| {
5127                if let CatalogObject::Function(metadata) = object {
5128                    Some(metadata)
5129                } else {
5130                    None
5131                }
5132            })
5133            .expect("top-level function should exist");
5134        assert!(tax_function.deterministic);
5135        assert!(!tax_function.pipelined);
5136        assert_eq!(tax_function.signature.arguments.len(), 1);
5137        assert_eq!(
5138            tax_function
5139                .signature
5140                .return_type
5141                .as_ref()
5142                .map(|data_type| data_type.name.as_str()),
5143            Some("NUMBER")
5144        );
5145
5146        let invoice_summary_view_with_hash = schema_catalog
5147            .objects
5148            .get(&invoice_summary_name)
5149            .and_then(|object| {
5150                if let CatalogObject::View(view) = object {
5151                    Some(view)
5152                } else {
5153                    None
5154                }
5155            })
5156            .expect("invoice summary view should be present");
5157        assert!(invoice_summary_view_with_hash.query_hash.is_some());
5158        assert_eq!(invoice_summary_view_with_hash.read_only, Some(false));
5159
5160        assert_eq!(schema_catalog.grants.len(), 1);
5161        let reporting_grant = &schema_catalog.grants[0];
5162        assert!(matches!(
5163            reporting_grant.privilege,
5164            crate::GrantPrivilege::Select
5165        ));
5166        // REPORTING appears in the ALL_USERS fixture, so it classifies as a
5167        // direct user grant (and not, conservatively, as a role).
5168        assert!(matches!(reporting_grant.grantee, crate::Grantee::User(_)));
5169        assert!(!reporting_grant.grantable);
5170    }
5171
5172    /// oracle-qm3q.2 regression: `grantee_from_dictionary_value` must
5173    /// discriminate an object-privilege grantee against the loaded
5174    /// `ALL_USERS` set. A grantee that is NOT a known user is a database
5175    /// role (the only remaining grantee class besides PUBLIC), and must be
5176    /// recorded as `Grantee::Role` so the privilege resolver downgrades it
5177    /// to Low confidence with a `RuntimeGrantOrRole` ambiguity instead of
5178    /// the previous, fail-toward-permissive `Grantee::User` (High).
5179    #[test]
5180    fn grantee_classification_uses_loaded_user_set() {
5181        let mut snapshot = CatalogSnapshot::new(
5182            plsql_core::AnalysisProfile::default(),
5183            CatalogCapabilities::default(),
5184            CatalogSource::default(),
5185            DateTime::<Utc>::UNIX_EPOCH,
5186        );
5187
5188        // Before the user set is loaded, the grantee class is undetermined.
5189        // R13 / fail-toward-restrictive: an undetermined grantee must NOT be
5190        // a high-confidence direct user grant — it routes through the role
5191        // ambiguity path instead.
5192        assert!(snapshot.known_users.is_none());
5193        let undetermined =
5194            grantee_from_dictionary_value(&mut snapshot, "MYSTERY_GRANTEE").expect("grantee");
5195        assert!(
5196            matches!(undetermined, crate::Grantee::Role(_)),
5197            "undetermined grantee (ALL_USERS not loaded) must not be a direct user grant; got {undetermined:?}"
5198        );
5199
5200        // Load a user set: APP_USER is a user, APP_READER_ROLE is not.
5201        let app_user = snapshot.intern_user_name("APP_USER").expect("user");
5202        let mut users = HashSet::new();
5203        users.insert(app_user);
5204        snapshot.known_users = Some(users);
5205
5206        // PUBLIC is always PUBLIC.
5207        assert!(matches!(
5208            grantee_from_dictionary_value(&mut snapshot, "PUBLIC").expect("grantee"),
5209            crate::Grantee::Public
5210        ));
5211        // A known user classifies as a direct user grant.
5212        assert!(matches!(
5213            grantee_from_dictionary_value(&mut snapshot, "APP_USER").expect("grantee"),
5214            crate::Grantee::User(_)
5215        ));
5216        // A grantee absent from ALL_USERS classifies as a role — the defect
5217        // this bead fixes (previously always `Grantee::User`).
5218        let role =
5219            grantee_from_dictionary_value(&mut snapshot, "APP_READER_ROLE").expect("grantee");
5220        let role_name = match role {
5221            crate::Grantee::Role(role_name) => Some(role_name),
5222            _ => None,
5223        };
5224        assert!(
5225            role_name.is_some(),
5226            "APP_READER_ROLE must classify as a role"
5227        );
5228        assert_eq!(
5229            snapshot
5230                .interner
5231                .resolve(role_name.expect("role assertion above").symbol()),
5232            Some("APP_READER_ROLE")
5233        );
5234    }
5235
5236    /// oracle-qm3q.2: if `ALL_USERS` cannot be read, `load_catalog_users`
5237    /// must leave `known_users` as `None` (so grantees stay conservatively
5238    /// classified) and record a capability warning rather than aborting the
5239    /// extraction or silently assuming the grantee universe.
5240    #[test]
5241    fn load_catalog_users_failure_is_nonfatal_and_marks_unknown() {
5242        // A strict mock with NO matching expectation for `from all_users`
5243        // makes the query fail.
5244        let connection = StaticConnection {
5245            expected_queries: vec![QueryExpectation {
5246                sql_contains: String::from("from something_else"),
5247                params: vec![],
5248                rows: vec![],
5249            }],
5250            ..StaticConnection::default()
5251        };
5252        let mut snapshot = CatalogSnapshot::new(
5253            plsql_core::AnalysisProfile::default(),
5254            CatalogCapabilities::default(),
5255            CatalogSource::default(),
5256            DateTime::<Utc>::UNIX_EPOCH,
5257        );
5258
5259        load_catalog_users_for_test(&connection, &mut snapshot).expect("non-fatal");
5260        assert!(
5261            snapshot.known_users.is_none(),
5262            "failed ALL_USERS read must leave grantee universe undetermined"
5263        );
5264        assert!(
5265            snapshot
5266                .capabilities
5267                .warnings
5268                .iter()
5269                .any(|w| w.code.eq("all-users-probe")),
5270            "a capability warning must record the ALL_USERS read failure"
5271        );
5272    }
5273
5274    #[test]
5275    fn load_snapshot_from_connection_extracts_views_sequences_mviews_types_and_grants() {
5276        let object_rows = vec![
5277            oracle_row(&[
5278                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5279                ("OBJECT_NAME", "VARCHAR2(128)", Some("ACTIVE_CUSTOMERS")),
5280                ("OBJECT_TYPE", "VARCHAR2(30)", Some("VIEW")),
5281                ("STATUS", "VARCHAR2(7)", Some("VALID")),
5282            ]),
5283            oracle_row(&[
5284                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5285                ("OBJECT_NAME", "VARCHAR2(128)", Some("CUSTOMER_TOTALS_MV")),
5286                ("OBJECT_TYPE", "VARCHAR2(30)", Some("MATERIALIZED VIEW")),
5287                ("STATUS", "VARCHAR2(7)", Some("VALID")),
5288            ]),
5289            oracle_row(&[
5290                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5291                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICE_SEQ")),
5292                ("OBJECT_TYPE", "VARCHAR2(30)", Some("SEQUENCE")),
5293                ("STATUS", "VARCHAR2(7)", Some("VALID")),
5294            ]),
5295            oracle_row(&[
5296                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5297                ("OBJECT_NAME", "VARCHAR2(128)", Some("ADDRESS_T")),
5298                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TYPE")),
5299                ("STATUS", "VARCHAR2(7)", Some("VALID")),
5300            ]),
5301        ];
5302        let view_rows = vec![oracle_row(&[
5303            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5304            ("VIEW_NAME", "VARCHAR2(128)", Some("ACTIVE_CUSTOMERS")),
5305            (
5306                "TEXT_VC",
5307                "VARCHAR2(4000)",
5308                Some("select customer_id from customers where active = 'Y'"),
5309            ),
5310            ("READ_ONLY", "VARCHAR2(1)", Some("Y")),
5311        ])];
5312        let mview_rows = vec![oracle_row(&[
5313            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5314            ("MVIEW_NAME", "VARCHAR2(128)", Some("CUSTOMER_TOTALS_MV")),
5315            ("REFRESH_MODE", "VARCHAR2(6)", Some("DEMAND")),
5316            ("REFRESH_METHOD", "VARCHAR2(8)", Some("COMPLETE")),
5317            (
5318                "QUERY",
5319                "LONG",
5320                Some("select customer_id, sum(amount) from invoices group by customer_id"),
5321            ),
5322        ])];
5323        let sequence_rows = vec![oracle_row(&[
5324            ("SEQUENCE_OWNER", "VARCHAR2(128)", Some("BILLING")),
5325            ("SEQUENCE_NAME", "VARCHAR2(128)", Some("INVOICE_SEQ")),
5326            ("MIN_VALUE", "NUMBER(28)", Some("1")),
5327            ("MAX_VALUE", "NUMBER(28)", Some("9999999999")),
5328            ("INCREMENT_BY", "NUMBER(28)", Some("1")),
5329            ("CYCLE_FLAG", "VARCHAR2(1)", Some("N")),
5330            ("ORDER_FLAG", "VARCHAR2(1)", Some("N")),
5331            ("CACHE_SIZE", "NUMBER(28)", Some("20")),
5332        ])];
5333        let type_attr_rows = vec![
5334            oracle_row(&[
5335                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5336                ("TYPE_NAME", "VARCHAR2(128)", Some("ADDRESS_T")),
5337                ("ATTR_NAME", "VARCHAR2(128)", Some("STREET")),
5338                ("ATTR_NO", "NUMBER(10)", Some("1")),
5339                ("ATTR_TYPE_OWNER", "VARCHAR2(128)", None),
5340                ("ATTR_TYPE_NAME", "VARCHAR2(128)", Some("VARCHAR2")),
5341                ("LENGTH", "NUMBER(10)", Some("200")),
5342                ("PRECISION", "NUMBER(10)", None),
5343                ("SCALE", "NUMBER(10)", None),
5344            ]),
5345            oracle_row(&[
5346                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5347                ("TYPE_NAME", "VARCHAR2(128)", Some("ADDRESS_T")),
5348                ("ATTR_NAME", "VARCHAR2(128)", Some("ZIP")),
5349                ("ATTR_NO", "NUMBER(10)", Some("2")),
5350                ("ATTR_TYPE_OWNER", "VARCHAR2(128)", None),
5351                ("ATTR_TYPE_NAME", "VARCHAR2(128)", Some("VARCHAR2")),
5352                ("LENGTH", "NUMBER(10)", Some("10")),
5353                ("PRECISION", "NUMBER(10)", None),
5354                ("SCALE", "NUMBER(10)", None),
5355            ]),
5356        ];
5357        let grant_rows = vec![
5358            oracle_row(&[
5359                ("TABLE_SCHEMA", "VARCHAR2(128)", Some("BILLING")),
5360                ("TABLE_NAME", "VARCHAR2(128)", Some("ACTIVE_CUSTOMERS")),
5361                ("GRANTEE", "VARCHAR2(128)", Some("PUBLIC")),
5362                ("PRIVILEGE", "VARCHAR2(40)", Some("SELECT")),
5363                ("GRANTABLE", "VARCHAR2(3)", Some("NO")),
5364                ("HIERARCHY", "VARCHAR2(3)", Some("NO")),
5365            ]),
5366            oracle_row(&[
5367                ("TABLE_SCHEMA", "VARCHAR2(128)", Some("BILLING")),
5368                ("TABLE_NAME", "VARCHAR2(128)", Some("ACTIVE_CUSTOMERS")),
5369                ("GRANTEE", "VARCHAR2(128)", Some("REPORTING_ROLE")),
5370                ("PRIVILEGE", "VARCHAR2(40)", Some("UPDATE")),
5371                ("GRANTABLE", "VARCHAR2(3)", Some("YES")),
5372                ("HIERARCHY", "VARCHAR2(3)", Some("NO")),
5373            ]),
5374        ];
5375
5376        // PLSQL-CAT-NEW-3 / oracle-fmro: one root edition + one child
5377        // edition + one editioning view to exercise both EBR paths.
5378        let edition_rows = vec![
5379            oracle_row(&[
5380                ("EDITION_NAME", "VARCHAR2(128)", Some("ORA$BASE")),
5381                ("PARENT_EDITION_NAME", "VARCHAR2(128)", None),
5382                ("USABLE", "VARCHAR2(1)", Some("Y")),
5383            ]),
5384            oracle_row(&[
5385                ("EDITION_NAME", "VARCHAR2(128)", Some("PATCH_2026_05")),
5386                ("PARENT_EDITION_NAME", "VARCHAR2(128)", Some("ORA$BASE")),
5387                ("USABLE", "VARCHAR2(1)", Some("Y")),
5388            ]),
5389        ];
5390        let editioning_view_rows = vec![oracle_row(&[
5391            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5392            ("VIEW_NAME", "VARCHAR2(128)", Some("INVOICES_E1")),
5393            ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
5394        ])];
5395
5396        // PLSQL-CAT-NEW-2 / oracle-c0gg: one VPD policy that gates
5397        // SELECT-only on INVOICES — exercises the yn() column reader
5398        // and the SchemaCatalog::vpd_policies path.
5399        let vpd_policy_rows = vec![oracle_row(&[
5400            ("OBJECT_OWNER", "VARCHAR2(128)", Some("BILLING")),
5401            ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES")),
5402            ("POLICY_GROUP", "VARCHAR2(128)", None),
5403            (
5404                "POLICY_NAME",
5405                "VARCHAR2(128)",
5406                Some("BILLING_TENANT_ISOLATION"),
5407            ),
5408            ("PF_OWNER", "VARCHAR2(128)", Some("SECURITY")),
5409            ("PACKAGE", "VARCHAR2(128)", Some("RLS_PKG")),
5410            ("FUNCTION", "VARCHAR2(128)", Some("TENANT_PREDICATE")),
5411            ("SEL", "VARCHAR2(3)", Some("YES")),
5412            ("INS", "VARCHAR2(3)", Some("NO")),
5413            ("UPD", "VARCHAR2(3)", Some("NO")),
5414            ("DEL", "VARCHAR2(3)", Some("NO")),
5415            ("ENABLE", "VARCHAR2(3)", Some("YES")),
5416        ])];
5417
5418        // PLSQL-CAT-NEW-5 / oracle-grs0: one table comment + one column
5419        // comment exercise both apply_*_comment_row paths.
5420        let table_comment_rows = vec![oracle_row(&[
5421            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5422            ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
5423            ("TABLE_TYPE", "VARCHAR2(11)", Some("TABLE")),
5424            (
5425                "COMMENTS",
5426                "VARCHAR2(4000)",
5427                Some("Customer invoice header rows; one per invoice"),
5428            ),
5429        ])];
5430        let column_comment_rows = vec![oracle_row(&[
5431            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5432            ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
5433            ("COLUMN_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
5434            (
5435                "COMMENTS",
5436                "VARCHAR2(4000)",
5437                Some("Primary key; surrogate, allocated from INVOICES_SEQ"),
5438            ),
5439        ])];
5440
5441        // One private link (BILLING.REPORTING_LINK) and one public link
5442        // (PUBLIC.WORLDWIDE_LINK) — exercises both code paths in
5443        // `apply_db_link_row` (PLSQL-CAT-NEW-1 / oracle-rr4y).
5444        let db_link_rows = vec![
5445            oracle_row(&[
5446                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5447                ("DB_LINK", "VARCHAR2(128)", Some("REPORTING_LINK")),
5448                ("HOST", "VARCHAR2(2000)", Some("reporting-db.internal")),
5449            ]),
5450            oracle_row(&[
5451                ("OWNER", "VARCHAR2(128)", Some("PUBLIC")),
5452                ("DB_LINK", "VARCHAR2(128)", Some("WORLDWIDE_LINK")),
5453                ("HOST", "VARCHAR2(2000)", Some("//world.example.com/PROD")),
5454            ]),
5455        ];
5456
5457        let billing_only = vec![OracleBind::from("BILLING")];
5458        let mut expected_queries = capability_probe_expectations();
5459        // Only BILLING is a real user; REPORTING_ROLE is deliberately absent
5460        // so the UPDATE grant to it classifies as a role, not a user.
5461        expected_queries.push(all_users_expectation(&["BILLING"]));
5462        expected_queries.extend(vec![
5463            QueryExpectation {
5464                sql_contains: String::from("from all_objects"),
5465                params: billing_only.clone(),
5466                rows: object_rows,
5467            },
5468            QueryExpectation {
5469                sql_contains: String::from("from all_tab_cols"),
5470                params: billing_only.clone(),
5471                rows: vec![],
5472            },
5473            QueryExpectation {
5474                sql_contains: String::from("from all_constraints c"),
5475                params: billing_only.clone(),
5476                rows: vec![],
5477            },
5478            QueryExpectation {
5479                sql_contains: String::from("from all_indexes i"),
5480                params: billing_only.clone(),
5481                rows: vec![],
5482            },
5483            QueryExpectation {
5484                sql_contains: String::from("from all_triggers"),
5485                params: billing_only.clone(),
5486                rows: vec![],
5487            },
5488            QueryExpectation {
5489                sql_contains: String::from("from all_synonyms"),
5490                params: billing_only.clone(),
5491                rows: vec![],
5492            },
5493            QueryExpectation {
5494                sql_contains: String::from("from all_procedures"),
5495                params: billing_only.clone(),
5496                rows: vec![],
5497            },
5498            QueryExpectation {
5499                sql_contains: String::from("from all_arguments"),
5500                params: billing_only.clone(),
5501                rows: vec![],
5502            },
5503            QueryExpectation {
5504                sql_contains: String::from("from all_views"),
5505                params: billing_only.clone(),
5506                rows: view_rows,
5507            },
5508            QueryExpectation {
5509                sql_contains: String::from("from all_mviews"),
5510                params: billing_only.clone(),
5511                rows: mview_rows,
5512            },
5513            QueryExpectation {
5514                sql_contains: String::from("from all_sequences"),
5515                params: billing_only.clone(),
5516                rows: sequence_rows,
5517            },
5518            QueryExpectation {
5519                sql_contains: String::from("from all_type_attrs"),
5520                params: billing_only.clone(),
5521                rows: type_attr_rows,
5522            },
5523            QueryExpectation {
5524                sql_contains: String::from("from all_tab_privs"),
5525                params: billing_only.clone(),
5526                rows: grant_rows,
5527            },
5528            QueryExpectation {
5529                sql_contains: String::from("from all_db_links"),
5530                params: billing_only.clone(),
5531                rows: db_link_rows,
5532            },
5533            QueryExpectation {
5534                sql_contains: String::from("from all_tab_comments"),
5535                params: billing_only.clone(),
5536                rows: table_comment_rows,
5537            },
5538            QueryExpectation {
5539                sql_contains: String::from("from all_col_comments"),
5540                params: billing_only.clone(),
5541                rows: column_comment_rows,
5542            },
5543            QueryExpectation {
5544                sql_contains: String::from("from all_editions"),
5545                params: vec![],
5546                rows: edition_rows,
5547            },
5548            QueryExpectation {
5549                sql_contains: String::from("from all_editioning_views"),
5550                params: billing_only.clone(),
5551                rows: editioning_view_rows,
5552            },
5553            QueryExpectation {
5554                sql_contains: String::from("from all_policies"),
5555                params: billing_only.clone(),
5556                rows: vpd_policy_rows,
5557            },
5558            QueryExpectation {
5559                sql_contains: String::from("from all_dependencies"),
5560                params: billing_only,
5561                rows: vec![],
5562            },
5563        ]);
5564        let connection = StaticConnection {
5565            expected_queries,
5566            ..StaticConnection::default()
5567        };
5568
5569        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
5570
5571        let schema = snapshot
5572            .profile
5573            .current_schema
5574            .expect("current schema interned");
5575        let schema_catalog = snapshot
5576            .schemas
5577            .get(&schema)
5578            .expect("billing schema catalog should exist");
5579
5580        let view = schema_catalog
5581            .objects
5582            .values()
5583            .find_map(|object| match object {
5584                CatalogObject::View(view) => Some(view),
5585                _ => None,
5586            })
5587            .expect("view metadata present");
5588        assert!(view.query_hash.is_some());
5589        assert_eq!(view.read_only, Some(true));
5590
5591        let mview = schema_catalog
5592            .objects
5593            .values()
5594            .find_map(|object| match object {
5595                CatalogObject::MaterializedView(metadata) => Some(metadata),
5596                _ => None,
5597            })
5598            .expect("mview metadata present");
5599        assert_eq!(mview.refresh_mode.as_deref(), Some("DEMAND"));
5600        assert_eq!(mview.refresh_method.as_deref(), Some("COMPLETE"));
5601        assert!(mview.query_hash.is_some());
5602
5603        let sequence = schema_catalog
5604            .objects
5605            .values()
5606            .find_map(|object| match object {
5607                CatalogObject::Sequence(metadata) => Some(metadata),
5608                _ => None,
5609            })
5610            .expect("sequence metadata present");
5611        assert_eq!(sequence.increment_by, 1);
5612        assert_eq!(sequence.min_value, Some(1));
5613        assert_eq!(sequence.max_value, Some(9999999999));
5614        assert!(!sequence.cycle);
5615        assert!(!sequence.ordered);
5616        assert_eq!(sequence.cache_size, Some(20));
5617
5618        let type_metadata = schema_catalog
5619            .objects
5620            .values()
5621            .find_map(|object| match object {
5622                CatalogObject::Type(metadata) => Some(metadata),
5623                _ => None,
5624            })
5625            .expect("type metadata present");
5626        assert_eq!(type_metadata.attributes.len(), 2);
5627        assert_eq!(type_metadata.attributes[0].position, 1);
5628        assert_eq!(type_metadata.attributes[1].position, 2);
5629        assert_eq!(type_metadata.attributes[0].data_type.name, "VARCHAR2");
5630        assert_eq!(type_metadata.attributes[0].data_type.length, Some(200));
5631
5632        let public_grant = schema_catalog
5633            .grants
5634            .iter()
5635            .find(|grant| matches!(grant.grantee, crate::Grantee::Public))
5636            .expect("public grant present");
5637        assert!(matches!(
5638            public_grant.privilege,
5639            crate::GrantPrivilege::Select
5640        ));
5641        // REPORTING_ROLE is not in the ALL_USERS fixture, so the UPDATE
5642        // grant to it must classify as a role grant (it was previously,
5643        // and incorrectly, recorded as a direct user grant — oracle-qm3q.2).
5644        let role_grant = schema_catalog
5645            .grants
5646            .iter()
5647            .find(|grant| matches!(grant.grantee, crate::Grantee::Role(_)))
5648            .expect("role grant present");
5649        let crate::Grantee::Role(role) = &role_grant.grantee else {
5650            unreachable!("matched Grantee::Role above");
5651        };
5652        assert_eq!(
5653            snapshot.interner.resolve(role.symbol()),
5654            Some("REPORTING_ROLE")
5655        );
5656        assert!(matches!(
5657            role_grant.privilege,
5658            crate::GrantPrivilege::Update
5659        ));
5660        assert!(role_grant.grantable);
5661        // And no grantee is misclassified as a direct user in this fixture.
5662        assert!(
5663            !schema_catalog
5664                .grants
5665                .iter()
5666                .any(|grant| matches!(grant.grantee, crate::Grantee::User(_)))
5667        );
5668
5669        // PLSQL-CAT-NEW-1 / oracle-rr4y: ALL_DB_LINKS rows lower into
5670        // the owning schema's `db_links` list. The fixture seeds one
5671        // private link on BILLING and one public link on the synthetic
5672        // PUBLIC schema — both must surface with correct `public_link`
5673        // classification and the raw HOST string preserved.
5674        let private_link = schema_catalog
5675            .db_links
5676            .iter()
5677            .find(|link| link.name.eq("REPORTING_LINK"))
5678            .expect("private db link present on BILLING schema");
5679        assert!(!private_link.public_link);
5680        assert_eq!(private_link.host.as_deref(), Some("reporting-db.internal"));
5681
5682        // Locate the synthetic PUBLIC schema by resolving each interned
5683        // schema name back through the interner — the loader materialized
5684        // it on demand when a `PUBLIC`-owned link was applied, but no
5685        // ergonomic lookup-by-text exists on SymbolInterner yet.
5686        let public_link = snapshot
5687            .schemas
5688            .iter()
5689            .find_map(|(schema_id, schema_catalog)| {
5690                let label = snapshot.interner.resolve(schema_id.symbol())?;
5691                if label.eq_ignore_ascii_case("PUBLIC") {
5692                    schema_catalog
5693                        .db_links
5694                        .iter()
5695                        .find(|link| link.name.eq("WORLDWIDE_LINK"))
5696                } else {
5697                    None
5698                }
5699            })
5700            .expect("public db link present on PUBLIC synthetic schema");
5701        assert!(public_link.public_link);
5702        assert_eq!(
5703            public_link.host.as_deref(),
5704            Some("//world.example.com/PROD")
5705        );
5706
5707        // PLSQL-CAT-NEW-2 / oracle-c0gg: ALL_POLICIES row lands in
5708        // SchemaCatalog::vpd_policies with SEL/INS/UPD/DEL/ENABLE
5709        // correctly decoded from the Y/N/YES/NO mix in dictionary text.
5710        assert_eq!(schema_catalog.vpd_policies.len(), 1);
5711        let policy = &schema_catalog.vpd_policies[0];
5712        assert_eq!(policy.policy_name, "BILLING_TENANT_ISOLATION");
5713        assert_eq!(policy.function_name, "TENANT_PREDICATE");
5714        assert_eq!(policy.function_package.as_deref(), Some("RLS_PKG"));
5715        assert!(policy.policy_group.is_none());
5716        assert!(policy.on_select);
5717        assert!(!policy.on_insert);
5718        assert!(!policy.on_update);
5719        assert!(!policy.on_delete);
5720        assert!(policy.enabled);
5721
5722        // PLSQL-CAT-NEW-3 / oracle-fmro: ALL_EDITIONS rows land in
5723        // CatalogSnapshot::editions; ALL_EDITIONING_VIEWS rows land in
5724        // SchemaCatalog::editioning_views.
5725        assert_eq!(snapshot.editions.len(), 2);
5726        let root = snapshot
5727            .editions
5728            .iter()
5729            .find(|e| e.edition_name.eq("ORA$BASE"))
5730            .expect("root edition present");
5731        assert!(root.parent_edition_name.is_none());
5732        assert!(root.usable);
5733        let child = snapshot
5734            .editions
5735            .iter()
5736            .find(|e| e.edition_name.eq("PATCH_2026_05"))
5737            .expect("child edition present");
5738        assert_eq!(child.parent_edition_name.as_deref(), Some("ORA$BASE"));
5739        assert_eq!(schema_catalog.editioning_views.len(), 1);
5740        let ev = &schema_catalog.editioning_views[0];
5741        let view_label = snapshot.interner.resolve(ev.view_name.symbol()).unwrap();
5742        let table_label = snapshot.interner.resolve(ev.table_name.symbol()).unwrap();
5743        assert_eq!(view_label, "INVOICES_E1");
5744        assert_eq!(table_label, "INVOICES");
5745
5746        // PLSQL-CAT-NEW-5 / oracle-grs0: ALL_TAB_COMMENTS row reaches
5747        // SchemaCatalog::table_comments with TABLE_TYPE + COMMENTS
5748        // preserved verbatim.
5749        assert_eq!(schema_catalog.table_comments.len(), 1);
5750        let table_comment = &schema_catalog.table_comments[0];
5751        assert_eq!(table_comment.table_type, "TABLE");
5752        assert_eq!(
5753            table_comment.comments,
5754            "Customer invoice header rows; one per invoice"
5755        );
5756        // ALL_COL_COMMENTS row lands in column_comments with the
5757        // interned ColumnName.
5758        assert_eq!(schema_catalog.column_comments.len(), 1);
5759        assert_eq!(
5760            schema_catalog.column_comments[0].comments,
5761            "Primary key; surrogate, allocated from INVOICES_SEQ"
5762        );
5763    }
5764
5765    #[test]
5766    fn load_snapshot_from_connection_requires_current_schema_when_requested() {
5767        let mut connection_info = StaticConnection::default().connection_info;
5768        connection_info.current_schema = None;
5769        let connection = StaticConnection {
5770            connection_info,
5771            ..StaticConnection::default()
5772        };
5773
5774        let error = load_snapshot_for_test(&connection, &CatalogLoadRequest::default());
5775
5776        assert!(matches!(error, Err(CatalogError::CurrentSchemaUnavailable)));
5777    }
5778
5779    #[test]
5780    fn oracle_connection_default_helpers_enforce_row_cardinality() {
5781        let mut row = OracleRow::default();
5782        row.insert(
5783            "schema_name",
5784            "VARCHAR2(128)",
5785            Some(String::from("billing")),
5786        );
5787
5788        let single = StaticConnection {
5789            rows: vec![row.clone()],
5790            row_count: 1,
5791            ..StaticConnection::default()
5792        };
5793        let multiple = StaticConnection {
5794            rows: vec![row.clone(), row],
5795            row_count: 2,
5796            ..StaticConnection::default()
5797        };
5798
5799        let single_result = run_catalog_future(async {
5800            let cx = test_cx();
5801            single.query_one_row(&cx, "select * from dual", &[]).await
5802        });
5803        assert!(single_result.is_ok());
5804        let multiple_result = run_catalog_future(async {
5805            let cx = test_cx();
5806            multiple
5807                .query_optional_row(&cx, "select * from dual", &[])
5808                .await
5809        });
5810        assert!(matches!(
5811            multiple_result,
5812            Err(CatalogError::UnexpectedRowCount { expected, actual })
5813                if expected.eq("0 or 1") && actual.eq(&2)
5814        ));
5815    }
5816
5817    #[test]
5818    fn oracle_connect_options_capture_session_overrides() {
5819        let options = OracleConnectOptions::new("scott", "tiger", "//localhost/XE")
5820            .with_current_schema("billing")
5821            .with_module("plsql-intelligence")
5822            .with_action("catalog-extract")
5823            .with_client_info("tests")
5824            .with_client_identifier("unit");
5825
5826        assert_eq!(options.current_schema.as_deref(), Some("billing"));
5827        assert_eq!(options.module.as_deref(), Some("plsql-intelligence"));
5828        assert_eq!(options.action.as_deref(), Some("catalog-extract"));
5829        assert_eq!(options.client_info.as_deref(), Some("tests"));
5830        assert_eq!(options.client_identifier.as_deref(), Some("unit"));
5831    }
5832
5833    #[test]
5834    fn catalog_snapshot_initializes_with_empty_schema_map() {
5835        let snapshot = CatalogSnapshot::new(
5836            AnalysisProfile::default(),
5837            CatalogCapabilities::default(),
5838            CatalogSource {
5839                kind: CatalogSourceKind::SyntheticTestCatalog,
5840                path: None,
5841                description: Some(String::from("fixture")),
5842            },
5843            DateTime::<Utc>::UNIX_EPOCH,
5844        );
5845
5846        assert!(snapshot.schemas.is_empty());
5847        assert!(snapshot.interner.is_empty());
5848        assert_eq!(snapshot.generated_at, DateTime::<Utc>::UNIX_EPOCH);
5849        assert_eq!(
5850            snapshot.source.kind,
5851            CatalogSourceKind::SyntheticTestCatalog
5852        );
5853    }
5854
5855    #[test]
5856    fn schema_catalog_can_hold_structural_lookup_maps() {
5857        let mut schema_catalog = SchemaCatalog::default();
5858        let object_name = ObjectName::from(SymbolId::new(2));
5859        let column_name = ColumnName::from(SymbolId::new(3));
5860        let owner = SchemaName::from(SymbolId::new(1));
5861
5862        let table = TableMetadata {
5863            common: ObjectCommon {
5864                owner,
5865                name: object_name,
5866                object_type: ObjectType::Table,
5867                status: ObjectStatus::Valid,
5868                source_hash: Some(Hash::new("abc123")),
5869                ..ObjectCommon::default()
5870            },
5871            columns: HashMap::from([(
5872                column_name,
5873                crate::ColumnMetadata {
5874                    name: column_name,
5875                    position: 1,
5876                    data_type: DataTypeRef {
5877                        name: String::from("NUMBER"),
5878                        precision: Some(10),
5879                        ..DataTypeRef::default()
5880                    },
5881                    nullable: false,
5882                    ..crate::ColumnMetadata::default()
5883                },
5884            )]),
5885            ..TableMetadata::default()
5886        };
5887
5888        schema_catalog
5889            .objects
5890            .insert(object_name, CatalogObject::Table(table));
5891        schema_catalog.synonyms.insert(
5892            SynonymName::from(SymbolId::new(4)),
5893            SynonymTarget {
5894                target_owner: Some(owner),
5895                target_name: object_name,
5896                public_synonym: false,
5897                ..SynonymTarget::default()
5898            },
5899        );
5900
5901        assert_eq!(schema_catalog.objects.len(), 1);
5902        assert_eq!(schema_catalog.synonyms.len(), 1);
5903    }
5904
5905    #[test]
5906    fn package_and_plscope_models_capture_signature_state() {
5907        let owner = SchemaName::from(SymbolId::new(10));
5908        let package_name = ObjectName::from(SymbolId::new(11));
5909        let procedure_name = ObjectName::from(SymbolId::new(12));
5910        let member_name = MemberName::from(SymbolId::new(13));
5911
5912        let package = PackageMetadata {
5913            common: ObjectCommon {
5914                owner,
5915                name: package_name,
5916                object_type: ObjectType::Package,
5917                status: ObjectStatus::Valid,
5918                ..ObjectCommon::default()
5919            },
5920            procedures: vec![RoutineSignature {
5921                routine_name: procedure_name,
5922                arguments: vec![crate::ArgumentMetadata {
5923                    position: 1,
5924                    name: Some(member_name),
5925                    data_type: DataTypeRef {
5926                        name: String::from("VARCHAR2"),
5927                        length: Some(30),
5928                        ..DataTypeRef::default()
5929                    },
5930                    ..crate::ArgumentMetadata::default()
5931                }],
5932                accessible_by: vec![AccessibleByTarget {
5933                    owner: Some(owner),
5934                    object_name: package_name,
5935                }],
5936                ..RoutineSignature::default()
5937            }],
5938            ..PackageMetadata::default()
5939        };
5940
5941        let plscope = PlScopeSnapshot {
5942            availability: PlScopeAvailability::IdentifiersAndStatements,
5943            identifiers: vec![CompilerIdentifier {
5944                owner,
5945                object_name: package_name,
5946                identifier_name: member_name,
5947                identifier_type: String::from("FORMAL IN"),
5948                usage: String::from("DECLARATION"),
5949                line: 4,
5950                column: 12,
5951            }],
5952            ..PlScopeSnapshot::default()
5953        };
5954
5955        assert_eq!(package.procedures.len(), 1);
5956        assert_eq!(package.procedures[0].accessible_by.len(), 1);
5957        assert_eq!(
5958            plscope.availability,
5959            PlScopeAvailability::IdentifiersAndStatements
5960        );
5961        assert_eq!(plscope.identifiers[0].line, 4);
5962        assert_eq!(ConstraintType::ForeignKey, ConstraintType::ForeignKey);
5963        assert_eq!(TypeFinality::Unknown, TypeFinality::default());
5964        assert_eq!(TypeInstantiable::Unknown, TypeInstantiable::default());
5965        assert_eq!(TriggerName::from(SymbolId::new(14)).symbol().get(), 14);
5966    }
5967
5968    #[test]
5969    fn catalog_snapshot_round_trips_through_versioned_json_document() {
5970        let tempdir = tempdir();
5971        assert!(tempdir.is_ok());
5972        let tempdir = if let Ok(tempdir) = tempdir {
5973            tempdir
5974        } else {
5975            return;
5976        };
5977
5978        let mut snapshot = CatalogSnapshot::new(
5979            AnalysisProfile::default(),
5980            CatalogCapabilities::default(),
5981            CatalogSource {
5982                kind: CatalogSourceKind::JsonSnapshot,
5983                path: None,
5984                description: Some(String::from("roundtrip")),
5985            },
5986            DateTime::<Utc>::UNIX_EPOCH,
5987        );
5988        let billing = snapshot.intern_schema_name("billing");
5989        let claims = snapshot.intern_object_name("claims");
5990        assert!(billing.is_some());
5991        assert!(claims.is_some());
5992
5993        let path = tempdir.path().join("snapshot.json");
5994        let exported = export_snapshot_to_json(&snapshot, &path);
5995        assert!(exported.is_ok());
5996
5997        let loaded = load_snapshot_from_json(&path);
5998        assert!(loaded.is_ok());
5999        assert_eq!(loaded.ok(), Some(snapshot.clone()));
6000
6001        let rendered = std::fs::read_to_string(path);
6002        assert!(rendered.is_ok());
6003        let rendered = if let Ok(rendered) = rendered {
6004            rendered
6005        } else {
6006            return;
6007        };
6008        let document = serde_json::from_str::<CatalogSnapshotDocument>(&rendered);
6009        assert!(document.is_ok());
6010        let document = if let Ok(document) = document {
6011            document
6012        } else {
6013            return;
6014        };
6015
6016        assert!(document.schema_id.as_str().eq(CATALOG_SNAPSHOT_SCHEMA_ID));
6017        assert!(matches!(
6018            document
6019                .schema_version
6020                .cmp(&CATALOG_SNAPSHOT_SCHEMA_VERSION),
6021            std::cmp::Ordering::Equal
6022        ));
6023        assert_eq!(
6024            document
6025                .snapshot
6026                .interner
6027                .resolve(billing.unwrap_or_default().symbol()),
6028            Some("billing")
6029        );
6030        assert_eq!(
6031            document
6032                .snapshot
6033                .interner
6034                .resolve(claims.unwrap_or_default().symbol()),
6035            Some("claims")
6036        );
6037    }
6038
6039    #[test]
6040    fn load_from_dbms_metadata_dir_classifies_create_statements() {
6041        let dir = tempdir().unwrap();
6042        let root = dir.path();
6043
6044        // Write some .sql files
6045        std::fs::write(
6046            root.join("customers.sql"),
6047            "CREATE TABLE customers (id NUMBER, name VARCHAR2(100));",
6048        )
6049        .unwrap();
6050        std::fs::write(
6051            root.join("billing_api.sql"),
6052            "CREATE OR REPLACE PACKAGE billing_api AS\n  PROCEDURE charge(p_id NUMBER);\nEND;",
6053        )
6054        .unwrap();
6055        std::fs::write(
6056            root.join("invoice_seq.sql"),
6057            "CREATE SEQUENCE invoice_seq START WITH 1 INCREMENT BY 1;",
6058        )
6059        .unwrap();
6060        std::fs::write(root.join("skip.txt"), "not a sql file").unwrap();
6061
6062        let snapshot = load_from_dbms_metadata_dir(root).unwrap();
6063        assert_eq!(snapshot.source.kind, CatalogSourceKind::DbmsMetadataFiles);
6064
6065        // Should have found objects in a default schema
6066        let total_objects: usize = snapshot.schemas.values().map(|s| s.objects.len()).sum();
6067        assert!(
6068            total_objects >= 2,
6069            "expected at least 2 objects, got {}",
6070            total_objects
6071        );
6072    }
6073
6074    #[test]
6075    fn load_from_dbms_metadata_dir_returns_error_for_nonexistent_dir() {
6076        let result = load_from_dbms_metadata_dir(std::path::Path::new("/nonexistent/path"));
6077        assert!(result.is_err());
6078    }
6079
6080    #[test]
6081    fn load_from_dbms_metadata_dir_handles_empty_dir() {
6082        let dir = tempdir().unwrap();
6083        let snapshot = load_from_dbms_metadata_dir(dir.path()).unwrap();
6084        assert!(
6085            snapshot.schemas.is_empty() || snapshot.schemas.values().all(|s| s.objects.is_empty())
6086        );
6087    }
6088
6089    /// DDL with a qualified `OWNER.OBJECT` prefix must be filed under the
6090    /// real owner schema, never collapsed to a single shared bucket. A
6091    /// multi-schema DBMS_METADATA dump that lands under one schema would
6092    /// silently lose cross-schema topology.
6093    #[test]
6094    fn load_from_dbms_metadata_dir_records_real_schema_owner() {
6095        let dir = tempdir().unwrap();
6096        let root = dir.path();
6097        std::fs::write(
6098            root.join("hr_employees.sql"),
6099            "CREATE TABLE hr.employees (id NUMBER PRIMARY KEY);",
6100        )
6101        .unwrap();
6102        std::fs::write(
6103            root.join("billing_invoices.sql"),
6104            "CREATE TABLE billing.invoices (id NUMBER, amount NUMBER(12,2));",
6105        )
6106        .unwrap();
6107        let snapshot = load_from_dbms_metadata_dir(root).unwrap();
6108
6109        // Two distinct owner schemas must be present — not collapsed.
6110        let schema_names: std::collections::HashSet<String> = snapshot
6111            .schemas
6112            .keys()
6113            .filter_map(|s| snapshot.interner.resolve(s.symbol()).map(str::to_string))
6114            .collect();
6115        assert!(
6116            schema_names.contains("HR"),
6117            "HR schema bucket must exist; got {schema_names:?}"
6118        );
6119        assert!(
6120            schema_names.contains("BILLING"),
6121            "BILLING schema bucket must exist; got {schema_names:?}"
6122        );
6123
6124        // Each schema bucket holds exactly its own object — never the
6125        // other's.
6126        for (schema, bucket) in &snapshot.schemas {
6127            let name = snapshot.interner.resolve(schema.symbol()).unwrap();
6128            assert_eq!(
6129                bucket.objects.len(),
6130                1,
6131                "{name} bucket must hold exactly one object"
6132            );
6133        }
6134    }
6135
6136    /// Unqualified CREATE statements (no owner prefix) must land in a
6137    /// stable named schema (e.g. `PUBLIC`) interned through the regular
6138    /// interner — never `SymbolId::new(0)` which collides with whatever
6139    /// the first interner entry happens to be.
6140    #[test]
6141    fn load_from_dbms_metadata_dir_uses_named_default_schema_for_unqualified_ddl() {
6142        let dir = tempdir().unwrap();
6143        let root = dir.path();
6144        std::fs::write(
6145            root.join("customers.sql"),
6146            "CREATE TABLE customers (id NUMBER, name VARCHAR2(100));",
6147        )
6148        .unwrap();
6149        let snapshot = load_from_dbms_metadata_dir(root).unwrap();
6150
6151        let schema_names: std::collections::HashSet<String> = snapshot
6152            .schemas
6153            .keys()
6154            .filter_map(|s| snapshot.interner.resolve(s.symbol()).map(str::to_string))
6155            .collect();
6156        assert!(
6157            schema_names.contains("PUBLIC"),
6158            "default schema bucket (PUBLIC) must exist for unqualified DDL; got {schema_names:?}"
6159        );
6160    }
6161
6162    /// The classifier must actually record the raw DDL text on the
6163    /// produced `CatalogObject` — the original docstring promised this
6164    /// and downstream consumers (doc generation, lineage) rely on it.
6165    #[test]
6166    fn load_from_dbms_metadata_dir_records_raw_ddl_text_on_object() {
6167        let dir = tempdir().unwrap();
6168        let root = dir.path();
6169        let raw = "CREATE TABLE hr.orders (id NUMBER PRIMARY KEY, total NUMBER(12,2));";
6170        std::fs::write(root.join("orders.sql"), raw).unwrap();
6171        let snapshot = load_from_dbms_metadata_dir(root).unwrap();
6172
6173        let bucket = snapshot
6174            .schemas
6175            .values()
6176            .find(|b| !b.objects.is_empty())
6177            .expect("at least one schema bucket with an object");
6178        let obj = bucket.objects.values().next().unwrap();
6179        let common = match obj {
6180            CatalogObject::Table(t) => Some(&t.common),
6181            _ => None,
6182        }
6183        .expect("expected Table");
6184        let ddl = common
6185            .ddl
6186            .as_ref()
6187            .expect("CatalogObject must carry its raw DDL text");
6188        assert_eq!(ddl.ddl_text, raw, "ddl_text must round-trip the source DDL");
6189    }
6190
6191    #[test]
6192    fn load_snapshot_populates_object_metadata_and_dependency_rows() {
6193        let object_rows = vec![
6194            oracle_row(&[
6195                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6196                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_PKG")),
6197                ("OBJECT_TYPE", "VARCHAR2(30)", Some("PACKAGE")),
6198                ("STATUS", "VARCHAR2(7)", Some("INVALID")),
6199                (
6200                    "LAST_DDL_TIME_ISO",
6201                    "VARCHAR2(19)",
6202                    Some("2026-05-01T13:14:15"),
6203                ),
6204                ("EDITIONABLE", "VARCHAR2(1)", Some("Y")),
6205                ("EDITION_NAME", "VARCHAR2(128)", Some("ORA$BASE")),
6206            ]),
6207            oracle_row(&[
6208                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6209                ("OBJECT_NAME", "VARCHAR2(128)", Some("CUSTOMERS")),
6210                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TABLE")),
6211                ("STATUS", "VARCHAR2(7)", Some("VALID")),
6212                (
6213                    "LAST_DDL_TIME_ISO",
6214                    "VARCHAR2(19)",
6215                    Some("2026-04-22T08:30:00"),
6216                ),
6217                ("EDITIONABLE", "VARCHAR2(1)", None),
6218                ("EDITION_NAME", "VARCHAR2(128)", None),
6219            ]),
6220        ];
6221
6222        let dependency_rows = vec![
6223            oracle_row(&[
6224                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6225                ("NAME", "VARCHAR2(128)", Some("BILLING_PKG")),
6226                ("TYPE", "VARCHAR2(30)", Some("PACKAGE")),
6227                ("REFERENCED_OWNER", "VARCHAR2(128)", Some("BILLING")),
6228                ("REFERENCED_NAME", "VARCHAR2(128)", Some("CUSTOMERS")),
6229                ("REFERENCED_TYPE", "VARCHAR2(30)", Some("TABLE")),
6230                ("DEPENDENCY_TYPE", "VARCHAR2(4)", Some("HARD")),
6231            ]),
6232            oracle_row(&[
6233                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6234                ("NAME", "VARCHAR2(128)", Some("BILLING_PKG")),
6235                ("TYPE", "VARCHAR2(30)", Some("PACKAGE")),
6236                ("REFERENCED_OWNER", "VARCHAR2(128)", Some("SYS")),
6237                ("REFERENCED_NAME", "VARCHAR2(128)", Some("DBMS_OUTPUT")),
6238                ("REFERENCED_TYPE", "VARCHAR2(30)", Some("PACKAGE")),
6239                ("DEPENDENCY_TYPE", "VARCHAR2(4)", Some("REF")),
6240            ]),
6241        ];
6242
6243        let billing_only = vec![OracleBind::from("BILLING")];
6244        let mut expected_queries = capability_probe_expectations();
6245        // No grants are asserted here, but live extraction still issues the
6246        // ALL_USERS probe before ALL_TAB_PRIVS.
6247        expected_queries.push(all_users_expectation(&["BILLING"]));
6248        expected_queries.extend(vec![
6249            QueryExpectation {
6250                sql_contains: String::from("from all_objects"),
6251                params: billing_only.clone(),
6252                rows: object_rows,
6253            },
6254            QueryExpectation {
6255                sql_contains: String::from("from all_tab_cols"),
6256                params: billing_only.clone(),
6257                rows: vec![],
6258            },
6259            QueryExpectation {
6260                sql_contains: String::from("from all_constraints c"),
6261                params: billing_only.clone(),
6262                rows: vec![],
6263            },
6264            QueryExpectation {
6265                sql_contains: String::from("from all_indexes i"),
6266                params: billing_only.clone(),
6267                rows: vec![],
6268            },
6269            QueryExpectation {
6270                sql_contains: String::from("from all_triggers"),
6271                params: billing_only.clone(),
6272                rows: vec![],
6273            },
6274            QueryExpectation {
6275                sql_contains: String::from("from all_synonyms"),
6276                params: billing_only.clone(),
6277                rows: vec![],
6278            },
6279            QueryExpectation {
6280                sql_contains: String::from("from all_procedures"),
6281                params: billing_only.clone(),
6282                rows: vec![],
6283            },
6284            QueryExpectation {
6285                sql_contains: String::from("from all_arguments"),
6286                params: billing_only.clone(),
6287                rows: vec![],
6288            },
6289            QueryExpectation {
6290                sql_contains: String::from("from all_views"),
6291                params: billing_only.clone(),
6292                rows: vec![],
6293            },
6294            QueryExpectation {
6295                sql_contains: String::from("from all_mviews"),
6296                params: billing_only.clone(),
6297                rows: vec![],
6298            },
6299            QueryExpectation {
6300                sql_contains: String::from("from all_sequences"),
6301                params: billing_only.clone(),
6302                rows: vec![],
6303            },
6304            QueryExpectation {
6305                sql_contains: String::from("from all_type_attrs"),
6306                params: billing_only.clone(),
6307                rows: vec![],
6308            },
6309            QueryExpectation {
6310                sql_contains: String::from("from all_tab_privs"),
6311                params: billing_only.clone(),
6312                rows: vec![],
6313            },
6314            QueryExpectation {
6315                sql_contains: String::from("from all_db_links"),
6316                params: billing_only.clone(),
6317                rows: vec![],
6318            },
6319            QueryExpectation {
6320                sql_contains: String::from("from all_tab_comments"),
6321                params: billing_only.clone(),
6322                rows: vec![],
6323            },
6324            QueryExpectation {
6325                sql_contains: String::from("from all_col_comments"),
6326                params: billing_only.clone(),
6327                rows: vec![],
6328            },
6329            QueryExpectation {
6330                sql_contains: String::from("from all_editions"),
6331                params: vec![],
6332                rows: vec![],
6333            },
6334            QueryExpectation {
6335                sql_contains: String::from("from all_editioning_views"),
6336                params: billing_only.clone(),
6337                rows: vec![],
6338            },
6339            QueryExpectation {
6340                sql_contains: String::from("from all_policies"),
6341                params: billing_only.clone(),
6342                rows: vec![],
6343            },
6344            QueryExpectation {
6345                sql_contains: String::from("from all_dependencies"),
6346                params: billing_only,
6347                rows: dependency_rows,
6348            },
6349        ]);
6350        let connection = StaticConnection {
6351            expected_queries,
6352            ..StaticConnection::default()
6353        };
6354
6355        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
6356        let schema = snapshot
6357            .profile
6358            .current_schema
6359            .expect("current schema interned");
6360        let schema_catalog = snapshot.schemas.get(&schema).expect("billing schema");
6361
6362        let billing_pkg = schema_catalog
6363            .objects
6364            .values()
6365            .find_map(|object| match object {
6366                CatalogObject::Package(metadata) => Some(metadata),
6367                _ => None,
6368            })
6369            .expect("billing pkg present");
6370        assert_eq!(billing_pkg.common.status, ObjectStatus::Invalid);
6371        assert_eq!(billing_pkg.common.editionable, Some(true));
6372        assert!(billing_pkg.common.edition_name.is_some());
6373        let last_ddl = billing_pkg
6374            .common
6375            .last_ddl_time
6376            .expect("last_ddl_time populated");
6377        // The fixture date is 2026-05-01 13:14:15 UTC.
6378        assert_eq!(last_ddl.timestamp(), 1_777_641_255);
6379
6380        let customers_table = schema_catalog
6381            .objects
6382            .values()
6383            .find_map(|object| match object {
6384                CatalogObject::Table(metadata) => Some(metadata),
6385                _ => None,
6386            })
6387            .expect("customers table");
6388        assert!(customers_table.common.editionable.is_none());
6389        assert!(customers_table.common.edition_name.is_none());
6390
6391        assert_eq!(schema_catalog.dependencies.len(), 2);
6392        let pkg_to_customers = schema_catalog
6393            .dependencies
6394            .iter()
6395            .find(|d| matches!(d.dependency_kind, CatalogDependencyKind::Hard))
6396            .expect("hard dependency");
6397        assert!(matches!(pkg_to_customers.object_type, ObjectType::Package));
6398        assert!(matches!(
6399            pkg_to_customers.referenced_type,
6400            Some(ObjectType::Table)
6401        ));
6402        let pkg_to_sys = schema_catalog
6403            .dependencies
6404            .iter()
6405            .find(|d| matches!(d.dependency_kind, CatalogDependencyKind::Reference))
6406            .expect("ref dependency");
6407        assert!(matches!(
6408            pkg_to_sys.referenced_type,
6409            Some(ObjectType::Package)
6410        ));
6411        assert_eq!(
6412            pkg_to_sys
6413                .referenced_owner
6414                .and_then(|owner| snapshot.interner.resolve(owner.symbol())),
6415            Some("SYS")
6416        );
6417    }
6418
6419    #[test]
6420    fn doctor_report_counts_objects_columns_and_indexes() {
6421        let connection = StaticConnection::default();
6422        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
6423        let mut snapshot = snapshot;
6424        let billing = snapshot.intern_schema_name("BILLING").expect("schema name");
6425        let invoices = snapshot
6426            .intern_object_name("INVOICES")
6427            .expect("object name");
6428        let invoice_view = snapshot
6429            .intern_object_name("INVOICE_VIEW")
6430            .expect("object name");
6431        let billing_seq = snapshot
6432            .intern_object_name("BILLING_SEQ")
6433            .expect("sequence name");
6434
6435        let invoice_id = snapshot
6436            .intern_column_name("INVOICE_ID")
6437            .expect("column name");
6438        let table_metadata = crate::TableMetadata {
6439            common: ObjectCommon {
6440                owner: billing,
6441                name: invoices,
6442                object_type: ObjectType::Table,
6443                status: ObjectStatus::Valid,
6444                ..ObjectCommon::default()
6445            },
6446            columns: HashMap::from([(
6447                invoice_id,
6448                crate::ColumnMetadata {
6449                    name: invoice_id,
6450                    position: 1,
6451                    ..crate::ColumnMetadata::default()
6452                },
6453            )]),
6454            ..crate::TableMetadata::default()
6455        };
6456
6457        let view_metadata = crate::ViewMetadata {
6458            common: ObjectCommon {
6459                owner: billing,
6460                name: invoice_view,
6461                object_type: ObjectType::View,
6462                status: ObjectStatus::Invalid,
6463                ..ObjectCommon::default()
6464            },
6465            ..crate::ViewMetadata::default()
6466        };
6467
6468        let sequence_metadata = crate::SequenceMetadata {
6469            common: ObjectCommon {
6470                owner: billing,
6471                name: billing_seq,
6472                object_type: ObjectType::Sequence,
6473                status: ObjectStatus::Valid,
6474                ..ObjectCommon::default()
6475            },
6476            ..crate::SequenceMetadata::default()
6477        };
6478
6479        let schema_catalog = snapshot.schemas.entry(billing).or_default();
6480        schema_catalog
6481            .objects
6482            .insert(invoices, CatalogObject::Table(table_metadata));
6483        schema_catalog
6484            .objects
6485            .insert(invoice_view, CatalogObject::View(view_metadata));
6486        schema_catalog
6487            .objects
6488            .insert(billing_seq, CatalogObject::Sequence(sequence_metadata));
6489
6490        let report = snapshot.doctor_report();
6491        assert_eq!(report.totals.objects_total, 3);
6492        assert_eq!(report.totals.columns_total, 1);
6493        let table_tile = report
6494            .object_counts
6495            .iter()
6496            .find(|tile| matches!(tile.object_type, ObjectType::Table))
6497            .expect("table tile");
6498        assert_eq!(table_tile.total, 1);
6499        assert_eq!(table_tile.valid, 1);
6500        let view_tile = report
6501            .object_counts
6502            .iter()
6503            .find(|tile| matches!(tile.object_type, ObjectType::View))
6504            .expect("view tile");
6505        assert_eq!(view_tile.invalid, 1);
6506        // Capability negotiation (PLSQL-CAT-017) probes the connection; the
6507        // StaticConnection mock returns Ok([]) for unmatched queries which
6508        // means every probe succeeds in this test path → no missing-permission
6509        // suggestions should appear.
6510        assert!(report.can_query_all_views);
6511        assert!(report.can_use_dbms_metadata);
6512        assert!(
6513            report.missing_permissions.is_empty(),
6514            "all probes succeeded on the mock so no permissions should be flagged"
6515        );
6516    }
6517
6518    #[test]
6519    fn doctor_report_suggests_grants_when_capabilities_are_missing() {
6520        let mut snapshot = CatalogSnapshot::new(
6521            plsql_core::AnalysisProfile::default(),
6522            CatalogCapabilities {
6523                can_query_all_views: true,
6524                can_query_dba_views: false,
6525                can_use_dbms_metadata: false,
6526                can_read_source: false,
6527                plscope_enabled: false,
6528                can_query_scheduler: false,
6529                can_query_roles_and_grants: false,
6530                warnings: vec![CapabilityWarning {
6531                    code: String::from("catalog-version-parse-fallback"),
6532                    message: String::from("server version did not parse"),
6533                    remediation: None,
6534                }],
6535            },
6536            CatalogSource {
6537                kind: CatalogSourceKind::LiveConnection,
6538                path: None,
6539                description: Some(String::from("live extraction via oraclemcp-db from xe")),
6540            },
6541            DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
6542        );
6543        let _ = snapshot.intern_schema_name("BILLING");
6544
6545        let report = snapshot.doctor_report();
6546        assert!(matches!(
6547            report.source_kind,
6548            CatalogSourceKind::LiveConnection
6549        ));
6550        assert_eq!(report.capability_warnings.len(), 1);
6551        let view_names: Vec<&str> = report
6552            .missing_permissions
6553            .iter()
6554            .map(|m| m.view_name.as_str())
6555            .collect();
6556        assert!(view_names.iter().any(|v| v.contains("DBA_OBJECTS")));
6557        assert!(view_names.iter().any(|v| v.contains("DBMS_METADATA")));
6558        assert!(view_names.iter().any(|v| v.contains("ALL_SOURCE")));
6559        assert!(view_names.iter().any(|v| v.contains("PLSCOPE_SETTINGS")));
6560        assert!(view_names.iter().any(|v| v.contains("SCHEDULER_JOBS")));
6561        assert!(view_names.iter().any(|v| v.contains("ROLE_PRIVS")));
6562    }
6563
6564    #[test]
6565    fn load_snapshot_populates_plscope_availability_from_object_settings() {
6566        let plscope_rows = vec![
6567            oracle_row(&[
6568                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6569                (
6570                    "PLSCOPE_SETTINGS",
6571                    "VARCHAR2(255)",
6572                    Some("IDENTIFIERS:ALL,STATEMENTS:ALL"),
6573                ),
6574            ]),
6575            oracle_row(&[
6576                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6577                (
6578                    "PLSCOPE_SETTINGS",
6579                    "VARCHAR2(255)",
6580                    Some("IDENTIFIERS:ALL,STATEMENTS:NONE"),
6581                ),
6582            ]),
6583            oracle_row(&[
6584                ("OWNER", "VARCHAR2(128)", Some("REPORTING")),
6585                (
6586                    "PLSCOPE_SETTINGS",
6587                    "VARCHAR2(255)",
6588                    Some("IDENTIFIERS:NONE,STATEMENTS:NONE"),
6589                ),
6590            ]),
6591        ];
6592
6593        let billing_only = vec![OracleBind::from("BILLING")];
6594        let mut expected_queries = capability_probe_expectations();
6595        // Live extraction issues the ALL_USERS probe before ALL_TAB_PRIVS.
6596        expected_queries.push(all_users_expectation(&["BILLING", "REPORTING"]));
6597        for fragment in [
6598            "from all_objects",
6599            "from all_tab_cols",
6600            "from all_constraints c",
6601            "from all_indexes i",
6602            "from all_triggers",
6603            "from all_synonyms",
6604            "from all_procedures",
6605            "from all_arguments",
6606            "from all_views",
6607            "from all_mviews",
6608            "from all_sequences",
6609            "from all_type_attrs",
6610            "from all_tab_privs",
6611            "from all_db_links",
6612            "from all_tab_comments",
6613            "from all_col_comments",
6614            "from all_editions",
6615            "from all_editioning_views",
6616            "from all_policies",
6617            "from all_dependencies",
6618        ] {
6619            // all_editions is database-wide — no schema-bind param.
6620            let params = if fragment.eq("from all_editions") {
6621                vec![]
6622            } else {
6623                billing_only.clone()
6624            };
6625            expected_queries.push(QueryExpectation {
6626                sql_contains: String::from(fragment),
6627                params,
6628                rows: vec![],
6629            });
6630        }
6631        expected_queries.push(QueryExpectation {
6632            sql_contains: String::from("from all_plsql_object_settings"),
6633            params: billing_only.clone(),
6634            rows: plscope_rows,
6635        });
6636        let connection = StaticConnection {
6637            expected_queries,
6638            ..StaticConnection::default()
6639        };
6640        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
6641
6642        let billing = snapshot
6643            .schemas
6644            .keys()
6645            .find(|name| {
6646                snapshot
6647                    .interner
6648                    .resolve(name.symbol())
6649                    .is_some_and(|label| label.eq("BILLING"))
6650            })
6651            .copied()
6652            .expect("billing schema");
6653        let plscope = snapshot
6654            .schemas
6655            .get(&billing)
6656            .and_then(|s| s.plscope.as_ref())
6657            .expect("billing plscope");
6658        assert!(matches!(
6659            plscope.availability,
6660            PlScopeAvailability::IdentifiersAndStatements
6661        ));
6662
6663        let reporting = snapshot
6664            .schemas
6665            .keys()
6666            .find(|name| {
6667                snapshot
6668                    .interner
6669                    .resolve(name.symbol())
6670                    .is_some_and(|label| label.eq("REPORTING"))
6671            })
6672            .copied()
6673            .expect("reporting schema");
6674        let reporting_plscope = snapshot
6675            .schemas
6676            .get(&reporting)
6677            .and_then(|s| s.plscope.as_ref())
6678            .expect("reporting plscope");
6679        // All-NONE settings → PL/Scope is wired but stale.
6680        assert!(matches!(
6681            reporting_plscope.availability,
6682            PlScopeAvailability::AvailableButStale
6683        ));
6684    }
6685
6686    #[test]
6687    fn load_snapshot_extracts_all_identifiers_into_plscope() {
6688        let identifier_rows = vec![
6689            oracle_row(&[
6690                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6691                ("NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
6692                ("TYPE", "VARCHAR2(30)", Some("VARIABLE")),
6693                ("USAGE", "VARCHAR2(20)", Some("DECLARATION")),
6694                ("LINE", "NUMBER(10)", Some("12")),
6695                ("COL", "NUMBER(10)", Some("5")),
6696                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_PKG")),
6697            ]),
6698            oracle_row(&[
6699                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6700                ("NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
6701                ("TYPE", "VARCHAR2(30)", Some("VARIABLE")),
6702                ("USAGE", "VARCHAR2(20)", Some("REFERENCE")),
6703                ("LINE", "NUMBER(10)", Some("18")),
6704                ("COL", "NUMBER(10)", Some("21")),
6705                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_PKG")),
6706            ]),
6707        ];
6708
6709        let billing_only = vec![OracleBind::from("BILLING")];
6710        let mut expected_queries = capability_probe_expectations();
6711        // Live extraction issues the ALL_USERS probe before ALL_TAB_PRIVS.
6712        expected_queries.push(all_users_expectation(&["BILLING"]));
6713        for fragment in [
6714            "from all_objects",
6715            "from all_tab_cols",
6716            "from all_constraints c",
6717            "from all_indexes i",
6718            "from all_triggers",
6719            "from all_synonyms",
6720            "from all_procedures",
6721            "from all_arguments",
6722            "from all_views",
6723            "from all_mviews",
6724            "from all_sequences",
6725            "from all_type_attrs",
6726            "from all_tab_privs",
6727            "from all_db_links",
6728            "from all_tab_comments",
6729            "from all_col_comments",
6730            "from all_editions",
6731            "from all_editioning_views",
6732            "from all_policies",
6733            "from all_dependencies",
6734            "from all_plsql_object_settings",
6735        ] {
6736            let params = if fragment.eq("from all_editions") {
6737                vec![]
6738            } else {
6739                billing_only.clone()
6740            };
6741            expected_queries.push(QueryExpectation {
6742                sql_contains: String::from(fragment),
6743                params,
6744                rows: vec![],
6745            });
6746        }
6747        expected_queries.push(QueryExpectation {
6748            sql_contains: String::from("from all_identifiers"),
6749            params: billing_only.clone(),
6750            rows: identifier_rows,
6751        });
6752        let connection = StaticConnection {
6753            expected_queries,
6754            ..StaticConnection::default()
6755        };
6756        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
6757
6758        let billing = snapshot
6759            .schemas
6760            .keys()
6761            .find(|name| {
6762                snapshot
6763                    .interner
6764                    .resolve(name.symbol())
6765                    .is_some_and(|label| label.eq("BILLING"))
6766            })
6767            .copied()
6768            .expect("billing schema");
6769        let plscope = snapshot
6770            .schemas
6771            .get(&billing)
6772            .and_then(|s| s.plscope.as_ref())
6773            .expect("plscope present");
6774        assert_eq!(plscope.identifiers.len(), 2);
6775        let first = &plscope.identifiers[0];
6776        assert_eq!(first.identifier_type, "VARIABLE");
6777        assert_eq!(first.usage, "DECLARATION");
6778        assert_eq!(first.line, 12);
6779        assert_eq!(first.column, 5);
6780    }
6781
6782    #[test]
6783    fn normalize_dbms_metadata_ddl_collapses_whitespace_and_trims_slash() {
6784        use crate::normalize_dbms_metadata_ddl;
6785        let input = "  CREATE   TABLE   FOO ( ID   NUMBER )  \n/  ";
6786        let normalized = normalize_dbms_metadata_ddl(input);
6787        assert_eq!(normalized, "CREATE TABLE FOO ( ID NUMBER )");
6788    }
6789
6790    #[test]
6791    fn object_type_to_dbms_metadata_value_covers_all_known_types() {
6792        use crate::object_type_to_dbms_metadata_value;
6793        // Every object kind that DBMS_METADATA supports must map to a name.
6794        for object_type in [
6795            ObjectType::Table,
6796            ObjectType::View,
6797            ObjectType::MaterializedView,
6798            ObjectType::Sequence,
6799            ObjectType::Type,
6800            ObjectType::Package,
6801            ObjectType::Procedure,
6802            ObjectType::Function,
6803            ObjectType::Trigger,
6804            ObjectType::EditioningView,
6805            ObjectType::SchedulerJob,
6806            ObjectType::Synonym,
6807            ObjectType::Index,
6808        ] {
6809            assert!(object_type_to_dbms_metadata_value(object_type).is_some());
6810        }
6811        // Constraint + Unknown are not directly fetchable via DBMS_METADATA.
6812        assert!(object_type_to_dbms_metadata_value(ObjectType::Constraint).is_none());
6813        assert!(object_type_to_dbms_metadata_value(ObjectType::Unknown).is_none());
6814    }
6815
6816    #[test]
6817    fn populate_dbms_metadata_ddl_records_warnings_on_fetch_failure() {
6818        use crate::CatalogCapabilities;
6819
6820        // Build a tiny snapshot with one TABLE object, then run populate
6821        // against a mock that returns Err on every query. We expect the
6822        // CapabilityWarning trail to grow by one but the snapshot to remain
6823        // intact.
6824        let mut snapshot = CatalogSnapshot::new(
6825            plsql_core::AnalysisProfile::default(),
6826            CatalogCapabilities {
6827                can_use_dbms_metadata: true,
6828                ..CatalogCapabilities::default()
6829            },
6830            CatalogSource {
6831                kind: CatalogSourceKind::LiveConnection,
6832                path: None,
6833                description: Some(String::from("test")),
6834            },
6835            DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
6836        );
6837        let billing = snapshot.intern_schema_name("BILLING").expect("schema");
6838        let invoices = snapshot.intern_object_name("INVOICES").expect("object");
6839        let table = crate::TableMetadata {
6840            common: ObjectCommon {
6841                owner: billing,
6842                name: invoices,
6843                object_type: ObjectType::Table,
6844                status: ObjectStatus::Valid,
6845                ..ObjectCommon::default()
6846            },
6847            ..crate::TableMetadata::default()
6848        };
6849        snapshot
6850            .schemas
6851            .entry(billing)
6852            .or_default()
6853            .objects
6854            .insert(invoices, CatalogObject::Table(table));
6855
6856        // Force the mock to fail by requesting params that won't match.
6857        let failing_connection = StaticConnection {
6858            expected_queries: vec![QueryExpectation {
6859                sql_contains: String::from("dbms_metadata.get_ddl"),
6860                params: vec![OracleBind::from("UNREACHABLE_SENTINEL")],
6861                rows: vec![],
6862            }],
6863            ..StaticConnection::default()
6864        };
6865
6866        populate_dbms_metadata_ddl_for_test(&failing_connection, &mut snapshot).unwrap();
6867
6868        assert!(
6869            snapshot
6870                .capabilities
6871                .warnings
6872                .iter()
6873                .any(|w| w.code.eq("dbms-metadata-fetch-failed"))
6874        );
6875
6876        // Capability disabled → populate is a no-op (no extra warning).
6877        snapshot.capabilities.can_use_dbms_metadata = false;
6878        let baseline_warnings = snapshot.capabilities.warnings.len();
6879        populate_dbms_metadata_ddl_for_test(&failing_connection, &mut snapshot).unwrap();
6880        assert_eq!(snapshot.capabilities.warnings.len(), baseline_warnings);
6881    }
6882
6883    #[test]
6884    fn populate_dbms_metadata_ddl_writes_ddl_field_on_success() {
6885        use crate::CatalogCapabilities;
6886
6887        let mut snapshot = CatalogSnapshot::new(
6888            plsql_core::AnalysisProfile::default(),
6889            CatalogCapabilities {
6890                can_use_dbms_metadata: true,
6891                ..CatalogCapabilities::default()
6892            },
6893            CatalogSource {
6894                kind: CatalogSourceKind::LiveConnection,
6895                path: None,
6896                description: Some(String::from("test")),
6897            },
6898            DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
6899        );
6900        let billing = snapshot.intern_schema_name("BILLING").expect("schema");
6901        let invoices = snapshot.intern_object_name("INVOICES").expect("object");
6902        let table = crate::TableMetadata {
6903            common: ObjectCommon {
6904                owner: billing,
6905                name: invoices,
6906                object_type: ObjectType::Table,
6907                status: ObjectStatus::Valid,
6908                ..ObjectCommon::default()
6909            },
6910            ..crate::TableMetadata::default()
6911        };
6912        snapshot
6913            .schemas
6914            .entry(billing)
6915            .or_default()
6916            .objects
6917            .insert(invoices, CatalogObject::Table(table));
6918
6919        let connection = StaticConnection {
6920            expected_queries: vec![QueryExpectation {
6921                sql_contains: String::from("dbms_metadata.get_ddl"),
6922                params: vec![
6923                    OracleBind::from("TABLE"),
6924                    OracleBind::from("INVOICES"),
6925                    OracleBind::from("BILLING"),
6926                ],
6927                rows: vec![oracle_row(&[
6928                    (
6929                        "DDL_TEXT",
6930                        "CLOB",
6931                        Some("  CREATE TABLE BILLING.INVOICES (ID NUMBER)  \n  "),
6932                    ),
6933                    (
6934                        "XML_TEXT",
6935                        "CLOB",
6936                        Some("<TABLE_T><NAME>INVOICES</NAME></TABLE_T>"),
6937                    ),
6938                ])],
6939            }],
6940            ..StaticConnection::default()
6941        };
6942
6943        populate_dbms_metadata_ddl_for_test(&connection, &mut snapshot).unwrap();
6944
6945        let stored = snapshot
6946            .schemas
6947            .get(&billing)
6948            .and_then(|schema| schema.objects.get(&invoices));
6949        assert!(
6950            matches!(stored, Some(CatalogObject::Table(_))),
6951            "expected a Table catalog object for billing.invoices"
6952        );
6953        let Some(CatalogObject::Table(table)) = stored else {
6954            return;
6955        };
6956        let ddl = table.common.ddl.as_ref().expect("ddl populated");
6957        assert!(ddl.ddl_text.contains("CREATE TABLE BILLING.INVOICES"));
6958        assert_eq!(
6959            ddl.normalized_ddl.as_deref(),
6960            Some("CREATE TABLE BILLING.INVOICES (ID NUMBER)")
6961        );
6962        assert_eq!(
6963            ddl.xml_text.as_deref(),
6964            Some("<TABLE_T><NAME>INVOICES</NAME></TABLE_T>")
6965        );
6966    }
6967
6968    #[test]
6969    fn negotiate_capabilities_reports_failures_as_warnings() {
6970        let connection = StaticConnection {
6971            expected_queries: capability_probe_expectations()
6972                .into_iter()
6973                .map(|mut q| {
6974                    // Force every probe to "fail" by demanding a non-empty
6975                    // params slice while the negotiator passes empty params.
6976                    q.params = vec![OracleBind::from("UNREACHABLE_SENTINEL")];
6977                    q
6978                })
6979                .collect(),
6980            ..StaticConnection::default()
6981        };
6982
6983        let capabilities = negotiate_capabilities_for_test(&connection);
6984
6985        assert!(!capabilities.can_query_all_views);
6986        assert!(!capabilities.can_query_dba_views);
6987        assert!(!capabilities.can_read_source);
6988        assert!(!capabilities.can_query_scheduler);
6989        assert!(!capabilities.can_query_roles_and_grants);
6990        assert!(!capabilities.plscope_enabled);
6991        // execute() succeeds unconditionally in the mock, so DBMS_METADATA
6992        // probe is the one capability that "passes" without explicit setup.
6993        assert!(capabilities.can_use_dbms_metadata);
6994        // One warning per failed probe (six probes).
6995        assert_eq!(capabilities.warnings.len(), 6);
6996        assert!(
6997            capabilities
6998                .warnings
6999                .iter()
7000                .any(|w| w.code.eq("all-views-probe"))
7001        );
7002        assert!(
7003            capabilities
7004                .warnings
7005                .iter()
7006                .any(|w| w.code.eq("plscope-probe"))
7007        );
7008    }
7009
7010    #[test]
7011    fn negotiate_capabilities_marks_every_probe_succeeded_on_open_mock() {
7012        // Default StaticConnection has no expected_queries — falls through to
7013        // returning self.rows (empty) for every query → every probe succeeds.
7014        let connection = StaticConnection::default();
7015        let capabilities = negotiate_capabilities_for_test(&connection);
7016        assert!(capabilities.can_query_all_views);
7017        assert!(capabilities.can_query_dba_views);
7018        assert!(capabilities.can_read_source);
7019        assert!(capabilities.can_query_scheduler);
7020        assert!(capabilities.can_query_roles_and_grants);
7021        assert!(capabilities.plscope_enabled);
7022        assert!(capabilities.can_use_dbms_metadata);
7023        assert!(capabilities.warnings.is_empty());
7024    }
7025
7026    #[test]
7027    fn doctor_report_skips_grant_suggestions_for_json_snapshot_source() {
7028        let snapshot = CatalogSnapshot::new(
7029            plsql_core::AnalysisProfile::default(),
7030            CatalogCapabilities {
7031                can_query_all_views: false,
7032                can_query_dba_views: false,
7033                can_use_dbms_metadata: false,
7034                can_read_source: false,
7035                plscope_enabled: false,
7036                can_query_scheduler: false,
7037                can_query_roles_and_grants: false,
7038                warnings: vec![],
7039            },
7040            CatalogSource {
7041                kind: CatalogSourceKind::JsonSnapshot,
7042                path: Some(std::path::PathBuf::from("/tmp/snapshot.json")),
7043                description: None,
7044            },
7045            DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
7046        );
7047
7048        let report = snapshot.doctor_report();
7049        assert!(matches!(
7050            report.source_kind,
7051            CatalogSourceKind::JsonSnapshot
7052        ));
7053        // Recommendations are inert for JSON snapshots — the capability bits
7054        // were already frozen at extraction time.
7055        assert!(report.missing_permissions.is_empty());
7056    }
7057
7058    // ----------------------------------------------------------------------
7059    // DDL kind classifier (header tokenizer, not body substring).
7060    //
7061    // Regression bar: the prior implementation substring-matched on the full
7062    // upper-cased DDL, so any object whose body or comments mentioned
7063    // `TABLE` was silently filed as a Table. Real public surface, real
7064    // silent data corruption.
7065    // ----------------------------------------------------------------------
7066
7067    /// Drive the per-file DDL through the public loader and return the
7068    /// single (`ObjectType`, `CatalogObject`) pair it classifies into.
7069    /// Panics on anything but exactly one classified object — every test
7070    /// below feeds exactly one DDL so the helper stays obvious.
7071    fn classify_single(ddl: &str) -> (ObjectType, CatalogObject) {
7072        let dir = tempdir().unwrap();
7073        std::fs::write(dir.path().join("obj.sql"), ddl).unwrap();
7074        let snapshot = load_from_dbms_metadata_dir(dir.path()).unwrap();
7075        let mut found: Vec<CatalogObject> = snapshot
7076            .schemas
7077            .values()
7078            .flat_map(|s| s.objects.values().cloned())
7079            .collect();
7080        assert_eq!(
7081            found.len(),
7082            1,
7083            "expected exactly one classified object for DDL: {ddl}"
7084        );
7085        let obj = found.remove(0);
7086        let common = match &obj {
7087            CatalogObject::Table(m) => &m.common,
7088            CatalogObject::View(m) => &m.common,
7089            CatalogObject::MaterializedView(m) => &m.common,
7090            CatalogObject::Sequence(m) => &m.common,
7091            CatalogObject::Type(m) => &m.common,
7092            CatalogObject::Package(m) => &m.common,
7093            CatalogObject::Procedure(m) => &m.common,
7094            CatalogObject::Function(m) => &m.common,
7095            CatalogObject::Trigger(m) => &m.common,
7096            CatalogObject::SchedulerJob(m) => &m.common,
7097            CatalogObject::EditioningView(m) => &m.common,
7098        };
7099        (common.object_type, obj)
7100    }
7101
7102    /// A VIEW whose body merely mentions the word `TABLE` must classify as
7103    /// a View, never a Table. The prior substring matcher silently filed
7104    /// such views as tables — real catalog corruption visible to every
7105    /// downstream consumer.
7106    #[test]
7107    fn classify_view_with_table_in_body_is_view_not_table() {
7108        let ddl = "CREATE OR REPLACE VIEW hr.v_emp AS SELECT * FROM hr.emp WHERE 'TABLE'='TABLE';";
7109        let (kind, obj) = classify_single(ddl);
7110        assert_eq!(
7111            kind,
7112            ObjectType::View,
7113            "VIEW with 'TABLE' literal in body must classify as View, got {kind:?}",
7114        );
7115        assert!(
7116            matches!(obj, CatalogObject::View(_)),
7117            "expected CatalogObject::View, got {obj:?}",
7118        );
7119    }
7120
7121    /// A TRIGGER body that touches a TABLE must classify as Trigger.
7122    #[test]
7123    fn classify_trigger_with_table_in_body_is_trigger() {
7124        let ddl = "CREATE OR REPLACE TRIGGER hr.t_audit \
7125                   AFTER INSERT ON hr.employees \
7126                   BEGIN INSERT INTO hr.audit_table VALUES (:NEW.id); END;";
7127        let (kind, _obj) = classify_single(ddl);
7128        assert_eq!(kind, ObjectType::Trigger);
7129    }
7130
7131    /// A PROCEDURE body that touches a TABLE must classify as Procedure.
7132    #[test]
7133    fn classify_procedure_with_table_in_body_is_procedure() {
7134        let ddl = "CREATE OR REPLACE PROCEDURE hr.p_load \
7135                   AS BEGIN INSERT INTO hr.staging_table SELECT * FROM hr.src; END;";
7136        let (kind, _obj) = classify_single(ddl);
7137        assert_eq!(kind, ObjectType::Procedure);
7138    }
7139
7140    /// A FUNCTION body that touches a TABLE must classify as Function.
7141    #[test]
7142    fn classify_function_with_table_in_body_is_function() {
7143        let ddl = "CREATE OR REPLACE FUNCTION hr.f_count RETURN NUMBER \
7144                   AS n NUMBER; BEGIN SELECT COUNT(*) INTO n FROM hr.big_table; RETURN n; END;";
7145        let (kind, _obj) = classify_single(ddl);
7146        assert_eq!(kind, ObjectType::Function);
7147    }
7148
7149    /// `PACKAGE BODY` is a body, not a spec — the classifier returns the
7150    /// spec only, so a body file must produce zero classified objects
7151    /// (not a Package, never a Table just because the body mentions one).
7152    #[test]
7153    fn classify_package_body_with_table_is_not_package_or_table() {
7154        let ddl = "CREATE OR REPLACE PACKAGE BODY hr.billing_api AS \
7155                   PROCEDURE charge IS BEGIN INSERT INTO hr.charges_table VALUES (1); END; END;";
7156        let dir = tempdir().unwrap();
7157        std::fs::write(dir.path().join("body.sql"), ddl).unwrap();
7158        let snapshot = load_from_dbms_metadata_dir(dir.path()).unwrap();
7159        let total: usize = snapshot.schemas.values().map(|s| s.objects.len()).sum();
7160        assert_eq!(
7161            total, 0,
7162            "PACKAGE BODY must not produce a classified object (got {total})",
7163        );
7164    }
7165
7166    /// `MATERIALIZED VIEW` must classify as `ObjectType::MaterializedView`,
7167    /// never as a plain View (substring match on `VIEW` would do the
7168    /// wrong thing).
7169    #[test]
7170    fn classify_materialized_view_is_materialized_view_not_view() {
7171        let ddl = "CREATE MATERIALIZED VIEW hr.mv_emp_summary AS SELECT dept, COUNT(*) FROM hr.emp GROUP BY dept;";
7172        let (kind, obj) = classify_single(ddl);
7173        assert_eq!(
7174            kind,
7175            ObjectType::MaterializedView,
7176            "MATERIALIZED VIEW must classify as MaterializedView, got {kind:?}",
7177        );
7178        assert!(
7179            matches!(obj, CatalogObject::MaterializedView(_)),
7180            "expected CatalogObject::MaterializedView, got {obj:?}",
7181        );
7182    }
7183
7184    /// Leading block comment that contains `CREATE TABLE …` must NOT
7185    /// fool the classifier — the real CREATE is for a VIEW.
7186    #[test]
7187    fn classify_view_with_leading_block_comment_mentioning_create_table_is_view() {
7188        let ddl = "/* comment with CREATE TABLE x; */\n\
7189                   CREATE OR REPLACE VIEW hr.v_dept AS SELECT * FROM hr.dept;";
7190        let (kind, _obj) = classify_single(ddl);
7191        assert_eq!(kind, ObjectType::View);
7192    }
7193
7194    /// Leading line comments that mention `CREATE TABLE` must NOT fool
7195    /// the classifier.
7196    #[test]
7197    fn classify_procedure_with_leading_line_comments_is_procedure() {
7198        let ddl = "-- CREATE TABLE oops (x NUMBER);\n\
7199                   -- another line mentioning VIEW and TABLE\n\
7200                   CREATE OR REPLACE PROCEDURE hr.p_noop AS BEGIN NULL; END;";
7201        let (kind, _obj) = classify_single(ddl);
7202        assert_eq!(kind, ObjectType::Procedure);
7203    }
7204
7205    /// `EDITIONABLE` / `NONEDITIONABLE` / `FORCE` modifiers between
7206    /// `CREATE [OR REPLACE]` and the KIND must be skipped — they are
7207    /// not the object kind.
7208    #[test]
7209    fn classify_editionable_view_is_view() {
7210        let ddl = "CREATE OR REPLACE EDITIONABLE VIEW hr.v_emp AS SELECT * FROM hr.emp;";
7211        let (kind, _obj) = classify_single(ddl);
7212        assert_eq!(kind, ObjectType::View);
7213    }
7214
7215    /// Quoted identifiers for owner/name must not break the
7216    /// header-tokenizer-based classifier.
7217    #[test]
7218    fn classify_quoted_table_owner_name_is_table() {
7219        let ddl = "CREATE TABLE \"HR\".\"EMP\" (id NUMBER);";
7220        let (kind, _obj) = classify_single(ddl);
7221        assert_eq!(kind, ObjectType::Table);
7222    }
7223
7224    /// A double-quoted Oracle identifier containing whitespace must be
7225    /// kept whole — never truncated at the first interior space. The
7226    /// prior `split_whitespace().next()` tokenizer cut `"MY TABLE"` down
7227    /// to `MY`, corrupting the snapshot key. `extract_owner_and_name`
7228    /// works on the already-upper-cased post-header remainder, so the
7229    /// inputs here are upper-cased the way `upper_remainder()` produces.
7230    #[test]
7231    fn extract_owner_and_name_keeps_whitespace_in_quoted_identifiers() {
7232        // OWNER.NAME, both quoted, name has a space.
7233        assert_eq!(
7234            crate::extract_owner_and_name("\"HR\".\"MY TABLE\" (ID NUMBER);"),
7235            Some((Some("HR".to_string()), "MY TABLE".to_string())),
7236        );
7237        // Quoted OWNER with a space must not be dropped (no PUBLIC misroute).
7238        assert_eq!(
7239            crate::extract_owner_and_name("\"MY OWNER\".\"EMP\" (ID NUMBER);"),
7240            Some((Some("MY OWNER".to_string()), "EMP".to_string())),
7241        );
7242        // Fully-quoted, unqualified, whitespace-bearing name.
7243        assert_eq!(
7244            crate::extract_owner_and_name("\"MY TABLE\" (ID NUMBER);"),
7245            Some((None, "MY TABLE".to_string())),
7246        );
7247        // Spaceless quoted and unquoted inputs still resolve correctly.
7248        assert_eq!(
7249            crate::extract_owner_and_name("\"HR\".\"EMP\" (ID NUMBER);"),
7250            Some((Some("HR".to_string()), "EMP".to_string())),
7251        );
7252        assert_eq!(
7253            crate::extract_owner_and_name("HR.ORDERS (ID NUMBER);"),
7254            Some((Some("HR".to_string()), "ORDERS".to_string())),
7255        );
7256    }
7257
7258    /// Two distinct whitespace-bearing quoted tables in one schema must
7259    /// intern under distinct keys — the truncating tokenizer collapsed
7260    /// both `"MY TABLE"` and `"MY OTHER"` to `MY` (last-write-wins).
7261    #[test]
7262    fn quoted_whitespace_names_intern_as_distinct_keys() {
7263        let dir = tempdir().unwrap();
7264        std::fs::write(
7265            dir.path().join("a.sql"),
7266            "CREATE TABLE \"HR\".\"MY TABLE\" (id NUMBER);",
7267        )
7268        .unwrap();
7269        std::fs::write(
7270            dir.path().join("b.sql"),
7271            "CREATE TABLE \"HR\".\"MY OTHER\" (id NUMBER);",
7272        )
7273        .unwrap();
7274        let snapshot = load_from_dbms_metadata_dir(dir.path()).unwrap();
7275
7276        // Both objects land in the HR schema (no PUBLIC misroute).
7277        let hr = snapshot
7278            .schemas
7279            .iter()
7280            .find(|(s, _)| {
7281                snapshot
7282                    .interner
7283                    .resolve(s.symbol())
7284                    .is_some_and(|label| label.eq("HR"))
7285            })
7286            .map(|(_, c)| c)
7287            .expect("HR schema present");
7288        assert_eq!(hr.objects.len(), 2, "two distinct objects, no collision");
7289
7290        let mut names: Vec<&str> = hr
7291            .objects
7292            .values()
7293            .filter_map(|o| {
7294                let CatalogObject::Table(m) = o else {
7295                    return None;
7296                };
7297                snapshot.interner.resolve(m.common.name.symbol())
7298            })
7299            .collect();
7300        assert_eq!(names.len(), 2, "all HR objects must be tables");
7301        names.sort_unstable();
7302        assert_eq!(names, vec!["MY OTHER", "MY TABLE"]);
7303    }
7304
7305    /// Plain CREATE TABLE still works (negative-of-negative regression).
7306    #[test]
7307    fn classify_plain_table_is_table() {
7308        let ddl = "CREATE TABLE hr.orders (id NUMBER PRIMARY KEY, total NUMBER(12,2));";
7309        let (kind, obj) = classify_single(ddl);
7310        assert_eq!(kind, ObjectType::Table);
7311        assert!(matches!(obj, CatalogObject::Table(_)));
7312    }
7313
7314    /// A PACKAGE spec that mentions VIEW / TABLE in a comment or
7315    /// procedure name must classify as Package.
7316    #[test]
7317    fn classify_package_spec_mentioning_view_and_table_is_package() {
7318        let ddl = "CREATE OR REPLACE PACKAGE hr.report_api AS \
7319                   -- builds a VIEW over a TABLE \n\
7320                   PROCEDURE rebuild_view_from_table; END;";
7321        let (kind, _obj) = classify_single(ddl);
7322        assert_eq!(kind, ObjectType::Package);
7323    }
7324}