Skip to main content

pgroles_core/
suggest.rs

1//! Profile suggestion: deterministically refactor a flat manifest into a
2//! manifest that uses reusable [`Profile`]s.
3//!
4//! ## Algorithm
5//!
6//! 1. Bucket grants and default privileges by role.
7//! 2. Classify each role:
8//!    - Eligible if it touches exactly one *declared* schema, has no role
9//!      attributes that profiles can't express (only `login` / `inherit` are
10//!      promotable), and every default privilege uses that schema's owner.
11//! 3. Compute a *schema-relative signature* for each eligible role — the
12//!    grants and default privileges with the schema replaced by a placeholder.
13//! 4. Cluster eligible roles by `(signature, login, inherit)`.
14//! 5. For each cluster of `>= min_schemas` schemas, pick a uniform role-name
15//!    pattern (`{schema}-{profile}` etc.) such that every member maps the
16//!    same way. Profile name is the shared portion.
17//! 6. Resolve per-schema role-pattern conflicts by giving the first cluster
18//!    (in deterministic iteration order) priority and dropping the rest.
19//! 7. Build a candidate manifest with the extracted profiles.
20//! 8. **Round-trip safety check**: re-expand the new manifest into a
21//!    [`RoleGraph`] and diff it against the original. The only acceptable
22//!    differences are auto-generated role comments (profile expansion
23//!    annotates each generated role). Any other delta means we'd silently
24//!    change semantics — fall back to the original manifest.
25//!
26//! The result is **deterministic**: the same input always produces the same
27//! output. No LLM, no heuristics that depend on iteration order of unstable
28//! collections.
29
30use std::collections::{BTreeMap, BTreeSet};
31
32use crate::diff::{Change, diff};
33use crate::manifest::{
34    DefaultPrivilege, DefaultPrivilegeGrant, Grant, ObjectTarget, ObjectType, PolicyManifest,
35    Privilege, Profile, ProfileGrant, ProfileObjectTarget, RoleDefinition, SchemaBinding,
36    expand_manifest,
37};
38use crate::model::RoleGraph;
39
40/// Knobs for the suggester. The defaults are conservative.
41#[derive(Debug, Clone)]
42pub struct SuggestOptions {
43    /// Minimum number of distinct schemas a candidate cluster must span before
44    /// it becomes a profile. Below this, the original flat roles are kept.
45    /// Default `2` — a profile with one schema is just an indirection.
46    pub min_schemas: usize,
47
48    /// Complete object inventory `(schema, object_type) → set of names`,
49    /// **as observed in the live database** (i.e. from
50    /// [`pgroles_inspect::fetch_object_inventory`]). When provided, the
51    /// suggester collapses per-name grants into wildcards (`name: "*"`) for
52    /// `(schema, object_type)` buckets where a role covers every object,
53    /// which is what makes profile clustering across schemas useful for
54    /// `pgroles generate` output (Postgres expands `GRANT … ON ALL TABLES`
55    /// into per-relation rows).
56    ///
57    /// **Why required**: a grant-derived inventory would treat ungranted
58    /// objects as nonexistent. A role granted on every *currently-granted*
59    /// table would collapse to `name: "*"`, and applying the suggested
60    /// manifest would silently grant on previously-ungranted tables —
61    /// broadening privileges beyond the original manifest's intent. With a
62    /// real introspected inventory we know what *exists* vs what's
63    /// *granted*, so the collapse is sound.
64    ///
65    /// `None` (default) disables wildcard collapse entirely. Roles only
66    /// cluster when their grants reference identical literal names.
67    pub full_inventory: Option<Inventory>,
68}
69
70impl Default for SuggestOptions {
71    fn default() -> Self {
72        Self {
73            min_schemas: 2,
74            full_inventory: None,
75        }
76    }
77}
78
79/// A single profile that the suggester extracted.
80#[derive(Debug, Clone)]
81pub struct SuggestedProfile {
82    pub profile_name: String,
83    pub role_pattern: String,
84    /// Schema → original role name now produced by `profile × schema` expansion.
85    pub schema_to_role: BTreeMap<String, String>,
86}
87
88/// Why a particular role wasn't promoted into a profile.
89#[derive(Debug, Clone)]
90pub enum SkipReason {
91    /// Role's grants/default-privs touch more than one schema.
92    MultiSchema { role: String, schemas: Vec<String> },
93    /// Role references a schema that isn't declared in `schemas:`.
94    SchemaNotDeclared { role: String, schema: String },
95    /// Role has a default privilege whose owner doesn't match the schema's owner.
96    OwnerMismatch { role: String, schema: String },
97    /// Role has role attributes (superuser, connection_limit, ...) that profiles
98    /// can't express.
99    UniqueAttributes { role: String },
100    /// Role has grants on cluster-wide objects (database, etc.) that profiles
101    /// can't express.
102    UnrepresentableGrant { role: String },
103    /// Cluster spans fewer than `min_schemas` schemas.
104    SoleSchema { role: String, schema: String },
105    /// Couldn't find a role-pattern that all cluster members agree on.
106    NoUniformPattern { roles: Vec<String> },
107    /// Two clusters wanted to bind to the same schema with different patterns.
108    SchemaPatternConflict {
109        schema: String,
110        winning_pattern: String,
111        dropped_roles: Vec<String>,
112    },
113    /// The candidate manifest didn't round-trip cleanly; we abandoned it.
114    RoundTripFailure { reason: String },
115    /// The provided `full_inventory` was missing object names that already
116    /// appear in the input's flat grants — a sure sign the inventory wasn't
117    /// sourced from a complete introspection. Wildcard collapse was
118    /// disabled for safety.
119    IncompleteFullInventory { reason: String },
120}
121
122/// What [`suggest_profiles`] returns: the new manifest, the profiles it built,
123/// and the reasons each non-promoted role stayed flat.
124#[derive(Debug, Clone)]
125pub struct SuggestReport {
126    pub manifest: PolicyManifest,
127    pub profiles: Vec<SuggestedProfile>,
128    pub skipped: Vec<SkipReason>,
129    /// `true` if the new manifest round-trips identically (modulo
130    /// auto-generated role comments) to the input.
131    pub round_trip_ok: bool,
132}
133
134/// Run the suggester. Always returns; if anything goes wrong the original
135/// manifest is returned unchanged with `round_trip_ok = false`.
136pub fn suggest_profiles(input: &PolicyManifest, opts: &SuggestOptions) -> SuggestReport {
137    // If the input already has profiles, the user has already curated this
138    // manifest. Don't overwrite their work.
139    if !input.profiles.is_empty() {
140        return SuggestReport {
141            manifest: input.clone(),
142            profiles: vec![],
143            skipped: vec![],
144            round_trip_ok: true,
145        };
146    }
147
148    let mut skipped: Vec<SkipReason> = Vec::new();
149
150    // --- Inventory ---------------------------------------------------------
151    //
152    // Two distinct inventories are involved:
153    //   * `full_inventory` (caller-provided, from live DB introspection):
154    //     authoritative list of every object that *exists*. Required to
155    //     safely collapse per-name grants into a wildcard, because we need
156    //     to know whether a role covers *every* object — not just every
157    //     object that happens to appear in some grant.
158    //   * `grant_inventory` (always built from the input's grants): the
159    //     domain over which wildcard grants in the candidate manifest must
160    //     be expanded for the round-trip diff. This is what guarantees the
161    //     candidate's wildcard expression matches the original's per-name
162    //     entries, regardless of whether collapse ran.
163    let grant_inventory = build_inventory(input);
164    // Defense-in-depth: if a caller hands us a `full_inventory` that's
165    // demonstrably incomplete (missing object names that already appear in
166    // the input's per-name grants), we can't trust it for collapse. Disable
167    // collapse and surface the issue. This catches accidental misuse like
168    // passing `inventory_from_manifest_grants(manifest)` as `full_inventory`.
169    let collapse_inventory: Option<&Inventory> = match opts.full_inventory.as_ref() {
170        None => None,
171        Some(full) => match validate_full_inventory(&grant_inventory, full) {
172            Ok(()) => Some(full),
173            Err(reason) => {
174                skipped.push(SkipReason::IncompleteFullInventory { reason });
175                None
176            }
177        },
178    };
179
180    // --- Bucket grants and default privileges by grantee role ---------------
181
182    let mut role_grants: BTreeMap<String, Vec<Grant>> = BTreeMap::new();
183    for grant in &input.grants {
184        role_grants
185            .entry(grant.role.clone())
186            .or_default()
187            .push(grant.clone());
188    }
189
190    // Collapse per-role per-name grants that fully cover their (schema,
191    // object_type) bucket — only when a real introspected inventory is
192    // available. Without it, "full coverage" can't be soundly determined.
193    if let Some(inv) = collapse_inventory {
194        for grants in role_grants.values_mut() {
195            collapse_full_coverage_grants(grants, inv);
196        }
197    }
198
199    // Each (role, schema) → (owner, Vec<DefaultPrivilegeGrant>)
200    // We keep the owner so we can compare it against the schema owner later.
201    let mut role_dps: BTreeMap<String, Vec<(String, String, DefaultPrivilegeGrant)>> =
202        BTreeMap::new();
203    for dp in &input.default_privileges {
204        let owner = dp
205            .owner
206            .clone()
207            .or_else(|| input.default_owner.clone())
208            .unwrap_or_default();
209        for grant in &dp.grant {
210            if let Some(role) = &grant.role {
211                role_dps.entry(role.clone()).or_default().push((
212                    owner.clone(),
213                    dp.schema.clone(),
214                    grant.clone(),
215                ));
216            }
217        }
218    }
219
220    // --- Index schemas ------------------------------------------------------
221
222    let schema_owner: BTreeMap<String, Option<String>> = input
223        .schemas
224        .iter()
225        .map(|s| {
226            (
227                s.name.clone(),
228                s.owner.clone().or_else(|| input.default_owner.clone()),
229            )
230        })
231        .collect();
232
233    // --- Classify each role -------------------------------------------------
234
235    /// Eligibility outcome for a single role.
236    struct Eligible {
237        role_name: String,
238        schema: String,
239        signature: RoleSignature,
240        login: Option<bool>,
241        inherit: Option<bool>,
242    }
243
244    let mut eligible: Vec<Eligible> = Vec::new();
245    let mut clustered_role_names: BTreeSet<String> = BTreeSet::new();
246
247    for role_def in &input.roles {
248        let role_name = &role_def.name;
249
250        // Profiles can only express `login` and `inherit`. Any other
251        // explicitly-set attribute disqualifies the role.
252        //
253        // Comments are treated as user-set documentation *unless* they match
254        // pgroles' own auto-generated annotation pattern (which `pgroles
255        // apply` writes when expanding a profile). Ignoring auto-comments
256        // makes `--suggest-profiles` idempotent across runs.
257        let has_user_comment = role_def
258            .comment
259            .as_deref()
260            .is_some_and(|c| !is_auto_profile_comment(c));
261        if role_def.superuser.is_some()
262            || role_def.createdb.is_some()
263            || role_def.createrole.is_some()
264            || role_def.replication.is_some()
265            || role_def.bypassrls.is_some()
266            || role_def.connection_limit.is_some()
267            || role_def.password.is_some()
268            || role_def.password_valid_until.is_some()
269            || has_user_comment
270        {
271            skipped.push(SkipReason::UniqueAttributes {
272                role: role_name.clone(),
273            });
274            continue;
275        }
276
277        // What schemas does this role touch (via grants and DPs)?
278        // Roles with grants that profiles can't express (e.g. database-level
279        // CONNECT) are excluded outright — even if the rest of their grants
280        // would cluster, we'd silently drop the unrepresentable ones.
281        let mut schemas_seen: BTreeSet<String> = BTreeSet::new();
282        let mut has_unrepresentable_grant = false;
283        let role_grants_vec = role_grants.get(role_name).cloned().unwrap_or_default();
284        for g in &role_grants_vec {
285            match g.object.object_type {
286                ObjectType::Schema => match &g.object.name {
287                    Some(name) => {
288                        schemas_seen.insert(name.clone());
289                    }
290                    None => has_unrepresentable_grant = true,
291                },
292                ObjectType::Database => has_unrepresentable_grant = true,
293                _ => match &g.object.schema {
294                    Some(s) => {
295                        schemas_seen.insert(s.clone());
296                    }
297                    None => has_unrepresentable_grant = true,
298                },
299            }
300        }
301        if has_unrepresentable_grant {
302            skipped.push(SkipReason::UnrepresentableGrant {
303                role: role_name.clone(),
304            });
305            continue;
306        }
307        let role_dp_vec = role_dps.get(role_name).cloned().unwrap_or_default();
308        for (_, schema, _) in &role_dp_vec {
309            schemas_seen.insert(schema.clone());
310        }
311
312        // No grants, no default privileges → can't promote, keep flat.
313        if schemas_seen.is_empty() {
314            continue;
315        }
316
317        if schemas_seen.len() > 1 {
318            skipped.push(SkipReason::MultiSchema {
319                role: role_name.clone(),
320                schemas: schemas_seen.into_iter().collect(),
321            });
322            continue;
323        }
324
325        let schema = schemas_seen.into_iter().next().unwrap();
326
327        // The schema must be declared in the manifest (otherwise we can't bind
328        // a profile to it).
329        let Some(owner_for_schema) = schema_owner.get(&schema) else {
330            skipped.push(SkipReason::SchemaNotDeclared {
331                role: role_name.clone(),
332                schema,
333            });
334            continue;
335        };
336
337        // Every default privilege owned-by must equal the schema's owner.
338        let mut owner_mismatch = false;
339        for (owner, _, _) in &role_dp_vec {
340            if Some(owner.as_str()) != owner_for_schema.as_deref() {
341                owner_mismatch = true;
342                break;
343            }
344        }
345        if owner_mismatch {
346            skipped.push(SkipReason::OwnerMismatch {
347                role: role_name.clone(),
348                schema,
349            });
350            continue;
351        }
352
353        let signature = compute_signature(&role_grants_vec, &role_dp_vec, &schema);
354
355        eligible.push(Eligible {
356            role_name: role_name.clone(),
357            schema,
358            signature,
359            login: role_def.login,
360            inherit: role_def.inherit,
361        });
362    }
363
364    // --- Cluster ------------------------------------------------------------
365
366    // Key = (signature, login, inherit). Value = Vec<member>.
367    type ClusterKey = (RoleSignature, Option<bool>, Option<bool>);
368    let mut clusters: BTreeMap<ClusterKey, Vec<&Eligible>> = BTreeMap::new();
369    for el in &eligible {
370        clusters
371            .entry((el.signature.clone(), el.login, el.inherit))
372            .or_default()
373            .push(el);
374    }
375
376    // --- Pattern resolution -------------------------------------------------
377
378    // Iterate clusters in size-descending order so that bigger clusters claim
379    // schema patterns first. Tie-break by signature for determinism.
380    let mut cluster_entries: Vec<_> = clusters.into_iter().collect();
381    cluster_entries.sort_by(|a, b| b.1.len().cmp(&a.1.len()).then_with(|| a.0.cmp(&b.0)));
382
383    let pattern_priority = [
384        "{schema}-{profile}",
385        "{schema}_{profile}",
386        "{profile}-{schema}",
387        "{profile}_{schema}",
388    ];
389
390    // schema → committed pattern. Once a cluster lands, the pattern is sticky.
391    let mut schema_pattern: BTreeMap<String, String> = BTreeMap::new();
392    // schema → list of profile names already attached.
393    let mut schema_profiles: BTreeMap<String, Vec<String>> = BTreeMap::new();
394    // profile name → built Profile object.
395    let mut profiles_out: BTreeMap<String, Profile> = BTreeMap::new();
396    // profile name → sources (schema, original role name) for the report.
397    let mut suggested: Vec<SuggestedProfile> = Vec::new();
398    // Profile names already taken (avoid collisions).
399    let mut taken_profile_names: BTreeSet<String> = BTreeSet::new();
400
401    for ((_signature, login, inherit), members) in cluster_entries {
402        // Need at least `min_schemas` distinct schemas.
403        let distinct_schemas: BTreeSet<&str> = members.iter().map(|m| m.schema.as_str()).collect();
404        if distinct_schemas.len() < opts.min_schemas {
405            for m in &members {
406                skipped.push(SkipReason::SoleSchema {
407                    role: m.role_name.clone(),
408                    schema: m.schema.clone(),
409                });
410            }
411            continue;
412        }
413
414        // Sanity: each schema appears at most once in a cluster (otherwise the
415        // signature wouldn't match — distinct grants per role per schema).
416        // Defensive — drop the duplicates.
417        let mut seen_schemas: BTreeSet<&str> = BTreeSet::new();
418        let unique_members: Vec<&Eligible> = members
419            .iter()
420            .filter(|m| seen_schemas.insert(m.schema.as_str()))
421            .copied()
422            .collect();
423
424        // Find a (pattern, profile_name) that all members agree on AND that
425        // doesn't conflict with already-committed schema patterns.
426        //
427        // For diagnostics: when no viable pattern can be chosen, surface
428        // `SchemaPatternConflict` if some pattern *would* have succeeded
429        // except for an already-locked schema; otherwise the failure is a
430        // role-name disagreement / collision and we report `NoUniformPattern`.
431        let mut chosen: Option<(String, String)> = None;
432        // Records the schema/locked-pattern of the first pattern that was
433        // viable in every other respect but blocked by a schema lock.
434        let mut schema_conflict_blocking: Option<(String, String)> = None;
435        for pat in pattern_priority {
436            // Pattern viability ignoring schema lock: do role names match
437            // uniformly, is the resulting profile name a valid identifier,
438            // and is it free?
439            let viable_name: Option<String> = {
440                let mut names: BTreeSet<String> = BTreeSet::new();
441                let mut ok = true;
442                for m in &unique_members {
443                    if let Some(prof) = match_pattern(pat, &m.role_name, &m.schema) {
444                        names.insert(prof);
445                    } else {
446                        ok = false;
447                        break;
448                    }
449                }
450                if !ok || names.len() != 1 {
451                    None
452                } else {
453                    let n = names.into_iter().next().unwrap();
454                    if !is_valid_identifier(&n)
455                        || taken_profile_names.contains(&n)
456                        || input.profiles.contains_key(&n)
457                    {
458                        None
459                    } else {
460                        Some(n)
461                    }
462                }
463            };
464
465            // Is any of this cluster's schemas already locked to a different
466            // pattern?
467            let blocked_by_schema = unique_members.iter().find_map(|m| {
468                schema_pattern
469                    .get(&m.schema)
470                    .filter(|committed| *committed != pat)
471                    .map(|committed| (m.schema.clone(), committed.clone()))
472            });
473
474            match (viable_name, blocked_by_schema) {
475                (Some(name), None) => {
476                    chosen = Some((pat.to_string(), name));
477                    break;
478                }
479                (Some(_), Some(conflict)) if schema_conflict_blocking.is_none() => {
480                    schema_conflict_blocking = Some(conflict);
481                }
482                _ => {}
483            }
484        }
485
486        let Some((pattern, profile_name)) = chosen else {
487            if let Some((schema, winning_pattern)) = schema_conflict_blocking {
488                skipped.push(SkipReason::SchemaPatternConflict {
489                    schema,
490                    winning_pattern,
491                    dropped_roles: unique_members.iter().map(|m| m.role_name.clone()).collect(),
492                });
493            } else {
494                skipped.push(SkipReason::NoUniformPattern {
495                    roles: unique_members.iter().map(|m| m.role_name.clone()).collect(),
496                });
497            }
498            continue;
499        };
500
501        // Commit the pattern on every schema this cluster touches.
502        for m in &unique_members {
503            schema_pattern.insert(m.schema.clone(), pattern.clone());
504            schema_profiles
505                .entry(m.schema.clone())
506                .or_default()
507                .push(profile_name.clone());
508            clustered_role_names.insert(m.role_name.clone());
509        }
510
511        // Build the Profile from one representative member.
512        let representative = unique_members[0];
513        let rep_grants = role_grants
514            .get(&representative.role_name)
515            .cloned()
516            .unwrap_or_default();
517        let rep_dps = role_dps
518            .get(&representative.role_name)
519            .cloned()
520            .unwrap_or_default();
521
522        let profile = build_profile(
523            login,
524            inherit,
525            &rep_grants,
526            &rep_dps,
527            &representative.schema,
528        );
529
530        profiles_out.insert(profile_name.clone(), profile);
531        taken_profile_names.insert(profile_name.clone());
532
533        let schema_to_role: BTreeMap<String, String> = unique_members
534            .iter()
535            .map(|m| (m.schema.clone(), m.role_name.clone()))
536            .collect();
537        suggested.push(SuggestedProfile {
538            profile_name,
539            role_pattern: pattern,
540            schema_to_role,
541        });
542    }
543
544    // --- Build the candidate output manifest --------------------------------
545
546    let mut new_schemas: Vec<SchemaBinding> = input
547        .schemas
548        .iter()
549        .map(|s| {
550            let mut bound_profiles = schema_profiles.get(&s.name).cloned().unwrap_or_default();
551            bound_profiles.sort();
552            let pattern = schema_pattern
553                .get(&s.name)
554                .cloned()
555                .unwrap_or_else(|| s.role_pattern.clone());
556            SchemaBinding {
557                name: s.name.clone(),
558                profiles: bound_profiles,
559                role_pattern: pattern,
560                owner: s.owner.clone(),
561            }
562        })
563        .collect();
564    new_schemas.sort_by(|a, b| a.name.cmp(&b.name));
565
566    let new_roles: Vec<RoleDefinition> = input
567        .roles
568        .iter()
569        .filter(|r| !clustered_role_names.contains(&r.name))
570        .cloned()
571        .collect();
572
573    let new_grants: Vec<Grant> = input
574        .grants
575        .iter()
576        .filter(|g| !clustered_role_names.contains(&g.role))
577        .cloned()
578        .collect();
579
580    let new_default_privileges: Vec<DefaultPrivilege> = input
581        .default_privileges
582        .iter()
583        .filter_map(|dp| {
584            let kept: Vec<DefaultPrivilegeGrant> = dp
585                .grant
586                .iter()
587                .filter(|g| match &g.role {
588                    Some(r) => !clustered_role_names.contains(r),
589                    None => true,
590                })
591                .cloned()
592                .collect();
593            if kept.is_empty() {
594                None
595            } else {
596                Some(DefaultPrivilege {
597                    owner: dp.owner.clone(),
598                    schema: dp.schema.clone(),
599                    grant: kept,
600                })
601            }
602        })
603        .collect();
604
605    let candidate = PolicyManifest {
606        default_owner: input.default_owner.clone(),
607        auth_providers: input.auth_providers.clone(),
608        profiles: profiles_out,
609        schemas: new_schemas,
610        roles: new_roles,
611        grants: new_grants,
612        default_privileges: new_default_privileges,
613        memberships: input.memberships.clone(),
614        retirements: input.retirements.clone(),
615    };
616
617    // --- Round-trip safety check -------------------------------------------
618
619    // Round-trip wildcard expansion uses the most authoritative inventory
620    // available. With a full introspected inventory we expand against the
621    // *real* set of objects in each schema; otherwise we fall back to the
622    // grant-derived view (sufficient when collapse didn't run).
623    let round_trip_inventory = collapse_inventory.cloned().unwrap_or(grant_inventory);
624    let round_trip_ok = match check_round_trip(input, &candidate, &round_trip_inventory) {
625        Ok(()) => true,
626        Err(reason) => {
627            skipped.push(SkipReason::RoundTripFailure {
628                reason: reason.clone(),
629            });
630            false
631        }
632    };
633
634    let manifest = if round_trip_ok {
635        candidate
636    } else {
637        input.clone()
638    };
639
640    SuggestReport {
641        manifest,
642        profiles: if round_trip_ok { suggested } else { vec![] },
643        skipped,
644        round_trip_ok,
645    }
646}
647
648// ---------------------------------------------------------------------------
649// Object-name inventory (for full-coverage collapse and round-trip check)
650// ---------------------------------------------------------------------------
651
652/// `(schema, object_type) → set of object names` referenced in the manifest's
653/// flat grants. For schema-typed grants the schema is the grant's `name`
654/// field; the inventory stores no entries for `Schema` (those are 1:1).
655pub type Inventory = BTreeMap<(String, ObjectType), BTreeSet<String>>;
656
657/// Build a `(schema, object_type) → set of names` map from the flat grants
658/// in a manifest. Wildcards (`name: "*"`) and schema/database-typed grants
659/// are excluded.
660///
661/// **Do not pass this to [`SuggestOptions::full_inventory`].** A grant-only
662/// view treats ungranted objects as nonexistent, and would let
663/// `collapse_full_coverage_grants` silently broaden privileges. This
664/// function exists for the wildcard-aware round-trip comparison the
665/// suggester uses internally — re-exported so test code can perform the
666/// same comparison. Production callers should source `full_inventory` from
667/// [`pgroles_inspect::fetch_object_inventory`].
668pub fn inventory_from_manifest_grants(m: &PolicyManifest) -> Inventory {
669    build_inventory(m)
670}
671
672/// Deprecated alias for [`inventory_from_manifest_grants`].
673#[deprecated(
674    note = "renamed to `inventory_from_manifest_grants` — must NOT be used as full_inventory"
675)]
676pub fn build_inventory_pub(m: &PolicyManifest) -> Inventory {
677    build_inventory(m)
678}
679
680/// Replace each `name: "*"` table/sequence/function/etc. grant with one named
681/// grant per entry in `inventory[(schema, object_type)]`. Schema- and
682/// database-typed grants are passed through. Mutates `grants` in place.
683pub fn expand_wildcard_grants(grants: &mut Vec<Grant>, inventory: &Inventory) {
684    expand_wildcards_in_place(grants, inventory)
685}
686
687/// Verify that every object name appearing in `grant_inventory` (i.e. every
688/// per-name grant referenced in the flat manifest) is also present in
689/// `full_inventory`. If a granted object is missing from the supposedly
690/// "full" inventory, the inventory is provably incomplete — likely the
691/// caller passed a grant-derived view by mistake.
692fn validate_full_inventory(
693    grant_inventory: &Inventory,
694    full_inventory: &Inventory,
695) -> Result<(), String> {
696    for (key, granted_names) in grant_inventory {
697        let Some(full_names) = full_inventory.get(key) else {
698            return Err(format!(
699                "full_inventory missing entry for (schema={}, type={:?}) — but {} object name(s) are referenced in input grants",
700                key.0,
701                key.1,
702                granted_names.len()
703            ));
704        };
705        if let Some(missing) = granted_names.iter().find(|n| !full_names.contains(*n)) {
706            return Err(format!(
707                "full_inventory[(schema={}, type={:?})] does not contain {missing:?} but it appears in input grants",
708                key.0, key.1
709            ));
710        }
711    }
712    Ok(())
713}
714
715fn build_inventory(m: &PolicyManifest) -> Inventory {
716    let mut inv: Inventory = BTreeMap::new();
717    for g in &m.grants {
718        match g.object.object_type {
719            ObjectType::Schema | ObjectType::Database => continue,
720            _ => {}
721        }
722        let Some(name) = g.object.name.as_ref() else {
723            continue;
724        };
725        if name == "*" {
726            continue;
727        }
728        let Some(schema) = g.object.schema.as_ref() else {
729            continue;
730        };
731        inv.entry((schema.clone(), g.object.object_type))
732            .or_default()
733            .insert(name.clone());
734    }
735    inv
736}
737
738/// Replace per-name grants with a single wildcard grant when a role's grants
739/// fully cover every object of a given `(schema, object_type)` with identical
740/// privileges. Mutates `grants` in place.
741fn collapse_full_coverage_grants(grants: &mut Vec<Grant>, inventory: &Inventory) {
742    // Group grants by (schema, object_type). Skip schema-typed grants — they
743    // have a 1:1 mapping with the schema name and don't need collapsing.
744    // Track which (schema, type) buckets already have a wildcard grant —
745    // those cannot be collapsed (would produce two wildcards on the same
746    // GrantKey, which the model can't hold).
747    let mut buckets: BTreeMap<(String, ObjectType), Vec<usize>> = BTreeMap::new();
748    let mut has_wildcard: BTreeSet<(String, ObjectType)> = BTreeSet::new();
749    for (i, g) in grants.iter().enumerate() {
750        match g.object.object_type {
751            ObjectType::Schema | ObjectType::Database => continue,
752            _ => {}
753        }
754        let Some(schema) = g.object.schema.as_ref() else {
755            continue;
756        };
757        let Some(name) = g.object.name.as_ref() else {
758            continue;
759        };
760        if name == "*" {
761            has_wildcard.insert((schema.clone(), g.object.object_type));
762            continue;
763        }
764        buckets
765            .entry((schema.clone(), g.object.object_type))
766            .or_default()
767            .push(i);
768    }
769    buckets.retain(|key, _| !has_wildcard.contains(key));
770
771    let mut to_remove: BTreeSet<usize> = BTreeSet::new();
772    let mut to_add: Vec<Grant> = Vec::new();
773
774    for ((schema, object_type), idxs) in buckets {
775        // All entries must share the same privilege set.
776        let first_privs = canonical_privs(&grants[idxs[0]].privileges);
777        let all_same = idxs
778            .iter()
779            .all(|&i| canonical_privs(&grants[i].privileges) == first_privs);
780        if !all_same {
781            continue;
782        }
783        // Collected names must equal the inventory for that (schema, type).
784        let mut covered: BTreeSet<String> = BTreeSet::new();
785        for &i in &idxs {
786            if let Some(name) = grants[i].object.name.as_ref() {
787                covered.insert(name.clone());
788            }
789        }
790        let inv_names = inventory.get(&(schema.clone(), object_type));
791        let full_coverage = match inv_names {
792            Some(names) => &covered == names,
793            None => false,
794        };
795        if !full_coverage {
796            continue;
797        }
798        // Collapse: remove all per-name entries; add one wildcard.
799        for &i in &idxs {
800            to_remove.insert(i);
801        }
802        let role = grants[idxs[0]].role.clone();
803        to_add.push(Grant {
804            role,
805            privileges: first_privs.into_iter().collect(),
806            object: ObjectTarget {
807                object_type,
808                schema: Some(schema),
809                name: Some("*".to_string()),
810            },
811        });
812    }
813
814    // Apply removals (in reverse order) and additions.
815    let mut remaining = Vec::with_capacity(grants.len() - to_remove.len() + to_add.len());
816    for (i, g) in grants.drain(..).enumerate() {
817        if !to_remove.contains(&i) {
818            remaining.push(g);
819        }
820    }
821    remaining.extend(to_add);
822    *grants = remaining;
823}
824
825fn canonical_privs(privs: &[Privilege]) -> Vec<Privilege> {
826    let mut out = privs.to_vec();
827    out.sort_by_key(|p| privilege_sort_key(*p));
828    out.dedup();
829    out
830}
831
832// ---------------------------------------------------------------------------
833// Internals
834// ---------------------------------------------------------------------------
835
836/// Schema-relative signature: the set of grants and default privileges with
837/// the schema replaced by a placeholder. Stored as a sorted Vec so the type
838/// implements `Ord` for use as a `BTreeMap` key.
839#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
840struct RoleSignature {
841    grants: Vec<SignatureGrant>,
842    defaults: Vec<SignatureDefault>,
843}
844
845#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
846struct SignatureGrant {
847    object_type: ObjectType,
848    /// `None` for schema-typed grants, otherwise the object name (e.g. `"*"`).
849    name: Option<String>,
850    privileges: Vec<Privilege>,
851}
852
853#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
854struct SignatureDefault {
855    on_type: ObjectType,
856    privileges: Vec<Privilege>,
857}
858
859fn compute_signature(
860    grants: &[Grant],
861    dps: &[(String, String, DefaultPrivilegeGrant)],
862    schema: &str,
863) -> RoleSignature {
864    let mut sig_grants: Vec<SignatureGrant> = grants
865        .iter()
866        .map(|g| {
867            let name = match g.object.object_type {
868                // Schema-typed grants have schema as `name`. Drop it — the
869                // signature is schema-relative.
870                ObjectType::Schema => {
871                    if g.object.name.as_deref() == Some(schema) {
872                        None
873                    } else {
874                        // Pointed at a *different* schema — preserve the
875                        // literal name to keep the signature distinct.
876                        g.object.name.clone()
877                    }
878                }
879                _ => g.object.name.clone(),
880            };
881            let mut privs = g.privileges.clone();
882            privs.sort_by_key(|p| privilege_sort_key(*p));
883            privs.dedup();
884            SignatureGrant {
885                object_type: g.object.object_type,
886                name,
887                privileges: privs,
888            }
889        })
890        .collect();
891    sig_grants.sort();
892    sig_grants.dedup();
893
894    let mut sig_defaults: Vec<SignatureDefault> = dps
895        .iter()
896        .map(|(_, _, dpg)| {
897            let mut privs = dpg.privileges.clone();
898            privs.sort_by_key(|p| privilege_sort_key(*p));
899            privs.dedup();
900            SignatureDefault {
901                on_type: dpg.on_type,
902                privileges: privs,
903            }
904        })
905        .collect();
906    sig_defaults.sort();
907    sig_defaults.dedup();
908
909    RoleSignature {
910        grants: sig_grants,
911        defaults: sig_defaults,
912    }
913}
914
915fn privilege_sort_key(p: Privilege) -> u8 {
916    match p {
917        Privilege::Select => 0,
918        Privilege::Insert => 1,
919        Privilege::Update => 2,
920        Privilege::Delete => 3,
921        Privilege::Truncate => 4,
922        Privilege::References => 5,
923        Privilege::Trigger => 6,
924        Privilege::Execute => 7,
925        Privilege::Usage => 8,
926        Privilege::Create => 9,
927        Privilege::Connect => 10,
928        Privilege::Temporary => 11,
929    }
930}
931
932fn match_pattern(pattern: &str, role_name: &str, schema: &str) -> Option<String> {
933    match pattern {
934        "{schema}-{profile}" => role_name
935            .strip_prefix(schema)
936            .and_then(|r| r.strip_prefix('-'))
937            .filter(|p| !p.is_empty())
938            .map(|p| p.to_string()),
939        "{schema}_{profile}" => role_name
940            .strip_prefix(schema)
941            .and_then(|r| r.strip_prefix('_'))
942            .filter(|p| !p.is_empty())
943            .map(|p| p.to_string()),
944        "{profile}-{schema}" => role_name
945            .strip_suffix(schema)
946            .and_then(|r| r.strip_suffix('-'))
947            .filter(|p| !p.is_empty())
948            .map(|p| p.to_string()),
949        "{profile}_{schema}" => role_name
950            .strip_suffix(schema)
951            .and_then(|r| r.strip_suffix('_'))
952            .filter(|p| !p.is_empty())
953            .map(|p| p.to_string()),
954        _ => None,
955    }
956}
957
958/// Recognise the auto-generated role comment that `expand_manifest` writes
959/// when materializing a `profile × schema` role. Format:
960/// `"Generated from profile 'X' for schema 'Y'"`.
961fn is_auto_profile_comment(c: &str) -> bool {
962    c.starts_with("Generated from profile '") && c.contains("' for schema '") && c.ends_with('\'')
963}
964
965fn is_valid_identifier(s: &str) -> bool {
966    !s.is_empty()
967        && s.chars()
968            .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
969        && !s.starts_with('-')
970        && !s.starts_with('_')
971}
972
973fn build_profile(
974    login: Option<bool>,
975    inherit: Option<bool>,
976    grants: &[Grant],
977    dps: &[(String, String, DefaultPrivilegeGrant)],
978    #[cfg_attr(not(debug_assertions), allow(unused_variables))] schema: &str,
979) -> Profile {
980    // Build profile grants in a deterministic order.
981    let mut profile_grants: Vec<ProfileGrant> = grants
982        .iter()
983        .map(|g| {
984            let object = match g.object.object_type {
985                ObjectType::Schema => ProfileObjectTarget {
986                    object_type: ObjectType::Schema,
987                    // Profile expansion ignores `name` for schema-typed grants
988                    // (it always uses the schema_binding name). Setting None
989                    // keeps the YAML clean.
990                    name: None,
991                },
992                _ => {
993                    // The grant's schema must equal `schema` (otherwise the
994                    // role wouldn't be eligible). Drop the schema; preserve
995                    // `name` (e.g. `"*"` or a specific table name).
996                    debug_assert_eq!(g.object.schema.as_deref(), Some(schema));
997                    ProfileObjectTarget {
998                        object_type: g.object.object_type,
999                        name: g.object.name.clone(),
1000                    }
1001                }
1002            };
1003            let mut privs = g.privileges.clone();
1004            privs.sort_by_key(|p| privilege_sort_key(*p));
1005            privs.dedup();
1006            ProfileGrant {
1007                privileges: privs,
1008                object,
1009            }
1010        })
1011        .collect();
1012    profile_grants.sort_by(|a, b| {
1013        let key_a = (a.object.object_type, a.object.name.clone());
1014        let key_b = (b.object.object_type, b.object.name.clone());
1015        key_a.cmp(&key_b)
1016    });
1017
1018    let mut profile_defaults: Vec<DefaultPrivilegeGrant> = dps
1019        .iter()
1020        .map(|(_, _, dpg)| {
1021            let mut privs = dpg.privileges.clone();
1022            privs.sort_by_key(|p| privilege_sort_key(*p));
1023            privs.dedup();
1024            DefaultPrivilegeGrant {
1025                role: None, // expansion fills this in
1026                privileges: privs,
1027                on_type: dpg.on_type,
1028            }
1029        })
1030        .collect();
1031    profile_defaults.sort_by_key(|d| d.on_type);
1032
1033    Profile {
1034        login,
1035        inherit,
1036        grants: profile_grants,
1037        default_privileges: profile_defaults,
1038    }
1039}
1040
1041fn check_round_trip(
1042    original: &PolicyManifest,
1043    candidate: &PolicyManifest,
1044    inventory: &Inventory,
1045) -> Result<(), String> {
1046    let mut original_expanded =
1047        expand_manifest(original).map_err(|e| format!("original expand: {e}"))?;
1048    expand_wildcards_in_place(&mut original_expanded.grants, inventory);
1049    let original_graph =
1050        RoleGraph::from_expanded(&original_expanded, original.default_owner.as_deref())
1051            .map_err(|e| format!("original graph: {e}"))?;
1052
1053    let mut candidate_expanded =
1054        expand_manifest(candidate).map_err(|e| format!("candidate expand: {e}"))?;
1055    expand_wildcards_in_place(&mut candidate_expanded.grants, inventory);
1056    let candidate_graph =
1057        RoleGraph::from_expanded(&candidate_expanded, candidate.default_owner.as_deref())
1058            .map_err(|e| format!("candidate graph: {e}"))?;
1059
1060    let changes = diff(&original_graph, &candidate_graph);
1061    let unacceptable: Vec<&Change> = changes
1062        .iter()
1063        .filter(|c| !matches!(c, Change::SetComment { .. }))
1064        .collect();
1065    if !unacceptable.is_empty() {
1066        return Err(format!(
1067            "{} structural change(s) after suggestion (sample: {:?})",
1068            unacceptable.len(),
1069            unacceptable.first()
1070        ));
1071    }
1072    Ok(())
1073}
1074
1075/// Expand any `name: "*"` grant against the inventory: emit one named grant
1076/// per inventory entry. Schema and Database object_types are passed through.
1077fn expand_wildcards_in_place(grants: &mut Vec<Grant>, inventory: &Inventory) {
1078    let mut out: Vec<Grant> = Vec::with_capacity(grants.len());
1079    for g in grants.drain(..) {
1080        let is_wildcard = matches!(
1081            g.object.object_type,
1082            ObjectType::Table
1083                | ObjectType::View
1084                | ObjectType::MaterializedView
1085                | ObjectType::Sequence
1086                | ObjectType::Function
1087                | ObjectType::Type
1088        ) && g.object.name.as_deref() == Some("*");
1089        if !is_wildcard {
1090            out.push(g);
1091            continue;
1092        }
1093        let Some(schema) = g.object.schema.as_ref() else {
1094            out.push(g);
1095            continue;
1096        };
1097        let key = (schema.clone(), g.object.object_type);
1098        if let Some(names) = inventory.get(&key) {
1099            for name in names {
1100                out.push(Grant {
1101                    role: g.role.clone(),
1102                    privileges: g.privileges.clone(),
1103                    object: ObjectTarget {
1104                        object_type: g.object.object_type,
1105                        schema: g.object.schema.clone(),
1106                        name: Some(name.clone()),
1107                    },
1108                });
1109            }
1110        } else {
1111            // No objects of this type in the schema — wildcard is a no-op,
1112            // but we keep it so the model is preserved.
1113            out.push(g);
1114        }
1115    }
1116    *grants = out;
1117}
1118
1119// ---------------------------------------------------------------------------
1120// Tests
1121// ---------------------------------------------------------------------------
1122
1123#[cfg(test)]
1124mod tests {
1125    use super::*;
1126    use crate::manifest::parse_manifest;
1127
1128    fn parse(yaml: &str) -> PolicyManifest {
1129        parse_manifest(yaml).expect("parse")
1130    }
1131
1132    #[test]
1133    fn no_input_profiles_no_clusters_returns_unchanged() {
1134        let m = parse(
1135            r#"
1136roles:
1137  - name: alice
1138    login: true
1139"#,
1140        );
1141        let report = suggest_profiles(&m, &SuggestOptions::default());
1142        assert!(report.profiles.is_empty());
1143        assert!(report.round_trip_ok);
1144    }
1145
1146    #[test]
1147    fn input_with_existing_profiles_is_left_alone() {
1148        let m = parse(
1149            r#"
1150profiles:
1151  reader:
1152    grants:
1153      - privileges: [USAGE]
1154        object: { type: schema }
1155schemas:
1156  - name: x
1157    profiles: [reader]
1158"#,
1159        );
1160        let report = suggest_profiles(&m, &SuggestOptions::default());
1161        assert!(report.profiles.is_empty());
1162        assert_eq!(report.manifest.profiles.len(), 1);
1163    }
1164
1165    #[test]
1166    fn clusters_two_schemas_with_dash_pattern() {
1167        // Three schemas, three roles, all with identical schema-relative shape.
1168        let m = parse(
1169            r#"
1170default_owner: app_owner
1171schemas:
1172  - name: inventory
1173    owner: app_owner
1174  - name: checkout
1175    owner: app_owner
1176  - name: analytics
1177    owner: app_owner
1178
1179roles:
1180  - name: inventory-reader
1181  - name: checkout-reader
1182  - name: analytics-reader
1183
1184grants:
1185  - role: inventory-reader
1186    privileges: [USAGE]
1187    object: { type: schema, name: inventory }
1188  - role: inventory-reader
1189    privileges: [SELECT]
1190    object: { type: table, schema: inventory, name: "*" }
1191  - role: checkout-reader
1192    privileges: [USAGE]
1193    object: { type: schema, name: checkout }
1194  - role: checkout-reader
1195    privileges: [SELECT]
1196    object: { type: table, schema: checkout, name: "*" }
1197  - role: analytics-reader
1198    privileges: [USAGE]
1199    object: { type: schema, name: analytics }
1200  - role: analytics-reader
1201    privileges: [SELECT]
1202    object: { type: table, schema: analytics, name: "*" }
1203"#,
1204        );
1205        let report = suggest_profiles(&m, &SuggestOptions::default());
1206        assert!(report.round_trip_ok, "skipped: {:?}", report.skipped);
1207        assert_eq!(report.profiles.len(), 1);
1208        let p = &report.profiles[0];
1209        assert_eq!(p.profile_name, "reader");
1210        assert_eq!(p.role_pattern, "{schema}-{profile}");
1211        assert_eq!(p.schema_to_role.len(), 3);
1212        assert!(report.manifest.profiles.contains_key("reader"));
1213        // Roles section should no longer hold the clustered roles.
1214        assert!(
1215            report
1216                .manifest
1217                .roles
1218                .iter()
1219                .all(|r| !r.name.ends_with("-reader"))
1220        );
1221        // Schema bindings should reference the new profile.
1222        for s in &report.manifest.schemas {
1223            assert_eq!(s.profiles, vec!["reader"]);
1224            assert_eq!(s.role_pattern, "{schema}-{profile}");
1225        }
1226    }
1227
1228    #[test]
1229    fn clusters_with_underscore_pattern() {
1230        let m = parse(
1231            r#"
1232default_owner: app_owner
1233schemas:
1234  - name: inventory
1235    owner: app_owner
1236  - name: checkout
1237    owner: app_owner
1238roles:
1239  - name: inventory_app
1240    login: true
1241  - name: checkout_app
1242    login: true
1243grants:
1244  - role: inventory_app
1245    privileges: [USAGE]
1246    object: { type: schema, name: inventory }
1247  - role: inventory_app
1248    privileges: [SELECT, INSERT, UPDATE, DELETE]
1249    object: { type: table, schema: inventory, name: "*" }
1250  - role: checkout_app
1251    privileges: [USAGE]
1252    object: { type: schema, name: checkout }
1253  - role: checkout_app
1254    privileges: [SELECT, INSERT, UPDATE, DELETE]
1255    object: { type: table, schema: checkout, name: "*" }
1256"#,
1257        );
1258        let report = suggest_profiles(&m, &SuggestOptions::default());
1259        assert!(report.round_trip_ok);
1260        assert_eq!(report.profiles.len(), 1);
1261        let p = &report.profiles[0];
1262        assert_eq!(p.profile_name, "app");
1263        assert_eq!(p.role_pattern, "{schema}_{profile}");
1264        // Profile carries `login: true`.
1265        let prof = report.manifest.profiles.get("app").unwrap();
1266        assert_eq!(prof.login, Some(true));
1267    }
1268
1269    #[test]
1270    fn does_not_cluster_single_schema_role() {
1271        let m = parse(
1272            r#"
1273schemas:
1274  - name: inventory
1275    owner: app_owner
1276roles:
1277  - name: inventory-reader
1278grants:
1279  - role: inventory-reader
1280    privileges: [SELECT]
1281    object: { type: table, schema: inventory, name: "*" }
1282"#,
1283        );
1284        let report = suggest_profiles(&m, &SuggestOptions::default());
1285        assert!(report.profiles.is_empty());
1286        assert!(matches!(
1287            report.skipped.first(),
1288            Some(SkipReason::SoleSchema { .. })
1289        ));
1290    }
1291
1292    #[test]
1293    fn min_schemas_one_promotes_single_schema_role() {
1294        let m = parse(
1295            r#"
1296schemas:
1297  - name: inventory
1298    owner: app_owner
1299roles:
1300  - name: inventory-reader
1301grants:
1302  - role: inventory-reader
1303    privileges: [SELECT]
1304    object: { type: table, schema: inventory, name: "*" }
1305"#,
1306        );
1307        let report = suggest_profiles(
1308            &m,
1309            &SuggestOptions {
1310                min_schemas: 1,
1311                ..Default::default()
1312            },
1313        );
1314        assert!(report.round_trip_ok);
1315        assert_eq!(report.profiles.len(), 1);
1316    }
1317
1318    #[test]
1319    fn role_with_unique_attributes_stays_flat() {
1320        let m = parse(
1321            r#"
1322schemas:
1323  - name: inventory
1324    owner: app_owner
1325  - name: checkout
1326    owner: app_owner
1327roles:
1328  - name: inventory-reader
1329    connection_limit: 5
1330  - name: checkout-reader
1331grants:
1332  - role: inventory-reader
1333    privileges: [SELECT]
1334    object: { type: table, schema: inventory, name: "*" }
1335  - role: checkout-reader
1336    privileges: [SELECT]
1337    object: { type: table, schema: checkout, name: "*" }
1338"#,
1339        );
1340        let report = suggest_profiles(&m, &SuggestOptions::default());
1341        // Only one role qualifies → SoleSchema skip; no cluster formed.
1342        assert!(report.profiles.is_empty());
1343        assert!(report.skipped.iter().any(
1344            |s| matches!(s, SkipReason::UniqueAttributes { role } if role == "inventory-reader")
1345        ));
1346    }
1347
1348    #[test]
1349    fn multi_schema_role_skipped() {
1350        let m = parse(
1351            r#"
1352schemas:
1353  - name: inventory
1354    owner: app_owner
1355  - name: checkout
1356    owner: app_owner
1357roles:
1358  - name: cross
1359grants:
1360  - role: cross
1361    privileges: [SELECT]
1362    object: { type: table, schema: inventory, name: "*" }
1363  - role: cross
1364    privileges: [SELECT]
1365    object: { type: table, schema: checkout, name: "*" }
1366"#,
1367        );
1368        let report = suggest_profiles(&m, &SuggestOptions::default());
1369        assert!(report.profiles.is_empty());
1370        assert!(
1371            report
1372                .skipped
1373                .iter()
1374                .any(|s| matches!(s, SkipReason::MultiSchema { role, .. } if role == "cross"))
1375        );
1376    }
1377
1378    #[test]
1379    fn non_uniform_pattern_skipped() {
1380        let m = parse(
1381            r#"
1382schemas:
1383  - name: inventory
1384    owner: app_owner
1385  - name: checkout
1386    owner: app_owner
1387roles:
1388  - name: inventory-reader
1389  - name: checkout_reader
1390grants:
1391  - role: inventory-reader
1392    privileges: [SELECT]
1393    object: { type: table, schema: inventory, name: "*" }
1394  - role: checkout_reader
1395    privileges: [SELECT]
1396    object: { type: table, schema: checkout, name: "*" }
1397"#,
1398        );
1399        // inventory-reader matches {schema}-{profile} → "reader"
1400        // checkout_reader matches {schema}_{profile} → "reader"
1401        // They have the SAME signature, but different patterns. Our resolver
1402        // picks the first pattern in priority order that all members agree on
1403        // — neither pattern works for both, so no cluster.
1404        let report = suggest_profiles(&m, &SuggestOptions::default());
1405        assert!(report.profiles.is_empty());
1406        assert!(
1407            report
1408                .skipped
1409                .iter()
1410                .any(|s| matches!(s, SkipReason::NoUniformPattern { .. }))
1411        );
1412    }
1413
1414    #[test]
1415    fn different_login_split_into_separate_clusters() {
1416        let m = parse(
1417            r#"
1418schemas:
1419  - name: a
1420    owner: o
1421  - name: b
1422    owner: o
1423  - name: c
1424    owner: o
1425  - name: d
1426    owner: o
1427roles:
1428  - name: a-svc
1429    login: true
1430  - name: b-svc
1431    login: true
1432  - name: c-svc
1433  - name: d-svc
1434grants:
1435  - role: a-svc
1436    privileges: [SELECT]
1437    object: { type: table, schema: a, name: "*" }
1438  - role: b-svc
1439    privileges: [SELECT]
1440    object: { type: table, schema: b, name: "*" }
1441  - role: c-svc
1442    privileges: [SELECT]
1443    object: { type: table, schema: c, name: "*" }
1444  - role: d-svc
1445    privileges: [SELECT]
1446    object: { type: table, schema: d, name: "*" }
1447"#,
1448        );
1449        let report = suggest_profiles(&m, &SuggestOptions::default());
1450        assert!(report.round_trip_ok);
1451        // Both clusters resolve to profile name "svc"; only one wins (the
1452        // larger one, or the lexicographically-first by signature in a tie).
1453        // The other cluster's roles are skipped as NoUniformPattern.
1454        assert_eq!(report.profiles.len(), 1);
1455        assert_eq!(report.profiles[0].profile_name, "svc");
1456        // The "losing" cluster's two roles must remain in the flat roles list.
1457        let kept_role_names: BTreeSet<&str> = report
1458            .manifest
1459            .roles
1460            .iter()
1461            .map(|r| r.name.as_str())
1462            .collect();
1463        assert_eq!(kept_role_names.len(), 2);
1464    }
1465
1466    #[test]
1467    fn round_trip_zero_diff() {
1468        // A representative manifest, including default privileges.
1469        let m = parse(
1470            r#"
1471default_owner: app_owner
1472schemas:
1473  - name: inventory
1474    owner: app_owner
1475  - name: checkout
1476    owner: app_owner
1477
1478roles:
1479  - name: inventory-rw
1480  - name: checkout-rw
1481
1482grants:
1483  - role: inventory-rw
1484    privileges: [USAGE]
1485    object: { type: schema, name: inventory }
1486  - role: inventory-rw
1487    privileges: [SELECT, INSERT, UPDATE, DELETE]
1488    object: { type: table, schema: inventory, name: "*" }
1489  - role: inventory-rw
1490    privileges: [USAGE, SELECT]
1491    object: { type: sequence, schema: inventory, name: "*" }
1492  - role: checkout-rw
1493    privileges: [USAGE]
1494    object: { type: schema, name: checkout }
1495  - role: checkout-rw
1496    privileges: [SELECT, INSERT, UPDATE, DELETE]
1497    object: { type: table, schema: checkout, name: "*" }
1498  - role: checkout-rw
1499    privileges: [USAGE, SELECT]
1500    object: { type: sequence, schema: checkout, name: "*" }
1501
1502default_privileges:
1503  - owner: app_owner
1504    schema: inventory
1505    grant:
1506      - role: inventory-rw
1507        privileges: [SELECT, INSERT, UPDATE, DELETE]
1508        on_type: table
1509  - owner: app_owner
1510    schema: checkout
1511    grant:
1512      - role: checkout-rw
1513        privileges: [SELECT, INSERT, UPDATE, DELETE]
1514        on_type: table
1515"#,
1516        );
1517
1518        let report = suggest_profiles(&m, &SuggestOptions::default());
1519        assert!(report.round_trip_ok);
1520        assert_eq!(report.profiles.len(), 1);
1521        let prof = report.manifest.profiles.get("rw").unwrap();
1522        assert_eq!(prof.grants.len(), 3);
1523        assert_eq!(prof.default_privileges.len(), 1);
1524
1525        // Compare the structural state (RoleGraph) of input vs suggested.
1526        let original_expanded = expand_manifest(&m).unwrap();
1527        let original_graph =
1528            RoleGraph::from_expanded(&original_expanded, m.default_owner.as_deref()).unwrap();
1529        let new_expanded = expand_manifest(&report.manifest).unwrap();
1530        let new_graph =
1531            RoleGraph::from_expanded(&new_expanded, report.manifest.default_owner.as_deref())
1532                .unwrap();
1533        let changes = diff(&original_graph, &new_graph);
1534        // Only role-comment changes (auto-generated by profile expansion) are
1535        // acceptable.
1536        let bad: Vec<_> = changes
1537            .iter()
1538            .filter(|c| !matches!(c, Change::SetComment { .. }))
1539            .collect();
1540        assert!(bad.is_empty(), "unexpected diff: {bad:?}");
1541    }
1542
1543    #[test]
1544    fn schema_pattern_conflict_drops_smaller_cluster() {
1545        // Two clusters compete for schema "inventory":
1546        //   "inventory-reader" + "checkout-reader" → wants "{schema}-{profile}"
1547        //   "inventory_app" + "stage_app" → wants "{schema}_{profile}"
1548        // Wait — these touch different schemas, so they don't actually conflict
1549        // unless the same schema appears in both. Construct a real conflict:
1550        // make role "inventory-reader" + "checkout-reader" (cluster A) and
1551        // "inventory_writer" + "checkout_writer" (cluster B). Both want to
1552        // bind to inventory and checkout, but with different patterns. Only
1553        // the first cluster (alphabetically: dash < underscore) wins.
1554        let m = parse(
1555            r#"
1556default_owner: o
1557schemas:
1558  - name: inventory
1559    owner: o
1560  - name: checkout
1561    owner: o
1562
1563roles:
1564  - name: inventory-reader
1565  - name: checkout-reader
1566  - name: inventory_writer
1567  - name: checkout_writer
1568
1569grants:
1570  - role: inventory-reader
1571    privileges: [SELECT]
1572    object: { type: table, schema: inventory, name: "*" }
1573  - role: checkout-reader
1574    privileges: [SELECT]
1575    object: { type: table, schema: checkout, name: "*" }
1576  - role: inventory_writer
1577    privileges: [INSERT]
1578    object: { type: table, schema: inventory, name: "*" }
1579  - role: checkout_writer
1580    privileges: [INSERT]
1581    object: { type: table, schema: checkout, name: "*" }
1582"#,
1583        );
1584
1585        let report = suggest_profiles(&m, &SuggestOptions::default());
1586        // Either "reader" wins outright (both schemas commit to dash pattern,
1587        // then the underscore cluster can't take its preferred pattern),
1588        // or vice-versa. Whichever one wins, the other should be left flat
1589        // and round-trip should still succeed.
1590        assert!(report.round_trip_ok);
1591        assert_eq!(
1592            report.profiles.len(),
1593            1,
1594            "exactly one profile should win: {:?}",
1595            report.profiles
1596        );
1597        // The losing cluster must surface a SchemaPatternConflict skip
1598        // pointing at the schema whose pattern was already locked.
1599        let conflicts: Vec<_> = report
1600            .skipped
1601            .iter()
1602            .filter_map(|s| match s {
1603                SkipReason::SchemaPatternConflict {
1604                    schema,
1605                    winning_pattern,
1606                    dropped_roles,
1607                } => Some((schema, winning_pattern, dropped_roles)),
1608                _ => None,
1609            })
1610            .collect();
1611        assert_eq!(
1612            conflicts.len(),
1613            1,
1614            "expected one SchemaPatternConflict skip, got: {:?}",
1615            report.skipped
1616        );
1617        let (_, winning, dropped) = conflicts[0];
1618        // Either pattern can win (depends on signature ordering); the
1619        // important thing is that the *other* one is reported as conflicting.
1620        assert!(
1621            winning == "{schema}-{profile}" || winning == "{schema}_{profile}",
1622            "unexpected winning_pattern: {winning}"
1623        );
1624        assert_eq!(dropped.len(), 2);
1625    }
1626
1627    #[test]
1628    fn match_pattern_basic() {
1629        assert_eq!(
1630            match_pattern("{schema}-{profile}", "inventory-reader", "inventory"),
1631            Some("reader".into())
1632        );
1633        assert_eq!(
1634            match_pattern("{schema}_{profile}", "inventory_app", "inventory"),
1635            Some("app".into())
1636        );
1637        assert_eq!(
1638            match_pattern("{profile}-{schema}", "ro-inventory", "inventory"),
1639            Some("ro".into())
1640        );
1641        assert_eq!(
1642            match_pattern("{profile}_{schema}", "ro_inventory", "inventory"),
1643            Some("ro".into())
1644        );
1645        // Schema not matched.
1646        assert_eq!(
1647            match_pattern("{schema}-{profile}", "checkout-reader", "inventory"),
1648            None
1649        );
1650        // Empty profile component.
1651        assert_eq!(
1652            match_pattern("{schema}-{profile}", "inventory-", "inventory"),
1653            None
1654        );
1655        // No separator.
1656        assert_eq!(
1657            match_pattern("{schema}-{profile}", "inventoryreader", "inventory"),
1658            None
1659        );
1660    }
1661
1662    #[test]
1663    fn database_grants_excluded_from_clustering() {
1664        // A role with a CONNECT-on-database grant has an unrepresentable
1665        // grant; profiles can't carry it. Even if its other grants are
1666        // schema-shaped and shared with another role, exclude it.
1667        let m = parse(
1668            r#"
1669schemas:
1670  - name: a
1671    owner: o
1672  - name: b
1673    owner: o
1674roles:
1675  - name: a-svc
1676  - name: b-svc
1677grants:
1678  - role: a-svc
1679    privileges: [CONNECT]
1680    object: { type: database, name: mydb }
1681  - role: a-svc
1682    privileges: [SELECT]
1683    object: { type: table, schema: a, name: "*" }
1684  - role: b-svc
1685    privileges: [SELECT]
1686    object: { type: table, schema: b, name: "*" }
1687"#,
1688        );
1689        let report = suggest_profiles(&m, &SuggestOptions::default());
1690        // a-svc is excluded, b-svc is single-schema → no cluster.
1691        assert!(report.profiles.is_empty());
1692        assert!(
1693            report
1694                .skipped
1695                .iter()
1696                .any(|s| matches!(s, SkipReason::UnrepresentableGrant { role } if role == "a-svc"))
1697        );
1698    }
1699
1700    #[test]
1701    fn membership_targets_clustered_role_still_resolve_after_suggestion() {
1702        // A membership targets `inventory-reader`; after clustering, the
1703        // expanded manifest must still produce a role with that exact name.
1704        let m = parse(
1705            r#"
1706schemas:
1707  - name: inventory
1708    owner: o
1709  - name: checkout
1710    owner: o
1711roles:
1712  - name: inventory-reader
1713  - name: checkout-reader
1714  - name: alice
1715    login: true
1716grants:
1717  - role: inventory-reader
1718    privileges: [SELECT]
1719    object: { type: table, schema: inventory, name: "*" }
1720  - role: checkout-reader
1721    privileges: [SELECT]
1722    object: { type: table, schema: checkout, name: "*" }
1723memberships:
1724  - role: inventory-reader
1725    members:
1726      - name: alice
1727"#,
1728        );
1729        let report = suggest_profiles(&m, &SuggestOptions::default());
1730        assert!(report.round_trip_ok);
1731        // The membership entry is preserved verbatim.
1732        assert_eq!(report.manifest.memberships.len(), 1);
1733        assert_eq!(report.manifest.memberships[0].role, "inventory-reader");
1734        // Re-expanding produces the role.
1735        let expanded = expand_manifest(&report.manifest).unwrap();
1736        assert!(expanded.roles.iter().any(|r| r.name == "inventory-reader"));
1737        assert!(expanded.roles.iter().any(|r| r.name == "checkout-reader"));
1738    }
1739
1740    #[test]
1741    fn wildcard_object_names_preserved_in_profile() {
1742        let m = parse(
1743            r#"
1744schemas:
1745  - name: a
1746    owner: o
1747  - name: b
1748    owner: o
1749roles:
1750  - name: a-rw
1751  - name: b-rw
1752grants:
1753  - role: a-rw
1754    privileges: [SELECT, INSERT]
1755    object: { type: table, schema: a, name: "*" }
1756  - role: a-rw
1757    privileges: [USAGE]
1758    object: { type: sequence, schema: a, name: orders_id_seq }
1759  - role: b-rw
1760    privileges: [SELECT, INSERT]
1761    object: { type: table, schema: b, name: "*" }
1762  - role: b-rw
1763    privileges: [USAGE]
1764    object: { type: sequence, schema: b, name: orders_id_seq }
1765"#,
1766        );
1767
1768        // Default options (no full_inventory) → no collapse → literal names
1769        // are preserved.
1770        let report = suggest_profiles(&m, &SuggestOptions::default());
1771        assert!(report.round_trip_ok);
1772        assert_eq!(report.profiles.len(), 1);
1773        let prof = report.manifest.profiles.get("rw").unwrap();
1774        let seq_grant = prof
1775            .grants
1776            .iter()
1777            .find(|g| g.object.object_type == ObjectType::Sequence)
1778            .unwrap();
1779        assert_eq!(seq_grant.object.name.as_deref(), Some("orders_id_seq"));
1780
1781        // With a full inventory provided, full-coverage names become
1782        // wildcards.
1783        let inv = inventory_from_manifest_grants(&m);
1784        let report = suggest_profiles(
1785            &m,
1786            &SuggestOptions {
1787                full_inventory: Some(inv),
1788                ..Default::default()
1789            },
1790        );
1791        assert!(report.round_trip_ok);
1792        let prof = report.manifest.profiles.get("rw").unwrap();
1793        let seq_grant = prof
1794            .grants
1795            .iter()
1796            .find(|g| g.object.object_type == ObjectType::Sequence)
1797            .unwrap();
1798        assert_eq!(
1799            seq_grant.object.name.as_deref(),
1800            Some("*"),
1801            "single-object full coverage should collapse to wildcard"
1802        );
1803    }
1804
1805    #[test]
1806    fn collapse_clusters_roles_with_different_object_names() {
1807        // The motivating real-world case: per-name grants from `pgroles
1808        // generate` (Postgres expands `GRANT … ON ALL TABLES` to per-relation
1809        // rows). After collapse, the two roles share a wildcard signature and
1810        // cluster.
1811        let m = parse(
1812            r#"
1813schemas:
1814  - name: inventory
1815    owner: o
1816  - name: checkout
1817    owner: o
1818roles:
1819  - name: inventory-reader
1820  - name: checkout-reader
1821grants:
1822  - role: inventory-reader
1823    privileges: [USAGE]
1824    object: { type: schema, name: inventory }
1825  - role: inventory-reader
1826    privileges: [SELECT]
1827    object: { type: table, schema: inventory, name: products }
1828  - role: inventory-reader
1829    privileges: [SELECT]
1830    object: { type: table, schema: inventory, name: stock_levels }
1831  - role: checkout-reader
1832    privileges: [USAGE]
1833    object: { type: schema, name: checkout }
1834  - role: checkout-reader
1835    privileges: [SELECT]
1836    object: { type: table, schema: checkout, name: orders }
1837  - role: checkout-reader
1838    privileges: [SELECT]
1839    object: { type: table, schema: checkout, name: order_items }
1840"#,
1841        );
1842        // With a full inventory provided, the per-name grants get collapsed
1843        // and the two roles cluster on a wildcard signature.
1844        let inv = inventory_from_manifest_grants(&m);
1845        let report = suggest_profiles(
1846            &m,
1847            &SuggestOptions {
1848                full_inventory: Some(inv),
1849                ..Default::default()
1850            },
1851        );
1852        assert!(report.round_trip_ok, "skipped: {:?}", report.skipped);
1853        assert_eq!(report.profiles.len(), 1);
1854        let prof = report.manifest.profiles.get("reader").unwrap();
1855        // Profile carries a wildcard table grant.
1856        let table_grant = prof
1857            .grants
1858            .iter()
1859            .find(|g| g.object.object_type == ObjectType::Table)
1860            .unwrap();
1861        assert_eq!(table_grant.object.name.as_deref(), Some("*"));
1862    }
1863
1864    #[test]
1865    fn no_full_inventory_prevents_clustering_across_different_names() {
1866        // Same input as `collapse_clusters_roles_with_different_object_names`
1867        // but without a full_inventory — should NOT cluster, since literal
1868        // names differ and we can't safely collapse without DB introspection.
1869        let m = parse(
1870            r#"
1871schemas:
1872  - name: inventory
1873    owner: o
1874  - name: checkout
1875    owner: o
1876roles:
1877  - name: inventory-reader
1878  - name: checkout-reader
1879grants:
1880  - role: inventory-reader
1881    privileges: [SELECT]
1882    object: { type: table, schema: inventory, name: products }
1883  - role: checkout-reader
1884    privileges: [SELECT]
1885    object: { type: table, schema: checkout, name: orders }
1886"#,
1887        );
1888        // Default (no full_inventory) → no collapse → different literal names
1889        // produce different signatures → no cluster.
1890        let report = suggest_profiles(&m, &SuggestOptions::default());
1891        assert!(report.profiles.is_empty());
1892    }
1893
1894    #[test]
1895    fn collapse_partial_coverage_preserves_per_name_grants() {
1896        // Two tables in schema `a`, but role `a-ro` only has SELECT on one of
1897        // them. Coverage isn't full → no collapse → no cluster with `b-ro`
1898        // (which has full coverage of its single-table schema).
1899        let m = parse(
1900            r#"
1901schemas:
1902  - name: a
1903    owner: o
1904  - name: b
1905    owner: o
1906roles:
1907  - name: a-ro
1908  - name: b-ro
1909grants:
1910  - role: a-ro
1911    privileges: [SELECT]
1912    object: { type: table, schema: a, name: t1 }
1913  # a-ro has no grant on a.t2 (which exists, evidenced by another role)
1914  - role: filler
1915    privileges: [SELECT]
1916    object: { type: table, schema: a, name: t2 }
1917  - role: b-ro
1918    privileges: [SELECT]
1919    object: { type: table, schema: b, name: only_one }
1920"#,
1921        );
1922        // Full inventory says schema `a` has {t1, t2}, schema `b` has
1923        // {only_one}. a-ro covers only t1 (partial) → no collapse for a-ro.
1924        // b-ro covers all of {only_one} (full) → collapses to wildcard.
1925        // Different signatures → no cluster.
1926        let inv = inventory_from_manifest_grants(&m);
1927        let report = suggest_profiles(
1928            &m,
1929            &SuggestOptions {
1930                full_inventory: Some(inv),
1931                ..Default::default()
1932            },
1933        );
1934        assert!(report.profiles.is_empty());
1935    }
1936
1937    #[test]
1938    fn incomplete_full_inventory_disables_collapse_with_skip_reason() {
1939        // Hand the suggester a `full_inventory` that's *missing* an object
1940        // that already appears in the manifest's flat grants. This is
1941        // exactly the failure mode of passing `inventory_from_manifest_grants`
1942        // (or any partial view) — the suggester must detect it and refuse
1943        // to collapse, surfacing an `IncompleteFullInventory` skip.
1944        let m = parse(
1945            r#"
1946schemas:
1947  - name: a
1948    owner: o
1949  - name: b
1950    owner: o
1951roles:
1952  - name: a-rw
1953  - name: b-rw
1954grants:
1955  - role: a-rw
1956    privileges: [SELECT]
1957    object: { type: table, schema: a, name: products }
1958  - role: b-rw
1959    privileges: [SELECT]
1960    object: { type: table, schema: b, name: orders }
1961"#,
1962        );
1963        // Provide an inventory that omits `products` — pretend the caller
1964        // missed it.
1965        let mut bad: Inventory = BTreeMap::new();
1966        bad.entry(("a".to_string(), ObjectType::Table)).or_default(); // empty set
1967        bad.entry(("b".to_string(), ObjectType::Table))
1968            .or_default()
1969            .insert("orders".to_string());
1970        let report = suggest_profiles(
1971            &m,
1972            &SuggestOptions {
1973                full_inventory: Some(bad),
1974                ..Default::default()
1975            },
1976        );
1977        // Collapse must have been disabled; literal names differ across
1978        // schemas → no cluster.
1979        assert!(report.profiles.is_empty());
1980        assert!(
1981            report
1982                .skipped
1983                .iter()
1984                .any(|s| matches!(s, SkipReason::IncompleteFullInventory { .. })),
1985            "expected IncompleteFullInventory skip; got: {:?}",
1986            report.skipped
1987        );
1988    }
1989
1990    #[test]
1991    fn full_inventory_with_ungranted_objects_blocks_unsafe_collapse() {
1992        // Schema `a` has 2 tables; role `a-ro` has SELECT on only one. With a
1993        // grant-derived view of the world we'd think coverage was full and
1994        // collapse to wildcard — which would silently grant on `t2` after
1995        // applying. With a real introspected inventory (containing both
1996        // tables), the suggester correctly sees partial coverage and refuses
1997        // to collapse.
1998        let m = parse(
1999            r#"
2000schemas:
2001  - name: a
2002    owner: o
2003  - name: b
2004    owner: o
2005roles:
2006  - name: a-ro
2007  - name: b-ro
2008grants:
2009  - role: a-ro
2010    privileges: [SELECT]
2011    object: { type: table, schema: a, name: t1 }
2012  - role: b-ro
2013    privileges: [SELECT]
2014    object: { type: table, schema: b, name: only_one }
2015"#,
2016        );
2017        // Inventory reports schema `a` actually has *two* tables.
2018        let mut inv = inventory_from_manifest_grants(&m);
2019        inv.entry(("a".to_string(), ObjectType::Table))
2020            .or_default()
2021            .insert("t2_ungranted".to_string());
2022        let report = suggest_profiles(
2023            &m,
2024            &SuggestOptions {
2025                full_inventory: Some(inv),
2026                ..Default::default()
2027            },
2028        );
2029        // a-ro has partial coverage now → no collapse → no cluster.
2030        assert!(report.profiles.is_empty());
2031    }
2032
2033    #[test]
2034    fn auto_generated_profile_comments_dont_block_resuggestion() {
2035        // When `pgroles apply` materializes a profile, it sets a comment on
2036        // each generated role. Re-running `--suggest-profiles` later must not
2037        // treat those auto-comments as user-set documentation that
2038        // disqualifies the role.
2039        let m = parse(
2040            r#"
2041schemas:
2042  - name: inventory
2043    owner: o
2044  - name: checkout
2045    owner: o
2046roles:
2047  - name: inventory-reader
2048    comment: "Generated from profile 'reader' for schema 'inventory'"
2049  - name: checkout-reader
2050    comment: "Generated from profile 'reader' for schema 'checkout'"
2051grants:
2052  - role: inventory-reader
2053    privileges: [SELECT]
2054    object: { type: table, schema: inventory, name: "*" }
2055  - role: checkout-reader
2056    privileges: [SELECT]
2057    object: { type: table, schema: checkout, name: "*" }
2058"#,
2059        );
2060        let report = suggest_profiles(&m, &SuggestOptions::default());
2061        assert!(report.round_trip_ok);
2062        assert_eq!(report.profiles.len(), 1);
2063        assert_eq!(report.profiles[0].profile_name, "reader");
2064    }
2065
2066    #[test]
2067    fn user_set_comments_still_block_clustering() {
2068        // A real user-set comment (not the auto-generated pattern) keeps the
2069        // role flat — profiles can't carry per-role comments.
2070        let m = parse(
2071            r#"
2072schemas:
2073  - name: inventory
2074    owner: o
2075  - name: checkout
2076    owner: o
2077roles:
2078  - name: inventory-reader
2079    comment: "Owned by data team — Q3 access only"
2080  - name: checkout-reader
2081grants:
2082  - role: inventory-reader
2083    privileges: [SELECT]
2084    object: { type: table, schema: inventory, name: "*" }
2085  - role: checkout-reader
2086    privileges: [SELECT]
2087    object: { type: table, schema: checkout, name: "*" }
2088"#,
2089        );
2090        let report = suggest_profiles(&m, &SuggestOptions::default());
2091        // inventory-reader excluded for user comment → checkout-reader is
2092        // now sole-schema → no cluster.
2093        assert!(report.profiles.is_empty());
2094        assert!(report.skipped.iter().any(
2095            |s| matches!(s, SkipReason::UniqueAttributes { role } if role == "inventory-reader")
2096        ));
2097    }
2098
2099    #[test]
2100    fn is_auto_profile_comment_basic() {
2101        assert!(is_auto_profile_comment(
2102            "Generated from profile 'reader' for schema 'inventory'"
2103        ));
2104        assert!(is_auto_profile_comment(
2105            "Generated from profile 'app-rw' for schema 'app_v2'"
2106        ));
2107        assert!(!is_auto_profile_comment("Random user note"));
2108        assert!(!is_auto_profile_comment(
2109            "Generated from profile 'reader' for schema 'inventory"
2110        )); // missing trailing quote
2111        assert!(!is_auto_profile_comment("Generated from profile 'reader'")); // missing schema part
2112    }
2113
2114    #[test]
2115    fn function_grants_with_signature_in_name_round_trip() {
2116        // Functions are emitted by `pgroles generate` with their argument
2117        // signature in `name`, e.g. `order_total(_id bigint)`. Verify those
2118        // round-trip correctly through the suggester.
2119        let m = parse(
2120            r#"
2121schemas:
2122  - name: a
2123    owner: o
2124  - name: b
2125    owner: o
2126roles:
2127  - name: a-rw
2128  - name: b-rw
2129grants:
2130  - role: a-rw
2131    privileges: [EXECUTE]
2132    object: { type: function, schema: a, name: "order_total(bigint)" }
2133  - role: b-rw
2134    privileges: [EXECUTE]
2135    object: { type: function, schema: b, name: "order_total(bigint)" }
2136"#,
2137        );
2138        let report = suggest_profiles(&m, &SuggestOptions::default());
2139        assert!(report.round_trip_ok);
2140        assert_eq!(report.profiles.len(), 1);
2141    }
2142
2143    #[test]
2144    fn default_privilege_owner_mismatch_excludes_role() {
2145        let m = parse(
2146            r#"
2147schemas:
2148  - name: a
2149    owner: app_owner
2150  - name: b
2151    owner: app_owner
2152roles:
2153  - name: a-rw
2154  - name: b-rw
2155grants:
2156  - role: a-rw
2157    privileges: [SELECT]
2158    object: { type: table, schema: a, name: "*" }
2159  - role: b-rw
2160    privileges: [SELECT]
2161    object: { type: table, schema: b, name: "*" }
2162default_privileges:
2163  - owner: a_different_owner   # mismatch — schema "a" is owned by app_owner
2164    schema: a
2165    grant:
2166      - role: a-rw
2167        privileges: [SELECT]
2168        on_type: table
2169  - owner: app_owner
2170    schema: b
2171    grant:
2172      - role: b-rw
2173        privileges: [SELECT]
2174        on_type: table
2175"#,
2176        );
2177        let report = suggest_profiles(&m, &SuggestOptions::default());
2178        // a-rw is excluded for owner mismatch → b-rw is sole-schema.
2179        assert!(report.profiles.is_empty());
2180        assert!(
2181            report
2182                .skipped
2183                .iter()
2184                .any(|s| matches!(s, SkipReason::OwnerMismatch { role, .. } if role == "a-rw"))
2185        );
2186    }
2187
2188    #[test]
2189    fn role_with_zero_grants_is_left_flat() {
2190        let m = parse(
2191            r#"
2192schemas:
2193  - name: a
2194    owner: o
2195roles:
2196  - name: lonely
2197    login: true
2198"#,
2199        );
2200        let report = suggest_profiles(&m, &SuggestOptions::default());
2201        assert!(report.profiles.is_empty());
2202        assert!(report.round_trip_ok);
2203        assert!(report.manifest.roles.iter().any(|r| r.name == "lonely"));
2204    }
2205
2206    #[test]
2207    fn schema_typed_grant_pointing_to_unrelated_schema_excludes_role() {
2208        // Role `a-rw` mostly touches schema `a` but has a `USAGE on schema b`
2209        // grant — that's two schemas, so it's MultiSchema-skipped.
2210        let m = parse(
2211            r#"
2212schemas:
2213  - name: a
2214    owner: o
2215  - name: b
2216    owner: o
2217roles:
2218  - name: a-rw
2219  - name: b-rw
2220grants:
2221  - role: a-rw
2222    privileges: [USAGE]
2223    object: { type: schema, name: a }
2224  - role: a-rw
2225    privileges: [USAGE]
2226    object: { type: schema, name: b }   # surprise: also touches b
2227  - role: b-rw
2228    privileges: [USAGE]
2229    object: { type: schema, name: b }
2230"#,
2231        );
2232        let report = suggest_profiles(&m, &SuggestOptions::default());
2233        assert!(report.profiles.is_empty());
2234        assert!(
2235            report
2236                .skipped
2237                .iter()
2238                .any(|s| matches!(s, SkipReason::MultiSchema { role, .. } if role == "a-rw"))
2239        );
2240    }
2241
2242    #[test]
2243    fn determinism_same_input_same_output() {
2244        // Run the suggester twice; outputs must be byte-identical YAML.
2245        let yaml = r#"
2246default_owner: app_owner
2247schemas:
2248  - name: inventory
2249    owner: app_owner
2250  - name: checkout
2251    owner: app_owner
2252  - name: analytics
2253    owner: app_owner
2254roles:
2255  - name: inventory-reader
2256  - name: checkout-reader
2257  - name: analytics-reader
2258  - name: inventory-rw
2259  - name: checkout-rw
2260  - name: analytics-rw
2261grants:
2262  - role: inventory-reader
2263    privileges: [SELECT]
2264    object: { type: table, schema: inventory, name: "*" }
2265  - role: checkout-reader
2266    privileges: [SELECT]
2267    object: { type: table, schema: checkout, name: "*" }
2268  - role: analytics-reader
2269    privileges: [SELECT]
2270    object: { type: table, schema: analytics, name: "*" }
2271  - role: inventory-rw
2272    privileges: [SELECT, INSERT]
2273    object: { type: table, schema: inventory, name: "*" }
2274  - role: checkout-rw
2275    privileges: [SELECT, INSERT]
2276    object: { type: table, schema: checkout, name: "*" }
2277  - role: analytics-rw
2278    privileges: [SELECT, INSERT]
2279    object: { type: table, schema: analytics, name: "*" }
2280"#;
2281        let m1 = parse(yaml);
2282        let m2 = parse(yaml);
2283        let r1 = suggest_profiles(&m1, &SuggestOptions::default());
2284        let r2 = suggest_profiles(&m2, &SuggestOptions::default());
2285
2286        // PolicyManifest.profiles is a BTreeMap, so the entire manifest
2287        // serializes deterministically — compare YAML directly.
2288        assert_eq!(r1.profiles.len(), 2);
2289        assert_eq!(r2.profiles.len(), 2);
2290        assert_eq!(
2291            serde_yaml::to_string(&r1.manifest).unwrap(),
2292            serde_yaml::to_string(&r2.manifest).unwrap()
2293        );
2294    }
2295
2296    #[test]
2297    fn realistic_scenario_full_round_trip() {
2298        // The shape pgroles generate produces from a real DB: lots of granular
2299        // grants, default privileges, schemas, services, humans.
2300        let yaml = r#"
2301default_owner: app_owner
2302schemas:
2303  - name: inventory
2304    owner: app_owner
2305  - name: checkout
2306    owner: app_owner
2307  - name: analytics
2308    owner: analytics_owner
2309roles:
2310  - name: app_owner
2311  - name: analytics_owner
2312  - name: inventory-editor
2313  - name: checkout-editor
2314  - name: inventory-viewer
2315  - name: checkout-viewer
2316  - name: analytics-viewer
2317  - name: data_analyst
2318
2319grants:
2320  - role: inventory-editor
2321    privileges: [USAGE]
2322    object: { type: schema, name: inventory }
2323  - role: inventory-editor
2324    privileges: [SELECT, INSERT, UPDATE, DELETE]
2325    object: { type: table, schema: inventory, name: "*" }
2326  - role: inventory-editor
2327    privileges: [USAGE, SELECT]
2328    object: { type: sequence, schema: inventory, name: "*" }
2329
2330  - role: checkout-editor
2331    privileges: [USAGE]
2332    object: { type: schema, name: checkout }
2333  - role: checkout-editor
2334    privileges: [SELECT, INSERT, UPDATE, DELETE]
2335    object: { type: table, schema: checkout, name: "*" }
2336  - role: checkout-editor
2337    privileges: [USAGE, SELECT]
2338    object: { type: sequence, schema: checkout, name: "*" }
2339
2340  - role: inventory-viewer
2341    privileges: [USAGE]
2342    object: { type: schema, name: inventory }
2343  - role: inventory-viewer
2344    privileges: [SELECT]
2345    object: { type: table, schema: inventory, name: "*" }
2346
2347  - role: checkout-viewer
2348    privileges: [USAGE]
2349    object: { type: schema, name: checkout }
2350  - role: checkout-viewer
2351    privileges: [SELECT]
2352    object: { type: table, schema: checkout, name: "*" }
2353
2354  - role: analytics-viewer
2355    privileges: [USAGE]
2356    object: { type: schema, name: analytics }
2357  - role: analytics-viewer
2358    privileges: [SELECT]
2359    object: { type: table, schema: analytics, name: "*" }
2360
2361default_privileges:
2362  - owner: app_owner
2363    schema: inventory
2364    grant:
2365      - role: inventory-editor
2366        privileges: [SELECT, INSERT, UPDATE, DELETE]
2367        on_type: table
2368  - owner: app_owner
2369    schema: checkout
2370    grant:
2371      - role: checkout-editor
2372        privileges: [SELECT, INSERT, UPDATE, DELETE]
2373        on_type: table
2374
2375memberships:
2376  - role: inventory-editor
2377    members:
2378      - name: data_analyst
2379  - role: analytics-viewer
2380    members:
2381      - name: data_analyst
2382"#;
2383        let m = parse(yaml);
2384        let report = suggest_profiles(&m, &SuggestOptions::default());
2385        assert!(report.round_trip_ok, "skipped: {:?}", report.skipped);
2386
2387        // Expect "editor" cluster (inventory + checkout) and "viewer" cluster
2388        // (inventory + checkout + analytics).
2389        let names: BTreeSet<String> = report
2390            .profiles
2391            .iter()
2392            .map(|p| p.profile_name.clone())
2393            .collect();
2394        assert!(names.contains("editor"), "got: {names:?}");
2395        assert!(names.contains("viewer"), "got: {names:?}");
2396
2397        // Memberships untouched.
2398        assert_eq!(report.manifest.memberships.len(), 2);
2399
2400        // Re-expand and verify the role set is preserved.
2401        let expanded = expand_manifest(&report.manifest).unwrap();
2402        let role_names: BTreeSet<String> = expanded.roles.iter().map(|r| r.name.clone()).collect();
2403        for orig in [
2404            "inventory-editor",
2405            "checkout-editor",
2406            "inventory-viewer",
2407            "checkout-viewer",
2408            "analytics-viewer",
2409            "data_analyst",
2410            "app_owner",
2411            "analytics_owner",
2412        ] {
2413            assert!(
2414                role_names.contains(orig),
2415                "missing role {orig} in re-expanded manifest"
2416            );
2417        }
2418
2419        // analytics-viewer cluster has 3 schemas. inventory/checkout-editor cluster has 2.
2420        let viewer = report
2421            .profiles
2422            .iter()
2423            .find(|p| p.profile_name == "viewer")
2424            .unwrap();
2425        assert_eq!(viewer.schema_to_role.len(), 3);
2426        let editor = report
2427            .profiles
2428            .iter()
2429            .find(|p| p.profile_name == "editor")
2430            .unwrap();
2431        assert_eq!(editor.schema_to_role.len(), 2);
2432    }
2433
2434    #[test]
2435    fn round_trip_diff_engine_finds_no_structural_changes() {
2436        // Hardest test: build a flat graph, suggest profiles, expand back into
2437        // a graph, and run the actual `diff` engine. Only `SetComment` deltas
2438        // are allowed (auto-generated annotations).
2439        let yaml = r#"
2440default_owner: o
2441schemas:
2442  - name: s1
2443    owner: o
2444  - name: s2
2445    owner: o
2446  - name: s3
2447    owner: o
2448roles:
2449  - name: s1-rw
2450  - name: s2-rw
2451  - name: s3-rw
2452  - name: s1-ro
2453  - name: s2-ro
2454  - name: s3-ro
2455  - name: alice
2456    login: true
2457grants:
2458  - role: s1-rw
2459    privileges: [USAGE]
2460    object: { type: schema, name: s1 }
2461  - role: s1-rw
2462    privileges: [SELECT, INSERT, UPDATE, DELETE]
2463    object: { type: table, schema: s1, name: "*" }
2464  - role: s2-rw
2465    privileges: [USAGE]
2466    object: { type: schema, name: s2 }
2467  - role: s2-rw
2468    privileges: [SELECT, INSERT, UPDATE, DELETE]
2469    object: { type: table, schema: s2, name: "*" }
2470  - role: s3-rw
2471    privileges: [USAGE]
2472    object: { type: schema, name: s3 }
2473  - role: s3-rw
2474    privileges: [SELECT, INSERT, UPDATE, DELETE]
2475    object: { type: table, schema: s3, name: "*" }
2476  - role: s1-ro
2477    privileges: [USAGE]
2478    object: { type: schema, name: s1 }
2479  - role: s1-ro
2480    privileges: [SELECT]
2481    object: { type: table, schema: s1, name: "*" }
2482  - role: s2-ro
2483    privileges: [USAGE]
2484    object: { type: schema, name: s2 }
2485  - role: s2-ro
2486    privileges: [SELECT]
2487    object: { type: table, schema: s2, name: "*" }
2488  - role: s3-ro
2489    privileges: [USAGE]
2490    object: { type: schema, name: s3 }
2491  - role: s3-ro
2492    privileges: [SELECT]
2493    object: { type: table, schema: s3, name: "*" }
2494default_privileges:
2495  - owner: o
2496    schema: s1
2497    grant:
2498      - role: s1-rw
2499        privileges: [SELECT, INSERT, UPDATE, DELETE]
2500        on_type: table
2501      - role: s1-ro
2502        privileges: [SELECT]
2503        on_type: table
2504  - owner: o
2505    schema: s2
2506    grant:
2507      - role: s2-rw
2508        privileges: [SELECT, INSERT, UPDATE, DELETE]
2509        on_type: table
2510      - role: s2-ro
2511        privileges: [SELECT]
2512        on_type: table
2513  - owner: o
2514    schema: s3
2515    grant:
2516      - role: s3-rw
2517        privileges: [SELECT, INSERT, UPDATE, DELETE]
2518        on_type: table
2519      - role: s3-ro
2520        privileges: [SELECT]
2521        on_type: table
2522memberships:
2523  - role: s1-rw
2524    members:
2525      - name: alice
2526"#;
2527        let m = parse(yaml);
2528        let report = suggest_profiles(&m, &SuggestOptions::default());
2529        assert!(report.round_trip_ok, "skipped: {:?}", report.skipped);
2530        assert_eq!(report.profiles.len(), 2);
2531
2532        // Final: the actual diff engine should find no structural changes.
2533        let original_expanded = expand_manifest(&m).unwrap();
2534        let original_graph =
2535            RoleGraph::from_expanded(&original_expanded, m.default_owner.as_deref()).unwrap();
2536        let new_expanded = expand_manifest(&report.manifest).unwrap();
2537        let new_graph =
2538            RoleGraph::from_expanded(&new_expanded, report.manifest.default_owner.as_deref())
2539                .unwrap();
2540        let changes = diff(&original_graph, &new_graph);
2541        let bad: Vec<_> = changes
2542            .iter()
2543            .filter(|c| !matches!(c, Change::SetComment { .. }))
2544            .collect();
2545        assert!(bad.is_empty(), "structural drift: {bad:?}");
2546    }
2547
2548    #[test]
2549    fn empty_manifest_is_idempotent() {
2550        let m = parse("");
2551        let report = suggest_profiles(&m, &SuggestOptions::default());
2552        assert!(report.profiles.is_empty());
2553        assert!(report.round_trip_ok);
2554    }
2555
2556    #[test]
2557    fn schema_with_special_chars_in_name() {
2558        // Schema names can contain underscores, hyphens, digits.
2559        let m = parse(
2560            r#"
2561schemas:
2562  - name: app_v2
2563    owner: o
2564  - name: app_v3
2565    owner: o
2566roles:
2567  - name: app_v2-rw
2568  - name: app_v3-rw
2569grants:
2570  - role: app_v2-rw
2571    privileges: [SELECT]
2572    object: { type: table, schema: app_v2, name: "*" }
2573  - role: app_v3-rw
2574    privileges: [SELECT]
2575    object: { type: table, schema: app_v3, name: "*" }
2576"#,
2577        );
2578        let report = suggest_profiles(&m, &SuggestOptions::default());
2579        assert!(report.round_trip_ok);
2580        assert_eq!(report.profiles.len(), 1);
2581        assert_eq!(report.profiles[0].profile_name, "rw");
2582    }
2583
2584    #[test]
2585    fn schema_name_is_substring_of_role_name() {
2586        // Schema "app" is a substring of role "appraiser-app". The match_pattern
2587        // logic uses strip_prefix/strip_suffix, so a role name that starts with
2588        // the schema but with no separator (e.g. "appfoo") shouldn't match. Test
2589        // this and adjacent edge cases.
2590        let m = parse(
2591            r#"
2592schemas:
2593  - name: app
2594    owner: o
2595  - name: api
2596    owner: o
2597roles:
2598  - name: app-rw
2599  - name: api-rw
2600grants:
2601  - role: app-rw
2602    privileges: [SELECT]
2603    object: { type: table, schema: app, name: "*" }
2604  - role: api-rw
2605    privileges: [SELECT]
2606    object: { type: table, schema: api, name: "*" }
2607"#,
2608        );
2609        let report = suggest_profiles(&m, &SuggestOptions::default());
2610        assert!(report.round_trip_ok);
2611        assert_eq!(report.profiles.len(), 1);
2612        assert_eq!(report.profiles[0].profile_name, "rw");
2613    }
2614
2615    #[test]
2616    fn is_valid_identifier_basic() {
2617        assert!(is_valid_identifier("reader"));
2618        assert!(is_valid_identifier("read-only"));
2619        assert!(is_valid_identifier("read_only"));
2620        assert!(is_valid_identifier("rw2"));
2621        assert!(!is_valid_identifier(""));
2622        assert!(!is_valid_identifier("-reader"));
2623        assert!(!is_valid_identifier("_reader"));
2624        assert!(!is_valid_identifier("read.only"));
2625        assert!(!is_valid_identifier("read only"));
2626    }
2627}