Skip to main content

plsql_catalog/
lib.rs

1#![forbid(unsafe_code)]
2pub mod synthetic;
3
4#[cfg(feature = "oraclemcp-db")]
5mod live;
6
7#[cfg(feature = "oraclemcp-db")]
8pub use live::{
9    OracleConnection, OraclemcpDbConnection, fetch_dbms_metadata_ddl,
10    load_snapshot_from_connection, negotiate_capabilities, populate_dbms_metadata_ddl,
11};
12
13use std::collections::{BTreeMap, HashMap, HashSet};
14use std::fs;
15use std::path::PathBuf;
16
17use chrono::{DateTime, Utc};
18#[cfg(feature = "oraclemcp-db")]
19use plsql_core::OracleVersion;
20use plsql_core::{
21    AnalysisProfile, ColumnName, EditionName, MemberName, ObjectName, RoleName, SchemaName,
22    SymbolId, SymbolInterner, UserName,
23};
24use serde::{Deserialize, Serialize};
25use thiserror::Error;
26use tracing::instrument;
27
28use plsql_output::SchemaVersion;
29
30macro_rules! catalog_name {
31    ($name:ident) => {
32        #[derive(
33            Clone,
34            Copy,
35            Debug,
36            Default,
37            Eq,
38            PartialEq,
39            Ord,
40            PartialOrd,
41            Hash,
42            Serialize,
43            Deserialize,
44        )]
45        #[serde(transparent)]
46        pub struct $name(SymbolId);
47
48        impl $name {
49            #[must_use]
50            #[instrument(level = "trace")]
51            pub fn new(symbol: SymbolId) -> Self {
52                Self(symbol)
53            }
54
55            #[must_use]
56            #[instrument(level = "trace", skip(self))]
57            pub fn symbol(self) -> SymbolId {
58                self.0
59            }
60        }
61
62        impl From<SymbolId> for $name {
63            fn from(value: SymbolId) -> Self {
64                Self::new(value)
65            }
66        }
67    };
68}
69
70catalog_name!(SynonymName);
71catalog_name!(IndexName);
72catalog_name!(ConstraintName);
73catalog_name!(TriggerName);
74
75#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
76#[serde(transparent)]
77pub struct Hash(String);
78
79impl Hash {
80    #[must_use]
81    #[instrument(level = "trace", skip(value))]
82    pub fn new(value: impl Into<String>) -> Self {
83        Self(value.into())
84    }
85
86    #[must_use]
87    #[instrument(level = "trace", skip(self))]
88    pub fn as_str(&self) -> &str {
89        self.0.as_str()
90    }
91}
92
93#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
94pub struct DbmsMetadataDdl {
95    pub ddl_text: String,
96    pub normalized_ddl: Option<String>,
97    pub xml_text: Option<String>,
98}
99
100#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
101pub enum CatalogSourceKind {
102    #[default]
103    JsonSnapshot,
104    LiveConnection,
105    DbmsMetadataFiles,
106    SyntheticTestCatalog,
107}
108
109#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
110pub struct CatalogSource {
111    pub kind: CatalogSourceKind,
112    pub path: Option<PathBuf>,
113    pub description: Option<String>,
114}
115
116#[derive(
117    Clone, Copy, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize,
118)]
119pub enum ObjectType {
120    Table,
121    View,
122    MaterializedView,
123    Sequence,
124    Type,
125    Package,
126    Procedure,
127    Function,
128    Trigger,
129    SchedulerJob,
130    EditioningView,
131    Synonym,
132    Index,
133    Constraint,
134    #[default]
135    Unknown,
136}
137
138#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
139pub enum ObjectStatus {
140    Valid,
141    Invalid,
142    #[default]
143    NotApplicable,
144}
145
146#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
147pub struct ObjectCommon {
148    pub owner: SchemaName,
149    pub name: ObjectName,
150    pub object_type: ObjectType,
151    pub status: ObjectStatus,
152    pub edition_name: Option<EditionName>,
153    pub editionable: Option<bool>,
154    pub last_ddl_time: Option<DateTime<Utc>>,
155    pub source_hash: Option<Hash>,
156    pub ddl: Option<DbmsMetadataDdl>,
157}
158
159#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
160pub struct CatalogCapabilities {
161    pub can_query_dba_views: bool,
162    pub can_query_all_views: bool,
163    pub can_use_dbms_metadata: bool,
164    pub can_read_source: bool,
165    pub plscope_enabled: bool,
166    pub can_query_scheduler: bool,
167    pub can_query_roles_and_grants: bool,
168    pub warnings: Vec<CapabilityWarning>,
169}
170
171#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
172pub struct CapabilityWarning {
173    pub code: String,
174    pub message: String,
175    pub remediation: Option<String>,
176}
177
178#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
179pub struct CatalogSnapshot {
180    pub schemas: HashMap<SchemaName, SchemaCatalog>,
181    pub profile: AnalysisProfile,
182    pub capabilities: CatalogCapabilities,
183    pub generated_at: DateTime<Utc>,
184    pub source: CatalogSource,
185    pub interner: SymbolInterner,
186    /// Database-wide edition tree from `ALL_EDITIONS`. Empty when EBR
187    /// is not in use.
188    #[serde(default)]
189    pub editions: Vec<Edition>,
190    /// Set of database usernames observed from `ALL_USERS`, used during
191    /// live extraction to discriminate an object-privilege grantee
192    /// (`ALL_TAB_PRIVS.GRANTEE`) between a real user and a database role —
193    /// Oracle's `ALL_TAB_PRIVS` carries no user/role discriminator column.
194    ///
195    /// `None` means the username set was never loaded (the `ALL_USERS`
196    /// probe was not run or failed); in that state grantee classification
197    /// is *undetermined* and, honoring R13, the extractor must NOT
198    /// silently assume a direct (high-confidence) user grant. `Some(_)`
199    /// (even when empty) means the set was loaded and is authoritative for
200    /// the schemas under analysis.
201    ///
202    /// This is transient extraction state: it is never serialized, because
203    /// the resulting `Grantee` discrimination is already baked into each
204    /// persisted `Grant`. JSON snapshots therefore round-trip unchanged.
205    #[serde(default, skip)]
206    pub known_users: Option<HashSet<UserName>>,
207}
208
209impl CatalogSnapshot {
210    #[must_use]
211    #[instrument(level = "trace", skip(profile, capabilities, source))]
212    pub fn new(
213        profile: AnalysisProfile,
214        capabilities: CatalogCapabilities,
215        source: CatalogSource,
216        generated_at: DateTime<Utc>,
217    ) -> Self {
218        Self {
219            schemas: HashMap::new(),
220            profile,
221            capabilities,
222            generated_at,
223            source,
224            interner: SymbolInterner::new(),
225            editions: Vec::new(),
226            known_users: None,
227        }
228    }
229
230    /// Intern `text` as a [`UserName`] without changing classification
231    /// state. Mirrors [`SymbolInterner::intern_user_name`] but routes
232    /// through this snapshot's interner.
233    #[must_use]
234    #[instrument(level = "trace", skip(self, text))]
235    pub fn intern_user_name(&mut self, text: impl Into<String>) -> Option<UserName> {
236        self.interner.intern_user_name(text)
237    }
238
239    /// Intern `text` as a [`RoleName`] through this snapshot's interner.
240    #[must_use]
241    #[instrument(level = "trace", skip(self, text))]
242    pub fn intern_role_name(&mut self, text: impl Into<String>) -> Option<RoleName> {
243        self.interner.intern_role_name(text)
244    }
245
246    #[must_use]
247    #[instrument(level = "trace", skip(self, text))]
248    pub fn intern_schema_name(&mut self, text: impl Into<String>) -> Option<SchemaName> {
249        self.interner.intern_schema_name(text)
250    }
251
252    #[must_use]
253    #[instrument(level = "trace", skip(self, text))]
254    pub fn intern_object_name(&mut self, text: impl Into<String>) -> Option<ObjectName> {
255        self.interner.intern(text).map(ObjectName::from)
256    }
257
258    #[must_use]
259    #[instrument(level = "trace", skip(self, text))]
260    pub fn intern_column_name(&mut self, text: impl Into<String>) -> Option<ColumnName> {
261        self.interner.intern(text).map(ColumnName::from)
262    }
263
264    #[must_use]
265    #[instrument(level = "trace", skip(self, text))]
266    pub fn intern_member_name(&mut self, text: impl Into<String>) -> Option<MemberName> {
267        self.interner.intern(text).map(MemberName::from)
268    }
269
270    #[must_use]
271    #[instrument(level = "trace", skip(self, text))]
272    pub fn intern_synonym_name(&mut self, text: impl Into<String>) -> Option<SynonymName> {
273        self.interner.intern(text).map(SynonymName::from)
274    }
275
276    #[must_use]
277    #[instrument(level = "trace", skip(self, text))]
278    pub fn intern_index_name(&mut self, text: impl Into<String>) -> Option<IndexName> {
279        self.interner.intern(text).map(IndexName::from)
280    }
281
282    #[must_use]
283    #[instrument(level = "trace", skip(self, text))]
284    pub fn intern_constraint_name(&mut self, text: impl Into<String>) -> Option<ConstraintName> {
285        self.interner.intern(text).map(ConstraintName::from)
286    }
287
288    #[must_use]
289    #[instrument(level = "trace", skip(self, text))]
290    pub fn intern_trigger_name(&mut self, text: impl Into<String>) -> Option<TriggerName> {
291        self.interner.intern(text).map(TriggerName::from)
292    }
293}
294
295/// Dictionary row family accepted by [`CatalogSnapshotBuilder`].
296///
297/// Each variant corresponds to one Oracle dictionary query shape used by the
298/// live extractor. The enum is intentionally stable and DB-free so
299/// `oraclemcp` can own live querying while this crate owns row normalization.
300#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
301pub enum CatalogRowSet {
302    Objects,
303    Columns,
304    Constraints,
305    Indexes,
306    Triggers,
307    Synonyms,
308    Routines,
309    RoutineArguments,
310    Views,
311    MaterializedViews,
312    Sequences,
313    TypeAttributes,
314    Users,
315    Grants,
316    DatabaseLinks,
317    TableComments,
318    ColumnComments,
319    Editions,
320    EditioningViews,
321    VpdPolicies,
322    Dependencies,
323    PlScopeAvailability,
324    PlScopeIdentifiers,
325}
326
327impl CatalogRowSet {
328    #[must_use]
329    pub fn as_str(self) -> &'static str {
330        match self {
331            Self::Objects => "objects",
332            Self::Columns => "columns",
333            Self::Constraints => "constraints",
334            Self::Indexes => "indexes",
335            Self::Triggers => "triggers",
336            Self::Synonyms => "synonyms",
337            Self::Routines => "routines",
338            Self::RoutineArguments => "routine_arguments",
339            Self::Views => "views",
340            Self::MaterializedViews => "materialized_views",
341            Self::Sequences => "sequences",
342            Self::TypeAttributes => "type_attributes",
343            Self::Users => "users",
344            Self::Grants => "grants",
345            Self::DatabaseLinks => "database_links",
346            Self::TableComments => "table_comments",
347            Self::ColumnComments => "column_comments",
348            Self::Editions => "editions",
349            Self::EditioningViews => "editioning_views",
350            Self::VpdPolicies => "vpd_policies",
351            Self::Dependencies => "dependencies",
352            Self::PlScopeAvailability => "plscope_availability",
353            Self::PlScopeIdentifiers => "plscope_identifiers",
354        }
355    }
356}
357
358/// Stable, offline builder for Oracle dictionary rows.
359///
360/// The builder accepts already-fetched [`OracleRow`] values and applies the
361/// same normalization used by the live loader. It performs no network or
362/// database I/O; callers such as `oraclemcp` own extraction and feed rows into
363/// this crate through [`CatalogRowSet`].
364///
365/// ```
366/// use chrono::Utc;
367/// use plsql_catalog::{
368///     CatalogCapabilities, CatalogRowSet, CatalogSnapshotBuilder, CatalogSource,
369///     CatalogSourceKind, ObjectType, OracleRow,
370/// };
371/// use plsql_core::AnalysisProfile;
372///
373/// fn row(columns: &[(&str, &str, Option<&str>)]) -> OracleRow {
374///     let mut row = OracleRow::default();
375///     for (name, oracle_type, value) in columns {
376///         row.insert(*name, *oracle_type, value.map(String::from));
377///     }
378///     row
379/// }
380///
381/// let mut builder = CatalogSnapshotBuilder::new(
382///     AnalysisProfile::default(),
383///     CatalogCapabilities::default(),
384///     CatalogSource {
385///         kind: CatalogSourceKind::LiveConnection,
386///         description: Some("synthetic rows from an external extractor".to_string()),
387///         ..CatalogSource::default()
388///     },
389///     Utc::now(),
390/// );
391///
392/// builder.apply_row(
393///     CatalogRowSet::Objects,
394///     &row(&[
395///         ("OWNER", "VARCHAR2(128)", Some("BILLING")),
396///         ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES")),
397///         ("OBJECT_TYPE", "VARCHAR2(30)", Some("TABLE")),
398///         ("STATUS", "VARCHAR2(7)", Some("VALID")),
399///     ]),
400/// )?;
401///
402/// let snapshot = builder.finish()?;
403/// let report = snapshot.doctor_report();
404/// assert_eq!(report.totals.schemas_observed, 1);
405/// assert_eq!(
406///     report.object_counts.first().map(|count| count.object_type),
407///     Some(ObjectType::Table),
408/// );
409/// # Ok::<(), plsql_catalog::CatalogError>(())
410/// ```
411pub struct CatalogSnapshotBuilder {
412    snapshot: CatalogSnapshot,
413    routines: HashMap<RoutineLocator, RoutineAccumulator>,
414    plscope_tallies: HashMap<SchemaName, PlScopeTally>,
415}
416
417impl CatalogSnapshotBuilder {
418    #[must_use]
419    #[instrument(level = "trace", skip(profile, capabilities, source))]
420    pub fn new(
421        profile: AnalysisProfile,
422        capabilities: CatalogCapabilities,
423        source: CatalogSource,
424        generated_at: DateTime<Utc>,
425    ) -> Self {
426        Self::from_snapshot(CatalogSnapshot::new(
427            profile,
428            capabilities,
429            source,
430            generated_at,
431        ))
432    }
433
434    #[must_use]
435    #[instrument(level = "trace", skip(snapshot))]
436    pub fn from_snapshot(snapshot: CatalogSnapshot) -> Self {
437        Self {
438            snapshot,
439            routines: HashMap::new(),
440            plscope_tallies: HashMap::new(),
441        }
442    }
443
444    #[must_use]
445    #[instrument(level = "trace", skip(self))]
446    pub fn snapshot(&self) -> &CatalogSnapshot {
447        &self.snapshot
448    }
449
450    #[must_use]
451    #[instrument(level = "trace", skip(self))]
452    pub fn snapshot_mut(&mut self) -> &mut CatalogSnapshot {
453        &mut self.snapshot
454    }
455
456    #[instrument(level = "trace", skip(self, row), fields(row_set = row_set.as_str()))]
457    pub fn apply_row(
458        &mut self,
459        row_set: CatalogRowSet,
460        row: &OracleRow,
461    ) -> Result<&mut Self, CatalogError> {
462        match row_set {
463            CatalogRowSet::Objects => apply_object_row(&mut self.snapshot, row)?,
464            CatalogRowSet::Columns => apply_column_row(&mut self.snapshot, row)?,
465            CatalogRowSet::Constraints => apply_constraint_row(&mut self.snapshot, row)?,
466            CatalogRowSet::Indexes => apply_index_row(&mut self.snapshot, row)?,
467            CatalogRowSet::Triggers => apply_trigger_row(&mut self.snapshot, row)?,
468            CatalogRowSet::Synonyms => apply_synonym_row(&mut self.snapshot, row)?,
469            CatalogRowSet::Routines => {
470                apply_routine_row(&mut self.snapshot, row, &mut self.routines)?;
471            }
472            CatalogRowSet::RoutineArguments => {
473                apply_argument_row(&mut self.snapshot, row, &mut self.routines)?;
474            }
475            CatalogRowSet::Views => apply_view_row(&mut self.snapshot, row)?,
476            CatalogRowSet::MaterializedViews => apply_mview_row(&mut self.snapshot, row)?,
477            CatalogRowSet::Sequences => apply_sequence_row(&mut self.snapshot, row)?,
478            CatalogRowSet::TypeAttributes => apply_type_attr_row(&mut self.snapshot, row)?,
479            CatalogRowSet::Users => apply_user_row(&mut self.snapshot, row)?,
480            CatalogRowSet::Grants => apply_grant_row(&mut self.snapshot, row)?,
481            CatalogRowSet::DatabaseLinks => apply_db_link_row(&mut self.snapshot, row)?,
482            CatalogRowSet::TableComments => apply_table_comment_row(&mut self.snapshot, row)?,
483            CatalogRowSet::ColumnComments => apply_column_comment_row(&mut self.snapshot, row)?,
484            CatalogRowSet::Editions => apply_edition_row(&mut self.snapshot, row)?,
485            CatalogRowSet::EditioningViews => apply_editioning_view_row(&mut self.snapshot, row)?,
486            CatalogRowSet::VpdPolicies => apply_vpd_policy_row(&mut self.snapshot, row)?,
487            CatalogRowSet::Dependencies => apply_dependency_row(&mut self.snapshot, row)?,
488            CatalogRowSet::PlScopeAvailability => {
489                apply_plscope_availability_row(&mut self.snapshot, row, &mut self.plscope_tallies)?;
490            }
491            CatalogRowSet::PlScopeIdentifiers => {
492                apply_plscope_identifier_row(&mut self.snapshot, row)?;
493            }
494        }
495        Ok(self)
496    }
497
498    #[instrument(level = "trace", skip(self, rows), fields(row_set = row_set.as_str()))]
499    pub fn apply_rows<'a, I>(
500        &mut self,
501        row_set: CatalogRowSet,
502        rows: I,
503    ) -> Result<&mut Self, CatalogError>
504    where
505        I: IntoIterator<Item = &'a OracleRow>,
506    {
507        if row_set.eq(&CatalogRowSet::Users) {
508            self.snapshot.known_users.get_or_insert_with(HashSet::new);
509        }
510        for row in rows {
511            self.apply_row(row_set, row)?;
512        }
513        Ok(self)
514    }
515
516    #[instrument(level = "trace", skip(self))]
517    pub fn finish(mut self) -> Result<CatalogSnapshot, CatalogError> {
518        let routines = std::mem::take(&mut self.routines);
519        finalize_routines(&mut self.snapshot, routines)?;
520        let plscope_tallies = std::mem::take(&mut self.plscope_tallies);
521        finalize_plscope_availability(&mut self.snapshot, plscope_tallies);
522        Ok(self.snapshot)
523    }
524}
525
526impl Default for CatalogSnapshotBuilder {
527    fn default() -> Self {
528        Self::new(
529            AnalysisProfile::default(),
530            CatalogCapabilities::default(),
531            CatalogSource::default(),
532            Utc::now(),
533        )
534    }
535}
536
537pub const CATALOG_SNAPSHOT_SCHEMA_ID: &str = "plsql.catalog.snapshot";
538pub const CATALOG_SNAPSHOT_SCHEMA_VERSION: SchemaVersion = SchemaVersion::new(1, 1, 0);
539
540pub const CATALOG_DOCTOR_SCHEMA_ID: &str = "plsql.catalog.doctor";
541pub const CATALOG_DOCTOR_SCHEMA_VERSION: SchemaVersion = SchemaVersion::new(1, 0, 0);
542
543/// Per-`ObjectType` count tile shown in the doctor report.
544#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
545pub struct DoctorObjectCount {
546    pub object_type: ObjectType,
547    pub total: usize,
548    pub valid: usize,
549    pub invalid: usize,
550    pub other: usize,
551}
552
553/// Summary of how many catalog rows landed per family and how many
554/// schema-scoped buckets are populated.
555#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
556pub struct DoctorExtractionTotals {
557    pub schemas_observed: usize,
558    pub objects_total: usize,
559    pub columns_total: usize,
560    pub indexes_total: usize,
561    pub constraints_total: usize,
562    pub triggers_total: usize,
563    pub synonyms_total: usize,
564    pub grants_total: usize,
565    pub dependencies_total: usize,
566}
567
568/// Doctor-flagged missing privilege: the `plsql-catalog` driver could not
569/// observe an Oracle dictionary view that some upstream features require.
570#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
571pub struct MissingPermissionReport {
572    pub view_name: String,
573    pub required_for: Vec<String>,
574    pub suggested_grant: String,
575}
576
577/// Structured doctor report for a `CatalogSnapshot`.
578///
579/// Consumers (`plsql catalog doctor --robot-json`, `plsql-mcp` foundation
580/// tools, and the planned `plsql doctor` umbrella surface) can render the
581/// report directly or wrap it in a `RobotJsonEnvelope` for stable,
582/// versioned output.
583#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
584pub struct CatalogDoctorReport {
585    /// Identifier of the snapshot's origin (`live extraction via ...` or the
586    /// JSON snapshot path).
587    pub source_description: String,
588    pub source_kind: CatalogSourceKind,
589    pub generated_at: Option<DateTime<Utc>>,
590    pub totals: DoctorExtractionTotals,
591    pub object_counts: Vec<DoctorObjectCount>,
592    pub capability_warnings: Vec<CapabilityWarning>,
593    pub missing_permissions: Vec<MissingPermissionReport>,
594    /// Per-schema PL/Scope availability. Empty when the snapshot has no
595    /// PL/Scope detection wired.
596    pub plscope_availability_per_schema: Vec<PlScopeAvailabilityRow>,
597    /// Capability-bit copy for downstream consumers that don't want to read
598    /// the full `CatalogSnapshot` to learn whether a query family worked.
599    pub can_query_dba_views: bool,
600    pub can_query_all_views: bool,
601    pub can_use_dbms_metadata: bool,
602    pub can_read_source: bool,
603    pub plscope_enabled: bool,
604    pub can_query_scheduler: bool,
605    pub can_query_roles_and_grants: bool,
606}
607
608/// One row of the doctor report's per-schema PL/Scope availability summary.
609/// The `schema_name` is rendered through the snapshot's `SymbolInterner` so
610/// the report is stable across JSON snapshots and live extractions.
611#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
612pub struct PlScopeAvailabilityRow {
613    pub schema_name: String,
614    pub availability: PlScopeAvailability,
615}
616
617impl CatalogSnapshot {
618    /// Build the doctor report directly from this snapshot.
619    ///
620    /// The doctor is read-only — it never queries the DB itself; it
621    /// summarizes what was already extracted into the snapshot plus any
622    /// `CapabilityWarning`s the loader recorded. Missing-permission diagnoses
623    /// are inferred from per-family capability bits so the report is
624    /// equally useful for live-extracted and JSON-loaded snapshots.
625    #[must_use]
626    #[instrument(level = "trace", skip(self))]
627    pub fn doctor_report(&self) -> CatalogDoctorReport {
628        let mut counts: BTreeMap<ObjectType, DoctorObjectCount> = BTreeMap::new();
629        let mut columns_total = 0usize;
630        let mut indexes_total = 0usize;
631        let mut constraints_total = 0usize;
632        let mut triggers_total = 0usize;
633        let mut synonyms_total = 0usize;
634        let mut grants_total = 0usize;
635        let mut dependencies_total = 0usize;
636        let mut objects_total = 0usize;
637
638        for schema_catalog in self.schemas.values() {
639            for object in schema_catalog.objects.values() {
640                let common = catalog_object_common(object);
641                let tile = counts
642                    .entry(common.object_type)
643                    .or_insert(DoctorObjectCount {
644                        object_type: common.object_type,
645                        ..DoctorObjectCount::default()
646                    });
647                tile.total = tile.total.saturating_add(1);
648                match common.status {
649                    ObjectStatus::Valid => {
650                        tile.valid = tile.valid.saturating_add(1);
651                    }
652                    ObjectStatus::Invalid => {
653                        tile.invalid = tile.invalid.saturating_add(1);
654                    }
655                    ObjectStatus::NotApplicable => {
656                        tile.other = tile.other.saturating_add(1);
657                    }
658                }
659                objects_total = objects_total.saturating_add(1);
660
661                columns_total = columns_total.saturating_add(catalog_object_column_count(object));
662            }
663            indexes_total = indexes_total.saturating_add(schema_catalog.indexes.len());
664            constraints_total = constraints_total.saturating_add(schema_catalog.constraints.len());
665            triggers_total = triggers_total.saturating_add(schema_catalog.triggers.len());
666            synonyms_total = synonyms_total.saturating_add(schema_catalog.synonyms.len());
667            grants_total = grants_total.saturating_add(schema_catalog.grants.len());
668            dependencies_total =
669                dependencies_total.saturating_add(schema_catalog.dependencies.len());
670        }
671
672        let totals = DoctorExtractionTotals {
673            schemas_observed: self.schemas.len(),
674            objects_total,
675            columns_total,
676            indexes_total,
677            constraints_total,
678            triggers_total,
679            synonyms_total,
680            grants_total,
681            dependencies_total,
682        };
683
684        let mut object_counts: Vec<DoctorObjectCount> = counts.into_values().collect();
685        object_counts.sort_by_key(|tile| std::cmp::Reverse(tile.total));
686
687        let missing_permissions =
688            derive_missing_permission_reports(&self.capabilities, &self.source);
689
690        let mut plscope_availability_per_schema: Vec<PlScopeAvailabilityRow> = self
691            .schemas
692            .iter()
693            .filter_map(|(owner, schema_catalog)| {
694                let availability = schema_catalog.plscope.as_ref()?.availability;
695                let schema_name = self.interner.resolve(owner.symbol())?.to_string();
696                Some(PlScopeAvailabilityRow {
697                    schema_name,
698                    availability,
699                })
700            })
701            .collect();
702        plscope_availability_per_schema.sort_by(|a, b| a.schema_name.cmp(&b.schema_name));
703
704        CatalogDoctorReport {
705            source_description: self.source.description.clone().unwrap_or_default(),
706            source_kind: self.source.kind,
707            generated_at: Some(self.generated_at),
708            totals,
709            object_counts,
710            capability_warnings: self.capabilities.warnings.clone(),
711            missing_permissions,
712            plscope_availability_per_schema,
713            can_query_dba_views: self.capabilities.can_query_dba_views,
714            can_query_all_views: self.capabilities.can_query_all_views,
715            can_use_dbms_metadata: self.capabilities.can_use_dbms_metadata,
716            can_read_source: self.capabilities.can_read_source,
717            plscope_enabled: self.capabilities.plscope_enabled,
718            can_query_scheduler: self.capabilities.can_query_scheduler,
719            can_query_roles_and_grants: self.capabilities.can_query_roles_and_grants,
720        }
721    }
722}
723
724fn catalog_object_common(object: &CatalogObject) -> &ObjectCommon {
725    match object {
726        CatalogObject::Table(metadata) => &metadata.common,
727        CatalogObject::View(metadata) => &metadata.common,
728        CatalogObject::MaterializedView(metadata) => &metadata.common,
729        CatalogObject::Sequence(metadata) => &metadata.common,
730        CatalogObject::Type(metadata) => &metadata.common,
731        CatalogObject::Package(metadata) => &metadata.common,
732        CatalogObject::Procedure(metadata) => &metadata.common,
733        CatalogObject::Function(metadata) => &metadata.common,
734        CatalogObject::Trigger(metadata) => &metadata.common,
735        CatalogObject::SchedulerJob(metadata) => &metadata.common,
736        CatalogObject::EditioningView(metadata) => &metadata.common,
737    }
738}
739
740fn catalog_object_column_count(object: &CatalogObject) -> usize {
741    match object {
742        CatalogObject::Table(metadata) => metadata.columns.len(),
743        CatalogObject::View(metadata) => metadata.columns.len(),
744        CatalogObject::MaterializedView(metadata) => metadata.columns.len(),
745        CatalogObject::EditioningView(metadata) => metadata.columns.len(),
746        CatalogObject::Sequence(_)
747        | CatalogObject::Type(_)
748        | CatalogObject::Package(_)
749        | CatalogObject::Procedure(_)
750        | CatalogObject::Function(_)
751        | CatalogObject::Trigger(_)
752        | CatalogObject::SchedulerJob(_) => 0,
753    }
754}
755
756fn derive_missing_permission_reports(
757    capabilities: &CatalogCapabilities,
758    source: &CatalogSource,
759) -> Vec<MissingPermissionReport> {
760    // Missing-permission diagnoses only make sense for live extractions. A
761    // JSON snapshot was already produced once — its capability bits reflect
762    // the original extraction; we surface them verbatim instead of inventing
763    // new grant suggestions.
764    if !matches!(source.kind, CatalogSourceKind::LiveConnection) {
765        return Vec::new();
766    }
767
768    let mut reports = Vec::new();
769    if !capabilities.can_query_dba_views {
770        reports.push(MissingPermissionReport {
771            view_name: String::from("DBA_OBJECTS / DBA_TAB_COLUMNS / DBA_DEPENDENCIES"),
772            required_for: vec![
773                String::from("cross-schema extraction beyond ALL_*"),
774                String::from("PLSQL-CAT-014 dependency reachability over schemas"),
775            ],
776            suggested_grant: String::from(
777                "grant select_catalog_role to <user>; -- or individual grants on DBA_* views",
778            ),
779        });
780    }
781    if !capabilities.can_use_dbms_metadata {
782        reports.push(MissingPermissionReport {
783            view_name: String::from("DBMS_METADATA"),
784            required_for: vec![
785                String::from("PLSQL-CAT-015 DBMS_METADATA.GET_DDL extraction"),
786                String::from("normalized DDL hashes for `what-breaks`"),
787            ],
788            suggested_grant: String::from("grant execute on DBMS_METADATA to <user>;"),
789        });
790    }
791    if !capabilities.can_read_source {
792        reports.push(MissingPermissionReport {
793            view_name: String::from("ALL_SOURCE / DBA_SOURCE"),
794            required_for: vec![
795                String::from("packaged routine body inspection"),
796                String::from("get_object_source MCP tool"),
797            ],
798            suggested_grant: String::from(
799                "grant select on ALL_SOURCE to <user>; -- ALL_SOURCE itself is normally readable; ensure no DROP/REVOKE narrowed it",
800            ),
801        });
802    }
803    if !capabilities.plscope_enabled {
804        reports.push(MissingPermissionReport {
805            view_name: String::from("PLSCOPE_SETTINGS / ALL_IDENTIFIERS"),
806            required_for: vec![
807                String::from("PLSQL-CAT-010 PL/Scope availability detection"),
808                String::from("PLSQL-CAT-011 identifier extraction"),
809            ],
810            suggested_grant: String::from(
811                "alter session set plscope_settings = 'identifiers:all'; -- and recompile target objects",
812            ),
813        });
814    }
815    if !capabilities.can_query_scheduler {
816        reports.push(MissingPermissionReport {
817            view_name: String::from("ALL_SCHEDULER_JOBS / ALL_SCHEDULER_PROGRAMS"),
818            required_for: vec![String::from("scheduler job lineage edges")],
819            suggested_grant: String::from(
820                "grant select on ALL_SCHEDULER_JOBS to <user>; grant select on ALL_SCHEDULER_PROGRAMS to <user>;",
821            ),
822        });
823    }
824    if !capabilities.can_query_roles_and_grants {
825        reports.push(MissingPermissionReport {
826            view_name: String::from("DBA_ROLE_PRIVS / DBA_SYS_PRIVS / DBA_TAB_PRIVS"),
827            required_for: vec![
828                String::from("definer-rights privilege chain analysis"),
829                String::from("role-mediated execution evidence (PRIVILEGES-* beads)"),
830            ],
831            suggested_grant: String::from(
832                "grant select_catalog_role to <user>; -- enables DBA_*_PRIVS reads",
833            ),
834        });
835    }
836    reports
837}
838
839#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
840pub struct CatalogSnapshotDocument {
841    pub schema_id: String,
842    pub schema_version: SchemaVersion,
843    pub snapshot: CatalogSnapshot,
844}
845
846impl CatalogSnapshotDocument {
847    #[must_use]
848    #[instrument(level = "trace", skip(snapshot))]
849    pub fn new(snapshot: CatalogSnapshot) -> Self {
850        Self {
851            schema_id: String::from(CATALOG_SNAPSHOT_SCHEMA_ID),
852            schema_version: CATALOG_SNAPSHOT_SCHEMA_VERSION,
853            snapshot,
854        }
855    }
856}
857
858#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
859pub struct CatalogLoadRequest {
860    pub schema_filters: Vec<CatalogSchemaFilter>,
861}
862
863impl CatalogLoadRequest {
864    #[must_use]
865    #[instrument(level = "trace")]
866    pub fn for_current_schema() -> Self {
867        Self {
868            schema_filters: vec![CatalogSchemaFilter::CurrentSchema],
869        }
870    }
871
872    #[must_use]
873    #[instrument(level = "trace", skip(schema_names))]
874    pub fn for_named_schemas<I, S>(schema_names: I) -> Self
875    where
876        I: IntoIterator<Item = S>,
877        S: Into<String>,
878    {
879        Self {
880            schema_filters: schema_names
881                .into_iter()
882                .map(CatalogSchemaFilter::named)
883                .collect(),
884        }
885    }
886}
887
888impl Default for CatalogLoadRequest {
889    fn default() -> Self {
890        Self::for_current_schema()
891    }
892}
893
894#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
895pub enum CatalogSchemaFilter {
896    CurrentSchema,
897    Named(String),
898}
899
900impl CatalogSchemaFilter {
901    #[must_use]
902    #[instrument(level = "trace")]
903    pub fn current_schema() -> Self {
904        Self::CurrentSchema
905    }
906
907    #[must_use]
908    #[instrument(level = "trace", skip(schema_name))]
909    pub fn named(schema_name: impl Into<String>) -> Self {
910        Self::Named(schema_name.into())
911    }
912}
913
914#[derive(Debug, Error)]
915pub enum CatalogError {
916    #[error("i/o error: {0}")]
917    Io(#[from] std::io::Error),
918    #[error("json error: {0}")]
919    Json(#[from] serde_json::Error),
920    #[error("oracle backend `{backend}` is unavailable in this build; use `{feature}`")]
921    OracleBackendNotCompiled {
922        backend: OracleBackend,
923        feature: &'static str,
924    },
925    #[error("oracle backend `{backend}` error: {message}")]
926    OracleBackendError {
927        backend: OracleBackend,
928        message: String,
929    },
930    #[error("expected {expected} row(s) but received {actual}")]
931    UnexpectedRowCount { expected: String, actual: usize },
932    #[error("required column `{column}` was missing from the query result")]
933    MissingColumn { column: String },
934    #[error("column `{column}` was null")]
935    NullColumnValue { column: String },
936    #[error("column `{column}` could not be parsed as {expected}: `{value}`")]
937    InvalidColumnValue {
938        column: String,
939        expected: &'static str,
940        value: String,
941    },
942    #[error("unsupported catalog snapshot schema {found} for {schema_id}; expected {expected}")]
943    UnsupportedSchemaVersion {
944        schema_id: String,
945        found: SchemaVersion,
946        expected: SchemaVersion,
947    },
948    #[error("unexpected catalog snapshot schema id `{0}`")]
949    UnexpectedSchemaId(String),
950    #[error("catalog load request could not resolve the current schema from the Oracle connection")]
951    CurrentSchemaUnavailable,
952    #[error("schema filter `{schema_name}` is invalid: schema names must not be blank")]
953    InvalidSchemaFilter { schema_name: String },
954}
955
956#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
957pub enum OracleBackend {
958    RustOracle,
959    OracleRs,
960}
961
962impl OracleBackend {
963    #[must_use]
964    #[instrument(level = "trace", skip(self))]
965    pub fn as_str(self) -> &'static str {
966        match self {
967            Self::RustOracle => "oracle",
968            Self::OracleRs => "oracle-rs",
969        }
970    }
971}
972
973impl std::fmt::Display for OracleBackend {
974    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
975        f.write_str(self.as_str())
976    }
977}
978
979#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
980pub struct OracleConnectOptions {
981    pub username: String,
982    pub password: String,
983    pub connect_string: String,
984    pub current_schema: Option<String>,
985    pub module: Option<String>,
986    pub action: Option<String>,
987    pub client_info: Option<String>,
988    pub client_identifier: Option<String>,
989}
990
991impl OracleConnectOptions {
992    #[must_use]
993    pub fn new(
994        username: impl Into<String>,
995        password: impl Into<String>,
996        connect_string: impl Into<String>,
997    ) -> Self {
998        Self {
999            username: username.into(),
1000            password: password.into(),
1001            connect_string: connect_string.into(),
1002            current_schema: None,
1003            module: None,
1004            action: None,
1005            client_info: None,
1006            client_identifier: None,
1007        }
1008    }
1009
1010    #[must_use]
1011    pub fn with_current_schema(mut self, current_schema: impl Into<String>) -> Self {
1012        self.current_schema = Some(current_schema.into());
1013        self
1014    }
1015
1016    #[must_use]
1017    pub fn with_module(mut self, module: impl Into<String>) -> Self {
1018        self.module = Some(module.into());
1019        self
1020    }
1021
1022    #[must_use]
1023    pub fn with_action(mut self, action: impl Into<String>) -> Self {
1024        self.action = Some(action.into());
1025        self
1026    }
1027
1028    #[must_use]
1029    pub fn with_client_info(mut self, client_info: impl Into<String>) -> Self {
1030        self.client_info = Some(client_info.into());
1031        self
1032    }
1033
1034    #[must_use]
1035    pub fn with_client_identifier(mut self, client_identifier: impl Into<String>) -> Self {
1036        self.client_identifier = Some(client_identifier.into());
1037        self
1038    }
1039}
1040
1041#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1042pub enum OracleBind {
1043    String(String),
1044    I64(i64),
1045    U64(u64),
1046    Bool(bool),
1047}
1048
1049impl From<&str> for OracleBind {
1050    fn from(value: &str) -> Self {
1051        Self::String(String::from(value))
1052    }
1053}
1054
1055impl From<String> for OracleBind {
1056    fn from(value: String) -> Self {
1057        Self::String(value)
1058    }
1059}
1060
1061impl From<i32> for OracleBind {
1062    fn from(value: i32) -> Self {
1063        Self::I64(i64::from(value))
1064    }
1065}
1066
1067impl From<i64> for OracleBind {
1068    fn from(value: i64) -> Self {
1069        Self::I64(value)
1070    }
1071}
1072
1073impl From<u32> for OracleBind {
1074    fn from(value: u32) -> Self {
1075        Self::U64(u64::from(value))
1076    }
1077}
1078
1079impl From<u64> for OracleBind {
1080    fn from(value: u64) -> Self {
1081        Self::U64(value)
1082    }
1083}
1084
1085impl From<bool> for OracleBind {
1086    fn from(value: bool) -> Self {
1087        Self::Bool(value)
1088    }
1089}
1090
1091#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1092pub struct OracleCell {
1093    pub oracle_type: String,
1094    pub value: Option<String>,
1095}
1096
1097impl OracleCell {
1098    #[must_use]
1099    #[instrument(level = "trace", skip(oracle_type, value))]
1100    pub fn new(oracle_type: impl Into<String>, value: Option<String>) -> Self {
1101        Self {
1102            oracle_type: oracle_type.into(),
1103            value,
1104        }
1105    }
1106}
1107
1108#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
1109pub struct OracleRow {
1110    pub columns: BTreeMap<String, OracleCell>,
1111}
1112
1113impl OracleRow {
1114    pub fn insert(
1115        &mut self,
1116        name: impl Into<String>,
1117        oracle_type: impl Into<String>,
1118        value: Option<String>,
1119    ) {
1120        self.columns.insert(
1121            name.into().to_ascii_uppercase(),
1122            OracleCell::new(oracle_type, value),
1123        );
1124    }
1125
1126    #[must_use]
1127    #[instrument(level = "trace", skip(self))]
1128    pub fn cell(&self, name: &str) -> Option<&OracleCell> {
1129        self.columns.get(&name.to_ascii_uppercase())
1130    }
1131
1132    #[must_use]
1133    #[instrument(level = "trace", skip(self))]
1134    pub fn text(&self, name: &str) -> Option<&str> {
1135        self.cell(name).and_then(|cell| cell.value.as_deref())
1136    }
1137
1138    #[instrument(level = "trace", skip(self))]
1139    pub fn require_text(&self, name: &str) -> Result<&str, CatalogError> {
1140        let Some(cell) = self.cell(name) else {
1141            return Err(CatalogError::MissingColumn {
1142                column: name.to_ascii_uppercase(),
1143            });
1144        };
1145        cell.value
1146            .as_deref()
1147            .ok_or_else(|| CatalogError::NullColumnValue {
1148                column: name.to_ascii_uppercase(),
1149            })
1150    }
1151
1152    #[instrument(level = "trace", skip(self))]
1153    pub fn parse_i64(&self, name: &str) -> Result<i64, CatalogError> {
1154        let text = self.require_text(name)?;
1155        text.parse::<i64>()
1156            .map_err(|_| CatalogError::InvalidColumnValue {
1157                column: name.to_ascii_uppercase(),
1158                expected: "i64",
1159                value: String::from(text),
1160            })
1161    }
1162
1163    #[instrument(level = "trace", skip(self))]
1164    pub fn parse_u64(&self, name: &str) -> Result<u64, CatalogError> {
1165        let text = self.require_text(name)?;
1166        text.parse::<u64>()
1167            .map_err(|_| CatalogError::InvalidColumnValue {
1168                column: name.to_ascii_uppercase(),
1169                expected: "u64",
1170                value: String::from(text),
1171            })
1172    }
1173
1174    #[instrument(level = "trace", skip(self))]
1175    pub fn parse_bool(&self, name: &str) -> Result<bool, CatalogError> {
1176        let text = self.require_text(name)?;
1177        let normalized = text.trim().to_ascii_uppercase();
1178        match normalized.as_str() {
1179            "Y" | "YES" | "TRUE" | "1" => Ok(true),
1180            "N" | "NO" | "FALSE" | "0" => Ok(false),
1181            _ => Err(CatalogError::InvalidColumnValue {
1182                column: name.to_ascii_uppercase(),
1183                expected: "bool",
1184                value: String::from(text),
1185            }),
1186        }
1187    }
1188}
1189
1190#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1191pub struct OracleConnectionInfo {
1192    pub backend: OracleBackend,
1193    pub connect_string: String,
1194    pub current_schema: Option<String>,
1195    pub server_version: String,
1196    pub db_name: String,
1197    pub db_domain: String,
1198    pub service_name: String,
1199    pub instance_name: String,
1200    pub server_type: String,
1201    pub max_identifier_length: u32,
1202    pub max_open_cursors: u32,
1203}
1204
1205#[instrument(level = "trace")]
1206pub fn load_snapshot_from_json(path: &std::path::Path) -> Result<CatalogSnapshot, CatalogError> {
1207    let raw = fs::read_to_string(path)?;
1208    let document: CatalogSnapshotDocument = serde_json::from_str(&raw)?;
1209
1210    if !document.schema_id.as_str().eq(CATALOG_SNAPSHOT_SCHEMA_ID) {
1211        return Err(CatalogError::UnexpectedSchemaId(document.schema_id));
1212    }
1213
1214    if !matches!(
1215        document
1216            .schema_version
1217            .cmp(&CATALOG_SNAPSHOT_SCHEMA_VERSION),
1218        std::cmp::Ordering::Equal
1219    ) {
1220        return Err(CatalogError::UnsupportedSchemaVersion {
1221            schema_id: String::from(CATALOG_SNAPSHOT_SCHEMA_ID),
1222            found: document.schema_version,
1223            expected: CATALOG_SNAPSHOT_SCHEMA_VERSION,
1224        });
1225    }
1226
1227    Ok(document.snapshot)
1228}
1229
1230#[instrument(level = "trace", skip(snapshot))]
1231pub fn export_snapshot_to_json(
1232    snapshot: &CatalogSnapshot,
1233    path: &std::path::Path,
1234) -> Result<(), CatalogError> {
1235    let document = CatalogSnapshotDocument::new(snapshot.clone());
1236    let rendered = serde_json::to_string_pretty(&document)?;
1237    fs::write(path, rendered)?;
1238    Ok(())
1239}
1240
1241/// Load a catalog snapshot from a directory of DBMS_METADATA-exported .sql files.
1242///
1243/// Load a `CatalogSnapshot` by classifying every `.sql` file under `dir` as a
1244/// single top-level CREATE DDL statement (the shape `DBMS_METADATA.GET_DDL`
1245/// emits when written per-object to disk).
1246///
1247/// For each file:
1248///
1249/// * The object kind is read from the leading `CREATE …` keyword
1250///   (`TABLE` / `VIEW` / `PACKAGE` / `PROCEDURE` / `FUNCTION` /
1251///   `SEQUENCE` / `TRIGGER` / `TYPE`); statements whose keyword does
1252///   not match a known kind are skipped (graceful degradation per
1253///   R13).
1254/// * The owner schema is read from the optional `OWNER.OBJECT` prefix
1255///   on the CREATE target. Unqualified statements (no `OWNER.`
1256///   prefix) are filed under a stable `PUBLIC` schema interned through
1257///   the regular interner — never `SymbolId::new(0)`, which would
1258///   collide with whatever the first object name happens to be.
1259/// * The raw file bytes are stored verbatim on
1260///   [`ObjectCommon::ddl`] as a [`DbmsMetadataDdl`] so downstream
1261///   consumers (doc generation, lineage, the doctor's
1262///   ddl-extraction ratio) can inspect the exact source the catalog
1263///   was derived from.
1264///
1265/// This classifier is keyword-shaped and does not parse arbitrary
1266/// PL/SQL bodies — column definitions, parameter signatures, view
1267/// projections and constraint details are *not* populated. When the
1268/// full parser (Layer 1) lands, callers that need column- or
1269/// signature-level fidelity should switch to that path; the
1270/// `DbmsMetadataDdl` stored here is sufficient seed for re-parsing
1271/// on demand without re-reading the disk.
1272#[instrument(level = "info", skip_all, fields(dir = %dir.display()))]
1273pub fn load_from_dbms_metadata_dir(dir: &std::path::Path) -> Result<CatalogSnapshot, CatalogError> {
1274    if !dir.is_dir() {
1275        return Err(CatalogError::Io(std::io::Error::new(
1276            std::io::ErrorKind::NotFound,
1277            format!("not a directory: {}", dir.display()),
1278        )));
1279    }
1280
1281    let mut interner = SymbolInterner::default();
1282    let mut schemas: HashMap<SchemaName, SchemaCatalog> = HashMap::new();
1283    let mut file_count = 0usize;
1284    let mut classified_count = 0usize;
1285
1286    // Collect + sort entries so the resulting snapshot (and its
1287    // interner symbol ids) are deterministic across runs and
1288    // platforms — `read_dir` ordering is unspecified.
1289    let mut paths: Vec<std::path::PathBuf> = fs::read_dir(dir)?
1290        .filter_map(|e| e.ok().map(|e| e.path()))
1291        .filter(|p| {
1292            p.extension()
1293                .and_then(|e| e.to_str())
1294                .is_some_and(|ext| ext.eq("sql"))
1295        })
1296        .collect();
1297    paths.sort();
1298
1299    for path in paths {
1300        file_count += 1;
1301        let ddl_text = match fs::read_to_string(&path) {
1302            Ok(text) => text,
1303            Err(_) => continue,
1304        };
1305
1306        if let Some((schema, obj_name, obj)) = classify_dbms_metadata_ddl(&ddl_text, &mut interner)
1307        {
1308            let schema_catalog = schemas.entry(schema).or_default();
1309            schema_catalog.objects.insert(obj_name, obj);
1310            classified_count += 1;
1311        }
1312    }
1313
1314    tracing::info!(
1315        files = file_count,
1316        classified = classified_count,
1317        "loaded DBMS_METADATA directory"
1318    );
1319
1320    Ok(CatalogSnapshot {
1321        schemas,
1322        profile: AnalysisProfile::default(),
1323        capabilities: CatalogCapabilities {
1324            can_query_all_views: false,
1325            can_query_dba_views: false,
1326            can_use_dbms_metadata: true,
1327            can_read_source: true,
1328            plscope_enabled: false,
1329            can_query_scheduler: false,
1330            can_query_roles_and_grants: false,
1331            warnings: vec![],
1332        },
1333        generated_at: Utc::now(),
1334        source: CatalogSource {
1335            kind: CatalogSourceKind::DbmsMetadataFiles,
1336            description: Some(format!("loaded from {}", dir.display())),
1337            ..CatalogSource::default()
1338        },
1339        interner,
1340        editions: Vec::new(),
1341        // DBMS_METADATA directory loads do not query ALL_USERS; grantee
1342        // classification is not exercised on this path.
1343        known_users: None,
1344    })
1345}
1346
1347/// Default schema name used when a CREATE statement has no `OWNER.`
1348/// prefix. Interned through the regular interner so the resulting
1349/// `SchemaName` has a real, resolvable text — never a collision with
1350/// `SymbolId::new(0)`.
1351const UNQUALIFIED_DDL_SCHEMA: &str = "PUBLIC";
1352
1353/// Classify a single per-file DDL statement into a `CatalogObject`.
1354///
1355/// Returns `None` for whitespace-only / comment-only files and for
1356/// CREATE statements whose object kind keyword is not in the
1357/// known set. The DDL bytes are preserved verbatim on
1358/// [`ObjectCommon::ddl`] so downstream code can re-parse them as
1359/// fidelity improves.
1360fn classify_dbms_metadata_ddl(
1361    ddl_text: &str,
1362    interner: &mut SymbolInterner,
1363) -> Option<(SchemaName, ObjectName, CatalogObject)> {
1364    // Parse the DDL HEADER as a real token stream — never substring-match
1365    // the whole DDL. Body / comment text that mentions `TABLE` etc. used
1366    // to silently re-classify VIEWs and PROCEDUREs as tables.
1367    let header = parse_create_header(ddl_text)?;
1368
1369    // `PACKAGE BODY` / `TYPE BODY` are bodies — the spec's catalog row
1370    // is the source of truth. Honest uncertainty: return None.
1371    if matches!(
1372        header.kind,
1373        DdlKind::PackageBody | DdlKind::TypeBody | DdlKind::Unknown
1374    ) {
1375        return None;
1376    }
1377
1378    let (owner_text, object_text) = extract_owner_and_name(&header.after_kind)?;
1379
1380    let owner_text = owner_text.unwrap_or_else(|| UNQUALIFIED_DDL_SCHEMA.to_string());
1381    let owner = interner.intern_schema_name(owner_text)?;
1382    let name_sid = interner.intern(&object_text)?;
1383    let obj_name = ObjectName::new(name_sid);
1384
1385    let ddl = DbmsMetadataDdl {
1386        ddl_text: ddl_text.to_string(),
1387        normalized_ddl: Some(normalize_dbms_metadata_ddl(ddl_text)),
1388        xml_text: None,
1389    };
1390
1391    let common = ObjectCommon {
1392        owner,
1393        name: obj_name,
1394        object_type: header.kind.object_type(),
1395        ddl: Some(ddl),
1396        ..ObjectCommon::default()
1397    };
1398
1399    let object = match header.kind {
1400        DdlKind::Table => CatalogObject::Table(TableMetadata {
1401            common,
1402            ..TableMetadata::default()
1403        }),
1404        DdlKind::View => CatalogObject::View(ViewMetadata {
1405            common,
1406            ..ViewMetadata::default()
1407        }),
1408        DdlKind::MaterializedView => CatalogObject::MaterializedView(MViewMetadata {
1409            common,
1410            ..MViewMetadata::default()
1411        }),
1412        DdlKind::Package => CatalogObject::Package(PackageMetadata {
1413            common,
1414            ..PackageMetadata::default()
1415        }),
1416        DdlKind::Procedure => CatalogObject::Procedure(ProcedureMetadata {
1417            common,
1418            signature: RoutineSignature {
1419                routine_name: obj_name,
1420                ..RoutineSignature::default()
1421            },
1422        }),
1423        DdlKind::Function => CatalogObject::Function(FunctionMetadata {
1424            common,
1425            signature: RoutineSignature {
1426                routine_name: obj_name,
1427                ..RoutineSignature::default()
1428            },
1429            ..FunctionMetadata::default()
1430        }),
1431        DdlKind::Sequence => CatalogObject::Sequence(SequenceMetadata {
1432            common,
1433            ..SequenceMetadata::default()
1434        }),
1435        DdlKind::Trigger => CatalogObject::Trigger(TriggerMetadata {
1436            common,
1437            ..TriggerMetadata::default()
1438        }),
1439        DdlKind::Type => CatalogObject::Type(TypeMetadata {
1440            common,
1441            ..TypeMetadata::default()
1442        }),
1443        // Filtered above — the match is exhaustive only because we
1444        // handle every concrete kind.
1445        DdlKind::PackageBody | DdlKind::TypeBody | DdlKind::Unknown => return None,
1446    };
1447
1448    Some((owner, obj_name, object))
1449}
1450
1451/// Object kinds the per-file DDL classifier recognizes. `Unknown`
1452/// represents honest uncertainty (R13) — the header didn't tokenize
1453/// into a kind we model. `PackageBody` / `TypeBody` are recognized
1454/// separately so the classifier can skip them without confusing them
1455/// with their specs.
1456#[derive(Clone, Copy, Debug, Eq, PartialEq)]
1457enum DdlKind {
1458    Table,
1459    View,
1460    MaterializedView,
1461    Package,
1462    PackageBody,
1463    Procedure,
1464    Function,
1465    Sequence,
1466    Trigger,
1467    Type,
1468    TypeBody,
1469    Unknown,
1470}
1471
1472impl DdlKind {
1473    fn object_type(self) -> ObjectType {
1474        match self {
1475            DdlKind::Table => ObjectType::Table,
1476            DdlKind::View => ObjectType::View,
1477            DdlKind::MaterializedView => ObjectType::MaterializedView,
1478            DdlKind::Package | DdlKind::PackageBody => ObjectType::Package,
1479            DdlKind::Procedure => ObjectType::Procedure,
1480            DdlKind::Function => ObjectType::Function,
1481            DdlKind::Sequence => ObjectType::Sequence,
1482            DdlKind::Trigger => ObjectType::Trigger,
1483            DdlKind::Type | DdlKind::TypeBody => ObjectType::Type,
1484            DdlKind::Unknown => ObjectType::Unknown,
1485        }
1486    }
1487}
1488
1489/// Parsed CREATE header: the typed `DdlKind` plus the upper-cased
1490/// remainder of the DDL starting immediately after the kind tokens.
1491/// Callers use `after_kind` to locate the `[OWNER.]NAME` — it has
1492/// already been stripped of leading comments / whitespace / `CREATE`
1493/// modifiers / kind tokens so a substring match in there cannot be
1494/// fooled by body content.
1495#[derive(Clone, Debug)]
1496struct ParsedCreateHeader {
1497    kind: DdlKind,
1498    after_kind: String,
1499}
1500
1501/// Parse the CREATE header of a raw DDL string.
1502///
1503/// Skips leading whitespace, `--` line comments, and `/* … */` block
1504/// comments. Consumes `CREATE` then optional `OR REPLACE`, optional
1505/// `FORCE` / `EDITIONABLE` / `NONEDITIONABLE` (in any order), then
1506/// reads one or two tokens to form a [`DdlKind`] (multi-word kinds
1507/// `MATERIALIZED VIEW`, `PACKAGE BODY`, `TYPE BODY` handled). Returns
1508/// `None` only when the input has no `CREATE` token at all; an
1509/// unrecognized kind word produces `DdlKind::Unknown` so callers can
1510/// represent honest uncertainty (R13).
1511fn parse_create_header(ddl: &str) -> Option<ParsedCreateHeader> {
1512    let mut cursor = Cursor::new(ddl);
1513    cursor.skip_ws_and_comments();
1514
1515    // Must start with `CREATE`.
1516    if !cursor.consume_keyword("CREATE") {
1517        return None;
1518    }
1519    cursor.skip_ws_and_comments();
1520
1521    // Optional `OR REPLACE`.
1522    if cursor.consume_keyword("OR") {
1523        cursor.skip_ws_and_comments();
1524        // `OR` without `REPLACE` is malformed; let it fall through to
1525        // kind parsing — the kind word won't match and we'll honestly
1526        // return `Unknown`.
1527        let _ = cursor.consume_keyword("REPLACE");
1528        cursor.skip_ws_and_comments();
1529    }
1530
1531    // Optional `FORCE` / `EDITIONABLE` / `NONEDITIONABLE` modifiers,
1532    // any order, any subset.
1533    loop {
1534        if cursor.consume_keyword("FORCE")
1535            || cursor.consume_keyword("NONEDITIONABLE")
1536            || cursor.consume_keyword("EDITIONABLE")
1537            || cursor.consume_keyword("NO")
1538        {
1539            cursor.skip_ws_and_comments();
1540            continue;
1541        }
1542        break;
1543    }
1544
1545    // Read the kind word (one token, possibly extended to two for
1546    // `MATERIALIZED VIEW` / `PACKAGE BODY` / `TYPE BODY`).
1547    let first = match cursor.consume_identifier() {
1548        Some(tok) => tok,
1549        None => {
1550            return Some(ParsedCreateHeader {
1551                kind: DdlKind::Unknown,
1552                after_kind: cursor.upper_remainder(),
1553            });
1554        }
1555    };
1556    cursor.skip_ws_and_comments();
1557
1558    // Speculatively look at the second token without committing — only
1559    // commit if it forms a known two-word kind.
1560    let kind = match first.as_str() {
1561        "MATERIALIZED" => {
1562            if cursor.peek_keyword("VIEW") {
1563                cursor.consume_keyword("VIEW");
1564                cursor.skip_ws_and_comments();
1565                DdlKind::MaterializedView
1566            } else {
1567                DdlKind::Unknown
1568            }
1569        }
1570        "PACKAGE" => {
1571            if cursor.peek_keyword("BODY") {
1572                cursor.consume_keyword("BODY");
1573                cursor.skip_ws_and_comments();
1574                DdlKind::PackageBody
1575            } else {
1576                DdlKind::Package
1577            }
1578        }
1579        "TYPE" => {
1580            if cursor.peek_keyword("BODY") {
1581                cursor.consume_keyword("BODY");
1582                cursor.skip_ws_and_comments();
1583                DdlKind::TypeBody
1584            } else {
1585                DdlKind::Type
1586            }
1587        }
1588        "TABLE" => DdlKind::Table,
1589        "VIEW" => DdlKind::View,
1590        "PROCEDURE" => DdlKind::Procedure,
1591        "FUNCTION" => DdlKind::Function,
1592        "SEQUENCE" => DdlKind::Sequence,
1593        "TRIGGER" => DdlKind::Trigger,
1594        _ => DdlKind::Unknown,
1595    };
1596
1597    Some(ParsedCreateHeader {
1598        kind,
1599        after_kind: cursor.upper_remainder(),
1600    })
1601}
1602
1603/// Hand-rolled byte cursor for the CREATE header tokenizer.
1604///
1605/// Only knows enough about SQL to skip whitespace / `--` line
1606/// comments / `/* … */` block comments and to read alphabetic
1607/// identifier keywords case-insensitively. It deliberately does
1608/// **not** try to parse the whole DDL — every operation past the
1609/// kind word is delegated to [`extract_owner_and_name`] working on
1610/// the upper-cased remainder.
1611struct Cursor<'a> {
1612    bytes: &'a [u8],
1613    pos: usize,
1614}
1615
1616impl<'a> Cursor<'a> {
1617    fn new(text: &'a str) -> Self {
1618        Self {
1619            bytes: text.as_bytes(),
1620            pos: 0,
1621        }
1622    }
1623
1624    fn skip_ws_and_comments(&mut self) {
1625        loop {
1626            // Skip ASCII whitespace.
1627            while self.pos < self.bytes.len() && self.bytes[self.pos].is_ascii_whitespace() {
1628                self.pos += 1;
1629            }
1630            // `--` line comment.
1631            if self.pos + 1 < self.bytes.len()
1632                && self.bytes[self.pos].eq(&b'-')
1633                && self.bytes[self.pos + 1].eq(&b'-')
1634            {
1635                self.pos += 2;
1636                while self.pos < self.bytes.len() && self.bytes[self.pos].ne(&b'\n') {
1637                    self.pos += 1;
1638                }
1639                continue;
1640            }
1641            // `/* … */` block comment.
1642            if self.pos + 1 < self.bytes.len()
1643                && self.bytes[self.pos].eq(&b'/')
1644                && self.bytes[self.pos + 1].eq(&b'*')
1645            {
1646                self.pos += 2;
1647                while self.pos + 1 < self.bytes.len()
1648                    && !(self.bytes[self.pos].eq(&b'*') && self.bytes[self.pos + 1].eq(&b'/'))
1649                {
1650                    self.pos += 1;
1651                }
1652                if self.pos + 1 < self.bytes.len() {
1653                    self.pos += 2; // consume the closing `*/`
1654                } else {
1655                    self.pos = self.bytes.len(); // unterminated — end-of-input
1656                }
1657                continue;
1658            }
1659            break;
1660        }
1661    }
1662
1663    /// Returns true if the next identifier token matches `kw`
1664    /// case-insensitively (and is followed by a non-identifier
1665    /// character or end-of-input). Does not advance the cursor.
1666    fn peek_keyword(&self, kw: &str) -> bool {
1667        let end = self.pos + kw.len();
1668        if end > self.bytes.len() {
1669            return false;
1670        }
1671        if !self.bytes[self.pos..end].eq_ignore_ascii_case(kw.as_bytes()) {
1672            return false;
1673        }
1674        // Word boundary check — `CREATEDOC` must not match `CREATE`.
1675        if end < self.bytes.len() {
1676            let next = self.bytes[end];
1677            if next.eq(&b'_') || next.is_ascii_alphanumeric() {
1678                return false;
1679            }
1680        }
1681        true
1682    }
1683
1684    fn consume_keyword(&mut self, kw: &str) -> bool {
1685        if self.peek_keyword(kw) {
1686            self.pos += kw.len();
1687            true
1688        } else {
1689            false
1690        }
1691    }
1692
1693    /// Consume the next bare ASCII identifier (letters / digits /
1694    /// underscore, must start with a letter) and return it
1695    /// upper-cased. Returns `None` if the cursor is not on an
1696    /// identifier start character — e.g. a quoted identifier or a
1697    /// punctuation token. Quoted identifiers in the header position
1698    /// (the kind word) are not legal Oracle DDL so we don't bother.
1699    fn consume_identifier(&mut self) -> Option<String> {
1700        if self.pos >= self.bytes.len() {
1701            return None;
1702        }
1703        let first = self.bytes[self.pos];
1704        if !first.is_ascii_alphabetic() {
1705            return None;
1706        }
1707        let start = self.pos;
1708        while self.pos < self.bytes.len() {
1709            let b = self.bytes[self.pos];
1710            if b.is_ascii_alphanumeric() || b.eq(&b'_') {
1711                self.pos += 1;
1712            } else {
1713                break;
1714            }
1715        }
1716        let raw = std::str::from_utf8(&self.bytes[start..self.pos]).ok()?;
1717        Some(raw.to_ascii_uppercase())
1718    }
1719
1720    /// Return the rest of the input from the current cursor position,
1721    /// upper-cased. Used to hand off to [`extract_owner_and_name`].
1722    fn upper_remainder(&self) -> String {
1723        std::str::from_utf8(&self.bytes[self.pos..])
1724            .unwrap_or("")
1725            .to_ascii_uppercase()
1726    }
1727}
1728
1729/// Extract the optional `OWNER` and the bare `OBJECT` name from the
1730/// upper-cased remainder that follows the parsed `CREATE <KIND>`
1731/// header. Strips surrounding quotes (so `CREATE TABLE "HR"."EMP"`
1732/// works) and trailing punctuation / parenthesis that the column
1733/// list would attach. Operates on the post-header slice only — never
1734/// on the body — so it can't be fooled by `TABLE` appearing later.
1735fn extract_owner_and_name(after_kind: &str) -> Option<(Option<String>, String)> {
1736    let after = after_kind.trim_start();
1737
1738    // Scan the `[OWNER.]NAME` token honouring double-quoted Oracle
1739    // identifiers. A `"..."` segment is a single token that may contain
1740    // whitespace and runs to its closing `"`; an unquoted segment stops
1741    // at whitespace, `(`, `;`, or other DDL punctuation. The owner/name
1742    // split is the first top-level (outside-quotes) `.`.
1743    let mut segments: Vec<Segment> = Vec::new();
1744    let bytes = after.as_bytes();
1745    let mut i = 0usize;
1746    'scan: while i < bytes.len() {
1747        if bytes[i].eq(&b'"') {
1748            // Quoted segment: consume up to (and including) the closing `"`.
1749            let content_start = i + 1;
1750            let mut j = content_start;
1751            while j < bytes.len() && bytes[j].ne(&b'"') {
1752                j += 1;
1753            }
1754            // Unterminated quote ⇒ malformed header; give up.
1755            if j >= bytes.len() {
1756                return None;
1757            }
1758            segments.push(Segment {
1759                text: after[content_start..j].to_string(),
1760                quoted: true,
1761            });
1762            i = j + 1; // skip closing quote
1763        } else {
1764            // Unquoted run: identifier chars only. Anything else (space,
1765            // `(`, `;`, `,`, …) terminates the `[OWNER.]NAME` token —
1766            // except a top-level `.` which separates owner from name.
1767            let start = i;
1768            while i < bytes.len() {
1769                let c = bytes[i] as char;
1770                if c.is_ascii_alphanumeric() || c.eq(&'_') {
1771                    i += 1;
1772                } else {
1773                    break;
1774                }
1775            }
1776            // An empty unquoted run means we hit a non-identifier byte
1777            // that is not a segment separator: stop scanning the token.
1778            if i.eq(&start) {
1779                break 'scan;
1780            }
1781            segments.push(Segment {
1782                text: after[start..i].to_string(),
1783                quoted: false,
1784            });
1785        }
1786
1787        // After a segment, a `.` continues into the next (NAME) segment;
1788        // anything else ends the `[OWNER.]NAME` token.
1789        if i < bytes.len() && bytes[i].eq(&b'.') {
1790            i += 1;
1791        } else {
1792            break 'scan;
1793        }
1794    }
1795
1796    // Validate each segment: quoted segments accept any non-empty
1797    // content; unquoted segments must be a real identifier.
1798    let valid = |seg: &Segment| -> bool {
1799        if seg.text.is_empty() {
1800            return false;
1801        }
1802        seg.quoted || seg.text.chars().all(|c| c.is_alphanumeric() || c.eq(&'_'))
1803    };
1804
1805    match segments.as_slice() {
1806        [name] if valid(name) => Some((None, name.text.clone())),
1807        [owner, name] if valid(owner) && valid(name) => {
1808            Some((Some(owner.text.clone()), name.text.clone()))
1809        }
1810        _ => None,
1811    }
1812}
1813
1814/// One dot-delimited segment of a `[OWNER.]NAME` token, tracking whether
1815/// it originated from a double-quoted Oracle identifier (which may hold
1816/// whitespace and bypasses the unquoted identifier-char validity rule).
1817struct Segment {
1818    text: String,
1819    quoted: bool,
1820}
1821
1822#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1823enum RoutineKind {
1824    Procedure,
1825    Function,
1826}
1827
1828#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1829struct RoutineLocator {
1830    owner: SchemaName,
1831    package_name: Option<ObjectName>,
1832    routine_name: ObjectName,
1833    subprogram_id: Option<u32>,
1834    overload: Option<u32>,
1835}
1836
1837#[derive(Clone, Debug, Default)]
1838struct RoutineAccumulator {
1839    signature: Option<RoutineSignature>,
1840    kind_hint: Option<RoutineKind>,
1841    deterministic: bool,
1842    pipelined: bool,
1843}
1844
1845#[derive(Clone, Debug, Default)]
1846struct PlScopeTally {
1847    total: usize,
1848    with_identifiers: usize,
1849    with_statements: usize,
1850}
1851
1852#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
1853pub struct SchemaCatalog {
1854    pub objects: HashMap<ObjectName, CatalogObject>,
1855    pub synonyms: HashMap<SynonymName, SynonymTarget>,
1856    pub grants: Vec<Grant>,
1857    pub indexes: HashMap<IndexName, IndexMetadata>,
1858    pub constraints: HashMap<ConstraintName, ConstraintMetadata>,
1859    pub triggers: HashMap<TriggerName, TriggerMetadata>,
1860    pub dependencies: Vec<CatalogDependency>,
1861    pub plscope: Option<PlScopeSnapshot>,
1862    /// Database links owned by this schema. Public links live in the
1863    /// synthetic `PUBLIC` schema. Sourced from `ALL_DB_LINKS`.
1864    #[serde(default)]
1865    pub db_links: Vec<DatabaseLink>,
1866    /// Per-object COMMENT ON TABLE / VIEW text. Sourced from
1867    /// `ALL_TAB_COMMENTS`.
1868    #[serde(default)]
1869    pub table_comments: Vec<TableComment>,
1870    /// Per-column COMMENT ON COLUMN text. Sourced from
1871    /// `ALL_COL_COMMENTS`.
1872    #[serde(default)]
1873    pub column_comments: Vec<ColumnComment>,
1874    /// Editioning views owned by this schema (the views that mask the
1875    /// underlying base table in an EBR shop). Sourced from
1876    /// `ALL_EDITIONING_VIEWS`.
1877    #[serde(default)]
1878    pub editioning_views: Vec<EditioningView>,
1879    /// VPD/RLS policies attached to objects in this schema. Sourced
1880    /// from `ALL_POLICIES`.
1881    #[serde(default)]
1882    pub vpd_policies: Vec<VpdPolicy>,
1883}
1884
1885#[cfg(feature = "oraclemcp-db")]
1886fn resolve_schema_filters(
1887    connection_info: &OracleConnectionInfo,
1888    request: &CatalogLoadRequest,
1889) -> Result<Vec<String>, CatalogError> {
1890    let mut resolved = Vec::<String>::new();
1891
1892    for filter in &request.schema_filters {
1893        let schema_name = match filter {
1894            CatalogSchemaFilter::CurrentSchema => connection_info
1895                .current_schema
1896                .clone()
1897                .ok_or(CatalogError::CurrentSchemaUnavailable)?,
1898            CatalogSchemaFilter::Named(schema_name) => {
1899                let trimmed = schema_name.trim();
1900                if trimmed.is_empty() {
1901                    return Err(CatalogError::InvalidSchemaFilter {
1902                        schema_name: schema_name.clone(),
1903                    });
1904                }
1905                String::from(trimmed)
1906            }
1907        };
1908
1909        if !resolved.iter().any(|candidate| candidate.eq(&schema_name)) {
1910            resolved.push(schema_name);
1911        }
1912    }
1913
1914    if resolved.is_empty() {
1915        return Err(CatalogError::CurrentSchemaUnavailable);
1916    }
1917
1918    Ok(resolved)
1919}
1920
1921/// Normalize DDL text emitted by `DBMS_METADATA.GET_DDL` so equality checks
1922/// across runs ignore cosmetic differences:
1923///
1924/// - Trim leading + trailing whitespace.
1925/// - Collapse runs of whitespace inside the body to a single space (newlines
1926///   are preserved as-is so the result remains readable).
1927/// - Strip the trailing `/` SQL*Plus terminator if present.
1928#[must_use]
1929pub fn normalize_dbms_metadata_ddl(text: &str) -> String {
1930    let trimmed = text.trim();
1931    let trimmed = trimmed.strip_suffix('/').unwrap_or(trimmed).trim_end();
1932    let mut normalized = String::with_capacity(trimmed.len());
1933    let mut prev_space = false;
1934    for c in trimmed.chars() {
1935        if c.eq(&' ') || c.eq(&'\t') {
1936            if !prev_space {
1937                normalized.push(' ');
1938                prev_space = true;
1939            }
1940        } else {
1941            normalized.push(c);
1942            prev_space = false;
1943        }
1944    }
1945    normalized
1946}
1947
1948/// Map an `ObjectType` to the string the `DBMS_METADATA.GET_DDL` /
1949/// `GET_XML` overloads expect as their first parameter. Returns `None` for
1950/// types that have no DBMS_METADATA representation (e.g.
1951/// `ObjectType::Unknown`, `ObjectType::Constraint`).
1952#[must_use]
1953pub fn object_type_to_dbms_metadata_value(object_type: ObjectType) -> Option<&'static str> {
1954    match object_type {
1955        ObjectType::Table => Some("TABLE"),
1956        ObjectType::View => Some("VIEW"),
1957        ObjectType::MaterializedView => Some("MATERIALIZED_VIEW"),
1958        ObjectType::Sequence => Some("SEQUENCE"),
1959        ObjectType::Type => Some("TYPE"),
1960        ObjectType::Package => Some("PACKAGE"),
1961        ObjectType::Procedure => Some("PROCEDURE"),
1962        ObjectType::Function => Some("FUNCTION"),
1963        ObjectType::Trigger => Some("TRIGGER"),
1964        ObjectType::EditioningView => Some("VIEW"),
1965        ObjectType::SchedulerJob => Some("PROCOBJ"),
1966        ObjectType::Synonym => Some("SYNONYM"),
1967        ObjectType::Index => Some("INDEX"),
1968        ObjectType::Constraint | ObjectType::Unknown => None,
1969    }
1970}
1971
1972#[cfg(feature = "oraclemcp-db")]
1973fn oracle_version_from_server_version(
1974    server_version: &str,
1975) -> (OracleVersion, Option<CapabilityWarning>) {
1976    let major_component = server_version
1977        .split('.')
1978        .next()
1979        .unwrap_or_default()
1980        .trim()
1981        .parse::<u32>()
1982        .ok();
1983
1984    match major_component {
1985        Some(11) => (OracleVersion::Oracle11g, None),
1986        Some(12) => (OracleVersion::Oracle12c, None),
1987        Some(19) => (OracleVersion::Oracle19c, None),
1988        Some(21) => (OracleVersion::Oracle21c, None),
1989        Some(23) => (OracleVersion::Oracle23ai, None),
1990        Some(26) => (OracleVersion::Oracle26ai, None),
1991        _ => (
1992            OracleVersion::Oracle19c,
1993            Some(CapabilityWarning {
1994                code: String::from("catalog-version-parse-fallback"),
1995                message: format!(
1996                    "server version `{server_version}` did not map cleanly to a supported OracleVersion; defaulted AnalysisProfile.oracle_version to Oracle19c"
1997                ),
1998                remediation: Some(String::from(
1999                    "Set the workspace AnalysisProfile explicitly if this estate targets a newer or older Oracle release.",
2000                )),
2001            }),
2002        ),
2003    }
2004}
2005
2006#[cfg(feature = "oraclemcp-db")]
2007fn oracle_bind_placeholders(count: usize, start_index: usize) -> String {
2008    (0..count)
2009        .map(|offset| format!(":{}", start_index + offset))
2010        .collect::<Vec<_>>()
2011        .join(", ")
2012}
2013
2014fn hash_text(text: &str) -> Hash {
2015    use sha2::{Digest as _, Sha256};
2016    let mut hasher = Sha256::new();
2017    hasher.update(text.as_bytes());
2018    // sha2 0.11+ returns `Array<u8, …>` from `finalize` which no
2019    // longer impls `LowerHex` directly; render byte-by-byte (matches
2020    // the plsql-store pattern). Keeps the bump from being a breaking
2021    // change for callers.
2022    let digest = hasher.finalize();
2023    let mut rendered = String::with_capacity(7 + digest.len() * 2);
2024    rendered.push_str("sha256:");
2025    for byte in digest {
2026        rendered.push_str(&format!("{byte:02x}"));
2027    }
2028    Hash::new(rendered)
2029}
2030
2031#[cfg(feature = "oraclemcp-db")]
2032fn schema_filter_params(schema_names: &[String]) -> Vec<OracleBind> {
2033    schema_names
2034        .iter()
2035        .cloned()
2036        .map(OracleBind::from)
2037        .collect::<Vec<_>>()
2038}
2039
2040fn apply_object_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2041    let owner_text = row.require_text("OWNER")?;
2042    let object_name_text = row.require_text("OBJECT_NAME")?;
2043    let object_type_text = row.require_text("OBJECT_TYPE")?;
2044    let Some(object_type) = object_type_from_dictionary_value(object_type_text) else {
2045        return Ok(());
2046    };
2047
2048    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2049        return Err(CatalogError::InvalidColumnValue {
2050            column: String::from("OWNER"),
2051            expected: "interned schema name",
2052            value: String::from(owner_text),
2053        });
2054    };
2055    let Some(object_name) = snapshot.intern_object_name(object_name_text) else {
2056        return Err(CatalogError::InvalidColumnValue {
2057            column: String::from("OBJECT_NAME"),
2058            expected: "interned object name",
2059            value: String::from(object_name_text),
2060        });
2061    };
2062
2063    let last_ddl_time =
2064        optional_nonblank_text(row, "LAST_DDL_TIME_ISO").and_then(parse_dictionary_timestamp);
2065    let editionable = optional_bool(row, "EDITIONABLE")?;
2066    let edition_name = optional_nonblank_text(row, "EDITION_NAME")
2067        .map(|value| {
2068            snapshot
2069                .interner
2070                .intern(value)
2071                .map(EditionName::from)
2072                .ok_or(CatalogError::InvalidColumnValue {
2073                    column: String::from("EDITION_NAME"),
2074                    expected: "interned edition name",
2075                    value: String::from(value),
2076                })
2077        })
2078        .transpose()?;
2079
2080    let common = ObjectCommon {
2081        owner,
2082        name: object_name,
2083        object_type,
2084        status: row
2085            .text("STATUS")
2086            .map(object_status_from_dictionary_value)
2087            .unwrap_or_default(),
2088        edition_name,
2089        editionable,
2090        last_ddl_time,
2091        ..ObjectCommon::default()
2092    };
2093
2094    let Some(catalog_object) = blank_catalog_object(common) else {
2095        return Ok(());
2096    };
2097
2098    snapshot
2099        .schemas
2100        .entry(owner)
2101        .or_default()
2102        .objects
2103        .insert(object_name, catalog_object);
2104
2105    Ok(())
2106}
2107
2108fn apply_dependency_row(
2109    snapshot: &mut CatalogSnapshot,
2110    row: &OracleRow,
2111) -> Result<(), CatalogError> {
2112    let owner_text = row.require_text("OWNER")?;
2113    let name_text = row.require_text("NAME")?;
2114    let referenced_owner_text = row.require_text("REFERENCED_OWNER")?;
2115    let referenced_name_text = row.require_text("REFERENCED_NAME")?;
2116
2117    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2118        return Err(CatalogError::InvalidColumnValue {
2119            column: String::from("OWNER"),
2120            expected: "interned schema name",
2121            value: String::from(owner_text),
2122        });
2123    };
2124    let Some(object_name) = snapshot.intern_object_name(name_text) else {
2125        return Err(CatalogError::InvalidColumnValue {
2126            column: String::from("NAME"),
2127            expected: "interned object name",
2128            value: String::from(name_text),
2129        });
2130    };
2131    let Some(referenced_owner) = snapshot.intern_schema_name(referenced_owner_text) else {
2132        return Err(CatalogError::InvalidColumnValue {
2133            column: String::from("REFERENCED_OWNER"),
2134            expected: "interned schema name",
2135            value: String::from(referenced_owner_text),
2136        });
2137    };
2138    let Some(referenced_name) = snapshot.intern_object_name(referenced_name_text) else {
2139        return Err(CatalogError::InvalidColumnValue {
2140            column: String::from("REFERENCED_NAME"),
2141            expected: "interned object name",
2142            value: String::from(referenced_name_text),
2143        });
2144    };
2145
2146    let object_type = optional_nonblank_text(row, "TYPE")
2147        .and_then(object_type_from_dictionary_value)
2148        .unwrap_or_default();
2149    let referenced_type =
2150        optional_nonblank_text(row, "REFERENCED_TYPE").and_then(object_type_from_dictionary_value);
2151
2152    let dependency = CatalogDependency {
2153        owner,
2154        name: object_name,
2155        object_type,
2156        referenced_owner: Some(referenced_owner),
2157        referenced_name,
2158        referenced_type,
2159        dependency_kind: optional_nonblank_text(row, "DEPENDENCY_TYPE")
2160            .map(catalog_dependency_kind_from_dictionary_value)
2161            .unwrap_or_default(),
2162        via_db_link: None,
2163    };
2164
2165    snapshot
2166        .schemas
2167        .entry(owner)
2168        .or_default()
2169        .dependencies
2170        .push(dependency);
2171
2172    Ok(())
2173}
2174
2175fn parse_dictionary_timestamp(text: &str) -> Option<DateTime<Utc>> {
2176    // Expected shape from the loader query: `YYYY-MM-DD"T"HH24:MI:SS`.
2177    chrono::NaiveDateTime::parse_from_str(text, "%Y-%m-%dT%H:%M:%S")
2178        .ok()
2179        .map(|naive| DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
2180}
2181
2182fn catalog_dependency_kind_from_dictionary_value(text: &str) -> CatalogDependencyKind {
2183    match text.to_ascii_uppercase().as_str() {
2184        "HARD" => CatalogDependencyKind::Hard,
2185        "REF" => CatalogDependencyKind::Reference,
2186        "EXTENDED" => CatalogDependencyKind::Extended,
2187        _ => CatalogDependencyKind::default(),
2188    }
2189}
2190
2191fn apply_column_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2192    let owner_text = row.require_text("OWNER")?;
2193    let table_name_text = row.require_text("TABLE_NAME")?;
2194    let column_name_text = row.require_text("COLUMN_NAME")?;
2195
2196    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2197        return Err(CatalogError::InvalidColumnValue {
2198            column: String::from("OWNER"),
2199            expected: "interned schema name",
2200            value: String::from(owner_text),
2201        });
2202    };
2203    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2204        return Err(CatalogError::InvalidColumnValue {
2205            column: String::from("TABLE_NAME"),
2206            expected: "interned object name",
2207            value: String::from(table_name_text),
2208        });
2209    };
2210    let Some(column_name) = snapshot.intern_column_name(column_name_text) else {
2211        return Err(CatalogError::InvalidColumnValue {
2212            column: String::from("COLUMN_NAME"),
2213            expected: "interned column name",
2214            value: String::from(column_name_text),
2215        });
2216    };
2217    let data_type = data_type_ref_from_row(snapshot, row)?;
2218
2219    let Some(schema_catalog) = snapshot.schemas.get_mut(&owner) else {
2220        return Ok(());
2221    };
2222    let Some(catalog_object) = schema_catalog.objects.get_mut(&table_name) else {
2223        return Ok(());
2224    };
2225
2226    let default_expression = row
2227        .text("DATA_DEFAULT_VC")
2228        .map(String::from)
2229        .filter(|value| !value.trim().is_empty());
2230    let virtual_column = optional_bool(row, "VIRTUAL_COLUMN")?.unwrap_or(false);
2231    let column = ColumnMetadata {
2232        name: column_name,
2233        position: required_u32(row, "COLUMN_POSITION")?,
2234        data_type,
2235        nullable: optional_bool(row, "NULLABLE")?.unwrap_or(false),
2236        default_expression: if virtual_column {
2237            None
2238        } else {
2239            default_expression.clone()
2240        },
2241        generated_expression: if virtual_column {
2242            default_expression
2243        } else {
2244            None
2245        },
2246        hidden: optional_bool(row, "HIDDEN_COLUMN")?.unwrap_or(false),
2247    };
2248
2249    match catalog_object {
2250        CatalogObject::Table(metadata) => {
2251            metadata.columns.insert(column.name, column);
2252        }
2253        CatalogObject::View(metadata) => {
2254            metadata.columns.insert(column.name, column);
2255        }
2256        CatalogObject::MaterializedView(metadata) => {
2257            metadata.columns.insert(column.name, column);
2258        }
2259        CatalogObject::EditioningView(metadata) => {
2260            metadata.columns.insert(column.name, column);
2261        }
2262        CatalogObject::Sequence(_)
2263        | CatalogObject::Type(_)
2264        | CatalogObject::Package(_)
2265        | CatalogObject::Procedure(_)
2266        | CatalogObject::Function(_)
2267        | CatalogObject::Trigger(_)
2268        | CatalogObject::SchedulerJob(_) => {}
2269    }
2270
2271    Ok(())
2272}
2273
2274fn apply_constraint_row(
2275    snapshot: &mut CatalogSnapshot,
2276    row: &OracleRow,
2277) -> Result<(), CatalogError> {
2278    let owner_text = row.require_text("OWNER")?;
2279    let constraint_name_text = row.require_text("CONSTRAINT_NAME")?;
2280    let table_name_text = row.require_text("TABLE_NAME")?;
2281    let search_condition = optional_nonblank_text(row, "SEARCH_CONDITION_VC").map(String::from);
2282
2283    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2284        return Err(CatalogError::InvalidColumnValue {
2285            column: String::from("OWNER"),
2286            expected: "interned schema name",
2287            value: String::from(owner_text),
2288        });
2289    };
2290    let Some(constraint_name) = snapshot.intern_constraint_name(constraint_name_text) else {
2291        return Err(CatalogError::InvalidColumnValue {
2292            column: String::from("CONSTRAINT_NAME"),
2293            expected: "interned constraint name",
2294            value: String::from(constraint_name_text),
2295        });
2296    };
2297    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2298        return Err(CatalogError::InvalidColumnValue {
2299            column: String::from("TABLE_NAME"),
2300            expected: "interned object name",
2301            value: String::from(table_name_text),
2302        });
2303    };
2304    let referenced_table_owner = optional_nonblank_text(row, "REFERENCED_TABLE_OWNER")
2305        .map(|value| {
2306            snapshot
2307                .intern_schema_name(value)
2308                .ok_or(CatalogError::InvalidColumnValue {
2309                    column: String::from("REFERENCED_TABLE_OWNER"),
2310                    expected: "interned schema name",
2311                    value: String::from(value),
2312                })
2313        })
2314        .transpose()?;
2315    let referenced_table_name = optional_nonblank_text(row, "REFERENCED_TABLE_NAME")
2316        .map(|value| {
2317            snapshot
2318                .intern_object_name(value)
2319                .ok_or(CatalogError::InvalidColumnValue {
2320                    column: String::from("REFERENCED_TABLE_NAME"),
2321                    expected: "interned object name",
2322                    value: String::from(value),
2323                })
2324        })
2325        .transpose()?;
2326    let child_column = optional_nonblank_text(row, "COLUMN_NAME")
2327        .map(|value| {
2328            snapshot
2329                .intern_column_name(value)
2330                .ok_or(CatalogError::InvalidColumnValue {
2331                    column: String::from("COLUMN_NAME"),
2332                    expected: "interned column name",
2333                    value: String::from(value),
2334                })
2335        })
2336        .transpose()?;
2337    let referenced_column = optional_nonblank_text(row, "REFERENCED_COLUMN_NAME")
2338        .map(|value| {
2339            snapshot
2340                .intern_column_name(value)
2341                .ok_or(CatalogError::InvalidColumnValue {
2342                    column: String::from("REFERENCED_COLUMN_NAME"),
2343                    expected: "interned column name",
2344                    value: String::from(value),
2345                })
2346        })
2347        .transpose()?;
2348
2349    let constraint_type = constraint_type_from_dictionary_value(
2350        row.require_text("CONSTRAINT_TYPE")?,
2351        search_condition.as_deref(),
2352        child_column.is_some(),
2353    );
2354
2355    let metadata = snapshot
2356        .schemas
2357        .entry(owner)
2358        .or_default()
2359        .constraints
2360        .entry(constraint_name)
2361        .or_insert_with(|| ConstraintMetadata {
2362            name: constraint_name,
2363            table_owner: owner,
2364            table_name,
2365            constraint_type,
2366            columns: Vec::new(),
2367            referenced_table_owner,
2368            referenced_table_name,
2369            referenced_columns: Vec::new(),
2370            search_condition: search_condition.clone(),
2371            deferrable: optional_bool(row, "IS_DEFERRABLE").ok().flatten(),
2372            initially_deferred: optional_bool(row, "IS_DEFERRED").ok().flatten(),
2373        });
2374
2375    metadata.table_name = table_name;
2376    metadata.constraint_type = constraint_type;
2377    metadata.referenced_table_owner = referenced_table_owner;
2378    metadata.referenced_table_name = referenced_table_name;
2379    metadata.search_condition = search_condition;
2380    metadata.deferrable = optional_bool(row, "IS_DEFERRABLE")?;
2381    metadata.initially_deferred = optional_bool(row, "IS_DEFERRED")?;
2382
2383    if let Some(column_name) = child_column {
2384        push_unique_column(&mut metadata.columns, column_name);
2385    }
2386    if let Some(column_name) = referenced_column {
2387        push_unique_column(&mut metadata.referenced_columns, column_name);
2388    }
2389
2390    Ok(())
2391}
2392
2393fn apply_index_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2394    let owner_text = row.require_text("OWNER")?;
2395    let index_name_text = row.require_text("INDEX_NAME")?;
2396    let table_owner_text = row.require_text("TABLE_OWNER")?;
2397    let table_name_text = row.require_text("TABLE_NAME")?;
2398
2399    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2400        return Err(CatalogError::InvalidColumnValue {
2401            column: String::from("OWNER"),
2402            expected: "interned schema name",
2403            value: String::from(owner_text),
2404        });
2405    };
2406    let Some(index_name) = snapshot.intern_index_name(index_name_text) else {
2407        return Err(CatalogError::InvalidColumnValue {
2408            column: String::from("INDEX_NAME"),
2409            expected: "interned index name",
2410            value: String::from(index_name_text),
2411        });
2412    };
2413    let Some(table_owner) = snapshot.intern_schema_name(table_owner_text) else {
2414        return Err(CatalogError::InvalidColumnValue {
2415            column: String::from("TABLE_OWNER"),
2416            expected: "interned schema name",
2417            value: String::from(table_owner_text),
2418        });
2419    };
2420    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2421        return Err(CatalogError::InvalidColumnValue {
2422            column: String::from("TABLE_NAME"),
2423            expected: "interned object name",
2424            value: String::from(table_name_text),
2425        });
2426    };
2427    let index_column = optional_nonblank_text(row, "COLUMN_NAME")
2428        .map(|value| {
2429            snapshot
2430                .intern_column_name(value)
2431                .ok_or(CatalogError::InvalidColumnValue {
2432                    column: String::from("COLUMN_NAME"),
2433                    expected: "interned column name",
2434                    value: String::from(value),
2435                })
2436        })
2437        .transpose()?;
2438
2439    let metadata = snapshot
2440        .schemas
2441        .entry(owner)
2442        .or_default()
2443        .indexes
2444        .entry(index_name)
2445        .or_insert_with(|| IndexMetadata {
2446            name: index_name,
2447            table_owner,
2448            table_name,
2449            unique: optional_bool(row, "IS_UNIQUE")
2450                .ok()
2451                .flatten()
2452                .unwrap_or(false),
2453            columns: Vec::new(),
2454            index_type: String::from(row.text("INDEX_TYPE").unwrap_or_default()),
2455            status: row
2456                .text("STATUS")
2457                .map(object_status_from_dictionary_value)
2458                .unwrap_or_default(),
2459        });
2460
2461    metadata.table_owner = table_owner;
2462    metadata.table_name = table_name;
2463    metadata.unique = optional_bool(row, "IS_UNIQUE")?.unwrap_or(false);
2464    metadata.index_type = String::from(row.text("INDEX_TYPE").unwrap_or_default());
2465    metadata.status = row
2466        .text("STATUS")
2467        .map(object_status_from_dictionary_value)
2468        .unwrap_or_default();
2469
2470    if let Some(column_name) = index_column {
2471        push_unique_column(&mut metadata.columns, column_name);
2472    }
2473
2474    Ok(())
2475}
2476
2477fn apply_trigger_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2478    let owner_text = row.require_text("OWNER")?;
2479    let trigger_name_text = row.require_text("TRIGGER_NAME")?;
2480    let table_owner_text = row.require_text("TABLE_OWNER")?;
2481    let table_name_text = row.require_text("TABLE_NAME")?;
2482
2483    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2484        return Err(CatalogError::InvalidColumnValue {
2485            column: String::from("OWNER"),
2486            expected: "interned schema name",
2487            value: String::from(owner_text),
2488        });
2489    };
2490    let Some(trigger_name) = snapshot.intern_trigger_name(trigger_name_text) else {
2491        return Err(CatalogError::InvalidColumnValue {
2492            column: String::from("TRIGGER_NAME"),
2493            expected: "interned trigger name",
2494            value: String::from(trigger_name_text),
2495        });
2496    };
2497    let Some(object_name) = snapshot.intern_object_name(trigger_name_text) else {
2498        return Err(CatalogError::InvalidColumnValue {
2499            column: String::from("TRIGGER_NAME"),
2500            expected: "interned object name",
2501            value: String::from(trigger_name_text),
2502        });
2503    };
2504    let Some(target_owner) = snapshot.intern_schema_name(table_owner_text) else {
2505        return Err(CatalogError::InvalidColumnValue {
2506            column: String::from("TABLE_OWNER"),
2507            expected: "interned schema name",
2508            value: String::from(table_owner_text),
2509        });
2510    };
2511    let Some(target_name) = snapshot.intern_object_name(table_name_text) else {
2512        return Err(CatalogError::InvalidColumnValue {
2513            column: String::from("TABLE_NAME"),
2514            expected: "interned object name",
2515            value: String::from(table_name_text),
2516        });
2517    };
2518
2519    let schema_catalog = snapshot.schemas.entry(owner).or_default();
2520    let common = schema_catalog
2521        .objects
2522        .get(&object_name)
2523        .and_then(|object| {
2524            if let CatalogObject::Trigger(metadata) = object {
2525                Some(metadata.common.clone())
2526            } else {
2527                None
2528            }
2529        })
2530        .unwrap_or_else(|| ObjectCommon {
2531            owner,
2532            name: object_name,
2533            object_type: ObjectType::Trigger,
2534            ..ObjectCommon::default()
2535        });
2536
2537    let metadata = TriggerMetadata {
2538        common,
2539        target_owner,
2540        target_name,
2541        timing: trigger_timing_from_dictionary_value(row.text("TRIGGER_TYPE").unwrap_or_default()),
2542        level: trigger_level_from_dictionary_value(row.text("TRIGGER_TYPE").unwrap_or_default()),
2543        events: trigger_events_from_dictionary_value(
2544            row.text("TRIGGERING_EVENT").unwrap_or_default(),
2545        ),
2546        when_clause: optional_nonblank_text(row, "WHEN_CLAUSE").map(String::from),
2547        body_hash: None,
2548    };
2549
2550    schema_catalog
2551        .triggers
2552        .insert(trigger_name, metadata.clone());
2553    schema_catalog
2554        .objects
2555        .insert(object_name, CatalogObject::Trigger(metadata));
2556
2557    Ok(())
2558}
2559
2560fn apply_synonym_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2561    let owner_text = row.require_text("OWNER")?;
2562    let synonym_name_text = row.require_text("SYNONYM_NAME")?;
2563    let target_name_text = row.require_text("TABLE_NAME")?;
2564
2565    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2566        return Err(CatalogError::InvalidColumnValue {
2567            column: String::from("OWNER"),
2568            expected: "interned schema name",
2569            value: String::from(owner_text),
2570        });
2571    };
2572    let Some(synonym_name) = snapshot.intern_synonym_name(synonym_name_text) else {
2573        return Err(CatalogError::InvalidColumnValue {
2574            column: String::from("SYNONYM_NAME"),
2575            expected: "interned synonym name",
2576            value: String::from(synonym_name_text),
2577        });
2578    };
2579    let Some(target_name) = snapshot.intern_object_name(target_name_text) else {
2580        return Err(CatalogError::InvalidColumnValue {
2581            column: String::from("TABLE_NAME"),
2582            expected: "interned object name",
2583            value: String::from(target_name_text),
2584        });
2585    };
2586    let target_owner = optional_nonblank_text(row, "TABLE_OWNER")
2587        .map(|value| {
2588            snapshot
2589                .intern_schema_name(value)
2590                .ok_or(CatalogError::InvalidColumnValue {
2591                    column: String::from("TABLE_OWNER"),
2592                    expected: "interned schema name",
2593                    value: String::from(value),
2594                })
2595        })
2596        .transpose()?;
2597
2598    snapshot.schemas.entry(owner).or_default().synonyms.insert(
2599        synonym_name,
2600        SynonymTarget {
2601            target_owner,
2602            target_name,
2603            target_type: None,
2604            db_link: optional_nonblank_text(row, "DB_LINK").map(String::from),
2605            public_synonym: owner_text.eq_ignore_ascii_case("PUBLIC"),
2606        },
2607    );
2608
2609    Ok(())
2610}
2611
2612fn apply_view_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2613    let owner_text = row.require_text("OWNER")?;
2614    let view_name_text = row.require_text("VIEW_NAME")?;
2615
2616    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2617        return Err(CatalogError::InvalidColumnValue {
2618            column: String::from("OWNER"),
2619            expected: "interned schema name",
2620            value: String::from(owner_text),
2621        });
2622    };
2623    let Some(view_name) = snapshot.intern_object_name(view_name_text) else {
2624        return Err(CatalogError::InvalidColumnValue {
2625            column: String::from("VIEW_NAME"),
2626            expected: "interned object name",
2627            value: String::from(view_name_text),
2628        });
2629    };
2630
2631    let query_hash = optional_nonblank_text(row, "TEXT_VC").map(hash_text);
2632    let read_only = optional_bool(row, "READ_ONLY")?;
2633
2634    let Some(schema_catalog) = snapshot.schemas.get_mut(&owner) else {
2635        return Ok(());
2636    };
2637    let Some(catalog_object) = schema_catalog.objects.get_mut(&view_name) else {
2638        return Ok(());
2639    };
2640
2641    if let CatalogObject::View(metadata) = catalog_object {
2642        metadata.query_hash = query_hash;
2643        metadata.read_only = read_only;
2644    }
2645
2646    Ok(())
2647}
2648
2649fn apply_mview_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2650    let owner_text = row.require_text("OWNER")?;
2651    let mview_name_text = row.require_text("MVIEW_NAME")?;
2652
2653    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2654        return Err(CatalogError::InvalidColumnValue {
2655            column: String::from("OWNER"),
2656            expected: "interned schema name",
2657            value: String::from(owner_text),
2658        });
2659    };
2660    let Some(mview_name) = snapshot.intern_object_name(mview_name_text) else {
2661        return Err(CatalogError::InvalidColumnValue {
2662            column: String::from("MVIEW_NAME"),
2663            expected: "interned object name",
2664            value: String::from(mview_name_text),
2665        });
2666    };
2667
2668    let refresh_mode = optional_nonblank_text(row, "REFRESH_MODE").map(String::from);
2669    let refresh_method = optional_nonblank_text(row, "REFRESH_METHOD").map(String::from);
2670    let query_hash = optional_nonblank_text(row, "QUERY").map(hash_text);
2671
2672    let Some(schema_catalog) = snapshot.schemas.get_mut(&owner) else {
2673        return Ok(());
2674    };
2675    let Some(catalog_object) = schema_catalog.objects.get_mut(&mview_name) else {
2676        return Ok(());
2677    };
2678
2679    if let CatalogObject::MaterializedView(metadata) = catalog_object {
2680        metadata.refresh_mode = refresh_mode;
2681        metadata.refresh_method = refresh_method;
2682        metadata.query_hash = query_hash;
2683    }
2684
2685    Ok(())
2686}
2687
2688fn apply_sequence_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2689    let owner_text = row.require_text("SEQUENCE_OWNER")?;
2690    let sequence_name_text = row.require_text("SEQUENCE_NAME")?;
2691
2692    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2693        return Err(CatalogError::InvalidColumnValue {
2694            column: String::from("SEQUENCE_OWNER"),
2695            expected: "interned schema name",
2696            value: String::from(owner_text),
2697        });
2698    };
2699    let Some(sequence_name) = snapshot.intern_object_name(sequence_name_text) else {
2700        return Err(CatalogError::InvalidColumnValue {
2701            column: String::from("SEQUENCE_NAME"),
2702            expected: "interned object name",
2703            value: String::from(sequence_name_text),
2704        });
2705    };
2706
2707    let increment_by = row.parse_i64("INCREMENT_BY").unwrap_or(1);
2708    let min_value = row.parse_i64("MIN_VALUE").ok();
2709    let max_value = row.parse_i64("MAX_VALUE").ok();
2710    let cycle = row
2711        .text("CYCLE_FLAG")
2712        .map(|value| value.eq_ignore_ascii_case("Y"))
2713        .unwrap_or(false);
2714    let ordered = row
2715        .text("ORDER_FLAG")
2716        .map(|value| value.eq_ignore_ascii_case("Y"))
2717        .unwrap_or(false);
2718    let cache_size = row.parse_u64("CACHE_SIZE").ok();
2719
2720    let Some(schema_catalog) = snapshot.schemas.get_mut(&owner) else {
2721        return Ok(());
2722    };
2723    let Some(catalog_object) = schema_catalog.objects.get_mut(&sequence_name) else {
2724        return Ok(());
2725    };
2726
2727    if let CatalogObject::Sequence(metadata) = catalog_object {
2728        metadata.increment_by = increment_by;
2729        metadata.min_value = min_value;
2730        metadata.max_value = max_value;
2731        metadata.cycle = cycle;
2732        metadata.ordered = ordered;
2733        metadata.cache_size = cache_size;
2734    }
2735
2736    Ok(())
2737}
2738
2739fn apply_type_attr_row(
2740    snapshot: &mut CatalogSnapshot,
2741    row: &OracleRow,
2742) -> Result<(), CatalogError> {
2743    let owner_text = row.require_text("OWNER")?;
2744    let type_name_text = row.require_text("TYPE_NAME")?;
2745    let attr_name_text = row.require_text("ATTR_NAME")?;
2746
2747    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2748        return Err(CatalogError::InvalidColumnValue {
2749            column: String::from("OWNER"),
2750            expected: "interned schema name",
2751            value: String::from(owner_text),
2752        });
2753    };
2754    let Some(type_name) = snapshot.intern_object_name(type_name_text) else {
2755        return Err(CatalogError::InvalidColumnValue {
2756            column: String::from("TYPE_NAME"),
2757            expected: "interned object name",
2758            value: String::from(type_name_text),
2759        });
2760    };
2761    let Some(attr_name) = snapshot.intern_member_name(attr_name_text) else {
2762        return Err(CatalogError::InvalidColumnValue {
2763            column: String::from("ATTR_NAME"),
2764            expected: "interned member name",
2765            value: String::from(attr_name_text),
2766        });
2767    };
2768
2769    let attr_type_owner = optional_nonblank_text(row, "ATTR_TYPE_OWNER")
2770        .map(|value| {
2771            snapshot
2772                .intern_schema_name(value)
2773                .ok_or(CatalogError::InvalidColumnValue {
2774                    column: String::from("ATTR_TYPE_OWNER"),
2775                    expected: "interned schema name",
2776                    value: String::from(value),
2777                })
2778        })
2779        .transpose()?;
2780    let attr_type_name = row
2781        .text("ATTR_TYPE_NAME")
2782        .map(String::from)
2783        .unwrap_or_default();
2784
2785    let attribute = TypeAttribute {
2786        name: attr_name,
2787        position: required_u32(row, "ATTR_NO")?,
2788        data_type: DataTypeRef {
2789            owner: attr_type_owner,
2790            name: attr_type_name,
2791            length: optional_u32(row, "LENGTH")?,
2792            precision: optional_u32(row, "PRECISION")?,
2793            scale: optional_i32(row, "SCALE")?,
2794            char_semantics: None,
2795        },
2796    };
2797
2798    let Some(schema_catalog) = snapshot.schemas.get_mut(&owner) else {
2799        return Ok(());
2800    };
2801    let Some(catalog_object) = schema_catalog.objects.get_mut(&type_name) else {
2802        return Ok(());
2803    };
2804
2805    if let CatalogObject::Type(metadata) = catalog_object {
2806        match metadata
2807            .attributes
2808            .iter()
2809            .position(|existing| existing.position.eq(&attribute.position))
2810        {
2811            Some(index) => metadata.attributes[index] = attribute,
2812            None => metadata.attributes.push(attribute),
2813        }
2814        metadata
2815            .attributes
2816            .sort_by_key(|attribute| attribute.position);
2817    }
2818
2819    Ok(())
2820}
2821
2822/// Apply a single `ALL_DB_LINKS` row into the snapshot. Ensures the
2823/// owning schema entry exists (lazily creates it) so a `PUBLIC` row
2824/// lands even when no other catalog object has been recorded for that
2825/// synthetic schema.
2826fn apply_db_link_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2827    let owner_text = row.require_text("OWNER")?;
2828    let link_name_text = row.require_text("DB_LINK")?;
2829
2830    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2831        return Err(CatalogError::InvalidColumnValue {
2832            column: String::from("OWNER"),
2833            expected: "interned schema name",
2834            value: String::from(owner_text),
2835        });
2836    };
2837
2838    let host = optional_nonblank_text(row, "HOST").map(String::from);
2839    let public_link = owner_text.eq_ignore_ascii_case("PUBLIC");
2840
2841    let schema_catalog = snapshot.schemas.entry(owner).or_default();
2842    schema_catalog.db_links.push(DatabaseLink {
2843        owner,
2844        name: String::from(link_name_text),
2845        host,
2846        public_link,
2847    });
2848
2849    Ok(())
2850}
2851
2852/// Apply a single `ALL_POLICIES` row.
2853fn apply_vpd_policy_row(
2854    snapshot: &mut CatalogSnapshot,
2855    row: &OracleRow,
2856) -> Result<(), CatalogError> {
2857    let object_owner_text = row.require_text("OBJECT_OWNER")?;
2858    let object_name_text = row.require_text("OBJECT_NAME")?;
2859    let policy_name = row.require_text("POLICY_NAME")?.to_string();
2860    let function_owner_text = row.require_text("PF_OWNER")?;
2861    let function_name = row.require_text("FUNCTION")?.to_string();
2862
2863    let Some(object_owner) = snapshot.intern_schema_name(object_owner_text) else {
2864        return Err(CatalogError::InvalidColumnValue {
2865            column: String::from("OBJECT_OWNER"),
2866            expected: "interned schema name",
2867            value: String::from(object_owner_text),
2868        });
2869    };
2870    let Some(object_name) = snapshot.intern_object_name(object_name_text) else {
2871        return Err(CatalogError::InvalidColumnValue {
2872            column: String::from("OBJECT_NAME"),
2873            expected: "interned object name",
2874            value: String::from(object_name_text),
2875        });
2876    };
2877    let Some(function_owner) = snapshot.intern_schema_name(function_owner_text) else {
2878        return Err(CatalogError::InvalidColumnValue {
2879            column: String::from("PF_OWNER"),
2880            expected: "interned schema name",
2881            value: String::from(function_owner_text),
2882        });
2883    };
2884
2885    let policy_group = optional_nonblank_text(row, "POLICY_GROUP").map(String::from);
2886    let function_package = optional_nonblank_text(row, "PACKAGE").map(String::from);
2887
2888    let yn = |col: &str| {
2889        row.text(col)
2890            .map(|v| v.eq_ignore_ascii_case("Y") || v.eq_ignore_ascii_case("YES"))
2891            .unwrap_or(false)
2892    };
2893
2894    let schema_catalog = snapshot.schemas.entry(object_owner).or_default();
2895    schema_catalog.vpd_policies.push(VpdPolicy {
2896        object_owner,
2897        object_name,
2898        policy_group,
2899        policy_name,
2900        function_owner,
2901        function_package,
2902        function_name,
2903        on_select: yn("SEL"),
2904        on_insert: yn("INS"),
2905        on_update: yn("UPD"),
2906        on_delete: yn("DEL"),
2907        enabled: yn("ENABLE"),
2908    });
2909    Ok(())
2910}
2911
2912/// Apply a single `ALL_EDITIONS` row.
2913fn apply_edition_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2914    let edition_name = row.require_text("EDITION_NAME")?.to_string();
2915    let parent_edition_name = optional_nonblank_text(row, "PARENT_EDITION_NAME").map(String::from);
2916    let usable = row
2917        .text("USABLE")
2918        .map(|v| v.eq_ignore_ascii_case("Y"))
2919        .unwrap_or(true);
2920    snapshot.editions.push(Edition {
2921        edition_name,
2922        parent_edition_name,
2923        usable,
2924    });
2925    Ok(())
2926}
2927
2928/// Apply a single `ALL_EDITIONING_VIEWS` row.
2929fn apply_editioning_view_row(
2930    snapshot: &mut CatalogSnapshot,
2931    row: &OracleRow,
2932) -> Result<(), CatalogError> {
2933    let owner_text = row.require_text("OWNER")?;
2934    let view_name_text = row.require_text("VIEW_NAME")?;
2935    let table_name_text = row.require_text("TABLE_NAME")?;
2936
2937    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2938        return Err(CatalogError::InvalidColumnValue {
2939            column: String::from("OWNER"),
2940            expected: "interned schema name",
2941            value: String::from(owner_text),
2942        });
2943    };
2944    let Some(view_name) = snapshot.intern_object_name(view_name_text) else {
2945        return Err(CatalogError::InvalidColumnValue {
2946            column: String::from("VIEW_NAME"),
2947            expected: "interned object name",
2948            value: String::from(view_name_text),
2949        });
2950    };
2951    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2952        return Err(CatalogError::InvalidColumnValue {
2953            column: String::from("TABLE_NAME"),
2954            expected: "interned object name",
2955            value: String::from(table_name_text),
2956        });
2957    };
2958
2959    let schema_catalog = snapshot.schemas.entry(owner).or_default();
2960    schema_catalog.editioning_views.push(EditioningView {
2961        owner,
2962        view_name,
2963        table_name,
2964    });
2965    Ok(())
2966}
2967
2968/// Apply a single `ALL_TAB_COMMENTS` row into the snapshot.
2969fn apply_table_comment_row(
2970    snapshot: &mut CatalogSnapshot,
2971    row: &OracleRow,
2972) -> Result<(), CatalogError> {
2973    let owner_text = row.require_text("OWNER")?;
2974    let table_name_text = row.require_text("TABLE_NAME")?;
2975    let table_type = row.text("TABLE_TYPE").map(String::from).unwrap_or_default();
2976    let comments = row.require_text("COMMENTS")?.to_string();
2977
2978    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2979        return Err(CatalogError::InvalidColumnValue {
2980            column: String::from("OWNER"),
2981            expected: "interned schema name",
2982            value: String::from(owner_text),
2983        });
2984    };
2985    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2986        return Err(CatalogError::InvalidColumnValue {
2987            column: String::from("TABLE_NAME"),
2988            expected: "interned object name",
2989            value: String::from(table_name_text),
2990        });
2991    };
2992
2993    let schema_catalog = snapshot.schemas.entry(owner).or_default();
2994    schema_catalog.table_comments.push(TableComment {
2995        owner,
2996        table_name,
2997        table_type,
2998        comments,
2999    });
3000    Ok(())
3001}
3002
3003/// Apply a single `ALL_COL_COMMENTS` row into the snapshot.
3004fn apply_column_comment_row(
3005    snapshot: &mut CatalogSnapshot,
3006    row: &OracleRow,
3007) -> Result<(), CatalogError> {
3008    let owner_text = row.require_text("OWNER")?;
3009    let table_name_text = row.require_text("TABLE_NAME")?;
3010    let column_name_text = row.require_text("COLUMN_NAME")?;
3011    let comments = row.require_text("COMMENTS")?.to_string();
3012
3013    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
3014        return Err(CatalogError::InvalidColumnValue {
3015            column: String::from("OWNER"),
3016            expected: "interned schema name",
3017            value: String::from(owner_text),
3018        });
3019    };
3020    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
3021        return Err(CatalogError::InvalidColumnValue {
3022            column: String::from("TABLE_NAME"),
3023            expected: "interned object name",
3024            value: String::from(table_name_text),
3025        });
3026    };
3027    let Some(column_name) = snapshot.intern_column_name(column_name_text) else {
3028        return Err(CatalogError::InvalidColumnValue {
3029            column: String::from("COLUMN_NAME"),
3030            expected: "interned column name",
3031            value: String::from(column_name_text),
3032        });
3033    };
3034
3035    let schema_catalog = snapshot.schemas.entry(owner).or_default();
3036    schema_catalog.column_comments.push(ColumnComment {
3037        owner,
3038        table_name,
3039        column_name,
3040        comments,
3041    });
3042    Ok(())
3043}
3044
3045fn apply_user_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
3046    let username = row.require_text("USERNAME")?;
3047    let Some(user) = snapshot.intern_user_name(username) else {
3048        return Err(CatalogError::InvalidColumnValue {
3049            column: String::from("USERNAME"),
3050            expected: "interned user name",
3051            value: String::from(username),
3052        });
3053    };
3054    snapshot
3055        .known_users
3056        .get_or_insert_with(HashSet::new)
3057        .insert(user);
3058    Ok(())
3059}
3060
3061fn apply_plscope_availability_row(
3062    snapshot: &mut CatalogSnapshot,
3063    row: &OracleRow,
3064    tallies: &mut HashMap<SchemaName, PlScopeTally>,
3065) -> Result<(), CatalogError> {
3066    let Some(owner_text) = optional_nonblank_text(row, "OWNER") else {
3067        return Ok(());
3068    };
3069    let settings = row
3070        .text("PLSCOPE_SETTINGS")
3071        .unwrap_or("")
3072        .to_ascii_uppercase();
3073    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
3074        return Ok(());
3075    };
3076    let tally = tallies.entry(owner).or_default();
3077    tally.total = tally.total.saturating_add(1);
3078    if settings.contains("STATEMENTS:") && !settings.contains("STATEMENTS:NONE") {
3079        tally.with_statements = tally.with_statements.saturating_add(1);
3080    }
3081    if settings.contains("IDENTIFIERS:") && !settings.contains("IDENTIFIERS:NONE") {
3082        tally.with_identifiers = tally.with_identifiers.saturating_add(1);
3083    }
3084    Ok(())
3085}
3086
3087fn finalize_plscope_availability(
3088    snapshot: &mut CatalogSnapshot,
3089    tallies: HashMap<SchemaName, PlScopeTally>,
3090) {
3091    for (owner, tally) in tallies {
3092        let availability = if tally.with_statements > 0 {
3093            PlScopeAvailability::IdentifiersAndStatements
3094        } else if tally.with_identifiers > 0 {
3095            PlScopeAvailability::IdentifiersOnly
3096        } else if tally.total > 0 {
3097            PlScopeAvailability::AvailableButStale
3098        } else {
3099            PlScopeAvailability::NotAvailable
3100        };
3101        let schema_catalog = snapshot.schemas.entry(owner).or_default();
3102        let plscope = schema_catalog
3103            .plscope
3104            .get_or_insert_with(PlScopeSnapshot::default);
3105        plscope.availability = availability;
3106        plscope.collected_at = Some(snapshot.generated_at);
3107    }
3108}
3109
3110fn apply_plscope_identifier_row(
3111    snapshot: &mut CatalogSnapshot,
3112    row: &OracleRow,
3113) -> Result<(), CatalogError> {
3114    let Some(owner_text) = optional_nonblank_text(row, "OWNER") else {
3115        return Ok(());
3116    };
3117    let Some(object_name_text) = optional_nonblank_text(row, "OBJECT_NAME") else {
3118        return Ok(());
3119    };
3120    let Some(identifier_name_text) = optional_nonblank_text(row, "NAME") else {
3121        return Ok(());
3122    };
3123    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
3124        return Ok(());
3125    };
3126    let Some(object_name) = snapshot.intern_object_name(object_name_text) else {
3127        return Ok(());
3128    };
3129    let Some(identifier_name) = snapshot.intern_member_name(identifier_name_text) else {
3130        return Ok(());
3131    };
3132    let identifier = CompilerIdentifier {
3133        owner,
3134        object_name,
3135        identifier_name,
3136        identifier_type: optional_nonblank_text(row, "TYPE")
3137            .map(String::from)
3138            .unwrap_or_default(),
3139        usage: optional_nonblank_text(row, "USAGE")
3140            .map(String::from)
3141            .unwrap_or_default(),
3142        line: optional_u32(row, "LINE")?.unwrap_or(0),
3143        column: optional_u32(row, "COL")?.unwrap_or(0),
3144    };
3145
3146    let plscope = snapshot
3147        .schemas
3148        .entry(owner)
3149        .or_default()
3150        .plscope
3151        .get_or_insert_with(|| PlScopeSnapshot {
3152            availability: PlScopeAvailability::IdentifiersOnly,
3153            collected_at: Some(snapshot.generated_at),
3154            ..PlScopeSnapshot::default()
3155        });
3156    plscope.identifiers.push(identifier);
3157    Ok(())
3158}
3159
3160fn apply_grant_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
3161    let owner_text = row.require_text("TABLE_SCHEMA")?;
3162    let object_name_text = row.require_text("TABLE_NAME")?;
3163    let grantee_text = row.require_text("GRANTEE")?;
3164    let privilege_text = row.require_text("PRIVILEGE")?;
3165
3166    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
3167        return Err(CatalogError::InvalidColumnValue {
3168            column: String::from("TABLE_SCHEMA"),
3169            expected: "interned schema name",
3170            value: String::from(owner_text),
3171        });
3172    };
3173    let Some(object_name) = snapshot.intern_object_name(object_name_text) else {
3174        return Err(CatalogError::InvalidColumnValue {
3175            column: String::from("TABLE_NAME"),
3176            expected: "interned object name",
3177            value: String::from(object_name_text),
3178        });
3179    };
3180
3181    let grantee = grantee_from_dictionary_value(snapshot, grantee_text)?;
3182    let privilege = grant_privilege_from_dictionary_value(privilege_text);
3183    let grantable = row
3184        .text("GRANTABLE")
3185        .map(|value| value.eq_ignore_ascii_case("YES"))
3186        .unwrap_or(false);
3187    let with_hierarchy = row
3188        .text("HIERARCHY")
3189        .map(|value| value.eq_ignore_ascii_case("YES"))
3190        .unwrap_or(false);
3191
3192    let grant = Grant {
3193        object_owner: owner,
3194        object_name,
3195        privilege,
3196        grantee,
3197        grantable,
3198        via_role: None,
3199        with_hierarchy,
3200    };
3201
3202    let schema_catalog = snapshot.schemas.entry(owner).or_default();
3203    if !schema_catalog
3204        .grants
3205        .iter()
3206        .any(|existing| existing.eq(&grant))
3207    {
3208        schema_catalog.grants.push(grant);
3209    }
3210
3211    Ok(())
3212}
3213
3214/// Classify an `ALL_TAB_PRIVS.GRANTEE` value into a [`Grantee`].
3215///
3216/// `ALL_TAB_PRIVS` carries no user/role discriminator column, so the
3217/// grantee universe — `{ user, role, PUBLIC }` — is resolved against the
3218/// `ALL_USERS`-derived [`CatalogSnapshot::known_users`] set:
3219///
3220/// * `PUBLIC` -> [`Grantee::Public`].
3221/// * known username -> [`Grantee::User`] (a statically certain direct grant).
3222/// * loaded set, name absent -> [`Grantee::Role`] (the only remaining class;
3223///   the resolver then caps it at Low confidence and emits a
3224///   `RuntimeGrantOrRole` ambiguity because a role grant only applies when
3225///   the role is enabled in `SESSION_ROLES` at runtime).
3226/// * username set NOT loaded (`known_users` is `None`) -> [`Grantee::Role`]
3227///   as well. This is the R13 fail-toward-restrictive choice: when the
3228///   grantee class is genuinely undetermined we must NOT default to a
3229///   high-confidence direct user grant (a fail-toward-permissive result in
3230///   a privilege/SAST product); treating it as a role routes it through the
3231///   runtime-ambiguity downgrade instead of over-claiming certainty.
3232fn grantee_from_dictionary_value(
3233    snapshot: &mut CatalogSnapshot,
3234    text: &str,
3235) -> Result<Grantee, CatalogError> {
3236    if text.eq_ignore_ascii_case("PUBLIC") {
3237        return Ok(Grantee::Public);
3238    }
3239    let Some(symbol) = snapshot.interner.intern(text) else {
3240        return Err(CatalogError::InvalidColumnValue {
3241            column: String::from("GRANTEE"),
3242            expected: "interned grantee name",
3243            value: String::from(text),
3244        });
3245    };
3246    let is_known_user = snapshot
3247        .known_users
3248        .as_ref()
3249        .is_some_and(|users| users.contains(&UserName::from(symbol)));
3250    if is_known_user {
3251        Ok(Grantee::User(UserName::from(symbol)))
3252    } else {
3253        Ok(Grantee::Role(RoleName::from(symbol)))
3254    }
3255}
3256
3257fn grant_privilege_from_dictionary_value(text: &str) -> GrantPrivilege {
3258    match text.to_ascii_uppercase().as_str() {
3259        "SELECT" => GrantPrivilege::Select,
3260        "INSERT" => GrantPrivilege::Insert,
3261        "UPDATE" => GrantPrivilege::Update,
3262        "DELETE" => GrantPrivilege::Delete,
3263        "EXECUTE" => GrantPrivilege::Execute,
3264        "ALTER" => GrantPrivilege::Alter,
3265        "INDEX" => GrantPrivilege::Index,
3266        "REFERENCES" => GrantPrivilege::References,
3267        "DEBUG" => GrantPrivilege::Debug,
3268        _ => GrantPrivilege::Other,
3269    }
3270}
3271
3272fn apply_routine_row(
3273    snapshot: &mut CatalogSnapshot,
3274    row: &OracleRow,
3275    routines: &mut HashMap<RoutineLocator, RoutineAccumulator>,
3276) -> Result<(), CatalogError> {
3277    let locator = routine_locator_from_procedure_row(snapshot, row)?;
3278    let deterministic = optional_bool(row, "DETERMINISTIC")?.unwrap_or(false);
3279    let pipelined = optional_bool(row, "PIPELINED")?.unwrap_or(false);
3280    let kind_hint = routine_kind_from_dictionary_value(optional_nonblank_text(row, "OBJECT_TYPE"));
3281
3282    let accumulator = routines.entry(locator).or_default();
3283    accumulator
3284        .signature
3285        .get_or_insert_with(|| RoutineSignature {
3286            routine_name: locator.routine_name,
3287            overload: locator.overload,
3288            ..RoutineSignature::default()
3289        });
3290    accumulator.kind_hint = kind_hint.or(accumulator.kind_hint);
3291    accumulator.deterministic = deterministic;
3292    accumulator.pipelined = pipelined;
3293
3294    Ok(())
3295}
3296
3297fn apply_argument_row(
3298    snapshot: &mut CatalogSnapshot,
3299    row: &OracleRow,
3300    routines: &mut HashMap<RoutineLocator, RoutineAccumulator>,
3301) -> Result<(), CatalogError> {
3302    let locator = routine_locator_from_argument_row(snapshot, row)?;
3303    let data_type = data_type_ref_from_argument_row(snapshot, row)?;
3304    let accumulator = routines.entry(locator).or_default();
3305    let signature = accumulator
3306        .signature
3307        .get_or_insert_with(|| RoutineSignature {
3308            routine_name: locator.routine_name,
3309            overload: locator.overload,
3310            ..RoutineSignature::default()
3311        });
3312    let position = required_u32(row, "POSITION")?;
3313
3314    if position.eq(&0) {
3315        signature.return_type = Some(data_type);
3316        accumulator.kind_hint = Some(RoutineKind::Function);
3317        return Ok(());
3318    }
3319
3320    signature.arguments.push(ArgumentMetadata {
3321        position,
3322        name: optional_nonblank_text(row, "ARGUMENT_NAME")
3323            .map(|value| {
3324                snapshot
3325                    .intern_member_name(value)
3326                    .ok_or(CatalogError::InvalidColumnValue {
3327                        column: String::from("ARGUMENT_NAME"),
3328                        expected: "interned member name",
3329                        value: String::from(value),
3330                    })
3331            })
3332            .transpose()?,
3333        mode: parameter_mode_from_dictionary_value(row.text("IN_OUT")),
3334        data_type,
3335        defaulted: optional_bool(row, "DEFAULTED")?.unwrap_or(false),
3336    });
3337
3338    Ok(())
3339}
3340
3341fn finalize_routines(
3342    snapshot: &mut CatalogSnapshot,
3343    routines: HashMap<RoutineLocator, RoutineAccumulator>,
3344) -> Result<(), CatalogError> {
3345    for (locator, accumulator) in routines {
3346        let Some(signature) = accumulator.signature else {
3347            continue;
3348        };
3349        let kind = accumulator
3350            .kind_hint
3351            .or_else(|| {
3352                if signature.return_type.is_some() {
3353                    Some(RoutineKind::Function)
3354                } else {
3355                    Some(RoutineKind::Procedure)
3356                }
3357            })
3358            .unwrap_or(RoutineKind::Procedure);
3359
3360        if let Some(package_name) = locator.package_name {
3361            upsert_packaged_routine(snapshot, locator.owner, package_name, kind, signature)?;
3362        } else {
3363            upsert_top_level_routine(
3364                snapshot,
3365                locator.owner,
3366                locator.routine_name,
3367                kind,
3368                signature,
3369                accumulator.deterministic,
3370                accumulator.pipelined,
3371            )?;
3372        }
3373    }
3374
3375    Ok(())
3376}
3377
3378fn object_type_from_dictionary_value(text: &str) -> Option<ObjectType> {
3379    match text.trim().to_ascii_uppercase().as_str() {
3380        "TABLE" => Some(ObjectType::Table),
3381        "VIEW" => Some(ObjectType::View),
3382        "MATERIALIZED VIEW" => Some(ObjectType::MaterializedView),
3383        "SEQUENCE" => Some(ObjectType::Sequence),
3384        "TYPE" => Some(ObjectType::Type),
3385        "PACKAGE" => Some(ObjectType::Package),
3386        "PROCEDURE" => Some(ObjectType::Procedure),
3387        "FUNCTION" => Some(ObjectType::Function),
3388        "TRIGGER" => Some(ObjectType::Trigger),
3389        "EDITIONING VIEW" => Some(ObjectType::EditioningView),
3390        _ => None,
3391    }
3392}
3393
3394fn object_status_from_dictionary_value(text: &str) -> ObjectStatus {
3395    match text.trim().to_ascii_uppercase().as_str() {
3396        "VALID" => ObjectStatus::Valid,
3397        "ENABLED" => ObjectStatus::Valid,
3398        "INVALID" => ObjectStatus::Invalid,
3399        "UNUSABLE" | "DISABLED" => ObjectStatus::Invalid,
3400        _ => ObjectStatus::NotApplicable,
3401    }
3402}
3403
3404fn routine_kind_from_dictionary_value(text: Option<&str>) -> Option<RoutineKind> {
3405    match text.map(|value| value.trim().to_ascii_uppercase()) {
3406        Some(value) if value.eq("FUNCTION") => Some(RoutineKind::Function),
3407        Some(value) if value.eq("PROCEDURE") => Some(RoutineKind::Procedure),
3408        _ => None,
3409    }
3410}
3411
3412fn constraint_type_from_dictionary_value(
3413    text: &str,
3414    search_condition: Option<&str>,
3415    has_columns: bool,
3416) -> ConstraintType {
3417    match text.trim().to_ascii_uppercase().as_str() {
3418        "P" => ConstraintType::PrimaryKey,
3419        "R" => ConstraintType::ForeignKey,
3420        "U" => ConstraintType::Unique,
3421        "F" => ConstraintType::Ref,
3422        "C" => {
3423            if has_columns
3424                && search_condition
3425                    .map(|condition| {
3426                        condition
3427                            .trim()
3428                            .to_ascii_uppercase()
3429                            .contains("IS NOT NULL")
3430                    })
3431                    .unwrap_or(false)
3432            {
3433                ConstraintType::NotNull
3434            } else {
3435                ConstraintType::Check
3436            }
3437        }
3438        _ => ConstraintType::Other,
3439    }
3440}
3441
3442fn trigger_timing_from_dictionary_value(text: &str) -> TriggerTiming {
3443    let normalized = text.trim().to_ascii_uppercase();
3444    if normalized.contains("INSTEAD OF") {
3445        TriggerTiming::InsteadOf
3446    } else if normalized.contains("BEFORE") {
3447        TriggerTiming::Before
3448    } else if normalized.contains("AFTER") {
3449        TriggerTiming::After
3450    } else {
3451        TriggerTiming::Unknown
3452    }
3453}
3454
3455fn trigger_level_from_dictionary_value(text: &str) -> TriggerLevel {
3456    let normalized = text.trim().to_ascii_uppercase();
3457    if normalized.contains("EACH ROW") {
3458        TriggerLevel::Row
3459    } else if normalized.contains("STATEMENT") {
3460        TriggerLevel::Statement
3461    } else {
3462        TriggerLevel::Unknown
3463    }
3464}
3465
3466fn trigger_events_from_dictionary_value(text: &str) -> Vec<TriggerEvent> {
3467    let normalized = text.trim().to_ascii_uppercase();
3468    let mut events = Vec::<TriggerEvent>::new();
3469
3470    if normalized.contains("INSERT") {
3471        events.push(TriggerEvent::Insert);
3472    }
3473    if normalized.contains("UPDATE") {
3474        events.push(TriggerEvent::Update);
3475    }
3476    if normalized.contains("DELETE") {
3477        events.push(TriggerEvent::Delete);
3478    }
3479    if normalized.contains("LOGON") {
3480        events.push(TriggerEvent::Logon);
3481    }
3482    if normalized.contains("LOGOFF") {
3483        events.push(TriggerEvent::Logoff);
3484    }
3485    if normalized.contains("DDL") {
3486        events.push(TriggerEvent::Ddl);
3487    }
3488
3489    if events.is_empty() {
3490        events.push(TriggerEvent::Other);
3491    }
3492
3493    events
3494}
3495
3496fn push_unique_column(columns: &mut Vec<ColumnName>, column_name: ColumnName) {
3497    if !columns.contains(&column_name) {
3498        columns.push(column_name);
3499    }
3500}
3501
3502fn routine_locator_from_procedure_row(
3503    snapshot: &mut CatalogSnapshot,
3504    row: &OracleRow,
3505) -> Result<RoutineLocator, CatalogError> {
3506    let owner_text = row.require_text("OWNER")?;
3507    let container_name_text = row.require_text("OBJECT_NAME")?;
3508    let routine_name_text = row
3509        .text("PROCEDURE_NAME")
3510        .unwrap_or(container_name_text)
3511        .trim();
3512
3513    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
3514        return Err(CatalogError::InvalidColumnValue {
3515            column: String::from("OWNER"),
3516            expected: "interned schema name",
3517            value: String::from(owner_text),
3518        });
3519    };
3520    let Some(container_name) = snapshot.intern_object_name(container_name_text) else {
3521        return Err(CatalogError::InvalidColumnValue {
3522            column: String::from("OBJECT_NAME"),
3523            expected: "interned object name",
3524            value: String::from(container_name_text),
3525        });
3526    };
3527    let Some(routine_name) = snapshot.intern_object_name(routine_name_text) else {
3528        return Err(CatalogError::InvalidColumnValue {
3529            column: String::from("PROCEDURE_NAME"),
3530            expected: "interned object name",
3531            value: String::from(routine_name_text),
3532        });
3533    };
3534
3535    Ok(RoutineLocator {
3536        owner,
3537        package_name: if optional_nonblank_text(row, "PROCEDURE_NAME").is_some() {
3538            Some(container_name)
3539        } else {
3540            None
3541        },
3542        routine_name,
3543        subprogram_id: optional_u32(row, "SUBPROGRAM_ID")?,
3544        overload: optional_u32(row, "OVERLOAD")?,
3545    })
3546}
3547
3548fn routine_locator_from_argument_row(
3549    snapshot: &mut CatalogSnapshot,
3550    row: &OracleRow,
3551) -> Result<RoutineLocator, CatalogError> {
3552    let owner_text = row.require_text("OWNER")?;
3553    let routine_name_text = row.require_text("OBJECT_NAME")?;
3554
3555    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
3556        return Err(CatalogError::InvalidColumnValue {
3557            column: String::from("OWNER"),
3558            expected: "interned schema name",
3559            value: String::from(owner_text),
3560        });
3561    };
3562    let package_name = optional_nonblank_text(row, "PACKAGE_NAME")
3563        .map(|value| {
3564            snapshot
3565                .intern_object_name(value)
3566                .ok_or(CatalogError::InvalidColumnValue {
3567                    column: String::from("PACKAGE_NAME"),
3568                    expected: "interned object name",
3569                    value: String::from(value),
3570                })
3571        })
3572        .transpose()?;
3573    let Some(routine_name) = snapshot.intern_object_name(routine_name_text) else {
3574        return Err(CatalogError::InvalidColumnValue {
3575            column: String::from("OBJECT_NAME"),
3576            expected: "interned object name",
3577            value: String::from(routine_name_text),
3578        });
3579    };
3580
3581    Ok(RoutineLocator {
3582        owner,
3583        package_name,
3584        routine_name,
3585        subprogram_id: optional_u32(row, "SUBPROGRAM_ID")?,
3586        overload: optional_u32(row, "OVERLOAD")?,
3587    })
3588}
3589
3590fn upsert_packaged_routine(
3591    snapshot: &mut CatalogSnapshot,
3592    owner: SchemaName,
3593    package_name: ObjectName,
3594    kind: RoutineKind,
3595    signature: RoutineSignature,
3596) -> Result<(), CatalogError> {
3597    let schema_catalog = snapshot.schemas.entry(owner).or_default();
3598
3599    schema_catalog
3600        .objects
3601        .entry(package_name)
3602        .or_insert_with(|| {
3603            CatalogObject::Package(PackageMetadata {
3604                common: ObjectCommon {
3605                    owner,
3606                    name: package_name,
3607                    object_type: ObjectType::Package,
3608                    ..ObjectCommon::default()
3609                },
3610                ..PackageMetadata::default()
3611            })
3612        });
3613
3614    let Some(CatalogObject::Package(metadata)) = schema_catalog.objects.get_mut(&package_name)
3615    else {
3616        return Ok(());
3617    };
3618
3619    match kind {
3620        RoutineKind::Procedure => upsert_signature(&mut metadata.procedures, signature),
3621        RoutineKind::Function => upsert_signature(&mut metadata.functions, signature),
3622    }
3623
3624    Ok(())
3625}
3626
3627fn upsert_top_level_routine(
3628    snapshot: &mut CatalogSnapshot,
3629    owner: SchemaName,
3630    routine_name: ObjectName,
3631    kind: RoutineKind,
3632    signature: RoutineSignature,
3633    deterministic: bool,
3634    pipelined: bool,
3635) -> Result<(), CatalogError> {
3636    let schema_catalog = snapshot.schemas.entry(owner).or_default();
3637    let common = schema_catalog
3638        .objects
3639        .get(&routine_name)
3640        .and_then(|object| match object {
3641            CatalogObject::Procedure(metadata) => Some(metadata.common.clone()),
3642            CatalogObject::Function(metadata) => Some(metadata.common.clone()),
3643            _ => None,
3644        })
3645        .unwrap_or_else(|| ObjectCommon {
3646            owner,
3647            name: routine_name,
3648            object_type: match kind {
3649                RoutineKind::Procedure => ObjectType::Procedure,
3650                RoutineKind::Function => ObjectType::Function,
3651            },
3652            ..ObjectCommon::default()
3653        });
3654
3655    let catalog_object = match kind {
3656        RoutineKind::Procedure => CatalogObject::Procedure(ProcedureMetadata { common, signature }),
3657        RoutineKind::Function => CatalogObject::Function(FunctionMetadata {
3658            common,
3659            signature,
3660            deterministic,
3661            pipelined,
3662        }),
3663    };
3664    schema_catalog.objects.insert(routine_name, catalog_object);
3665
3666    Ok(())
3667}
3668
3669fn upsert_signature(signatures: &mut Vec<RoutineSignature>, signature: RoutineSignature) {
3670    if let Some(existing) = signatures.iter_mut().find(|candidate| {
3671        candidate.routine_name.eq(&signature.routine_name)
3672            && candidate.overload.eq(&signature.overload)
3673    }) {
3674        *existing = signature;
3675    } else {
3676        signatures.push(signature);
3677    }
3678}
3679
3680fn data_type_ref_from_argument_row(
3681    snapshot: &mut CatalogSnapshot,
3682    row: &OracleRow,
3683) -> Result<DataTypeRef, CatalogError> {
3684    let owner = optional_nonblank_text(row, "TYPE_OWNER")
3685        .map(|value| {
3686            snapshot
3687                .intern_schema_name(value)
3688                .ok_or(CatalogError::InvalidColumnValue {
3689                    column: String::from("TYPE_OWNER"),
3690                    expected: "interned schema name",
3691                    value: String::from(value),
3692                })
3693        })
3694        .transpose()?;
3695    let type_name = optional_nonblank_text(row, "TYPE_NAME")
3696        .or_else(|| optional_nonblank_text(row, "DATA_TYPE"))
3697        .unwrap_or_default();
3698
3699    Ok(DataTypeRef {
3700        owner,
3701        name: String::from(type_name),
3702        length: optional_u32(row, "DATA_LENGTH")?,
3703        precision: optional_u32(row, "DATA_PRECISION")?,
3704        scale: optional_i32(row, "DATA_SCALE")?,
3705        char_semantics: None,
3706    })
3707}
3708
3709fn parameter_mode_from_dictionary_value(text: Option<&str>) -> ParameterMode {
3710    match text.map(|value| value.trim().to_ascii_uppercase()) {
3711        Some(value) if value.eq("OUT") => ParameterMode::Out,
3712        Some(value) if value.eq("IN/OUT") => ParameterMode::InOut,
3713        _ => ParameterMode::In,
3714    }
3715}
3716
3717fn blank_catalog_object(common: ObjectCommon) -> Option<CatalogObject> {
3718    match common.object_type {
3719        ObjectType::Table => Some(CatalogObject::Table(TableMetadata {
3720            common,
3721            ..TableMetadata::default()
3722        })),
3723        ObjectType::View => Some(CatalogObject::View(ViewMetadata {
3724            common,
3725            ..ViewMetadata::default()
3726        })),
3727        ObjectType::MaterializedView => Some(CatalogObject::MaterializedView(MViewMetadata {
3728            common,
3729            ..MViewMetadata::default()
3730        })),
3731        ObjectType::Sequence => Some(CatalogObject::Sequence(SequenceMetadata {
3732            common,
3733            ..SequenceMetadata::default()
3734        })),
3735        ObjectType::Type => Some(CatalogObject::Type(TypeMetadata {
3736            common,
3737            ..TypeMetadata::default()
3738        })),
3739        ObjectType::Package => Some(CatalogObject::Package(PackageMetadata {
3740            common,
3741            ..PackageMetadata::default()
3742        })),
3743        ObjectType::Procedure => Some(CatalogObject::Procedure(ProcedureMetadata {
3744            common,
3745            ..ProcedureMetadata::default()
3746        })),
3747        ObjectType::Function => Some(CatalogObject::Function(FunctionMetadata {
3748            common,
3749            ..FunctionMetadata::default()
3750        })),
3751        ObjectType::Trigger => Some(CatalogObject::Trigger(TriggerMetadata {
3752            common,
3753            ..TriggerMetadata::default()
3754        })),
3755        ObjectType::SchedulerJob => Some(CatalogObject::SchedulerJob(SchedulerJobMetadata {
3756            common,
3757            ..SchedulerJobMetadata::default()
3758        })),
3759        ObjectType::EditioningView => Some(CatalogObject::EditioningView(EditioningViewMetadata {
3760            common,
3761            ..EditioningViewMetadata::default()
3762        })),
3763        ObjectType::Synonym | ObjectType::Index | ObjectType::Constraint | ObjectType::Unknown => {
3764            None
3765        }
3766    }
3767}
3768
3769fn data_type_ref_from_row(
3770    snapshot: &mut CatalogSnapshot,
3771    row: &OracleRow,
3772) -> Result<DataTypeRef, CatalogError> {
3773    let owner = row
3774        .text("DATA_TYPE_OWNER")
3775        .map(str::trim)
3776        .filter(|value| !value.is_empty())
3777        .map(|value| {
3778            snapshot
3779                .intern_schema_name(value)
3780                .ok_or(CatalogError::InvalidColumnValue {
3781                    column: String::from("DATA_TYPE_OWNER"),
3782                    expected: "interned schema name",
3783                    value: String::from(value),
3784                })
3785        })
3786        .transpose()?;
3787
3788    Ok(DataTypeRef {
3789        owner,
3790        name: String::from(row.require_text("DATA_TYPE")?),
3791        length: optional_u32(row, "DATA_LENGTH")?,
3792        precision: optional_u32(row, "DATA_PRECISION")?,
3793        scale: optional_i32(row, "DATA_SCALE")?,
3794        char_semantics: row.text("CHAR_USED").map(String::from),
3795    })
3796}
3797
3798fn optional_bool(row: &OracleRow, column: &str) -> Result<Option<bool>, CatalogError> {
3799    match row.text(column) {
3800        Some(_) => row.parse_bool(column).map(Some),
3801        None => Ok(None),
3802    }
3803}
3804
3805fn optional_nonblank_text<'a>(row: &'a OracleRow, column: &str) -> Option<&'a str> {
3806    row.text(column)
3807        .map(str::trim)
3808        .filter(|value| !value.is_empty())
3809}
3810
3811fn optional_u32(row: &OracleRow, column: &str) -> Result<Option<u32>, CatalogError> {
3812    match row.text(column) {
3813        Some(_) => {
3814            let parsed = row.parse_u64(column)?;
3815            u32::try_from(parsed)
3816                .map(Some)
3817                .map_err(|_| CatalogError::InvalidColumnValue {
3818                    column: column.to_ascii_uppercase(),
3819                    expected: "u32",
3820                    value: parsed.to_string(),
3821                })
3822        }
3823        None => Ok(None),
3824    }
3825}
3826
3827fn required_u32(row: &OracleRow, column: &str) -> Result<u32, CatalogError> {
3828    let parsed = row.parse_u64(column)?;
3829    u32::try_from(parsed).map_err(|_| CatalogError::InvalidColumnValue {
3830        column: column.to_ascii_uppercase(),
3831        expected: "u32",
3832        value: parsed.to_string(),
3833    })
3834}
3835
3836fn optional_i32(row: &OracleRow, column: &str) -> Result<Option<i32>, CatalogError> {
3837    match row.text(column) {
3838        Some(_) => {
3839            let parsed = row.parse_i64(column)?;
3840            i32::try_from(parsed)
3841                .map(Some)
3842                .map_err(|_| CatalogError::InvalidColumnValue {
3843                    column: column.to_ascii_uppercase(),
3844                    expected: "i32",
3845                    value: parsed.to_string(),
3846                })
3847        }
3848        None => Ok(None),
3849    }
3850}
3851
3852#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3853pub struct SynonymTarget {
3854    pub target_owner: Option<SchemaName>,
3855    pub target_name: ObjectName,
3856    pub target_type: Option<ObjectType>,
3857    pub db_link: Option<String>,
3858    pub public_synonym: bool,
3859}
3860
3861/// Virtual Private Database (VPD / RLS) policy entry from
3862/// `ALL_POLICIES`. Each row describes one policy attached to an object;
3863/// the policy function (PF_OWNER.PACKAGE.FUNCTION) is the predicate
3864/// generator that Oracle invokes at parse time to inject a WHERE clause
3865/// into reads (and optional ones into INSERT/UPDATE/DELETE).
3866///
3867/// Lineage flags VPD-protected objects with
3868/// `UnknownReason::DbLinkRemoteObject` reused-as-marker pending a
3869/// dedicated `UnknownReason::VpdPolicyApplied` variant.
3870#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3871pub struct VpdPolicy {
3872    /// Owning schema (the object's owner).
3873    pub object_owner: SchemaName,
3874    /// Protected object.
3875    pub object_name: ObjectName,
3876    /// Optional policy-group name (Oracle policy-groups; usually NULL).
3877    pub policy_group: Option<String>,
3878    /// Policy name within the group.
3879    pub policy_name: String,
3880    /// Owner of the policy function.
3881    pub function_owner: SchemaName,
3882    /// Package containing the policy function (NULL for standalone).
3883    pub function_package: Option<String>,
3884    /// Function name that produces the WHERE-clause predicate.
3885    pub function_name: String,
3886    /// Statement-type bits — true means the policy applies to that DML.
3887    pub on_select: bool,
3888    pub on_insert: bool,
3889    pub on_update: bool,
3890    pub on_delete: bool,
3891    /// Whether the policy is currently enabled.
3892    pub enabled: bool,
3893}
3894
3895/// Edition entry from `ALL_EDITIONS` — the per-database edition tree
3896/// used by Oracle Edition-Based Redefinition (EBR). Linked into
3897/// [`CatalogSnapshot::editions`].
3898#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3899pub struct Edition {
3900    /// Edition name (Oracle identifiers are case-preserving but case-
3901    /// insensitive when unquoted; stored as the dictionary value).
3902    pub edition_name: String,
3903    /// Parent edition, if any. `None` for the root edition (typically
3904    /// `ORA$BASE`).
3905    pub parent_edition_name: Option<String>,
3906    /// Whether the edition is currently usable (`USABLE = 'Y'`).
3907    pub usable: bool,
3908}
3909
3910/// Editioning view from `ALL_EDITIONING_VIEWS` — a view that masks an
3911/// editioned table during EBR cutovers. Linked into
3912/// [`SchemaCatalog::editioning_views`].
3913#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3914pub struct EditioningView {
3915    /// Owning schema.
3916    pub owner: SchemaName,
3917    /// Editioning view name.
3918    pub view_name: ObjectName,
3919    /// Base table the editioning view masks.
3920    pub table_name: ObjectName,
3921}
3922
3923/// Documentation comment attached to a table, view, or materialized
3924/// view via `COMMENT ON TABLE owner.name IS '...'`. Sourced from
3925/// `ALL_TAB_COMMENTS`.
3926///
3927/// `plsql-docgen` consumes these to render description text alongside
3928/// object docs; dependency analysis does not interact with them.
3929#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3930pub struct TableComment {
3931    /// Owning schema.
3932    pub owner: SchemaName,
3933    /// Object name.
3934    pub table_name: ObjectName,
3935    /// `TABLE` / `VIEW` / `MATERIALIZED VIEW` — preserved verbatim
3936    /// from `ALL_TAB_COMMENTS.TABLE_TYPE` so docgen can pick a
3937    /// per-kind template.
3938    pub table_type: String,
3939    /// Comment text. Always present (we filter out NULL rows server-side
3940    /// to keep the snapshot compact).
3941    pub comments: String,
3942}
3943
3944/// Documentation comment attached to a column via
3945/// `COMMENT ON COLUMN owner.table.column IS '...'`. Sourced from
3946/// `ALL_COL_COMMENTS`.
3947#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3948pub struct ColumnComment {
3949    /// Owning schema.
3950    pub owner: SchemaName,
3951    /// Object name.
3952    pub table_name: ObjectName,
3953    /// Column name.
3954    pub column_name: ColumnName,
3955    /// Comment text.
3956    pub comments: String,
3957}
3958
3959/// Database link metadata sourced from `ALL_DB_LINKS`.
3960///
3961/// PL/SQL code that references `remote_object@my_link` resolves through
3962/// one of these entries. Public links have `owner = PUBLIC`. Lineage uses
3963/// the `host` field to classify the remote endpoint (a TNS alias, a full
3964/// EZCONNECT string, or the legacy `(DESCRIPTION=...)` form).
3965///
3966/// The shape intentionally avoids `username` — `ALL_DB_LINKS.USERNAME`
3967/// is the *connect user*, not a privilege grant, and most consumers
3968/// don't need it. If a future product surface needs the connect user it
3969/// can be added behind `#[serde(default)]` without breaking older
3970/// snapshots.
3971#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3972pub struct DatabaseLink {
3973    /// Owning schema. `PUBLIC` for public links.
3974    pub owner: SchemaName,
3975    /// Link name, e.g. `REPORTING.WORLD`.
3976    pub name: String,
3977    /// Connect-target host string. Can be a TNS alias, an EZCONNECT
3978    /// string, or a full `(DESCRIPTION=...)` block.
3979    pub host: Option<String>,
3980    /// `true` when `owner` is `PUBLIC`. Surfaced for fast filtering in
3981    /// downstream lineage without re-resolving the schema interner.
3982    pub public_link: bool,
3983}
3984
3985#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3986pub enum GrantPrivilege {
3987    Select,
3988    Insert,
3989    Update,
3990    Delete,
3991    Execute,
3992    Alter,
3993    Index,
3994    References,
3995    Debug,
3996    #[default]
3997    Other,
3998}
3999
4000#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4001pub enum Grantee {
4002    User(UserName),
4003    Role(RoleName),
4004    #[default]
4005    Public,
4006}
4007
4008#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4009pub struct Grant {
4010    pub object_owner: SchemaName,
4011    pub object_name: ObjectName,
4012    pub privilege: GrantPrivilege,
4013    pub grantee: Grantee,
4014    pub grantable: bool,
4015    pub via_role: Option<RoleName>,
4016    pub with_hierarchy: bool,
4017}
4018
4019#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4020pub struct IndexMetadata {
4021    pub name: IndexName,
4022    pub table_owner: SchemaName,
4023    pub table_name: ObjectName,
4024    pub unique: bool,
4025    pub columns: Vec<ColumnName>,
4026    pub index_type: String,
4027    pub status: ObjectStatus,
4028}
4029
4030#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4031pub enum ConstraintType {
4032    PrimaryKey,
4033    ForeignKey,
4034    Unique,
4035    Check,
4036    NotNull,
4037    Ref,
4038    #[default]
4039    Other,
4040}
4041
4042#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4043pub struct ConstraintMetadata {
4044    pub name: ConstraintName,
4045    pub table_owner: SchemaName,
4046    pub table_name: ObjectName,
4047    pub constraint_type: ConstraintType,
4048    pub columns: Vec<ColumnName>,
4049    pub referenced_table_owner: Option<SchemaName>,
4050    pub referenced_table_name: Option<ObjectName>,
4051    pub referenced_columns: Vec<ColumnName>,
4052    pub search_condition: Option<String>,
4053    pub deferrable: Option<bool>,
4054    pub initially_deferred: Option<bool>,
4055}
4056
4057#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4058pub enum CatalogDependencyKind {
4059    Hard,
4060    Reference,
4061    Extended,
4062    #[default]
4063    Other,
4064}
4065
4066#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4067pub struct CatalogDependency {
4068    pub owner: SchemaName,
4069    pub name: ObjectName,
4070    pub object_type: ObjectType,
4071    pub referenced_owner: Option<SchemaName>,
4072    pub referenced_name: ObjectName,
4073    pub referenced_type: Option<ObjectType>,
4074    pub dependency_kind: CatalogDependencyKind,
4075    pub via_db_link: Option<String>,
4076}
4077
4078#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4079pub enum PlScopeAvailability {
4080    #[default]
4081    NotAvailable,
4082    AvailableButStale,
4083    IdentifiersOnly,
4084    IdentifiersAndStatements,
4085}
4086
4087#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4088pub struct CompilerIdentifier {
4089    pub owner: SchemaName,
4090    pub object_name: ObjectName,
4091    pub identifier_name: MemberName,
4092    pub identifier_type: String,
4093    pub usage: String,
4094    pub line: u32,
4095    pub column: u32,
4096}
4097
4098#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4099pub struct CompilerReference {
4100    pub owner: SchemaName,
4101    pub object_name: ObjectName,
4102    pub usage_line: u32,
4103    pub usage_column: u32,
4104    pub target_owner: Option<SchemaName>,
4105    pub target_object_name: Option<ObjectName>,
4106    pub target_identifier_name: Option<MemberName>,
4107}
4108
4109#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4110pub struct CompilerStatementUsage {
4111    pub owner: SchemaName,
4112    pub object_name: ObjectName,
4113    pub statement_kind: String,
4114    pub line: u32,
4115    pub column: u32,
4116}
4117
4118#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4119pub struct PlScopeSnapshot {
4120    pub availability: PlScopeAvailability,
4121    pub identifiers: Vec<CompilerIdentifier>,
4122    pub references: Vec<CompilerReference>,
4123    pub statements: Vec<CompilerStatementUsage>,
4124    pub collected_at: Option<DateTime<Utc>>,
4125    pub source_hash: Option<Hash>,
4126    pub warnings: Vec<CapabilityWarning>,
4127}
4128
4129#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4130pub struct DataTypeRef {
4131    pub owner: Option<SchemaName>,
4132    pub name: String,
4133    pub length: Option<u32>,
4134    pub precision: Option<u32>,
4135    pub scale: Option<i32>,
4136    pub char_semantics: Option<String>,
4137}
4138
4139#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4140pub struct ColumnMetadata {
4141    pub name: ColumnName,
4142    pub position: u32,
4143    pub data_type: DataTypeRef,
4144    pub nullable: bool,
4145    pub default_expression: Option<String>,
4146    pub generated_expression: Option<String>,
4147    pub hidden: bool,
4148}
4149
4150#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4151pub enum TemporaryTableDuration {
4152    #[default]
4153    Transaction,
4154    Session,
4155}
4156
4157#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4158pub struct TableMetadata {
4159    pub common: ObjectCommon,
4160    pub columns: HashMap<ColumnName, ColumnMetadata>,
4161    pub temporary: bool,
4162    pub temporary_duration: Option<TemporaryTableDuration>,
4163    pub index_organized: bool,
4164}
4165
4166#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4167pub struct ViewMetadata {
4168    pub common: ObjectCommon,
4169    pub columns: HashMap<ColumnName, ColumnMetadata>,
4170    pub query_hash: Option<Hash>,
4171    pub read_only: Option<bool>,
4172    pub check_option: Option<String>,
4173}
4174
4175#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4176pub struct MViewMetadata {
4177    pub common: ObjectCommon,
4178    pub columns: HashMap<ColumnName, ColumnMetadata>,
4179    pub refresh_mode: Option<String>,
4180    pub refresh_method: Option<String>,
4181    pub query_hash: Option<Hash>,
4182}
4183
4184#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4185pub struct SequenceMetadata {
4186    pub common: ObjectCommon,
4187    pub increment_by: i64,
4188    pub min_value: Option<i64>,
4189    pub max_value: Option<i64>,
4190    pub cycle: bool,
4191    pub ordered: bool,
4192    pub cache_size: Option<u64>,
4193}
4194
4195#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4196pub enum ParameterMode {
4197    #[default]
4198    In,
4199    Out,
4200    InOut,
4201}
4202
4203#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4204pub struct ArgumentMetadata {
4205    pub position: u32,
4206    pub name: Option<MemberName>,
4207    pub mode: ParameterMode,
4208    pub data_type: DataTypeRef,
4209    pub defaulted: bool,
4210}
4211
4212#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4213pub struct AccessibleByTarget {
4214    pub owner: Option<SchemaName>,
4215    pub object_name: ObjectName,
4216}
4217
4218#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4219pub struct RoutineSignature {
4220    pub routine_name: ObjectName,
4221    pub overload: Option<u32>,
4222    pub arguments: Vec<ArgumentMetadata>,
4223    pub return_type: Option<DataTypeRef>,
4224    pub authid_current_user: Option<bool>,
4225    pub accessible_by: Vec<AccessibleByTarget>,
4226}
4227
4228#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4229pub enum TypeFinality {
4230    Final,
4231    NotFinal,
4232    #[default]
4233    Unknown,
4234}
4235
4236#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4237pub enum TypeInstantiable {
4238    Instantiable,
4239    NotInstantiable,
4240    #[default]
4241    Unknown,
4242}
4243
4244#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4245pub struct TypeAttribute {
4246    pub name: MemberName,
4247    pub position: u32,
4248    pub data_type: DataTypeRef,
4249}
4250
4251#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4252pub struct TypeMetadata {
4253    pub common: ObjectCommon,
4254    pub attributes: Vec<TypeAttribute>,
4255    pub methods: Vec<RoutineSignature>,
4256    pub supertype_owner: Option<SchemaName>,
4257    pub supertype_name: Option<ObjectName>,
4258    pub finality: TypeFinality,
4259    pub instantiable: TypeInstantiable,
4260}
4261
4262#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4263pub struct PackageMetadata {
4264    pub common: ObjectCommon,
4265    pub procedures: Vec<RoutineSignature>,
4266    pub functions: Vec<RoutineSignature>,
4267    pub package_stateful: Option<bool>,
4268    pub authid_current_user: Option<bool>,
4269    pub accessible_by: Vec<AccessibleByTarget>,
4270}
4271
4272#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4273pub struct ProcedureMetadata {
4274    pub common: ObjectCommon,
4275    pub signature: RoutineSignature,
4276}
4277
4278#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4279pub struct FunctionMetadata {
4280    pub common: ObjectCommon,
4281    pub signature: RoutineSignature,
4282    pub deterministic: bool,
4283    pub pipelined: bool,
4284}
4285
4286#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4287pub enum TriggerTiming {
4288    Before,
4289    After,
4290    InsteadOf,
4291    #[default]
4292    Unknown,
4293}
4294
4295#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4296pub enum TriggerLevel {
4297    Statement,
4298    Row,
4299    #[default]
4300    Unknown,
4301}
4302
4303#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4304pub enum TriggerEvent {
4305    Insert,
4306    Update,
4307    Delete,
4308    Logon,
4309    Logoff,
4310    Ddl,
4311    #[default]
4312    Other,
4313}
4314
4315#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4316pub struct TriggerMetadata {
4317    pub common: ObjectCommon,
4318    pub target_owner: SchemaName,
4319    pub target_name: ObjectName,
4320    pub timing: TriggerTiming,
4321    pub level: TriggerLevel,
4322    pub events: Vec<TriggerEvent>,
4323    pub when_clause: Option<String>,
4324    pub body_hash: Option<Hash>,
4325}
4326
4327#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4328pub struct SchedulerJobMetadata {
4329    pub common: ObjectCommon,
4330    pub enabled: bool,
4331    pub job_type: String,
4332    pub program_name: Option<ObjectName>,
4333    pub schedule_name: Option<ObjectName>,
4334    pub job_action: Option<String>,
4335}
4336
4337#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4338pub struct EditioningViewMetadata {
4339    pub common: ObjectCommon,
4340    pub base_table_owner: SchemaName,
4341    pub base_table_name: ObjectName,
4342    pub columns: HashMap<ColumnName, ColumnMetadata>,
4343}
4344
4345#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
4346pub enum CatalogObject {
4347    Table(TableMetadata),
4348    View(ViewMetadata),
4349    MaterializedView(MViewMetadata),
4350    Sequence(SequenceMetadata),
4351    Type(TypeMetadata),
4352    Package(PackageMetadata),
4353    Procedure(ProcedureMetadata),
4354    Function(FunctionMetadata),
4355    Trigger(TriggerMetadata),
4356    SchedulerJob(SchedulerJobMetadata),
4357    EditioningView(EditioningViewMetadata),
4358}
4359
4360#[cfg(test)]
4361mod builder_tests {
4362    use chrono::Utc;
4363    use plsql_core::AnalysisProfile;
4364
4365    use crate::{
4366        CatalogCapabilities, CatalogRowSet, CatalogSnapshotBuilder, CatalogSource,
4367        CatalogSourceKind, CatalogSourceKind::LiveConnection, GrantPrivilege, Grantee, ObjectType,
4368        OracleRow, PlScopeAvailability,
4369    };
4370
4371    fn oracle_row(columns: &[(&str, &str, Option<&str>)]) -> OracleRow {
4372        let mut row = OracleRow::default();
4373        for (name, oracle_type, value) in columns {
4374            row.insert(*name, *oracle_type, value.map(String::from));
4375        }
4376        row
4377    }
4378
4379    fn builder() -> CatalogSnapshotBuilder {
4380        CatalogSnapshotBuilder::new(
4381            AnalysisProfile::default(),
4382            CatalogCapabilities {
4383                plscope_enabled: true,
4384                can_query_roles_and_grants: true,
4385                ..CatalogCapabilities::default()
4386            },
4387            CatalogSource {
4388                kind: LiveConnection,
4389                description: Some(String::from("synthetic external extractor")),
4390                ..CatalogSource::default()
4391            },
4392            Utc::now(),
4393        )
4394    }
4395
4396    #[test]
4397    fn catalog_snapshot_builder_applies_synthetic_dictionary_rows_on_stable()
4398    -> Result<(), crate::CatalogError> {
4399        let mut builder = builder();
4400
4401        builder.apply_row(
4402            CatalogRowSet::Objects,
4403            &oracle_row(&[
4404                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4405                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES")),
4406                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TABLE")),
4407                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4408            ]),
4409        )?;
4410        builder.apply_row(
4411            CatalogRowSet::Columns,
4412            &oracle_row(&[
4413                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4414                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4415                ("COLUMN_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4416                ("COLUMN_POSITION", "NUMBER", Some("1")),
4417                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
4418                ("DATA_PRECISION", "NUMBER", Some("10")),
4419                ("DATA_SCALE", "NUMBER", Some("0")),
4420                ("NULLABLE", "VARCHAR2(1)", Some("N")),
4421            ]),
4422        )?;
4423        builder.apply_row(
4424            CatalogRowSet::Users,
4425            &oracle_row(&[("USERNAME", "VARCHAR2(128)", Some("APP_USER"))]),
4426        )?;
4427        builder.apply_row(
4428            CatalogRowSet::Grants,
4429            &oracle_row(&[
4430                ("TABLE_SCHEMA", "VARCHAR2(128)", Some("BILLING")),
4431                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4432                ("GRANTEE", "VARCHAR2(128)", Some("APP_USER")),
4433                ("PRIVILEGE", "VARCHAR2(40)", Some("SELECT")),
4434                ("GRANTABLE", "VARCHAR2(3)", Some("NO")),
4435                ("HIERARCHY", "VARCHAR2(3)", Some("NO")),
4436            ]),
4437        )?;
4438        builder.apply_row(
4439            CatalogRowSet::PlScopeAvailability,
4440            &oracle_row(&[
4441                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4442                (
4443                    "PLSCOPE_SETTINGS",
4444                    "VARCHAR2(4000)",
4445                    Some("IDENTIFIERS:ALL, STATEMENTS:ALL"),
4446                ),
4447            ]),
4448        )?;
4449        builder.apply_row(
4450            CatalogRowSet::PlScopeIdentifiers,
4451            &oracle_row(&[
4452                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4453                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES_PKG")),
4454                ("NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4455                ("TYPE", "VARCHAR2(128)", Some("VARIABLE")),
4456                ("USAGE", "VARCHAR2(128)", Some("DECLARATION")),
4457                ("LINE", "NUMBER", Some("7")),
4458                ("COL", "NUMBER", Some("12")),
4459            ]),
4460        )?;
4461
4462        let snapshot = builder.finish()?;
4463        let report = snapshot.doctor_report();
4464        assert_eq!(report.source_kind, CatalogSourceKind::LiveConnection);
4465        assert_eq!(report.totals.schemas_observed, 1);
4466        assert_eq!(report.totals.objects_total, 1);
4467        assert_eq!(report.totals.columns_total, 1);
4468        assert_eq!(report.totals.grants_total, 1);
4469        assert_eq!(
4470            report.object_counts.first().map(|count| count.object_type),
4471            Some(ObjectType::Table)
4472        );
4473        assert_eq!(
4474            report
4475                .plscope_availability_per_schema
4476                .first()
4477                .map(|entry| entry.availability),
4478            Some(PlScopeAvailability::IdentifiersAndStatements)
4479        );
4480        assert!(snapshot.schemas.values().any(|schema| {
4481            schema.grants.iter().any(|grant| {
4482                grant.privilege == GrantPrivilege::Select
4483                    && matches!(grant.grantee, Grantee::User(_))
4484            })
4485        }));
4486        assert_eq!(
4487            snapshot
4488                .schemas
4489                .values()
4490                .filter_map(|schema| schema.plscope.as_ref())
4491                .map(|plscope| plscope.identifiers.len())
4492                .sum::<usize>(),
4493            1
4494        );
4495        Ok(())
4496    }
4497
4498    #[test]
4499    fn catalog_snapshot_builder_can_mark_user_universe_known_empty()
4500    -> Result<(), crate::CatalogError> {
4501        let mut builder = builder();
4502        let rows: Vec<OracleRow> = Vec::new();
4503        builder.apply_rows(CatalogRowSet::Users, &rows)?;
4504        let snapshot = builder.finish()?;
4505        assert_eq!(snapshot.known_users, Some(Default::default()));
4506        Ok(())
4507    }
4508}
4509
4510#[cfg(all(test, feature = "oraclemcp-db"))]
4511mod tests {
4512    use std::collections::{HashMap, HashSet};
4513    use std::future::Future;
4514
4515    use chrono::{DateTime, Utc};
4516    use plsql_core::{AnalysisProfile, ColumnName, MemberName, ObjectName, SchemaName, SymbolId};
4517    use tempfile::tempdir;
4518
4519    use crate::live::{LiveContext, RuntimeBuilder, load_catalog_users};
4520    use crate::{
4521        AccessibleByTarget, CATALOG_SNAPSHOT_SCHEMA_ID, CATALOG_SNAPSHOT_SCHEMA_VERSION,
4522        CapabilityWarning, CatalogCapabilities, CatalogDependencyKind, CatalogError,
4523        CatalogLoadRequest, CatalogObject, CatalogSchemaFilter, CatalogSnapshot,
4524        CatalogSnapshotDocument, CatalogSource, CatalogSourceKind, CompilerIdentifier,
4525        ConstraintType, DataTypeRef, Hash, ObjectCommon, ObjectStatus, ObjectType, OracleBackend,
4526        OracleBind, OracleConnectOptions, OracleConnection, OracleConnectionInfo, OracleRow,
4527        PackageMetadata, PlScopeAvailability, PlScopeSnapshot, RoutineSignature, SchemaCatalog,
4528        SynonymName, SynonymTarget, TableMetadata, TriggerEvent, TriggerLevel, TriggerName,
4529        TriggerTiming, TypeFinality, TypeInstantiable, export_snapshot_to_json,
4530        grantee_from_dictionary_value, load_from_dbms_metadata_dir, load_snapshot_from_connection,
4531        load_snapshot_from_json, negotiate_capabilities, populate_dbms_metadata_ddl,
4532    };
4533
4534    #[derive(Clone, Debug, Eq, PartialEq)]
4535    struct QueryExpectation {
4536        sql_contains: String,
4537        params: Vec<OracleBind>,
4538        rows: Vec<OracleRow>,
4539    }
4540
4541    #[derive(Clone, Debug)]
4542    struct StaticConnection {
4543        rows: Vec<OracleRow>,
4544        row_count: u64,
4545        expected_queries: Vec<QueryExpectation>,
4546        connection_info: OracleConnectionInfo,
4547    }
4548
4549    impl Default for StaticConnection {
4550        fn default() -> Self {
4551            Self {
4552                rows: Vec::new(),
4553                row_count: 0,
4554                expected_queries: Vec::new(),
4555                connection_info: OracleConnectionInfo {
4556                    backend: OracleBackend::RustOracle,
4557                    connect_string: String::from("//localhost/XE"),
4558                    current_schema: Some(String::from("BILLING")),
4559                    server_version: String::from("23.0.0.0.0"),
4560                    db_name: String::from("XE"),
4561                    db_domain: String::new(),
4562                    service_name: String::from("XE"),
4563                    instance_name: String::from("xe"),
4564                    server_type: String::from("Dedicated"),
4565                    max_identifier_length: 128,
4566                    max_open_cursors: 500,
4567                },
4568            }
4569        }
4570    }
4571
4572    #[async_trait::async_trait(?Send)]
4573    impl OracleConnection for StaticConnection {
4574        fn backend(&self) -> OracleBackend {
4575            OracleBackend::RustOracle
4576        }
4577
4578        async fn ping(&self, cx: &LiveContext) -> Result<(), CatalogError> {
4579            let _ = cx;
4580            Ok(())
4581        }
4582
4583        async fn describe(&self, cx: &LiveContext) -> Result<OracleConnectionInfo, CatalogError> {
4584            let _ = cx;
4585            Ok(self.connection_info.clone())
4586        }
4587
4588        async fn query_rows(
4589            &self,
4590            cx: &LiveContext,
4591            sql: &str,
4592            params: &[OracleBind],
4593        ) -> Result<Vec<OracleRow>, CatalogError> {
4594            let _ = cx;
4595            if !self.expected_queries.is_empty() {
4596                if let Some(expectation) = self.expected_queries.iter().find(|expectation| {
4597                    sql.contains(expectation.sql_contains.as_str())
4598                        && params.eq(expectation.params.as_slice())
4599                }) {
4600                    return Ok(expectation.rows.clone());
4601                }
4602
4603                return Err(CatalogError::OracleBackendError {
4604                    backend: OracleBackend::RustOracle,
4605                    message: format!("unexpected query `{sql}` with params {params:?}"),
4606                });
4607            }
4608
4609            Ok(self.rows.clone())
4610        }
4611
4612        async fn execute(
4613            &self,
4614            cx: &LiveContext,
4615            _sql: &str,
4616            _params: &[OracleBind],
4617        ) -> Result<u64, CatalogError> {
4618            let _ = cx;
4619            Ok(self.row_count)
4620        }
4621    }
4622
4623    fn run_catalog_future<F: Future>(future: F) -> F::Output {
4624        RuntimeBuilder::current_thread()
4625            .build()
4626            .expect("test live runtime")
4627            .block_on(future)
4628    }
4629
4630    fn test_cx() -> LiveContext {
4631        LiveContext::current().expect("test runtime installs a request context")
4632    }
4633
4634    fn load_snapshot_for_test<C: OracleConnection>(
4635        connection: &C,
4636        request: &CatalogLoadRequest,
4637    ) -> Result<CatalogSnapshot, CatalogError> {
4638        run_catalog_future(async {
4639            let cx = test_cx();
4640            load_snapshot_from_connection(&cx, connection, request).await
4641        })
4642    }
4643
4644    fn load_catalog_users_for_test<C: OracleConnection>(
4645        connection: &C,
4646        snapshot: &mut CatalogSnapshot,
4647    ) -> Result<(), CatalogError> {
4648        run_catalog_future(async {
4649            let cx = test_cx();
4650            load_catalog_users(&cx, connection, snapshot).await
4651        })
4652    }
4653
4654    fn populate_dbms_metadata_ddl_for_test<C: OracleConnection>(
4655        connection: &C,
4656        snapshot: &mut CatalogSnapshot,
4657    ) -> Result<(), CatalogError> {
4658        run_catalog_future(async {
4659            let cx = test_cx();
4660            populate_dbms_metadata_ddl(&cx, connection, snapshot).await
4661        })
4662    }
4663
4664    fn negotiate_capabilities_for_test<C: OracleConnection>(connection: &C) -> CatalogCapabilities {
4665        run_catalog_future(async {
4666            let cx = test_cx();
4667            negotiate_capabilities(&cx, connection).await
4668        })
4669    }
4670
4671    fn oracle_row(columns: &[(&str, &str, Option<&str>)]) -> OracleRow {
4672        let mut row = OracleRow::default();
4673        for (name, oracle_type, value) in columns {
4674            row.insert(*name, *oracle_type, value.map(String::from));
4675        }
4676        row
4677    }
4678
4679    /// Probe queries issued by `negotiate_capabilities`. Every mock test
4680    /// that wants the live-extraction loader to behave normally should
4681    /// prepend these to its `expected_queries` so each probe succeeds.
4682    fn capability_probe_expectations() -> Vec<QueryExpectation> {
4683        [
4684            "from all_objects where rownum = 0",
4685            "from dba_objects where rownum = 0",
4686            "from all_source where rownum = 0",
4687            "from all_scheduler_jobs where rownum = 0",
4688            "from all_tab_privs where rownum = 0",
4689            "from all_plsql_object_settings where rownum = 0",
4690        ]
4691        .into_iter()
4692        .map(|fragment| QueryExpectation {
4693            sql_contains: String::from(fragment),
4694            params: vec![],
4695            rows: vec![],
4696        })
4697        .collect()
4698    }
4699
4700    /// `ALL_USERS` extraction expectation. The live loader queries this
4701    /// (database-wide, no schema bind) to learn which grantees are users so
4702    /// `grantee_from_dictionary_value` can tell users from roles. Tests pass
4703    /// the set of usernames they want to be classified as `Grantee::User`;
4704    /// any grantee absent from this set is classified as `Grantee::Role`.
4705    fn all_users_expectation(usernames: &[&str]) -> QueryExpectation {
4706        QueryExpectation {
4707            sql_contains: String::from("from all_users"),
4708            params: vec![],
4709            rows: usernames
4710                .iter()
4711                .map(|name| oracle_row(&[("USERNAME", "VARCHAR2(128)", Some(name))]))
4712                .collect(),
4713        }
4714    }
4715
4716    #[test]
4717    fn oracle_row_helpers_are_case_insensitive_and_typed() {
4718        let mut row = OracleRow::default();
4719        row.insert(
4720            "current_schema",
4721            "VARCHAR2(128)",
4722            Some(String::from("billing")),
4723        );
4724        row.insert("object_id", "NUMBER(10)", Some(String::from("42")));
4725        row.insert("editionable", "VARCHAR2(1)", Some(String::from("Y")));
4726        row.insert("ddl_text", "CLOB", None);
4727
4728        assert_eq!(row.text("CURRENT_SCHEMA"), Some("billing"));
4729        assert_eq!(row.parse_u64("object_id").ok(), Some(42));
4730        assert_eq!(row.parse_bool("EDITIONABLE").ok(), Some(true));
4731        assert!(matches!(
4732            row.require_text("ddl_text"),
4733            Err(CatalogError::NullColumnValue { column }) if column.eq("DDL_TEXT")
4734        ));
4735    }
4736
4737    #[test]
4738    fn parse_bool_honors_oracle_conventions_and_errors() {
4739        // The whole catalog's boolean flags (status/editionable/
4740        // deterministic/…) flow through parse_bool. Lock the Oracle
4741        // convention: Y/YES/TRUE/1 -> true, N/NO/FALSE/0 -> false,
4742        // case-insensitive, whitespace-trimmed; anything else is an
4743        // explicit InvalidColumnValue (R13 — never a silent default).
4744        for t in ["Y", "y", "YES", " yes ", "TRUE", "true", "1"] {
4745            let row = oracle_row(&[("FLAG", "VARCHAR2(3)", Some(t))]);
4746            assert_eq!(row.parse_bool("flag").ok(), Some(true), "{t:?} -> true");
4747        }
4748        for f in ["N", "n", "NO", " no ", "FALSE", "false", "0"] {
4749            let row = oracle_row(&[("FLAG", "VARCHAR2(3)", Some(f))]);
4750            assert_eq!(row.parse_bool("flag").ok(), Some(false), "{f:?} -> false");
4751        }
4752        // Unrecognized value -> explicit error, not a silent bool.
4753        let bad = oracle_row(&[("FLAG", "VARCHAR2(5)", Some("MAYBE"))]);
4754        assert!(matches!(
4755            bad.parse_bool("FLAG"),
4756            Err(CatalogError::InvalidColumnValue {
4757                column,
4758                expected: "bool",
4759                ..
4760            }) if column.eq("FLAG")
4761        ));
4762        // Missing column and NULL value are distinct typed errors.
4763        let empty = oracle_row(&[]);
4764        assert!(matches!(
4765            empty.parse_bool("FLAG"),
4766            Err(CatalogError::MissingColumn { column }) if column.eq("FLAG")
4767        ));
4768        let nullv = oracle_row(&[("FLAG", "VARCHAR2(1)", None)]);
4769        assert!(matches!(
4770            nullv.parse_bool("FLAG"),
4771            Err(CatalogError::NullColumnValue { column }) if column.eq("FLAG")
4772        ));
4773
4774        // parse_u64 must reject a negative value (Oracle NUMBER can
4775        // be signed) rather than wrap/panic.
4776        let neg = oracle_row(&[("N", "NUMBER", Some("-1"))]);
4777        assert!(matches!(
4778            neg.parse_u64("N"),
4779            Err(CatalogError::InvalidColumnValue {
4780                expected: "u64",
4781                ..
4782            })
4783        ));
4784        let i = oracle_row(&[("N", "NUMBER", Some("-42"))]);
4785        assert_eq!(i.parse_i64("N").ok(), Some(-42));
4786    }
4787
4788    #[test]
4789    fn catalog_load_request_defaults_to_current_schema() {
4790        let request = CatalogLoadRequest::default();
4791        assert_eq!(
4792            request.schema_filters,
4793            vec![CatalogSchemaFilter::CurrentSchema]
4794        );
4795    }
4796
4797    #[test]
4798    fn load_snapshot_from_connection_extracts_structural_metadata() {
4799        let object_rows = vec![
4800            oracle_row(&[
4801                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4802                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_API")),
4803                ("OBJECT_TYPE", "VARCHAR2(30)", Some("PACKAGE")),
4804                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4805            ]),
4806            oracle_row(&[
4807                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4808                ("OBJECT_NAME", "VARCHAR2(128)", Some("CALCULATE_TAX")),
4809                ("OBJECT_TYPE", "VARCHAR2(30)", Some("FUNCTION")),
4810                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4811            ]),
4812            oracle_row(&[
4813                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4814                ("OBJECT_NAME", "VARCHAR2(128)", Some("CUSTOMERS")),
4815                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TABLE")),
4816                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4817            ]),
4818            oracle_row(&[
4819                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4820                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES")),
4821                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TABLE")),
4822                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4823            ]),
4824            oracle_row(&[
4825                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4826                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICE_SUMMARY")),
4827                ("OBJECT_TYPE", "VARCHAR2(30)", Some("VIEW")),
4828                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4829            ]),
4830            oracle_row(&[
4831                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4832                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES_BIU")),
4833                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TRIGGER")),
4834                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4835            ]),
4836        ];
4837        let column_rows = vec![
4838            oracle_row(&[
4839                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4840                ("TABLE_NAME", "VARCHAR2(128)", Some("CUSTOMERS")),
4841                ("COLUMN_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
4842                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4843                ("DATA_TYPE_OWNER", "VARCHAR2(128)", None),
4844                ("DATA_TYPE", "VARCHAR2(128)", Some("NUMBER")),
4845                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4846                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
4847                ("DATA_SCALE", "NUMBER(10)", Some("0")),
4848                ("CHAR_USED", "VARCHAR2(1)", None),
4849                ("NULLABLE", "VARCHAR2(1)", Some("N")),
4850                ("DATA_DEFAULT_VC", "VARCHAR2(4000)", None),
4851                ("VIRTUAL_COLUMN", "VARCHAR2(3)", Some("NO")),
4852                ("HIDDEN_COLUMN", "VARCHAR2(3)", Some("NO")),
4853            ]),
4854            oracle_row(&[
4855                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4856                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4857                ("COLUMN_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4858                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4859                ("DATA_TYPE_OWNER", "VARCHAR2(128)", None),
4860                ("DATA_TYPE", "VARCHAR2(128)", Some("NUMBER")),
4861                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4862                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
4863                ("DATA_SCALE", "NUMBER(10)", Some("0")),
4864                ("CHAR_USED", "VARCHAR2(1)", None),
4865                ("NULLABLE", "VARCHAR2(1)", Some("N")),
4866                ("DATA_DEFAULT_VC", "VARCHAR2(4000)", None),
4867                ("VIRTUAL_COLUMN", "VARCHAR2(3)", Some("NO")),
4868                ("HIDDEN_COLUMN", "VARCHAR2(3)", Some("NO")),
4869            ]),
4870            oracle_row(&[
4871                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4872                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4873                ("COLUMN_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
4874                ("COLUMN_POSITION", "NUMBER(10)", Some("2")),
4875                ("DATA_TYPE_OWNER", "VARCHAR2(128)", None),
4876                ("DATA_TYPE", "VARCHAR2(128)", Some("NUMBER")),
4877                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4878                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
4879                ("DATA_SCALE", "NUMBER(10)", Some("0")),
4880                ("CHAR_USED", "VARCHAR2(1)", None),
4881                ("NULLABLE", "VARCHAR2(1)", Some("N")),
4882                ("DATA_DEFAULT_VC", "VARCHAR2(4000)", None),
4883                ("VIRTUAL_COLUMN", "VARCHAR2(3)", Some("NO")),
4884                ("HIDDEN_COLUMN", "VARCHAR2(3)", Some("NO")),
4885            ]),
4886            oracle_row(&[
4887                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4888                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICE_SUMMARY")),
4889                ("COLUMN_NAME", "VARCHAR2(128)", Some("TOTAL_DUE")),
4890                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4891                ("DATA_TYPE_OWNER", "VARCHAR2(128)", None),
4892                ("DATA_TYPE", "VARCHAR2(128)", Some("NUMBER")),
4893                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
4894                ("DATA_PRECISION", "NUMBER(10)", Some("12")),
4895                ("DATA_SCALE", "NUMBER(10)", Some("2")),
4896                ("CHAR_USED", "VARCHAR2(1)", None),
4897                ("NULLABLE", "VARCHAR2(1)", Some("Y")),
4898                ("DATA_DEFAULT_VC", "VARCHAR2(4000)", Some("0")),
4899                ("VIRTUAL_COLUMN", "VARCHAR2(3)", Some("YES")),
4900                ("HIDDEN_COLUMN", "VARCHAR2(3)", Some("NO")),
4901            ]),
4902        ];
4903        let constraint_rows = vec![
4904            oracle_row(&[
4905                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4906                (
4907                    "CONSTRAINT_NAME",
4908                    "VARCHAR2(128)",
4909                    Some("INVOICES_CUSTOMER_FK"),
4910                ),
4911                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4912                ("CONSTRAINT_TYPE", "VARCHAR2(1)", Some("R")),
4913                ("REFERENCED_TABLE_OWNER", "VARCHAR2(128)", Some("BILLING")),
4914                ("REFERENCED_TABLE_NAME", "VARCHAR2(128)", Some("CUSTOMERS")),
4915                ("SEARCH_CONDITION_VC", "VARCHAR2(4000)", None),
4916                ("IS_DEFERRABLE", "VARCHAR2(1)", Some("N")),
4917                ("IS_DEFERRED", "VARCHAR2(1)", Some("N")),
4918                ("COLUMN_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
4919                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4920                (
4921                    "REFERENCED_COLUMN_NAME",
4922                    "VARCHAR2(128)",
4923                    Some("CUSTOMER_ID"),
4924                ),
4925            ]),
4926            oracle_row(&[
4927                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4928                ("CONSTRAINT_NAME", "VARCHAR2(128)", Some("INVOICES_PK")),
4929                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4930                ("CONSTRAINT_TYPE", "VARCHAR2(1)", Some("P")),
4931                ("REFERENCED_TABLE_OWNER", "VARCHAR2(128)", None),
4932                ("REFERENCED_TABLE_NAME", "VARCHAR2(128)", None),
4933                ("SEARCH_CONDITION_VC", "VARCHAR2(4000)", None),
4934                ("IS_DEFERRABLE", "VARCHAR2(1)", Some("N")),
4935                ("IS_DEFERRED", "VARCHAR2(1)", Some("N")),
4936                ("COLUMN_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4937                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4938                ("REFERENCED_COLUMN_NAME", "VARCHAR2(128)", None),
4939            ]),
4940            oracle_row(&[
4941                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4942                (
4943                    "CONSTRAINT_NAME",
4944                    "VARCHAR2(128)",
4945                    Some("INVOICES_CUSTOMER_NN"),
4946                ),
4947                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4948                ("CONSTRAINT_TYPE", "VARCHAR2(1)", Some("C")),
4949                ("REFERENCED_TABLE_OWNER", "VARCHAR2(128)", None),
4950                ("REFERENCED_TABLE_NAME", "VARCHAR2(128)", None),
4951                (
4952                    "SEARCH_CONDITION_VC",
4953                    "VARCHAR2(4000)",
4954                    Some("\"CUSTOMER_ID\" IS NOT NULL"),
4955                ),
4956                ("IS_DEFERRABLE", "VARCHAR2(1)", Some("N")),
4957                ("IS_DEFERRED", "VARCHAR2(1)", Some("N")),
4958                ("COLUMN_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
4959                ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4960                ("REFERENCED_COLUMN_NAME", "VARCHAR2(128)", None),
4961            ]),
4962        ];
4963        let index_rows = vec![oracle_row(&[
4964            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4965            ("INDEX_NAME", "VARCHAR2(128)", Some("INVOICES_PK_IDX")),
4966            ("TABLE_OWNER", "VARCHAR2(128)", Some("BILLING")),
4967            ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4968            ("IS_UNIQUE", "VARCHAR2(1)", Some("Y")),
4969            ("INDEX_TYPE", "VARCHAR2(27)", Some("NORMAL")),
4970            ("STATUS", "VARCHAR2(8)", Some("VALID")),
4971            ("COLUMN_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4972            ("COLUMN_POSITION", "NUMBER(10)", Some("1")),
4973        ])];
4974        let trigger_rows = vec![oracle_row(&[
4975            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4976            ("TRIGGER_NAME", "VARCHAR2(128)", Some("INVOICES_BIU")),
4977            ("TABLE_OWNER", "VARCHAR2(128)", Some("BILLING")),
4978            ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4979            ("TRIGGER_TYPE", "VARCHAR2(80)", Some("BEFORE EACH ROW")),
4980            (
4981                "TRIGGERING_EVENT",
4982                "VARCHAR2(246)",
4983                Some("INSERT OR UPDATE"),
4984            ),
4985            ("WHEN_CLAUSE", "VARCHAR2(4000)", Some(":new.total_due >= 0")),
4986        ])];
4987        let synonym_rows = vec![
4988            oracle_row(&[
4989                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4990                ("SYNONYM_NAME", "VARCHAR2(128)", Some("LATEST_INVOICE")),
4991                ("TABLE_OWNER", "VARCHAR2(128)", Some("BILLING")),
4992                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4993                ("DB_LINK", "VARCHAR2(128)", None),
4994            ]),
4995            oracle_row(&[
4996                ("OWNER", "VARCHAR2(128)", Some("PUBLIC")),
4997                ("SYNONYM_NAME", "VARCHAR2(128)", Some("INVOICE_PUBLIC")),
4998                ("TABLE_OWNER", "VARCHAR2(128)", Some("BILLING")),
4999                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
5000                ("DB_LINK", "VARCHAR2(128)", None),
5001            ]),
5002        ];
5003        let procedure_rows = vec![
5004            oracle_row(&[
5005                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5006                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_API")),
5007                ("PROCEDURE_NAME", "VARCHAR2(128)", Some("CREATE_INVOICE")),
5008                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
5009                ("OVERLOAD", "VARCHAR2(40)", None),
5010                ("OBJECT_TYPE", "VARCHAR2(13)", Some("PROCEDURE")),
5011                ("DETERMINISTIC", "VARCHAR2(3)", Some("NO")),
5012                ("PIPELINED", "VARCHAR2(3)", Some("NO")),
5013            ]),
5014            oracle_row(&[
5015                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5016                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_API")),
5017                (
5018                    "PROCEDURE_NAME",
5019                    "VARCHAR2(128)",
5020                    Some("TOTAL_FOR_CUSTOMER"),
5021                ),
5022                ("SUBPROGRAM_ID", "NUMBER(10)", Some("2")),
5023                ("OVERLOAD", "VARCHAR2(40)", Some("1")),
5024                ("OBJECT_TYPE", "VARCHAR2(13)", Some("FUNCTION")),
5025                ("DETERMINISTIC", "VARCHAR2(3)", Some("NO")),
5026                ("PIPELINED", "VARCHAR2(3)", Some("NO")),
5027            ]),
5028            oracle_row(&[
5029                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5030                ("OBJECT_NAME", "VARCHAR2(128)", Some("CALCULATE_TAX")),
5031                ("PROCEDURE_NAME", "VARCHAR2(128)", None),
5032                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
5033                ("OVERLOAD", "VARCHAR2(40)", None),
5034                ("OBJECT_TYPE", "VARCHAR2(13)", Some("FUNCTION")),
5035                ("DETERMINISTIC", "VARCHAR2(3)", Some("YES")),
5036                ("PIPELINED", "VARCHAR2(3)", Some("NO")),
5037            ]),
5038        ];
5039        let argument_rows = vec![
5040            oracle_row(&[
5041                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5042                ("PACKAGE_NAME", "VARCHAR2(128)", Some("BILLING_API")),
5043                ("OBJECT_NAME", "VARCHAR2(128)", Some("CREATE_INVOICE")),
5044                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
5045                ("OVERLOAD", "VARCHAR2(40)", None),
5046                ("ARGUMENT_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
5047                ("POSITION", "NUMBER(10)", Some("1")),
5048                ("SEQUENCE", "NUMBER(10)", Some("1")),
5049                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
5050                ("TYPE_OWNER", "VARCHAR2(128)", None),
5051                ("TYPE_NAME", "VARCHAR2(128)", None),
5052                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
5053                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
5054                ("DATA_SCALE", "NUMBER(10)", Some("0")),
5055                ("IN_OUT", "VARCHAR2(9)", Some("IN")),
5056                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
5057            ]),
5058            oracle_row(&[
5059                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5060                ("PACKAGE_NAME", "VARCHAR2(128)", Some("BILLING_API")),
5061                ("OBJECT_NAME", "VARCHAR2(128)", Some("CREATE_INVOICE")),
5062                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
5063                ("OVERLOAD", "VARCHAR2(40)", None),
5064                ("ARGUMENT_NAME", "VARCHAR2(128)", Some("TOTAL_DUE")),
5065                ("POSITION", "NUMBER(10)", Some("2")),
5066                ("SEQUENCE", "NUMBER(10)", Some("2")),
5067                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
5068                ("TYPE_OWNER", "VARCHAR2(128)", None),
5069                ("TYPE_NAME", "VARCHAR2(128)", None),
5070                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
5071                ("DATA_PRECISION", "NUMBER(10)", Some("12")),
5072                ("DATA_SCALE", "NUMBER(10)", Some("2")),
5073                ("IN_OUT", "VARCHAR2(9)", Some("IN")),
5074                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
5075            ]),
5076            oracle_row(&[
5077                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5078                ("PACKAGE_NAME", "VARCHAR2(128)", Some("BILLING_API")),
5079                ("OBJECT_NAME", "VARCHAR2(128)", Some("TOTAL_FOR_CUSTOMER")),
5080                ("SUBPROGRAM_ID", "NUMBER(10)", Some("2")),
5081                ("OVERLOAD", "VARCHAR2(40)", Some("1")),
5082                ("ARGUMENT_NAME", "VARCHAR2(128)", None),
5083                ("POSITION", "NUMBER(10)", Some("0")),
5084                ("SEQUENCE", "NUMBER(10)", Some("1")),
5085                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
5086                ("TYPE_OWNER", "VARCHAR2(128)", None),
5087                ("TYPE_NAME", "VARCHAR2(128)", None),
5088                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
5089                ("DATA_PRECISION", "NUMBER(10)", Some("12")),
5090                ("DATA_SCALE", "NUMBER(10)", Some("2")),
5091                ("IN_OUT", "VARCHAR2(9)", Some("OUT")),
5092                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
5093            ]),
5094            oracle_row(&[
5095                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5096                ("PACKAGE_NAME", "VARCHAR2(128)", Some("BILLING_API")),
5097                ("OBJECT_NAME", "VARCHAR2(128)", Some("TOTAL_FOR_CUSTOMER")),
5098                ("SUBPROGRAM_ID", "NUMBER(10)", Some("2")),
5099                ("OVERLOAD", "VARCHAR2(40)", Some("1")),
5100                ("ARGUMENT_NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
5101                ("POSITION", "NUMBER(10)", Some("1")),
5102                ("SEQUENCE", "NUMBER(10)", Some("2")),
5103                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
5104                ("TYPE_OWNER", "VARCHAR2(128)", None),
5105                ("TYPE_NAME", "VARCHAR2(128)", None),
5106                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
5107                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
5108                ("DATA_SCALE", "NUMBER(10)", Some("0")),
5109                ("IN_OUT", "VARCHAR2(9)", Some("IN")),
5110                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
5111            ]),
5112            oracle_row(&[
5113                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5114                ("PACKAGE_NAME", "VARCHAR2(128)", None),
5115                ("OBJECT_NAME", "VARCHAR2(128)", Some("CALCULATE_TAX")),
5116                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
5117                ("OVERLOAD", "VARCHAR2(40)", None),
5118                ("ARGUMENT_NAME", "VARCHAR2(128)", None),
5119                ("POSITION", "NUMBER(10)", Some("0")),
5120                ("SEQUENCE", "NUMBER(10)", Some("1")),
5121                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
5122                ("TYPE_OWNER", "VARCHAR2(128)", None),
5123                ("TYPE_NAME", "VARCHAR2(128)", None),
5124                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
5125                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
5126                ("DATA_SCALE", "NUMBER(10)", Some("2")),
5127                ("IN_OUT", "VARCHAR2(9)", Some("OUT")),
5128                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
5129            ]),
5130            oracle_row(&[
5131                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5132                ("PACKAGE_NAME", "VARCHAR2(128)", None),
5133                ("OBJECT_NAME", "VARCHAR2(128)", Some("CALCULATE_TAX")),
5134                ("SUBPROGRAM_ID", "NUMBER(10)", Some("1")),
5135                ("OVERLOAD", "VARCHAR2(40)", None),
5136                ("ARGUMENT_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
5137                ("POSITION", "NUMBER(10)", Some("1")),
5138                ("SEQUENCE", "NUMBER(10)", Some("2")),
5139                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
5140                ("TYPE_OWNER", "VARCHAR2(128)", None),
5141                ("TYPE_NAME", "VARCHAR2(128)", None),
5142                ("DATA_LENGTH", "NUMBER(10)", Some("22")),
5143                ("DATA_PRECISION", "NUMBER(10)", Some("10")),
5144                ("DATA_SCALE", "NUMBER(10)", Some("0")),
5145                ("IN_OUT", "VARCHAR2(9)", Some("IN")),
5146                ("DEFAULTED", "VARCHAR2(1)", Some("N")),
5147            ]),
5148        ];
5149        let mut expected_queries = capability_probe_expectations();
5150        // REPORTING is a database user, so its object grant stays a direct
5151        // (high-confidence) Grantee::User after the role-classification fix.
5152        expected_queries.push(all_users_expectation(&["BILLING", "REPORTING"]));
5153        expected_queries.extend(vec![
5154                QueryExpectation {
5155                    sql_contains: String::from("from all_objects"),
5156                    params: vec![OracleBind::from("BILLING")],
5157                    rows: object_rows,
5158                },
5159                QueryExpectation {
5160                    sql_contains: String::from("from all_tab_cols"),
5161                    params: vec![OracleBind::from("BILLING")],
5162                    rows: column_rows,
5163                },
5164                QueryExpectation {
5165                    sql_contains: String::from("from all_constraints c"),
5166                    params: vec![OracleBind::from("BILLING")],
5167                    rows: constraint_rows,
5168                },
5169                QueryExpectation {
5170                    sql_contains: String::from("from all_indexes i"),
5171                    params: vec![OracleBind::from("BILLING")],
5172                    rows: index_rows,
5173                },
5174                QueryExpectation {
5175                    sql_contains: String::from("from all_triggers"),
5176                    params: vec![OracleBind::from("BILLING")],
5177                    rows: trigger_rows,
5178                },
5179                QueryExpectation {
5180                    sql_contains: String::from("from all_synonyms"),
5181                    params: vec![OracleBind::from("BILLING")],
5182                    rows: synonym_rows,
5183                },
5184                QueryExpectation {
5185                    sql_contains: String::from("from all_procedures"),
5186                    params: vec![OracleBind::from("BILLING")],
5187                    rows: procedure_rows,
5188                },
5189                QueryExpectation {
5190                    sql_contains: String::from("from all_arguments"),
5191                    params: vec![OracleBind::from("BILLING")],
5192                    rows: argument_rows,
5193                },
5194                QueryExpectation {
5195                    sql_contains: String::from("from all_views"),
5196                    params: vec![OracleBind::from("BILLING")],
5197                    rows: vec![oracle_row(&[
5198                        ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5199                        ("VIEW_NAME", "VARCHAR2(128)", Some("INVOICE_SUMMARY")),
5200                        (
5201                            "TEXT_VC",
5202                            "VARCHAR2(4000)",
5203                            Some(
5204                                "select invoice_id, sum(amount) total_due from invoices group by invoice_id",
5205                            ),
5206                        ),
5207                        ("READ_ONLY", "VARCHAR2(1)", Some("N")),
5208                    ])],
5209                },
5210                QueryExpectation {
5211                    sql_contains: String::from("from all_mviews"),
5212                    params: vec![OracleBind::from("BILLING")],
5213                    rows: vec![],
5214                },
5215                QueryExpectation {
5216                    sql_contains: String::from("from all_sequences"),
5217                    params: vec![OracleBind::from("BILLING")],
5218                    rows: vec![],
5219                },
5220                QueryExpectation {
5221                    sql_contains: String::from("from all_type_attrs"),
5222                    params: vec![OracleBind::from("BILLING")],
5223                    rows: vec![],
5224                },
5225                QueryExpectation {
5226                    sql_contains: String::from("from all_tab_privs"),
5227                    params: vec![OracleBind::from("BILLING")],
5228                    rows: vec![oracle_row(&[
5229                        ("TABLE_SCHEMA", "VARCHAR2(128)", Some("BILLING")),
5230                        ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
5231                        ("GRANTEE", "VARCHAR2(128)", Some("REPORTING")),
5232                        ("PRIVILEGE", "VARCHAR2(40)", Some("SELECT")),
5233                        ("GRANTABLE", "VARCHAR2(3)", Some("NO")),
5234                        ("HIERARCHY", "VARCHAR2(3)", Some("NO")),
5235                    ])],
5236                },
5237                QueryExpectation {
5238                    sql_contains: String::from("from all_db_links"),
5239                    params: vec![OracleBind::from("BILLING")],
5240                    rows: vec![],
5241                },
5242                QueryExpectation {
5243                    sql_contains: String::from("from all_tab_comments"),
5244                    params: vec![OracleBind::from("BILLING")],
5245                    rows: vec![],
5246                },
5247                QueryExpectation {
5248                    sql_contains: String::from("from all_col_comments"),
5249                    params: vec![OracleBind::from("BILLING")],
5250                    rows: vec![],
5251                },
5252                QueryExpectation {
5253                    sql_contains: String::from("from all_editions"),
5254                    params: vec![],
5255                    rows: vec![],
5256                },
5257                QueryExpectation {
5258                    sql_contains: String::from("from all_editioning_views"),
5259                    params: vec![OracleBind::from("BILLING")],
5260                    rows: vec![],
5261                },
5262                QueryExpectation {
5263                    sql_contains: String::from("from all_policies"),
5264                    params: vec![OracleBind::from("BILLING")],
5265                    rows: vec![],
5266                },
5267                QueryExpectation {
5268                    sql_contains: String::from("from all_dependencies"),
5269                    params: vec![OracleBind::from("BILLING")],
5270                    rows: vec![],
5271                },
5272        ]);
5273        let connection = StaticConnection {
5274            expected_queries,
5275            ..StaticConnection::default()
5276        };
5277
5278        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
5279
5280        let current_schema = snapshot
5281            .profile
5282            .current_schema
5283            .and_then(|name| snapshot.interner.resolve(name.symbol()));
5284        assert_eq!(current_schema, Some("BILLING"));
5285        assert_eq!(
5286            snapshot.profile.oracle_version,
5287            plsql_core::OracleVersion::Oracle23ai
5288        );
5289        assert!(snapshot.capabilities.can_query_all_views);
5290
5291        let billing_schema = snapshot
5292            .profile
5293            .current_schema
5294            .expect("current schema should be interned");
5295        let schema_catalog = snapshot
5296            .schemas
5297            .get(&billing_schema)
5298            .expect("billing schema should exist");
5299
5300        let invoices_name = schema_catalog
5301            .objects
5302            .keys()
5303            .find(|name| {
5304                snapshot
5305                    .interner
5306                    .resolve(name.symbol())
5307                    .is_some_and(|label| label.eq("INVOICES"))
5308            })
5309            .copied()
5310            .expect("object name should intern");
5311        let invoice_summary_name = schema_catalog
5312            .objects
5313            .keys()
5314            .find(|name| {
5315                snapshot
5316                    .interner
5317                    .resolve(name.symbol())
5318                    .is_some_and(|label| label.eq("INVOICE_SUMMARY"))
5319            })
5320            .copied()
5321            .expect("object name should intern");
5322        let package_object_name = schema_catalog
5323            .objects
5324            .keys()
5325            .find(|name| {
5326                snapshot
5327                    .interner
5328                    .resolve(name.symbol())
5329                    .is_some_and(|label| label.eq("BILLING_API"))
5330            })
5331            .copied()
5332            .expect("package object name should intern");
5333        let tax_function_name = schema_catalog
5334            .objects
5335            .keys()
5336            .find(|name| {
5337                snapshot
5338                    .interner
5339                    .resolve(name.symbol())
5340                    .is_some_and(|label| label.eq("CALCULATE_TAX"))
5341            })
5342            .copied()
5343            .expect("function object name should intern");
5344        let trigger_object_name = schema_catalog
5345            .objects
5346            .keys()
5347            .find(|name| {
5348                snapshot
5349                    .interner
5350                    .resolve(name.symbol())
5351                    .is_some_and(|label| label.eq("INVOICES_BIU"))
5352            })
5353            .copied()
5354            .expect("trigger object name should intern");
5355
5356        let invoices_table = schema_catalog
5357            .objects
5358            .get(&invoices_name)
5359            .and_then(|object| {
5360                if let CatalogObject::Table(table) = object {
5361                    Some(table)
5362                } else {
5363                    None
5364                }
5365            });
5366        assert!(invoices_table.is_some());
5367        let invoice_id = invoices_table
5368            .and_then(|table| {
5369                table.columns.values().find(|column| {
5370                    snapshot
5371                        .interner
5372                        .resolve(column.name.symbol())
5373                        .is_some_and(|label| label.eq("INVOICE_ID"))
5374                })
5375            })
5376            .expect("table column should exist");
5377        assert_eq!(invoice_id.position, 1);
5378        assert_eq!(invoice_id.data_type.name, "NUMBER");
5379        assert_eq!(invoice_id.data_type.precision, Some(10));
5380        assert!(!invoice_id.nullable);
5381        assert!(!invoice_id.hidden);
5382        assert!(invoice_id.generated_expression.is_none());
5383
5384        let invoice_summary_view =
5385            schema_catalog
5386                .objects
5387                .get(&invoice_summary_name)
5388                .and_then(|object| {
5389                    if let CatalogObject::View(view) = object {
5390                        Some(view)
5391                    } else {
5392                        None
5393                    }
5394                });
5395        assert!(invoice_summary_view.is_some());
5396        let total_due = invoice_summary_view
5397            .and_then(|view| {
5398                view.columns.values().find(|column| {
5399                    snapshot
5400                        .interner
5401                        .resolve(column.name.symbol())
5402                        .is_some_and(|label| label.eq("TOTAL_DUE"))
5403                })
5404            })
5405            .expect("view column should exist");
5406        assert_eq!(total_due.position, 1);
5407        assert!(total_due.nullable);
5408        assert_eq!(total_due.generated_expression.as_deref(), Some("0"));
5409        assert!(total_due.default_expression.is_none());
5410
5411        let invoices_pk_index = schema_catalog
5412            .indexes
5413            .values()
5414            .find(|index| {
5415                snapshot
5416                    .interner
5417                    .resolve(index.name.symbol())
5418                    .is_some_and(|label| label.eq("INVOICES_PK_IDX"))
5419            })
5420            .expect("index metadata should exist");
5421        assert!(invoices_pk_index.unique);
5422        assert_eq!(invoices_pk_index.index_type, "NORMAL");
5423        assert_eq!(invoices_pk_index.status, ObjectStatus::Valid);
5424        assert_eq!(
5425            invoices_pk_index
5426                .columns
5427                .first()
5428                .and_then(|column| snapshot.interner.resolve(column.symbol())),
5429            Some("INVOICE_ID")
5430        );
5431
5432        let customer_fk = schema_catalog
5433            .constraints
5434            .values()
5435            .find(|constraint| {
5436                snapshot
5437                    .interner
5438                    .resolve(constraint.name.symbol())
5439                    .is_some_and(|label| label.eq("INVOICES_CUSTOMER_FK"))
5440            })
5441            .expect("foreign key should exist");
5442        assert_eq!(customer_fk.constraint_type, ConstraintType::ForeignKey);
5443        assert_eq!(
5444            customer_fk
5445                .columns
5446                .first()
5447                .and_then(|column| snapshot.interner.resolve(column.symbol())),
5448            Some("CUSTOMER_ID")
5449        );
5450        assert_eq!(
5451            customer_fk
5452                .referenced_table_name
5453                .and_then(|name| snapshot.interner.resolve(name.symbol())),
5454            Some("CUSTOMERS")
5455        );
5456        assert_eq!(
5457            customer_fk
5458                .referenced_columns
5459                .first()
5460                .and_then(|column| snapshot.interner.resolve(column.symbol())),
5461            Some("CUSTOMER_ID")
5462        );
5463
5464        let customer_not_null = schema_catalog
5465            .constraints
5466            .values()
5467            .find(|constraint| {
5468                snapshot
5469                    .interner
5470                    .resolve(constraint.name.symbol())
5471                    .is_some_and(|label| label.eq("INVOICES_CUSTOMER_NN"))
5472            })
5473            .expect("not-null constraint should exist");
5474        assert_eq!(customer_not_null.constraint_type, ConstraintType::NotNull);
5475
5476        let trigger_metadata = schema_catalog
5477            .triggers
5478            .values()
5479            .find(|trigger| {
5480                snapshot
5481                    .interner
5482                    .resolve(trigger.common.name.symbol())
5483                    .is_some_and(|label| label.eq("INVOICES_BIU"))
5484            })
5485            .expect("trigger metadata should exist");
5486        assert_eq!(trigger_metadata.timing, TriggerTiming::Before);
5487        assert_eq!(trigger_metadata.level, TriggerLevel::Row);
5488        assert_eq!(
5489            trigger_metadata.events.as_slice(),
5490            &[TriggerEvent::Insert, TriggerEvent::Update]
5491        );
5492        assert_eq!(
5493            trigger_metadata.target_name.symbol().get(),
5494            invoices_name.symbol().get()
5495        );
5496        assert_eq!(
5497            trigger_metadata.when_clause.as_deref(),
5498            Some(":new.total_due >= 0")
5499        );
5500        assert!(matches!(
5501            schema_catalog.objects.get(&trigger_object_name),
5502            Some(CatalogObject::Trigger(_))
5503        ));
5504
5505        let latest_invoice_synonym = schema_catalog
5506            .synonyms
5507            .values()
5508            .find(|synonym| {
5509                snapshot
5510                    .interner
5511                    .resolve(synonym.target_name.symbol())
5512                    .is_some_and(|label| label.eq("INVOICES"))
5513            })
5514            .expect("private synonym should exist");
5515        assert!(!latest_invoice_synonym.public_synonym);
5516
5517        let public_schema_name = snapshot
5518            .schemas
5519            .keys()
5520            .find(|name| {
5521                snapshot
5522                    .interner
5523                    .resolve(name.symbol())
5524                    .is_some_and(|label| label.eq("PUBLIC"))
5525            })
5526            .copied()
5527            .expect("public schema should exist");
5528        let public_schema = snapshot
5529            .schemas
5530            .get(&public_schema_name)
5531            .expect("public schema catalog should exist");
5532        let public_synonym = public_schema
5533            .synonyms
5534            .values()
5535            .find(|synonym| {
5536                snapshot
5537                    .interner
5538                    .resolve(synonym.target_name.symbol())
5539                    .is_some_and(|label| label.eq("INVOICES"))
5540            })
5541            .expect("public synonym should exist");
5542        assert!(public_synonym.public_synonym);
5543
5544        let package_metadata = schema_catalog
5545            .objects
5546            .get(&package_object_name)
5547            .and_then(|object| {
5548                if let CatalogObject::Package(metadata) = object {
5549                    Some(metadata)
5550                } else {
5551                    None
5552                }
5553            })
5554            .expect("package metadata should exist");
5555        let create_invoice = package_metadata
5556            .procedures
5557            .iter()
5558            .find(|signature| {
5559                snapshot
5560                    .interner
5561                    .resolve(signature.routine_name.symbol())
5562                    .is_some_and(|label| label.eq("CREATE_INVOICE"))
5563            })
5564            .expect("package procedure should exist");
5565        assert_eq!(create_invoice.arguments.len(), 2);
5566        let total_for_customer = package_metadata
5567            .functions
5568            .iter()
5569            .find(|signature| {
5570                snapshot
5571                    .interner
5572                    .resolve(signature.routine_name.symbol())
5573                    .is_some_and(|label| label.eq("TOTAL_FOR_CUSTOMER"))
5574            })
5575            .expect("package function should exist");
5576        assert_eq!(total_for_customer.overload, Some(1));
5577        assert_eq!(
5578            total_for_customer
5579                .return_type
5580                .as_ref()
5581                .map(|data_type| data_type.name.as_str()),
5582            Some("NUMBER")
5583        );
5584        assert_eq!(total_for_customer.arguments.len(), 1);
5585
5586        let tax_function = schema_catalog
5587            .objects
5588            .get(&tax_function_name)
5589            .and_then(|object| {
5590                if let CatalogObject::Function(metadata) = object {
5591                    Some(metadata)
5592                } else {
5593                    None
5594                }
5595            })
5596            .expect("top-level function should exist");
5597        assert!(tax_function.deterministic);
5598        assert!(!tax_function.pipelined);
5599        assert_eq!(tax_function.signature.arguments.len(), 1);
5600        assert_eq!(
5601            tax_function
5602                .signature
5603                .return_type
5604                .as_ref()
5605                .map(|data_type| data_type.name.as_str()),
5606            Some("NUMBER")
5607        );
5608
5609        let invoice_summary_view_with_hash = schema_catalog
5610            .objects
5611            .get(&invoice_summary_name)
5612            .and_then(|object| {
5613                if let CatalogObject::View(view) = object {
5614                    Some(view)
5615                } else {
5616                    None
5617                }
5618            })
5619            .expect("invoice summary view should be present");
5620        assert!(invoice_summary_view_with_hash.query_hash.is_some());
5621        assert_eq!(invoice_summary_view_with_hash.read_only, Some(false));
5622
5623        assert_eq!(schema_catalog.grants.len(), 1);
5624        let reporting_grant = &schema_catalog.grants[0];
5625        assert!(matches!(
5626            reporting_grant.privilege,
5627            crate::GrantPrivilege::Select
5628        ));
5629        // REPORTING appears in the ALL_USERS fixture, so it classifies as a
5630        // direct user grant (and not, conservatively, as a role).
5631        assert!(matches!(reporting_grant.grantee, crate::Grantee::User(_)));
5632        assert!(!reporting_grant.grantable);
5633    }
5634
5635    /// oracle-qm3q.2 regression: `grantee_from_dictionary_value` must
5636    /// discriminate an object-privilege grantee against the loaded
5637    /// `ALL_USERS` set. A grantee that is NOT a known user is a database
5638    /// role (the only remaining grantee class besides PUBLIC), and must be
5639    /// recorded as `Grantee::Role` so the privilege resolver downgrades it
5640    /// to Low confidence with a `RuntimeGrantOrRole` ambiguity instead of
5641    /// the previous, fail-toward-permissive `Grantee::User` (High).
5642    #[test]
5643    fn grantee_classification_uses_loaded_user_set() {
5644        let mut snapshot = CatalogSnapshot::new(
5645            plsql_core::AnalysisProfile::default(),
5646            CatalogCapabilities::default(),
5647            CatalogSource::default(),
5648            DateTime::<Utc>::UNIX_EPOCH,
5649        );
5650
5651        // Before the user set is loaded, the grantee class is undetermined.
5652        // R13 / fail-toward-restrictive: an undetermined grantee must NOT be
5653        // a high-confidence direct user grant — it routes through the role
5654        // ambiguity path instead.
5655        assert!(snapshot.known_users.is_none());
5656        let undetermined =
5657            grantee_from_dictionary_value(&mut snapshot, "MYSTERY_GRANTEE").expect("grantee");
5658        assert!(
5659            matches!(undetermined, crate::Grantee::Role(_)),
5660            "undetermined grantee (ALL_USERS not loaded) must not be a direct user grant; got {undetermined:?}"
5661        );
5662
5663        // Load a user set: APP_USER is a user, APP_READER_ROLE is not.
5664        let app_user = snapshot.intern_user_name("APP_USER").expect("user");
5665        let mut users = HashSet::new();
5666        users.insert(app_user);
5667        snapshot.known_users = Some(users);
5668
5669        // PUBLIC is always PUBLIC.
5670        assert!(matches!(
5671            grantee_from_dictionary_value(&mut snapshot, "PUBLIC").expect("grantee"),
5672            crate::Grantee::Public
5673        ));
5674        // A known user classifies as a direct user grant.
5675        assert!(matches!(
5676            grantee_from_dictionary_value(&mut snapshot, "APP_USER").expect("grantee"),
5677            crate::Grantee::User(_)
5678        ));
5679        // A grantee absent from ALL_USERS classifies as a role — the defect
5680        // this bead fixes (previously always `Grantee::User`).
5681        let role =
5682            grantee_from_dictionary_value(&mut snapshot, "APP_READER_ROLE").expect("grantee");
5683        let role_name = match role {
5684            crate::Grantee::Role(role_name) => Some(role_name),
5685            _ => None,
5686        };
5687        assert!(
5688            role_name.is_some(),
5689            "APP_READER_ROLE must classify as a role"
5690        );
5691        assert_eq!(
5692            snapshot
5693                .interner
5694                .resolve(role_name.expect("role assertion above").symbol()),
5695            Some("APP_READER_ROLE")
5696        );
5697    }
5698
5699    /// oracle-qm3q.2: if `ALL_USERS` cannot be read, `load_catalog_users`
5700    /// must leave `known_users` as `None` (so grantees stay conservatively
5701    /// classified) and record a capability warning rather than aborting the
5702    /// extraction or silently assuming the grantee universe.
5703    #[test]
5704    fn load_catalog_users_failure_is_nonfatal_and_marks_unknown() {
5705        // A strict mock with NO matching expectation for `from all_users`
5706        // makes the query fail.
5707        let connection = StaticConnection {
5708            expected_queries: vec![QueryExpectation {
5709                sql_contains: String::from("from something_else"),
5710                params: vec![],
5711                rows: vec![],
5712            }],
5713            ..StaticConnection::default()
5714        };
5715        let mut snapshot = CatalogSnapshot::new(
5716            plsql_core::AnalysisProfile::default(),
5717            CatalogCapabilities::default(),
5718            CatalogSource::default(),
5719            DateTime::<Utc>::UNIX_EPOCH,
5720        );
5721
5722        load_catalog_users_for_test(&connection, &mut snapshot).expect("non-fatal");
5723        assert!(
5724            snapshot.known_users.is_none(),
5725            "failed ALL_USERS read must leave grantee universe undetermined"
5726        );
5727        assert!(
5728            snapshot
5729                .capabilities
5730                .warnings
5731                .iter()
5732                .any(|w| w.code.eq("all-users-probe")),
5733            "a capability warning must record the ALL_USERS read failure"
5734        );
5735    }
5736
5737    #[test]
5738    fn load_snapshot_from_connection_extracts_views_sequences_mviews_types_and_grants() {
5739        let object_rows = vec![
5740            oracle_row(&[
5741                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5742                ("OBJECT_NAME", "VARCHAR2(128)", Some("ACTIVE_CUSTOMERS")),
5743                ("OBJECT_TYPE", "VARCHAR2(30)", Some("VIEW")),
5744                ("STATUS", "VARCHAR2(7)", Some("VALID")),
5745            ]),
5746            oracle_row(&[
5747                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5748                ("OBJECT_NAME", "VARCHAR2(128)", Some("CUSTOMER_TOTALS_MV")),
5749                ("OBJECT_TYPE", "VARCHAR2(30)", Some("MATERIALIZED VIEW")),
5750                ("STATUS", "VARCHAR2(7)", Some("VALID")),
5751            ]),
5752            oracle_row(&[
5753                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5754                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICE_SEQ")),
5755                ("OBJECT_TYPE", "VARCHAR2(30)", Some("SEQUENCE")),
5756                ("STATUS", "VARCHAR2(7)", Some("VALID")),
5757            ]),
5758            oracle_row(&[
5759                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5760                ("OBJECT_NAME", "VARCHAR2(128)", Some("ADDRESS_T")),
5761                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TYPE")),
5762                ("STATUS", "VARCHAR2(7)", Some("VALID")),
5763            ]),
5764        ];
5765        let view_rows = vec![oracle_row(&[
5766            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5767            ("VIEW_NAME", "VARCHAR2(128)", Some("ACTIVE_CUSTOMERS")),
5768            (
5769                "TEXT_VC",
5770                "VARCHAR2(4000)",
5771                Some("select customer_id from customers where active = 'Y'"),
5772            ),
5773            ("READ_ONLY", "VARCHAR2(1)", Some("Y")),
5774        ])];
5775        let mview_rows = vec![oracle_row(&[
5776            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5777            ("MVIEW_NAME", "VARCHAR2(128)", Some("CUSTOMER_TOTALS_MV")),
5778            ("REFRESH_MODE", "VARCHAR2(6)", Some("DEMAND")),
5779            ("REFRESH_METHOD", "VARCHAR2(8)", Some("COMPLETE")),
5780            (
5781                "QUERY",
5782                "LONG",
5783                Some("select customer_id, sum(amount) from invoices group by customer_id"),
5784            ),
5785        ])];
5786        let sequence_rows = vec![oracle_row(&[
5787            ("SEQUENCE_OWNER", "VARCHAR2(128)", Some("BILLING")),
5788            ("SEQUENCE_NAME", "VARCHAR2(128)", Some("INVOICE_SEQ")),
5789            ("MIN_VALUE", "NUMBER(28)", Some("1")),
5790            ("MAX_VALUE", "NUMBER(28)", Some("9999999999")),
5791            ("INCREMENT_BY", "NUMBER(28)", Some("1")),
5792            ("CYCLE_FLAG", "VARCHAR2(1)", Some("N")),
5793            ("ORDER_FLAG", "VARCHAR2(1)", Some("N")),
5794            ("CACHE_SIZE", "NUMBER(28)", Some("20")),
5795        ])];
5796        let type_attr_rows = vec![
5797            oracle_row(&[
5798                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5799                ("TYPE_NAME", "VARCHAR2(128)", Some("ADDRESS_T")),
5800                ("ATTR_NAME", "VARCHAR2(128)", Some("STREET")),
5801                ("ATTR_NO", "NUMBER(10)", Some("1")),
5802                ("ATTR_TYPE_OWNER", "VARCHAR2(128)", None),
5803                ("ATTR_TYPE_NAME", "VARCHAR2(128)", Some("VARCHAR2")),
5804                ("LENGTH", "NUMBER(10)", Some("200")),
5805                ("PRECISION", "NUMBER(10)", None),
5806                ("SCALE", "NUMBER(10)", None),
5807            ]),
5808            oracle_row(&[
5809                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5810                ("TYPE_NAME", "VARCHAR2(128)", Some("ADDRESS_T")),
5811                ("ATTR_NAME", "VARCHAR2(128)", Some("ZIP")),
5812                ("ATTR_NO", "NUMBER(10)", Some("2")),
5813                ("ATTR_TYPE_OWNER", "VARCHAR2(128)", None),
5814                ("ATTR_TYPE_NAME", "VARCHAR2(128)", Some("VARCHAR2")),
5815                ("LENGTH", "NUMBER(10)", Some("10")),
5816                ("PRECISION", "NUMBER(10)", None),
5817                ("SCALE", "NUMBER(10)", None),
5818            ]),
5819        ];
5820        let grant_rows = vec![
5821            oracle_row(&[
5822                ("TABLE_SCHEMA", "VARCHAR2(128)", Some("BILLING")),
5823                ("TABLE_NAME", "VARCHAR2(128)", Some("ACTIVE_CUSTOMERS")),
5824                ("GRANTEE", "VARCHAR2(128)", Some("PUBLIC")),
5825                ("PRIVILEGE", "VARCHAR2(40)", Some("SELECT")),
5826                ("GRANTABLE", "VARCHAR2(3)", Some("NO")),
5827                ("HIERARCHY", "VARCHAR2(3)", Some("NO")),
5828            ]),
5829            oracle_row(&[
5830                ("TABLE_SCHEMA", "VARCHAR2(128)", Some("BILLING")),
5831                ("TABLE_NAME", "VARCHAR2(128)", Some("ACTIVE_CUSTOMERS")),
5832                ("GRANTEE", "VARCHAR2(128)", Some("REPORTING_ROLE")),
5833                ("PRIVILEGE", "VARCHAR2(40)", Some("UPDATE")),
5834                ("GRANTABLE", "VARCHAR2(3)", Some("YES")),
5835                ("HIERARCHY", "VARCHAR2(3)", Some("NO")),
5836            ]),
5837        ];
5838
5839        // PLSQL-CAT-NEW-3 / oracle-fmro: one root edition + one child
5840        // edition + one editioning view to exercise both EBR paths.
5841        let edition_rows = vec![
5842            oracle_row(&[
5843                ("EDITION_NAME", "VARCHAR2(128)", Some("ORA$BASE")),
5844                ("PARENT_EDITION_NAME", "VARCHAR2(128)", None),
5845                ("USABLE", "VARCHAR2(1)", Some("Y")),
5846            ]),
5847            oracle_row(&[
5848                ("EDITION_NAME", "VARCHAR2(128)", Some("PATCH_2026_05")),
5849                ("PARENT_EDITION_NAME", "VARCHAR2(128)", Some("ORA$BASE")),
5850                ("USABLE", "VARCHAR2(1)", Some("Y")),
5851            ]),
5852        ];
5853        let editioning_view_rows = vec![oracle_row(&[
5854            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5855            ("VIEW_NAME", "VARCHAR2(128)", Some("INVOICES_E1")),
5856            ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
5857        ])];
5858
5859        // PLSQL-CAT-NEW-2 / oracle-c0gg: one VPD policy that gates
5860        // SELECT-only on INVOICES — exercises the yn() column reader
5861        // and the SchemaCatalog::vpd_policies path.
5862        let vpd_policy_rows = vec![oracle_row(&[
5863            ("OBJECT_OWNER", "VARCHAR2(128)", Some("BILLING")),
5864            ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES")),
5865            ("POLICY_GROUP", "VARCHAR2(128)", None),
5866            (
5867                "POLICY_NAME",
5868                "VARCHAR2(128)",
5869                Some("BILLING_TENANT_ISOLATION"),
5870            ),
5871            ("PF_OWNER", "VARCHAR2(128)", Some("SECURITY")),
5872            ("PACKAGE", "VARCHAR2(128)", Some("RLS_PKG")),
5873            ("FUNCTION", "VARCHAR2(128)", Some("TENANT_PREDICATE")),
5874            ("SEL", "VARCHAR2(3)", Some("YES")),
5875            ("INS", "VARCHAR2(3)", Some("NO")),
5876            ("UPD", "VARCHAR2(3)", Some("NO")),
5877            ("DEL", "VARCHAR2(3)", Some("NO")),
5878            ("ENABLE", "VARCHAR2(3)", Some("YES")),
5879        ])];
5880
5881        // PLSQL-CAT-NEW-5 / oracle-grs0: one table comment + one column
5882        // comment exercise both apply_*_comment_row paths.
5883        let table_comment_rows = vec![oracle_row(&[
5884            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5885            ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
5886            ("TABLE_TYPE", "VARCHAR2(11)", Some("TABLE")),
5887            (
5888                "COMMENTS",
5889                "VARCHAR2(4000)",
5890                Some("Customer invoice header rows; one per invoice"),
5891            ),
5892        ])];
5893        let column_comment_rows = vec![oracle_row(&[
5894            ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5895            ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
5896            ("COLUMN_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
5897            (
5898                "COMMENTS",
5899                "VARCHAR2(4000)",
5900                Some("Primary key; surrogate, allocated from INVOICES_SEQ"),
5901            ),
5902        ])];
5903
5904        // One private link (BILLING.REPORTING_LINK) and one public link
5905        // (PUBLIC.WORLDWIDE_LINK) — exercises both code paths in
5906        // `apply_db_link_row` (PLSQL-CAT-NEW-1 / oracle-rr4y).
5907        let db_link_rows = vec![
5908            oracle_row(&[
5909                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
5910                ("DB_LINK", "VARCHAR2(128)", Some("REPORTING_LINK")),
5911                ("HOST", "VARCHAR2(2000)", Some("reporting-db.internal")),
5912            ]),
5913            oracle_row(&[
5914                ("OWNER", "VARCHAR2(128)", Some("PUBLIC")),
5915                ("DB_LINK", "VARCHAR2(128)", Some("WORLDWIDE_LINK")),
5916                ("HOST", "VARCHAR2(2000)", Some("//world.example.com/PROD")),
5917            ]),
5918        ];
5919
5920        let billing_only = vec![OracleBind::from("BILLING")];
5921        let mut expected_queries = capability_probe_expectations();
5922        // Only BILLING is a real user; REPORTING_ROLE is deliberately absent
5923        // so the UPDATE grant to it classifies as a role, not a user.
5924        expected_queries.push(all_users_expectation(&["BILLING"]));
5925        expected_queries.extend(vec![
5926            QueryExpectation {
5927                sql_contains: String::from("from all_objects"),
5928                params: billing_only.clone(),
5929                rows: object_rows,
5930            },
5931            QueryExpectation {
5932                sql_contains: String::from("from all_tab_cols"),
5933                params: billing_only.clone(),
5934                rows: vec![],
5935            },
5936            QueryExpectation {
5937                sql_contains: String::from("from all_constraints c"),
5938                params: billing_only.clone(),
5939                rows: vec![],
5940            },
5941            QueryExpectation {
5942                sql_contains: String::from("from all_indexes i"),
5943                params: billing_only.clone(),
5944                rows: vec![],
5945            },
5946            QueryExpectation {
5947                sql_contains: String::from("from all_triggers"),
5948                params: billing_only.clone(),
5949                rows: vec![],
5950            },
5951            QueryExpectation {
5952                sql_contains: String::from("from all_synonyms"),
5953                params: billing_only.clone(),
5954                rows: vec![],
5955            },
5956            QueryExpectation {
5957                sql_contains: String::from("from all_procedures"),
5958                params: billing_only.clone(),
5959                rows: vec![],
5960            },
5961            QueryExpectation {
5962                sql_contains: String::from("from all_arguments"),
5963                params: billing_only.clone(),
5964                rows: vec![],
5965            },
5966            QueryExpectation {
5967                sql_contains: String::from("from all_views"),
5968                params: billing_only.clone(),
5969                rows: view_rows,
5970            },
5971            QueryExpectation {
5972                sql_contains: String::from("from all_mviews"),
5973                params: billing_only.clone(),
5974                rows: mview_rows,
5975            },
5976            QueryExpectation {
5977                sql_contains: String::from("from all_sequences"),
5978                params: billing_only.clone(),
5979                rows: sequence_rows,
5980            },
5981            QueryExpectation {
5982                sql_contains: String::from("from all_type_attrs"),
5983                params: billing_only.clone(),
5984                rows: type_attr_rows,
5985            },
5986            QueryExpectation {
5987                sql_contains: String::from("from all_tab_privs"),
5988                params: billing_only.clone(),
5989                rows: grant_rows,
5990            },
5991            QueryExpectation {
5992                sql_contains: String::from("from all_db_links"),
5993                params: billing_only.clone(),
5994                rows: db_link_rows,
5995            },
5996            QueryExpectation {
5997                sql_contains: String::from("from all_tab_comments"),
5998                params: billing_only.clone(),
5999                rows: table_comment_rows,
6000            },
6001            QueryExpectation {
6002                sql_contains: String::from("from all_col_comments"),
6003                params: billing_only.clone(),
6004                rows: column_comment_rows,
6005            },
6006            QueryExpectation {
6007                sql_contains: String::from("from all_editions"),
6008                params: vec![],
6009                rows: edition_rows,
6010            },
6011            QueryExpectation {
6012                sql_contains: String::from("from all_editioning_views"),
6013                params: billing_only.clone(),
6014                rows: editioning_view_rows,
6015            },
6016            QueryExpectation {
6017                sql_contains: String::from("from all_policies"),
6018                params: billing_only.clone(),
6019                rows: vpd_policy_rows,
6020            },
6021            QueryExpectation {
6022                sql_contains: String::from("from all_dependencies"),
6023                params: billing_only,
6024                rows: vec![],
6025            },
6026        ]);
6027        let connection = StaticConnection {
6028            expected_queries,
6029            ..StaticConnection::default()
6030        };
6031
6032        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
6033
6034        let schema = snapshot
6035            .profile
6036            .current_schema
6037            .expect("current schema interned");
6038        let schema_catalog = snapshot
6039            .schemas
6040            .get(&schema)
6041            .expect("billing schema catalog should exist");
6042
6043        let view = schema_catalog
6044            .objects
6045            .values()
6046            .find_map(|object| match object {
6047                CatalogObject::View(view) => Some(view),
6048                _ => None,
6049            })
6050            .expect("view metadata present");
6051        assert!(view.query_hash.is_some());
6052        assert_eq!(view.read_only, Some(true));
6053
6054        let mview = schema_catalog
6055            .objects
6056            .values()
6057            .find_map(|object| match object {
6058                CatalogObject::MaterializedView(metadata) => Some(metadata),
6059                _ => None,
6060            })
6061            .expect("mview metadata present");
6062        assert_eq!(mview.refresh_mode.as_deref(), Some("DEMAND"));
6063        assert_eq!(mview.refresh_method.as_deref(), Some("COMPLETE"));
6064        assert!(mview.query_hash.is_some());
6065
6066        let sequence = schema_catalog
6067            .objects
6068            .values()
6069            .find_map(|object| match object {
6070                CatalogObject::Sequence(metadata) => Some(metadata),
6071                _ => None,
6072            })
6073            .expect("sequence metadata present");
6074        assert_eq!(sequence.increment_by, 1);
6075        assert_eq!(sequence.min_value, Some(1));
6076        assert_eq!(sequence.max_value, Some(9999999999));
6077        assert!(!sequence.cycle);
6078        assert!(!sequence.ordered);
6079        assert_eq!(sequence.cache_size, Some(20));
6080
6081        let type_metadata = schema_catalog
6082            .objects
6083            .values()
6084            .find_map(|object| match object {
6085                CatalogObject::Type(metadata) => Some(metadata),
6086                _ => None,
6087            })
6088            .expect("type metadata present");
6089        assert_eq!(type_metadata.attributes.len(), 2);
6090        assert_eq!(type_metadata.attributes[0].position, 1);
6091        assert_eq!(type_metadata.attributes[1].position, 2);
6092        assert_eq!(type_metadata.attributes[0].data_type.name, "VARCHAR2");
6093        assert_eq!(type_metadata.attributes[0].data_type.length, Some(200));
6094
6095        let public_grant = schema_catalog
6096            .grants
6097            .iter()
6098            .find(|grant| matches!(grant.grantee, crate::Grantee::Public))
6099            .expect("public grant present");
6100        assert!(matches!(
6101            public_grant.privilege,
6102            crate::GrantPrivilege::Select
6103        ));
6104        // REPORTING_ROLE is not in the ALL_USERS fixture, so the UPDATE
6105        // grant to it must classify as a role grant (it was previously,
6106        // and incorrectly, recorded as a direct user grant — oracle-qm3q.2).
6107        let role_grant = schema_catalog
6108            .grants
6109            .iter()
6110            .find(|grant| matches!(grant.grantee, crate::Grantee::Role(_)))
6111            .expect("role grant present");
6112        let crate::Grantee::Role(role) = &role_grant.grantee else {
6113            unreachable!("matched Grantee::Role above");
6114        };
6115        assert_eq!(
6116            snapshot.interner.resolve(role.symbol()),
6117            Some("REPORTING_ROLE")
6118        );
6119        assert!(matches!(
6120            role_grant.privilege,
6121            crate::GrantPrivilege::Update
6122        ));
6123        assert!(role_grant.grantable);
6124        // And no grantee is misclassified as a direct user in this fixture.
6125        assert!(
6126            !schema_catalog
6127                .grants
6128                .iter()
6129                .any(|grant| matches!(grant.grantee, crate::Grantee::User(_)))
6130        );
6131
6132        // PLSQL-CAT-NEW-1 / oracle-rr4y: ALL_DB_LINKS rows lower into
6133        // the owning schema's `db_links` list. The fixture seeds one
6134        // private link on BILLING and one public link on the synthetic
6135        // PUBLIC schema — both must surface with correct `public_link`
6136        // classification and the raw HOST string preserved.
6137        let private_link = schema_catalog
6138            .db_links
6139            .iter()
6140            .find(|link| link.name.eq("REPORTING_LINK"))
6141            .expect("private db link present on BILLING schema");
6142        assert!(!private_link.public_link);
6143        assert_eq!(private_link.host.as_deref(), Some("reporting-db.internal"));
6144
6145        // Locate the synthetic PUBLIC schema by resolving each interned
6146        // schema name back through the interner — the loader materialized
6147        // it on demand when a `PUBLIC`-owned link was applied, but no
6148        // ergonomic lookup-by-text exists on SymbolInterner yet.
6149        let public_link = snapshot
6150            .schemas
6151            .iter()
6152            .find_map(|(schema_id, schema_catalog)| {
6153                let label = snapshot.interner.resolve(schema_id.symbol())?;
6154                if label.eq_ignore_ascii_case("PUBLIC") {
6155                    schema_catalog
6156                        .db_links
6157                        .iter()
6158                        .find(|link| link.name.eq("WORLDWIDE_LINK"))
6159                } else {
6160                    None
6161                }
6162            })
6163            .expect("public db link present on PUBLIC synthetic schema");
6164        assert!(public_link.public_link);
6165        assert_eq!(
6166            public_link.host.as_deref(),
6167            Some("//world.example.com/PROD")
6168        );
6169
6170        // PLSQL-CAT-NEW-2 / oracle-c0gg: ALL_POLICIES row lands in
6171        // SchemaCatalog::vpd_policies with SEL/INS/UPD/DEL/ENABLE
6172        // correctly decoded from the Y/N/YES/NO mix in dictionary text.
6173        assert_eq!(schema_catalog.vpd_policies.len(), 1);
6174        let policy = &schema_catalog.vpd_policies[0];
6175        assert_eq!(policy.policy_name, "BILLING_TENANT_ISOLATION");
6176        assert_eq!(policy.function_name, "TENANT_PREDICATE");
6177        assert_eq!(policy.function_package.as_deref(), Some("RLS_PKG"));
6178        assert!(policy.policy_group.is_none());
6179        assert!(policy.on_select);
6180        assert!(!policy.on_insert);
6181        assert!(!policy.on_update);
6182        assert!(!policy.on_delete);
6183        assert!(policy.enabled);
6184
6185        // PLSQL-CAT-NEW-3 / oracle-fmro: ALL_EDITIONS rows land in
6186        // CatalogSnapshot::editions; ALL_EDITIONING_VIEWS rows land in
6187        // SchemaCatalog::editioning_views.
6188        assert_eq!(snapshot.editions.len(), 2);
6189        let root = snapshot
6190            .editions
6191            .iter()
6192            .find(|e| e.edition_name.eq("ORA$BASE"))
6193            .expect("root edition present");
6194        assert!(root.parent_edition_name.is_none());
6195        assert!(root.usable);
6196        let child = snapshot
6197            .editions
6198            .iter()
6199            .find(|e| e.edition_name.eq("PATCH_2026_05"))
6200            .expect("child edition present");
6201        assert_eq!(child.parent_edition_name.as_deref(), Some("ORA$BASE"));
6202        assert_eq!(schema_catalog.editioning_views.len(), 1);
6203        let ev = &schema_catalog.editioning_views[0];
6204        let view_label = snapshot.interner.resolve(ev.view_name.symbol()).unwrap();
6205        let table_label = snapshot.interner.resolve(ev.table_name.symbol()).unwrap();
6206        assert_eq!(view_label, "INVOICES_E1");
6207        assert_eq!(table_label, "INVOICES");
6208
6209        // PLSQL-CAT-NEW-5 / oracle-grs0: ALL_TAB_COMMENTS row reaches
6210        // SchemaCatalog::table_comments with TABLE_TYPE + COMMENTS
6211        // preserved verbatim.
6212        assert_eq!(schema_catalog.table_comments.len(), 1);
6213        let table_comment = &schema_catalog.table_comments[0];
6214        assert_eq!(table_comment.table_type, "TABLE");
6215        assert_eq!(
6216            table_comment.comments,
6217            "Customer invoice header rows; one per invoice"
6218        );
6219        // ALL_COL_COMMENTS row lands in column_comments with the
6220        // interned ColumnName.
6221        assert_eq!(schema_catalog.column_comments.len(), 1);
6222        assert_eq!(
6223            schema_catalog.column_comments[0].comments,
6224            "Primary key; surrogate, allocated from INVOICES_SEQ"
6225        );
6226    }
6227
6228    #[test]
6229    fn load_snapshot_from_connection_requires_current_schema_when_requested() {
6230        let mut connection_info = StaticConnection::default().connection_info;
6231        connection_info.current_schema = None;
6232        let connection = StaticConnection {
6233            connection_info,
6234            ..StaticConnection::default()
6235        };
6236
6237        let error = load_snapshot_for_test(&connection, &CatalogLoadRequest::default());
6238
6239        assert!(matches!(error, Err(CatalogError::CurrentSchemaUnavailable)));
6240    }
6241
6242    #[test]
6243    fn oracle_connection_default_helpers_enforce_row_cardinality() {
6244        let mut row = OracleRow::default();
6245        row.insert(
6246            "schema_name",
6247            "VARCHAR2(128)",
6248            Some(String::from("billing")),
6249        );
6250
6251        let single = StaticConnection {
6252            rows: vec![row.clone()],
6253            row_count: 1,
6254            ..StaticConnection::default()
6255        };
6256        let multiple = StaticConnection {
6257            rows: vec![row.clone(), row],
6258            row_count: 2,
6259            ..StaticConnection::default()
6260        };
6261
6262        let single_result = run_catalog_future(async {
6263            let cx = test_cx();
6264            single.query_one_row(&cx, "select * from dual", &[]).await
6265        });
6266        assert!(single_result.is_ok());
6267        let multiple_result = run_catalog_future(async {
6268            let cx = test_cx();
6269            multiple
6270                .query_optional_row(&cx, "select * from dual", &[])
6271                .await
6272        });
6273        assert!(matches!(
6274            multiple_result,
6275            Err(CatalogError::UnexpectedRowCount { expected, actual })
6276                if expected.eq("0 or 1") && actual.eq(&2)
6277        ));
6278    }
6279
6280    #[test]
6281    fn oracle_connect_options_capture_session_overrides() {
6282        let options = OracleConnectOptions::new("scott", "tiger", "//localhost/XE")
6283            .with_current_schema("billing")
6284            .with_module("plsql-intelligence")
6285            .with_action("catalog-extract")
6286            .with_client_info("tests")
6287            .with_client_identifier("unit");
6288
6289        assert_eq!(options.current_schema.as_deref(), Some("billing"));
6290        assert_eq!(options.module.as_deref(), Some("plsql-intelligence"));
6291        assert_eq!(options.action.as_deref(), Some("catalog-extract"));
6292        assert_eq!(options.client_info.as_deref(), Some("tests"));
6293        assert_eq!(options.client_identifier.as_deref(), Some("unit"));
6294    }
6295
6296    #[test]
6297    fn catalog_snapshot_initializes_with_empty_schema_map() {
6298        let snapshot = CatalogSnapshot::new(
6299            AnalysisProfile::default(),
6300            CatalogCapabilities::default(),
6301            CatalogSource {
6302                kind: CatalogSourceKind::SyntheticTestCatalog,
6303                path: None,
6304                description: Some(String::from("fixture")),
6305            },
6306            DateTime::<Utc>::UNIX_EPOCH,
6307        );
6308
6309        assert!(snapshot.schemas.is_empty());
6310        assert!(snapshot.interner.is_empty());
6311        assert_eq!(snapshot.generated_at, DateTime::<Utc>::UNIX_EPOCH);
6312        assert_eq!(
6313            snapshot.source.kind,
6314            CatalogSourceKind::SyntheticTestCatalog
6315        );
6316    }
6317
6318    #[test]
6319    fn schema_catalog_can_hold_structural_lookup_maps() {
6320        let mut schema_catalog = SchemaCatalog::default();
6321        let object_name = ObjectName::from(SymbolId::new(2));
6322        let column_name = ColumnName::from(SymbolId::new(3));
6323        let owner = SchemaName::from(SymbolId::new(1));
6324
6325        let table = TableMetadata {
6326            common: ObjectCommon {
6327                owner,
6328                name: object_name,
6329                object_type: ObjectType::Table,
6330                status: ObjectStatus::Valid,
6331                source_hash: Some(Hash::new("abc123")),
6332                ..ObjectCommon::default()
6333            },
6334            columns: HashMap::from([(
6335                column_name,
6336                crate::ColumnMetadata {
6337                    name: column_name,
6338                    position: 1,
6339                    data_type: DataTypeRef {
6340                        name: String::from("NUMBER"),
6341                        precision: Some(10),
6342                        ..DataTypeRef::default()
6343                    },
6344                    nullable: false,
6345                    ..crate::ColumnMetadata::default()
6346                },
6347            )]),
6348            ..TableMetadata::default()
6349        };
6350
6351        schema_catalog
6352            .objects
6353            .insert(object_name, CatalogObject::Table(table));
6354        schema_catalog.synonyms.insert(
6355            SynonymName::from(SymbolId::new(4)),
6356            SynonymTarget {
6357                target_owner: Some(owner),
6358                target_name: object_name,
6359                public_synonym: false,
6360                ..SynonymTarget::default()
6361            },
6362        );
6363
6364        assert_eq!(schema_catalog.objects.len(), 1);
6365        assert_eq!(schema_catalog.synonyms.len(), 1);
6366    }
6367
6368    #[test]
6369    fn package_and_plscope_models_capture_signature_state() {
6370        let owner = SchemaName::from(SymbolId::new(10));
6371        let package_name = ObjectName::from(SymbolId::new(11));
6372        let procedure_name = ObjectName::from(SymbolId::new(12));
6373        let member_name = MemberName::from(SymbolId::new(13));
6374
6375        let package = PackageMetadata {
6376            common: ObjectCommon {
6377                owner,
6378                name: package_name,
6379                object_type: ObjectType::Package,
6380                status: ObjectStatus::Valid,
6381                ..ObjectCommon::default()
6382            },
6383            procedures: vec![RoutineSignature {
6384                routine_name: procedure_name,
6385                arguments: vec![crate::ArgumentMetadata {
6386                    position: 1,
6387                    name: Some(member_name),
6388                    data_type: DataTypeRef {
6389                        name: String::from("VARCHAR2"),
6390                        length: Some(30),
6391                        ..DataTypeRef::default()
6392                    },
6393                    ..crate::ArgumentMetadata::default()
6394                }],
6395                accessible_by: vec![AccessibleByTarget {
6396                    owner: Some(owner),
6397                    object_name: package_name,
6398                }],
6399                ..RoutineSignature::default()
6400            }],
6401            ..PackageMetadata::default()
6402        };
6403
6404        let plscope = PlScopeSnapshot {
6405            availability: PlScopeAvailability::IdentifiersAndStatements,
6406            identifiers: vec![CompilerIdentifier {
6407                owner,
6408                object_name: package_name,
6409                identifier_name: member_name,
6410                identifier_type: String::from("FORMAL IN"),
6411                usage: String::from("DECLARATION"),
6412                line: 4,
6413                column: 12,
6414            }],
6415            ..PlScopeSnapshot::default()
6416        };
6417
6418        assert_eq!(package.procedures.len(), 1);
6419        assert_eq!(package.procedures[0].accessible_by.len(), 1);
6420        assert_eq!(
6421            plscope.availability,
6422            PlScopeAvailability::IdentifiersAndStatements
6423        );
6424        assert_eq!(plscope.identifiers[0].line, 4);
6425        assert_eq!(ConstraintType::ForeignKey, ConstraintType::ForeignKey);
6426        assert_eq!(TypeFinality::Unknown, TypeFinality::default());
6427        assert_eq!(TypeInstantiable::Unknown, TypeInstantiable::default());
6428        assert_eq!(TriggerName::from(SymbolId::new(14)).symbol().get(), 14);
6429    }
6430
6431    #[test]
6432    fn catalog_snapshot_round_trips_through_versioned_json_document() {
6433        let tempdir = tempdir();
6434        assert!(tempdir.is_ok());
6435        let tempdir = if let Ok(tempdir) = tempdir {
6436            tempdir
6437        } else {
6438            return;
6439        };
6440
6441        let mut snapshot = CatalogSnapshot::new(
6442            AnalysisProfile::default(),
6443            CatalogCapabilities::default(),
6444            CatalogSource {
6445                kind: CatalogSourceKind::JsonSnapshot,
6446                path: None,
6447                description: Some(String::from("roundtrip")),
6448            },
6449            DateTime::<Utc>::UNIX_EPOCH,
6450        );
6451        let billing = snapshot.intern_schema_name("billing");
6452        let claims = snapshot.intern_object_name("claims");
6453        assert!(billing.is_some());
6454        assert!(claims.is_some());
6455
6456        let path = tempdir.path().join("snapshot.json");
6457        let exported = export_snapshot_to_json(&snapshot, &path);
6458        assert!(exported.is_ok());
6459
6460        let loaded = load_snapshot_from_json(&path);
6461        assert!(loaded.is_ok());
6462        assert_eq!(loaded.ok(), Some(snapshot.clone()));
6463
6464        let rendered = std::fs::read_to_string(path);
6465        assert!(rendered.is_ok());
6466        let rendered = if let Ok(rendered) = rendered {
6467            rendered
6468        } else {
6469            return;
6470        };
6471        let document = serde_json::from_str::<CatalogSnapshotDocument>(&rendered);
6472        assert!(document.is_ok());
6473        let document = if let Ok(document) = document {
6474            document
6475        } else {
6476            return;
6477        };
6478
6479        assert!(document.schema_id.as_str().eq(CATALOG_SNAPSHOT_SCHEMA_ID));
6480        assert!(matches!(
6481            document
6482                .schema_version
6483                .cmp(&CATALOG_SNAPSHOT_SCHEMA_VERSION),
6484            std::cmp::Ordering::Equal
6485        ));
6486        assert_eq!(
6487            document
6488                .snapshot
6489                .interner
6490                .resolve(billing.unwrap_or_default().symbol()),
6491            Some("billing")
6492        );
6493        assert_eq!(
6494            document
6495                .snapshot
6496                .interner
6497                .resolve(claims.unwrap_or_default().symbol()),
6498            Some("claims")
6499        );
6500    }
6501
6502    #[test]
6503    fn load_from_dbms_metadata_dir_classifies_create_statements() {
6504        let dir = tempdir().unwrap();
6505        let root = dir.path();
6506
6507        // Write some .sql files
6508        std::fs::write(
6509            root.join("customers.sql"),
6510            "CREATE TABLE customers (id NUMBER, name VARCHAR2(100));",
6511        )
6512        .unwrap();
6513        std::fs::write(
6514            root.join("billing_api.sql"),
6515            "CREATE OR REPLACE PACKAGE billing_api AS\n  PROCEDURE charge(p_id NUMBER);\nEND;",
6516        )
6517        .unwrap();
6518        std::fs::write(
6519            root.join("invoice_seq.sql"),
6520            "CREATE SEQUENCE invoice_seq START WITH 1 INCREMENT BY 1;",
6521        )
6522        .unwrap();
6523        std::fs::write(root.join("skip.txt"), "not a sql file").unwrap();
6524
6525        let snapshot = load_from_dbms_metadata_dir(root).unwrap();
6526        assert_eq!(snapshot.source.kind, CatalogSourceKind::DbmsMetadataFiles);
6527
6528        // Should have found objects in a default schema
6529        let total_objects: usize = snapshot.schemas.values().map(|s| s.objects.len()).sum();
6530        assert!(
6531            total_objects >= 2,
6532            "expected at least 2 objects, got {}",
6533            total_objects
6534        );
6535    }
6536
6537    #[test]
6538    fn load_from_dbms_metadata_dir_returns_error_for_nonexistent_dir() {
6539        let result = load_from_dbms_metadata_dir(std::path::Path::new("/nonexistent/path"));
6540        assert!(result.is_err());
6541    }
6542
6543    #[test]
6544    fn load_from_dbms_metadata_dir_handles_empty_dir() {
6545        let dir = tempdir().unwrap();
6546        let snapshot = load_from_dbms_metadata_dir(dir.path()).unwrap();
6547        assert!(
6548            snapshot.schemas.is_empty() || snapshot.schemas.values().all(|s| s.objects.is_empty())
6549        );
6550    }
6551
6552    /// DDL with a qualified `OWNER.OBJECT` prefix must be filed under the
6553    /// real owner schema, never collapsed to a single shared bucket. A
6554    /// multi-schema DBMS_METADATA dump that lands under one schema would
6555    /// silently lose cross-schema topology.
6556    #[test]
6557    fn load_from_dbms_metadata_dir_records_real_schema_owner() {
6558        let dir = tempdir().unwrap();
6559        let root = dir.path();
6560        std::fs::write(
6561            root.join("hr_employees.sql"),
6562            "CREATE TABLE hr.employees (id NUMBER PRIMARY KEY);",
6563        )
6564        .unwrap();
6565        std::fs::write(
6566            root.join("billing_invoices.sql"),
6567            "CREATE TABLE billing.invoices (id NUMBER, amount NUMBER(12,2));",
6568        )
6569        .unwrap();
6570        let snapshot = load_from_dbms_metadata_dir(root).unwrap();
6571
6572        // Two distinct owner schemas must be present — not collapsed.
6573        let schema_names: std::collections::HashSet<String> = snapshot
6574            .schemas
6575            .keys()
6576            .filter_map(|s| snapshot.interner.resolve(s.symbol()).map(str::to_string))
6577            .collect();
6578        assert!(
6579            schema_names.contains("HR"),
6580            "HR schema bucket must exist; got {schema_names:?}"
6581        );
6582        assert!(
6583            schema_names.contains("BILLING"),
6584            "BILLING schema bucket must exist; got {schema_names:?}"
6585        );
6586
6587        // Each schema bucket holds exactly its own object — never the
6588        // other's.
6589        for (schema, bucket) in &snapshot.schemas {
6590            let name = snapshot.interner.resolve(schema.symbol()).unwrap();
6591            assert_eq!(
6592                bucket.objects.len(),
6593                1,
6594                "{name} bucket must hold exactly one object"
6595            );
6596        }
6597    }
6598
6599    /// Unqualified CREATE statements (no owner prefix) must land in a
6600    /// stable named schema (e.g. `PUBLIC`) interned through the regular
6601    /// interner — never `SymbolId::new(0)` which collides with whatever
6602    /// the first interner entry happens to be.
6603    #[test]
6604    fn load_from_dbms_metadata_dir_uses_named_default_schema_for_unqualified_ddl() {
6605        let dir = tempdir().unwrap();
6606        let root = dir.path();
6607        std::fs::write(
6608            root.join("customers.sql"),
6609            "CREATE TABLE customers (id NUMBER, name VARCHAR2(100));",
6610        )
6611        .unwrap();
6612        let snapshot = load_from_dbms_metadata_dir(root).unwrap();
6613
6614        let schema_names: std::collections::HashSet<String> = snapshot
6615            .schemas
6616            .keys()
6617            .filter_map(|s| snapshot.interner.resolve(s.symbol()).map(str::to_string))
6618            .collect();
6619        assert!(
6620            schema_names.contains("PUBLIC"),
6621            "default schema bucket (PUBLIC) must exist for unqualified DDL; got {schema_names:?}"
6622        );
6623    }
6624
6625    /// The classifier must actually record the raw DDL text on the
6626    /// produced `CatalogObject` — the original docstring promised this
6627    /// and downstream consumers (doc generation, lineage) rely on it.
6628    #[test]
6629    fn load_from_dbms_metadata_dir_records_raw_ddl_text_on_object() {
6630        let dir = tempdir().unwrap();
6631        let root = dir.path();
6632        let raw = "CREATE TABLE hr.orders (id NUMBER PRIMARY KEY, total NUMBER(12,2));";
6633        std::fs::write(root.join("orders.sql"), raw).unwrap();
6634        let snapshot = load_from_dbms_metadata_dir(root).unwrap();
6635
6636        let bucket = snapshot
6637            .schemas
6638            .values()
6639            .find(|b| !b.objects.is_empty())
6640            .expect("at least one schema bucket with an object");
6641        let obj = bucket.objects.values().next().unwrap();
6642        let common = match obj {
6643            CatalogObject::Table(t) => Some(&t.common),
6644            _ => None,
6645        }
6646        .expect("expected Table");
6647        let ddl = common
6648            .ddl
6649            .as_ref()
6650            .expect("CatalogObject must carry its raw DDL text");
6651        assert_eq!(ddl.ddl_text, raw, "ddl_text must round-trip the source DDL");
6652    }
6653
6654    #[test]
6655    fn load_snapshot_populates_object_metadata_and_dependency_rows() {
6656        let object_rows = vec![
6657            oracle_row(&[
6658                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6659                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_PKG")),
6660                ("OBJECT_TYPE", "VARCHAR2(30)", Some("PACKAGE")),
6661                ("STATUS", "VARCHAR2(7)", Some("INVALID")),
6662                (
6663                    "LAST_DDL_TIME_ISO",
6664                    "VARCHAR2(19)",
6665                    Some("2026-05-01T13:14:15"),
6666                ),
6667                ("EDITIONABLE", "VARCHAR2(1)", Some("Y")),
6668                ("EDITION_NAME", "VARCHAR2(128)", Some("ORA$BASE")),
6669            ]),
6670            oracle_row(&[
6671                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6672                ("OBJECT_NAME", "VARCHAR2(128)", Some("CUSTOMERS")),
6673                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TABLE")),
6674                ("STATUS", "VARCHAR2(7)", Some("VALID")),
6675                (
6676                    "LAST_DDL_TIME_ISO",
6677                    "VARCHAR2(19)",
6678                    Some("2026-04-22T08:30:00"),
6679                ),
6680                ("EDITIONABLE", "VARCHAR2(1)", None),
6681                ("EDITION_NAME", "VARCHAR2(128)", None),
6682            ]),
6683        ];
6684
6685        let dependency_rows = vec![
6686            oracle_row(&[
6687                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6688                ("NAME", "VARCHAR2(128)", Some("BILLING_PKG")),
6689                ("TYPE", "VARCHAR2(30)", Some("PACKAGE")),
6690                ("REFERENCED_OWNER", "VARCHAR2(128)", Some("BILLING")),
6691                ("REFERENCED_NAME", "VARCHAR2(128)", Some("CUSTOMERS")),
6692                ("REFERENCED_TYPE", "VARCHAR2(30)", Some("TABLE")),
6693                ("DEPENDENCY_TYPE", "VARCHAR2(4)", Some("HARD")),
6694            ]),
6695            oracle_row(&[
6696                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
6697                ("NAME", "VARCHAR2(128)", Some("BILLING_PKG")),
6698                ("TYPE", "VARCHAR2(30)", Some("PACKAGE")),
6699                ("REFERENCED_OWNER", "VARCHAR2(128)", Some("SYS")),
6700                ("REFERENCED_NAME", "VARCHAR2(128)", Some("DBMS_OUTPUT")),
6701                ("REFERENCED_TYPE", "VARCHAR2(30)", Some("PACKAGE")),
6702                ("DEPENDENCY_TYPE", "VARCHAR2(4)", Some("REF")),
6703            ]),
6704        ];
6705
6706        let billing_only = vec![OracleBind::from("BILLING")];
6707        let mut expected_queries = capability_probe_expectations();
6708        // No grants are asserted here, but live extraction still issues the
6709        // ALL_USERS probe before ALL_TAB_PRIVS.
6710        expected_queries.push(all_users_expectation(&["BILLING"]));
6711        expected_queries.extend(vec![
6712            QueryExpectation {
6713                sql_contains: String::from("from all_objects"),
6714                params: billing_only.clone(),
6715                rows: object_rows,
6716            },
6717            QueryExpectation {
6718                sql_contains: String::from("from all_tab_cols"),
6719                params: billing_only.clone(),
6720                rows: vec![],
6721            },
6722            QueryExpectation {
6723                sql_contains: String::from("from all_constraints c"),
6724                params: billing_only.clone(),
6725                rows: vec![],
6726            },
6727            QueryExpectation {
6728                sql_contains: String::from("from all_indexes i"),
6729                params: billing_only.clone(),
6730                rows: vec![],
6731            },
6732            QueryExpectation {
6733                sql_contains: String::from("from all_triggers"),
6734                params: billing_only.clone(),
6735                rows: vec![],
6736            },
6737            QueryExpectation {
6738                sql_contains: String::from("from all_synonyms"),
6739                params: billing_only.clone(),
6740                rows: vec![],
6741            },
6742            QueryExpectation {
6743                sql_contains: String::from("from all_procedures"),
6744                params: billing_only.clone(),
6745                rows: vec![],
6746            },
6747            QueryExpectation {
6748                sql_contains: String::from("from all_arguments"),
6749                params: billing_only.clone(),
6750                rows: vec![],
6751            },
6752            QueryExpectation {
6753                sql_contains: String::from("from all_views"),
6754                params: billing_only.clone(),
6755                rows: vec![],
6756            },
6757            QueryExpectation {
6758                sql_contains: String::from("from all_mviews"),
6759                params: billing_only.clone(),
6760                rows: vec![],
6761            },
6762            QueryExpectation {
6763                sql_contains: String::from("from all_sequences"),
6764                params: billing_only.clone(),
6765                rows: vec![],
6766            },
6767            QueryExpectation {
6768                sql_contains: String::from("from all_type_attrs"),
6769                params: billing_only.clone(),
6770                rows: vec![],
6771            },
6772            QueryExpectation {
6773                sql_contains: String::from("from all_tab_privs"),
6774                params: billing_only.clone(),
6775                rows: vec![],
6776            },
6777            QueryExpectation {
6778                sql_contains: String::from("from all_db_links"),
6779                params: billing_only.clone(),
6780                rows: vec![],
6781            },
6782            QueryExpectation {
6783                sql_contains: String::from("from all_tab_comments"),
6784                params: billing_only.clone(),
6785                rows: vec![],
6786            },
6787            QueryExpectation {
6788                sql_contains: String::from("from all_col_comments"),
6789                params: billing_only.clone(),
6790                rows: vec![],
6791            },
6792            QueryExpectation {
6793                sql_contains: String::from("from all_editions"),
6794                params: vec![],
6795                rows: vec![],
6796            },
6797            QueryExpectation {
6798                sql_contains: String::from("from all_editioning_views"),
6799                params: billing_only.clone(),
6800                rows: vec![],
6801            },
6802            QueryExpectation {
6803                sql_contains: String::from("from all_policies"),
6804                params: billing_only.clone(),
6805                rows: vec![],
6806            },
6807            QueryExpectation {
6808                sql_contains: String::from("from all_dependencies"),
6809                params: billing_only,
6810                rows: dependency_rows,
6811            },
6812        ]);
6813        let connection = StaticConnection {
6814            expected_queries,
6815            ..StaticConnection::default()
6816        };
6817
6818        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
6819        let schema = snapshot
6820            .profile
6821            .current_schema
6822            .expect("current schema interned");
6823        let schema_catalog = snapshot.schemas.get(&schema).expect("billing schema");
6824
6825        let billing_pkg = schema_catalog
6826            .objects
6827            .values()
6828            .find_map(|object| match object {
6829                CatalogObject::Package(metadata) => Some(metadata),
6830                _ => None,
6831            })
6832            .expect("billing pkg present");
6833        assert_eq!(billing_pkg.common.status, ObjectStatus::Invalid);
6834        assert_eq!(billing_pkg.common.editionable, Some(true));
6835        assert!(billing_pkg.common.edition_name.is_some());
6836        let last_ddl = billing_pkg
6837            .common
6838            .last_ddl_time
6839            .expect("last_ddl_time populated");
6840        // The fixture date is 2026-05-01 13:14:15 UTC.
6841        assert_eq!(last_ddl.timestamp(), 1_777_641_255);
6842
6843        let customers_table = schema_catalog
6844            .objects
6845            .values()
6846            .find_map(|object| match object {
6847                CatalogObject::Table(metadata) => Some(metadata),
6848                _ => None,
6849            })
6850            .expect("customers table");
6851        assert!(customers_table.common.editionable.is_none());
6852        assert!(customers_table.common.edition_name.is_none());
6853
6854        assert_eq!(schema_catalog.dependencies.len(), 2);
6855        let pkg_to_customers = schema_catalog
6856            .dependencies
6857            .iter()
6858            .find(|d| matches!(d.dependency_kind, CatalogDependencyKind::Hard))
6859            .expect("hard dependency");
6860        assert!(matches!(pkg_to_customers.object_type, ObjectType::Package));
6861        assert!(matches!(
6862            pkg_to_customers.referenced_type,
6863            Some(ObjectType::Table)
6864        ));
6865        let pkg_to_sys = schema_catalog
6866            .dependencies
6867            .iter()
6868            .find(|d| matches!(d.dependency_kind, CatalogDependencyKind::Reference))
6869            .expect("ref dependency");
6870        assert!(matches!(
6871            pkg_to_sys.referenced_type,
6872            Some(ObjectType::Package)
6873        ));
6874        assert_eq!(
6875            pkg_to_sys
6876                .referenced_owner
6877                .and_then(|owner| snapshot.interner.resolve(owner.symbol())),
6878            Some("SYS")
6879        );
6880    }
6881
6882    #[test]
6883    fn doctor_report_counts_objects_columns_and_indexes() {
6884        let connection = StaticConnection::default();
6885        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
6886        let mut snapshot = snapshot;
6887        let billing = snapshot.intern_schema_name("BILLING").expect("schema name");
6888        let invoices = snapshot
6889            .intern_object_name("INVOICES")
6890            .expect("object name");
6891        let invoice_view = snapshot
6892            .intern_object_name("INVOICE_VIEW")
6893            .expect("object name");
6894        let billing_seq = snapshot
6895            .intern_object_name("BILLING_SEQ")
6896            .expect("sequence name");
6897
6898        let invoice_id = snapshot
6899            .intern_column_name("INVOICE_ID")
6900            .expect("column name");
6901        let table_metadata = crate::TableMetadata {
6902            common: ObjectCommon {
6903                owner: billing,
6904                name: invoices,
6905                object_type: ObjectType::Table,
6906                status: ObjectStatus::Valid,
6907                ..ObjectCommon::default()
6908            },
6909            columns: HashMap::from([(
6910                invoice_id,
6911                crate::ColumnMetadata {
6912                    name: invoice_id,
6913                    position: 1,
6914                    ..crate::ColumnMetadata::default()
6915                },
6916            )]),
6917            ..crate::TableMetadata::default()
6918        };
6919
6920        let view_metadata = crate::ViewMetadata {
6921            common: ObjectCommon {
6922                owner: billing,
6923                name: invoice_view,
6924                object_type: ObjectType::View,
6925                status: ObjectStatus::Invalid,
6926                ..ObjectCommon::default()
6927            },
6928            ..crate::ViewMetadata::default()
6929        };
6930
6931        let sequence_metadata = crate::SequenceMetadata {
6932            common: ObjectCommon {
6933                owner: billing,
6934                name: billing_seq,
6935                object_type: ObjectType::Sequence,
6936                status: ObjectStatus::Valid,
6937                ..ObjectCommon::default()
6938            },
6939            ..crate::SequenceMetadata::default()
6940        };
6941
6942        let schema_catalog = snapshot.schemas.entry(billing).or_default();
6943        schema_catalog
6944            .objects
6945            .insert(invoices, CatalogObject::Table(table_metadata));
6946        schema_catalog
6947            .objects
6948            .insert(invoice_view, CatalogObject::View(view_metadata));
6949        schema_catalog
6950            .objects
6951            .insert(billing_seq, CatalogObject::Sequence(sequence_metadata));
6952
6953        let report = snapshot.doctor_report();
6954        assert_eq!(report.totals.objects_total, 3);
6955        assert_eq!(report.totals.columns_total, 1);
6956        let table_tile = report
6957            .object_counts
6958            .iter()
6959            .find(|tile| matches!(tile.object_type, ObjectType::Table))
6960            .expect("table tile");
6961        assert_eq!(table_tile.total, 1);
6962        assert_eq!(table_tile.valid, 1);
6963        let view_tile = report
6964            .object_counts
6965            .iter()
6966            .find(|tile| matches!(tile.object_type, ObjectType::View))
6967            .expect("view tile");
6968        assert_eq!(view_tile.invalid, 1);
6969        // Capability negotiation (PLSQL-CAT-017) probes the connection; the
6970        // StaticConnection mock returns Ok([]) for unmatched queries which
6971        // means every probe succeeds in this test path → no missing-permission
6972        // suggestions should appear.
6973        assert!(report.can_query_all_views);
6974        assert!(report.can_use_dbms_metadata);
6975        assert!(
6976            report.missing_permissions.is_empty(),
6977            "all probes succeeded on the mock so no permissions should be flagged"
6978        );
6979    }
6980
6981    #[test]
6982    fn doctor_report_suggests_grants_when_capabilities_are_missing() {
6983        let mut snapshot = CatalogSnapshot::new(
6984            plsql_core::AnalysisProfile::default(),
6985            CatalogCapabilities {
6986                can_query_all_views: true,
6987                can_query_dba_views: false,
6988                can_use_dbms_metadata: false,
6989                can_read_source: false,
6990                plscope_enabled: false,
6991                can_query_scheduler: false,
6992                can_query_roles_and_grants: false,
6993                warnings: vec![CapabilityWarning {
6994                    code: String::from("catalog-version-parse-fallback"),
6995                    message: String::from("server version did not parse"),
6996                    remediation: None,
6997                }],
6998            },
6999            CatalogSource {
7000                kind: CatalogSourceKind::LiveConnection,
7001                path: None,
7002                description: Some(String::from("live extraction via oraclemcp-db from xe")),
7003            },
7004            DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
7005        );
7006        let _ = snapshot.intern_schema_name("BILLING");
7007
7008        let report = snapshot.doctor_report();
7009        assert!(matches!(
7010            report.source_kind,
7011            CatalogSourceKind::LiveConnection
7012        ));
7013        assert_eq!(report.capability_warnings.len(), 1);
7014        let view_names: Vec<&str> = report
7015            .missing_permissions
7016            .iter()
7017            .map(|m| m.view_name.as_str())
7018            .collect();
7019        assert!(view_names.iter().any(|v| v.contains("DBA_OBJECTS")));
7020        assert!(view_names.iter().any(|v| v.contains("DBMS_METADATA")));
7021        assert!(view_names.iter().any(|v| v.contains("ALL_SOURCE")));
7022        assert!(view_names.iter().any(|v| v.contains("PLSCOPE_SETTINGS")));
7023        assert!(view_names.iter().any(|v| v.contains("SCHEDULER_JOBS")));
7024        assert!(view_names.iter().any(|v| v.contains("ROLE_PRIVS")));
7025    }
7026
7027    #[test]
7028    fn load_snapshot_populates_plscope_availability_from_object_settings() {
7029        let plscope_rows = vec![
7030            oracle_row(&[
7031                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
7032                (
7033                    "PLSCOPE_SETTINGS",
7034                    "VARCHAR2(255)",
7035                    Some("IDENTIFIERS:ALL,STATEMENTS:ALL"),
7036                ),
7037            ]),
7038            oracle_row(&[
7039                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
7040                (
7041                    "PLSCOPE_SETTINGS",
7042                    "VARCHAR2(255)",
7043                    Some("IDENTIFIERS:ALL,STATEMENTS:NONE"),
7044                ),
7045            ]),
7046            oracle_row(&[
7047                ("OWNER", "VARCHAR2(128)", Some("REPORTING")),
7048                (
7049                    "PLSCOPE_SETTINGS",
7050                    "VARCHAR2(255)",
7051                    Some("IDENTIFIERS:NONE,STATEMENTS:NONE"),
7052                ),
7053            ]),
7054        ];
7055
7056        let billing_only = vec![OracleBind::from("BILLING")];
7057        let mut expected_queries = capability_probe_expectations();
7058        // Live extraction issues the ALL_USERS probe before ALL_TAB_PRIVS.
7059        expected_queries.push(all_users_expectation(&["BILLING", "REPORTING"]));
7060        for fragment in [
7061            "from all_objects",
7062            "from all_tab_cols",
7063            "from all_constraints c",
7064            "from all_indexes i",
7065            "from all_triggers",
7066            "from all_synonyms",
7067            "from all_procedures",
7068            "from all_arguments",
7069            "from all_views",
7070            "from all_mviews",
7071            "from all_sequences",
7072            "from all_type_attrs",
7073            "from all_tab_privs",
7074            "from all_db_links",
7075            "from all_tab_comments",
7076            "from all_col_comments",
7077            "from all_editions",
7078            "from all_editioning_views",
7079            "from all_policies",
7080            "from all_dependencies",
7081        ] {
7082            // all_editions is database-wide — no schema-bind param.
7083            let params = if fragment.eq("from all_editions") {
7084                vec![]
7085            } else {
7086                billing_only.clone()
7087            };
7088            expected_queries.push(QueryExpectation {
7089                sql_contains: String::from(fragment),
7090                params,
7091                rows: vec![],
7092            });
7093        }
7094        expected_queries.push(QueryExpectation {
7095            sql_contains: String::from("from all_plsql_object_settings"),
7096            params: billing_only.clone(),
7097            rows: plscope_rows,
7098        });
7099        let connection = StaticConnection {
7100            expected_queries,
7101            ..StaticConnection::default()
7102        };
7103        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
7104
7105        let billing = snapshot
7106            .schemas
7107            .keys()
7108            .find(|name| {
7109                snapshot
7110                    .interner
7111                    .resolve(name.symbol())
7112                    .is_some_and(|label| label.eq("BILLING"))
7113            })
7114            .copied()
7115            .expect("billing schema");
7116        let plscope = snapshot
7117            .schemas
7118            .get(&billing)
7119            .and_then(|s| s.plscope.as_ref())
7120            .expect("billing plscope");
7121        assert!(matches!(
7122            plscope.availability,
7123            PlScopeAvailability::IdentifiersAndStatements
7124        ));
7125
7126        let reporting = snapshot
7127            .schemas
7128            .keys()
7129            .find(|name| {
7130                snapshot
7131                    .interner
7132                    .resolve(name.symbol())
7133                    .is_some_and(|label| label.eq("REPORTING"))
7134            })
7135            .copied()
7136            .expect("reporting schema");
7137        let reporting_plscope = snapshot
7138            .schemas
7139            .get(&reporting)
7140            .and_then(|s| s.plscope.as_ref())
7141            .expect("reporting plscope");
7142        // All-NONE settings → PL/Scope is wired but stale.
7143        assert!(matches!(
7144            reporting_plscope.availability,
7145            PlScopeAvailability::AvailableButStale
7146        ));
7147    }
7148
7149    #[test]
7150    fn load_snapshot_extracts_all_identifiers_into_plscope() {
7151        let identifier_rows = vec![
7152            oracle_row(&[
7153                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
7154                ("NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
7155                ("TYPE", "VARCHAR2(30)", Some("VARIABLE")),
7156                ("USAGE", "VARCHAR2(20)", Some("DECLARATION")),
7157                ("LINE", "NUMBER(10)", Some("12")),
7158                ("COL", "NUMBER(10)", Some("5")),
7159                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_PKG")),
7160            ]),
7161            oracle_row(&[
7162                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
7163                ("NAME", "VARCHAR2(128)", Some("CUSTOMER_ID")),
7164                ("TYPE", "VARCHAR2(30)", Some("VARIABLE")),
7165                ("USAGE", "VARCHAR2(20)", Some("REFERENCE")),
7166                ("LINE", "NUMBER(10)", Some("18")),
7167                ("COL", "NUMBER(10)", Some("21")),
7168                ("OBJECT_NAME", "VARCHAR2(128)", Some("BILLING_PKG")),
7169            ]),
7170        ];
7171
7172        let billing_only = vec![OracleBind::from("BILLING")];
7173        let mut expected_queries = capability_probe_expectations();
7174        // Live extraction issues the ALL_USERS probe before ALL_TAB_PRIVS.
7175        expected_queries.push(all_users_expectation(&["BILLING"]));
7176        for fragment in [
7177            "from all_objects",
7178            "from all_tab_cols",
7179            "from all_constraints c",
7180            "from all_indexes i",
7181            "from all_triggers",
7182            "from all_synonyms",
7183            "from all_procedures",
7184            "from all_arguments",
7185            "from all_views",
7186            "from all_mviews",
7187            "from all_sequences",
7188            "from all_type_attrs",
7189            "from all_tab_privs",
7190            "from all_db_links",
7191            "from all_tab_comments",
7192            "from all_col_comments",
7193            "from all_editions",
7194            "from all_editioning_views",
7195            "from all_policies",
7196            "from all_dependencies",
7197            "from all_plsql_object_settings",
7198        ] {
7199            let params = if fragment.eq("from all_editions") {
7200                vec![]
7201            } else {
7202                billing_only.clone()
7203            };
7204            expected_queries.push(QueryExpectation {
7205                sql_contains: String::from(fragment),
7206                params,
7207                rows: vec![],
7208            });
7209        }
7210        expected_queries.push(QueryExpectation {
7211            sql_contains: String::from("from all_identifiers"),
7212            params: billing_only.clone(),
7213            rows: identifier_rows,
7214        });
7215        let connection = StaticConnection {
7216            expected_queries,
7217            ..StaticConnection::default()
7218        };
7219        let snapshot = load_snapshot_for_test(&connection, &CatalogLoadRequest::default()).unwrap();
7220
7221        let billing = snapshot
7222            .schemas
7223            .keys()
7224            .find(|name| {
7225                snapshot
7226                    .interner
7227                    .resolve(name.symbol())
7228                    .is_some_and(|label| label.eq("BILLING"))
7229            })
7230            .copied()
7231            .expect("billing schema");
7232        let plscope = snapshot
7233            .schemas
7234            .get(&billing)
7235            .and_then(|s| s.plscope.as_ref())
7236            .expect("plscope present");
7237        assert_eq!(plscope.identifiers.len(), 2);
7238        let first = &plscope.identifiers[0];
7239        assert_eq!(first.identifier_type, "VARIABLE");
7240        assert_eq!(first.usage, "DECLARATION");
7241        assert_eq!(first.line, 12);
7242        assert_eq!(first.column, 5);
7243    }
7244
7245    #[test]
7246    fn normalize_dbms_metadata_ddl_collapses_whitespace_and_trims_slash() {
7247        use crate::normalize_dbms_metadata_ddl;
7248        let input = "  CREATE   TABLE   FOO ( ID   NUMBER )  \n/  ";
7249        let normalized = normalize_dbms_metadata_ddl(input);
7250        assert_eq!(normalized, "CREATE TABLE FOO ( ID NUMBER )");
7251    }
7252
7253    #[test]
7254    fn object_type_to_dbms_metadata_value_covers_all_known_types() {
7255        use crate::object_type_to_dbms_metadata_value;
7256        // Every object kind that DBMS_METADATA supports must map to a name.
7257        for object_type in [
7258            ObjectType::Table,
7259            ObjectType::View,
7260            ObjectType::MaterializedView,
7261            ObjectType::Sequence,
7262            ObjectType::Type,
7263            ObjectType::Package,
7264            ObjectType::Procedure,
7265            ObjectType::Function,
7266            ObjectType::Trigger,
7267            ObjectType::EditioningView,
7268            ObjectType::SchedulerJob,
7269            ObjectType::Synonym,
7270            ObjectType::Index,
7271        ] {
7272            assert!(object_type_to_dbms_metadata_value(object_type).is_some());
7273        }
7274        // Constraint + Unknown are not directly fetchable via DBMS_METADATA.
7275        assert!(object_type_to_dbms_metadata_value(ObjectType::Constraint).is_none());
7276        assert!(object_type_to_dbms_metadata_value(ObjectType::Unknown).is_none());
7277    }
7278
7279    #[test]
7280    fn populate_dbms_metadata_ddl_records_warnings_on_fetch_failure() {
7281        use crate::CatalogCapabilities;
7282
7283        // Build a tiny snapshot with one TABLE object, then run populate
7284        // against a mock that returns Err on every query. We expect the
7285        // CapabilityWarning trail to grow by one but the snapshot to remain
7286        // intact.
7287        let mut snapshot = CatalogSnapshot::new(
7288            plsql_core::AnalysisProfile::default(),
7289            CatalogCapabilities {
7290                can_use_dbms_metadata: true,
7291                ..CatalogCapabilities::default()
7292            },
7293            CatalogSource {
7294                kind: CatalogSourceKind::LiveConnection,
7295                path: None,
7296                description: Some(String::from("test")),
7297            },
7298            DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
7299        );
7300        let billing = snapshot.intern_schema_name("BILLING").expect("schema");
7301        let invoices = snapshot.intern_object_name("INVOICES").expect("object");
7302        let table = crate::TableMetadata {
7303            common: ObjectCommon {
7304                owner: billing,
7305                name: invoices,
7306                object_type: ObjectType::Table,
7307                status: ObjectStatus::Valid,
7308                ..ObjectCommon::default()
7309            },
7310            ..crate::TableMetadata::default()
7311        };
7312        snapshot
7313            .schemas
7314            .entry(billing)
7315            .or_default()
7316            .objects
7317            .insert(invoices, CatalogObject::Table(table));
7318
7319        // Force the mock to fail by requesting params that won't match.
7320        let failing_connection = StaticConnection {
7321            expected_queries: vec![QueryExpectation {
7322                sql_contains: String::from("dbms_metadata.get_ddl"),
7323                params: vec![OracleBind::from("UNREACHABLE_SENTINEL")],
7324                rows: vec![],
7325            }],
7326            ..StaticConnection::default()
7327        };
7328
7329        populate_dbms_metadata_ddl_for_test(&failing_connection, &mut snapshot).unwrap();
7330
7331        assert!(
7332            snapshot
7333                .capabilities
7334                .warnings
7335                .iter()
7336                .any(|w| w.code.eq("dbms-metadata-fetch-failed"))
7337        );
7338
7339        // Capability disabled → populate is a no-op (no extra warning).
7340        snapshot.capabilities.can_use_dbms_metadata = false;
7341        let baseline_warnings = snapshot.capabilities.warnings.len();
7342        populate_dbms_metadata_ddl_for_test(&failing_connection, &mut snapshot).unwrap();
7343        assert_eq!(snapshot.capabilities.warnings.len(), baseline_warnings);
7344    }
7345
7346    #[test]
7347    fn populate_dbms_metadata_ddl_writes_ddl_field_on_success() {
7348        use crate::CatalogCapabilities;
7349
7350        let mut snapshot = CatalogSnapshot::new(
7351            plsql_core::AnalysisProfile::default(),
7352            CatalogCapabilities {
7353                can_use_dbms_metadata: true,
7354                ..CatalogCapabilities::default()
7355            },
7356            CatalogSource {
7357                kind: CatalogSourceKind::LiveConnection,
7358                path: None,
7359                description: Some(String::from("test")),
7360            },
7361            DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
7362        );
7363        let billing = snapshot.intern_schema_name("BILLING").expect("schema");
7364        let invoices = snapshot.intern_object_name("INVOICES").expect("object");
7365        let table = crate::TableMetadata {
7366            common: ObjectCommon {
7367                owner: billing,
7368                name: invoices,
7369                object_type: ObjectType::Table,
7370                status: ObjectStatus::Valid,
7371                ..ObjectCommon::default()
7372            },
7373            ..crate::TableMetadata::default()
7374        };
7375        snapshot
7376            .schemas
7377            .entry(billing)
7378            .or_default()
7379            .objects
7380            .insert(invoices, CatalogObject::Table(table));
7381
7382        let connection = StaticConnection {
7383            expected_queries: vec![QueryExpectation {
7384                sql_contains: String::from("dbms_metadata.get_ddl"),
7385                params: vec![
7386                    OracleBind::from("TABLE"),
7387                    OracleBind::from("INVOICES"),
7388                    OracleBind::from("BILLING"),
7389                ],
7390                rows: vec![oracle_row(&[
7391                    (
7392                        "DDL_TEXT",
7393                        "CLOB",
7394                        Some("  CREATE TABLE BILLING.INVOICES (ID NUMBER)  \n  "),
7395                    ),
7396                    (
7397                        "XML_TEXT",
7398                        "CLOB",
7399                        Some("<TABLE_T><NAME>INVOICES</NAME></TABLE_T>"),
7400                    ),
7401                ])],
7402            }],
7403            ..StaticConnection::default()
7404        };
7405
7406        populate_dbms_metadata_ddl_for_test(&connection, &mut snapshot).unwrap();
7407
7408        let stored = snapshot
7409            .schemas
7410            .get(&billing)
7411            .and_then(|schema| schema.objects.get(&invoices));
7412        assert!(
7413            matches!(stored, Some(CatalogObject::Table(_))),
7414            "expected a Table catalog object for billing.invoices"
7415        );
7416        let Some(CatalogObject::Table(table)) = stored else {
7417            return;
7418        };
7419        let ddl = table.common.ddl.as_ref().expect("ddl populated");
7420        assert!(ddl.ddl_text.contains("CREATE TABLE BILLING.INVOICES"));
7421        assert_eq!(
7422            ddl.normalized_ddl.as_deref(),
7423            Some("CREATE TABLE BILLING.INVOICES (ID NUMBER)")
7424        );
7425        assert_eq!(
7426            ddl.xml_text.as_deref(),
7427            Some("<TABLE_T><NAME>INVOICES</NAME></TABLE_T>")
7428        );
7429    }
7430
7431    #[test]
7432    fn negotiate_capabilities_reports_failures_as_warnings() {
7433        let connection = StaticConnection {
7434            expected_queries: capability_probe_expectations()
7435                .into_iter()
7436                .map(|mut q| {
7437                    // Force every probe to "fail" by demanding a non-empty
7438                    // params slice while the negotiator passes empty params.
7439                    q.params = vec![OracleBind::from("UNREACHABLE_SENTINEL")];
7440                    q
7441                })
7442                .collect(),
7443            ..StaticConnection::default()
7444        };
7445
7446        let capabilities = negotiate_capabilities_for_test(&connection);
7447
7448        assert!(!capabilities.can_query_all_views);
7449        assert!(!capabilities.can_query_dba_views);
7450        assert!(!capabilities.can_read_source);
7451        assert!(!capabilities.can_query_scheduler);
7452        assert!(!capabilities.can_query_roles_and_grants);
7453        assert!(!capabilities.plscope_enabled);
7454        // execute() succeeds unconditionally in the mock, so DBMS_METADATA
7455        // probe is the one capability that "passes" without explicit setup.
7456        assert!(capabilities.can_use_dbms_metadata);
7457        // One warning per failed probe (six probes).
7458        assert_eq!(capabilities.warnings.len(), 6);
7459        assert!(
7460            capabilities
7461                .warnings
7462                .iter()
7463                .any(|w| w.code.eq("all-views-probe"))
7464        );
7465        assert!(
7466            capabilities
7467                .warnings
7468                .iter()
7469                .any(|w| w.code.eq("plscope-probe"))
7470        );
7471    }
7472
7473    #[test]
7474    fn negotiate_capabilities_marks_every_probe_succeeded_on_open_mock() {
7475        // Default StaticConnection has no expected_queries — falls through to
7476        // returning self.rows (empty) for every query → every probe succeeds.
7477        let connection = StaticConnection::default();
7478        let capabilities = negotiate_capabilities_for_test(&connection);
7479        assert!(capabilities.can_query_all_views);
7480        assert!(capabilities.can_query_dba_views);
7481        assert!(capabilities.can_read_source);
7482        assert!(capabilities.can_query_scheduler);
7483        assert!(capabilities.can_query_roles_and_grants);
7484        assert!(capabilities.plscope_enabled);
7485        assert!(capabilities.can_use_dbms_metadata);
7486        assert!(capabilities.warnings.is_empty());
7487    }
7488
7489    #[test]
7490    fn doctor_report_skips_grant_suggestions_for_json_snapshot_source() {
7491        let snapshot = CatalogSnapshot::new(
7492            plsql_core::AnalysisProfile::default(),
7493            CatalogCapabilities {
7494                can_query_all_views: false,
7495                can_query_dba_views: false,
7496                can_use_dbms_metadata: false,
7497                can_read_source: false,
7498                plscope_enabled: false,
7499                can_query_scheduler: false,
7500                can_query_roles_and_grants: false,
7501                warnings: vec![],
7502            },
7503            CatalogSource {
7504                kind: CatalogSourceKind::JsonSnapshot,
7505                path: Some(std::path::PathBuf::from("/tmp/snapshot.json")),
7506                description: None,
7507            },
7508            DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap(),
7509        );
7510
7511        let report = snapshot.doctor_report();
7512        assert!(matches!(
7513            report.source_kind,
7514            CatalogSourceKind::JsonSnapshot
7515        ));
7516        // Recommendations are inert for JSON snapshots — the capability bits
7517        // were already frozen at extraction time.
7518        assert!(report.missing_permissions.is_empty());
7519    }
7520
7521    // ----------------------------------------------------------------------
7522    // DDL kind classifier (header tokenizer, not body substring).
7523    //
7524    // Regression bar: the prior implementation substring-matched on the full
7525    // upper-cased DDL, so any object whose body or comments mentioned
7526    // `TABLE` was silently filed as a Table. Real public surface, real
7527    // silent data corruption.
7528    // ----------------------------------------------------------------------
7529
7530    /// Drive the per-file DDL through the public loader and return the
7531    /// single (`ObjectType`, `CatalogObject`) pair it classifies into.
7532    /// Panics on anything but exactly one classified object — every test
7533    /// below feeds exactly one DDL so the helper stays obvious.
7534    fn classify_single(ddl: &str) -> (ObjectType, CatalogObject) {
7535        let dir = tempdir().unwrap();
7536        std::fs::write(dir.path().join("obj.sql"), ddl).unwrap();
7537        let snapshot = load_from_dbms_metadata_dir(dir.path()).unwrap();
7538        let mut found: Vec<CatalogObject> = snapshot
7539            .schemas
7540            .values()
7541            .flat_map(|s| s.objects.values().cloned())
7542            .collect();
7543        assert_eq!(
7544            found.len(),
7545            1,
7546            "expected exactly one classified object for DDL: {ddl}"
7547        );
7548        let obj = found.remove(0);
7549        let common = match &obj {
7550            CatalogObject::Table(m) => &m.common,
7551            CatalogObject::View(m) => &m.common,
7552            CatalogObject::MaterializedView(m) => &m.common,
7553            CatalogObject::Sequence(m) => &m.common,
7554            CatalogObject::Type(m) => &m.common,
7555            CatalogObject::Package(m) => &m.common,
7556            CatalogObject::Procedure(m) => &m.common,
7557            CatalogObject::Function(m) => &m.common,
7558            CatalogObject::Trigger(m) => &m.common,
7559            CatalogObject::SchedulerJob(m) => &m.common,
7560            CatalogObject::EditioningView(m) => &m.common,
7561        };
7562        (common.object_type, obj)
7563    }
7564
7565    /// A VIEW whose body merely mentions the word `TABLE` must classify as
7566    /// a View, never a Table. The prior substring matcher silently filed
7567    /// such views as tables — real catalog corruption visible to every
7568    /// downstream consumer.
7569    #[test]
7570    fn classify_view_with_table_in_body_is_view_not_table() {
7571        let ddl = "CREATE OR REPLACE VIEW hr.v_emp AS SELECT * FROM hr.emp WHERE 'TABLE'='TABLE';";
7572        let (kind, obj) = classify_single(ddl);
7573        assert_eq!(
7574            kind,
7575            ObjectType::View,
7576            "VIEW with 'TABLE' literal in body must classify as View, got {kind:?}",
7577        );
7578        assert!(
7579            matches!(obj, CatalogObject::View(_)),
7580            "expected CatalogObject::View, got {obj:?}",
7581        );
7582    }
7583
7584    /// A TRIGGER body that touches a TABLE must classify as Trigger.
7585    #[test]
7586    fn classify_trigger_with_table_in_body_is_trigger() {
7587        let ddl = "CREATE OR REPLACE TRIGGER hr.t_audit \
7588                   AFTER INSERT ON hr.employees \
7589                   BEGIN INSERT INTO hr.audit_table VALUES (:NEW.id); END;";
7590        let (kind, _obj) = classify_single(ddl);
7591        assert_eq!(kind, ObjectType::Trigger);
7592    }
7593
7594    /// A PROCEDURE body that touches a TABLE must classify as Procedure.
7595    #[test]
7596    fn classify_procedure_with_table_in_body_is_procedure() {
7597        let ddl = "CREATE OR REPLACE PROCEDURE hr.p_load \
7598                   AS BEGIN INSERT INTO hr.staging_table SELECT * FROM hr.src; END;";
7599        let (kind, _obj) = classify_single(ddl);
7600        assert_eq!(kind, ObjectType::Procedure);
7601    }
7602
7603    /// A FUNCTION body that touches a TABLE must classify as Function.
7604    #[test]
7605    fn classify_function_with_table_in_body_is_function() {
7606        let ddl = "CREATE OR REPLACE FUNCTION hr.f_count RETURN NUMBER \
7607                   AS n NUMBER; BEGIN SELECT COUNT(*) INTO n FROM hr.big_table; RETURN n; END;";
7608        let (kind, _obj) = classify_single(ddl);
7609        assert_eq!(kind, ObjectType::Function);
7610    }
7611
7612    /// `PACKAGE BODY` is a body, not a spec — the classifier returns the
7613    /// spec only, so a body file must produce zero classified objects
7614    /// (not a Package, never a Table just because the body mentions one).
7615    #[test]
7616    fn classify_package_body_with_table_is_not_package_or_table() {
7617        let ddl = "CREATE OR REPLACE PACKAGE BODY hr.billing_api AS \
7618                   PROCEDURE charge IS BEGIN INSERT INTO hr.charges_table VALUES (1); END; END;";
7619        let dir = tempdir().unwrap();
7620        std::fs::write(dir.path().join("body.sql"), ddl).unwrap();
7621        let snapshot = load_from_dbms_metadata_dir(dir.path()).unwrap();
7622        let total: usize = snapshot.schemas.values().map(|s| s.objects.len()).sum();
7623        assert_eq!(
7624            total, 0,
7625            "PACKAGE BODY must not produce a classified object (got {total})",
7626        );
7627    }
7628
7629    /// `MATERIALIZED VIEW` must classify as `ObjectType::MaterializedView`,
7630    /// never as a plain View (substring match on `VIEW` would do the
7631    /// wrong thing).
7632    #[test]
7633    fn classify_materialized_view_is_materialized_view_not_view() {
7634        let ddl = "CREATE MATERIALIZED VIEW hr.mv_emp_summary AS SELECT dept, COUNT(*) FROM hr.emp GROUP BY dept;";
7635        let (kind, obj) = classify_single(ddl);
7636        assert_eq!(
7637            kind,
7638            ObjectType::MaterializedView,
7639            "MATERIALIZED VIEW must classify as MaterializedView, got {kind:?}",
7640        );
7641        assert!(
7642            matches!(obj, CatalogObject::MaterializedView(_)),
7643            "expected CatalogObject::MaterializedView, got {obj:?}",
7644        );
7645    }
7646
7647    /// Leading block comment that contains `CREATE TABLE …` must NOT
7648    /// fool the classifier — the real CREATE is for a VIEW.
7649    #[test]
7650    fn classify_view_with_leading_block_comment_mentioning_create_table_is_view() {
7651        let ddl = "/* comment with CREATE TABLE x; */\n\
7652                   CREATE OR REPLACE VIEW hr.v_dept AS SELECT * FROM hr.dept;";
7653        let (kind, _obj) = classify_single(ddl);
7654        assert_eq!(kind, ObjectType::View);
7655    }
7656
7657    /// Leading line comments that mention `CREATE TABLE` must NOT fool
7658    /// the classifier.
7659    #[test]
7660    fn classify_procedure_with_leading_line_comments_is_procedure() {
7661        let ddl = "-- CREATE TABLE oops (x NUMBER);\n\
7662                   -- another line mentioning VIEW and TABLE\n\
7663                   CREATE OR REPLACE PROCEDURE hr.p_noop AS BEGIN NULL; END;";
7664        let (kind, _obj) = classify_single(ddl);
7665        assert_eq!(kind, ObjectType::Procedure);
7666    }
7667
7668    /// `EDITIONABLE` / `NONEDITIONABLE` / `FORCE` modifiers between
7669    /// `CREATE [OR REPLACE]` and the KIND must be skipped — they are
7670    /// not the object kind.
7671    #[test]
7672    fn classify_editionable_view_is_view() {
7673        let ddl = "CREATE OR REPLACE EDITIONABLE VIEW hr.v_emp AS SELECT * FROM hr.emp;";
7674        let (kind, _obj) = classify_single(ddl);
7675        assert_eq!(kind, ObjectType::View);
7676    }
7677
7678    /// Quoted identifiers for owner/name must not break the
7679    /// header-tokenizer-based classifier.
7680    #[test]
7681    fn classify_quoted_table_owner_name_is_table() {
7682        let ddl = "CREATE TABLE \"HR\".\"EMP\" (id NUMBER);";
7683        let (kind, _obj) = classify_single(ddl);
7684        assert_eq!(kind, ObjectType::Table);
7685    }
7686
7687    /// A double-quoted Oracle identifier containing whitespace must be
7688    /// kept whole — never truncated at the first interior space. The
7689    /// prior `split_whitespace().next()` tokenizer cut `"MY TABLE"` down
7690    /// to `MY`, corrupting the snapshot key. `extract_owner_and_name`
7691    /// works on the already-upper-cased post-header remainder, so the
7692    /// inputs here are upper-cased the way `upper_remainder()` produces.
7693    #[test]
7694    fn extract_owner_and_name_keeps_whitespace_in_quoted_identifiers() {
7695        // OWNER.NAME, both quoted, name has a space.
7696        assert_eq!(
7697            crate::extract_owner_and_name("\"HR\".\"MY TABLE\" (ID NUMBER);"),
7698            Some((Some("HR".to_string()), "MY TABLE".to_string())),
7699        );
7700        // Quoted OWNER with a space must not be dropped (no PUBLIC misroute).
7701        assert_eq!(
7702            crate::extract_owner_and_name("\"MY OWNER\".\"EMP\" (ID NUMBER);"),
7703            Some((Some("MY OWNER".to_string()), "EMP".to_string())),
7704        );
7705        // Fully-quoted, unqualified, whitespace-bearing name.
7706        assert_eq!(
7707            crate::extract_owner_and_name("\"MY TABLE\" (ID NUMBER);"),
7708            Some((None, "MY TABLE".to_string())),
7709        );
7710        // Spaceless quoted and unquoted inputs still resolve correctly.
7711        assert_eq!(
7712            crate::extract_owner_and_name("\"HR\".\"EMP\" (ID NUMBER);"),
7713            Some((Some("HR".to_string()), "EMP".to_string())),
7714        );
7715        assert_eq!(
7716            crate::extract_owner_and_name("HR.ORDERS (ID NUMBER);"),
7717            Some((Some("HR".to_string()), "ORDERS".to_string())),
7718        );
7719    }
7720
7721    /// Two distinct whitespace-bearing quoted tables in one schema must
7722    /// intern under distinct keys — the truncating tokenizer collapsed
7723    /// both `"MY TABLE"` and `"MY OTHER"` to `MY` (last-write-wins).
7724    #[test]
7725    fn quoted_whitespace_names_intern_as_distinct_keys() {
7726        let dir = tempdir().unwrap();
7727        std::fs::write(
7728            dir.path().join("a.sql"),
7729            "CREATE TABLE \"HR\".\"MY TABLE\" (id NUMBER);",
7730        )
7731        .unwrap();
7732        std::fs::write(
7733            dir.path().join("b.sql"),
7734            "CREATE TABLE \"HR\".\"MY OTHER\" (id NUMBER);",
7735        )
7736        .unwrap();
7737        let snapshot = load_from_dbms_metadata_dir(dir.path()).unwrap();
7738
7739        // Both objects land in the HR schema (no PUBLIC misroute).
7740        let hr = snapshot
7741            .schemas
7742            .iter()
7743            .find(|(s, _)| {
7744                snapshot
7745                    .interner
7746                    .resolve(s.symbol())
7747                    .is_some_and(|label| label.eq("HR"))
7748            })
7749            .map(|(_, c)| c)
7750            .expect("HR schema present");
7751        assert_eq!(hr.objects.len(), 2, "two distinct objects, no collision");
7752
7753        let mut names: Vec<&str> = hr
7754            .objects
7755            .values()
7756            .filter_map(|o| {
7757                let CatalogObject::Table(m) = o else {
7758                    return None;
7759                };
7760                snapshot.interner.resolve(m.common.name.symbol())
7761            })
7762            .collect();
7763        assert_eq!(names.len(), 2, "all HR objects must be tables");
7764        names.sort_unstable();
7765        assert_eq!(names, vec!["MY OTHER", "MY TABLE"]);
7766    }
7767
7768    /// Plain CREATE TABLE still works (negative-of-negative regression).
7769    #[test]
7770    fn classify_plain_table_is_table() {
7771        let ddl = "CREATE TABLE hr.orders (id NUMBER PRIMARY KEY, total NUMBER(12,2));";
7772        let (kind, obj) = classify_single(ddl);
7773        assert_eq!(kind, ObjectType::Table);
7774        assert!(matches!(obj, CatalogObject::Table(_)));
7775    }
7776
7777    /// A PACKAGE spec that mentions VIEW / TABLE in a comment or
7778    /// procedure name must classify as Package.
7779    #[test]
7780    fn classify_package_spec_mentioning_view_and_table_is_package() {
7781        let ddl = "CREATE OR REPLACE PACKAGE hr.report_api AS \
7782                   -- builds a VIEW over a TABLE \n\
7783                   PROCEDURE rebuild_view_from_table; END;";
7784        let (kind, _obj) = classify_single(ddl);
7785        assert_eq!(kind, ObjectType::Package);
7786    }
7787}