database_replicator/
table_rules.rs

1// ABOUTME: Table-level replication rules for schema-only and filtered copies
2// ABOUTME: Supports CLI/config inputs and deterministic fingerprints
3
4use crate::utils;
5use crate::utils::quote_ident;
6use anyhow::{anyhow, bail, Context, Result};
7use sha2::{Digest, Sha256};
8use std::collections::{BTreeMap, BTreeSet};
9
10/// Represents a fully-qualified table identifier with optional database and schema
11/// Supports parsing from: `database.schema.table`, `schema.table`, or `table`
12/// Defaults to `public` schema for backward compatibility
13#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
14pub struct QualifiedTable {
15    pub database: Option<String>,
16    pub schema: String,
17    pub table: String,
18}
19
20impl QualifiedTable {
21    /// Parse a table specification from CLI or config
22    /// Formats: `database.schema.table`, `schema.table`, or `table`
23    /// Defaults to `public` schema if not specified
24    pub fn parse(spec: &str) -> Result<Self> {
25        let trimmed = spec.trim();
26        if trimmed.is_empty() {
27            bail!("Table specification cannot be empty");
28        }
29
30        let parts: Vec<&str> = trimmed.split('.').collect();
31        match parts.len() {
32            1 => {
33                // Just table name: defaults to public schema, no database
34                let table = non_empty(parts[0], "table")?;
35                utils::validate_postgres_identifier(&table)?;
36                Ok(QualifiedTable {
37                    database: None,
38                    schema: "public".to_string(),
39                    table,
40                })
41            }
42            2 => {
43                // schema.table OR database.table
44                // We treat as schema.table for consistency
45                // Can be disambiguated later if database is provided separately
46                let first = non_empty(parts[0], "schema")?;
47                let second = non_empty(parts[1], "table")?;
48                utils::validate_postgres_identifier(&first)?;
49                utils::validate_postgres_identifier(&second)?;
50                Ok(QualifiedTable {
51                    database: None,
52                    schema: first,
53                    table: second,
54                })
55            }
56            3 => {
57                // database.schema.table
58                let database = non_empty(parts[0], "database")?;
59                let schema = non_empty(parts[1], "schema")?;
60                let table = non_empty(parts[2], "table")?;
61                utils::validate_postgres_identifier(&database)?;
62                utils::validate_postgres_identifier(&schema)?;
63                utils::validate_postgres_identifier(&table)?;
64                Ok(QualifiedTable {
65                    database: Some(database),
66                    schema,
67                    table,
68                })
69            }
70            _ => bail!(
71                "Invalid table specification '{}': must be 'table', 'schema.table', or 'database.schema.table'",
72                spec
73            ),
74        }
75    }
76
77    /// Create from explicit database, schema, and table names
78    pub fn new(database: Option<String>, schema: String, table: String) -> Self {
79        QualifiedTable {
80            database,
81            schema,
82            table,
83        }
84    }
85
86    /// Set the database if not already set (for resolving ambiguous 2-part names)
87    pub fn with_database(mut self, database: Option<String>) -> Self {
88        if self.database.is_none() {
89            self.database = database;
90        }
91        self
92    }
93
94    /// Get the schema-qualified table name (schema.table)
95    pub fn schema_qualified(&self) -> String {
96        format!("{}.{}", quote_ident(&self.schema), quote_ident(&self.table))
97    }
98
99    /// Get the fully-qualified name if database is present
100    pub fn fully_qualified(&self) -> String {
101        match &self.database {
102            Some(db) => format!(
103                "{}.{}.{}",
104                quote_ident(db),
105                quote_ident(&self.schema),
106                quote_ident(&self.table)
107            ),
108            None => self.schema_qualified(),
109        }
110    }
111
112    /// Check if this matches a given database
113    pub fn matches_database(&self, database: &str) -> bool {
114        match &self.database {
115            Some(db) => db == database,
116            None => true, // No database specified means it applies to all databases
117        }
118    }
119}
120
121/// Internal key for storing table rules with schema information
122/// Used to distinguish tables with the same name in different schemas
123#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
124struct SchemaTableKey {
125    schema: String,
126    table: String,
127}
128
129impl SchemaTableKey {
130    /// Create from a QualifiedTable
131    fn from_qualified(qualified: &QualifiedTable) -> Self {
132        SchemaTableKey {
133            schema: qualified.schema.clone(),
134            table: qualified.table.clone(),
135        }
136    }
137
138    /// Create from parts, defaulting to 'public' schema if not specified
139    fn from_parts(schema: Option<&str>, table: &str) -> Self {
140        SchemaTableKey {
141            schema: schema.unwrap_or("public").to_string(),
142            table: table.to_string(),
143        }
144    }
145
146    /// Get the schema-qualified table name (schema.table)
147    fn schema_qualified(&self) -> String {
148        format!("{}.{}", quote_ident(&self.schema), quote_ident(&self.table))
149    }
150}
151
152#[derive(Debug, Clone, PartialEq, Eq)]
153pub struct TimeFilterRule {
154    pub column: String,
155    pub interval: String,
156}
157
158impl TimeFilterRule {
159    fn predicate(&self) -> String {
160        format!(
161            "{} >= NOW() - INTERVAL '{}'",
162            quote_ident(&self.column),
163            self.interval
164        )
165    }
166}
167
168#[derive(Debug, Clone, PartialEq, Eq)]
169pub enum TableRuleKind {
170    SchemaOnly,
171    Predicate(String),
172}
173
174#[derive(Debug, Clone, Default)]
175pub struct TableRules {
176    schema_only: ScopedTableSet,
177    table_filters: ScopedTableMap<String>,
178    time_filters: ScopedTableMap<TimeFilterRule>,
179}
180
181type ScopedTableSet = BTreeMap<ScopeKey, BTreeSet<SchemaTableKey>>;
182type ScopedTableMap<V> = BTreeMap<ScopeKey, BTreeMap<SchemaTableKey, V>>;
183
184#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
185enum ScopeKey {
186    Global,
187    Database(String),
188}
189
190impl ScopeKey {
191    fn from_option(value: Option<String>) -> Self {
192        match value {
193            Some(db) => ScopeKey::Database(db),
194            None => ScopeKey::Global,
195        }
196    }
197
198    fn database(database: &str) -> Self {
199        ScopeKey::Database(database.to_string())
200    }
201}
202
203impl TableRules {
204    pub fn add_schema_only_table(&mut self, qualified: QualifiedTable) -> Result<()> {
205        let scope = ScopeKey::from_option(qualified.database.clone());
206        let key = SchemaTableKey::from_qualified(&qualified);
207        self.schema_only.entry(scope).or_default().insert(key);
208        Ok(())
209    }
210
211    pub fn add_table_filter(&mut self, qualified: QualifiedTable, predicate: String) -> Result<()> {
212        if predicate.trim().is_empty() {
213            bail!(
214                "Table filter predicate cannot be empty for '{}'",
215                qualified.schema_qualified()
216            );
217        }
218        let scope = ScopeKey::from_option(qualified.database.clone());
219        let key = SchemaTableKey::from_qualified(&qualified);
220        ensure_schema_only_free(&self.schema_only, &qualified, "table filter")?;
221        self.table_filters
222            .entry(scope)
223            .or_default()
224            .insert(key, predicate);
225        Ok(())
226    }
227
228    pub fn add_time_filter(
229        &mut self,
230        qualified: QualifiedTable,
231        column: String,
232        window: String,
233    ) -> Result<()> {
234        utils::validate_postgres_identifier(&column)?;
235        let interval = normalize_time_window(&window)?;
236        let scope = ScopeKey::from_option(qualified.database.clone());
237        let key = SchemaTableKey::from_qualified(&qualified);
238        ensure_schema_only_free(&self.schema_only, &qualified, "time filter")?;
239        if self
240            .table_filters
241            .get(&scope)
242            .and_then(|inner| inner.get(&key))
243            .is_some()
244        {
245            bail!(
246                "Cannot apply time filter to table '{}' because a table filter already exists",
247                qualified.schema_qualified()
248            );
249        }
250        self.time_filters
251            .entry(scope)
252            .or_default()
253            .insert(key, TimeFilterRule { column, interval });
254        Ok(())
255    }
256
257    pub fn apply_schema_only_cli(&mut self, specs: &[String]) -> Result<()> {
258        for spec in specs {
259            let qualified = QualifiedTable::parse(spec)?;
260            self.add_schema_only_table(qualified)?;
261        }
262        Ok(())
263    }
264
265    pub fn apply_table_filter_cli(&mut self, specs: &[String]) -> Result<()> {
266        for spec in specs {
267            let (table_part, predicate) = spec
268                .split_once(':')
269                .with_context(|| format!("Table filter '{}' missing ':' separator", spec))?;
270            if predicate.trim().is_empty() {
271                bail!("Table filter '{}' must include a predicate after ':'", spec);
272            }
273            let qualified = QualifiedTable::parse(table_part)?;
274            self.add_table_filter(qualified, predicate.trim().to_string())?;
275        }
276        Ok(())
277    }
278
279    pub fn apply_time_filter_cli(&mut self, specs: &[String]) -> Result<()> {
280        for spec in specs {
281            let (table_part, rest) = spec
282                .split_once(':')
283                .with_context(|| format!("Time filter '{}' missing second ':'", spec))?;
284            let (column, window) = rest
285                .split_once(':')
286                .with_context(|| format!("Time filter '{}' must be table:column:window", spec))?;
287            if column.trim().is_empty() || window.trim().is_empty() {
288                bail!(
289                    "Time filter '{}' must include non-empty column and window",
290                    spec
291                );
292            }
293            let qualified = QualifiedTable::parse(table_part)?;
294            self.add_time_filter(
295                qualified,
296                column.trim().to_string(),
297                window.trim().to_string(),
298            )?;
299        }
300        Ok(())
301    }
302
303    pub fn schema_only_tables(&self, database: &str) -> Vec<String> {
304        collect_tables(&self.schema_only, database)
305    }
306
307    pub fn table_filter(&self, database: &str, schema: &str, table: &str) -> Option<&String> {
308        lookup_scoped(&self.table_filters, database, schema, table)
309    }
310
311    pub fn time_filter(
312        &self,
313        database: &str,
314        schema: &str,
315        table: &str,
316    ) -> Option<&TimeFilterRule> {
317        lookup_scoped(&self.time_filters, database, schema, table)
318    }
319
320    pub fn predicate_tables(&self, database: &str) -> Vec<(String, String)> {
321        let schema_only: BTreeSet<String> = self.schema_only_tables(database).into_iter().collect();
322        let mut combined = BTreeMap::new();
323
324        for (table, predicate) in scoped_map_values(&self.table_filters, database) {
325            if schema_only.contains(&table) {
326                continue;
327            }
328            combined.insert(table, predicate);
329        }
330
331        for (table, rule) in scoped_map_values(&self.time_filters, database) {
332            if schema_only.contains(&table) || combined.contains_key(&table) {
333                continue;
334            }
335            combined.insert(table.clone(), rule.predicate());
336        }
337
338        combined.into_iter().collect()
339    }
340
341    pub fn rule_for_table(
342        &self,
343        database: &str,
344        schema: &str,
345        table: &str,
346    ) -> Option<TableRuleKind> {
347        if has_schema_only_rule(&self.schema_only, database, schema, table) {
348            return Some(TableRuleKind::SchemaOnly);
349        }
350        if let Some(predicate) = self.table_filter(database, schema, table) {
351            return Some(TableRuleKind::Predicate(predicate.clone()));
352        }
353        if let Some(rule) = self.time_filter(database, schema, table) {
354            return Some(TableRuleKind::Predicate(rule.predicate()));
355        }
356        None
357    }
358
359    pub fn merge(&mut self, other: TableRules) {
360        merge_sets(&mut self.schema_only, other.schema_only);
361        merge_maps(&mut self.table_filters, other.table_filters);
362        merge_maps(&mut self.time_filters, other.time_filters);
363    }
364
365    pub fn fingerprint(&self) -> String {
366        let mut hasher = Sha256::new();
367        hash_scoped_set(&mut hasher, &self.schema_only);
368        hash_scoped_map(&mut hasher, &self.table_filters, |value| value.clone());
369        hash_scoped_map(&mut hasher, &self.time_filters, |value| {
370            format!("{}|{}", value.column, value.interval)
371        });
372        format!("{:x}", hasher.finalize())
373    }
374
375    pub fn is_empty(&self) -> bool {
376        self.schema_only.is_empty() && self.table_filters.is_empty() && self.time_filters.is_empty()
377    }
378}
379
380fn non_empty(value: &str, label: &str) -> Result<String> {
381    let trimmed = value.trim();
382    if trimmed.is_empty() {
383        bail!("{} name cannot be empty", label);
384    }
385    Ok(trimmed.to_string())
386}
387
388fn collect_tables(map: &ScopedTableSet, database: &str) -> Vec<String> {
389    let mut tables = BTreeSet::new();
390    if let Some(global) = map.get(&ScopeKey::Global) {
391        for key in global {
392            tables.insert(key.schema_qualified());
393        }
394    }
395    let scoped = ScopeKey::database(database);
396    if let Some(specific) = map.get(&scoped) {
397        for key in specific {
398            tables.insert(key.schema_qualified());
399        }
400    }
401    tables.into_iter().collect()
402}
403
404fn lookup_scoped<'a, V>(
405    map: &'a ScopedTableMap<V>,
406    database: &str,
407    schema: &str,
408    table: &str,
409) -> Option<&'a V> {
410    let key = SchemaTableKey::from_parts(Some(schema), table);
411    let scoped = ScopeKey::database(database);
412    map.get(&scoped)
413        .and_then(|inner| inner.get(&key))
414        .or_else(|| map.get(&ScopeKey::Global).and_then(|inner| inner.get(&key)))
415}
416
417fn scoped_map_values<V: Clone>(map: &ScopedTableMap<V>, database: &str) -> BTreeMap<String, V> {
418    let mut values = BTreeMap::new();
419    if let Some(global) = map.get(&ScopeKey::Global) {
420        for (key, value) in global {
421            values.insert(key.schema_qualified(), value.clone());
422        }
423    }
424    if let Some(specific) = map.get(&ScopeKey::database(database)) {
425        for (key, value) in specific {
426            values.insert(key.schema_qualified(), value.clone());
427        }
428    }
429    values
430}
431
432fn has_schema_only_rule(
433    schema_only: &ScopedTableSet,
434    database: &str,
435    schema: &str,
436    table: &str,
437) -> bool {
438    let key = SchemaTableKey::from_parts(Some(schema), table);
439    schema_only
440        .get(&ScopeKey::Global)
441        .is_some_and(|set| set.contains(&key))
442        || schema_only
443            .get(&ScopeKey::database(database))
444            .is_some_and(|set| set.contains(&key))
445}
446
447fn ensure_schema_only_free(
448    schema_only: &ScopedTableSet,
449    qualified: &QualifiedTable,
450    rule_name: &str,
451) -> Result<()> {
452    let key = SchemaTableKey::from_qualified(qualified);
453    if schema_only
454        .get(&ScopeKey::Global)
455        .is_some_and(|set| set.contains(&key))
456    {
457        bail!(
458            "Cannot apply {} to table '{}' because it is marked schema-only globally",
459            rule_name,
460            qualified.schema_qualified()
461        );
462    }
463    if let Some(db) = &qualified.database {
464        if schema_only
465            .get(&ScopeKey::database(db))
466            .is_some_and(|set| set.contains(&key))
467        {
468            bail!(
469                "Cannot apply {} to table '{}' in database '{}' because it is schema-only",
470                rule_name,
471                qualified.schema_qualified(),
472                db
473            );
474        }
475    }
476    Ok(())
477}
478
479fn normalize_time_window(window: &str) -> Result<String> {
480    let trimmed = window.trim();
481    let mut parts = trimmed.split_whitespace();
482    let amount_str = parts
483        .next()
484        .ok_or_else(|| anyhow!("Time filter window '{}' missing amount", window))?;
485    let unit_str = parts
486        .next()
487        .ok_or_else(|| anyhow!("Time filter window '{}' missing unit", window))?;
488    if parts.next().is_some() {
489        bail!("Time filter window '{}' must be '<amount> <unit>'", window);
490    }
491
492    let amount: i64 = amount_str.parse().with_context(|| {
493        format!(
494            "Invalid time window amount '{}': must be integer",
495            amount_str
496        )
497    })?;
498    if amount <= 0 {
499        bail!("Time window amount must be positive, got {}", amount);
500    }
501
502    let unit = match unit_str.to_lowercase().as_str() {
503        "second" | "seconds" | "sec" | "secs" => "second",
504        "minute" | "minutes" | "min" | "mins" => "minute",
505        "hour" | "hours" | "hr" | "hrs" => "hour",
506        "day" | "days" => "day",
507        "week" | "weeks" => "week",
508        "month" | "months" | "mon" | "mons" => "month",
509        "year" | "years" | "yr" | "yrs" => "year",
510        other => bail!(
511            "Unsupported time window unit '{}'. Use seconds/minutes/hours/days/weeks/months/years",
512            other
513        ),
514    };
515
516    Ok(format!("{} {}", amount, unit))
517}
518
519fn merge_sets(target: &mut ScopedTableSet, source: ScopedTableSet) {
520    for (scope, tables) in source {
521        target.entry(scope).or_default().extend(tables);
522    }
523}
524
525fn merge_maps<V: Clone>(target: &mut ScopedTableMap<V>, source: ScopedTableMap<V>) {
526    for (scope, tables) in source {
527        let entry = target.entry(scope).or_default();
528        for (table, value) in tables {
529            entry.insert(table, value);
530        }
531    }
532}
533
534fn hash_scoped_set(hasher: &mut Sha256, data: &ScopedTableSet) {
535    for (scope, tables) in data {
536        hash_scope_label(hasher, scope);
537        for key in tables {
538            hasher.update(key.schema.as_bytes());
539            hasher.update(b".");
540            hasher.update(key.table.as_bytes());
541            hasher.update(b"|");
542        }
543    }
544}
545
546fn hash_scoped_map<V, F>(hasher: &mut Sha256, data: &ScopedTableMap<V>, mut encode: F)
547where
548    F: FnMut(&V) -> String,
549{
550    for (scope, tables) in data {
551        hash_scope_label(hasher, scope);
552        for (key, value) in tables {
553            hasher.update(key.schema.as_bytes());
554            hasher.update(b".");
555            hasher.update(key.table.as_bytes());
556            hasher.update(b"=");
557            hasher.update(encode(value).as_bytes());
558            hasher.update(b"|");
559        }
560    }
561}
562
563fn hash_scope_label(hasher: &mut Sha256, scope: &ScopeKey) {
564    match scope {
565        ScopeKey::Database(db) => {
566            hasher.update(b"db:");
567            hasher.update(db.as_bytes());
568        }
569        ScopeKey::Global => {
570            hasher.update(b"global");
571        }
572    }
573    hasher.update(b"#");
574}
575
576#[cfg(test)]
577mod tests {
578    use super::*;
579
580    // QualifiedTable tests
581    #[test]
582    fn test_qualified_table_parse_single_part() {
583        // Single part defaults to public schema
584        let t = QualifiedTable::parse("users").unwrap();
585        assert_eq!(t.database, None);
586        assert_eq!(t.schema, "public");
587        assert_eq!(t.table, "users");
588    }
589
590    #[test]
591    fn test_qualified_table_parse_two_parts() {
592        // Two parts is schema.table
593        let t = QualifiedTable::parse("analytics.orders").unwrap();
594        assert_eq!(t.database, None);
595        assert_eq!(t.schema, "analytics");
596        assert_eq!(t.table, "orders");
597    }
598
599    #[test]
600    fn test_qualified_table_parse_three_parts() {
601        // Three parts is database.schema.table
602        let t = QualifiedTable::parse("db1.public.users").unwrap();
603        assert_eq!(t.database, Some("db1".to_string()));
604        assert_eq!(t.schema, "public");
605        assert_eq!(t.table, "users");
606    }
607
608    #[test]
609    fn test_qualified_table_parse_empty() {
610        let result = QualifiedTable::parse("");
611        assert!(result.is_err());
612        assert!(result.unwrap_err().to_string().contains("cannot be empty"));
613    }
614
615    #[test]
616    fn test_qualified_table_parse_whitespace() {
617        let result = QualifiedTable::parse("   ");
618        assert!(result.is_err());
619    }
620
621    #[test]
622    fn test_qualified_table_parse_too_many_parts() {
623        let result = QualifiedTable::parse("a.b.c.d");
624        assert!(result.is_err());
625        assert!(result.unwrap_err().to_string().contains("must be"));
626    }
627
628    #[test]
629    fn test_qualified_table_schema_qualified() {
630        let t = QualifiedTable::new(None, "analytics".into(), "orders".into());
631        assert_eq!(t.schema_qualified(), "\"analytics\".\"orders\"");
632    }
633
634    #[test]
635    fn test_qualified_table_fully_qualified_with_database() {
636        let t = QualifiedTable::new(Some("db1".into()), "public".into(), "users".into());
637        assert_eq!(t.fully_qualified(), "\"db1\".\"public\".\"users\"");
638    }
639
640    #[test]
641    fn test_qualified_table_fully_qualified_without_database() {
642        let t = QualifiedTable::new(None, "analytics".into(), "orders".into());
643        assert_eq!(t.fully_qualified(), "\"analytics\".\"orders\"");
644    }
645
646    #[test]
647    fn test_qualified_table_with_database() {
648        let t = QualifiedTable::parse("analytics.orders")
649            .unwrap()
650            .with_database(Some("db1".to_string()));
651        assert_eq!(t.database, Some("db1".to_string()));
652        assert_eq!(t.schema, "analytics");
653        assert_eq!(t.table, "orders");
654    }
655
656    #[test]
657    fn test_qualified_table_with_database_no_override() {
658        let t = QualifiedTable::parse("db1.analytics.orders")
659            .unwrap()
660            .with_database(Some("db2".to_string()));
661        // Should not override existing database
662        assert_eq!(t.database, Some("db1".to_string()));
663    }
664
665    #[test]
666    fn test_qualified_table_matches_database() {
667        let t = QualifiedTable::new(Some("db1".into()), "public".into(), "users".into());
668        assert!(t.matches_database("db1"));
669        assert!(!t.matches_database("db2"));
670    }
671
672    #[test]
673    fn test_qualified_table_matches_database_no_database() {
674        let t = QualifiedTable::new(None, "public".into(), "users".into());
675        // No database specified means matches all
676        assert!(t.matches_database("db1"));
677        assert!(t.matches_database("db2"));
678        assert!(t.matches_database("any_db"));
679    }
680
681    #[test]
682    fn test_qualified_table_new() {
683        let t = QualifiedTable::new(Some("db1".into()), "analytics".into(), "metrics".into());
684        assert_eq!(t.database, Some("db1".to_string()));
685        assert_eq!(t.schema, "analytics");
686        assert_eq!(t.table, "metrics");
687    }
688
689    #[test]
690    fn test_qualified_table_ordering() {
691        let t1 = QualifiedTable::new(None, "analytics".into(), "orders".into());
692        let t2 = QualifiedTable::new(None, "public".into(), "users".into());
693        let t3 = QualifiedTable::new(None, "analytics".into(), "metrics".into());
694
695        // Should be comparable and orderable
696        assert!(t1 < t2); // analytics < public
697        assert!(t3 < t1); // metrics < orders
698    }
699
700    #[test]
701    fn cli_schema_only_parsing() {
702        let mut rules = TableRules::default();
703        // "db1.orders" is parsed as schema=db1, table=orders (not database.table!)
704        // "invoices" is parsed as schema=public, table=invoices
705        rules
706            .apply_schema_only_cli(&["analytics.orders".to_string(), "invoices".to_string()])
707            .unwrap();
708        let tables = rules.schema_only_tables("anydb");
709        // Both are global scope (no database specified), so they apply to all databases
710        assert!(tables.contains(&"\"analytics\".\"orders\"".to_string()));
711        assert!(tables.contains(&"\"public\".\"invoices\"".to_string()));
712    }
713
714    #[test]
715    fn cli_table_filter_parsing() {
716        let mut rules = TableRules::default();
717        // "analytics.logs" is parsed as schema=analytics, table=logs
718        rules
719            .apply_table_filter_cli(
720                &["analytics.logs:created_at > NOW() - INTERVAL '1 day'".into()],
721            )
722            .unwrap();
723        assert!(rules
724            .table_filter("anydb", "analytics", "logs")
725            .unwrap()
726            .contains("created_at"));
727    }
728
729    #[test]
730    fn cli_time_filter_parsing() {
731        let mut rules = TableRules::default();
732        rules
733            .apply_time_filter_cli(&["metrics:created_at:6 months".into()])
734            .unwrap();
735        let tf = rules.time_filter("any", "public", "metrics").unwrap();
736        assert_eq!(tf.column, "created_at");
737        assert_eq!(tf.interval, "6 month");
738    }
739
740    #[test]
741    fn fingerprint_changes_with_rules() {
742        let mut rules_a = TableRules::default();
743        rules_a
744            .apply_schema_only_cli(&["db.table".to_string()])
745            .unwrap();
746        let mut rules_b = TableRules::default();
747        assert_ne!(rules_a.fingerprint(), rules_b.fingerprint());
748        rules_b
749            .apply_schema_only_cli(&["db.table".to_string()])
750            .unwrap();
751        assert_eq!(rules_a.fingerprint(), rules_b.fingerprint());
752    }
753
754    #[test]
755    fn schema_only_conflicts_with_filters() {
756        let mut rules = TableRules::default();
757        rules
758            .apply_schema_only_cli(&["db1.audit".to_string()])
759            .unwrap();
760        assert!(rules
761            .apply_table_filter_cli(&["db1.audit:1=1".to_string()])
762            .is_err());
763    }
764
765    #[test]
766    fn predicate_tables_include_time_filters() {
767        let mut rules = TableRules::default();
768        rules
769            .apply_time_filter_cli(&["db1.metrics:created_at:6 months".into()])
770            .unwrap();
771        let predicates = rules.predicate_tables("db1");
772        assert_eq!(predicates.len(), 1);
773        assert!(predicates[0].1.contains("INTERVAL '6 month'"));
774    }
775
776    #[test]
777    fn test_fingerprint_changes_with_schema() {
778        // Different schemas should produce different fingerprints
779        let mut rules_a = TableRules::default();
780        rules_a
781            .apply_schema_only_cli(&["public.orders".to_string()])
782            .unwrap();
783
784        let mut rules_b = TableRules::default();
785        rules_b
786            .apply_schema_only_cli(&["analytics.orders".to_string()])
787            .unwrap();
788
789        assert_ne!(
790            rules_a.fingerprint(),
791            rules_b.fingerprint(),
792            "Different schemas should produce different fingerprints"
793        );
794    }
795
796    #[test]
797    fn test_fingerprint_stable_with_order() {
798        // Same tables, different order -> same fingerprint (BTreeSet sorts)
799        let mut rules_a = TableRules::default();
800        rules_a
801            .apply_schema_only_cli(&["public.users".to_string(), "public.orders".to_string()])
802            .unwrap();
803
804        let mut rules_b = TableRules::default();
805        rules_b
806            .apply_schema_only_cli(&[
807                "public.orders".to_string(), // Different order
808                "public.users".to_string(),
809            ])
810            .unwrap();
811
812        assert_eq!(
813            rules_a.fingerprint(),
814            rules_b.fingerprint(),
815            "Same tables in different order should produce same fingerprint"
816        );
817    }
818
819    #[test]
820    fn test_fingerprint_includes_table_filter_schema() {
821        // Table filters with different schemas should produce different fingerprints
822        let mut rules_a = TableRules::default();
823        rules_a
824            .apply_table_filter_cli(&["public.logs:created_at > NOW()".to_string()])
825            .unwrap();
826
827        let mut rules_b = TableRules::default();
828        rules_b
829            .apply_table_filter_cli(&["analytics.logs:created_at > NOW()".to_string()])
830            .unwrap();
831
832        assert_ne!(
833            rules_a.fingerprint(),
834            rules_b.fingerprint(),
835            "Table filters with different schemas should produce different fingerprints"
836        );
837    }
838
839    #[test]
840    fn test_fingerprint_includes_time_filter_schema() {
841        // Time filters with different schemas should produce different fingerprints
842        let mut rules_a = TableRules::default();
843        rules_a
844            .apply_time_filter_cli(&["public.metrics:timestamp:1 year".to_string()])
845            .unwrap();
846
847        let mut rules_b = TableRules::default();
848        rules_b
849            .apply_time_filter_cli(&["reporting.metrics:timestamp:1 year".to_string()])
850            .unwrap();
851
852        assert_ne!(
853            rules_a.fingerprint(),
854            rules_b.fingerprint(),
855            "Time filters with different schemas should produce different fingerprints"
856        );
857    }
858}