Skip to main content

qail_core/ast/cmd/
rls.rs

1//! RLS tenant-scope injection for Qail queries.
2//!
3//! Provides `with_rls()` — the "one call to rule them all" method that
4//! auto-injects tenant isolation at the AST level based on query action.
5//!
6//! # Architecture
7//!
8//! ```text
9//!  Qail::get("orders")
10//!    .with_rls(&ctx)              ← Phase 4: AST injection (primary)
11//!    → WHERE tenant_id = 'uuid'
12//!
13//!  acquire_with_rls(ctx)      ← Phase 2: DB session vars (backup)
14//!    → SET app.current_tenant_id = 'uuid'
15//!
16//!  CREATE POLICY ...          ← Phase 3: DB policies (safety net)
17//!    → ENABLE ROW LEVEL SECURITY
18//! ```
19//!
20//! # Example
21//! ```
22//! use qail_core::Qail;
23//! use qail_core::rls::RlsContext;
24//! use qail_core::rls::tenant::register_tenant_table;
25//!
26//! register_tenant_table("orders", "tenant_id");
27//!
28//! let ctx = RlsContext::tenant("550e8400-e29b-41d4-a716-446655440000");
29//! let query = Qail::get("orders").with_rls(&ctx).expect("rls should apply");
30//! // Transpiles to: SELECT * FROM orders WHERE tenant_id = '550e8400-...'
31//! ```
32
33use crate::ast::{
34    Action, Cage, CageKind, Condition, Expr, LogicalOp, MergeAction, MergeMatchKind, MergeSource,
35    Operator, Qail, Value,
36};
37use crate::error::{QailBuildError, QailBuildResult};
38use crate::rls::RlsContext;
39use crate::rls::tenant::lookup_tenant_column;
40
41fn normalize_ident(raw: &str) -> String {
42    let trimmed = raw.trim();
43    if trimmed.starts_with('$') {
44        return trimmed.to_string();
45    }
46
47    let segment = trimmed.rsplit('.').next().unwrap_or(trimmed).trim();
48    let unquoted = if segment.len() >= 2 {
49        let bytes = segment.as_bytes();
50        let first = bytes[0] as char;
51        let last = bytes[bytes.len() - 1] as char;
52        if (first == '"' && last == '"')
53            || (first == '`' && last == '`')
54            || (first == '[' && last == ']')
55        {
56            &segment[1..segment.len() - 1]
57        } else {
58            segment
59        }
60    } else {
61        segment
62    };
63    unquoted.to_ascii_lowercase()
64}
65
66fn split_table_reference(table_ref: &str) -> (&str, Option<&str>) {
67    let parts = table_ref.split_whitespace().collect::<Vec<_>>();
68    match parts.as_slice() {
69        [table, alias] => (table, Some(alias)),
70        [table, as_keyword, alias] if as_keyword.eq_ignore_ascii_case("as") => (table, Some(alias)),
71        _ => (table_ref.trim(), None),
72    }
73}
74
75fn expr_named_eq(expr: &Expr, name: &str) -> bool {
76    matches!(expr, Expr::Named(existing) if normalize_ident(existing) == normalize_ident(name))
77}
78
79fn is_tenant_column_condition(cond: &Condition, tenant_col: &str) -> bool {
80    expr_named_eq(&cond.left, tenant_col)
81}
82
83fn condition_references_tenant_column(cond: &Condition, tenant_col: &str) -> bool {
84    is_tenant_column_condition(cond, tenant_col)
85        || matches!(&cond.value, Value::Column(col) if normalize_ident(col) == normalize_ident(tenant_col))
86}
87
88fn payload_is_positional(cage: &Cage) -> bool {
89    cage.conditions.iter().all(|cond| {
90        matches!(
91            &cond.left,
92            Expr::Named(name) if name.starts_with('$') && name[1..].chars().all(|c| c.is_ascii_digit())
93        )
94    })
95}
96
97fn make_named_condition(column: &str, value: Value) -> Condition {
98    Condition {
99        left: Expr::Named(column.to_string()),
100        op: Operator::Eq,
101        value,
102        is_array_unnest: false,
103    }
104}
105
106fn make_positional_condition(index: usize, value: Value) -> Condition {
107    Condition {
108        left: Expr::Named(format!("${}", index + 1)),
109        op: Operator::Eq,
110        value,
111        is_array_unnest: false,
112    }
113}
114
115fn expr_projects_all_columns(expr: &Expr) -> bool {
116    matches!(expr, Expr::Star)
117        || matches!(expr, Expr::Named(name) if name == "*" || name.trim().ends_with(".*"))
118}
119
120fn expr_projects_tenant_col(expr: &Expr, tenant_col: &str) -> bool {
121    match expr {
122        Expr::Named(name) => normalize_ident(name) == normalize_ident(tenant_col),
123        Expr::Aliased { alias, .. } => normalize_ident(alias) == normalize_ident(tenant_col),
124        Expr::JsonAccess {
125            alias: Some(alias), ..
126        }
127        | Expr::FunctionCall {
128            alias: Some(alias), ..
129        }
130        | Expr::Cast {
131            alias: Some(alias), ..
132        }
133        | Expr::Binary {
134            alias: Some(alias), ..
135        }
136        | Expr::Case {
137            alias: Some(alias), ..
138        }
139        | Expr::SpecialFunction {
140            alias: Some(alias), ..
141        }
142        | Expr::ArrayConstructor {
143            alias: Some(alias), ..
144        }
145        | Expr::RowConstructor {
146            alias: Some(alias), ..
147        }
148        | Expr::Subscript {
149            alias: Some(alias), ..
150        }
151        | Expr::Collate {
152            alias: Some(alias), ..
153        }
154        | Expr::FieldAccess {
155            alias: Some(alias), ..
156        }
157        | Expr::Subquery {
158            alias: Some(alias), ..
159        }
160        | Expr::Exists {
161            alias: Some(alias), ..
162        } => normalize_ident(alias) == normalize_ident(tenant_col),
163        _ => false,
164    }
165}
166
167fn query_projects_tenant_col(query: &Qail, tenant_col: &str) -> bool {
168    query.columns.is_empty()
169        || query.columns.iter().any(|expr| {
170            expr_projects_all_columns(expr) || expr_projects_tenant_col(expr, tenant_col)
171        })
172}
173
174fn query_can_append_tenant_projection(query: &Qail) -> bool {
175    query.set_ops.is_empty()
176        && query.having.is_empty()
177        && !query
178            .columns
179            .iter()
180            .any(|expr| matches!(expr, Expr::Aggregate { .. } | Expr::Window { .. }))
181}
182
183fn ensure_merge_query_source_projects_tenant(
184    mut query: Qail,
185    target_table: &str,
186    tenant_col: &str,
187) -> QailBuildResult<Qail> {
188    if query_projects_tenant_col(&query, tenant_col) {
189        return Ok(query);
190    }
191
192    if !query_can_append_tenant_projection(&query) {
193        return Err(QailBuildError::RlsMergeSourceTenantProjectionRequired {
194            table: target_table.to_string(),
195            tenant_column: tenant_col.to_string(),
196        });
197    }
198
199    query.columns.push(Expr::Named(tenant_col.to_string()));
200    Ok(query)
201}
202
203impl Qail {
204    /// Apply tenant-scope isolation based on the query action.
205    ///
206    /// - **GET/SET/DEL** → injects `WHERE tenant_col = $value`
207    /// - **ADD/Upsert** → auto-sets `tenant_col` in payload
208    /// - **Global context** → injects `tenant_col IS NULL` (or payload `tenant_col = NULL`)
209    /// - **Super admins** → no-op (bypasses isolation)
210    /// - **Unregistered tables** → no-op (not a tenant table)
211    /// - **DDL/etc** → no-op
212    ///
213    /// # Example
214    /// ```ignore
215    /// let ctx = RlsContext::tenant("tenant-uuid");
216    /// let query = Qail::get("orders").with_rls(&ctx)?;
217    /// ```
218    pub fn with_rls(self, ctx: &RlsContext) -> QailBuildResult<Self> {
219        if ctx.bypasses_rls() {
220            return Ok(self);
221        }
222
223        if !ctx.is_global() && !ctx.has_tenant() {
224            return Ok(self);
225        }
226
227        let scoped = self.scope_nested_rls(ctx)?;
228
229        let (tenant_table, _) = split_table_reference(&scoped.table);
230        let Some(tenant_col) = lookup_tenant_column(tenant_table) else {
231            return Ok(scoped);
232        };
233
234        if ctx.is_global() {
235            return match scoped.action {
236                Action::Get
237                | Action::Cnt
238                | Action::Del
239                | Action::Over
240                | Action::Gen
241                | Action::Export
242                | Action::Search
243                | Action::Scroll => {
244                    let condition_col = scoped.primary_tenant_condition_col(&tenant_col);
245                    Ok(scoped.scope_to_global(&condition_col))
246                }
247                Action::Set => scoped.scope_update_global(&tenant_col),
248                Action::Add | Action::Upsert | Action::Put => {
249                    scoped.scope_insert_global(&tenant_col)
250                }
251                Action::Merge => scoped.scope_merge_global(&tenant_col),
252                _ => Ok(scoped),
253            };
254        }
255
256        match scoped.action {
257            // Read / Update / Delete → inject WHERE filter
258            Action::Get
259            | Action::Cnt
260            | Action::Del
261            | Action::Over
262            | Action::Gen
263            | Action::Export
264            | Action::Search
265            | Action::Scroll => {
266                let condition_col = scoped.primary_tenant_condition_col(&tenant_col);
267                Ok(scoped.scope_to_tenant(&condition_col, ctx))
268            }
269            Action::Set => scoped.scope_update_tenant(&tenant_col, ctx),
270            // Insert / Upsert → auto-set tenant column in payload
271            Action::Add | Action::Upsert | Action::Put => {
272                scoped.scope_insert_tenant(&tenant_col, ctx)
273            }
274            Action::Merge => scoped.scope_merge_tenant(&tenant_col, ctx),
275            // DDL, transactions, etc. → no injection
276            _ => Ok(scoped),
277        }
278    }
279
280    fn scope_nested_rls(mut self, ctx: &RlsContext) -> QailBuildResult<Self> {
281        for cte in &mut self.ctes {
282            *cte.base_query = cte.base_query.as_ref().clone().with_rls(ctx)?;
283            if let Some(ref mut recursive_query) = cte.recursive_query {
284                **recursive_query = recursive_query.as_ref().clone().with_rls(ctx)?;
285            }
286        }
287
288        if let Some(ref mut source_query) = self.source_query {
289            **source_query = source_query.as_ref().clone().with_rls(ctx)?;
290        }
291
292        for (_, set_query) in &mut self.set_ops {
293            **set_query = set_query.as_ref().clone().with_rls(ctx)?;
294        }
295
296        self.scope_embedded_expr_rls(ctx)?;
297
298        Ok(self)
299    }
300
301    fn scope_value_nested_rls(value: &mut Value, ctx: &RlsContext) -> QailBuildResult<()> {
302        match value {
303            Value::Array(values) => {
304                for value in values {
305                    Self::scope_value_nested_rls(value, ctx)?;
306                }
307            }
308            Value::Subquery(query) => {
309                **query = query.as_ref().clone().with_rls(ctx)?;
310            }
311            Value::Expr(expr) => Self::scope_expr_nested_rls(expr, ctx)?,
312            _ => {}
313        }
314
315        Ok(())
316    }
317
318    fn scope_condition_nested_rls(
319        condition: &mut Condition,
320        ctx: &RlsContext,
321    ) -> QailBuildResult<()> {
322        Self::scope_expr_nested_rls(&mut condition.left, ctx)?;
323        Self::scope_value_nested_rls(&mut condition.value, ctx)
324    }
325
326    fn scope_expr_nested_rls(expr: &mut Expr, ctx: &RlsContext) -> QailBuildResult<()> {
327        match expr {
328            Expr::Aggregate {
329                filter: Some(filter),
330                ..
331            } => {
332                for condition in filter {
333                    Self::scope_condition_nested_rls(condition, ctx)?;
334                }
335            }
336            Expr::Cast { expr, .. } | Expr::Mod { col: expr, .. } | Expr::Collate { expr, .. } => {
337                Self::scope_expr_nested_rls(expr, ctx)?;
338            }
339            Expr::Window { params, order, .. } => {
340                for expr in params {
341                    Self::scope_expr_nested_rls(expr, ctx)?;
342                }
343                for cage in order {
344                    for condition in &mut cage.conditions {
345                        Self::scope_condition_nested_rls(condition, ctx)?;
346                    }
347                }
348            }
349            Expr::Case {
350                when_clauses,
351                else_value,
352                ..
353            } => {
354                for (condition, then_expr) in when_clauses {
355                    Self::scope_condition_nested_rls(condition, ctx)?;
356                    Self::scope_expr_nested_rls(then_expr, ctx)?;
357                }
358                if let Some(expr) = else_value {
359                    Self::scope_expr_nested_rls(expr, ctx)?;
360                }
361            }
362            Expr::FunctionCall { args, .. } => {
363                for expr in args {
364                    Self::scope_expr_nested_rls(expr, ctx)?;
365                }
366            }
367            Expr::SpecialFunction { args, .. } => {
368                for (_, expr) in args {
369                    Self::scope_expr_nested_rls(expr, ctx)?;
370                }
371            }
372            Expr::Binary { left, right, .. } => {
373                Self::scope_expr_nested_rls(left, ctx)?;
374                Self::scope_expr_nested_rls(right, ctx)?;
375            }
376            Expr::Literal(value) => Self::scope_value_nested_rls(value, ctx)?,
377            Expr::ArrayConstructor { elements, .. } | Expr::RowConstructor { elements, .. } => {
378                for expr in elements {
379                    Self::scope_expr_nested_rls(expr, ctx)?;
380                }
381            }
382            Expr::Subscript { expr, index, .. } => {
383                Self::scope_expr_nested_rls(expr, ctx)?;
384                Self::scope_expr_nested_rls(index, ctx)?;
385            }
386            Expr::FieldAccess { expr, .. } => Self::scope_expr_nested_rls(expr, ctx)?,
387            Expr::Subquery { query, .. } | Expr::Exists { query, .. } => {
388                **query = query.as_ref().clone().with_rls(ctx)?;
389            }
390            Expr::Star
391            | Expr::Named(_)
392            | Expr::Aliased { .. }
393            | Expr::Aggregate { filter: None, .. }
394            | Expr::Def { .. }
395            | Expr::JsonAccess { .. } => {}
396        }
397
398        Ok(())
399    }
400
401    fn scope_embedded_expr_rls(&mut self, ctx: &RlsContext) -> QailBuildResult<()> {
402        for expr in &mut self.columns {
403            Self::scope_expr_nested_rls(expr, ctx)?;
404        }
405        for expr in &mut self.distinct_on {
406            Self::scope_expr_nested_rls(expr, ctx)?;
407        }
408        if let Some(returning) = &mut self.returning {
409            for expr in returning {
410                Self::scope_expr_nested_rls(expr, ctx)?;
411            }
412        }
413        for cage in &mut self.cages {
414            for condition in &mut cage.conditions {
415                Self::scope_condition_nested_rls(condition, ctx)?;
416            }
417        }
418        for condition in &mut self.having {
419            Self::scope_condition_nested_rls(condition, ctx)?;
420        }
421        for join in &mut self.joins {
422            if let Some(conditions) = &mut join.on {
423                for condition in conditions {
424                    Self::scope_condition_nested_rls(condition, ctx)?;
425                }
426            }
427        }
428        if let Some(on_conflict) = &mut self.on_conflict
429            && let crate::ast::ConflictAction::DoUpdate { assignments } = &mut on_conflict.action
430        {
431            for (_, expr) in assignments {
432                Self::scope_expr_nested_rls(expr, ctx)?;
433            }
434        }
435        if let Some(merge) = &mut self.merge {
436            for condition in &mut merge.on {
437                Self::scope_condition_nested_rls(condition, ctx)?;
438            }
439            for clause in &mut merge.clauses {
440                for condition in &mut clause.condition {
441                    Self::scope_condition_nested_rls(condition, ctx)?;
442                }
443                match &mut clause.action {
444                    MergeAction::Update { assignments } => {
445                        for (_, expr) in assignments {
446                            Self::scope_expr_nested_rls(expr, ctx)?;
447                        }
448                    }
449                    MergeAction::Insert { values, .. } => {
450                        for expr in values {
451                            Self::scope_expr_nested_rls(expr, ctx)?;
452                        }
453                    }
454                    MergeAction::Delete | MergeAction::DoNothing => {}
455                }
456            }
457        }
458
459        Ok(())
460    }
461
462    fn scope_update_tenant(self, tenant_col: &str, ctx: &RlsContext) -> QailBuildResult<Self> {
463        self.reject_tenant_payload_mutation(tenant_col)?;
464        let condition_col = self.primary_tenant_condition_col(tenant_col);
465        Ok(self.scope_to_tenant(&condition_col, ctx))
466    }
467
468    fn scope_update_global(self, tenant_col: &str) -> QailBuildResult<Self> {
469        self.reject_tenant_payload_mutation(tenant_col)?;
470        let condition_col = self.primary_tenant_condition_col(tenant_col);
471        Ok(self.scope_to_global(&condition_col))
472    }
473
474    fn reject_tenant_payload_mutation(&self, tenant_col: &str) -> QailBuildResult<()> {
475        let assigns_tenant = self
476            .cages
477            .iter()
478            .filter(|cage| matches!(cage.kind, CageKind::Payload))
479            .flat_map(|cage| cage.conditions.iter())
480            .any(|cond| expr_named_eq(&cond.left, tenant_col));
481
482        if assigns_tenant {
483            return Err(QailBuildError::RlsTenantColumnMutationDenied {
484                table: self.table.clone(),
485                tenant_column: tenant_col.to_string(),
486            });
487        }
488
489        Ok(())
490    }
491
492    /// Inject a `WHERE tenant_col = scope_id` filter for reads.
493    ///
494    /// Adds the condition to the existing Filter cage (AND), or creates
495    /// a new one. Uses the same pattern as `.filter()`.
496    fn scope_to_tenant(mut self, tenant_col: &str, ctx: &RlsContext) -> Self {
497        let condition = make_named_condition(tenant_col, Value::String(ctx.tenant_id.clone()));
498
499        // Try to append to existing filter cage
500        let existing = self
501            .cages
502            .iter_mut()
503            .find(|c| matches!(c.kind, CageKind::Filter) && c.logical_op == LogicalOp::And);
504
505        if let Some(cage) = existing {
506            cage.conditions
507                .retain(|cond| !is_tenant_column_condition(cond, tenant_col));
508            cage.conditions.push(condition);
509        } else {
510            self.cages.push(Cage {
511                kind: CageKind::Filter,
512                conditions: vec![condition],
513                logical_op: LogicalOp::And,
514            });
515        }
516
517        self
518    }
519
520    fn primary_tenant_condition_col(&self, tenant_col: &str) -> String {
521        let (_, alias) = split_table_reference(&self.table);
522        alias
523            .map(|alias| format!("{alias}.{tenant_col}"))
524            .unwrap_or_else(|| tenant_col.to_string())
525    }
526
527    /// Inject a `WHERE tenant_col IS NULL` filter for global/platform reads.
528    fn scope_to_global(mut self, tenant_col: &str) -> Self {
529        let condition = Condition {
530            left: Expr::Named(tenant_col.to_string()),
531            op: Operator::IsNull,
532            value: Value::Null,
533            is_array_unnest: false,
534        };
535
536        let existing = self
537            .cages
538            .iter_mut()
539            .find(|c| matches!(c.kind, CageKind::Filter) && c.logical_op == LogicalOp::And);
540
541        if let Some(cage) = existing {
542            cage.conditions
543                .retain(|cond| !is_tenant_column_condition(cond, tenant_col));
544            cage.conditions.push(condition);
545        } else {
546            self.cages.push(Cage {
547                kind: CageKind::Filter,
548                conditions: vec![condition],
549                logical_op: LogicalOp::And,
550            });
551        }
552
553        self
554    }
555
556    /// Auto-set tenant scope in INSERT/UPSERT payload.
557    ///
558    /// Adds the tenant column to the Payload cage so the scope id
559    /// is always included in INSERT statements.
560    fn scope_insert_tenant(self, tenant_col: &str, ctx: &RlsContext) -> QailBuildResult<Self> {
561        self.scope_insert_value(tenant_col, Value::String(ctx.tenant_id.clone()))
562    }
563
564    /// Auto-set `tenant_col = NULL` in INSERT/UPSERT payload for global rows.
565    fn scope_insert_global(self, tenant_col: &str) -> QailBuildResult<Self> {
566        self.scope_insert_value(tenant_col, Value::Null)
567    }
568
569    fn scope_insert_value(
570        mut self,
571        tenant_col: &str,
572        tenant_value: Value,
573    ) -> QailBuildResult<Self> {
574        let payload_idx = self
575            .cages
576            .iter()
577            .position(|c| matches!(c.kind, CageKind::Payload));
578
579        let Some(idx) = payload_idx else {
580            self.cages.push(Cage {
581                kind: CageKind::Payload,
582                conditions: vec![make_named_condition(tenant_col, tenant_value)],
583                logical_op: LogicalOp::And,
584            });
585            return Ok(self);
586        };
587
588        let positional = payload_is_positional(&self.cages[idx]);
589        if positional {
590            if self.columns.is_empty() {
591                return Err(QailBuildError::RlsInsertRequiresExplicitColumns {
592                    table: self.table,
593                    tenant_column: tenant_col.to_string(),
594                });
595            }
596
597            if let Some(col_idx) = self
598                .columns
599                .iter()
600                .position(|expr| expr_named_eq(expr, tenant_col))
601            {
602                let placeholder = format!("${}", col_idx + 1);
603                let cage = &mut self.cages[idx];
604                if let Some(cond) = cage
605                    .conditions
606                    .iter_mut()
607                    .find(|cond| expr_named_eq(&cond.left, &placeholder))
608                {
609                    cond.value = tenant_value;
610                    cond.op = Operator::Eq;
611                    cond.is_array_unnest = false;
612                } else {
613                    cage.conditions
614                        .push(make_positional_condition(col_idx, tenant_value));
615                }
616                return Ok(self);
617            }
618
619            if !self.columns.is_empty() {
620                self.columns.push(Expr::Named(tenant_col.to_string()));
621                let idx_col = self.columns.len() - 1;
622                let cage = &mut self.cages[idx];
623                cage.conditions
624                    .push(make_positional_condition(idx_col, tenant_value));
625                return Ok(self);
626            }
627        }
628
629        let cage = &mut self.cages[idx];
630        cage.conditions
631            .retain(|cond| !is_tenant_column_condition(cond, tenant_col));
632        cage.conditions
633            .push(make_named_condition(tenant_col, tenant_value));
634        Ok(self)
635    }
636
637    fn scope_merge_tenant(mut self, tenant_col: &str, ctx: &RlsContext) -> QailBuildResult<Self> {
638        self.scope_merge_query_source(ctx, tenant_col)?;
639        self.reject_merge_tenant_update_mutation(tenant_col)?;
640        let target_col = self.merge_target_tenant_col(tenant_col);
641        let source_col = self.merge_source_tenant_col(tenant_col);
642        self.scope_merge_on_tenant_equality(tenant_col, target_col.clone(), source_col.clone());
643
644        let condition = Condition {
645            left: Expr::Named(target_col),
646            op: Operator::Eq,
647            value: Value::String(ctx.tenant_id.clone()),
648            is_array_unnest: false,
649        };
650        let source_condition = source_col.map(|source_col| Condition {
651            left: Expr::Named(source_col),
652            op: Operator::Eq,
653            value: Value::String(ctx.tenant_id.clone()),
654            is_array_unnest: false,
655        });
656        self.scope_merge_clause_conditions(tenant_col, condition, source_condition);
657        self.scope_merge_insert_value(
658            tenant_col,
659            Expr::Literal(Value::String(ctx.tenant_id.clone())),
660        )?;
661        Ok(self)
662    }
663
664    fn scope_merge_global(mut self, tenant_col: &str) -> QailBuildResult<Self> {
665        self.scope_merge_query_source(&RlsContext::global(), tenant_col)?;
666        self.reject_merge_tenant_update_mutation(tenant_col)?;
667        let target_col = self.merge_target_tenant_col(tenant_col);
668        let source_col = self.merge_source_tenant_col(tenant_col);
669        self.scope_merge_on_tenant_equality(tenant_col, target_col.clone(), source_col.clone());
670
671        let condition = Condition {
672            left: Expr::Named(target_col),
673            op: Operator::IsNull,
674            value: Value::Null,
675            is_array_unnest: false,
676        };
677        let source_condition = source_col.map(|source_col| Condition {
678            left: Expr::Named(source_col),
679            op: Operator::IsNull,
680            value: Value::Null,
681            is_array_unnest: false,
682        });
683        self.scope_merge_clause_conditions(tenant_col, condition, source_condition);
684        self.scope_merge_insert_value(tenant_col, Expr::Literal(Value::Null))?;
685        Ok(self)
686    }
687
688    fn scope_merge_query_source(
689        &mut self,
690        ctx: &RlsContext,
691        tenant_col: &str,
692    ) -> QailBuildResult<()> {
693        let has_query_source = matches!(
694            self.merge.as_ref().map(|merge| &merge.source),
695            Some(MergeSource::Query { .. })
696        );
697        let Some(source_tenant_col) = self.merge_query_source_tenant_col(tenant_col) else {
698            if has_query_source {
699                return Err(QailBuildError::RlsMergeSourceTenantProjectionRequired {
700                    table: self.table.clone(),
701                    tenant_column: tenant_col.to_string(),
702                });
703            }
704            return Ok(());
705        };
706        let target_table = self.table.clone();
707
708        let Some(merge) = &mut self.merge else {
709            return Ok(());
710        };
711        let MergeSource::Query { query, .. } = &mut merge.source else {
712            return Ok(());
713        };
714
715        let scoped_query = query.as_ref().clone().with_rls(ctx)?;
716        let scoped_query = ensure_merge_query_source_projects_tenant(
717            scoped_query,
718            &target_table,
719            &source_tenant_col,
720        )?;
721        **query = scoped_query;
722        Ok(())
723    }
724
725    fn merge_target_tenant_col(&self, tenant_col: &str) -> String {
726        let (target_table, inline_alias) = split_table_reference(&self.table);
727        let qualifier = self
728            .merge
729            .as_ref()
730            .and_then(|merge| merge.target_alias.as_ref())
731            .map(String::as_str)
732            .or(inline_alias)
733            .unwrap_or(target_table);
734        format!("{qualifier}.{tenant_col}")
735    }
736
737    fn merge_source_tenant_col(&self, tenant_col: &str) -> Option<String> {
738        let merge = self.merge.as_ref()?;
739        match &merge.source {
740            MergeSource::Table { name, alias } => {
741                let (source_table, inline_alias) = split_table_reference(name);
742                let source_tenant_col = lookup_tenant_column(source_table)?;
743                let qualifier = alias.as_deref().or(inline_alias).unwrap_or(source_table);
744                Some(format!("{qualifier}.{source_tenant_col}"))
745            }
746            MergeSource::Query { query, alias } => {
747                let source_tenant_col = self.merge_query_source_tenant_col(tenant_col)?;
748                let qualifier = alias.as_deref()?;
749                if query_projects_tenant_col(query, &source_tenant_col) {
750                    Some(format!("{qualifier}.{source_tenant_col}"))
751                } else {
752                    None
753                }
754            }
755        }
756    }
757
758    fn merge_query_source_tenant_col(&self, tenant_col: &str) -> Option<String> {
759        let merge = self.merge.as_ref()?;
760        let MergeSource::Query { query, .. } = &merge.source else {
761            return None;
762        };
763
764        let (source_table, _) = split_table_reference(&query.table);
765        if let Some(source_tenant_col) = lookup_tenant_column(source_table) {
766            return Some(source_tenant_col);
767        }
768
769        if query_projects_tenant_col(query, tenant_col)
770            || self.cte_exposes_tenant_col(source_table, tenant_col)
771        {
772            return Some(tenant_col.to_string());
773        }
774
775        None
776    }
777
778    fn cte_exposes_tenant_col(&self, cte_name: &str, tenant_col: &str) -> bool {
779        self.ctes
780            .iter()
781            .find(|cte| normalize_ident(&cte.name) == normalize_ident(cte_name))
782            .is_some_and(|cte| {
783                if !cte.columns.is_empty() {
784                    cte.columns
785                        .iter()
786                        .any(|col| normalize_ident(col) == normalize_ident(tenant_col))
787                } else {
788                    let (base_table, _) = split_table_reference(&cte.base_query.table);
789                    query_projects_tenant_col(&cte.base_query, tenant_col)
790                        || lookup_tenant_column(base_table)
791                            .is_some_and(|col| normalize_ident(&col) == normalize_ident(tenant_col))
792                }
793            })
794    }
795
796    fn scope_merge_on_tenant_equality(
797        &mut self,
798        tenant_col: &str,
799        target_col: String,
800        source_col: Option<String>,
801    ) {
802        let Some(merge) = &mut self.merge else {
803            return;
804        };
805        merge
806            .on
807            .retain(|cond| !condition_references_tenant_column(cond, tenant_col));
808
809        if let Some(source_col) = source_col {
810            merge.on.push(Condition {
811                left: Expr::Named(target_col),
812                op: Operator::Eq,
813                value: Value::Column(source_col),
814                is_array_unnest: false,
815            });
816        }
817    }
818
819    fn scope_merge_clause_conditions(
820        &mut self,
821        tenant_col: &str,
822        target_condition: Condition,
823        source_condition: Option<Condition>,
824    ) {
825        let Some(merge) = &mut self.merge else {
826            return;
827        };
828
829        for clause in &mut merge.clauses {
830            clause
831                .condition
832                .retain(|cond| !condition_references_tenant_column(cond, tenant_col));
833
834            match clause.match_kind {
835                MergeMatchKind::Matched | MergeMatchKind::NotMatchedBySource => {
836                    clause.condition.push(target_condition.clone());
837                }
838                MergeMatchKind::NotMatchedByTarget => {
839                    if let Some(condition) = &source_condition {
840                        clause.condition.push(condition.clone());
841                    }
842                }
843            }
844        }
845    }
846
847    fn scope_merge_insert_value(
848        &mut self,
849        tenant_col: &str,
850        tenant_expr: Expr,
851    ) -> QailBuildResult<()> {
852        let Some(merge) = &mut self.merge else {
853            return Ok(());
854        };
855
856        for clause in &mut merge.clauses {
857            let MergeAction::Insert { columns, values } = &mut clause.action else {
858                continue;
859            };
860
861            if columns.is_empty() {
862                return Err(QailBuildError::RlsInsertRequiresExplicitColumns {
863                    table: self.table.clone(),
864                    tenant_column: tenant_col.to_string(),
865                });
866            }
867
868            if let Some(pos) = columns
869                .iter()
870                .position(|col| normalize_ident(col) == normalize_ident(tenant_col))
871            {
872                if let Some(value) = values.get_mut(pos) {
873                    *value = tenant_expr.clone();
874                } else {
875                    values.push(tenant_expr.clone());
876                }
877            } else {
878                columns.push(tenant_col.to_string());
879                values.push(tenant_expr.clone());
880            }
881        }
882
883        Ok(())
884    }
885
886    fn reject_merge_tenant_update_mutation(&self, tenant_col: &str) -> QailBuildResult<()> {
887        let assigns_tenant = self
888            .merge
889            .as_ref()
890            .is_some_and(|merge| {
891                merge.clauses.iter().any(|clause| {
892                    matches!(&clause.action, MergeAction::Update { assignments }
893                        if assignments
894                            .iter()
895                            .any(|(column, _)| normalize_ident(column) == normalize_ident(tenant_col)))
896                })
897            });
898
899        if assigns_tenant {
900            return Err(QailBuildError::RlsTenantColumnMutationDenied {
901                table: self.table.clone(),
902                tenant_column: tenant_col.to_string(),
903            });
904        }
905
906        Ok(())
907    }
908}
909
910#[cfg(test)]
911mod tests {
912    use super::*;
913    use crate::rls::tenant::register_tenant_table;
914    use crate::transpiler::ToSql;
915
916    // Each test uses a UNIQUE table name to avoid parallel-test interference
917    // on the global TENANT_TABLES registry.
918
919    #[test]
920    fn test_with_rls_injects_filter_on_get() {
921        register_tenant_table("_rls_get_orders", "tenant_id");
922
923        let ctx = RlsContext::tenant("t-123");
924        let query = Qail::get("_rls_get_orders")
925            .with_rls(&ctx)
926            .expect("rls should apply");
927
928        let filter = query
929            .cages
930            .iter()
931            .find(|c| matches!(c.kind, CageKind::Filter));
932        assert!(filter.is_some(), "Expected filter cage");
933
934        let conditions = &filter.unwrap().conditions;
935        assert!(
936            conditions.iter().any(|c| {
937                matches!(&c.left, Expr::Named(n) if n == "tenant_id")
938                    && matches!(&c.value, Value::String(v) if v == "t-123")
939            }),
940            "Expected tenant_id = 't-123' condition"
941        );
942    }
943
944    #[test]
945    fn test_with_rls_resolves_primary_table_alias_on_get() {
946        register_tenant_table("_rls_alias_get_orders", "tenant_id");
947
948        let ctx = RlsContext::tenant("tenant-alias");
949        let query = Qail::get("_rls_alias_get_orders")
950            .table_alias("o")
951            .with_rls(&ctx)
952            .expect("rls should apply through primary table alias");
953
954        let sql = query.to_sql();
955        assert!(
956            sql.contains("FROM _rls_alias_get_orders o"),
957            "expected aliased FROM table: {sql}"
958        );
959        assert!(
960            sql.contains("WHERE o.tenant_id = 'tenant-alias'"),
961            "RLS tenant filter should use the primary alias: {sql}"
962        );
963    }
964
965    #[test]
966    fn test_with_rls_injects_payload_on_add() {
967        register_tenant_table("_rls_add_orders", "tenant_id");
968
969        let ctx = RlsContext::tenant("t-456");
970        let query = Qail::add("_rls_add_orders")
971            .set_value("total", 100)
972            .with_rls(&ctx)
973            .expect("rls should apply");
974
975        let payload = query
976            .cages
977            .iter()
978            .find(|c| matches!(c.kind, CageKind::Payload));
979        assert!(payload.is_some(), "Expected payload cage");
980
981        let conditions = &payload.unwrap().conditions;
982        assert!(
983            conditions.iter().any(|c| {
984                matches!(&c.left, Expr::Named(n) if n == "tenant_id")
985                    && matches!(&c.value, Value::String(v) if v == "t-456")
986            }),
987            "Expected tenant_id = 't-456' in payload"
988        );
989    }
990
991    #[test]
992    fn test_with_rls_noop_for_super_admin() {
993        register_tenant_table("_rls_admin_orders", "tenant_id");
994
995        let token = crate::rls::SuperAdminToken::for_system_process("test_super_admin_noop");
996        let ctx = RlsContext::super_admin(token);
997        let query = Qail::get("_rls_admin_orders")
998            .with_rls(&ctx)
999            .expect("super admin rls should no-op");
1000
1001        let filter = query
1002            .cages
1003            .iter()
1004            .find(|c| matches!(c.kind, CageKind::Filter));
1005        assert!(filter.is_none(), "Super admin should not have filter");
1006    }
1007
1008    #[test]
1009    fn test_with_rls_noop_for_unregistered_table() {
1010        let ctx = RlsContext::tenant("t-789");
1011        let query = Qail::get("_rls_unreg_migrations")
1012            .with_rls(&ctx)
1013            .expect("unregistered table rls should no-op");
1014
1015        let filter = query
1016            .cages
1017            .iter()
1018            .find(|c| matches!(c.kind, CageKind::Filter));
1019        assert!(
1020            filter.is_none(),
1021            "Unregistered table should not have filter"
1022        );
1023    }
1024
1025    #[test]
1026    fn test_with_rls_noop_for_ddl() {
1027        register_tenant_table("_rls_ddl_orders", "tenant_id");
1028
1029        let ctx = RlsContext::tenant("t-000");
1030        let query = Qail {
1031            action: Action::Make,
1032            table: "_rls_ddl_orders".to_string(),
1033            ..Default::default()
1034        };
1035        let query = query.with_rls(&ctx).expect("ddl rls should no-op");
1036
1037        assert!(query.cages.is_empty(), "DDL should not inject cages");
1038    }
1039
1040    #[test]
1041    fn test_with_rls_appends_to_existing_filter() {
1042        register_tenant_table("_rls_merge_orders", "tenant_id");
1043
1044        let ctx = RlsContext::tenant("t-merge");
1045        let query = Qail::get("_rls_merge_orders")
1046            .filter("status", Operator::Eq, "active")
1047            .with_rls(&ctx)
1048            .expect("rls should apply");
1049
1050        let filters: Vec<_> = query
1051            .cages
1052            .iter()
1053            .filter(|c| matches!(c.kind, CageKind::Filter))
1054            .collect();
1055        assert_eq!(filters.len(), 1, "Should merge into one filter cage");
1056        assert_eq!(
1057            filters[0].conditions.len(),
1058            2,
1059            "Should have 2 conditions: status + tenant_id"
1060        );
1061    }
1062
1063    #[test]
1064    fn test_with_rls_does_not_merge_tenant_scope_into_or_filter_cage() {
1065        register_tenant_table("_rls_or_orders", "tenant_id");
1066
1067        let ctx = RlsContext::tenant("t-or");
1068        let query = Qail::get("_rls_or_orders")
1069            .or_filter("status", Operator::Eq, "active")
1070            .or_filter("status", Operator::Eq, "pending")
1071            .with_rls(&ctx)
1072            .expect("rls should apply");
1073
1074        let or_filter = query
1075            .cages
1076            .iter()
1077            .find(|c| matches!(c.kind, CageKind::Filter) && c.logical_op == LogicalOp::Or)
1078            .expect("Expected OR filter cage");
1079        assert_eq!(
1080            or_filter.conditions.len(),
1081            2,
1082            "OR cage should keep only OR terms"
1083        );
1084        assert!(
1085            !or_filter
1086                .conditions
1087                .iter()
1088                .any(|c| is_tenant_column_condition(c, "tenant_id")),
1089            "tenant scope must not be injected into OR cage"
1090        );
1091
1092        let and_filter = query
1093            .cages
1094            .iter()
1095            .find(|c| matches!(c.kind, CageKind::Filter) && c.logical_op == LogicalOp::And)
1096            .expect("Expected AND filter cage for tenant scope");
1097        assert!(
1098            and_filter
1099                .conditions
1100                .iter()
1101                .any(|c| is_tenant_column_condition(c, "tenant_id")),
1102            "tenant scope must be enforced via AND cage"
1103        );
1104
1105        let sql = query.to_sql();
1106        assert!(
1107            sql.contains("tenant_id = 't-or'"),
1108            "Expected tenant scope in SQL: {sql}"
1109        );
1110        assert!(
1111            !sql.contains("OR tenant_id = 't-or'"),
1112            "tenant scope must not be OR-ed with user conditions: {sql}"
1113        );
1114    }
1115
1116    #[test]
1117    fn test_with_rls_on_set_injects_filter() {
1118        register_tenant_table("_rls_set_orders", "tenant_id");
1119
1120        let ctx = RlsContext::tenant("t-set");
1121        let query = Qail::set("_rls_set_orders")
1122            .set_value("status", "shipped")
1123            .with_rls(&ctx)
1124            .expect("rls should apply");
1125
1126        let filter = query
1127            .cages
1128            .iter()
1129            .find(|c| matches!(c.kind, CageKind::Filter));
1130        assert!(filter.is_some(), "SET should inject filter");
1131
1132        let conditions = &filter.unwrap().conditions;
1133        assert!(
1134            conditions
1135                .iter()
1136                .any(|c| { matches!(&c.left, Expr::Named(n) if n == "tenant_id") }),
1137            "Expected tenant_id filter on SET"
1138        );
1139    }
1140
1141    #[test]
1142    fn test_with_rls_resolves_primary_table_alias_on_set() {
1143        register_tenant_table("_rls_alias_set_orders", "tenant_id");
1144
1145        let ctx = RlsContext::tenant("tenant-set-alias");
1146        let query = Qail::set("_rls_alias_set_orders")
1147            .table_alias("o")
1148            .set_value("status", "paid")
1149            .with_rls(&ctx)
1150            .expect("rls should apply through UPDATE alias");
1151
1152        let sql = query.to_sql();
1153        assert!(
1154            sql.contains("UPDATE _rls_alias_set_orders o SET status = 'paid'"),
1155            "expected aliased UPDATE target: {sql}"
1156        );
1157        assert!(
1158            sql.contains("WHERE o.tenant_id = 'tenant-set-alias'"),
1159            "RLS tenant filter should use the UPDATE alias: {sql}"
1160        );
1161    }
1162
1163    #[test]
1164    fn test_with_rls_on_set_rejects_tenant_column_update() {
1165        register_tenant_table("_rls_set_tenant_rewrite_orders", "tenant_id");
1166
1167        let ctx = RlsContext::tenant("tenant-a");
1168        let err = Qail::set("_rls_set_tenant_rewrite_orders")
1169            .set_value("tenant_id", "tenant-b")
1170            .with_rls(&ctx)
1171            .expect_err("tenant column updates must fail closed");
1172
1173        assert!(err.to_string().contains("tenant column mutation"));
1174    }
1175
1176    #[test]
1177    fn test_with_rls_injects_filter_on_read_like_actions() {
1178        let actions = [
1179            (Action::Cnt, "_rls_cnt_orders"),
1180            (Action::Export, "_rls_export_orders"),
1181            (Action::Search, "_rls_search_vectors"),
1182            (Action::Scroll, "_rls_scroll_vectors"),
1183        ];
1184
1185        for (action, table) in actions {
1186            register_tenant_table(table, "tenant_id");
1187
1188            let ctx = RlsContext::tenant("tenant-read-like");
1189            let query = Qail {
1190                action,
1191                table: table.to_string(),
1192                ..Default::default()
1193            }
1194            .with_rls(&ctx)
1195            .expect("read-like action should apply RLS");
1196
1197            let filter = query
1198                .cages
1199                .iter()
1200                .find(|c| matches!(c.kind, CageKind::Filter))
1201                .expect("Expected filter cage");
1202
1203            assert!(
1204                filter.conditions.iter().any(|c| {
1205                    matches!(&c.left, Expr::Named(n) if n == "tenant_id")
1206                        && matches!(&c.value, Value::String(v) if v == "tenant-read-like")
1207                }),
1208                "Expected tenant filter on {action:?}"
1209            );
1210        }
1211    }
1212
1213    #[test]
1214    fn test_with_rls_noop_no_tenant() {
1215        register_tenant_table("_rls_noops_orders", "tenant_id");
1216
1217        // Agent-only context without tenant_id
1218        let ctx = RlsContext::agent("ag-only");
1219        let query = Qail::get("_rls_noops_orders")
1220            .with_rls(&ctx)
1221            .expect("missing tenant rls should no-op");
1222
1223        let filter = query
1224            .cages
1225            .iter()
1226            .find(|c| matches!(c.kind, CageKind::Filter));
1227        assert!(
1228            filter.is_none(),
1229            "Agent-only should not inject tenant filter"
1230        );
1231    }
1232
1233    #[test]
1234    fn test_with_rls_global_injects_is_null_filter() {
1235        register_tenant_table("_rls_global_get_orders", "tenant_id");
1236
1237        let ctx = RlsContext::global();
1238        let query = Qail::get("_rls_global_get_orders")
1239            .with_rls(&ctx)
1240            .expect("global rls should apply");
1241
1242        let filter = query
1243            .cages
1244            .iter()
1245            .find(|c| matches!(c.kind, CageKind::Filter));
1246        assert!(filter.is_some(), "Expected filter cage for global scope");
1247
1248        let conditions = &filter.expect("filter cage").conditions;
1249        assert!(
1250            conditions.iter().any(|c| {
1251                matches!(&c.left, Expr::Named(n) if n == "tenant_id")
1252                    && c.op == Operator::IsNull
1253                    && matches!(&c.value, Value::Null)
1254            }),
1255            "Expected tenant_id IS NULL condition"
1256        );
1257    }
1258
1259    #[test]
1260    fn test_with_rls_global_injects_null_payload_on_add() {
1261        register_tenant_table("_rls_global_add_catalog", "tenant_id");
1262
1263        let ctx = RlsContext::global();
1264        let query = Qail::add("_rls_global_add_catalog")
1265            .set_value("name", "item")
1266            .with_rls(&ctx)
1267            .expect("global rls should apply");
1268
1269        let payload = query
1270            .cages
1271            .iter()
1272            .find(|c| matches!(c.kind, CageKind::Payload));
1273        assert!(payload.is_some(), "Expected payload cage");
1274
1275        let conditions = &payload.expect("payload cage").conditions;
1276        assert!(
1277            conditions.iter().any(|c| {
1278                matches!(&c.left, Expr::Named(n) if n == "tenant_id")
1279                    && matches!(&c.value, Value::Null)
1280            }),
1281            "Expected tenant_id = NULL in payload"
1282        );
1283    }
1284
1285    #[test]
1286    fn test_with_rls_scopes_expression_subquery() {
1287        register_tenant_table("_rls_expr_orders", "tenant_id");
1288        register_tenant_table("_rls_expr_invoices", "tenant_id");
1289
1290        let ctx = RlsContext::tenant("tenant-expr");
1291        let mut query = Qail::get("_rls_expr_orders").columns(["id"]);
1292        query.columns.push(Expr::Subquery {
1293            query: Box::new(Qail::get("_rls_expr_invoices").columns(["total"])),
1294            alias: Some("invoice_total".to_string()),
1295        });
1296
1297        let query = query.with_rls(&ctx).expect("rls should apply");
1298        let subquery = query
1299            .columns
1300            .iter()
1301            .find_map(|expr| {
1302                if let Expr::Subquery { query, .. } = expr {
1303                    Some(query)
1304                } else {
1305                    None
1306                }
1307            })
1308            .expect("expression subquery");
1309
1310        assert!(subquery.cages.iter().any(|cage| {
1311            matches!(cage.kind, CageKind::Filter) && cage.conditions.iter().any(|condition| {
1312                matches!(&condition.left, Expr::Named(name) if name == "tenant_id")
1313                    && matches!(&condition.value, Value::String(value) if value == "tenant-expr")
1314            })
1315        }));
1316    }
1317
1318    #[test]
1319    fn test_with_rls_scopes_condition_value_subquery() {
1320        register_tenant_table("_rls_condition_orders", "tenant_id");
1321        register_tenant_table("_rls_condition_invoices", "tenant_id");
1322
1323        let ctx = RlsContext::tenant("tenant-condition");
1324        let query = Qail::get("_rls_condition_orders")
1325            .filter(
1326                "id",
1327                Operator::In,
1328                Value::Subquery(Box::new(
1329                    Qail::get("_rls_condition_invoices").columns(["order_id"]),
1330                )),
1331            )
1332            .with_rls(&ctx)
1333            .expect("rls should apply");
1334
1335        let subquery = query
1336            .cages
1337            .iter()
1338            .flat_map(|cage| &cage.conditions)
1339            .find_map(|condition| {
1340                if let Value::Subquery(query) = &condition.value {
1341                    Some(query)
1342                } else {
1343                    None
1344                }
1345            })
1346            .expect("condition subquery");
1347
1348        assert!(subquery.cages.iter().any(|cage| {
1349            matches!(cage.kind, CageKind::Filter)
1350                && cage.conditions.iter().any(|condition| {
1351                    matches!(&condition.left, Expr::Named(name) if name == "tenant_id")
1352                        && matches!(&condition.value, Value::String(value) if value == "tenant-condition")
1353                })
1354        }));
1355    }
1356
1357    #[test]
1358    fn test_with_rls_scopes_merge_on_and_insert_action() {
1359        register_tenant_table("_rls_merge_upsert_orders", "tenant_id");
1360        register_tenant_table("_rls_merge_source_orders", "tenant_id");
1361
1362        let ctx = RlsContext::tenant("tenant-merge");
1363        let query = Qail::merge_into("_rls_merge_upsert_orders")
1364            .target_alias("t")
1365            .using_table_as("_rls_merge_source_orders", "s")
1366            .merge_on_column("t.id", Operator::Eq, "s.id")
1367            .when_matched_update(&[("status", Expr::Named("s.status".to_string()))])
1368            .when_not_matched_insert(
1369                &["id", "status"],
1370                &[
1371                    Expr::Named("s.id".to_string()),
1372                    Expr::Named("s.status".to_string()),
1373                ],
1374            )
1375            .with_rls(&ctx)
1376            .expect("merge rls should apply");
1377
1378        let sql = query.to_sql();
1379        assert!(
1380            sql.contains("ON t.id = s.id AND t.tenant_id = s.tenant_id"),
1381            "MERGE ON must preserve target/source tenant equality: {sql}"
1382        );
1383        assert!(
1384            sql.contains("WHEN MATCHED AND t.tenant_id = 'tenant-merge' THEN UPDATE"),
1385            "MERGE matched branch must be target-tenant scoped: {sql}"
1386        );
1387        assert!(
1388            sql.contains("WHEN NOT MATCHED BY TARGET AND s.tenant_id = 'tenant-merge' THEN INSERT"),
1389            "MERGE insert branch must be source-tenant scoped: {sql}"
1390        );
1391        assert!(
1392            sql.contains("INSERT (id, status, tenant_id) VALUES (s.id, s.status, 'tenant-merge')"),
1393            "MERGE insert branch must include tenant value: {sql}"
1394        );
1395    }
1396
1397    #[test]
1398    fn test_with_rls_scopes_merge_inline_source_alias() {
1399        register_tenant_table("_rls_merge_inline_target_orders", "tenant_id");
1400        register_tenant_table("_rls_merge_inline_source_orders", "tenant_id");
1401
1402        let ctx = RlsContext::tenant("tenant-inline");
1403        let query = Qail::merge_into("_rls_merge_inline_target_orders")
1404            .target_alias("t")
1405            .using_table("_rls_merge_inline_source_orders s")
1406            .merge_on_column("t.id", Operator::Eq, "s.id")
1407            .when_matched_update(&[("status", Expr::Named("s.status".to_string()))])
1408            .when_not_matched_insert(
1409                &["id", "status"],
1410                &[
1411                    Expr::Named("s.id".to_string()),
1412                    Expr::Named("s.status".to_string()),
1413                ],
1414            )
1415            .with_rls(&ctx)
1416            .expect("merge rls should apply through inline source alias");
1417
1418        let sql = query.to_sql();
1419        assert!(
1420            sql.contains("USING _rls_merge_inline_source_orders s"),
1421            "MERGE source should keep inline alias: {sql}"
1422        );
1423        assert!(
1424            sql.contains("ON t.id = s.id AND t.tenant_id = s.tenant_id"),
1425            "MERGE ON must scope inline source alias tenant equality: {sql}"
1426        );
1427        assert!(
1428            sql.contains(
1429                "WHEN NOT MATCHED BY TARGET AND s.tenant_id = 'tenant-inline' THEN INSERT"
1430            ),
1431            "MERGE insert branch must scope inline source alias: {sql}"
1432        );
1433    }
1434
1435    #[test]
1436    fn test_with_rls_scopes_merge_query_source() {
1437        register_tenant_table("_rls_merge_query_target_orders", "tenant_id");
1438        register_tenant_table("_rls_merge_query_source_orders", "tenant_id");
1439
1440        let ctx = RlsContext::tenant("tenant-query");
1441        let source = Qail::get("_rls_merge_query_source_orders").columns(["id", "status"]);
1442        let query = Qail::merge_into("_rls_merge_query_target_orders")
1443            .target_alias("t")
1444            .using_query_as(source, "s")
1445            .merge_on_column("t.id", Operator::Eq, "s.id")
1446            .when_not_matched_insert(
1447                &["id", "status"],
1448                &[
1449                    Expr::Named("s.id".to_string()),
1450                    Expr::Named("s.status".to_string()),
1451                ],
1452            )
1453            .with_rls(&ctx)
1454            .expect("merge rls should apply");
1455
1456        let merge = query.merge.as_ref().expect("merge spec");
1457        let MergeSource::Query {
1458            query: source_query,
1459            ..
1460        } = &merge.source
1461        else {
1462            panic!("expected query source");
1463        };
1464        assert!(
1465            source_query.cages.iter().any(|cage| {
1466                matches!(cage.kind, CageKind::Filter)
1467                    && cage.conditions.iter().any(|condition| {
1468                        matches!(&condition.left, Expr::Named(name) if name == "tenant_id")
1469                            && condition.op == Operator::Eq
1470                            && matches!(&condition.value, Value::String(value) if value == "tenant-query")
1471                    })
1472            }),
1473            "MERGE query source must be tenant-scoped"
1474        );
1475        assert!(
1476            source_query
1477                .columns
1478                .iter()
1479                .any(|expr| matches!(expr, Expr::Named(name) if name == "tenant_id")),
1480            "MERGE query source must project tenant_id for ON classification"
1481        );
1482
1483        let sql = query.to_sql();
1484        assert!(
1485            sql.contains("ON t.id = s.id AND t.tenant_id = s.tenant_id"),
1486            "MERGE query source ON must include target/source tenant equality: {sql}"
1487        );
1488        assert!(
1489            sql.contains("WHEN NOT MATCHED BY TARGET AND s.tenant_id = 'tenant-query' THEN INSERT"),
1490            "MERGE query source insert branch must be source-tenant scoped: {sql}"
1491        );
1492    }
1493
1494    #[test]
1495    fn test_with_rls_scopes_aliased_merge_query_source_table() {
1496        register_tenant_table("_rls_merge_query_alias_target_orders", "tenant_id");
1497        register_tenant_table("_rls_merge_query_alias_source_orders", "tenant_id");
1498
1499        let ctx = RlsContext::tenant("tenant-query-alias");
1500        let source = Qail::get("_rls_merge_query_alias_source_orders")
1501            .table_alias("base")
1502            .columns(["id", "status"]);
1503        let query = Qail::merge_into("_rls_merge_query_alias_target_orders")
1504            .target_alias("t")
1505            .using_query_as(source, "s")
1506            .merge_on_column("t.id", Operator::Eq, "s.id")
1507            .when_matched_update(&[("status", Expr::Named("s.status".to_string()))])
1508            .when_not_matched_insert(
1509                &["id", "status"],
1510                &[
1511                    Expr::Named("s.id".to_string()),
1512                    Expr::Named("s.status".to_string()),
1513                ],
1514            )
1515            .with_rls(&ctx)
1516            .expect("merge rls should apply through aliased source query table");
1517
1518        let sql = query.to_sql();
1519        assert!(
1520            sql.contains("FROM _rls_merge_query_alias_source_orders base WHERE base.tenant_id = 'tenant-query-alias'"),
1521            "MERGE source query should be scoped through its base-table alias: {sql}"
1522        );
1523        assert!(
1524            sql.contains("ON t.id = s.id AND t.tenant_id = s.tenant_id"),
1525            "MERGE query source ON must include outer source tenant equality: {sql}"
1526        );
1527    }
1528
1529    #[test]
1530    fn test_with_rls_scopes_cte_backed_merge_source() {
1531        register_tenant_table("_rls_merge_cte_target_orders", "tenant_id");
1532        register_tenant_table("_rls_merge_cte_source_orders", "tenant_id");
1533
1534        let ctx = RlsContext::tenant("tenant-cte");
1535        let incoming =
1536            Qail::get("_rls_merge_cte_source_orders").columns(["id", "status", "tenant_id"]);
1537        let source_query = Qail::get("incoming").columns(["id", "status", "tenant_id"]);
1538        let query = Qail::merge_into("_rls_merge_cte_target_orders")
1539            .target_alias("t")
1540            .with("incoming", incoming)
1541            .using_query_as(source_query, "s")
1542            .merge_on_column("t.id", Operator::Eq, "s.id")
1543            .when_matched_update(&[("status", Expr::Named("s.status".to_string()))])
1544            .when_not_matched_insert(
1545                &["id", "status"],
1546                &[
1547                    Expr::Named("s.id".to_string()),
1548                    Expr::Named("s.status".to_string()),
1549                ],
1550            )
1551            .with_rls(&ctx)
1552            .expect("merge rls should apply");
1553
1554        let cte = query.ctes.first().expect("incoming CTE");
1555        assert!(
1556            cte.base_query.cages.iter().any(|cage| {
1557                matches!(cage.kind, CageKind::Filter) && cage.conditions.iter().any(|condition| {
1558                    matches!(&condition.left, Expr::Named(name) if name == "tenant_id")
1559                        && condition.op == Operator::Eq
1560                        && matches!(&condition.value, Value::String(value) if value == "tenant-cte")
1561                })
1562            }),
1563            "outer MERGE CTE source must be tenant-scoped"
1564        );
1565
1566        let sql = query.to_sql();
1567        assert!(
1568            sql.contains("ON t.id = s.id AND t.tenant_id = s.tenant_id"),
1569            "CTE-backed MERGE query source ON must include tenant equality: {sql}"
1570        );
1571        assert!(
1572            sql.contains("WHEN NOT MATCHED BY TARGET AND s.tenant_id = 'tenant-cte' THEN INSERT"),
1573            "CTE-backed MERGE insert branch must be source-tenant scoped: {sql}"
1574        );
1575    }
1576
1577    #[test]
1578    fn test_with_rls_scopes_cte_alias_queries_before_table_lookup() {
1579        register_tenant_table("_rls_cte_alias_source_orders", "tenant_id");
1580
1581        let ctx = RlsContext::tenant("tenant-alias");
1582        let query = Qail::get("incoming")
1583            .with(
1584                "incoming",
1585                Qail::get("_rls_cte_alias_source_orders").columns(["id", "tenant_id"]),
1586            )
1587            .with_rls(&ctx)
1588            .expect("cte alias query should still scope registered CTE body");
1589
1590        let cte = query.ctes.first().expect("incoming CTE");
1591        assert!(
1592            cte.base_query.cages.iter().any(|cage| {
1593                matches!(cage.kind, CageKind::Filter)
1594                    && cage.conditions.iter().any(|condition| {
1595                        matches!(&condition.left, Expr::Named(name) if name == "tenant_id")
1596                            && matches!(&condition.value, Value::String(value) if value == "tenant-alias")
1597                    })
1598            }),
1599            "registered CTE bodies must be scoped even when outer table is a CTE alias"
1600        );
1601    }
1602
1603    #[test]
1604    fn test_with_rls_rejects_merge_tenant_column_update() {
1605        register_tenant_table("_rls_merge_tenant_rewrite_orders", "tenant_id");
1606        register_tenant_table("_rls_merge_tenant_rewrite_source", "tenant_id");
1607
1608        let ctx = RlsContext::tenant("tenant-a");
1609        let err = Qail::merge_into("_rls_merge_tenant_rewrite_orders")
1610            .using_table_as("_rls_merge_tenant_rewrite_source", "s")
1611            .merge_on_column("_rls_merge_tenant_rewrite_orders.id", Operator::Eq, "s.id")
1612            .when_matched_update(&[("tenant_id", Expr::Named("s.tenant_id".to_string()))])
1613            .with_rls(&ctx)
1614            .expect_err("MERGE tenant column updates must fail closed");
1615
1616        assert!(err.to_string().contains("tenant column mutation"));
1617    }
1618
1619    #[test]
1620    fn test_with_rls_global_scopes_merge_query_source() {
1621        register_tenant_table("_rls_global_merge_query_target", "tenant_id");
1622        register_tenant_table("_rls_global_merge_query_source", "tenant_id");
1623
1624        let source = Qail::get("_rls_global_merge_query_source").columns(["id", "name"]);
1625        let query = Qail::merge_into("_rls_global_merge_query_target")
1626            .using_query_as(source, "s")
1627            .merge_on_column("_rls_global_merge_query_target.id", Operator::Eq, "s.id")
1628            .when_not_matched_insert(
1629                &["id", "name"],
1630                &[
1631                    Expr::Named("s.id".to_string()),
1632                    Expr::Named("s.name".to_string()),
1633                ],
1634            )
1635            .with_rls(&RlsContext::global())
1636            .expect("global merge rls should apply");
1637
1638        let merge = query.merge.as_ref().expect("merge spec");
1639        let MergeSource::Query {
1640            query: source_query,
1641            ..
1642        } = &merge.source
1643        else {
1644            panic!("expected query source");
1645        };
1646        assert!(
1647            source_query.cages.iter().any(|cage| {
1648                matches!(cage.kind, CageKind::Filter)
1649                    && cage.conditions.iter().any(|condition| {
1650                        matches!(&condition.left, Expr::Named(name) if name == "tenant_id")
1651                            && condition.op == Operator::IsNull
1652                            && matches!(condition.value, Value::Null)
1653                    })
1654            }),
1655            "global MERGE query source must be scoped to NULL tenant rows"
1656        );
1657
1658        let sql = query.to_sql();
1659        assert!(
1660            sql.contains("ON _rls_global_merge_query_target.id = s.id AND _rls_global_merge_query_target.tenant_id = s.tenant_id"),
1661            "global MERGE query source ON must include target/source tenant equality: {sql}"
1662        );
1663        assert!(
1664            sql.contains("WHEN NOT MATCHED BY TARGET AND s.tenant_id IS NULL THEN INSERT"),
1665            "global MERGE query source insert branch must be source-tenant scoped: {sql}"
1666        );
1667    }
1668
1669    #[test]
1670    fn test_with_rls_rejects_merge_query_source_without_tenant_projection() {
1671        register_tenant_table("_rls_merge_aggregate_target", "tenant_id");
1672        register_tenant_table("_rls_merge_aggregate_source", "tenant_id");
1673
1674        let mut source = Qail::get("_rls_merge_aggregate_source");
1675        source.columns.push(Expr::Aggregate {
1676            col: "*".to_string(),
1677            func: crate::ast::AggregateFunc::Count,
1678            distinct: false,
1679            filter: None,
1680            alias: Some("total".to_string()),
1681        });
1682
1683        let err = Qail::merge_into("_rls_merge_aggregate_target")
1684            .target_alias("t")
1685            .using_query_as(source, "s")
1686            .merge_on_column("t.id", Operator::Eq, "s.id")
1687            .when_not_matched_insert(&["id"], &[Expr::Named("s.id".to_string())])
1688            .with_rls(&RlsContext::tenant("tenant-aggregate"))
1689            .expect_err("aggregate query source without tenant projection must fail closed");
1690
1691        assert!(err.to_string().contains("MERGE query sources"));
1692    }
1693
1694    #[test]
1695    fn test_with_rls_scopes_merge_by_source_delete_without_target_only_on_predicate() {
1696        register_tenant_table("_rls_merge_prune_orders", "tenant_id");
1697        register_tenant_table("_rls_merge_prune_source_orders", "tenant_id");
1698
1699        let ctx = RlsContext::tenant("tenant-prune");
1700        let query = Qail::merge_into("_rls_merge_prune_orders")
1701            .target_alias("t")
1702            .using_table_as("_rls_merge_prune_source_orders", "s")
1703            .merge_on_column("t.id", Operator::Eq, "s.id")
1704            .when_not_matched_by_source_delete()
1705            .with_rls(&ctx)
1706            .expect("merge rls should apply");
1707
1708        let sql = query.to_sql();
1709        assert!(
1710            sql.contains("ON t.id = s.id AND t.tenant_id = s.tenant_id"),
1711            "MERGE ON should use target/source tenant equality, not a target-only literal: {sql}"
1712        );
1713        assert!(
1714            sql.contains("WHEN NOT MATCHED BY SOURCE AND t.tenant_id = 'tenant-prune' THEN DELETE"),
1715            "BY SOURCE delete must be target-tenant scoped in the WHEN branch: {sql}"
1716        );
1717        assert!(
1718            !sql.contains("ON t.id = s.id AND t.tenant_id = 'tenant-prune'"),
1719            "target-only tenant predicates in ON can misclassify BY SOURCE rows: {sql}"
1720        );
1721    }
1722
1723    #[test]
1724    fn test_with_rls_global_scopes_merge_to_null_tenant() {
1725        register_tenant_table("_rls_global_merge_catalog", "tenant_id");
1726        register_tenant_table("_rls_global_merge_source", "tenant_id");
1727
1728        let query = Qail::merge_into("_rls_global_merge_catalog")
1729            .using_table_as("_rls_global_merge_source", "s")
1730            .merge_on_column("_rls_global_merge_catalog.id", Operator::Eq, "s.id")
1731            .when_not_matched_insert(
1732                &["id", "name"],
1733                &[
1734                    Expr::Named("s.id".to_string()),
1735                    Expr::Named("s.name".to_string()),
1736                ],
1737            )
1738            .with_rls(&RlsContext::global())
1739            .expect("global merge rls should apply");
1740
1741        let sql = query.to_sql();
1742        assert!(
1743            sql.contains(
1744                "ON _rls_global_merge_catalog.id = s.id AND _rls_global_merge_catalog.tenant_id = s.tenant_id"
1745            ),
1746            "global MERGE ON must preserve target/source tenant equality: {sql}"
1747        );
1748        assert!(
1749            sql.contains("WHEN NOT MATCHED BY TARGET AND s.tenant_id IS NULL THEN INSERT"),
1750            "global MERGE insert branch must be source-null scoped: {sql}"
1751        );
1752        assert!(
1753            sql.contains("INSERT (id, name, tenant_id) VALUES (s.id, s.name, NULL)"),
1754            "global MERGE insert branch must include NULL tenant: {sql}"
1755        );
1756    }
1757
1758    #[test]
1759    fn test_with_rls_is_idempotent_on_filter_scope() {
1760        register_tenant_table("_rls_idempotent_get_orders", "tenant_id");
1761
1762        let ctx = RlsContext::tenant("t-idempotent");
1763        let query = Qail::get("_rls_idempotent_get_orders")
1764            .with_rls(&ctx)
1765            .expect("rls should apply")
1766            .with_rls(&ctx);
1767        let query = query.expect("rls should remain idempotent");
1768
1769        let filter = query
1770            .cages
1771            .iter()
1772            .find(|c| matches!(c.kind, CageKind::Filter))
1773            .expect("filter cage");
1774
1775        let tenant_matches = filter
1776            .conditions
1777            .iter()
1778            .filter(|c| matches!(&c.left, Expr::Named(n) if n == "tenant_id"))
1779            .count();
1780        assert_eq!(tenant_matches, 1, "tenant scope should not duplicate");
1781    }
1782
1783    #[test]
1784    fn test_with_rls_add_positional_payload_aligns_insert_columns() {
1785        register_tenant_table("_rls_positional_add_orders", "tenant_id");
1786
1787        let ctx = RlsContext::tenant("tenant-positional");
1788        let query = Qail::add("_rls_positional_add_orders")
1789            .columns(["id", "total"])
1790            .values([Value::Int(1), Value::Int(100)])
1791            .with_rls(&ctx)
1792            .expect("rls should apply");
1793
1794        let sql = query.to_sql();
1795        assert!(
1796            sql.contains("tenant_id"),
1797            "tenant column should be injected"
1798        );
1799        assert!(
1800            sql.contains("VALUES (1, 100, 'tenant-positional')"),
1801            "insert payload should include injected tenant value in positional order: {sql}"
1802        );
1803    }
1804
1805    #[test]
1806    fn test_with_rls_add_positional_payload_overrides_existing_tenant_column_value() {
1807        register_tenant_table("_rls_positional_add_override_orders", "tenant_id");
1808
1809        let ctx = RlsContext::tenant("tenant-final");
1810        let query = Qail::add("_rls_positional_add_override_orders")
1811            .columns(["id", "tenant_id", "total"])
1812            .values([
1813                Value::Int(1),
1814                Value::String("tenant-wrong".to_string()),
1815                Value::Int(50),
1816            ])
1817            .with_rls(&ctx)
1818            .expect("rls should apply");
1819
1820        let sql = query.to_sql();
1821        assert!(sql.contains("'tenant-final'"));
1822        assert!(!sql.contains("'tenant-wrong'"));
1823    }
1824
1825    #[test]
1826    fn test_with_rls_add_positional_payload_without_columns_errors() {
1827        register_tenant_table("_rls_positional_add_without_columns_orders", "tenant_id");
1828
1829        let ctx = RlsContext::tenant("tenant-without-columns");
1830        let err = Qail::add("_rls_positional_add_without_columns_orders")
1831            .values([Value::Int(1), Value::Int(100)])
1832            .with_rls(&ctx)
1833            .expect_err("positional payload without columns should fail");
1834
1835        assert!(err.to_string().contains("requires explicit columns"));
1836    }
1837
1838    #[test]
1839    fn test_with_rls_replaces_qualified_tenant_filter() {
1840        register_tenant_table("_rls_qualified_tenant_filter_orders", "tenant_id");
1841
1842        let ctx = RlsContext::tenant("tenant-final");
1843        let query = Qail::get("_rls_qualified_tenant_filter_orders")
1844            .filter("orders.tenant_id", Operator::Eq, "tenant-wrong")
1845            .with_rls(&ctx)
1846            .expect("rls should apply");
1847
1848        let sql = query.to_sql();
1849        assert!(sql.contains("'tenant-final'"));
1850        assert!(!sql.contains("'tenant-wrong'"));
1851    }
1852}