flowscope-core 0.7.0

Core SQL lineage analysis engine
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
//! Cross-statement lineage tracking.
//!
//! This module provides [`CrossStatementTracker`], which manages the relationships
//! between statements in a multi-statement workload. It tracks which statements
//! produce tables (via CREATE/INSERT) and which consume them (via SELECT/JOIN).
//!
//! # Architecture
//!
//! The tracker maintains producer-consumer relationships between statements:
//!
//! - **Producers**: Statements that create or modify tables (CREATE TABLE, INSERT INTO, etc.)
//! - **Consumers**: Statements that read from tables (SELECT, JOIN, etc.)
//!
//! When a table is produced by statement N and consumed by statement M (where M > N),
//! a cross-statement edge is created to represent the data flow dependency.
//!
//! # View vs Table Distinction
//!
//! The tracker distinguishes between views and tables because they have different
//! semantics in lineage graphs:
//!
//! - Tables represent physical data storage
//! - Views represent logical transformations that are expanded at query time
//!
//! This distinction affects node ID generation and the type of lineage edges created.
//!
//! # Thread Safety
//!
//! `CrossStatementTracker` is designed for single-threaded use within an analysis pass.
//! Each analysis pass should create a fresh tracker instance.

use crate::types::{Edge, EdgeType, NodeType};
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use super::helpers::generate_node_id;

/// Tracks cross-statement dependencies for multi-statement lineage.
///
/// `CrossStatementTracker` is responsible for building the dependency graph between
/// SQL statements in a multi-statement workload. It enables detection of data flow
/// patterns like ETL pipelines where one statement's output becomes another's input.
///
/// # Responsibilities
///
/// - **Producer tracking**: Records which statements create/modify tables
/// - **Consumer tracking**: Records which statements read from tables
/// - **Edge generation**: Creates cross-statement edges for the global lineage graph
/// - **Type distinction**: Maintains separate tracking for views vs tables
///
/// # Invariants
///
/// These hold as long as state is only mutated through the methods on this
/// type (the fields are `pub(crate)`, so direct mutation can break them):
///
/// - `produced_views` is always a subset of `produced_tables` (views are tables):
///   [`CrossStatementTracker::record_view_produced`] inserts into both and
///   [`CrossStatementTracker::remove`] clears both.
/// - Cross-statement edges are only created when consumer index > producer index
/// - `all_relations` contains every name that was produced, consumed, or
///   declared as a view/table. Names are never removed from it.
///
/// ## Declaration vs production
///
/// "Declaration" and "production" are distinct concepts, populated by
/// different passes:
///
/// - *Declaration* (`declared_views` / `declared_tables` /
///   `declared_ephemerals`) fixes a relation's **identity** — i.e. whether it
///   resolves as a `view_*` / `table_*` / `cte_*` node id via
///   [`CrossStatementTracker::relation_identity`]. It is populated up-front by
///   pre-collection passes (e.g. dbt models) so forward `ref(...)` consumers
///   resolve to the same canonical node as the later producer.
///   [`CrossStatementTracker::record_view_produced`] additionally inserts the
///   name into `declared_views`, so a produced view is also a declared view.
/// - *Production* (`produced_tables` / `produced_views`) assigns a **producer
///   statement index** used for cross-statement edge generation. It is
///   populated during the main analysis pass as each producing statement is
///   visited.
///
/// A relation can be declared without ever being produced (e.g. the producer
/// statement is skipped) and a relation can be produced without prior
/// declaration (standard CREATE TABLE flow). Both state spaces coexist and
/// must stay consistent.
///
/// ## Declared-set disjointness
///
/// `declared_views` and `declared_tables` are kept mutually exclusive for any
/// single canonical name, because a persisted relation has exactly one
/// materialization kind:
///
/// - View declarations take precedence over table declarations: calling
///   [`CrossStatementTracker::declare_view`] removes the name from
///   `declared_tables`, and [`CrossStatementTracker::declare_table`] does not
///   add the name to `declared_tables` when it is already in `declared_views`
///   (view is strictly more specific).
/// - `declared_ephemerals` is treated independently (dbt ephemeral models
///   never materialize as tables or views):
///   [`CrossStatementTracker::declare_ephemeral`] does not touch the other two
///   sets. If a name ends up in several sets, identity is still well defined
///   because it is resolved in the order ephemeral → view → table in
///   [`CrossStatementTracker::relation_identity`].
///
/// Keep these rules in mind when adding new declaration paths; violating them
/// would make the result of [`CrossStatementTracker::relation_identity`]
/// depend on the order in which declarations were made.
///
/// # Example
///
/// ```ignore
/// let mut tracker = CrossStatementTracker::new();
///
/// // Statement 0 creates a staging table
/// tracker.record_produced("staging.raw_data", 0);
///
/// // Statement 1 reads from the staging table
/// tracker.record_consumed("staging.raw_data", 1);
///
/// // Generate cross-statement edges
/// let edges = tracker.build_cross_statement_edges();
/// assert_eq!(edges.len(), 1);
/// ```
pub(crate) struct CrossStatementTracker {
    /// Maps table canonical name -> statement index that produced it.
    ///
    /// Only tracks the most recent producer (later statements overwrite earlier ones).
    pub(crate) produced_tables: HashMap<String, usize>,
    /// Canonical names that were produced via CREATE VIEW
    /// (recorded through [`CrossStatementTracker::record_view_produced`]).
    ///
    /// Used to determine node type (view vs table) for ID generation.
    pub(crate) produced_views: HashSet<String>,
    /// Canonical names that are known to materialize as views even before the
    /// producing statement is analyzed.
    ///
    /// This is populated during pre-collection for dbt models so consumer
    /// references resolved earlier in the workload still get the canonical
    /// `view_*` identity and merge with the later producer sink.
    pub(crate) declared_views: HashSet<String>,
    /// Canonical names that are known to materialize as physical tables even
    /// before the producing statement is analyzed.
    ///
    /// Populated alongside [`CrossStatementTracker::declared_views`] during
    /// pre-collection so that forward `ref(...)` consumers resolve to a known
    /// relation instead of emitting a false `UNRESOLVED_REFERENCE` warning.
    /// The producer's statement index is still assigned later by the main
    /// analysis pass via [`CrossStatementTracker::record_produced`].
    pub(crate) declared_tables: HashSet<String>,
    /// Canonical names that are known to materialize as dbt ephemeral models.
    ///
    /// Ephemeral models behave like cross-file CTEs rather than persisted
    /// relations, so references to them should reuse a canonical `cte_*`
    /// identity instead of fabricating a table/view node.
    pub(crate) declared_ephemerals: HashSet<String>,
    /// Maps table canonical name -> list of statement indices that consume it.
    ///
    /// A single table can be consumed by multiple statements. Indices are
    /// appended in call order and are not deduplicated.
    pub(crate) consumed_tables: HashMap<String, Vec<usize>>,
    /// All discovered tables and views across statements (for global lineage).
    ///
    /// Union of all produced, consumed, and view/table-declared relations.
    pub(crate) all_relations: HashSet<String>,
    /// All discovered CTEs across statements.
    ///
    /// CTEs are tracked separately as they have different scoping rules.
    /// Declared ephemeral models are recorded here as well.
    pub(crate) all_ctes: HashSet<String>,
}

impl CrossStatementTracker {
    /// Builds a tracker in which every producer, consumer, declaration and
    /// discovery collection starts out empty.
    pub(crate) fn new() -> Self {
        Self {
            // Producer-side state.
            produced_tables: HashMap::default(),
            produced_views: HashSet::default(),
            // Up-front declarations that fix relation identity.
            declared_views: HashSet::default(),
            declared_tables: HashSet::default(),
            declared_ephemerals: HashSet::default(),
            // Consumer-side state.
            consumed_tables: HashMap::default(),
            // Global discovery sets.
            all_relations: HashSet::default(),
            all_ctes: HashSet::default(),
        }
    }

    /// Registers `statement_index` as the producer of the relation `canonical`.
    ///
    /// Meant for CREATE TABLE, INSERT INTO (creating), and similar DDL. Only one
    /// producer is kept per name: a later call for the same name replaces the
    /// previously stored index. The name is also added to `all_relations`.
    pub(crate) fn record_produced(&mut self, canonical: &str, statement_index: usize) {
        let name = canonical.to_owned();
        self.all_relations.insert(name.clone());
        // `insert` overwrites any earlier producer index for this name.
        self.produced_tables.insert(name, statement_index);
    }

    /// Registers `statement_index` as the producer of the view `canonical`.
    ///
    /// On top of everything `record_produced` does, the name is added to both
    /// `produced_views` and `declared_views` so it resolves with a view
    /// identity in lineage graphs.
    ///
    /// To keep the view/table declared sets disjoint, any earlier table
    /// declaration for the same name is dropped, so `is_declared` /
    /// `relation_identity` never observe the name in both sets at once.
    pub(crate) fn record_view_produced(&mut self, canonical: &str, statement_index: usize) {
        let name = canonical.to_owned();
        // A view declaration supersedes a previous table declaration.
        self.declared_tables.remove(canonical);
        self.declared_views.insert(name.clone());
        self.produced_views.insert(name);
        self.record_produced(canonical, statement_index);
    }

    /// Marks `canonical` as a dbt model with `ephemeral` materialization.
    ///
    /// Ephemeral models are inlined into downstream SQL instead of being
    /// persisted, so the name is stored in `declared_ephemerals` and in
    /// `all_ctes` (giving it a canonical CTE identity) rather than being
    /// registered as a produced table or view. The view/table declared sets
    /// and `all_relations` are left untouched.
    pub(crate) fn declare_ephemeral(&mut self, canonical: &str) {
        let name = canonical.to_owned();
        self.all_ctes.insert(name.clone());
        self.declared_ephemerals.insert(name);
    }

    /// Declares, ahead of analysing its producer, that `canonical`
    /// materializes as a view.
    ///
    /// Pre-collection passes (e.g. dbt models) call this to pin the relation's
    /// identity early, so consumers that appear before the producer resolve to
    /// the same canonical `view_*` id. No producer statement index is stored
    /// here — that is still done by the main analysis pass through
    /// `record_view_produced` / `record_produced`.
    ///
    /// Any table declaration for the same name is removed (view wins), and the
    /// name is added to `all_relations`.
    pub(crate) fn declare_view(&mut self, canonical: &str) {
        let name = canonical.to_owned();
        // View takes precedence over an earlier table declaration.
        self.declared_tables.remove(canonical);
        self.all_relations.insert(name.clone());
        self.declared_views.insert(name);
    }

    /// Declares, ahead of analysing its producer, that `canonical`
    /// materializes as a physical table.
    ///
    /// Table counterpart of `declare_view`: consumers running before the
    /// producer can still resolve to a known relation. The producer statement
    /// index is assigned later by the main analysis pass via `record_produced`.
    ///
    /// The name is always added to `all_relations`, but it is only added to
    /// `declared_tables` when it has not already been declared as a view.
    pub(crate) fn declare_table(&mut self, canonical: &str) {
        // An existing view declaration is strictly more specific, so its
        // `view_*` identity must win and the table declaration is skipped.
        let already_view = self.declared_views.contains(canonical);
        if !already_view {
            self.declared_tables.insert(canonical.to_owned());
        }
        self.all_relations.insert(canonical.to_owned());
    }

    /// Registers `statement_index` as a consumer of the relation `canonical`.
    ///
    /// Every call appends to the relation's consumer list (created on first
    /// use), so one relation may have many consumers and one statement may
    /// consume many relations. Indices are not deduplicated. The name is also
    /// added to `all_relations`.
    pub(crate) fn record_consumed(&mut self, canonical: &str, statement_index: usize) {
        let consumers = self
            .consumed_tables
            .entry(canonical.to_owned())
            .or_default();
        consumers.push(statement_index);
        self.all_relations.insert(canonical.to_owned());
    }

    /// Adds `cte_name` to the global set of discovered CTEs.
    ///
    /// CTEs live in their own set, apart from tables and views, because their
    /// lifetime is scoped to a statement.
    pub(crate) fn record_cte(&mut self, cte_name: &str) {
        let name = cte_name.to_owned();
        self.all_ctes.insert(name);
    }

    /// Test-only accessor: reports whether `canonical` is known as a view,
    /// either because it was produced as one or because it was declared as one.
    #[cfg(test)]
    pub(crate) fn is_view(&self, canonical: &str) -> bool {
        let known_as_view = self.is_view_relation(canonical);
        known_as_view
    }

    /// Returns `true` when `canonical` appears in `produced_views` or in
    /// `declared_views`.
    fn is_view_relation(&self, canonical: &str) -> bool {
        [&self.produced_views, &self.declared_views]
            .iter()
            .any(|views| views.contains(canonical))
    }

    /// Returns `true` when `canonical` has been declared as a dbt ephemeral
    /// model, i.e. it is present in `declared_ephemerals`.
    fn is_ephemeral_relation(&self, canonical: &str) -> bool {
        let ephemerals = &self.declared_ephemerals;
        ephemerals.contains(canonical)
    }

    /// Reports whether a producer statement is currently recorded for
    /// `canonical`.
    ///
    /// Callers use this to tell a locally created table apart from an external
    /// one (e.g. coming from imported schema). A name that was passed to
    /// `remove` no longer counts as produced.
    pub(crate) fn was_produced(&self, canonical: &str) -> bool {
        let producers = &self.produced_tables;
        producers.contains_key(canonical)
    }

    /// Reports whether `canonical` is present in any of the declared sets
    /// (views, tables, or ephemerals).
    ///
    /// This is the resolver's signal that forward references to the relation
    /// are expected because a producer is known or will run later. It is kept
    /// separate from `was_produced` so callers that need an actual producer
    /// statement index (e.g. cross-statement edge generation) stay strict.
    ///
    /// Note that this does not imply "not yet produced": `record_view_produced`
    /// also inserts into `declared_views`, so a view that has already been
    /// produced still reports `true` here.
    pub(crate) fn is_declared(&self, canonical: &str) -> bool {
        [
            &self.declared_views,
            &self.declared_tables,
            &self.declared_ephemerals,
        ]
        .iter()
        .any(|declared| declared.contains(canonical))
    }

    /// Test-only accessor: returns the statement index currently recorded as
    /// the producer of `canonical`, or `None` when no producer is recorded.
    #[cfg(test)]
    pub(crate) fn producer_index(&self, canonical: &str) -> Option<usize> {
        let index = self.produced_tables.get(canonical)?;
        Some(*index)
    }

    /// Forgets a relation's producer and declarations (for DROP statements).
    ///
    /// The name is cleared from `produced_tables`, `produced_views`,
    /// `declared_views`, `declared_tables` and `declared_ephemerals`.
    ///
    /// It is intentionally left in `all_relations`, because the relation was
    /// still referenced by the workload. `consumed_tables` and `all_ctes` are
    /// not modified either.
    pub(crate) fn remove(&mut self, canonical: &str) {
        self.produced_tables.remove(canonical);
        for names in [
            &mut self.produced_views,
            &mut self.declared_views,
            &mut self.declared_tables,
            &mut self.declared_ephemerals,
        ] {
            names.remove(canonical);
        }
    }

    /// Resolves the canonical node ID and node type for a relation-like sink.
    ///
    /// The kind is chosen in a fixed order of precedence:
    ///
    /// 1. declared ephemeral model → `cte` prefix, [`NodeType::Cte`]
    /// 2. produced or declared view → `view` prefix, [`NodeType::View`]
    /// 3. anything else → `table` prefix, [`NodeType::Table`]
    ///
    /// Using one resolution path keeps node identification consistent across
    /// the lineage graph and avoids inventing persisted relations for
    /// ephemeral models.
    pub(crate) fn relation_identity(&self, canonical: &str) -> (Arc<str>, NodeType) {
        let (prefix, node_type) = if self.is_ephemeral_relation(canonical) {
            ("cte", NodeType::Cte)
        } else if self.is_view_relation(canonical) {
            ("view", NodeType::View)
        } else {
            ("table", NodeType::Table)
        };
        (generate_node_id(prefix, canonical), node_type)
    }

    /// Resolves a node ID and node type for one particular alias of a relation.
    ///
    /// In a self-join (`FROM t e1 JOIN t e2`) each alias must map to its own
    /// node, so the ID is generated from the key
    /// `canonical::alias::scope_<scope_id>` instead of from the canonical name
    /// alone. The node kind (cte / view / table) is chosen with the same
    /// precedence as `relation_identity`.
    ///
    /// When `alias` equals the canonical name, or the simple name extracted
    /// from it, the plain `relation_identity` result is returned so IDs for
    /// ordinary, non-self-join references stay unchanged.
    ///
    /// # Limitation
    ///
    /// An alias that explicitly repeats the canonical (or simple) name — e.g.
    /// `FROM employees e1 JOIN employees employees` — is indistinguishable
    /// from an unaliased reference, so both sides receive the same node ID.
    /// This is a deliberate trade-off: backward compatibility for the common
    /// unaliased case outweighs correctness for this rare self-join spelling.
    pub(crate) fn relation_instance_identity(
        &self,
        canonical: &str,
        alias: &str,
        scope_id: usize,
    ) -> (Arc<str>, NodeType) {
        // An alias that merely restates the relation's own name is treated as
        // "no alias" and keeps the standard identity.
        let simple_name = crate::analyzer::helpers::extract_simple_name(canonical);
        let restates_own_name = alias == canonical || alias == simple_name;
        if restates_own_name {
            return self.relation_identity(canonical);
        }

        let (prefix, node_type) = if self.is_ephemeral_relation(canonical) {
            ("cte", NodeType::Cte)
        } else if self.is_view_relation(canonical) {
            ("view", NodeType::View)
        } else {
            ("table", NodeType::Table)
        };

        // Key is unique per (relation, alias, scope).
        let instance_key = format!("{canonical}::{alias}::scope_{scope_id}");
        (generate_node_id(prefix, &instance_key), node_type)
    }

    /// Resolves only the node ID of a relation.
    ///
    /// Shorthand for `relation_identity` that discards the node type.
    pub(crate) fn relation_node_id(&self, canonical: &str) -> Arc<str> {
        let (node_id, _node_type) = self.relation_identity(canonical);
        node_id
    }

    /// Builds cross-statement edges for the global lineage graph.
    ///
    /// Detects when a table produced by statement N is consumed by statement M (where M > N)
    /// and creates appropriate `CrossStatement` edges. These edges represent data flow
    /// between statements in a multi-statement workload.
    ///
    /// Only the producer currently stored in `produced_tables` is considered, i.e. the
    /// most recently recorded one. Tables with no recorded producer yield no edges.
    ///
    /// # Edge Direction
    ///
    /// Cross-statement edges are self-referential on the table node (from/to are the same).
    /// Each edge records the producing and consuming statement indices in
    /// `statement_ids` (order: `[producer, consumer]`).
    ///
    /// # Edge ID Generation
    ///
    /// Edge IDs are generated using a hash of `(table_name, producer_index, consumer_index)`.
    /// This ensures uniqueness even when the same pair of statements have multiple data flows
    /// through different tables. For example, if statement 0 produces both `table_a` and
    /// `table_b`, and statement 1 consumes both, each flow gets a distinct edge ID.
    ///
    /// A statement that was recorded as a consumer of the same table more than once
    /// still yields a single edge, so no two returned edges share an ID.
    ///
    /// The hash uses `DefaultHasher` which is fast but not guaranteed to be stable across
    /// Rust versions. This is acceptable because edge IDs are ephemeral within a single
    /// analysis run and are not persisted or compared across runs.
    ///
    /// # Ordering
    ///
    /// Edges are returned sorted by table name; within one table they follow the order
    /// in which consumers were recorded. The result therefore does not depend on
    /// `HashMap` iteration order.
    pub(crate) fn build_cross_statement_edges(&self) -> Vec<Edge> {
        // Pair every consumed table with its recorded producer; tables that were
        // never produced in this workload cannot form a cross-statement edge.
        let mut flows: Vec<_> = self
            .consumed_tables
            .iter()
            .filter_map(|(table_name, consumers)| {
                self.produced_tables
                    .get(table_name)
                    .map(|&producer_idx| (table_name, producer_idx, consumers))
            })
            .collect();
        // `HashMap` iteration order is unspecified; sort so the output is reproducible.
        // Keys are unique, so an unstable sort is sufficient.
        flows.sort_unstable_by(|a, b| a.0.cmp(b.0));

        let mut edges = Vec::new();
        for (table_name, producer_idx, consumers) in flows {
            // `record_consumed` appends without deduplication, so the same consumer
            // index can occur several times; emit each flow only once.
            let mut seen_consumers = HashSet::new();
            for &consumer_idx in consumers {
                if consumer_idx <= producer_idx || !seen_consumers.insert(consumer_idx) {
                    continue;
                }

                // Hash table name + indices to generate unique edge IDs.
                // This prevents collisions when multiple tables flow between
                // the same pair of statements.
                let mut hasher = DefaultHasher::new();
                table_name.hash(&mut hasher);
                producer_idx.hash(&mut hasher);
                consumer_idx.hash(&mut hasher);
                let edge_id = format!("cross_{:016x}", hasher.finish());
                let node_id = self.relation_node_id(table_name);

                let mut edge =
                    Edge::new(edge_id, node_id.clone(), node_id, EdgeType::CrossStatement);
                edge.statement_ids = vec![producer_idx, consumer_idx];
                edges.push(edge);
            }
        }

        edges
    }
}

impl Default for CrossStatementTracker {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Recording one producer and two consumers is visible through the
    /// lookup helpers and the raw `consumed_tables` map.
    #[test]
    fn test_record_produced_consumed() {
        let relation = "public.users";
        let mut tracker = CrossStatementTracker::new();

        tracker.record_produced(relation, 0);
        for statement in [1, 2] {
            tracker.record_consumed(relation, statement);
        }

        assert!(tracker.was_produced(relation));
        assert_eq!(tracker.producer_index(relation), Some(0));

        // Consumers are kept in the order they were recorded.
        let expected_consumers = vec![1, 2];
        assert_eq!(
            tracker.consumed_tables.get(relation),
            Some(&expected_consumers)
        );
    }

    /// A relation recorded with `record_view_produced` gets the view identity,
    /// while one recorded with `record_produced` keeps the table identity.
    #[test]
    fn test_view_vs_table() {
        const TABLE: &str = "public.my_table";
        const VIEW: &str = "public.my_view";

        let mut tracker = CrossStatementTracker::new();
        tracker.record_produced(TABLE, 0);
        tracker.record_view_produced(VIEW, 1);

        assert!(!tracker.is_view(TABLE));
        assert!(tracker.is_view(VIEW));

        let table_identity = tracker.relation_identity(TABLE);
        assert!(table_identity.0.starts_with("table_"));
        assert_eq!(table_identity.1, NodeType::Table);

        let view_identity = tracker.relation_identity(VIEW);
        assert!(view_identity.0.starts_with("view_"));
        assert_eq!(view_identity.1, NodeType::View);
    }

    /// Declaring a view is enough for it to resolve to a view identity,
    /// even though no producing statement has been recorded.
    #[test]
    fn test_declared_view_uses_view_identity_before_producer_runs() {
        let name = "models.future_view";
        let mut tracker = CrossStatementTracker::new();
        tracker.declare_view(name);

        assert!(tracker.is_view(name));

        let identity = tracker.relation_identity(name);
        assert!(identity.0.starts_with("view_"));
        assert_eq!(identity.1, NodeType::View);
    }

    /// A relation declared as ephemeral resolves to a CTE identity,
    /// even though no producing statement has been recorded.
    #[test]
    fn test_declared_ephemeral_uses_cte_identity_before_producer_runs() {
        let name = "models.future_ephemeral";
        let mut tracker = CrossStatementTracker::new();
        tracker.declare_ephemeral(name);

        let identity = tracker.relation_identity(name);
        assert!(identity.0.starts_with("cte_"));
        assert_eq!(identity.1, NodeType::Cte);
    }

    /// One producer followed by two consumers yields one cross-statement
    /// edge per consumer.
    #[test]
    fn test_cross_statement_edges() {
        let mut tracker = CrossStatementTracker::new();
        tracker.record_produced("staging.temp", 0);
        for consumer in [1, 2] {
            tracker.record_consumed("staging.temp", consumer);
        }

        let edges = tracker.build_cross_statement_edges();
        assert_eq!(edges.len(), 2);

        for edge in &edges {
            assert_eq!(edge.edge_type, EdgeType::CrossStatement);
        }

        // Edge order is not asserted, so look each expected pair up.
        for consumer in [1usize, 2usize] {
            assert!(edges
                .iter()
                .any(|edge| edge.statement_ids == [0usize, consumer]));
        }
    }

    /// Removing a produced view clears both its view flag and its producer.
    #[test]
    fn test_remove() {
        let view = "public.temp_view";
        let mut tracker = CrossStatementTracker::new();

        tracker.record_view_produced(view, 0);
        assert!(tracker.is_view(view));

        tracker.remove(view);
        assert!(!tracker.is_view(view));
        assert!(!tracker.was_produced(view));
    }

    /// A table that is produced but never consumed yields no edges.
    #[test]
    fn test_no_cross_statement_edges_for_unconsumed_table() {
        let mut tracker = CrossStatementTracker::new();

        // Producer only; no consumer is ever recorded.
        tracker.record_produced("staging.temp", 0);

        assert!(tracker.build_cross_statement_edges().is_empty());
    }

    /// A table that is consumed but never produced (an external table)
    /// yields no edges.
    #[test]
    fn test_no_cross_statement_edges_for_external_table() {
        let mut tracker = CrossStatementTracker::new();

        // Consumers only; no producer is ever recorded.
        for statement in [0, 1] {
            tracker.record_consumed("external.source", statement);
        }

        assert!(tracker.build_cross_statement_edges().is_empty());
    }

    /// A consumer that runs before the producing statement yields no edge.
    #[test]
    fn test_no_edge_when_consumer_before_producer() {
        let relation = "staging.temp";
        let mut tracker = CrossStatementTracker::new();

        // Statement 1 produces the table, but statement 0 already read it.
        tracker.record_produced(relation, 1);
        tracker.record_consumed(relation, 0);

        assert!(tracker.build_cross_statement_edges().is_empty());
    }

    /// Two tables from different producers, read by the same statement,
    /// yield one edge each.
    #[test]
    fn test_multiple_tables_cross_statement() {
        let mut tracker = CrossStatementTracker::new();

        for (table, producer) in [("staging.a", 0), ("staging.b", 1)] {
            tracker.record_produced(table, producer);
        }
        for table in ["staging.a", "staging.b"] {
            tracker.record_consumed(table, 2);
        }

        let edge_count = tracker.build_cross_statement_edges().len();
        assert_eq!(edge_count, 2);
    }

    /// Every recorded CTE name ends up in `all_ctes`.
    #[test]
    fn test_record_cte() {
        let names = ["my_cte", "another_cte"];
        let mut tracker = CrossStatementTracker::new();

        for name in names {
            tracker.record_cte(name);
        }

        for name in names {
            assert!(tracker.all_ctes.contains(name));
        }
        assert_eq!(tracker.all_ctes.len(), 2);
    }

    /// Produced tables, consumed tables and produced views are all
    /// registered in `all_relations`.
    #[test]
    fn test_all_relations_tracking() {
        let mut tracker = CrossStatementTracker::new();

        tracker.record_produced("staging.a", 0);
        tracker.record_consumed("external.b", 1);
        tracker.record_view_produced("staging.v", 2);

        let tracked = &tracker.all_relations;
        for name in ["staging.a", "external.b", "staging.v"] {
            assert!(tracked.contains(name));
        }
        assert_eq!(tracked.len(), 3);
    }

    /// A tracker built through `Default` starts out with no recorded state.
    #[test]
    fn test_default_trait() {
        let fresh: CrossStatementTracker = Default::default();

        assert!(fresh.produced_tables.is_empty());
        assert!(fresh.consumed_tables.is_empty());
        assert!(fresh.produced_views.is_empty());
        assert!(fresh.declared_views.is_empty());
        assert!(fresh.declared_ephemerals.is_empty());
    }

    /// Node IDs carry a prefix matching the relation kind and differ
    /// between a table and a view.
    #[test]
    fn test_relation_node_id() {
        let mut tracker = CrossStatementTracker::new();
        tracker.record_produced("public.users", 0);
        tracker.record_view_produced("public.user_view", 1);

        let users_id = tracker.relation_node_id("public.users");
        let user_view_id = tracker.relation_node_id("public.user_view");

        assert!(users_id.starts_with("table_"));
        assert!(user_view_id.starts_with("view_"));
        assert_ne!(users_id, user_view_id);
    }

    /// Checks the individual fields of a single generated edge.
    #[test]
    fn test_cross_statement_edge_attributes() {
        let mut tracker = CrossStatementTracker::new();
        tracker.record_produced("staging.temp", 0);
        tracker.record_consumed("staging.temp", 1);

        let edges = tracker.build_cross_statement_edges();
        assert_eq!(edges.len(), 1);

        let only_edge = edges
            .first()
            .expect("exactly one edge was asserted above");
        assert!(only_edge.id.starts_with("cross_"));
        // The edge starts and ends on the same relation node.
        assert_eq!(only_edge.from, only_edge.to);
        assert_eq!(only_edge.statement_ids, vec![0usize, 1usize]);
        assert!(only_edge.metadata.is_none());
    }

    /// Recording a second producer for the same table replaces the first.
    #[test]
    fn test_producer_overwrite() {
        let relation = "staging.data";
        let mut tracker = CrossStatementTracker::new();

        // After each recording, the most recent producer is the one reported.
        for producer in [0, 2] {
            tracker.record_produced(relation, producer);
            assert_eq!(tracker.producer_index(relation), Some(producer));
        }
    }

    /// A statement that both produces and consumes a table
    /// (e.g., INSERT INTO ... SELECT FROM same table) yields no edge.
    #[test]
    fn test_same_statement_producer_consumer() {
        let relation = "staging.data";
        let mut tracker = CrossStatementTracker::new();

        tracker.record_produced(relation, 0);
        tracker.record_consumed(relation, 0);

        // Consumer index (0) is not greater than producer index (0).
        assert!(tracker.build_cross_statement_edges().is_empty());
    }

    /// `remove` leaves `all_relations` untouched: the table was referenced,
    /// so it stays listed.
    #[test]
    fn test_remove_preserves_all_relations() {
        let relation = "staging.temp";
        let mut tracker = CrossStatementTracker::new();

        tracker.record_produced(relation, 0);
        assert!(tracker.all_relations.contains(relation));

        tracker.remove(relation);
        assert!(tracker.all_relations.contains(relation));
    }

    /// Removing a table that was never recorded must not panic.
    #[test]
    fn test_remove_nonexistent_table() {
        let missing = "nonexistent.table";
        let mut tracker = CrossStatementTracker::new();

        tracker.remove(missing);

        assert!(!tracker.was_produced(missing));
    }

    /// An edge for a consumed view is anchored on the view's node ID.
    #[test]
    fn test_view_edge_type() {
        let view = "analytics.user_summary";
        let mut tracker = CrossStatementTracker::new();
        tracker.record_view_produced(view, 0);
        tracker.record_consumed(view, 1);

        let edges = tracker.build_cross_statement_edges();
        assert_eq!(edges.len(), 1);

        let view_edge = edges
            .first()
            .expect("exactly one edge was asserted above");
        assert!(view_edge.from.starts_with("view_"));
        assert_eq!(view_edge.edge_type, EdgeType::CrossStatement);
    }

    /// Three-statement ETL pipeline (source -> staging -> mart): each
    /// intermediate table yields exactly one edge from its producer to its
    /// consumer, and the external source yields none.
    #[test]
    fn test_complex_etl_pattern() {
        let mut tracker = CrossStatementTracker::new();

        // Statement 0: CREATE TABLE staging.raw FROM external.source
        tracker.record_consumed("external.source", 0);
        tracker.record_produced("staging.raw", 0);

        // Statement 1: CREATE TABLE staging.cleaned FROM staging.raw
        tracker.record_consumed("staging.raw", 1);
        tracker.record_produced("staging.cleaned", 1);

        // Statement 2: CREATE TABLE mart.final FROM staging.cleaned
        tracker.record_consumed("staging.cleaned", 2);
        tracker.record_produced("mart.final", 2);

        let edges = tracker.build_cross_statement_edges();
        // Should have 2 cross-statement edges:
        // - staging.raw: 0 -> 1
        // - staging.cleaned: 1 -> 2
        assert_eq!(edges.len(), 2);

        // Edge order is not asserted, so locate each edge by its node ID.
        let raw_node_id = tracker.relation_node_id("staging.raw");
        let cleaned_node_id = tracker.relation_node_id("staging.cleaned");

        let raw_edge = edges
            .iter()
            .find(|e| e.from == raw_node_id)
            .expect("missing cross-statement edge for staging.raw");
        let cleaned_edge = edges
            .iter()
            .find(|e| e.from == cleaned_node_id)
            .expect("missing cross-statement edge for staging.cleaned");

        // Check the producer/consumer pair of both edges, not just the first.
        assert_eq!(raw_edge.statement_ids, vec![0usize, 1usize]);
        assert_eq!(cleaned_edge.statement_ids, vec![1usize, 2usize]);
    }

    /// Three consumers of one produced table yield three edges, all of
    /// which start at the producing statement.
    #[test]
    fn test_multiple_consumers_same_table() {
        let relation = "shared.data";
        let mut tracker = CrossStatementTracker::new();

        tracker.record_produced(relation, 0);
        for consumer in [1, 2, 3] {
            tracker.record_consumed(relation, consumer);
        }

        let edges = tracker.build_cross_statement_edges();
        assert_eq!(edges.len(), 3);

        // The first entry of `statement_ids` is the producer.
        for edge in &edges {
            assert_eq!(edge.statement_ids.first(), Some(&0usize));
        }
    }

    /// A relation the tracker has never seen defaults to the table identity.
    #[test]
    fn test_unknown_relation_identity() {
        let tracker = CrossStatementTracker::new();

        let identity = tracker.relation_identity("unknown.table");

        assert!(identity.0.starts_with("table_"));
        assert_eq!(identity.1, NodeType::Table);
    }

    /// Recording the same CTE name twice leaves a single entry in `all_ctes`.
    #[test]
    fn test_duplicate_cte_recording() {
        let mut tracker = CrossStatementTracker::new();

        // Record the same name twice.
        for _ in 0..2 {
            tracker.record_cte("my_cte");
        }

        assert_eq!(tracker.all_ctes.len(), 1);
    }

    /// Every generated edge carries its own ID when several tables and
    /// several consumers are involved.
    #[test]
    fn test_edge_id_uniqueness() {
        let mut tracker = CrossStatementTracker::new();

        // Two producers; `table_a` is read twice, `table_b` once.
        tracker.record_produced("table_a", 0);
        tracker.record_produced("table_b", 1);
        tracker.record_consumed("table_a", 2);
        tracker.record_consumed("table_b", 2);
        tracker.record_consumed("table_a", 3);

        let edges = tracker.build_cross_statement_edges();
        assert_eq!(edges.len(), 3);

        // Collapsing the IDs into a set must not drop any entry.
        let distinct_ids: std::collections::HashSet<_> =
            edges.iter().map(|edge| &edge.id).collect();
        assert_eq!(edges.len(), distinct_ids.len());
    }

    /// Two tables flowing between the same producer/consumer pair still
    /// receive distinct edge IDs.
    #[test]
    fn edge_ids_differ_for_same_statement_pairs() {
        let mut tracker = CrossStatementTracker::new();

        // Both tables go from statement 0 to statement 1.
        for table in ["table_a", "table_b"] {
            tracker.record_produced(table, 0);
            tracker.record_consumed(table, 1);
        }

        let edges = tracker.build_cross_statement_edges();
        assert_eq!(edges.len(), 2);

        let distinct_ids: std::collections::HashSet<_> =
            edges.iter().map(|edge| &edge.id).collect();
        assert_eq!(
            distinct_ids.len(),
            2,
            "expected unique edge IDs for each table"
        );
    }
}