Skip to main content

plsql_catalog/
lib.rs

1#![forbid(unsafe_code)]
2pub mod synthetic;
3
4use std::collections::{BTreeMap, HashMap, HashSet};
5use std::fs;
6use std::path::PathBuf;
7
8use chrono::{DateTime, Utc};
9use plsql_core::{
10    AnalysisProfile, ColumnName, EditionName, MemberName, ObjectName, RoleName, SchemaName,
11    SymbolId, SymbolInterner, UserName,
12};
13use serde::{Deserialize, Serialize};
14use thiserror::Error;
15use tracing::instrument;
16
17use plsql_output::SchemaVersion;
18
19macro_rules! catalog_name {
20    ($name:ident) => {
21        #[derive(
22            Clone,
23            Copy,
24            Debug,
25            Default,
26            Eq,
27            PartialEq,
28            Ord,
29            PartialOrd,
30            Hash,
31            Serialize,
32            Deserialize,
33        )]
34        #[serde(transparent)]
35        pub struct $name(SymbolId);
36
37        impl $name {
38            #[must_use]
39            #[instrument(level = "trace")]
40            pub fn new(symbol: SymbolId) -> Self {
41                Self(symbol)
42            }
43
44            #[must_use]
45            #[instrument(level = "trace", skip(self))]
46            pub fn symbol(self) -> SymbolId {
47                self.0
48            }
49        }
50
51        impl From<SymbolId> for $name {
52            fn from(value: SymbolId) -> Self {
53                Self::new(value)
54            }
55        }
56    };
57}
58
59catalog_name!(SynonymName);
60catalog_name!(IndexName);
61catalog_name!(ConstraintName);
62catalog_name!(TriggerName);
63
64#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
65#[serde(transparent)]
66pub struct Hash(String);
67
68impl Hash {
69    #[must_use]
70    #[instrument(level = "trace", skip(value))]
71    pub fn new(value: impl Into<String>) -> Self {
72        Self(value.into())
73    }
74
75    #[must_use]
76    #[instrument(level = "trace", skip(self))]
77    pub fn as_str(&self) -> &str {
78        self.0.as_str()
79    }
80}
81
82#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
83pub struct DbmsMetadataDdl {
84    pub ddl_text: String,
85    pub normalized_ddl: Option<String>,
86    pub xml_text: Option<String>,
87}
88
89#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
90pub enum CatalogSourceKind {
91    #[default]
92    JsonSnapshot,
93    LiveConnection,
94    DbmsMetadataFiles,
95    SyntheticTestCatalog,
96}
97
98#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
99pub struct CatalogSource {
100    pub kind: CatalogSourceKind,
101    pub path: Option<PathBuf>,
102    pub description: Option<String>,
103}
104
105#[derive(
106    Clone, Copy, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize,
107)]
108pub enum ObjectType {
109    Table,
110    View,
111    MaterializedView,
112    Sequence,
113    Type,
114    Package,
115    Procedure,
116    Function,
117    Trigger,
118    SchedulerJob,
119    EditioningView,
120    Synonym,
121    Index,
122    Constraint,
123    #[default]
124    Unknown,
125}
126
127#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
128pub enum ObjectStatus {
129    Valid,
130    Invalid,
131    #[default]
132    NotApplicable,
133}
134
135#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
136pub struct ObjectCommon {
137    pub owner: SchemaName,
138    pub name: ObjectName,
139    pub object_type: ObjectType,
140    pub status: ObjectStatus,
141    pub edition_name: Option<EditionName>,
142    pub editionable: Option<bool>,
143    pub last_ddl_time: Option<DateTime<Utc>>,
144    pub source_hash: Option<Hash>,
145    pub ddl: Option<DbmsMetadataDdl>,
146}
147
148#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
149pub struct CatalogCapabilities {
150    pub can_query_dba_views: bool,
151    pub can_query_all_views: bool,
152    pub can_use_dbms_metadata: bool,
153    pub can_read_source: bool,
154    pub plscope_enabled: bool,
155    pub can_query_scheduler: bool,
156    pub can_query_roles_and_grants: bool,
157    pub warnings: Vec<CapabilityWarning>,
158}
159
160#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
161pub struct CapabilityWarning {
162    pub code: String,
163    pub message: String,
164    pub remediation: Option<String>,
165}
166
167#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
168pub struct CatalogSnapshot {
169    pub schemas: HashMap<SchemaName, SchemaCatalog>,
170    pub profile: AnalysisProfile,
171    pub capabilities: CatalogCapabilities,
172    pub generated_at: DateTime<Utc>,
173    pub source: CatalogSource,
174    pub interner: SymbolInterner,
175    /// Database-wide edition tree from `ALL_EDITIONS`. Empty when EBR
176    /// is not in use.
177    #[serde(default)]
178    pub editions: Vec<Edition>,
179    /// Set of database usernames observed from `ALL_USERS`, used during
180    /// live extraction to discriminate an object-privilege grantee
181    /// (`ALL_TAB_PRIVS.GRANTEE`) between a real user and a database role —
182    /// Oracle's `ALL_TAB_PRIVS` carries no user/role discriminator column.
183    ///
184    /// `None` means the username set was never loaded (the `ALL_USERS`
185    /// probe was not run or failed); in that state grantee classification
186    /// is *undetermined* and, honoring R13, the extractor must NOT
187    /// silently assume a direct (high-confidence) user grant. `Some(_)`
188    /// (even when empty) means the set was loaded and is authoritative for
189    /// the schemas under analysis.
190    ///
191    /// This is transient extraction state: it is never serialized, because
192    /// the resulting `Grantee` discrimination is already baked into each
193    /// persisted `Grant`. JSON snapshots therefore round-trip unchanged.
194    #[serde(default, skip)]
195    pub known_users: Option<HashSet<UserName>>,
196}
197
198impl CatalogSnapshot {
199    #[must_use]
200    #[instrument(level = "trace", skip(profile, capabilities, source))]
201    pub fn new(
202        profile: AnalysisProfile,
203        capabilities: CatalogCapabilities,
204        source: CatalogSource,
205        generated_at: DateTime<Utc>,
206    ) -> Self {
207        Self {
208            schemas: HashMap::new(),
209            profile,
210            capabilities,
211            generated_at,
212            source,
213            interner: SymbolInterner::new(),
214            editions: Vec::new(),
215            known_users: None,
216        }
217    }
218
219    /// Intern `text` as a [`UserName`] without changing classification
220    /// state. Mirrors [`SymbolInterner::intern_user_name`] but routes
221    /// through this snapshot's interner.
222    #[must_use]
223    #[instrument(level = "trace", skip(self, text))]
224    pub fn intern_user_name(&mut self, text: impl Into<String>) -> Option<UserName> {
225        self.interner.intern_user_name(text)
226    }
227
228    /// Intern `text` as a [`RoleName`] through this snapshot's interner.
229    #[must_use]
230    #[instrument(level = "trace", skip(self, text))]
231    pub fn intern_role_name(&mut self, text: impl Into<String>) -> Option<RoleName> {
232        self.interner.intern_role_name(text)
233    }
234
235    #[must_use]
236    #[instrument(level = "trace", skip(self, text))]
237    pub fn intern_schema_name(&mut self, text: impl Into<String>) -> Option<SchemaName> {
238        self.interner.intern_schema_name(text)
239    }
240
241    #[must_use]
242    #[instrument(level = "trace", skip(self, text))]
243    pub fn intern_object_name(&mut self, text: impl Into<String>) -> Option<ObjectName> {
244        self.interner.intern(text).map(ObjectName::from)
245    }
246
247    #[must_use]
248    #[instrument(level = "trace", skip(self, text))]
249    pub fn intern_column_name(&mut self, text: impl Into<String>) -> Option<ColumnName> {
250        self.interner.intern(text).map(ColumnName::from)
251    }
252
253    #[must_use]
254    #[instrument(level = "trace", skip(self, text))]
255    pub fn intern_member_name(&mut self, text: impl Into<String>) -> Option<MemberName> {
256        self.interner.intern(text).map(MemberName::from)
257    }
258
259    #[must_use]
260    #[instrument(level = "trace", skip(self, text))]
261    pub fn intern_synonym_name(&mut self, text: impl Into<String>) -> Option<SynonymName> {
262        self.interner.intern(text).map(SynonymName::from)
263    }
264
265    #[must_use]
266    #[instrument(level = "trace", skip(self, text))]
267    pub fn intern_index_name(&mut self, text: impl Into<String>) -> Option<IndexName> {
268        self.interner.intern(text).map(IndexName::from)
269    }
270
271    #[must_use]
272    #[instrument(level = "trace", skip(self, text))]
273    pub fn intern_constraint_name(&mut self, text: impl Into<String>) -> Option<ConstraintName> {
274        self.interner.intern(text).map(ConstraintName::from)
275    }
276
277    #[must_use]
278    #[instrument(level = "trace", skip(self, text))]
279    pub fn intern_trigger_name(&mut self, text: impl Into<String>) -> Option<TriggerName> {
280        self.interner.intern(text).map(TriggerName::from)
281    }
282}
283
284/// Dictionary row family accepted by [`CatalogSnapshotBuilder`].
285///
286/// Each variant corresponds to one Oracle dictionary query shape used by the
287/// live extractor. The enum is intentionally stable and DB-free so
288/// `oraclemcp` can own live querying while this crate owns row normalization.
289#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)]
290pub enum CatalogRowSet {
291    Objects,
292    Columns,
293    Constraints,
294    Indexes,
295    Triggers,
296    Synonyms,
297    Routines,
298    RoutineArguments,
299    Views,
300    MaterializedViews,
301    Sequences,
302    TypeAttributes,
303    Users,
304    Grants,
305    DatabaseLinks,
306    TableComments,
307    ColumnComments,
308    Editions,
309    EditioningViews,
310    VpdPolicies,
311    Dependencies,
312    PlScopeAvailability,
313    PlScopeIdentifiers,
314}
315
316impl CatalogRowSet {
317    #[must_use]
318    pub fn as_str(self) -> &'static str {
319        match self {
320            Self::Objects => "objects",
321            Self::Columns => "columns",
322            Self::Constraints => "constraints",
323            Self::Indexes => "indexes",
324            Self::Triggers => "triggers",
325            Self::Synonyms => "synonyms",
326            Self::Routines => "routines",
327            Self::RoutineArguments => "routine_arguments",
328            Self::Views => "views",
329            Self::MaterializedViews => "materialized_views",
330            Self::Sequences => "sequences",
331            Self::TypeAttributes => "type_attributes",
332            Self::Users => "users",
333            Self::Grants => "grants",
334            Self::DatabaseLinks => "database_links",
335            Self::TableComments => "table_comments",
336            Self::ColumnComments => "column_comments",
337            Self::Editions => "editions",
338            Self::EditioningViews => "editioning_views",
339            Self::VpdPolicies => "vpd_policies",
340            Self::Dependencies => "dependencies",
341            Self::PlScopeAvailability => "plscope_availability",
342            Self::PlScopeIdentifiers => "plscope_identifiers",
343        }
344    }
345}
346
347/// Stable, offline builder for Oracle dictionary rows.
348///
349/// The builder accepts already-fetched [`OracleRow`] values and applies the
350/// same normalization used by the live loader. It performs no network or
351/// database I/O; callers such as `oraclemcp` own extraction and feed rows into
352/// this crate through [`CatalogRowSet`].
353///
354/// ```
355/// use chrono::Utc;
356/// use plsql_catalog::{
357///     CatalogCapabilities, CatalogRowSet, CatalogSnapshotBuilder, CatalogSource,
358///     CatalogSourceKind, ObjectType, OracleRow,
359/// };
360/// use plsql_core::AnalysisProfile;
361///
362/// fn row(columns: &[(&str, &str, Option<&str>)]) -> OracleRow {
363///     let mut row = OracleRow::default();
364///     for (name, oracle_type, value) in columns {
365///         row.insert(*name, *oracle_type, value.map(String::from));
366///     }
367///     row
368/// }
369///
370/// let mut builder = CatalogSnapshotBuilder::new(
371///     AnalysisProfile::default(),
372///     CatalogCapabilities::default(),
373///     CatalogSource {
374///         kind: CatalogSourceKind::LiveConnection,
375///         description: Some("synthetic rows from an external extractor".to_string()),
376///         ..CatalogSource::default()
377///     },
378///     Utc::now(),
379/// );
380///
381/// builder.apply_row(
382///     CatalogRowSet::Objects,
383///     &row(&[
384///         ("OWNER", "VARCHAR2(128)", Some("BILLING")),
385///         ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES")),
386///         ("OBJECT_TYPE", "VARCHAR2(30)", Some("TABLE")),
387///         ("STATUS", "VARCHAR2(7)", Some("VALID")),
388///     ]),
389/// )?;
390///
391/// let snapshot = builder.finish()?;
392/// let report = snapshot.doctor_report();
393/// assert_eq!(report.totals.schemas_observed, 1);
394/// assert_eq!(
395///     report.object_counts.first().map(|count| count.object_type),
396///     Some(ObjectType::Table),
397/// );
398/// # Ok::<(), plsql_catalog::CatalogError>(())
399/// ```
400pub struct CatalogSnapshotBuilder {
401    snapshot: CatalogSnapshot,
402    routines: HashMap<RoutineLocator, RoutineAccumulator>,
403    plscope_tallies: HashMap<SchemaName, PlScopeTally>,
404}
405
406impl CatalogSnapshotBuilder {
407    #[must_use]
408    #[instrument(level = "trace", skip(profile, capabilities, source))]
409    pub fn new(
410        profile: AnalysisProfile,
411        capabilities: CatalogCapabilities,
412        source: CatalogSource,
413        generated_at: DateTime<Utc>,
414    ) -> Self {
415        Self::from_snapshot(CatalogSnapshot::new(
416            profile,
417            capabilities,
418            source,
419            generated_at,
420        ))
421    }
422
423    #[must_use]
424    #[instrument(level = "trace", skip(snapshot))]
425    pub fn from_snapshot(snapshot: CatalogSnapshot) -> Self {
426        Self {
427            snapshot,
428            routines: HashMap::new(),
429            plscope_tallies: HashMap::new(),
430        }
431    }
432
433    #[must_use]
434    #[instrument(level = "trace", skip(self))]
435    pub fn snapshot(&self) -> &CatalogSnapshot {
436        &self.snapshot
437    }
438
439    #[must_use]
440    #[instrument(level = "trace", skip(self))]
441    pub fn snapshot_mut(&mut self) -> &mut CatalogSnapshot {
442        &mut self.snapshot
443    }
444
445    #[instrument(level = "trace", skip(self, row), fields(row_set = row_set.as_str()))]
446    pub fn apply_row(
447        &mut self,
448        row_set: CatalogRowSet,
449        row: &OracleRow,
450    ) -> Result<&mut Self, CatalogError> {
451        match row_set {
452            CatalogRowSet::Objects => apply_object_row(&mut self.snapshot, row)?,
453            CatalogRowSet::Columns => apply_column_row(&mut self.snapshot, row)?,
454            CatalogRowSet::Constraints => apply_constraint_row(&mut self.snapshot, row)?,
455            CatalogRowSet::Indexes => apply_index_row(&mut self.snapshot, row)?,
456            CatalogRowSet::Triggers => apply_trigger_row(&mut self.snapshot, row)?,
457            CatalogRowSet::Synonyms => apply_synonym_row(&mut self.snapshot, row)?,
458            CatalogRowSet::Routines => {
459                apply_routine_row(&mut self.snapshot, row, &mut self.routines)?;
460            }
461            CatalogRowSet::RoutineArguments => {
462                apply_argument_row(&mut self.snapshot, row, &mut self.routines)?;
463            }
464            CatalogRowSet::Views => apply_view_row(&mut self.snapshot, row)?,
465            CatalogRowSet::MaterializedViews => apply_mview_row(&mut self.snapshot, row)?,
466            CatalogRowSet::Sequences => apply_sequence_row(&mut self.snapshot, row)?,
467            CatalogRowSet::TypeAttributes => apply_type_attr_row(&mut self.snapshot, row)?,
468            CatalogRowSet::Users => apply_user_row(&mut self.snapshot, row)?,
469            CatalogRowSet::Grants => apply_grant_row(&mut self.snapshot, row)?,
470            CatalogRowSet::DatabaseLinks => apply_db_link_row(&mut self.snapshot, row)?,
471            CatalogRowSet::TableComments => apply_table_comment_row(&mut self.snapshot, row)?,
472            CatalogRowSet::ColumnComments => apply_column_comment_row(&mut self.snapshot, row)?,
473            CatalogRowSet::Editions => apply_edition_row(&mut self.snapshot, row)?,
474            CatalogRowSet::EditioningViews => apply_editioning_view_row(&mut self.snapshot, row)?,
475            CatalogRowSet::VpdPolicies => apply_vpd_policy_row(&mut self.snapshot, row)?,
476            CatalogRowSet::Dependencies => apply_dependency_row(&mut self.snapshot, row)?,
477            CatalogRowSet::PlScopeAvailability => {
478                apply_plscope_availability_row(&mut self.snapshot, row, &mut self.plscope_tallies)?;
479            }
480            CatalogRowSet::PlScopeIdentifiers => {
481                apply_plscope_identifier_row(&mut self.snapshot, row)?;
482            }
483        }
484        Ok(self)
485    }
486
487    #[instrument(level = "trace", skip(self, rows), fields(row_set = row_set.as_str()))]
488    pub fn apply_rows<'a, I>(
489        &mut self,
490        row_set: CatalogRowSet,
491        rows: I,
492    ) -> Result<&mut Self, CatalogError>
493    where
494        I: IntoIterator<Item = &'a OracleRow>,
495    {
496        if row_set.eq(&CatalogRowSet::Users) {
497            self.snapshot.known_users.get_or_insert_with(HashSet::new);
498        }
499        for row in rows {
500            self.apply_row(row_set, row)?;
501        }
502        Ok(self)
503    }
504
505    #[instrument(level = "trace", skip(self))]
506    pub fn finish(mut self) -> Result<CatalogSnapshot, CatalogError> {
507        let routines = std::mem::take(&mut self.routines);
508        finalize_routines(&mut self.snapshot, routines)?;
509        let plscope_tallies = std::mem::take(&mut self.plscope_tallies);
510        finalize_plscope_availability(&mut self.snapshot, plscope_tallies);
511        Ok(self.snapshot)
512    }
513}
514
515impl Default for CatalogSnapshotBuilder {
516    fn default() -> Self {
517        Self::new(
518            AnalysisProfile::default(),
519            CatalogCapabilities::default(),
520            CatalogSource::default(),
521            Utc::now(),
522        )
523    }
524}
525
526pub const CATALOG_SNAPSHOT_SCHEMA_ID: &str = "plsql.catalog.snapshot";
527pub const CATALOG_SNAPSHOT_SCHEMA_VERSION: SchemaVersion = SchemaVersion::new(1, 1, 0);
528
529pub const CATALOG_DOCTOR_SCHEMA_ID: &str = "plsql.catalog.doctor";
530pub const CATALOG_DOCTOR_SCHEMA_VERSION: SchemaVersion = SchemaVersion::new(1, 0, 0);
531
532/// Per-`ObjectType` count tile shown in the doctor report.
533#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
534pub struct DoctorObjectCount {
535    pub object_type: ObjectType,
536    pub total: usize,
537    pub valid: usize,
538    pub invalid: usize,
539    pub other: usize,
540}
541
542/// Summary of how many catalog rows landed per family and how many
543/// schema-scoped buckets are populated.
544#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
545pub struct DoctorExtractionTotals {
546    pub schemas_observed: usize,
547    pub objects_total: usize,
548    pub columns_total: usize,
549    pub indexes_total: usize,
550    pub constraints_total: usize,
551    pub triggers_total: usize,
552    pub synonyms_total: usize,
553    pub grants_total: usize,
554    pub dependencies_total: usize,
555}
556
557/// Doctor-flagged missing privilege: the `plsql-catalog` driver could not
558/// observe an Oracle dictionary view that some upstream features require.
559#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
560pub struct MissingPermissionReport {
561    pub view_name: String,
562    pub required_for: Vec<String>,
563    pub suggested_grant: String,
564}
565
566/// Structured doctor report for a `CatalogSnapshot`.
567///
568/// Consumers (`plsql catalog doctor --robot-json`, `plsql-mcp` foundation
569/// tools, and the planned `plsql doctor` umbrella surface) can render the
570/// report directly or wrap it in a `RobotJsonEnvelope` for stable,
571/// versioned output.
572#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
573pub struct CatalogDoctorReport {
574    /// Identifier of the snapshot's origin (`live extraction via ...` or the
575    /// JSON snapshot path).
576    pub source_description: String,
577    pub source_kind: CatalogSourceKind,
578    pub generated_at: Option<DateTime<Utc>>,
579    pub totals: DoctorExtractionTotals,
580    pub object_counts: Vec<DoctorObjectCount>,
581    pub capability_warnings: Vec<CapabilityWarning>,
582    pub missing_permissions: Vec<MissingPermissionReport>,
583    /// Per-schema PL/Scope availability. Empty when the snapshot has no
584    /// PL/Scope detection wired.
585    pub plscope_availability_per_schema: Vec<PlScopeAvailabilityRow>,
586    /// Capability-bit copy for downstream consumers that don't want to read
587    /// the full `CatalogSnapshot` to learn whether a query family worked.
588    pub can_query_dba_views: bool,
589    pub can_query_all_views: bool,
590    pub can_use_dbms_metadata: bool,
591    pub can_read_source: bool,
592    pub plscope_enabled: bool,
593    pub can_query_scheduler: bool,
594    pub can_query_roles_and_grants: bool,
595}
596
597/// One row of the doctor report's per-schema PL/Scope availability summary.
598/// The `schema_name` is rendered through the snapshot's `SymbolInterner` so
599/// the report is stable across JSON snapshots and live extractions.
600#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
601pub struct PlScopeAvailabilityRow {
602    pub schema_name: String,
603    pub availability: PlScopeAvailability,
604}
605
606impl CatalogSnapshot {
607    /// Build the doctor report directly from this snapshot.
608    ///
609    /// The doctor is read-only — it never queries the DB itself; it
610    /// summarizes what was already extracted into the snapshot plus any
611    /// `CapabilityWarning`s the loader recorded. Missing-permission diagnoses
612    /// are inferred from per-family capability bits so the report is
613    /// equally useful for live-extracted and JSON-loaded snapshots.
614    #[must_use]
615    #[instrument(level = "trace", skip(self))]
616    pub fn doctor_report(&self) -> CatalogDoctorReport {
617        let mut counts: BTreeMap<ObjectType, DoctorObjectCount> = BTreeMap::new();
618        let mut columns_total = 0usize;
619        let mut indexes_total = 0usize;
620        let mut constraints_total = 0usize;
621        let mut triggers_total = 0usize;
622        let mut synonyms_total = 0usize;
623        let mut grants_total = 0usize;
624        let mut dependencies_total = 0usize;
625        let mut objects_total = 0usize;
626
627        for schema_catalog in self.schemas.values() {
628            for object in schema_catalog.objects.values() {
629                let common = catalog_object_common(object);
630                let tile = counts
631                    .entry(common.object_type)
632                    .or_insert(DoctorObjectCount {
633                        object_type: common.object_type,
634                        ..DoctorObjectCount::default()
635                    });
636                tile.total = tile.total.saturating_add(1);
637                match common.status {
638                    ObjectStatus::Valid => {
639                        tile.valid = tile.valid.saturating_add(1);
640                    }
641                    ObjectStatus::Invalid => {
642                        tile.invalid = tile.invalid.saturating_add(1);
643                    }
644                    ObjectStatus::NotApplicable => {
645                        tile.other = tile.other.saturating_add(1);
646                    }
647                }
648                objects_total = objects_total.saturating_add(1);
649
650                columns_total = columns_total.saturating_add(catalog_object_column_count(object));
651            }
652            indexes_total = indexes_total.saturating_add(schema_catalog.indexes.len());
653            constraints_total = constraints_total.saturating_add(schema_catalog.constraints.len());
654            triggers_total = triggers_total.saturating_add(schema_catalog.triggers.len());
655            synonyms_total = synonyms_total.saturating_add(schema_catalog.synonyms.len());
656            grants_total = grants_total.saturating_add(schema_catalog.grants.len());
657            dependencies_total =
658                dependencies_total.saturating_add(schema_catalog.dependencies.len());
659        }
660
661        let totals = DoctorExtractionTotals {
662            schemas_observed: self.schemas.len(),
663            objects_total,
664            columns_total,
665            indexes_total,
666            constraints_total,
667            triggers_total,
668            synonyms_total,
669            grants_total,
670            dependencies_total,
671        };
672
673        let mut object_counts: Vec<DoctorObjectCount> = counts.into_values().collect();
674        object_counts.sort_by_key(|tile| std::cmp::Reverse(tile.total));
675
676        let missing_permissions =
677            derive_missing_permission_reports(&self.capabilities, &self.source);
678
679        let mut plscope_availability_per_schema: Vec<PlScopeAvailabilityRow> = self
680            .schemas
681            .iter()
682            .filter_map(|(owner, schema_catalog)| {
683                let availability = schema_catalog.plscope.as_ref()?.availability;
684                let schema_name = self.interner.resolve(owner.symbol())?.to_string();
685                Some(PlScopeAvailabilityRow {
686                    schema_name,
687                    availability,
688                })
689            })
690            .collect();
691        plscope_availability_per_schema.sort_by(|a, b| a.schema_name.cmp(&b.schema_name));
692
693        CatalogDoctorReport {
694            source_description: self.source.description.clone().unwrap_or_default(),
695            source_kind: self.source.kind,
696            generated_at: Some(self.generated_at),
697            totals,
698            object_counts,
699            capability_warnings: self.capabilities.warnings.clone(),
700            missing_permissions,
701            plscope_availability_per_schema,
702            can_query_dba_views: self.capabilities.can_query_dba_views,
703            can_query_all_views: self.capabilities.can_query_all_views,
704            can_use_dbms_metadata: self.capabilities.can_use_dbms_metadata,
705            can_read_source: self.capabilities.can_read_source,
706            plscope_enabled: self.capabilities.plscope_enabled,
707            can_query_scheduler: self.capabilities.can_query_scheduler,
708            can_query_roles_and_grants: self.capabilities.can_query_roles_and_grants,
709        }
710    }
711}
712
713fn catalog_object_common(object: &CatalogObject) -> &ObjectCommon {
714    match object {
715        CatalogObject::Table(metadata) => &metadata.common,
716        CatalogObject::View(metadata) => &metadata.common,
717        CatalogObject::MaterializedView(metadata) => &metadata.common,
718        CatalogObject::Sequence(metadata) => &metadata.common,
719        CatalogObject::Type(metadata) => &metadata.common,
720        CatalogObject::Package(metadata) => &metadata.common,
721        CatalogObject::Procedure(metadata) => &metadata.common,
722        CatalogObject::Function(metadata) => &metadata.common,
723        CatalogObject::Trigger(metadata) => &metadata.common,
724        CatalogObject::SchedulerJob(metadata) => &metadata.common,
725        CatalogObject::EditioningView(metadata) => &metadata.common,
726    }
727}
728
729fn catalog_object_column_count(object: &CatalogObject) -> usize {
730    match object {
731        CatalogObject::Table(metadata) => metadata.columns.len(),
732        CatalogObject::View(metadata) => metadata.columns.len(),
733        CatalogObject::MaterializedView(metadata) => metadata.columns.len(),
734        CatalogObject::EditioningView(metadata) => metadata.columns.len(),
735        CatalogObject::Sequence(_)
736        | CatalogObject::Type(_)
737        | CatalogObject::Package(_)
738        | CatalogObject::Procedure(_)
739        | CatalogObject::Function(_)
740        | CatalogObject::Trigger(_)
741        | CatalogObject::SchedulerJob(_) => 0,
742    }
743}
744
745fn derive_missing_permission_reports(
746    capabilities: &CatalogCapabilities,
747    source: &CatalogSource,
748) -> Vec<MissingPermissionReport> {
749    // Missing-permission diagnoses only make sense for live extractions. A
750    // JSON snapshot was already produced once — its capability bits reflect
751    // the original extraction; we surface them verbatim instead of inventing
752    // new grant suggestions.
753    if !matches!(source.kind, CatalogSourceKind::LiveConnection) {
754        return Vec::new();
755    }
756
757    let mut reports = Vec::new();
758    if !capabilities.can_query_dba_views {
759        reports.push(MissingPermissionReport {
760            view_name: String::from("DBA_OBJECTS / DBA_TAB_COLUMNS / DBA_DEPENDENCIES"),
761            required_for: vec![
762                String::from("cross-schema extraction beyond ALL_*"),
763                String::from("PLSQL-CAT-014 dependency reachability over schemas"),
764            ],
765            suggested_grant: String::from(
766                "grant select_catalog_role to <user>; -- or individual grants on DBA_* views",
767            ),
768        });
769    }
770    if !capabilities.can_use_dbms_metadata {
771        reports.push(MissingPermissionReport {
772            view_name: String::from("DBMS_METADATA"),
773            required_for: vec![
774                String::from("PLSQL-CAT-015 DBMS_METADATA.GET_DDL extraction"),
775                String::from("normalized DDL hashes for `what-breaks`"),
776            ],
777            suggested_grant: String::from("grant execute on DBMS_METADATA to <user>;"),
778        });
779    }
780    if !capabilities.can_read_source {
781        reports.push(MissingPermissionReport {
782            view_name: String::from("ALL_SOURCE / DBA_SOURCE"),
783            required_for: vec![
784                String::from("packaged routine body inspection"),
785                String::from("get_object_source MCP tool"),
786            ],
787            suggested_grant: String::from(
788                "grant select on ALL_SOURCE to <user>; -- ALL_SOURCE itself is normally readable; ensure no DROP/REVOKE narrowed it",
789            ),
790        });
791    }
792    if !capabilities.plscope_enabled {
793        reports.push(MissingPermissionReport {
794            view_name: String::from("PLSCOPE_SETTINGS / ALL_IDENTIFIERS"),
795            required_for: vec![
796                String::from("PLSQL-CAT-010 PL/Scope availability detection"),
797                String::from("PLSQL-CAT-011 identifier extraction"),
798            ],
799            suggested_grant: String::from(
800                "alter session set plscope_settings = 'identifiers:all'; -- and recompile target objects",
801            ),
802        });
803    }
804    if !capabilities.can_query_scheduler {
805        reports.push(MissingPermissionReport {
806            view_name: String::from("ALL_SCHEDULER_JOBS / ALL_SCHEDULER_PROGRAMS"),
807            required_for: vec![String::from("scheduler job lineage edges")],
808            suggested_grant: String::from(
809                "grant select on ALL_SCHEDULER_JOBS to <user>; grant select on ALL_SCHEDULER_PROGRAMS to <user>;",
810            ),
811        });
812    }
813    if !capabilities.can_query_roles_and_grants {
814        reports.push(MissingPermissionReport {
815            view_name: String::from("DBA_ROLE_PRIVS / DBA_SYS_PRIVS / DBA_TAB_PRIVS"),
816            required_for: vec![
817                String::from("definer-rights privilege chain analysis"),
818                String::from("role-mediated execution evidence (PRIVILEGES-* beads)"),
819            ],
820            suggested_grant: String::from(
821                "grant select_catalog_role to <user>; -- enables DBA_*_PRIVS reads",
822            ),
823        });
824    }
825    reports
826}
827
828#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
829pub struct CatalogSnapshotDocument {
830    pub schema_id: String,
831    pub schema_version: SchemaVersion,
832    pub snapshot: CatalogSnapshot,
833}
834
835impl CatalogSnapshotDocument {
836    #[must_use]
837    #[instrument(level = "trace", skip(snapshot))]
838    pub fn new(snapshot: CatalogSnapshot) -> Self {
839        Self {
840            schema_id: String::from(CATALOG_SNAPSHOT_SCHEMA_ID),
841            schema_version: CATALOG_SNAPSHOT_SCHEMA_VERSION,
842            snapshot,
843        }
844    }
845}
846
847#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
848pub struct CatalogLoadRequest {
849    pub schema_filters: Vec<CatalogSchemaFilter>,
850}
851
852impl CatalogLoadRequest {
853    #[must_use]
854    #[instrument(level = "trace")]
855    pub fn for_current_schema() -> Self {
856        Self {
857            schema_filters: vec![CatalogSchemaFilter::CurrentSchema],
858        }
859    }
860
861    #[must_use]
862    #[instrument(level = "trace", skip(schema_names))]
863    pub fn for_named_schemas<I, S>(schema_names: I) -> Self
864    where
865        I: IntoIterator<Item = S>,
866        S: Into<String>,
867    {
868        Self {
869            schema_filters: schema_names
870                .into_iter()
871                .map(CatalogSchemaFilter::named)
872                .collect(),
873        }
874    }
875}
876
877impl Default for CatalogLoadRequest {
878    fn default() -> Self {
879        Self::for_current_schema()
880    }
881}
882
883#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
884pub enum CatalogSchemaFilter {
885    CurrentSchema,
886    Named(String),
887}
888
889impl CatalogSchemaFilter {
890    #[must_use]
891    #[instrument(level = "trace")]
892    pub fn current_schema() -> Self {
893        Self::CurrentSchema
894    }
895
896    #[must_use]
897    #[instrument(level = "trace", skip(schema_name))]
898    pub fn named(schema_name: impl Into<String>) -> Self {
899        Self::Named(schema_name.into())
900    }
901}
902
903#[derive(Debug, Error)]
904pub enum CatalogError {
905    #[error("i/o error: {0}")]
906    Io(#[from] std::io::Error),
907    #[error("json error: {0}")]
908    Json(#[from] serde_json::Error),
909    #[error("oracle backend `{backend}` is unavailable in this build; use `{feature}`")]
910    OracleBackendNotCompiled {
911        backend: OracleBackend,
912        feature: &'static str,
913    },
914    #[error("oracle backend `{backend}` error: {message}")]
915    OracleBackendError {
916        backend: OracleBackend,
917        message: String,
918    },
919    #[error("expected {expected} row(s) but received {actual}")]
920    UnexpectedRowCount { expected: String, actual: usize },
921    #[error("required column `{column}` was missing from the query result")]
922    MissingColumn { column: String },
923    #[error("column `{column}` was null")]
924    NullColumnValue { column: String },
925    #[error("column `{column}` could not be parsed as {expected}: `{value}`")]
926    InvalidColumnValue {
927        column: String,
928        expected: &'static str,
929        value: String,
930    },
931    #[error("unsupported catalog snapshot schema {found} for {schema_id}; expected {expected}")]
932    UnsupportedSchemaVersion {
933        schema_id: String,
934        found: SchemaVersion,
935        expected: SchemaVersion,
936    },
937    #[error("unexpected catalog snapshot schema id `{0}`")]
938    UnexpectedSchemaId(String),
939    #[error("catalog load request could not resolve the current schema from the Oracle connection")]
940    CurrentSchemaUnavailable,
941    #[error("schema filter `{schema_name}` is invalid: schema names must not be blank")]
942    InvalidSchemaFilter { schema_name: String },
943}
944
945#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
946pub enum OracleBackend {
947    RustOracle,
948    OracleRs,
949}
950
951impl OracleBackend {
952    #[must_use]
953    #[instrument(level = "trace", skip(self))]
954    pub fn as_str(self) -> &'static str {
955        match self {
956            Self::RustOracle => "oracle",
957            Self::OracleRs => "oracle-rs",
958        }
959    }
960}
961
962impl std::fmt::Display for OracleBackend {
963    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
964        f.write_str(self.as_str())
965    }
966}
967
968#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
969pub struct OracleConnectOptions {
970    pub username: String,
971    pub password: String,
972    pub connect_string: String,
973    pub current_schema: Option<String>,
974    pub module: Option<String>,
975    pub action: Option<String>,
976    pub client_info: Option<String>,
977    pub client_identifier: Option<String>,
978}
979
980impl OracleConnectOptions {
981    #[must_use]
982    pub fn new(
983        username: impl Into<String>,
984        password: impl Into<String>,
985        connect_string: impl Into<String>,
986    ) -> Self {
987        Self {
988            username: username.into(),
989            password: password.into(),
990            connect_string: connect_string.into(),
991            current_schema: None,
992            module: None,
993            action: None,
994            client_info: None,
995            client_identifier: None,
996        }
997    }
998
999    #[must_use]
1000    pub fn with_current_schema(mut self, current_schema: impl Into<String>) -> Self {
1001        self.current_schema = Some(current_schema.into());
1002        self
1003    }
1004
1005    #[must_use]
1006    pub fn with_module(mut self, module: impl Into<String>) -> Self {
1007        self.module = Some(module.into());
1008        self
1009    }
1010
1011    #[must_use]
1012    pub fn with_action(mut self, action: impl Into<String>) -> Self {
1013        self.action = Some(action.into());
1014        self
1015    }
1016
1017    #[must_use]
1018    pub fn with_client_info(mut self, client_info: impl Into<String>) -> Self {
1019        self.client_info = Some(client_info.into());
1020        self
1021    }
1022
1023    #[must_use]
1024    pub fn with_client_identifier(mut self, client_identifier: impl Into<String>) -> Self {
1025        self.client_identifier = Some(client_identifier.into());
1026        self
1027    }
1028}
1029
1030#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1031pub enum OracleBind {
1032    String(String),
1033    I64(i64),
1034    U64(u64),
1035    Bool(bool),
1036}
1037
1038impl From<&str> for OracleBind {
1039    fn from(value: &str) -> Self {
1040        Self::String(String::from(value))
1041    }
1042}
1043
1044impl From<String> for OracleBind {
1045    fn from(value: String) -> Self {
1046        Self::String(value)
1047    }
1048}
1049
1050impl From<i32> for OracleBind {
1051    fn from(value: i32) -> Self {
1052        Self::I64(i64::from(value))
1053    }
1054}
1055
1056impl From<i64> for OracleBind {
1057    fn from(value: i64) -> Self {
1058        Self::I64(value)
1059    }
1060}
1061
1062impl From<u32> for OracleBind {
1063    fn from(value: u32) -> Self {
1064        Self::U64(u64::from(value))
1065    }
1066}
1067
1068impl From<u64> for OracleBind {
1069    fn from(value: u64) -> Self {
1070        Self::U64(value)
1071    }
1072}
1073
1074impl From<bool> for OracleBind {
1075    fn from(value: bool) -> Self {
1076        Self::Bool(value)
1077    }
1078}
1079
1080#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1081pub struct OracleCell {
1082    pub oracle_type: String,
1083    pub value: Option<String>,
1084}
1085
1086impl OracleCell {
1087    #[must_use]
1088    #[instrument(level = "trace", skip(oracle_type, value))]
1089    pub fn new(oracle_type: impl Into<String>, value: Option<String>) -> Self {
1090        Self {
1091            oracle_type: oracle_type.into(),
1092            value,
1093        }
1094    }
1095}
1096
1097#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
1098pub struct OracleRow {
1099    pub columns: BTreeMap<String, OracleCell>,
1100}
1101
1102impl OracleRow {
1103    pub fn insert(
1104        &mut self,
1105        name: impl Into<String>,
1106        oracle_type: impl Into<String>,
1107        value: Option<String>,
1108    ) {
1109        self.columns.insert(
1110            name.into().to_ascii_uppercase(),
1111            OracleCell::new(oracle_type, value),
1112        );
1113    }
1114
1115    #[must_use]
1116    #[instrument(level = "trace", skip(self))]
1117    pub fn cell(&self, name: &str) -> Option<&OracleCell> {
1118        self.columns.get(&name.to_ascii_uppercase())
1119    }
1120
1121    #[must_use]
1122    #[instrument(level = "trace", skip(self))]
1123    pub fn text(&self, name: &str) -> Option<&str> {
1124        self.cell(name).and_then(|cell| cell.value.as_deref())
1125    }
1126
1127    #[instrument(level = "trace", skip(self))]
1128    pub fn require_text(&self, name: &str) -> Result<&str, CatalogError> {
1129        let Some(cell) = self.cell(name) else {
1130            return Err(CatalogError::MissingColumn {
1131                column: name.to_ascii_uppercase(),
1132            });
1133        };
1134        cell.value
1135            .as_deref()
1136            .ok_or_else(|| CatalogError::NullColumnValue {
1137                column: name.to_ascii_uppercase(),
1138            })
1139    }
1140
1141    #[instrument(level = "trace", skip(self))]
1142    pub fn parse_i64(&self, name: &str) -> Result<i64, CatalogError> {
1143        let text = self.require_text(name)?;
1144        text.parse::<i64>()
1145            .map_err(|_| CatalogError::InvalidColumnValue {
1146                column: name.to_ascii_uppercase(),
1147                expected: "i64",
1148                value: String::from(text),
1149            })
1150    }
1151
1152    #[instrument(level = "trace", skip(self))]
1153    pub fn parse_u64(&self, name: &str) -> Result<u64, CatalogError> {
1154        let text = self.require_text(name)?;
1155        text.parse::<u64>()
1156            .map_err(|_| CatalogError::InvalidColumnValue {
1157                column: name.to_ascii_uppercase(),
1158                expected: "u64",
1159                value: String::from(text),
1160            })
1161    }
1162
1163    #[instrument(level = "trace", skip(self))]
1164    pub fn parse_bool(&self, name: &str) -> Result<bool, CatalogError> {
1165        let text = self.require_text(name)?;
1166        let normalized = text.trim().to_ascii_uppercase();
1167        match normalized.as_str() {
1168            "Y" | "YES" | "TRUE" | "1" => Ok(true),
1169            "N" | "NO" | "FALSE" | "0" => Ok(false),
1170            _ => Err(CatalogError::InvalidColumnValue {
1171                column: name.to_ascii_uppercase(),
1172                expected: "bool",
1173                value: String::from(text),
1174            }),
1175        }
1176    }
1177}
1178
1179#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
1180pub struct OracleConnectionInfo {
1181    pub backend: OracleBackend,
1182    pub connect_string: String,
1183    pub current_schema: Option<String>,
1184    pub server_version: String,
1185    pub db_name: String,
1186    pub db_domain: String,
1187    pub service_name: String,
1188    pub instance_name: String,
1189    pub server_type: String,
1190    pub max_identifier_length: u32,
1191    pub max_open_cursors: u32,
1192}
1193
1194#[instrument(level = "trace")]
1195pub fn load_snapshot_from_json(path: &std::path::Path) -> Result<CatalogSnapshot, CatalogError> {
1196    let raw = fs::read_to_string(path)?;
1197    let document: CatalogSnapshotDocument = serde_json::from_str(&raw)?;
1198
1199    if !document.schema_id.as_str().eq(CATALOG_SNAPSHOT_SCHEMA_ID) {
1200        return Err(CatalogError::UnexpectedSchemaId(document.schema_id));
1201    }
1202
1203    if !matches!(
1204        document
1205            .schema_version
1206            .cmp(&CATALOG_SNAPSHOT_SCHEMA_VERSION),
1207        std::cmp::Ordering::Equal
1208    ) {
1209        return Err(CatalogError::UnsupportedSchemaVersion {
1210            schema_id: String::from(CATALOG_SNAPSHOT_SCHEMA_ID),
1211            found: document.schema_version,
1212            expected: CATALOG_SNAPSHOT_SCHEMA_VERSION,
1213        });
1214    }
1215
1216    Ok(document.snapshot)
1217}
1218
1219#[instrument(level = "trace", skip(snapshot))]
1220pub fn export_snapshot_to_json(
1221    snapshot: &CatalogSnapshot,
1222    path: &std::path::Path,
1223) -> Result<(), CatalogError> {
1224    let document = CatalogSnapshotDocument::new(snapshot.clone());
1225    let rendered = serde_json::to_string_pretty(&document)?;
1226    fs::write(path, rendered)?;
1227    Ok(())
1228}
1229
1230/// Load a catalog snapshot from a directory of DBMS_METADATA-exported .sql files.
1231///
1232/// Load a `CatalogSnapshot` by classifying every `.sql` file under `dir` as a
1233/// single top-level CREATE DDL statement (the shape `DBMS_METADATA.GET_DDL`
1234/// emits when written per-object to disk).
1235///
1236/// For each file:
1237///
1238/// * The object kind is read from the leading `CREATE …` keyword
1239///   (`TABLE` / `VIEW` / `PACKAGE` / `PROCEDURE` / `FUNCTION` /
1240///   `SEQUENCE` / `TRIGGER` / `TYPE`); statements whose keyword does
1241///   not match a known kind are skipped (graceful degradation per
1242///   R13).
1243/// * The owner schema is read from the optional `OWNER.OBJECT` prefix
1244///   on the CREATE target. Unqualified statements (no `OWNER.`
1245///   prefix) are filed under a stable `PUBLIC` schema interned through
1246///   the regular interner — never `SymbolId::new(0)`, which would
1247///   collide with whatever the first object name happens to be.
1248/// * The raw file bytes are stored verbatim on
1249///   [`ObjectCommon::ddl`] as a [`DbmsMetadataDdl`] so downstream
1250///   consumers (doc generation, lineage, the doctor's
1251///   ddl-extraction ratio) can inspect the exact source the catalog
1252///   was derived from.
1253///
1254/// This classifier is keyword-shaped and does not parse arbitrary
1255/// PL/SQL bodies — column definitions, parameter signatures, view
1256/// projections and constraint details are *not* populated. When the
1257/// full parser (Layer 1) lands, callers that need column- or
1258/// signature-level fidelity should switch to that path; the
1259/// `DbmsMetadataDdl` stored here is sufficient seed for re-parsing
1260/// on demand without re-reading the disk.
1261#[instrument(level = "info", skip_all, fields(dir = %dir.display()))]
1262pub fn load_from_dbms_metadata_dir(dir: &std::path::Path) -> Result<CatalogSnapshot, CatalogError> {
1263    if !dir.is_dir() {
1264        return Err(CatalogError::Io(std::io::Error::new(
1265            std::io::ErrorKind::NotFound,
1266            format!("not a directory: {}", dir.display()),
1267        )));
1268    }
1269
1270    let mut interner = SymbolInterner::default();
1271    let mut schemas: HashMap<SchemaName, SchemaCatalog> = HashMap::new();
1272    let mut file_count = 0usize;
1273    let mut classified_count = 0usize;
1274
1275    // Collect + sort entries so the resulting snapshot (and its
1276    // interner symbol ids) are deterministic across runs and
1277    // platforms — `read_dir` ordering is unspecified.
1278    let mut paths: Vec<std::path::PathBuf> = fs::read_dir(dir)?
1279        .filter_map(|e| e.ok().map(|e| e.path()))
1280        .filter(|p| {
1281            p.extension()
1282                .and_then(|e| e.to_str())
1283                .is_some_and(|ext| ext.eq("sql"))
1284        })
1285        .collect();
1286    paths.sort();
1287
1288    for path in paths {
1289        file_count += 1;
1290        let ddl_text = match fs::read_to_string(&path) {
1291            Ok(text) => text,
1292            Err(_) => continue,
1293        };
1294
1295        if let Some((schema, obj_name, obj)) = classify_dbms_metadata_ddl(&ddl_text, &mut interner)
1296        {
1297            let schema_catalog = schemas.entry(schema).or_default();
1298            schema_catalog.objects.insert(obj_name, obj);
1299            classified_count += 1;
1300        }
1301    }
1302
1303    tracing::info!(
1304        files = file_count,
1305        classified = classified_count,
1306        "loaded DBMS_METADATA directory"
1307    );
1308
1309    Ok(CatalogSnapshot {
1310        schemas,
1311        profile: AnalysisProfile::default(),
1312        capabilities: CatalogCapabilities {
1313            can_query_all_views: false,
1314            can_query_dba_views: false,
1315            can_use_dbms_metadata: true,
1316            can_read_source: true,
1317            plscope_enabled: false,
1318            can_query_scheduler: false,
1319            can_query_roles_and_grants: false,
1320            warnings: vec![],
1321        },
1322        generated_at: Utc::now(),
1323        source: CatalogSource {
1324            kind: CatalogSourceKind::DbmsMetadataFiles,
1325            description: Some(format!("loaded from {}", dir.display())),
1326            ..CatalogSource::default()
1327        },
1328        interner,
1329        editions: Vec::new(),
1330        // DBMS_METADATA directory loads do not query ALL_USERS; grantee
1331        // classification is not exercised on this path.
1332        known_users: None,
1333    })
1334}
1335
1336/// Default schema name used when a CREATE statement has no `OWNER.`
1337/// prefix. Interned through the regular interner so the resulting
1338/// `SchemaName` has a real, resolvable text — never a collision with
1339/// `SymbolId::new(0)`.
1340const UNQUALIFIED_DDL_SCHEMA: &str = "PUBLIC";
1341
1342/// Classify a single per-file DDL statement into a `CatalogObject`.
1343///
1344/// Returns `None` for whitespace-only / comment-only files and for
1345/// CREATE statements whose object kind keyword is not in the
1346/// known set. The DDL bytes are preserved verbatim on
1347/// [`ObjectCommon::ddl`] so downstream code can re-parse them as
1348/// fidelity improves.
1349fn classify_dbms_metadata_ddl(
1350    ddl_text: &str,
1351    interner: &mut SymbolInterner,
1352) -> Option<(SchemaName, ObjectName, CatalogObject)> {
1353    // Parse the DDL HEADER as a real token stream — never substring-match
1354    // the whole DDL. Body / comment text that mentions `TABLE` etc. used
1355    // to silently re-classify VIEWs and PROCEDUREs as tables.
1356    let header = parse_create_header(ddl_text)?;
1357
1358    // `PACKAGE BODY` / `TYPE BODY` are bodies — the spec's catalog row
1359    // is the source of truth. Honest uncertainty: return None.
1360    if matches!(
1361        header.kind,
1362        DdlKind::PackageBody | DdlKind::TypeBody | DdlKind::Unknown
1363    ) {
1364        return None;
1365    }
1366
1367    let (owner_text, object_text) = extract_owner_and_name(&header.after_kind)?;
1368
1369    let owner_text = owner_text.unwrap_or_else(|| UNQUALIFIED_DDL_SCHEMA.to_string());
1370    let owner = interner.intern_schema_name(owner_text)?;
1371    let name_sid = interner.intern(&object_text)?;
1372    let obj_name = ObjectName::new(name_sid);
1373
1374    let ddl = DbmsMetadataDdl {
1375        ddl_text: ddl_text.to_string(),
1376        normalized_ddl: Some(normalize_dbms_metadata_ddl(ddl_text)),
1377        xml_text: None,
1378    };
1379
1380    let common = ObjectCommon {
1381        owner,
1382        name: obj_name,
1383        object_type: header.kind.object_type(),
1384        ddl: Some(ddl),
1385        ..ObjectCommon::default()
1386    };
1387
1388    let object = match header.kind {
1389        DdlKind::Table => CatalogObject::Table(TableMetadata {
1390            common,
1391            ..TableMetadata::default()
1392        }),
1393        DdlKind::View => CatalogObject::View(ViewMetadata {
1394            common,
1395            ..ViewMetadata::default()
1396        }),
1397        DdlKind::MaterializedView => CatalogObject::MaterializedView(MViewMetadata {
1398            common,
1399            ..MViewMetadata::default()
1400        }),
1401        DdlKind::Package => CatalogObject::Package(PackageMetadata {
1402            common,
1403            ..PackageMetadata::default()
1404        }),
1405        DdlKind::Procedure => CatalogObject::Procedure(ProcedureMetadata {
1406            common,
1407            signature: RoutineSignature {
1408                routine_name: obj_name,
1409                ..RoutineSignature::default()
1410            },
1411        }),
1412        DdlKind::Function => CatalogObject::Function(FunctionMetadata {
1413            common,
1414            signature: RoutineSignature {
1415                routine_name: obj_name,
1416                ..RoutineSignature::default()
1417            },
1418            ..FunctionMetadata::default()
1419        }),
1420        DdlKind::Sequence => CatalogObject::Sequence(SequenceMetadata {
1421            common,
1422            ..SequenceMetadata::default()
1423        }),
1424        DdlKind::Trigger => CatalogObject::Trigger(TriggerMetadata {
1425            common,
1426            ..TriggerMetadata::default()
1427        }),
1428        DdlKind::Type => CatalogObject::Type(TypeMetadata {
1429            common,
1430            ..TypeMetadata::default()
1431        }),
1432        // Filtered above — the match is exhaustive only because we
1433        // handle every concrete kind.
1434        DdlKind::PackageBody | DdlKind::TypeBody | DdlKind::Unknown => return None,
1435    };
1436
1437    Some((owner, obj_name, object))
1438}
1439
1440/// Object kinds the per-file DDL classifier recognizes. `Unknown`
1441/// represents honest uncertainty (R13) — the header didn't tokenize
1442/// into a kind we model. `PackageBody` / `TypeBody` are recognized
1443/// separately so the classifier can skip them without confusing them
1444/// with their specs.
1445#[derive(Clone, Copy, Debug, Eq, PartialEq)]
1446enum DdlKind {
1447    Table,
1448    View,
1449    MaterializedView,
1450    Package,
1451    PackageBody,
1452    Procedure,
1453    Function,
1454    Sequence,
1455    Trigger,
1456    Type,
1457    TypeBody,
1458    Unknown,
1459}
1460
1461impl DdlKind {
1462    fn object_type(self) -> ObjectType {
1463        match self {
1464            DdlKind::Table => ObjectType::Table,
1465            DdlKind::View => ObjectType::View,
1466            DdlKind::MaterializedView => ObjectType::MaterializedView,
1467            DdlKind::Package | DdlKind::PackageBody => ObjectType::Package,
1468            DdlKind::Procedure => ObjectType::Procedure,
1469            DdlKind::Function => ObjectType::Function,
1470            DdlKind::Sequence => ObjectType::Sequence,
1471            DdlKind::Trigger => ObjectType::Trigger,
1472            DdlKind::Type | DdlKind::TypeBody => ObjectType::Type,
1473            DdlKind::Unknown => ObjectType::Unknown,
1474        }
1475    }
1476}
1477
1478/// Parsed CREATE header: the typed `DdlKind` plus the upper-cased
1479/// remainder of the DDL starting immediately after the kind tokens.
1480/// Callers use `after_kind` to locate the `[OWNER.]NAME` — it has
1481/// already been stripped of leading comments / whitespace / `CREATE`
1482/// modifiers / kind tokens so a substring match in there cannot be
1483/// fooled by body content.
1484#[derive(Clone, Debug)]
1485struct ParsedCreateHeader {
1486    kind: DdlKind,
1487    after_kind: String,
1488}
1489
1490/// Parse the CREATE header of a raw DDL string.
1491///
1492/// Skips leading whitespace, `--` line comments, and `/* … */` block
1493/// comments. Consumes `CREATE` then optional `OR REPLACE`, optional
1494/// `FORCE` / `EDITIONABLE` / `NONEDITIONABLE` (in any order), then
1495/// reads one or two tokens to form a [`DdlKind`] (multi-word kinds
1496/// `MATERIALIZED VIEW`, `PACKAGE BODY`, `TYPE BODY` handled). Returns
1497/// `None` only when the input has no `CREATE` token at all; an
1498/// unrecognized kind word produces `DdlKind::Unknown` so callers can
1499/// represent honest uncertainty (R13).
1500fn parse_create_header(ddl: &str) -> Option<ParsedCreateHeader> {
1501    let mut cursor = Cursor::new(ddl);
1502    cursor.skip_ws_and_comments();
1503
1504    // Must start with `CREATE`.
1505    if !cursor.consume_keyword("CREATE") {
1506        return None;
1507    }
1508    cursor.skip_ws_and_comments();
1509
1510    // Optional `OR REPLACE`.
1511    if cursor.consume_keyword("OR") {
1512        cursor.skip_ws_and_comments();
1513        // `OR` without `REPLACE` is malformed; let it fall through to
1514        // kind parsing — the kind word won't match and we'll honestly
1515        // return `Unknown`.
1516        let _ = cursor.consume_keyword("REPLACE");
1517        cursor.skip_ws_and_comments();
1518    }
1519
1520    // Optional `FORCE` / `EDITIONABLE` / `NONEDITIONABLE` modifiers,
1521    // any order, any subset.
1522    loop {
1523        if cursor.consume_keyword("FORCE")
1524            || cursor.consume_keyword("NONEDITIONABLE")
1525            || cursor.consume_keyword("EDITIONABLE")
1526            || cursor.consume_keyword("NO")
1527        {
1528            cursor.skip_ws_and_comments();
1529            continue;
1530        }
1531        break;
1532    }
1533
1534    // Read the kind word (one token, possibly extended to two for
1535    // `MATERIALIZED VIEW` / `PACKAGE BODY` / `TYPE BODY`).
1536    let first = match cursor.consume_identifier() {
1537        Some(tok) => tok,
1538        None => {
1539            return Some(ParsedCreateHeader {
1540                kind: DdlKind::Unknown,
1541                after_kind: cursor.upper_remainder(),
1542            });
1543        }
1544    };
1545    cursor.skip_ws_and_comments();
1546
1547    // Speculatively look at the second token without committing — only
1548    // commit if it forms a known two-word kind.
1549    let kind = match first.as_str() {
1550        "MATERIALIZED" => {
1551            if cursor.peek_keyword("VIEW") {
1552                cursor.consume_keyword("VIEW");
1553                cursor.skip_ws_and_comments();
1554                DdlKind::MaterializedView
1555            } else {
1556                DdlKind::Unknown
1557            }
1558        }
1559        "PACKAGE" => {
1560            if cursor.peek_keyword("BODY") {
1561                cursor.consume_keyword("BODY");
1562                cursor.skip_ws_and_comments();
1563                DdlKind::PackageBody
1564            } else {
1565                DdlKind::Package
1566            }
1567        }
1568        "TYPE" => {
1569            if cursor.peek_keyword("BODY") {
1570                cursor.consume_keyword("BODY");
1571                cursor.skip_ws_and_comments();
1572                DdlKind::TypeBody
1573            } else {
1574                DdlKind::Type
1575            }
1576        }
1577        "TABLE" => DdlKind::Table,
1578        "VIEW" => DdlKind::View,
1579        "PROCEDURE" => DdlKind::Procedure,
1580        "FUNCTION" => DdlKind::Function,
1581        "SEQUENCE" => DdlKind::Sequence,
1582        "TRIGGER" => DdlKind::Trigger,
1583        _ => DdlKind::Unknown,
1584    };
1585
1586    Some(ParsedCreateHeader {
1587        kind,
1588        after_kind: cursor.upper_remainder(),
1589    })
1590}
1591
1592/// Hand-rolled byte cursor for the CREATE header tokenizer.
1593///
1594/// Only knows enough about SQL to skip whitespace / `--` line
1595/// comments / `/* … */` block comments and to read alphabetic
1596/// identifier keywords case-insensitively. It deliberately does
1597/// **not** try to parse the whole DDL — every operation past the
1598/// kind word is delegated to [`extract_owner_and_name`] working on
1599/// the upper-cased remainder.
1600struct Cursor<'a> {
1601    bytes: &'a [u8],
1602    pos: usize,
1603}
1604
1605impl<'a> Cursor<'a> {
1606    fn new(text: &'a str) -> Self {
1607        Self {
1608            bytes: text.as_bytes(),
1609            pos: 0,
1610        }
1611    }
1612
1613    fn skip_ws_and_comments(&mut self) {
1614        loop {
1615            // Skip ASCII whitespace.
1616            while self.pos < self.bytes.len() && self.bytes[self.pos].is_ascii_whitespace() {
1617                self.pos += 1;
1618            }
1619            // `--` line comment.
1620            if self.pos + 1 < self.bytes.len()
1621                && self.bytes[self.pos].eq(&b'-')
1622                && self.bytes[self.pos + 1].eq(&b'-')
1623            {
1624                self.pos += 2;
1625                while self.pos < self.bytes.len() && self.bytes[self.pos].ne(&b'\n') {
1626                    self.pos += 1;
1627                }
1628                continue;
1629            }
1630            // `/* … */` block comment.
1631            if self.pos + 1 < self.bytes.len()
1632                && self.bytes[self.pos].eq(&b'/')
1633                && self.bytes[self.pos + 1].eq(&b'*')
1634            {
1635                self.pos += 2;
1636                while self.pos + 1 < self.bytes.len()
1637                    && !(self.bytes[self.pos].eq(&b'*') && self.bytes[self.pos + 1].eq(&b'/'))
1638                {
1639                    self.pos += 1;
1640                }
1641                if self.pos + 1 < self.bytes.len() {
1642                    self.pos += 2; // consume the closing `*/`
1643                } else {
1644                    self.pos = self.bytes.len(); // unterminated — end-of-input
1645                }
1646                continue;
1647            }
1648            break;
1649        }
1650    }
1651
1652    /// Returns true if the next identifier token matches `kw`
1653    /// case-insensitively (and is followed by a non-identifier
1654    /// character or end-of-input). Does not advance the cursor.
1655    fn peek_keyword(&self, kw: &str) -> bool {
1656        let end = self.pos + kw.len();
1657        if end > self.bytes.len() {
1658            return false;
1659        }
1660        if !self.bytes[self.pos..end].eq_ignore_ascii_case(kw.as_bytes()) {
1661            return false;
1662        }
1663        // Word boundary check — `CREATEDOC` must not match `CREATE`.
1664        if end < self.bytes.len() {
1665            let next = self.bytes[end];
1666            if next.eq(&b'_') || next.is_ascii_alphanumeric() {
1667                return false;
1668            }
1669        }
1670        true
1671    }
1672
1673    fn consume_keyword(&mut self, kw: &str) -> bool {
1674        if self.peek_keyword(kw) {
1675            self.pos += kw.len();
1676            true
1677        } else {
1678            false
1679        }
1680    }
1681
1682    /// Consume the next bare ASCII identifier (letters / digits /
1683    /// underscore, must start with a letter) and return it
1684    /// upper-cased. Returns `None` if the cursor is not on an
1685    /// identifier start character — e.g. a quoted identifier or a
1686    /// punctuation token. Quoted identifiers in the header position
1687    /// (the kind word) are not legal Oracle DDL so we don't bother.
1688    fn consume_identifier(&mut self) -> Option<String> {
1689        if self.pos >= self.bytes.len() {
1690            return None;
1691        }
1692        let first = self.bytes[self.pos];
1693        if !first.is_ascii_alphabetic() {
1694            return None;
1695        }
1696        let start = self.pos;
1697        while self.pos < self.bytes.len() {
1698            let b = self.bytes[self.pos];
1699            if b.is_ascii_alphanumeric() || b.eq(&b'_') {
1700                self.pos += 1;
1701            } else {
1702                break;
1703            }
1704        }
1705        let raw = std::str::from_utf8(&self.bytes[start..self.pos]).ok()?;
1706        Some(raw.to_ascii_uppercase())
1707    }
1708
1709    /// Return the rest of the input from the current cursor position,
1710    /// upper-cased. Used to hand off to [`extract_owner_and_name`].
1711    fn upper_remainder(&self) -> String {
1712        std::str::from_utf8(&self.bytes[self.pos..])
1713            .unwrap_or("")
1714            .to_ascii_uppercase()
1715    }
1716}
1717
1718/// Extract the optional `OWNER` and the bare `OBJECT` name from the
1719/// upper-cased remainder that follows the parsed `CREATE <KIND>`
1720/// header. Strips surrounding quotes (so `CREATE TABLE "HR"."EMP"`
1721/// works) and trailing punctuation / parenthesis that the column
1722/// list would attach. Operates on the post-header slice only — never
1723/// on the body — so it can't be fooled by `TABLE` appearing later.
1724fn extract_owner_and_name(after_kind: &str) -> Option<(Option<String>, String)> {
1725    let after = after_kind.trim_start();
1726
1727    // Scan the `[OWNER.]NAME` token honouring double-quoted Oracle
1728    // identifiers. A `"..."` segment is a single token that may contain
1729    // whitespace and runs to its closing `"`; an unquoted segment stops
1730    // at whitespace, `(`, `;`, or other DDL punctuation. The owner/name
1731    // split is the first top-level (outside-quotes) `.`.
1732    let mut segments: Vec<Segment> = Vec::new();
1733    let bytes = after.as_bytes();
1734    let mut i = 0usize;
1735    'scan: while i < bytes.len() {
1736        if bytes[i].eq(&b'"') {
1737            // Quoted segment: consume up to (and including) the closing `"`.
1738            let content_start = i + 1;
1739            let mut j = content_start;
1740            while j < bytes.len() && bytes[j].ne(&b'"') {
1741                j += 1;
1742            }
1743            // Unterminated quote ⇒ malformed header; give up.
1744            if j >= bytes.len() {
1745                return None;
1746            }
1747            segments.push(Segment {
1748                text: after[content_start..j].to_string(),
1749                quoted: true,
1750            });
1751            i = j + 1; // skip closing quote
1752        } else {
1753            // Unquoted run: identifier chars only. Anything else (space,
1754            // `(`, `;`, `,`, …) terminates the `[OWNER.]NAME` token —
1755            // except a top-level `.` which separates owner from name.
1756            let start = i;
1757            while i < bytes.len() {
1758                let c = bytes[i] as char;
1759                if c.is_ascii_alphanumeric() || c.eq(&'_') {
1760                    i += 1;
1761                } else {
1762                    break;
1763                }
1764            }
1765            // An empty unquoted run means we hit a non-identifier byte
1766            // that is not a segment separator: stop scanning the token.
1767            if i.eq(&start) {
1768                break 'scan;
1769            }
1770            segments.push(Segment {
1771                text: after[start..i].to_string(),
1772                quoted: false,
1773            });
1774        }
1775
1776        // After a segment, a `.` continues into the next (NAME) segment;
1777        // anything else ends the `[OWNER.]NAME` token.
1778        if i < bytes.len() && bytes[i].eq(&b'.') {
1779            i += 1;
1780        } else {
1781            break 'scan;
1782        }
1783    }
1784
1785    // Validate each segment: quoted segments accept any non-empty
1786    // content; unquoted segments must be a real identifier.
1787    let valid = |seg: &Segment| -> bool {
1788        if seg.text.is_empty() {
1789            return false;
1790        }
1791        seg.quoted || seg.text.chars().all(|c| c.is_alphanumeric() || c.eq(&'_'))
1792    };
1793
1794    match segments.as_slice() {
1795        [name] if valid(name) => Some((None, name.text.clone())),
1796        [owner, name] if valid(owner) && valid(name) => {
1797            Some((Some(owner.text.clone()), name.text.clone()))
1798        }
1799        _ => None,
1800    }
1801}
1802
1803/// One dot-delimited segment of a `[OWNER.]NAME` token, tracking whether
1804/// it originated from a double-quoted Oracle identifier (which may hold
1805/// whitespace and bypasses the unquoted identifier-char validity rule).
1806struct Segment {
1807    text: String,
1808    quoted: bool,
1809}
1810
1811#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1812enum RoutineKind {
1813    Procedure,
1814    Function,
1815}
1816
1817#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1818struct RoutineLocator {
1819    owner: SchemaName,
1820    package_name: Option<ObjectName>,
1821    routine_name: ObjectName,
1822    subprogram_id: Option<u32>,
1823    overload: Option<u32>,
1824}
1825
1826#[derive(Clone, Debug, Default)]
1827struct RoutineAccumulator {
1828    signature: Option<RoutineSignature>,
1829    kind_hint: Option<RoutineKind>,
1830    deterministic: bool,
1831    pipelined: bool,
1832}
1833
1834#[derive(Clone, Debug, Default)]
1835struct PlScopeTally {
1836    total: usize,
1837    with_identifiers: usize,
1838    with_statements: usize,
1839}
1840
1841#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
1842pub struct SchemaCatalog {
1843    pub objects: HashMap<ObjectName, CatalogObject>,
1844    pub synonyms: HashMap<SynonymName, SynonymTarget>,
1845    pub grants: Vec<Grant>,
1846    pub indexes: HashMap<IndexName, IndexMetadata>,
1847    pub constraints: HashMap<ConstraintName, ConstraintMetadata>,
1848    pub triggers: HashMap<TriggerName, TriggerMetadata>,
1849    pub dependencies: Vec<CatalogDependency>,
1850    pub plscope: Option<PlScopeSnapshot>,
1851    /// Database links owned by this schema. Public links live in the
1852    /// synthetic `PUBLIC` schema. Sourced from `ALL_DB_LINKS`.
1853    #[serde(default)]
1854    pub db_links: Vec<DatabaseLink>,
1855    /// Per-object COMMENT ON TABLE / VIEW text. Sourced from
1856    /// `ALL_TAB_COMMENTS`.
1857    #[serde(default)]
1858    pub table_comments: Vec<TableComment>,
1859    /// Per-column COMMENT ON COLUMN text. Sourced from
1860    /// `ALL_COL_COMMENTS`.
1861    #[serde(default)]
1862    pub column_comments: Vec<ColumnComment>,
1863    /// Editioning views owned by this schema (the views that mask the
1864    /// underlying base table in an EBR shop). Sourced from
1865    /// `ALL_EDITIONING_VIEWS`.
1866    #[serde(default)]
1867    pub editioning_views: Vec<EditioningView>,
1868    /// VPD/RLS policies attached to objects in this schema. Sourced
1869    /// from `ALL_POLICIES`.
1870    #[serde(default)]
1871    pub vpd_policies: Vec<VpdPolicy>,
1872}
1873
1874/// Normalize DDL text emitted by `DBMS_METADATA.GET_DDL` so equality checks
1875/// across runs ignore cosmetic differences:
1876///
1877/// - Trim leading + trailing whitespace.
1878/// - Collapse runs of whitespace inside the body to a single space (newlines
1879///   are preserved as-is so the result remains readable).
1880/// - Strip the trailing `/` SQL*Plus terminator if present.
1881#[must_use]
1882pub fn normalize_dbms_metadata_ddl(text: &str) -> String {
1883    let trimmed = text.trim();
1884    let trimmed = trimmed.strip_suffix('/').unwrap_or(trimmed).trim_end();
1885    let mut normalized = String::with_capacity(trimmed.len());
1886    let mut prev_space = false;
1887    for c in trimmed.chars() {
1888        if c.eq(&' ') || c.eq(&'\t') {
1889            if !prev_space {
1890                normalized.push(' ');
1891                prev_space = true;
1892            }
1893        } else {
1894            normalized.push(c);
1895            prev_space = false;
1896        }
1897    }
1898    normalized
1899}
1900
1901/// Map an `ObjectType` to the string the `DBMS_METADATA.GET_DDL` /
1902/// `GET_XML` overloads expect as their first parameter. Returns `None` for
1903/// types that have no DBMS_METADATA representation (e.g.
1904/// `ObjectType::Unknown`, `ObjectType::Constraint`).
1905#[must_use]
1906pub fn object_type_to_dbms_metadata_value(object_type: ObjectType) -> Option<&'static str> {
1907    match object_type {
1908        ObjectType::Table => Some("TABLE"),
1909        ObjectType::View => Some("VIEW"),
1910        ObjectType::MaterializedView => Some("MATERIALIZED_VIEW"),
1911        ObjectType::Sequence => Some("SEQUENCE"),
1912        ObjectType::Type => Some("TYPE"),
1913        ObjectType::Package => Some("PACKAGE"),
1914        ObjectType::Procedure => Some("PROCEDURE"),
1915        ObjectType::Function => Some("FUNCTION"),
1916        ObjectType::Trigger => Some("TRIGGER"),
1917        ObjectType::EditioningView => Some("VIEW"),
1918        ObjectType::SchedulerJob => Some("PROCOBJ"),
1919        ObjectType::Synonym => Some("SYNONYM"),
1920        ObjectType::Index => Some("INDEX"),
1921        ObjectType::Constraint | ObjectType::Unknown => None,
1922    }
1923}
1924
1925fn hash_text(text: &str) -> Hash {
1926    use sha2::{Digest as _, Sha256};
1927    let mut hasher = Sha256::new();
1928    hasher.update(text.as_bytes());
1929    // sha2 0.11+ returns `Array<u8, …>` from `finalize` which no
1930    // longer impls `LowerHex` directly; render byte-by-byte (matches
1931    // the plsql-store pattern). Keeps the bump from being a breaking
1932    // change for callers.
1933    let digest = hasher.finalize();
1934    let mut rendered = String::with_capacity(7 + digest.len() * 2);
1935    rendered.push_str("sha256:");
1936    for byte in digest {
1937        rendered.push_str(&format!("{byte:02x}"));
1938    }
1939    Hash::new(rendered)
1940}
1941
1942fn apply_object_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
1943    let owner_text = row.require_text("OWNER")?;
1944    let object_name_text = row.require_text("OBJECT_NAME")?;
1945    let object_type_text = row.require_text("OBJECT_TYPE")?;
1946    let Some(object_type) = object_type_from_dictionary_value(object_type_text) else {
1947        return Ok(());
1948    };
1949
1950    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
1951        return Err(CatalogError::InvalidColumnValue {
1952            column: String::from("OWNER"),
1953            expected: "interned schema name",
1954            value: String::from(owner_text),
1955        });
1956    };
1957    let Some(object_name) = snapshot.intern_object_name(object_name_text) else {
1958        return Err(CatalogError::InvalidColumnValue {
1959            column: String::from("OBJECT_NAME"),
1960            expected: "interned object name",
1961            value: String::from(object_name_text),
1962        });
1963    };
1964
1965    let last_ddl_time =
1966        optional_nonblank_text(row, "LAST_DDL_TIME_ISO").and_then(parse_dictionary_timestamp);
1967    let editionable = optional_bool(row, "EDITIONABLE")?;
1968    let edition_name = optional_nonblank_text(row, "EDITION_NAME")
1969        .map(|value| {
1970            snapshot
1971                .interner
1972                .intern(value)
1973                .map(EditionName::from)
1974                .ok_or(CatalogError::InvalidColumnValue {
1975                    column: String::from("EDITION_NAME"),
1976                    expected: "interned edition name",
1977                    value: String::from(value),
1978                })
1979        })
1980        .transpose()?;
1981
1982    let common = ObjectCommon {
1983        owner,
1984        name: object_name,
1985        object_type,
1986        status: row
1987            .text("STATUS")
1988            .map(object_status_from_dictionary_value)
1989            .unwrap_or_default(),
1990        edition_name,
1991        editionable,
1992        last_ddl_time,
1993        ..ObjectCommon::default()
1994    };
1995
1996    let Some(catalog_object) = blank_catalog_object(common) else {
1997        return Ok(());
1998    };
1999
2000    snapshot
2001        .schemas
2002        .entry(owner)
2003        .or_default()
2004        .objects
2005        .insert(object_name, catalog_object);
2006
2007    Ok(())
2008}
2009
2010fn apply_dependency_row(
2011    snapshot: &mut CatalogSnapshot,
2012    row: &OracleRow,
2013) -> Result<(), CatalogError> {
2014    let owner_text = row.require_text("OWNER")?;
2015    let name_text = row.require_text("NAME")?;
2016    let referenced_owner_text = row.require_text("REFERENCED_OWNER")?;
2017    let referenced_name_text = row.require_text("REFERENCED_NAME")?;
2018
2019    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2020        return Err(CatalogError::InvalidColumnValue {
2021            column: String::from("OWNER"),
2022            expected: "interned schema name",
2023            value: String::from(owner_text),
2024        });
2025    };
2026    let Some(object_name) = snapshot.intern_object_name(name_text) else {
2027        return Err(CatalogError::InvalidColumnValue {
2028            column: String::from("NAME"),
2029            expected: "interned object name",
2030            value: String::from(name_text),
2031        });
2032    };
2033    let Some(referenced_owner) = snapshot.intern_schema_name(referenced_owner_text) else {
2034        return Err(CatalogError::InvalidColumnValue {
2035            column: String::from("REFERENCED_OWNER"),
2036            expected: "interned schema name",
2037            value: String::from(referenced_owner_text),
2038        });
2039    };
2040    let Some(referenced_name) = snapshot.intern_object_name(referenced_name_text) else {
2041        return Err(CatalogError::InvalidColumnValue {
2042            column: String::from("REFERENCED_NAME"),
2043            expected: "interned object name",
2044            value: String::from(referenced_name_text),
2045        });
2046    };
2047
2048    let object_type = optional_nonblank_text(row, "TYPE")
2049        .and_then(object_type_from_dictionary_value)
2050        .unwrap_or_default();
2051    let referenced_type =
2052        optional_nonblank_text(row, "REFERENCED_TYPE").and_then(object_type_from_dictionary_value);
2053
2054    let dependency = CatalogDependency {
2055        owner,
2056        name: object_name,
2057        object_type,
2058        referenced_owner: Some(referenced_owner),
2059        referenced_name,
2060        referenced_type,
2061        dependency_kind: optional_nonblank_text(row, "DEPENDENCY_TYPE")
2062            .map(catalog_dependency_kind_from_dictionary_value)
2063            .unwrap_or_default(),
2064        via_db_link: None,
2065    };
2066
2067    snapshot
2068        .schemas
2069        .entry(owner)
2070        .or_default()
2071        .dependencies
2072        .push(dependency);
2073
2074    Ok(())
2075}
2076
2077fn parse_dictionary_timestamp(text: &str) -> Option<DateTime<Utc>> {
2078    // Expected shape from the loader query: `YYYY-MM-DD"T"HH24:MI:SS`.
2079    chrono::NaiveDateTime::parse_from_str(text, "%Y-%m-%dT%H:%M:%S")
2080        .ok()
2081        .map(|naive| DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
2082}
2083
2084fn catalog_dependency_kind_from_dictionary_value(text: &str) -> CatalogDependencyKind {
2085    match text.to_ascii_uppercase().as_str() {
2086        "HARD" => CatalogDependencyKind::Hard,
2087        "REF" => CatalogDependencyKind::Reference,
2088        "EXTENDED" => CatalogDependencyKind::Extended,
2089        _ => CatalogDependencyKind::default(),
2090    }
2091}
2092
2093fn apply_column_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2094    let owner_text = row.require_text("OWNER")?;
2095    let table_name_text = row.require_text("TABLE_NAME")?;
2096    let column_name_text = row.require_text("COLUMN_NAME")?;
2097
2098    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2099        return Err(CatalogError::InvalidColumnValue {
2100            column: String::from("OWNER"),
2101            expected: "interned schema name",
2102            value: String::from(owner_text),
2103        });
2104    };
2105    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2106        return Err(CatalogError::InvalidColumnValue {
2107            column: String::from("TABLE_NAME"),
2108            expected: "interned object name",
2109            value: String::from(table_name_text),
2110        });
2111    };
2112    let Some(column_name) = snapshot.intern_column_name(column_name_text) else {
2113        return Err(CatalogError::InvalidColumnValue {
2114            column: String::from("COLUMN_NAME"),
2115            expected: "interned column name",
2116            value: String::from(column_name_text),
2117        });
2118    };
2119    let data_type = data_type_ref_from_row(snapshot, row)?;
2120
2121    let Some(schema_catalog) = snapshot.schemas.get_mut(&owner) else {
2122        return Ok(());
2123    };
2124    let Some(catalog_object) = schema_catalog.objects.get_mut(&table_name) else {
2125        return Ok(());
2126    };
2127
2128    let default_expression = row
2129        .text("DATA_DEFAULT_VC")
2130        .map(String::from)
2131        .filter(|value| !value.trim().is_empty());
2132    let virtual_column = optional_bool(row, "VIRTUAL_COLUMN")?.unwrap_or(false);
2133    let column = ColumnMetadata {
2134        name: column_name,
2135        position: required_u32(row, "COLUMN_POSITION")?,
2136        data_type,
2137        nullable: optional_bool(row, "NULLABLE")?.unwrap_or(false),
2138        default_expression: if virtual_column {
2139            None
2140        } else {
2141            default_expression.clone()
2142        },
2143        generated_expression: if virtual_column {
2144            default_expression
2145        } else {
2146            None
2147        },
2148        hidden: optional_bool(row, "HIDDEN_COLUMN")?.unwrap_or(false),
2149    };
2150
2151    match catalog_object {
2152        CatalogObject::Table(metadata) => {
2153            metadata.columns.insert(column.name, column);
2154        }
2155        CatalogObject::View(metadata) => {
2156            metadata.columns.insert(column.name, column);
2157        }
2158        CatalogObject::MaterializedView(metadata) => {
2159            metadata.columns.insert(column.name, column);
2160        }
2161        CatalogObject::EditioningView(metadata) => {
2162            metadata.columns.insert(column.name, column);
2163        }
2164        CatalogObject::Sequence(_)
2165        | CatalogObject::Type(_)
2166        | CatalogObject::Package(_)
2167        | CatalogObject::Procedure(_)
2168        | CatalogObject::Function(_)
2169        | CatalogObject::Trigger(_)
2170        | CatalogObject::SchedulerJob(_) => {}
2171    }
2172
2173    Ok(())
2174}
2175
2176fn apply_constraint_row(
2177    snapshot: &mut CatalogSnapshot,
2178    row: &OracleRow,
2179) -> Result<(), CatalogError> {
2180    let owner_text = row.require_text("OWNER")?;
2181    let constraint_name_text = row.require_text("CONSTRAINT_NAME")?;
2182    let table_name_text = row.require_text("TABLE_NAME")?;
2183    let search_condition = optional_nonblank_text(row, "SEARCH_CONDITION_VC").map(String::from);
2184
2185    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2186        return Err(CatalogError::InvalidColumnValue {
2187            column: String::from("OWNER"),
2188            expected: "interned schema name",
2189            value: String::from(owner_text),
2190        });
2191    };
2192    let Some(constraint_name) = snapshot.intern_constraint_name(constraint_name_text) else {
2193        return Err(CatalogError::InvalidColumnValue {
2194            column: String::from("CONSTRAINT_NAME"),
2195            expected: "interned constraint name",
2196            value: String::from(constraint_name_text),
2197        });
2198    };
2199    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2200        return Err(CatalogError::InvalidColumnValue {
2201            column: String::from("TABLE_NAME"),
2202            expected: "interned object name",
2203            value: String::from(table_name_text),
2204        });
2205    };
2206    let referenced_table_owner = optional_nonblank_text(row, "REFERENCED_TABLE_OWNER")
2207        .map(|value| {
2208            snapshot
2209                .intern_schema_name(value)
2210                .ok_or(CatalogError::InvalidColumnValue {
2211                    column: String::from("REFERENCED_TABLE_OWNER"),
2212                    expected: "interned schema name",
2213                    value: String::from(value),
2214                })
2215        })
2216        .transpose()?;
2217    let referenced_table_name = optional_nonblank_text(row, "REFERENCED_TABLE_NAME")
2218        .map(|value| {
2219            snapshot
2220                .intern_object_name(value)
2221                .ok_or(CatalogError::InvalidColumnValue {
2222                    column: String::from("REFERENCED_TABLE_NAME"),
2223                    expected: "interned object name",
2224                    value: String::from(value),
2225                })
2226        })
2227        .transpose()?;
2228    let child_column = optional_nonblank_text(row, "COLUMN_NAME")
2229        .map(|value| {
2230            snapshot
2231                .intern_column_name(value)
2232                .ok_or(CatalogError::InvalidColumnValue {
2233                    column: String::from("COLUMN_NAME"),
2234                    expected: "interned column name",
2235                    value: String::from(value),
2236                })
2237        })
2238        .transpose()?;
2239    let referenced_column = optional_nonblank_text(row, "REFERENCED_COLUMN_NAME")
2240        .map(|value| {
2241            snapshot
2242                .intern_column_name(value)
2243                .ok_or(CatalogError::InvalidColumnValue {
2244                    column: String::from("REFERENCED_COLUMN_NAME"),
2245                    expected: "interned column name",
2246                    value: String::from(value),
2247                })
2248        })
2249        .transpose()?;
2250
2251    let constraint_type = constraint_type_from_dictionary_value(
2252        row.require_text("CONSTRAINT_TYPE")?,
2253        search_condition.as_deref(),
2254        child_column.is_some(),
2255    );
2256
2257    let metadata = snapshot
2258        .schemas
2259        .entry(owner)
2260        .or_default()
2261        .constraints
2262        .entry(constraint_name)
2263        .or_insert_with(|| ConstraintMetadata {
2264            name: constraint_name,
2265            table_owner: owner,
2266            table_name,
2267            constraint_type,
2268            columns: Vec::new(),
2269            referenced_table_owner,
2270            referenced_table_name,
2271            referenced_columns: Vec::new(),
2272            search_condition: search_condition.clone(),
2273            deferrable: optional_bool(row, "IS_DEFERRABLE").ok().flatten(),
2274            initially_deferred: optional_bool(row, "IS_DEFERRED").ok().flatten(),
2275        });
2276
2277    metadata.table_name = table_name;
2278    metadata.constraint_type = constraint_type;
2279    metadata.referenced_table_owner = referenced_table_owner;
2280    metadata.referenced_table_name = referenced_table_name;
2281    metadata.search_condition = search_condition;
2282    metadata.deferrable = optional_bool(row, "IS_DEFERRABLE")?;
2283    metadata.initially_deferred = optional_bool(row, "IS_DEFERRED")?;
2284
2285    if let Some(column_name) = child_column {
2286        push_unique_column(&mut metadata.columns, column_name);
2287    }
2288    if let Some(column_name) = referenced_column {
2289        push_unique_column(&mut metadata.referenced_columns, column_name);
2290    }
2291
2292    Ok(())
2293}
2294
2295fn apply_index_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2296    let owner_text = row.require_text("OWNER")?;
2297    let index_name_text = row.require_text("INDEX_NAME")?;
2298    let table_owner_text = row.require_text("TABLE_OWNER")?;
2299    let table_name_text = row.require_text("TABLE_NAME")?;
2300
2301    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2302        return Err(CatalogError::InvalidColumnValue {
2303            column: String::from("OWNER"),
2304            expected: "interned schema name",
2305            value: String::from(owner_text),
2306        });
2307    };
2308    let Some(index_name) = snapshot.intern_index_name(index_name_text) else {
2309        return Err(CatalogError::InvalidColumnValue {
2310            column: String::from("INDEX_NAME"),
2311            expected: "interned index name",
2312            value: String::from(index_name_text),
2313        });
2314    };
2315    let Some(table_owner) = snapshot.intern_schema_name(table_owner_text) else {
2316        return Err(CatalogError::InvalidColumnValue {
2317            column: String::from("TABLE_OWNER"),
2318            expected: "interned schema name",
2319            value: String::from(table_owner_text),
2320        });
2321    };
2322    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2323        return Err(CatalogError::InvalidColumnValue {
2324            column: String::from("TABLE_NAME"),
2325            expected: "interned object name",
2326            value: String::from(table_name_text),
2327        });
2328    };
2329    let index_column = optional_nonblank_text(row, "COLUMN_NAME")
2330        .map(|value| {
2331            snapshot
2332                .intern_column_name(value)
2333                .ok_or(CatalogError::InvalidColumnValue {
2334                    column: String::from("COLUMN_NAME"),
2335                    expected: "interned column name",
2336                    value: String::from(value),
2337                })
2338        })
2339        .transpose()?;
2340
2341    let metadata = snapshot
2342        .schemas
2343        .entry(owner)
2344        .or_default()
2345        .indexes
2346        .entry(index_name)
2347        .or_insert_with(|| IndexMetadata {
2348            name: index_name,
2349            table_owner,
2350            table_name,
2351            unique: optional_bool(row, "IS_UNIQUE")
2352                .ok()
2353                .flatten()
2354                .unwrap_or(false),
2355            columns: Vec::new(),
2356            index_type: String::from(row.text("INDEX_TYPE").unwrap_or_default()),
2357            status: row
2358                .text("STATUS")
2359                .map(object_status_from_dictionary_value)
2360                .unwrap_or_default(),
2361        });
2362
2363    metadata.table_owner = table_owner;
2364    metadata.table_name = table_name;
2365    metadata.unique = optional_bool(row, "IS_UNIQUE")?.unwrap_or(false);
2366    metadata.index_type = String::from(row.text("INDEX_TYPE").unwrap_or_default());
2367    metadata.status = row
2368        .text("STATUS")
2369        .map(object_status_from_dictionary_value)
2370        .unwrap_or_default();
2371
2372    if let Some(column_name) = index_column {
2373        push_unique_column(&mut metadata.columns, column_name);
2374    }
2375
2376    Ok(())
2377}
2378
2379fn apply_trigger_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2380    let owner_text = row.require_text("OWNER")?;
2381    let trigger_name_text = row.require_text("TRIGGER_NAME")?;
2382    let table_owner_text = row.require_text("TABLE_OWNER")?;
2383    let table_name_text = row.require_text("TABLE_NAME")?;
2384
2385    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2386        return Err(CatalogError::InvalidColumnValue {
2387            column: String::from("OWNER"),
2388            expected: "interned schema name",
2389            value: String::from(owner_text),
2390        });
2391    };
2392    let Some(trigger_name) = snapshot.intern_trigger_name(trigger_name_text) else {
2393        return Err(CatalogError::InvalidColumnValue {
2394            column: String::from("TRIGGER_NAME"),
2395            expected: "interned trigger name",
2396            value: String::from(trigger_name_text),
2397        });
2398    };
2399    let Some(object_name) = snapshot.intern_object_name(trigger_name_text) else {
2400        return Err(CatalogError::InvalidColumnValue {
2401            column: String::from("TRIGGER_NAME"),
2402            expected: "interned object name",
2403            value: String::from(trigger_name_text),
2404        });
2405    };
2406    let Some(target_owner) = snapshot.intern_schema_name(table_owner_text) else {
2407        return Err(CatalogError::InvalidColumnValue {
2408            column: String::from("TABLE_OWNER"),
2409            expected: "interned schema name",
2410            value: String::from(table_owner_text),
2411        });
2412    };
2413    let Some(target_name) = snapshot.intern_object_name(table_name_text) else {
2414        return Err(CatalogError::InvalidColumnValue {
2415            column: String::from("TABLE_NAME"),
2416            expected: "interned object name",
2417            value: String::from(table_name_text),
2418        });
2419    };
2420
2421    let schema_catalog = snapshot.schemas.entry(owner).or_default();
2422    let common = schema_catalog
2423        .objects
2424        .get(&object_name)
2425        .and_then(|object| {
2426            if let CatalogObject::Trigger(metadata) = object {
2427                Some(metadata.common.clone())
2428            } else {
2429                None
2430            }
2431        })
2432        .unwrap_or_else(|| ObjectCommon {
2433            owner,
2434            name: object_name,
2435            object_type: ObjectType::Trigger,
2436            ..ObjectCommon::default()
2437        });
2438
2439    let metadata = TriggerMetadata {
2440        common,
2441        target_owner,
2442        target_name,
2443        timing: trigger_timing_from_dictionary_value(row.text("TRIGGER_TYPE").unwrap_or_default()),
2444        level: trigger_level_from_dictionary_value(row.text("TRIGGER_TYPE").unwrap_or_default()),
2445        events: trigger_events_from_dictionary_value(
2446            row.text("TRIGGERING_EVENT").unwrap_or_default(),
2447        ),
2448        when_clause: optional_nonblank_text(row, "WHEN_CLAUSE").map(String::from),
2449        body_hash: None,
2450    };
2451
2452    schema_catalog
2453        .triggers
2454        .insert(trigger_name, metadata.clone());
2455    schema_catalog
2456        .objects
2457        .insert(object_name, CatalogObject::Trigger(metadata));
2458
2459    Ok(())
2460}
2461
2462fn apply_synonym_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2463    let owner_text = row.require_text("OWNER")?;
2464    let synonym_name_text = row.require_text("SYNONYM_NAME")?;
2465    let target_name_text = row.require_text("TABLE_NAME")?;
2466
2467    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2468        return Err(CatalogError::InvalidColumnValue {
2469            column: String::from("OWNER"),
2470            expected: "interned schema name",
2471            value: String::from(owner_text),
2472        });
2473    };
2474    let Some(synonym_name) = snapshot.intern_synonym_name(synonym_name_text) else {
2475        return Err(CatalogError::InvalidColumnValue {
2476            column: String::from("SYNONYM_NAME"),
2477            expected: "interned synonym name",
2478            value: String::from(synonym_name_text),
2479        });
2480    };
2481    let Some(target_name) = snapshot.intern_object_name(target_name_text) else {
2482        return Err(CatalogError::InvalidColumnValue {
2483            column: String::from("TABLE_NAME"),
2484            expected: "interned object name",
2485            value: String::from(target_name_text),
2486        });
2487    };
2488    let target_owner = optional_nonblank_text(row, "TABLE_OWNER")
2489        .map(|value| {
2490            snapshot
2491                .intern_schema_name(value)
2492                .ok_or(CatalogError::InvalidColumnValue {
2493                    column: String::from("TABLE_OWNER"),
2494                    expected: "interned schema name",
2495                    value: String::from(value),
2496                })
2497        })
2498        .transpose()?;
2499
2500    snapshot.schemas.entry(owner).or_default().synonyms.insert(
2501        synonym_name,
2502        SynonymTarget {
2503            target_owner,
2504            target_name,
2505            target_type: None,
2506            db_link: optional_nonblank_text(row, "DB_LINK").map(String::from),
2507            public_synonym: owner_text.eq_ignore_ascii_case("PUBLIC"),
2508        },
2509    );
2510
2511    Ok(())
2512}
2513
2514fn apply_view_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2515    let owner_text = row.require_text("OWNER")?;
2516    let view_name_text = row.require_text("VIEW_NAME")?;
2517
2518    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2519        return Err(CatalogError::InvalidColumnValue {
2520            column: String::from("OWNER"),
2521            expected: "interned schema name",
2522            value: String::from(owner_text),
2523        });
2524    };
2525    let Some(view_name) = snapshot.intern_object_name(view_name_text) else {
2526        return Err(CatalogError::InvalidColumnValue {
2527            column: String::from("VIEW_NAME"),
2528            expected: "interned object name",
2529            value: String::from(view_name_text),
2530        });
2531    };
2532
2533    let query_hash = optional_nonblank_text(row, "TEXT_VC").map(hash_text);
2534    let read_only = optional_bool(row, "READ_ONLY")?;
2535
2536    let Some(schema_catalog) = snapshot.schemas.get_mut(&owner) else {
2537        return Ok(());
2538    };
2539    let Some(catalog_object) = schema_catalog.objects.get_mut(&view_name) else {
2540        return Ok(());
2541    };
2542
2543    if let CatalogObject::View(metadata) = catalog_object {
2544        metadata.query_hash = query_hash;
2545        metadata.read_only = read_only;
2546    }
2547
2548    Ok(())
2549}
2550
2551fn apply_mview_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2552    let owner_text = row.require_text("OWNER")?;
2553    let mview_name_text = row.require_text("MVIEW_NAME")?;
2554
2555    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2556        return Err(CatalogError::InvalidColumnValue {
2557            column: String::from("OWNER"),
2558            expected: "interned schema name",
2559            value: String::from(owner_text),
2560        });
2561    };
2562    let Some(mview_name) = snapshot.intern_object_name(mview_name_text) else {
2563        return Err(CatalogError::InvalidColumnValue {
2564            column: String::from("MVIEW_NAME"),
2565            expected: "interned object name",
2566            value: String::from(mview_name_text),
2567        });
2568    };
2569
2570    let refresh_mode = optional_nonblank_text(row, "REFRESH_MODE").map(String::from);
2571    let refresh_method = optional_nonblank_text(row, "REFRESH_METHOD").map(String::from);
2572    let query_hash = optional_nonblank_text(row, "QUERY").map(hash_text);
2573
2574    let Some(schema_catalog) = snapshot.schemas.get_mut(&owner) else {
2575        return Ok(());
2576    };
2577    let Some(catalog_object) = schema_catalog.objects.get_mut(&mview_name) else {
2578        return Ok(());
2579    };
2580
2581    if let CatalogObject::MaterializedView(metadata) = catalog_object {
2582        metadata.refresh_mode = refresh_mode;
2583        metadata.refresh_method = refresh_method;
2584        metadata.query_hash = query_hash;
2585    }
2586
2587    Ok(())
2588}
2589
2590fn apply_sequence_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2591    let owner_text = row.require_text("SEQUENCE_OWNER")?;
2592    let sequence_name_text = row.require_text("SEQUENCE_NAME")?;
2593
2594    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2595        return Err(CatalogError::InvalidColumnValue {
2596            column: String::from("SEQUENCE_OWNER"),
2597            expected: "interned schema name",
2598            value: String::from(owner_text),
2599        });
2600    };
2601    let Some(sequence_name) = snapshot.intern_object_name(sequence_name_text) else {
2602        return Err(CatalogError::InvalidColumnValue {
2603            column: String::from("SEQUENCE_NAME"),
2604            expected: "interned object name",
2605            value: String::from(sequence_name_text),
2606        });
2607    };
2608
2609    let increment_by = row.parse_i64("INCREMENT_BY").unwrap_or(1);
2610    let min_value = row.parse_i64("MIN_VALUE").ok();
2611    let max_value = row.parse_i64("MAX_VALUE").ok();
2612    let cycle = row
2613        .text("CYCLE_FLAG")
2614        .map(|value| value.eq_ignore_ascii_case("Y"))
2615        .unwrap_or(false);
2616    let ordered = row
2617        .text("ORDER_FLAG")
2618        .map(|value| value.eq_ignore_ascii_case("Y"))
2619        .unwrap_or(false);
2620    let cache_size = row.parse_u64("CACHE_SIZE").ok();
2621
2622    let Some(schema_catalog) = snapshot.schemas.get_mut(&owner) else {
2623        return Ok(());
2624    };
2625    let Some(catalog_object) = schema_catalog.objects.get_mut(&sequence_name) else {
2626        return Ok(());
2627    };
2628
2629    if let CatalogObject::Sequence(metadata) = catalog_object {
2630        metadata.increment_by = increment_by;
2631        metadata.min_value = min_value;
2632        metadata.max_value = max_value;
2633        metadata.cycle = cycle;
2634        metadata.ordered = ordered;
2635        metadata.cache_size = cache_size;
2636    }
2637
2638    Ok(())
2639}
2640
2641fn apply_type_attr_row(
2642    snapshot: &mut CatalogSnapshot,
2643    row: &OracleRow,
2644) -> Result<(), CatalogError> {
2645    let owner_text = row.require_text("OWNER")?;
2646    let type_name_text = row.require_text("TYPE_NAME")?;
2647    let attr_name_text = row.require_text("ATTR_NAME")?;
2648
2649    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2650        return Err(CatalogError::InvalidColumnValue {
2651            column: String::from("OWNER"),
2652            expected: "interned schema name",
2653            value: String::from(owner_text),
2654        });
2655    };
2656    let Some(type_name) = snapshot.intern_object_name(type_name_text) else {
2657        return Err(CatalogError::InvalidColumnValue {
2658            column: String::from("TYPE_NAME"),
2659            expected: "interned object name",
2660            value: String::from(type_name_text),
2661        });
2662    };
2663    let Some(attr_name) = snapshot.intern_member_name(attr_name_text) else {
2664        return Err(CatalogError::InvalidColumnValue {
2665            column: String::from("ATTR_NAME"),
2666            expected: "interned member name",
2667            value: String::from(attr_name_text),
2668        });
2669    };
2670
2671    let attr_type_owner = optional_nonblank_text(row, "ATTR_TYPE_OWNER")
2672        .map(|value| {
2673            snapshot
2674                .intern_schema_name(value)
2675                .ok_or(CatalogError::InvalidColumnValue {
2676                    column: String::from("ATTR_TYPE_OWNER"),
2677                    expected: "interned schema name",
2678                    value: String::from(value),
2679                })
2680        })
2681        .transpose()?;
2682    let attr_type_name = row
2683        .text("ATTR_TYPE_NAME")
2684        .map(String::from)
2685        .unwrap_or_default();
2686
2687    let attribute = TypeAttribute {
2688        name: attr_name,
2689        position: required_u32(row, "ATTR_NO")?,
2690        data_type: DataTypeRef {
2691            owner: attr_type_owner,
2692            name: attr_type_name,
2693            length: optional_u32(row, "LENGTH")?,
2694            precision: optional_u32(row, "PRECISION")?,
2695            scale: optional_i32(row, "SCALE")?,
2696            char_semantics: None,
2697        },
2698    };
2699
2700    let Some(schema_catalog) = snapshot.schemas.get_mut(&owner) else {
2701        return Ok(());
2702    };
2703    let Some(catalog_object) = schema_catalog.objects.get_mut(&type_name) else {
2704        return Ok(());
2705    };
2706
2707    if let CatalogObject::Type(metadata) = catalog_object {
2708        match metadata
2709            .attributes
2710            .iter()
2711            .position(|existing| existing.position.eq(&attribute.position))
2712        {
2713            Some(index) => metadata.attributes[index] = attribute,
2714            None => metadata.attributes.push(attribute),
2715        }
2716        metadata
2717            .attributes
2718            .sort_by_key(|attribute| attribute.position);
2719    }
2720
2721    Ok(())
2722}
2723
2724/// Apply a single `ALL_DB_LINKS` row into the snapshot. Ensures the
2725/// owning schema entry exists (lazily creates it) so a `PUBLIC` row
2726/// lands even when no other catalog object has been recorded for that
2727/// synthetic schema.
2728fn apply_db_link_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2729    let owner_text = row.require_text("OWNER")?;
2730    let link_name_text = row.require_text("DB_LINK")?;
2731
2732    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2733        return Err(CatalogError::InvalidColumnValue {
2734            column: String::from("OWNER"),
2735            expected: "interned schema name",
2736            value: String::from(owner_text),
2737        });
2738    };
2739
2740    let host = optional_nonblank_text(row, "HOST").map(String::from);
2741    let public_link = owner_text.eq_ignore_ascii_case("PUBLIC");
2742
2743    let schema_catalog = snapshot.schemas.entry(owner).or_default();
2744    schema_catalog.db_links.push(DatabaseLink {
2745        owner,
2746        name: String::from(link_name_text),
2747        host,
2748        public_link,
2749    });
2750
2751    Ok(())
2752}
2753
2754/// Apply a single `ALL_POLICIES` row.
2755fn apply_vpd_policy_row(
2756    snapshot: &mut CatalogSnapshot,
2757    row: &OracleRow,
2758) -> Result<(), CatalogError> {
2759    let object_owner_text = row.require_text("OBJECT_OWNER")?;
2760    let object_name_text = row.require_text("OBJECT_NAME")?;
2761    let policy_name = row.require_text("POLICY_NAME")?.to_string();
2762    let function_owner_text = row.require_text("PF_OWNER")?;
2763    let function_name = row.require_text("FUNCTION")?.to_string();
2764
2765    let Some(object_owner) = snapshot.intern_schema_name(object_owner_text) else {
2766        return Err(CatalogError::InvalidColumnValue {
2767            column: String::from("OBJECT_OWNER"),
2768            expected: "interned schema name",
2769            value: String::from(object_owner_text),
2770        });
2771    };
2772    let Some(object_name) = snapshot.intern_object_name(object_name_text) else {
2773        return Err(CatalogError::InvalidColumnValue {
2774            column: String::from("OBJECT_NAME"),
2775            expected: "interned object name",
2776            value: String::from(object_name_text),
2777        });
2778    };
2779    let Some(function_owner) = snapshot.intern_schema_name(function_owner_text) else {
2780        return Err(CatalogError::InvalidColumnValue {
2781            column: String::from("PF_OWNER"),
2782            expected: "interned schema name",
2783            value: String::from(function_owner_text),
2784        });
2785    };
2786
2787    let policy_group = optional_nonblank_text(row, "POLICY_GROUP").map(String::from);
2788    let function_package = optional_nonblank_text(row, "PACKAGE").map(String::from);
2789
2790    let yn = |col: &str| {
2791        row.text(col)
2792            .map(|v| v.eq_ignore_ascii_case("Y") || v.eq_ignore_ascii_case("YES"))
2793            .unwrap_or(false)
2794    };
2795
2796    let schema_catalog = snapshot.schemas.entry(object_owner).or_default();
2797    schema_catalog.vpd_policies.push(VpdPolicy {
2798        object_owner,
2799        object_name,
2800        policy_group,
2801        policy_name,
2802        function_owner,
2803        function_package,
2804        function_name,
2805        on_select: yn("SEL"),
2806        on_insert: yn("INS"),
2807        on_update: yn("UPD"),
2808        on_delete: yn("DEL"),
2809        enabled: yn("ENABLE"),
2810    });
2811    Ok(())
2812}
2813
2814/// Apply a single `ALL_EDITIONS` row.
2815fn apply_edition_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2816    let edition_name = row.require_text("EDITION_NAME")?.to_string();
2817    let parent_edition_name = optional_nonblank_text(row, "PARENT_EDITION_NAME").map(String::from);
2818    let usable = row
2819        .text("USABLE")
2820        .map(|v| v.eq_ignore_ascii_case("Y"))
2821        .unwrap_or(true);
2822    snapshot.editions.push(Edition {
2823        edition_name,
2824        parent_edition_name,
2825        usable,
2826    });
2827    Ok(())
2828}
2829
2830/// Apply a single `ALL_EDITIONING_VIEWS` row.
2831fn apply_editioning_view_row(
2832    snapshot: &mut CatalogSnapshot,
2833    row: &OracleRow,
2834) -> Result<(), CatalogError> {
2835    let owner_text = row.require_text("OWNER")?;
2836    let view_name_text = row.require_text("VIEW_NAME")?;
2837    let table_name_text = row.require_text("TABLE_NAME")?;
2838
2839    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2840        return Err(CatalogError::InvalidColumnValue {
2841            column: String::from("OWNER"),
2842            expected: "interned schema name",
2843            value: String::from(owner_text),
2844        });
2845    };
2846    let Some(view_name) = snapshot.intern_object_name(view_name_text) else {
2847        return Err(CatalogError::InvalidColumnValue {
2848            column: String::from("VIEW_NAME"),
2849            expected: "interned object name",
2850            value: String::from(view_name_text),
2851        });
2852    };
2853    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2854        return Err(CatalogError::InvalidColumnValue {
2855            column: String::from("TABLE_NAME"),
2856            expected: "interned object name",
2857            value: String::from(table_name_text),
2858        });
2859    };
2860
2861    let schema_catalog = snapshot.schemas.entry(owner).or_default();
2862    schema_catalog.editioning_views.push(EditioningView {
2863        owner,
2864        view_name,
2865        table_name,
2866    });
2867    Ok(())
2868}
2869
2870/// Apply a single `ALL_TAB_COMMENTS` row into the snapshot.
2871fn apply_table_comment_row(
2872    snapshot: &mut CatalogSnapshot,
2873    row: &OracleRow,
2874) -> Result<(), CatalogError> {
2875    let owner_text = row.require_text("OWNER")?;
2876    let table_name_text = row.require_text("TABLE_NAME")?;
2877    let table_type = row.text("TABLE_TYPE").map(String::from).unwrap_or_default();
2878    let comments = row.require_text("COMMENTS")?.to_string();
2879
2880    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2881        return Err(CatalogError::InvalidColumnValue {
2882            column: String::from("OWNER"),
2883            expected: "interned schema name",
2884            value: String::from(owner_text),
2885        });
2886    };
2887    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2888        return Err(CatalogError::InvalidColumnValue {
2889            column: String::from("TABLE_NAME"),
2890            expected: "interned object name",
2891            value: String::from(table_name_text),
2892        });
2893    };
2894
2895    let schema_catalog = snapshot.schemas.entry(owner).or_default();
2896    schema_catalog.table_comments.push(TableComment {
2897        owner,
2898        table_name,
2899        table_type,
2900        comments,
2901    });
2902    Ok(())
2903}
2904
2905/// Apply a single `ALL_COL_COMMENTS` row into the snapshot.
2906fn apply_column_comment_row(
2907    snapshot: &mut CatalogSnapshot,
2908    row: &OracleRow,
2909) -> Result<(), CatalogError> {
2910    let owner_text = row.require_text("OWNER")?;
2911    let table_name_text = row.require_text("TABLE_NAME")?;
2912    let column_name_text = row.require_text("COLUMN_NAME")?;
2913    let comments = row.require_text("COMMENTS")?.to_string();
2914
2915    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2916        return Err(CatalogError::InvalidColumnValue {
2917            column: String::from("OWNER"),
2918            expected: "interned schema name",
2919            value: String::from(owner_text),
2920        });
2921    };
2922    let Some(table_name) = snapshot.intern_object_name(table_name_text) else {
2923        return Err(CatalogError::InvalidColumnValue {
2924            column: String::from("TABLE_NAME"),
2925            expected: "interned object name",
2926            value: String::from(table_name_text),
2927        });
2928    };
2929    let Some(column_name) = snapshot.intern_column_name(column_name_text) else {
2930        return Err(CatalogError::InvalidColumnValue {
2931            column: String::from("COLUMN_NAME"),
2932            expected: "interned column name",
2933            value: String::from(column_name_text),
2934        });
2935    };
2936
2937    let schema_catalog = snapshot.schemas.entry(owner).or_default();
2938    schema_catalog.column_comments.push(ColumnComment {
2939        owner,
2940        table_name,
2941        column_name,
2942        comments,
2943    });
2944    Ok(())
2945}
2946
2947fn apply_user_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
2948    let username = row.require_text("USERNAME")?;
2949    let Some(user) = snapshot.intern_user_name(username) else {
2950        return Err(CatalogError::InvalidColumnValue {
2951            column: String::from("USERNAME"),
2952            expected: "interned user name",
2953            value: String::from(username),
2954        });
2955    };
2956    snapshot
2957        .known_users
2958        .get_or_insert_with(HashSet::new)
2959        .insert(user);
2960    Ok(())
2961}
2962
2963fn apply_plscope_availability_row(
2964    snapshot: &mut CatalogSnapshot,
2965    row: &OracleRow,
2966    tallies: &mut HashMap<SchemaName, PlScopeTally>,
2967) -> Result<(), CatalogError> {
2968    let Some(owner_text) = optional_nonblank_text(row, "OWNER") else {
2969        return Ok(());
2970    };
2971    let settings = row
2972        .text("PLSCOPE_SETTINGS")
2973        .unwrap_or("")
2974        .to_ascii_uppercase();
2975    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
2976        return Ok(());
2977    };
2978    let tally = tallies.entry(owner).or_default();
2979    tally.total = tally.total.saturating_add(1);
2980    if settings.contains("STATEMENTS:") && !settings.contains("STATEMENTS:NONE") {
2981        tally.with_statements = tally.with_statements.saturating_add(1);
2982    }
2983    if settings.contains("IDENTIFIERS:") && !settings.contains("IDENTIFIERS:NONE") {
2984        tally.with_identifiers = tally.with_identifiers.saturating_add(1);
2985    }
2986    Ok(())
2987}
2988
2989fn finalize_plscope_availability(
2990    snapshot: &mut CatalogSnapshot,
2991    tallies: HashMap<SchemaName, PlScopeTally>,
2992) {
2993    for (owner, tally) in tallies {
2994        let availability = if tally.with_statements > 0 {
2995            PlScopeAvailability::IdentifiersAndStatements
2996        } else if tally.with_identifiers > 0 {
2997            PlScopeAvailability::IdentifiersOnly
2998        } else if tally.total > 0 {
2999            PlScopeAvailability::AvailableButStale
3000        } else {
3001            PlScopeAvailability::NotAvailable
3002        };
3003        let schema_catalog = snapshot.schemas.entry(owner).or_default();
3004        let plscope = schema_catalog
3005            .plscope
3006            .get_or_insert_with(PlScopeSnapshot::default);
3007        plscope.availability = availability;
3008        plscope.collected_at = Some(snapshot.generated_at);
3009    }
3010}
3011
3012fn apply_plscope_identifier_row(
3013    snapshot: &mut CatalogSnapshot,
3014    row: &OracleRow,
3015) -> Result<(), CatalogError> {
3016    let Some(owner_text) = optional_nonblank_text(row, "OWNER") else {
3017        return Ok(());
3018    };
3019    let Some(object_name_text) = optional_nonblank_text(row, "OBJECT_NAME") else {
3020        return Ok(());
3021    };
3022    let Some(identifier_name_text) = optional_nonblank_text(row, "NAME") else {
3023        return Ok(());
3024    };
3025    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
3026        return Ok(());
3027    };
3028    let Some(object_name) = snapshot.intern_object_name(object_name_text) else {
3029        return Ok(());
3030    };
3031    let Some(identifier_name) = snapshot.intern_member_name(identifier_name_text) else {
3032        return Ok(());
3033    };
3034    let identifier = CompilerIdentifier {
3035        owner,
3036        object_name,
3037        identifier_name,
3038        identifier_type: optional_nonblank_text(row, "TYPE")
3039            .map(String::from)
3040            .unwrap_or_default(),
3041        usage: optional_nonblank_text(row, "USAGE")
3042            .map(String::from)
3043            .unwrap_or_default(),
3044        line: optional_u32(row, "LINE")?.unwrap_or(0),
3045        column: optional_u32(row, "COL")?.unwrap_or(0),
3046    };
3047
3048    let plscope = snapshot
3049        .schemas
3050        .entry(owner)
3051        .or_default()
3052        .plscope
3053        .get_or_insert_with(|| PlScopeSnapshot {
3054            availability: PlScopeAvailability::IdentifiersOnly,
3055            collected_at: Some(snapshot.generated_at),
3056            ..PlScopeSnapshot::default()
3057        });
3058    plscope.identifiers.push(identifier);
3059    Ok(())
3060}
3061
3062fn apply_grant_row(snapshot: &mut CatalogSnapshot, row: &OracleRow) -> Result<(), CatalogError> {
3063    let owner_text = row.require_text("TABLE_SCHEMA")?;
3064    let object_name_text = row.require_text("TABLE_NAME")?;
3065    let grantee_text = row.require_text("GRANTEE")?;
3066    let privilege_text = row.require_text("PRIVILEGE")?;
3067
3068    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
3069        return Err(CatalogError::InvalidColumnValue {
3070            column: String::from("TABLE_SCHEMA"),
3071            expected: "interned schema name",
3072            value: String::from(owner_text),
3073        });
3074    };
3075    let Some(object_name) = snapshot.intern_object_name(object_name_text) else {
3076        return Err(CatalogError::InvalidColumnValue {
3077            column: String::from("TABLE_NAME"),
3078            expected: "interned object name",
3079            value: String::from(object_name_text),
3080        });
3081    };
3082
3083    let grantee = grantee_from_dictionary_value(snapshot, grantee_text)?;
3084    let privilege = grant_privilege_from_dictionary_value(privilege_text);
3085    let grantable = row
3086        .text("GRANTABLE")
3087        .map(|value| value.eq_ignore_ascii_case("YES"))
3088        .unwrap_or(false);
3089    let with_hierarchy = row
3090        .text("HIERARCHY")
3091        .map(|value| value.eq_ignore_ascii_case("YES"))
3092        .unwrap_or(false);
3093
3094    let grant = Grant {
3095        object_owner: owner,
3096        object_name,
3097        privilege,
3098        grantee,
3099        grantable,
3100        via_role: None,
3101        with_hierarchy,
3102    };
3103
3104    let schema_catalog = snapshot.schemas.entry(owner).or_default();
3105    if !schema_catalog
3106        .grants
3107        .iter()
3108        .any(|existing| existing.eq(&grant))
3109    {
3110        schema_catalog.grants.push(grant);
3111    }
3112
3113    Ok(())
3114}
3115
3116/// Classify an `ALL_TAB_PRIVS.GRANTEE` value into a [`Grantee`].
3117///
3118/// `ALL_TAB_PRIVS` carries no user/role discriminator column, so the
3119/// grantee universe — `{ user, role, PUBLIC }` — is resolved against the
3120/// `ALL_USERS`-derived [`CatalogSnapshot::known_users`] set:
3121///
3122/// * `PUBLIC` -> [`Grantee::Public`].
3123/// * known username -> [`Grantee::User`] (a statically certain direct grant).
3124/// * loaded set, name absent -> [`Grantee::Role`] (the only remaining class;
3125///   the resolver then caps it at Low confidence and emits a
3126///   `RuntimeGrantOrRole` ambiguity because a role grant only applies when
3127///   the role is enabled in `SESSION_ROLES` at runtime).
3128/// * username set NOT loaded (`known_users` is `None`) -> [`Grantee::Role`]
3129///   as well. This is the R13 fail-toward-restrictive choice: when the
3130///   grantee class is genuinely undetermined we must NOT default to a
3131///   high-confidence direct user grant (a fail-toward-permissive result in
3132///   a privilege/SAST product); treating it as a role routes it through the
3133///   runtime-ambiguity downgrade instead of over-claiming certainty.
3134fn grantee_from_dictionary_value(
3135    snapshot: &mut CatalogSnapshot,
3136    text: &str,
3137) -> Result<Grantee, CatalogError> {
3138    if text.eq_ignore_ascii_case("PUBLIC") {
3139        return Ok(Grantee::Public);
3140    }
3141    let Some(symbol) = snapshot.interner.intern(text) else {
3142        return Err(CatalogError::InvalidColumnValue {
3143            column: String::from("GRANTEE"),
3144            expected: "interned grantee name",
3145            value: String::from(text),
3146        });
3147    };
3148    let is_known_user = snapshot
3149        .known_users
3150        .as_ref()
3151        .is_some_and(|users| users.contains(&UserName::from(symbol)));
3152    if is_known_user {
3153        Ok(Grantee::User(UserName::from(symbol)))
3154    } else {
3155        Ok(Grantee::Role(RoleName::from(symbol)))
3156    }
3157}
3158
3159fn grant_privilege_from_dictionary_value(text: &str) -> GrantPrivilege {
3160    match text.to_ascii_uppercase().as_str() {
3161        "SELECT" => GrantPrivilege::Select,
3162        "INSERT" => GrantPrivilege::Insert,
3163        "UPDATE" => GrantPrivilege::Update,
3164        "DELETE" => GrantPrivilege::Delete,
3165        "EXECUTE" => GrantPrivilege::Execute,
3166        "ALTER" => GrantPrivilege::Alter,
3167        "INDEX" => GrantPrivilege::Index,
3168        "REFERENCES" => GrantPrivilege::References,
3169        "DEBUG" => GrantPrivilege::Debug,
3170        _ => GrantPrivilege::Other,
3171    }
3172}
3173
3174fn apply_routine_row(
3175    snapshot: &mut CatalogSnapshot,
3176    row: &OracleRow,
3177    routines: &mut HashMap<RoutineLocator, RoutineAccumulator>,
3178) -> Result<(), CatalogError> {
3179    let locator = routine_locator_from_procedure_row(snapshot, row)?;
3180    let deterministic = optional_bool(row, "DETERMINISTIC")?.unwrap_or(false);
3181    let pipelined = optional_bool(row, "PIPELINED")?.unwrap_or(false);
3182    let kind_hint = routine_kind_from_dictionary_value(optional_nonblank_text(row, "OBJECT_TYPE"));
3183
3184    let accumulator = routines.entry(locator).or_default();
3185    accumulator
3186        .signature
3187        .get_or_insert_with(|| RoutineSignature {
3188            routine_name: locator.routine_name,
3189            overload: locator.overload,
3190            ..RoutineSignature::default()
3191        });
3192    accumulator.kind_hint = kind_hint.or(accumulator.kind_hint);
3193    accumulator.deterministic = deterministic;
3194    accumulator.pipelined = pipelined;
3195
3196    Ok(())
3197}
3198
3199fn apply_argument_row(
3200    snapshot: &mut CatalogSnapshot,
3201    row: &OracleRow,
3202    routines: &mut HashMap<RoutineLocator, RoutineAccumulator>,
3203) -> Result<(), CatalogError> {
3204    let locator = routine_locator_from_argument_row(snapshot, row)?;
3205    let data_type = data_type_ref_from_argument_row(snapshot, row)?;
3206    let accumulator = routines.entry(locator).or_default();
3207    let signature = accumulator
3208        .signature
3209        .get_or_insert_with(|| RoutineSignature {
3210            routine_name: locator.routine_name,
3211            overload: locator.overload,
3212            ..RoutineSignature::default()
3213        });
3214    let position = required_u32(row, "POSITION")?;
3215
3216    if position.eq(&0) {
3217        signature.return_type = Some(data_type);
3218        accumulator.kind_hint = Some(RoutineKind::Function);
3219        return Ok(());
3220    }
3221
3222    signature.arguments.push(ArgumentMetadata {
3223        position,
3224        name: optional_nonblank_text(row, "ARGUMENT_NAME")
3225            .map(|value| {
3226                snapshot
3227                    .intern_member_name(value)
3228                    .ok_or(CatalogError::InvalidColumnValue {
3229                        column: String::from("ARGUMENT_NAME"),
3230                        expected: "interned member name",
3231                        value: String::from(value),
3232                    })
3233            })
3234            .transpose()?,
3235        mode: parameter_mode_from_dictionary_value(row.text("IN_OUT")),
3236        data_type,
3237        defaulted: optional_bool(row, "DEFAULTED")?.unwrap_or(false),
3238    });
3239
3240    Ok(())
3241}
3242
3243fn finalize_routines(
3244    snapshot: &mut CatalogSnapshot,
3245    routines: HashMap<RoutineLocator, RoutineAccumulator>,
3246) -> Result<(), CatalogError> {
3247    for (locator, accumulator) in routines {
3248        let Some(signature) = accumulator.signature else {
3249            continue;
3250        };
3251        let kind = accumulator
3252            .kind_hint
3253            .or_else(|| {
3254                if signature.return_type.is_some() {
3255                    Some(RoutineKind::Function)
3256                } else {
3257                    Some(RoutineKind::Procedure)
3258                }
3259            })
3260            .unwrap_or(RoutineKind::Procedure);
3261
3262        if let Some(package_name) = locator.package_name {
3263            upsert_packaged_routine(snapshot, locator.owner, package_name, kind, signature)?;
3264        } else {
3265            upsert_top_level_routine(
3266                snapshot,
3267                locator.owner,
3268                locator.routine_name,
3269                kind,
3270                signature,
3271                accumulator.deterministic,
3272                accumulator.pipelined,
3273            )?;
3274        }
3275    }
3276
3277    Ok(())
3278}
3279
3280fn object_type_from_dictionary_value(text: &str) -> Option<ObjectType> {
3281    match text.trim().to_ascii_uppercase().as_str() {
3282        "TABLE" => Some(ObjectType::Table),
3283        "VIEW" => Some(ObjectType::View),
3284        "MATERIALIZED VIEW" => Some(ObjectType::MaterializedView),
3285        "SEQUENCE" => Some(ObjectType::Sequence),
3286        "TYPE" => Some(ObjectType::Type),
3287        "PACKAGE" => Some(ObjectType::Package),
3288        "PROCEDURE" => Some(ObjectType::Procedure),
3289        "FUNCTION" => Some(ObjectType::Function),
3290        "TRIGGER" => Some(ObjectType::Trigger),
3291        "EDITIONING VIEW" => Some(ObjectType::EditioningView),
3292        _ => None,
3293    }
3294}
3295
3296fn object_status_from_dictionary_value(text: &str) -> ObjectStatus {
3297    match text.trim().to_ascii_uppercase().as_str() {
3298        "VALID" => ObjectStatus::Valid,
3299        "ENABLED" => ObjectStatus::Valid,
3300        "INVALID" => ObjectStatus::Invalid,
3301        "UNUSABLE" | "DISABLED" => ObjectStatus::Invalid,
3302        _ => ObjectStatus::NotApplicable,
3303    }
3304}
3305
3306fn routine_kind_from_dictionary_value(text: Option<&str>) -> Option<RoutineKind> {
3307    match text.map(|value| value.trim().to_ascii_uppercase()) {
3308        Some(value) if value.eq("FUNCTION") => Some(RoutineKind::Function),
3309        Some(value) if value.eq("PROCEDURE") => Some(RoutineKind::Procedure),
3310        _ => None,
3311    }
3312}
3313
3314fn constraint_type_from_dictionary_value(
3315    text: &str,
3316    search_condition: Option<&str>,
3317    has_columns: bool,
3318) -> ConstraintType {
3319    match text.trim().to_ascii_uppercase().as_str() {
3320        "P" => ConstraintType::PrimaryKey,
3321        "R" => ConstraintType::ForeignKey,
3322        "U" => ConstraintType::Unique,
3323        "F" => ConstraintType::Ref,
3324        "C" => {
3325            if has_columns
3326                && search_condition
3327                    .map(|condition| {
3328                        condition
3329                            .trim()
3330                            .to_ascii_uppercase()
3331                            .contains("IS NOT NULL")
3332                    })
3333                    .unwrap_or(false)
3334            {
3335                ConstraintType::NotNull
3336            } else {
3337                ConstraintType::Check
3338            }
3339        }
3340        _ => ConstraintType::Other,
3341    }
3342}
3343
3344fn trigger_timing_from_dictionary_value(text: &str) -> TriggerTiming {
3345    let normalized = text.trim().to_ascii_uppercase();
3346    if normalized.contains("INSTEAD OF") {
3347        TriggerTiming::InsteadOf
3348    } else if normalized.contains("BEFORE") {
3349        TriggerTiming::Before
3350    } else if normalized.contains("AFTER") {
3351        TriggerTiming::After
3352    } else {
3353        TriggerTiming::Unknown
3354    }
3355}
3356
3357fn trigger_level_from_dictionary_value(text: &str) -> TriggerLevel {
3358    let normalized = text.trim().to_ascii_uppercase();
3359    if normalized.contains("EACH ROW") {
3360        TriggerLevel::Row
3361    } else if normalized.contains("STATEMENT") {
3362        TriggerLevel::Statement
3363    } else {
3364        TriggerLevel::Unknown
3365    }
3366}
3367
3368fn trigger_events_from_dictionary_value(text: &str) -> Vec<TriggerEvent> {
3369    let normalized = text.trim().to_ascii_uppercase();
3370    let mut events = Vec::<TriggerEvent>::new();
3371
3372    if normalized.contains("INSERT") {
3373        events.push(TriggerEvent::Insert);
3374    }
3375    if normalized.contains("UPDATE") {
3376        events.push(TriggerEvent::Update);
3377    }
3378    if normalized.contains("DELETE") {
3379        events.push(TriggerEvent::Delete);
3380    }
3381    if normalized.contains("LOGON") {
3382        events.push(TriggerEvent::Logon);
3383    }
3384    if normalized.contains("LOGOFF") {
3385        events.push(TriggerEvent::Logoff);
3386    }
3387    if normalized.contains("DDL") {
3388        events.push(TriggerEvent::Ddl);
3389    }
3390
3391    if events.is_empty() {
3392        events.push(TriggerEvent::Other);
3393    }
3394
3395    events
3396}
3397
3398fn push_unique_column(columns: &mut Vec<ColumnName>, column_name: ColumnName) {
3399    if !columns.contains(&column_name) {
3400        columns.push(column_name);
3401    }
3402}
3403
3404fn routine_locator_from_procedure_row(
3405    snapshot: &mut CatalogSnapshot,
3406    row: &OracleRow,
3407) -> Result<RoutineLocator, CatalogError> {
3408    let owner_text = row.require_text("OWNER")?;
3409    let container_name_text = row.require_text("OBJECT_NAME")?;
3410    let routine_name_text = row
3411        .text("PROCEDURE_NAME")
3412        .unwrap_or(container_name_text)
3413        .trim();
3414
3415    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
3416        return Err(CatalogError::InvalidColumnValue {
3417            column: String::from("OWNER"),
3418            expected: "interned schema name",
3419            value: String::from(owner_text),
3420        });
3421    };
3422    let Some(container_name) = snapshot.intern_object_name(container_name_text) else {
3423        return Err(CatalogError::InvalidColumnValue {
3424            column: String::from("OBJECT_NAME"),
3425            expected: "interned object name",
3426            value: String::from(container_name_text),
3427        });
3428    };
3429    let Some(routine_name) = snapshot.intern_object_name(routine_name_text) else {
3430        return Err(CatalogError::InvalidColumnValue {
3431            column: String::from("PROCEDURE_NAME"),
3432            expected: "interned object name",
3433            value: String::from(routine_name_text),
3434        });
3435    };
3436
3437    Ok(RoutineLocator {
3438        owner,
3439        package_name: if optional_nonblank_text(row, "PROCEDURE_NAME").is_some() {
3440            Some(container_name)
3441        } else {
3442            None
3443        },
3444        routine_name,
3445        subprogram_id: optional_u32(row, "SUBPROGRAM_ID")?,
3446        overload: optional_u32(row, "OVERLOAD")?,
3447    })
3448}
3449
3450fn routine_locator_from_argument_row(
3451    snapshot: &mut CatalogSnapshot,
3452    row: &OracleRow,
3453) -> Result<RoutineLocator, CatalogError> {
3454    let owner_text = row.require_text("OWNER")?;
3455    let routine_name_text = row.require_text("OBJECT_NAME")?;
3456
3457    let Some(owner) = snapshot.intern_schema_name(owner_text) else {
3458        return Err(CatalogError::InvalidColumnValue {
3459            column: String::from("OWNER"),
3460            expected: "interned schema name",
3461            value: String::from(owner_text),
3462        });
3463    };
3464    let package_name = optional_nonblank_text(row, "PACKAGE_NAME")
3465        .map(|value| {
3466            snapshot
3467                .intern_object_name(value)
3468                .ok_or(CatalogError::InvalidColumnValue {
3469                    column: String::from("PACKAGE_NAME"),
3470                    expected: "interned object name",
3471                    value: String::from(value),
3472                })
3473        })
3474        .transpose()?;
3475    let Some(routine_name) = snapshot.intern_object_name(routine_name_text) else {
3476        return Err(CatalogError::InvalidColumnValue {
3477            column: String::from("OBJECT_NAME"),
3478            expected: "interned object name",
3479            value: String::from(routine_name_text),
3480        });
3481    };
3482
3483    Ok(RoutineLocator {
3484        owner,
3485        package_name,
3486        routine_name,
3487        subprogram_id: optional_u32(row, "SUBPROGRAM_ID")?,
3488        overload: optional_u32(row, "OVERLOAD")?,
3489    })
3490}
3491
3492fn upsert_packaged_routine(
3493    snapshot: &mut CatalogSnapshot,
3494    owner: SchemaName,
3495    package_name: ObjectName,
3496    kind: RoutineKind,
3497    signature: RoutineSignature,
3498) -> Result<(), CatalogError> {
3499    let schema_catalog = snapshot.schemas.entry(owner).or_default();
3500
3501    schema_catalog
3502        .objects
3503        .entry(package_name)
3504        .or_insert_with(|| {
3505            CatalogObject::Package(PackageMetadata {
3506                common: ObjectCommon {
3507                    owner,
3508                    name: package_name,
3509                    object_type: ObjectType::Package,
3510                    ..ObjectCommon::default()
3511                },
3512                ..PackageMetadata::default()
3513            })
3514        });
3515
3516    let Some(CatalogObject::Package(metadata)) = schema_catalog.objects.get_mut(&package_name)
3517    else {
3518        return Ok(());
3519    };
3520
3521    match kind {
3522        RoutineKind::Procedure => upsert_signature(&mut metadata.procedures, signature),
3523        RoutineKind::Function => upsert_signature(&mut metadata.functions, signature),
3524    }
3525
3526    Ok(())
3527}
3528
3529fn upsert_top_level_routine(
3530    snapshot: &mut CatalogSnapshot,
3531    owner: SchemaName,
3532    routine_name: ObjectName,
3533    kind: RoutineKind,
3534    signature: RoutineSignature,
3535    deterministic: bool,
3536    pipelined: bool,
3537) -> Result<(), CatalogError> {
3538    let schema_catalog = snapshot.schemas.entry(owner).or_default();
3539    let common = schema_catalog
3540        .objects
3541        .get(&routine_name)
3542        .and_then(|object| match object {
3543            CatalogObject::Procedure(metadata) => Some(metadata.common.clone()),
3544            CatalogObject::Function(metadata) => Some(metadata.common.clone()),
3545            _ => None,
3546        })
3547        .unwrap_or_else(|| ObjectCommon {
3548            owner,
3549            name: routine_name,
3550            object_type: match kind {
3551                RoutineKind::Procedure => ObjectType::Procedure,
3552                RoutineKind::Function => ObjectType::Function,
3553            },
3554            ..ObjectCommon::default()
3555        });
3556
3557    let catalog_object = match kind {
3558        RoutineKind::Procedure => CatalogObject::Procedure(ProcedureMetadata { common, signature }),
3559        RoutineKind::Function => CatalogObject::Function(FunctionMetadata {
3560            common,
3561            signature,
3562            deterministic,
3563            pipelined,
3564        }),
3565    };
3566    schema_catalog.objects.insert(routine_name, catalog_object);
3567
3568    Ok(())
3569}
3570
3571fn upsert_signature(signatures: &mut Vec<RoutineSignature>, signature: RoutineSignature) {
3572    if let Some(existing) = signatures.iter_mut().find(|candidate| {
3573        candidate.routine_name.eq(&signature.routine_name)
3574            && candidate.overload.eq(&signature.overload)
3575    }) {
3576        *existing = signature;
3577    } else {
3578        signatures.push(signature);
3579    }
3580}
3581
3582fn data_type_ref_from_argument_row(
3583    snapshot: &mut CatalogSnapshot,
3584    row: &OracleRow,
3585) -> Result<DataTypeRef, CatalogError> {
3586    let owner = optional_nonblank_text(row, "TYPE_OWNER")
3587        .map(|value| {
3588            snapshot
3589                .intern_schema_name(value)
3590                .ok_or(CatalogError::InvalidColumnValue {
3591                    column: String::from("TYPE_OWNER"),
3592                    expected: "interned schema name",
3593                    value: String::from(value),
3594                })
3595        })
3596        .transpose()?;
3597    let type_name = optional_nonblank_text(row, "TYPE_NAME")
3598        .or_else(|| optional_nonblank_text(row, "DATA_TYPE"))
3599        .unwrap_or_default();
3600
3601    Ok(DataTypeRef {
3602        owner,
3603        name: String::from(type_name),
3604        length: optional_u32(row, "DATA_LENGTH")?,
3605        precision: optional_u32(row, "DATA_PRECISION")?,
3606        scale: optional_i32(row, "DATA_SCALE")?,
3607        char_semantics: None,
3608    })
3609}
3610
3611fn parameter_mode_from_dictionary_value(text: Option<&str>) -> ParameterMode {
3612    match text.map(|value| value.trim().to_ascii_uppercase()) {
3613        Some(value) if value.eq("OUT") => ParameterMode::Out,
3614        Some(value) if value.eq("IN/OUT") => ParameterMode::InOut,
3615        _ => ParameterMode::In,
3616    }
3617}
3618
3619fn blank_catalog_object(common: ObjectCommon) -> Option<CatalogObject> {
3620    match common.object_type {
3621        ObjectType::Table => Some(CatalogObject::Table(TableMetadata {
3622            common,
3623            ..TableMetadata::default()
3624        })),
3625        ObjectType::View => Some(CatalogObject::View(ViewMetadata {
3626            common,
3627            ..ViewMetadata::default()
3628        })),
3629        ObjectType::MaterializedView => Some(CatalogObject::MaterializedView(MViewMetadata {
3630            common,
3631            ..MViewMetadata::default()
3632        })),
3633        ObjectType::Sequence => Some(CatalogObject::Sequence(SequenceMetadata {
3634            common,
3635            ..SequenceMetadata::default()
3636        })),
3637        ObjectType::Type => Some(CatalogObject::Type(TypeMetadata {
3638            common,
3639            ..TypeMetadata::default()
3640        })),
3641        ObjectType::Package => Some(CatalogObject::Package(PackageMetadata {
3642            common,
3643            ..PackageMetadata::default()
3644        })),
3645        ObjectType::Procedure => Some(CatalogObject::Procedure(ProcedureMetadata {
3646            common,
3647            ..ProcedureMetadata::default()
3648        })),
3649        ObjectType::Function => Some(CatalogObject::Function(FunctionMetadata {
3650            common,
3651            ..FunctionMetadata::default()
3652        })),
3653        ObjectType::Trigger => Some(CatalogObject::Trigger(TriggerMetadata {
3654            common,
3655            ..TriggerMetadata::default()
3656        })),
3657        ObjectType::SchedulerJob => Some(CatalogObject::SchedulerJob(SchedulerJobMetadata {
3658            common,
3659            ..SchedulerJobMetadata::default()
3660        })),
3661        ObjectType::EditioningView => Some(CatalogObject::EditioningView(EditioningViewMetadata {
3662            common,
3663            ..EditioningViewMetadata::default()
3664        })),
3665        ObjectType::Synonym | ObjectType::Index | ObjectType::Constraint | ObjectType::Unknown => {
3666            None
3667        }
3668    }
3669}
3670
3671fn data_type_ref_from_row(
3672    snapshot: &mut CatalogSnapshot,
3673    row: &OracleRow,
3674) -> Result<DataTypeRef, CatalogError> {
3675    let owner = row
3676        .text("DATA_TYPE_OWNER")
3677        .map(str::trim)
3678        .filter(|value| !value.is_empty())
3679        .map(|value| {
3680            snapshot
3681                .intern_schema_name(value)
3682                .ok_or(CatalogError::InvalidColumnValue {
3683                    column: String::from("DATA_TYPE_OWNER"),
3684                    expected: "interned schema name",
3685                    value: String::from(value),
3686                })
3687        })
3688        .transpose()?;
3689
3690    Ok(DataTypeRef {
3691        owner,
3692        name: String::from(row.require_text("DATA_TYPE")?),
3693        length: optional_u32(row, "DATA_LENGTH")?,
3694        precision: optional_u32(row, "DATA_PRECISION")?,
3695        scale: optional_i32(row, "DATA_SCALE")?,
3696        char_semantics: row.text("CHAR_USED").map(String::from),
3697    })
3698}
3699
3700fn optional_bool(row: &OracleRow, column: &str) -> Result<Option<bool>, CatalogError> {
3701    match row.text(column) {
3702        Some(_) => row.parse_bool(column).map(Some),
3703        None => Ok(None),
3704    }
3705}
3706
3707fn optional_nonblank_text<'a>(row: &'a OracleRow, column: &str) -> Option<&'a str> {
3708    row.text(column)
3709        .map(str::trim)
3710        .filter(|value| !value.is_empty())
3711}
3712
3713fn optional_u32(row: &OracleRow, column: &str) -> Result<Option<u32>, CatalogError> {
3714    match row.text(column) {
3715        Some(_) => {
3716            let parsed = row.parse_u64(column)?;
3717            u32::try_from(parsed)
3718                .map(Some)
3719                .map_err(|_| CatalogError::InvalidColumnValue {
3720                    column: column.to_ascii_uppercase(),
3721                    expected: "u32",
3722                    value: parsed.to_string(),
3723                })
3724        }
3725        None => Ok(None),
3726    }
3727}
3728
3729fn required_u32(row: &OracleRow, column: &str) -> Result<u32, CatalogError> {
3730    let parsed = row.parse_u64(column)?;
3731    u32::try_from(parsed).map_err(|_| CatalogError::InvalidColumnValue {
3732        column: column.to_ascii_uppercase(),
3733        expected: "u32",
3734        value: parsed.to_string(),
3735    })
3736}
3737
3738fn optional_i32(row: &OracleRow, column: &str) -> Result<Option<i32>, CatalogError> {
3739    match row.text(column) {
3740        Some(_) => {
3741            let parsed = row.parse_i64(column)?;
3742            i32::try_from(parsed)
3743                .map(Some)
3744                .map_err(|_| CatalogError::InvalidColumnValue {
3745                    column: column.to_ascii_uppercase(),
3746                    expected: "i32",
3747                    value: parsed.to_string(),
3748                })
3749        }
3750        None => Ok(None),
3751    }
3752}
3753
3754#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3755pub struct SynonymTarget {
3756    pub target_owner: Option<SchemaName>,
3757    pub target_name: ObjectName,
3758    pub target_type: Option<ObjectType>,
3759    pub db_link: Option<String>,
3760    pub public_synonym: bool,
3761}
3762
3763/// Virtual Private Database (VPD / RLS) policy entry from
3764/// `ALL_POLICIES`. Each row describes one policy attached to an object;
3765/// the policy function (PF_OWNER.PACKAGE.FUNCTION) is the predicate
3766/// generator that Oracle invokes at parse time to inject a WHERE clause
3767/// into reads (and optional ones into INSERT/UPDATE/DELETE).
3768///
3769/// Lineage flags VPD-protected objects with
3770/// `UnknownReason::DbLinkRemoteObject` reused-as-marker pending a
3771/// dedicated `UnknownReason::VpdPolicyApplied` variant.
3772#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3773pub struct VpdPolicy {
3774    /// Owning schema (the object's owner).
3775    pub object_owner: SchemaName,
3776    /// Protected object.
3777    pub object_name: ObjectName,
3778    /// Optional policy-group name (Oracle policy-groups; usually NULL).
3779    pub policy_group: Option<String>,
3780    /// Policy name within the group.
3781    pub policy_name: String,
3782    /// Owner of the policy function.
3783    pub function_owner: SchemaName,
3784    /// Package containing the policy function (NULL for standalone).
3785    pub function_package: Option<String>,
3786    /// Function name that produces the WHERE-clause predicate.
3787    pub function_name: String,
3788    /// Statement-type bits — true means the policy applies to that DML.
3789    pub on_select: bool,
3790    pub on_insert: bool,
3791    pub on_update: bool,
3792    pub on_delete: bool,
3793    /// Whether the policy is currently enabled.
3794    pub enabled: bool,
3795}
3796
3797/// Edition entry from `ALL_EDITIONS` — the per-database edition tree
3798/// used by Oracle Edition-Based Redefinition (EBR). Linked into
3799/// [`CatalogSnapshot::editions`].
3800#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3801pub struct Edition {
3802    /// Edition name (Oracle identifiers are case-preserving but case-
3803    /// insensitive when unquoted; stored as the dictionary value).
3804    pub edition_name: String,
3805    /// Parent edition, if any. `None` for the root edition (typically
3806    /// `ORA$BASE`).
3807    pub parent_edition_name: Option<String>,
3808    /// Whether the edition is currently usable (`USABLE = 'Y'`).
3809    pub usable: bool,
3810}
3811
3812/// Editioning view from `ALL_EDITIONING_VIEWS` — a view that masks an
3813/// editioned table during EBR cutovers. Linked into
3814/// [`SchemaCatalog::editioning_views`].
3815#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3816pub struct EditioningView {
3817    /// Owning schema.
3818    pub owner: SchemaName,
3819    /// Editioning view name.
3820    pub view_name: ObjectName,
3821    /// Base table the editioning view masks.
3822    pub table_name: ObjectName,
3823}
3824
3825/// Documentation comment attached to a table, view, or materialized
3826/// view via `COMMENT ON TABLE owner.name IS '...'`. Sourced from
3827/// `ALL_TAB_COMMENTS`.
3828///
3829/// `plsql-docgen` consumes these to render description text alongside
3830/// object docs; dependency analysis does not interact with them.
3831#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3832pub struct TableComment {
3833    /// Owning schema.
3834    pub owner: SchemaName,
3835    /// Object name.
3836    pub table_name: ObjectName,
3837    /// `TABLE` / `VIEW` / `MATERIALIZED VIEW` — preserved verbatim
3838    /// from `ALL_TAB_COMMENTS.TABLE_TYPE` so docgen can pick a
3839    /// per-kind template.
3840    pub table_type: String,
3841    /// Comment text. Always present (we filter out NULL rows server-side
3842    /// to keep the snapshot compact).
3843    pub comments: String,
3844}
3845
3846/// Documentation comment attached to a column via
3847/// `COMMENT ON COLUMN owner.table.column IS '...'`. Sourced from
3848/// `ALL_COL_COMMENTS`.
3849#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3850pub struct ColumnComment {
3851    /// Owning schema.
3852    pub owner: SchemaName,
3853    /// Object name.
3854    pub table_name: ObjectName,
3855    /// Column name.
3856    pub column_name: ColumnName,
3857    /// Comment text.
3858    pub comments: String,
3859}
3860
3861/// Database link metadata sourced from `ALL_DB_LINKS`.
3862///
3863/// PL/SQL code that references `remote_object@my_link` resolves through
3864/// one of these entries. Public links have `owner = PUBLIC`. Lineage uses
3865/// the `host` field to classify the remote endpoint (a TNS alias, a full
3866/// EZCONNECT string, or the legacy `(DESCRIPTION=...)` form).
3867///
3868/// The shape intentionally avoids `username` — `ALL_DB_LINKS.USERNAME`
3869/// is the *connect user*, not a privilege grant, and most consumers
3870/// don't need it. If a future product surface needs the connect user it
3871/// can be added behind `#[serde(default)]` without breaking older
3872/// snapshots.
3873#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3874pub struct DatabaseLink {
3875    /// Owning schema. `PUBLIC` for public links.
3876    pub owner: SchemaName,
3877    /// Link name, e.g. `REPORTING.WORLD`.
3878    pub name: String,
3879    /// Connect-target host string. Can be a TNS alias, an EZCONNECT
3880    /// string, or a full `(DESCRIPTION=...)` block.
3881    pub host: Option<String>,
3882    /// `true` when `owner` is `PUBLIC`. Surfaced for fast filtering in
3883    /// downstream lineage without re-resolving the schema interner.
3884    pub public_link: bool,
3885}
3886
3887#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3888pub enum GrantPrivilege {
3889    Select,
3890    Insert,
3891    Update,
3892    Delete,
3893    Execute,
3894    Alter,
3895    Index,
3896    References,
3897    Debug,
3898    #[default]
3899    Other,
3900}
3901
3902#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3903pub enum Grantee {
3904    User(UserName),
3905    Role(RoleName),
3906    #[default]
3907    Public,
3908}
3909
3910#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3911pub struct Grant {
3912    pub object_owner: SchemaName,
3913    pub object_name: ObjectName,
3914    pub privilege: GrantPrivilege,
3915    pub grantee: Grantee,
3916    pub grantable: bool,
3917    pub via_role: Option<RoleName>,
3918    pub with_hierarchy: bool,
3919}
3920
3921#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3922pub struct IndexMetadata {
3923    pub name: IndexName,
3924    pub table_owner: SchemaName,
3925    pub table_name: ObjectName,
3926    pub unique: bool,
3927    pub columns: Vec<ColumnName>,
3928    pub index_type: String,
3929    pub status: ObjectStatus,
3930}
3931
3932#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3933pub enum ConstraintType {
3934    PrimaryKey,
3935    ForeignKey,
3936    Unique,
3937    Check,
3938    NotNull,
3939    Ref,
3940    #[default]
3941    Other,
3942}
3943
3944#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3945pub struct ConstraintMetadata {
3946    pub name: ConstraintName,
3947    pub table_owner: SchemaName,
3948    pub table_name: ObjectName,
3949    pub constraint_type: ConstraintType,
3950    pub columns: Vec<ColumnName>,
3951    pub referenced_table_owner: Option<SchemaName>,
3952    pub referenced_table_name: Option<ObjectName>,
3953    pub referenced_columns: Vec<ColumnName>,
3954    pub search_condition: Option<String>,
3955    pub deferrable: Option<bool>,
3956    pub initially_deferred: Option<bool>,
3957}
3958
3959#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3960pub enum CatalogDependencyKind {
3961    Hard,
3962    Reference,
3963    Extended,
3964    #[default]
3965    Other,
3966}
3967
3968#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3969pub struct CatalogDependency {
3970    pub owner: SchemaName,
3971    pub name: ObjectName,
3972    pub object_type: ObjectType,
3973    pub referenced_owner: Option<SchemaName>,
3974    pub referenced_name: ObjectName,
3975    pub referenced_type: Option<ObjectType>,
3976    pub dependency_kind: CatalogDependencyKind,
3977    pub via_db_link: Option<String>,
3978}
3979
3980#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3981pub enum PlScopeAvailability {
3982    #[default]
3983    NotAvailable,
3984    AvailableButStale,
3985    IdentifiersOnly,
3986    IdentifiersAndStatements,
3987}
3988
3989#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
3990pub struct CompilerIdentifier {
3991    pub owner: SchemaName,
3992    pub object_name: ObjectName,
3993    pub identifier_name: MemberName,
3994    pub identifier_type: String,
3995    pub usage: String,
3996    pub line: u32,
3997    pub column: u32,
3998}
3999
4000#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4001pub struct CompilerReference {
4002    pub owner: SchemaName,
4003    pub object_name: ObjectName,
4004    pub usage_line: u32,
4005    pub usage_column: u32,
4006    pub target_owner: Option<SchemaName>,
4007    pub target_object_name: Option<ObjectName>,
4008    pub target_identifier_name: Option<MemberName>,
4009}
4010
4011#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4012pub struct CompilerStatementUsage {
4013    pub owner: SchemaName,
4014    pub object_name: ObjectName,
4015    pub statement_kind: String,
4016    pub line: u32,
4017    pub column: u32,
4018}
4019
4020#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4021pub struct PlScopeSnapshot {
4022    pub availability: PlScopeAvailability,
4023    pub identifiers: Vec<CompilerIdentifier>,
4024    pub references: Vec<CompilerReference>,
4025    pub statements: Vec<CompilerStatementUsage>,
4026    pub collected_at: Option<DateTime<Utc>>,
4027    pub source_hash: Option<Hash>,
4028    pub warnings: Vec<CapabilityWarning>,
4029}
4030
4031#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4032pub struct DataTypeRef {
4033    pub owner: Option<SchemaName>,
4034    pub name: String,
4035    pub length: Option<u32>,
4036    pub precision: Option<u32>,
4037    pub scale: Option<i32>,
4038    pub char_semantics: Option<String>,
4039}
4040
4041#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4042pub struct ColumnMetadata {
4043    pub name: ColumnName,
4044    pub position: u32,
4045    pub data_type: DataTypeRef,
4046    pub nullable: bool,
4047    pub default_expression: Option<String>,
4048    pub generated_expression: Option<String>,
4049    pub hidden: bool,
4050}
4051
4052#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4053pub enum TemporaryTableDuration {
4054    #[default]
4055    Transaction,
4056    Session,
4057}
4058
4059#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4060pub struct TableMetadata {
4061    pub common: ObjectCommon,
4062    pub columns: HashMap<ColumnName, ColumnMetadata>,
4063    pub temporary: bool,
4064    pub temporary_duration: Option<TemporaryTableDuration>,
4065    pub index_organized: bool,
4066}
4067
4068#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4069pub struct ViewMetadata {
4070    pub common: ObjectCommon,
4071    pub columns: HashMap<ColumnName, ColumnMetadata>,
4072    pub query_hash: Option<Hash>,
4073    pub read_only: Option<bool>,
4074    pub check_option: Option<String>,
4075}
4076
4077#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4078pub struct MViewMetadata {
4079    pub common: ObjectCommon,
4080    pub columns: HashMap<ColumnName, ColumnMetadata>,
4081    pub refresh_mode: Option<String>,
4082    pub refresh_method: Option<String>,
4083    pub query_hash: Option<Hash>,
4084}
4085
4086#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4087pub struct SequenceMetadata {
4088    pub common: ObjectCommon,
4089    pub increment_by: i64,
4090    pub min_value: Option<i64>,
4091    pub max_value: Option<i64>,
4092    pub cycle: bool,
4093    pub ordered: bool,
4094    pub cache_size: Option<u64>,
4095}
4096
4097#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4098pub enum ParameterMode {
4099    #[default]
4100    In,
4101    Out,
4102    InOut,
4103}
4104
4105#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4106pub struct ArgumentMetadata {
4107    pub position: u32,
4108    pub name: Option<MemberName>,
4109    pub mode: ParameterMode,
4110    pub data_type: DataTypeRef,
4111    pub defaulted: bool,
4112}
4113
4114#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4115pub struct AccessibleByTarget {
4116    pub owner: Option<SchemaName>,
4117    pub object_name: ObjectName,
4118}
4119
4120#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4121pub struct RoutineSignature {
4122    pub routine_name: ObjectName,
4123    pub overload: Option<u32>,
4124    pub arguments: Vec<ArgumentMetadata>,
4125    pub return_type: Option<DataTypeRef>,
4126    pub authid_current_user: Option<bool>,
4127    pub accessible_by: Vec<AccessibleByTarget>,
4128}
4129
4130#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4131pub enum TypeFinality {
4132    Final,
4133    NotFinal,
4134    #[default]
4135    Unknown,
4136}
4137
4138#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4139pub enum TypeInstantiable {
4140    Instantiable,
4141    NotInstantiable,
4142    #[default]
4143    Unknown,
4144}
4145
4146#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4147pub struct TypeAttribute {
4148    pub name: MemberName,
4149    pub position: u32,
4150    pub data_type: DataTypeRef,
4151}
4152
4153#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4154pub struct TypeMetadata {
4155    pub common: ObjectCommon,
4156    pub attributes: Vec<TypeAttribute>,
4157    pub methods: Vec<RoutineSignature>,
4158    pub supertype_owner: Option<SchemaName>,
4159    pub supertype_name: Option<ObjectName>,
4160    pub finality: TypeFinality,
4161    pub instantiable: TypeInstantiable,
4162}
4163
4164#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4165pub struct PackageMetadata {
4166    pub common: ObjectCommon,
4167    pub procedures: Vec<RoutineSignature>,
4168    pub functions: Vec<RoutineSignature>,
4169    pub package_stateful: Option<bool>,
4170    pub authid_current_user: Option<bool>,
4171    pub accessible_by: Vec<AccessibleByTarget>,
4172}
4173
4174#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4175pub struct ProcedureMetadata {
4176    pub common: ObjectCommon,
4177    pub signature: RoutineSignature,
4178}
4179
4180#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4181pub struct FunctionMetadata {
4182    pub common: ObjectCommon,
4183    pub signature: RoutineSignature,
4184    pub deterministic: bool,
4185    pub pipelined: bool,
4186}
4187
4188#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4189pub enum TriggerTiming {
4190    Before,
4191    After,
4192    InsteadOf,
4193    #[default]
4194    Unknown,
4195}
4196
4197#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4198pub enum TriggerLevel {
4199    Statement,
4200    Row,
4201    #[default]
4202    Unknown,
4203}
4204
4205#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4206pub enum TriggerEvent {
4207    Insert,
4208    Update,
4209    Delete,
4210    Logon,
4211    Logoff,
4212    Ddl,
4213    #[default]
4214    Other,
4215}
4216
4217#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4218pub struct TriggerMetadata {
4219    pub common: ObjectCommon,
4220    pub target_owner: SchemaName,
4221    pub target_name: ObjectName,
4222    pub timing: TriggerTiming,
4223    pub level: TriggerLevel,
4224    pub events: Vec<TriggerEvent>,
4225    pub when_clause: Option<String>,
4226    pub body_hash: Option<Hash>,
4227}
4228
4229#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4230pub struct SchedulerJobMetadata {
4231    pub common: ObjectCommon,
4232    pub enabled: bool,
4233    pub job_type: String,
4234    pub program_name: Option<ObjectName>,
4235    pub schedule_name: Option<ObjectName>,
4236    pub job_action: Option<String>,
4237}
4238
4239#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
4240pub struct EditioningViewMetadata {
4241    pub common: ObjectCommon,
4242    pub base_table_owner: SchemaName,
4243    pub base_table_name: ObjectName,
4244    pub columns: HashMap<ColumnName, ColumnMetadata>,
4245}
4246
4247#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
4248pub enum CatalogObject {
4249    Table(TableMetadata),
4250    View(ViewMetadata),
4251    MaterializedView(MViewMetadata),
4252    Sequence(SequenceMetadata),
4253    Type(TypeMetadata),
4254    Package(PackageMetadata),
4255    Procedure(ProcedureMetadata),
4256    Function(FunctionMetadata),
4257    Trigger(TriggerMetadata),
4258    SchedulerJob(SchedulerJobMetadata),
4259    EditioningView(EditioningViewMetadata),
4260}
4261
4262#[cfg(test)]
4263mod builder_tests {
4264    use chrono::{DateTime, Utc};
4265    use plsql_core::AnalysisProfile;
4266
4267    use crate::{
4268        CatalogCapabilities, CatalogRowSet, CatalogSnapshotBuilder, CatalogSource,
4269        CatalogSourceKind, CatalogSourceKind::LiveConnection, GrantPrivilege, Grantee, ObjectType,
4270        OracleRow, PlScopeAvailability,
4271    };
4272
4273    fn oracle_row(columns: &[(&str, &str, Option<&str>)]) -> OracleRow {
4274        let mut row = OracleRow::default();
4275        for (name, oracle_type, value) in columns {
4276            row.insert(*name, *oracle_type, value.map(String::from));
4277        }
4278        row
4279    }
4280
4281    fn fixed_generated_at() -> DateTime<Utc> {
4282        DateTime::parse_from_rfc3339("2026-06-29T00:00:00Z")
4283            .expect("fixed timestamp")
4284            .with_timezone(&Utc)
4285    }
4286
4287    fn builder() -> CatalogSnapshotBuilder {
4288        CatalogSnapshotBuilder::new(
4289            AnalysisProfile::default(),
4290            CatalogCapabilities {
4291                can_query_all_views: true,
4292                can_query_dba_views: true,
4293                can_use_dbms_metadata: true,
4294                can_read_source: true,
4295                plscope_enabled: true,
4296                can_query_scheduler: true,
4297                can_query_roles_and_grants: true,
4298                ..CatalogCapabilities::default()
4299            },
4300            CatalogSource {
4301                kind: LiveConnection,
4302                description: Some(String::from("synthetic external extractor")),
4303                ..CatalogSource::default()
4304            },
4305            fixed_generated_at(),
4306        )
4307    }
4308
4309    fn apply_synthetic_builder_rows(
4310        builder: &mut CatalogSnapshotBuilder,
4311    ) -> Result<(), crate::CatalogError> {
4312        builder.apply_row(
4313            CatalogRowSet::Objects,
4314            &oracle_row(&[
4315                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4316                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES")),
4317                ("OBJECT_TYPE", "VARCHAR2(30)", Some("TABLE")),
4318                ("STATUS", "VARCHAR2(7)", Some("VALID")),
4319            ]),
4320        )?;
4321        builder.apply_row(
4322            CatalogRowSet::Columns,
4323            &oracle_row(&[
4324                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4325                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4326                ("COLUMN_NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4327                ("COLUMN_POSITION", "NUMBER", Some("1")),
4328                ("DATA_TYPE", "VARCHAR2(30)", Some("NUMBER")),
4329                ("DATA_PRECISION", "NUMBER", Some("10")),
4330                ("DATA_SCALE", "NUMBER", Some("0")),
4331                ("NULLABLE", "VARCHAR2(1)", Some("N")),
4332            ]),
4333        )?;
4334        builder.apply_row(
4335            CatalogRowSet::Users,
4336            &oracle_row(&[("USERNAME", "VARCHAR2(128)", Some("APP_USER"))]),
4337        )?;
4338        builder.apply_row(
4339            CatalogRowSet::Grants,
4340            &oracle_row(&[
4341                ("TABLE_SCHEMA", "VARCHAR2(128)", Some("BILLING")),
4342                ("TABLE_NAME", "VARCHAR2(128)", Some("INVOICES")),
4343                ("GRANTEE", "VARCHAR2(128)", Some("APP_USER")),
4344                ("PRIVILEGE", "VARCHAR2(40)", Some("SELECT")),
4345                ("GRANTABLE", "VARCHAR2(3)", Some("NO")),
4346                ("HIERARCHY", "VARCHAR2(3)", Some("NO")),
4347            ]),
4348        )?;
4349        builder.apply_row(
4350            CatalogRowSet::PlScopeAvailability,
4351            &oracle_row(&[
4352                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4353                (
4354                    "PLSCOPE_SETTINGS",
4355                    "VARCHAR2(4000)",
4356                    Some("IDENTIFIERS:ALL, STATEMENTS:ALL"),
4357                ),
4358            ]),
4359        )?;
4360        builder.apply_row(
4361            CatalogRowSet::PlScopeIdentifiers,
4362            &oracle_row(&[
4363                ("OWNER", "VARCHAR2(128)", Some("BILLING")),
4364                ("OBJECT_NAME", "VARCHAR2(128)", Some("INVOICES_PKG")),
4365                ("NAME", "VARCHAR2(128)", Some("INVOICE_ID")),
4366                ("TYPE", "VARCHAR2(128)", Some("VARIABLE")),
4367                ("USAGE", "VARCHAR2(128)", Some("DECLARATION")),
4368                ("LINE", "NUMBER", Some("7")),
4369                ("COL", "NUMBER", Some("12")),
4370            ]),
4371        )?;
4372        Ok(())
4373    }
4374
4375    #[test]
4376    fn catalog_snapshot_builder_applies_synthetic_dictionary_rows_on_stable()
4377    -> Result<(), crate::CatalogError> {
4378        let mut builder = builder();
4379        apply_synthetic_builder_rows(&mut builder)?;
4380
4381        let snapshot = builder.finish()?;
4382        let report = snapshot.doctor_report();
4383        assert_eq!(report.source_kind, CatalogSourceKind::LiveConnection);
4384        assert_eq!(report.totals.schemas_observed, 1);
4385        assert_eq!(report.totals.objects_total, 1);
4386        assert_eq!(report.totals.columns_total, 1);
4387        assert_eq!(report.totals.grants_total, 1);
4388        assert_eq!(
4389            report.object_counts.first().map(|count| count.object_type),
4390            Some(ObjectType::Table)
4391        );
4392        assert_eq!(
4393            report
4394                .plscope_availability_per_schema
4395                .first()
4396                .map(|entry| entry.availability),
4397            Some(PlScopeAvailability::IdentifiersAndStatements)
4398        );
4399        assert!(snapshot.schemas.values().any(|schema| {
4400            schema.grants.iter().any(|grant| {
4401                grant.privilege == GrantPrivilege::Select
4402                    && matches!(grant.grantee, Grantee::User(_))
4403            })
4404        }));
4405        assert_eq!(
4406            snapshot
4407                .schemas
4408                .values()
4409                .filter_map(|schema| schema.plscope.as_ref())
4410                .map(|plscope| plscope.identifiers.len())
4411                .sum::<usize>(),
4412            1
4413        );
4414        Ok(())
4415    }
4416
4417    #[test]
4418    fn catalog_snapshot_builder_doctor_report_matches_golden() -> Result<(), crate::CatalogError> {
4419        let mut builder = builder();
4420        apply_synthetic_builder_rows(&mut builder)?;
4421
4422        let actual = serde_json::to_value(builder.finish()?.doctor_report())?;
4423        let expected: serde_json::Value = serde_json::from_str(include_str!(
4424            "../tests/golden/catalog_snapshot_builder_doctor_report.json"
4425        ))?;
4426
4427        assert_eq!(actual, expected);
4428        Ok(())
4429    }
4430
4431    #[test]
4432    fn catalog_snapshot_builder_can_mark_user_universe_known_empty()
4433    -> Result<(), crate::CatalogError> {
4434        let mut builder = builder();
4435        let rows: Vec<OracleRow> = Vec::new();
4436        builder.apply_rows(CatalogRowSet::Users, &rows)?;
4437        let snapshot = builder.finish()?;
4438        assert_eq!(snapshot.known_users, Some(Default::default()));
4439        Ok(())
4440    }
4441}