flowscope-core 0.7.0

Core SQL lineage analysis engine
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
//! MiniJinja wrapper for template rendering.

use super::dbt::passthrough_arg_to_string;
use super::error::TemplateError;
use minijinja::{Environment, Value};
use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;

use regex::Regex;
use std::sync::LazyLock;

// Time-based guards are only available on native builds (not WASM)
#[cfg(not(target_arch = "wasm32"))]
use std::time::{Duration, Instant};

/// Recursion limit for template rendering to prevent DoS via deeply nested templates.
///
/// Applied through `Environment::set_recursion_limit` to every MiniJinja
/// environment this module creates. Set lower than MiniJinja's default (500)
/// for extra safety in WASM environments.
const RECURSION_LIMIT: usize = 100;

/// Maximum template size, in bytes, for regex preprocessing (10 MB).
///
/// The limit is compared against `str::len`, i.e. the UTF-8 byte length.
/// Templates larger than this skip preprocessing to avoid regex DoS, which
/// means any dbt-specific tags they contain are left in place.
const MAX_PREPROCESS_SIZE: usize = 10_000_000;

/// Timeout for template rendering on native builds.
///
/// The elapsed time is checked before each render attempt of the dbt retry
/// loop, so it bounds the loop as a whole rather than interrupting a single
/// render that is already running.
/// Not available in WASM where Instant is unreliable.
#[cfg(not(target_arch = "wasm32"))]
const RENDER_TIMEOUT: Duration = Duration::from_secs(5);

/// Regex to match dbt `{% test ... %} ... {% endtest %}` blocks.
/// These define test macros which should be stripped for lineage analysis.
///
/// Pattern notes:
/// - `(?s)` lets `.` match newlines, so a block body may span several lines.
/// - `-?` next to each delimiter accepts Jinja whitespace control (`{%-`, `-%}`).
/// - `[^%]*` keeps the opening tag from matching across its own `%}` delimiter.
///   As a consequence, an opening tag whose arguments contain a literal `%`
///   is not matched.
/// - `.*?` is lazy, so a block ends at the nearest `{% endtest %}`.
///
/// The pattern is a fixed literal, so the `unwrap()` can only fail if the
/// literal itself is edited into an invalid expression.
static TEST_BLOCK_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"(?s)\{%-?\s*test\b[^%]*-?%\}.*?\{%-?\s*endtest\s*-?%\}").unwrap()
});

/// Regex to match dbt `{% snapshot ... %} ... {% endsnapshot %}` blocks.
/// We keep the inner content but strip the snapshot tags.
///
/// Capture group 1 holds everything between the opening and closing tags;
/// the caller substitutes it back with `$1`.
///
/// Pattern notes:
/// - `(?s)` lets `.` match newlines, so the body may span several lines.
/// - `-?` next to each delimiter accepts Jinja whitespace control (`{%-`, `-%}`).
/// - `[^%]*` keeps the opening tag from matching across its own `%}` delimiter.
/// - `(.*?)` is lazy, so a block ends at the nearest `{% endsnapshot %}`.
static SNAPSHOT_BLOCK_RE: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"(?s)\{%-?\s*snapshot\b[^%]*-?%\}(.*?)\{%-?\s*endsnapshot\s*-?%\}").unwrap()
});

/// Rewrites dbt-only block tags so that MiniJinja can parse the template.
///
/// Two rewrites are applied, in this order:
/// 1. `{% test ... %} ... {% endtest %}` blocks are deleted, body included.
/// 2. `{% snapshot ... %} ... {% endsnapshot %}` blocks are replaced by their
///    body, i.e. only the surrounding tags are dropped.
///
/// # Arguments
///
/// * `template` - Template source that may contain dbt-specific tags.
///
/// # Returns
///
/// The rewritten template. The result borrows `template` (no allocation) when
/// nothing had to change, which includes two shortcut cases: the input is
/// longer than `MAX_PREPROCESS_SIZE` bytes, or it contains no `{%` at all.
fn preprocess_dbt_tags(template: &str) -> Cow<'_, str> {
    // Oversized inputs are passed through untouched to bound regex work, and
    // inputs without any block tag cannot match either pattern.
    let too_large = template.len() > MAX_PREPROCESS_SIZE;
    if too_large || !template.contains("{%") {
        return Cow::Borrowed(template);
    }

    // Pass 1: drop test blocks. Still borrows `template` if nothing matched.
    let without_tests = TEST_BLOCK_RE.replace_all(template, "");

    // Pass 2: unwrap snapshot blocks. Only an actual rewrite is kept, so the
    // borrow of `without_tests` ends with this statement.
    let unwrapped = match SNAPSHOT_BLOCK_RE.replace_all(&without_tests, "$1") {
        Cow::Owned(rewritten) => Some(rewritten),
        Cow::Borrowed(_) => None,
    };

    match unwrapped {
        Some(rewritten) => Cow::Owned(rewritten),
        // Pass 2 was a no-op, so the pass-1 result is already the answer:
        // borrowed if pass 1 changed nothing either, owned otherwise.
        None => without_tests,
    }
}

/// Renders a plain Jinja2 template against `context`.
///
/// This is the rendering entry point for templates without dbt-specific
/// macros. Undefined variables are treated as errors
/// (`UndefinedBehavior::Strict`) rather than rendering as empty output.
///
/// # Errors
///
/// Returns a [`TemplateError`] when the template fails to compile or render;
/// MiniJinja errors are converted by the `?` operator.
pub(crate) fn render_jinja(
    template: &str,
    context: &HashMap<String, serde_json::Value>,
) -> Result<String, TemplateError> {
    // Strict undefined handling plus a bounded nesting depth.
    let mut env = Environment::new();
    env.set_undefined_behavior(minijinja::UndefinedBehavior::Strict);
    env.set_recursion_limit(RECURSION_LIMIT);

    // Compile under a fixed name, then render with the converted context.
    env.add_template("sql", template)?;
    let rendered = env
        .get_template("sql")?
        .render(json_context_to_minijinja(context))?;

    Ok(rendered)
}

/// Renders a dbt-flavoured Jinja2 template, stubbing unknown macros on demand.
///
/// The template is first run through `preprocess_dbt_tags`, then rendered in
/// an environment that has the builtins from
/// `super::dbt::register_dbt_builtins` registered and uses lenient undefined
/// handling, since dbt templates may reference variables that are not defined.
///
/// When rendering fails because a function is unknown (custom project macros,
/// package macros, ...), a passthrough stub is registered under that name and
/// rendering is attempted again. This allows lineage analysis even when the
/// full dbt project context is not available.
///
/// # Performance
///
/// A single `Environment` and a single converted context are reused across
/// all attempts; each retry only adds one function to the environment.
///
/// # Errors
///
/// Returns a [`TemplateError`] when:
/// - the preprocessed template fails to compile,
/// - rendering fails for any reason other than a not-yet-stubbed unknown
///   function,
/// - all `MAX_RETRIES` render attempts were used without a successful render,
/// - (native builds only) `RENDER_TIMEOUT` has elapsed before an attempt.
pub(crate) fn render_dbt(
    template: &str,
    context: &HashMap<String, serde_json::Value>,
) -> Result<String, TemplateError> {
    // DoS protection overview:
    // - MAX_RETRIES caps the number of render attempts. Each failed attempt
    //   stubs at most one function, so a template is rejected once it has
    //   used up every attempt without rendering.
    // - RECURSION_LIMIT (set on the environment) bounds template nesting.
    // - MAX_PREPROCESS_SIZE bounds the regex preprocessing work.
    // - RENDER_TIMEOUT (native only) is a wall-clock safety net; WASM relies
    //   on the iteration limit alone because Instant is unreliable there.
    const MAX_RETRIES: usize = 50;

    // Handle dbt-only tags that MiniJinja would reject.
    let preprocessed = preprocess_dbt_tags(template);

    // One environment for all attempts; stubs are added to it incrementally.
    let mut env = Environment::new();
    env.set_undefined_behavior(minijinja::UndefinedBehavior::Lenient);
    env.set_recursion_limit(RECURSION_LIMIT);
    super::dbt::register_dbt_builtins(&mut env, context);
    env.add_template("sql", &preprocessed)?;

    // The context never changes between attempts, so convert it only once.
    let ctx = json_context_to_minijinja(context);

    // Names stubbed so far; seeing one of them fail again means the stub did
    // not help and retrying would loop forever.
    let mut stubbed_functions: HashSet<String> = HashSet::new();

    #[cfg(not(target_arch = "wasm32"))]
    let start_time = Instant::now();

    for _attempt in 0..MAX_RETRIES {
        // Native only: give up once the wall-clock budget is exhausted.
        #[cfg(not(target_arch = "wasm32"))]
        if start_time.elapsed() > RENDER_TIMEOUT {
            return Err(TemplateError::RenderError(format!(
                "Template rendering timed out after {:?}. Stubbed functions: {}",
                RENDER_TIMEOUT,
                format_stubbed_list(&stubbed_functions)
            )));
        }

        // Success returns straight away; otherwise keep the error to inspect.
        let render_err = match env.get_template("sql")?.render(ctx.clone()) {
            Ok(rendered) => {
                #[cfg(feature = "tracing")]
                if !stubbed_functions.is_empty() {
                    let stubbed_list: Vec<_> = stubbed_functions.iter().cloned().collect();
                    tracing::debug!(
                        stubbed_functions = ?stubbed_list,
                        "Template rendered with stubbed unknown macros"
                    );
                }
                return Ok(rendered);
            }
            Err(err) => err,
        };

        // Only an unknown function that has not been stubbed yet is
        // recoverable; every other failure is reported to the caller.
        let fresh_stub = extract_unknown_function(&render_err)
            .filter(|name| !stubbed_functions.contains(name));
        let Some(func_name) = fresh_stub else {
            return Err(TemplateError::RenderError(render_err.to_string()));
        };

        #[cfg(feature = "tracing")]
        tracing::debug!(
            function = %func_name,
            stubbed_count = stubbed_functions.len() + 1,
            "Stubbing unknown dbt macro"
        );

        // Register the stub and go around again.
        register_passthrough_function(&mut env, &func_name);
        stubbed_functions.insert(func_name);
    }

    Err(TemplateError::RenderError(format!(
        "Too many unknown functions in template (limit: {MAX_RETRIES}). Stubbed: {}",
        format_stubbed_list(&stubbed_functions)
    )))
}

/// Registers `name` in `env` as a passthrough stub function.
///
/// When called from a template, the stub returns:
/// - its first argument, if one was passed and `passthrough_arg_to_string`
///   can turn it into a string;
/// - otherwise the placeholder `__<name>__`, so that a call such as
///   `{{ generate_schema_name() }}` still yields an identifier-like token.
///
/// This is used for unknown dbt macros where we don't know the semantics,
/// but want to produce parseable SQL for lineage analysis.
fn register_passthrough_function(env: &mut Environment<'_>, name: &str) {
    // Built once here; the stub only clones it on the fallback path.
    let placeholder = format!("__{name}__");

    env.add_function(name.to_string(), move |args: &[Value]| -> Value {
        // Common macro pattern: the first argument is what ends up in the SQL.
        let first_rendered = match args.first() {
            Some(first) => passthrough_arg_to_string(first),
            None => None,
        };

        match first_rendered {
            Some(rendered) => Value::from(rendered),
            None => Value::from(placeholder.clone()),
        }
    });
}

/// Extracts the function name from an "unknown function" error.
///
/// Uses MiniJinja's `ErrorKind::UnknownFunction` for reliable detection,
/// but extracts the actual function name from the error message string.
///
/// # Fragility Warning
///
/// The name extraction relies on MiniJinja's error message format:
/// `"unknown function: <name> is unknown"`
///
/// This is inherently fragile and may break if MiniJinja changes its error
/// message format in future versions. When upgrading MiniJinja, run the
/// templating test suite to verify this still works. If MiniJinja ever
/// exposes the function name directly via `Error::name()` or similar API,
/// prefer that over string parsing.
fn extract_unknown_function(err: &minijinja::Error) -> Option<String> {
    use minijinja::ErrorKind;

    // Only handle unknown function errors
    if err.kind() != ErrorKind::UnknownFunction {
        return None;
    }

    // Extract the function name from the error message
    // MiniJinja error format: "unknown function: <name> is unknown"
    const PREFIX: &str = "unknown function: ";
    const SUFFIX: &str = " is unknown";

    let msg = err.to_string();
    let start = msg.find(PREFIX)? + PREFIX.len();
    let remaining = &msg[start..];
    let end = remaining.find(SUFFIX)?;
    let func_name = &remaining[..end];

    // Validate: must be non-empty, reasonable length, valid identifier chars
    if func_name.is_empty() || func_name.len() > 100 {
        return None;
    }
    if !func_name
        .chars()
        .all(|c| c.is_alphanumeric() || c == '_' || c == '.')
    {
        return None;
    }

    Some(func_name.to_string())
}

/// Renders the stubbed function names for use in error messages.
///
/// Returns `"(none)"` for an empty set; otherwise the names sorted
/// ascending and joined with `", "`, so the output does not depend on the
/// set's iteration order.
fn format_stubbed_list(stubbed: &HashSet<String>) -> String {
    if stubbed.is_empty() {
        return String::from("(none)");
    }

    // Set members are unique, so an unstable sort yields the same order.
    let mut names: Vec<&str> = stubbed.iter().map(String::as_str).collect();
    names.sort_unstable();
    names.join(", ")
}

/// Converts a JSON context map to MiniJinja Value format.
///
/// Delegates to `Value::from_serialize`, passing the whole map so that it
/// becomes the root value handed to `Template::render`.
fn json_context_to_minijinja(context: &HashMap<String, serde_json::Value>) -> Value {
    Value::from_serialize(context)
}

/// Unit tests for plain Jinja rendering, dbt tag preprocessing, and the
/// dbt rendering path.
#[cfg(test)]
mod tests {
    use super::*;

    // =========================================================================
    // Plain Jinja rendering tests (strict undefined behavior)
    // =========================================================================

    /// A context variable is substituted into the output.
    #[test]
    fn renders_simple_variable() {
        let mut ctx = HashMap::new();
        ctx.insert("table_name".to_string(), serde_json::json!("users"));

        let result = render_jinja("SELECT * FROM {{ table_name }}", &ctx).unwrap();
        assert_eq!(result, "SELECT * FROM users");
    }

    /// An `{% if %}` block is emitted when its condition is true.
    #[test]
    fn renders_conditional() {
        let mut ctx = HashMap::new();
        ctx.insert("include_deleted".to_string(), serde_json::json!(true));

        let template =
            r#"SELECT * FROM users{% if include_deleted %} WHERE deleted = false{% endif %}"#;
        let result = render_jinja(template, &ctx).unwrap();
        assert_eq!(result, "SELECT * FROM users WHERE deleted = false");
    }

    /// A `{% for %}` loop over a JSON array renders each element, using
    /// `loop.last` to omit the trailing separator.
    #[test]
    fn renders_loop() {
        let mut ctx = HashMap::new();
        ctx.insert(
            "columns".to_string(),
            serde_json::json!(["id", "name", "email"]),
        );

        let template = r#"SELECT {% for col in columns %}{{ col }}{% if not loop.last %}, {% endif %}{% endfor %} FROM users"#;
        let result = render_jinja(template, &ctx).unwrap();
        assert_eq!(result, "SELECT id, name, email FROM users");
    }

    /// Referencing a name missing from the context is an error in
    /// `render_jinja`, surfaced as `TemplateError::UndefinedVariable`.
    #[test]
    fn errors_on_undefined_variable_in_strict_mode() {
        let ctx = HashMap::new();
        let result = render_jinja("SELECT * FROM {{ undefined_table }}", &ctx);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            TemplateError::UndefinedVariable(_)
        ));
    }

    /// An unterminated `{{` expression is reported as
    /// `TemplateError::SyntaxError`.
    #[test]
    fn errors_on_syntax_error() {
        let ctx = HashMap::new();
        let result = render_jinja("SELECT * FROM {{ unclosed", &ctx);
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), TemplateError::SyntaxError(_)));
    }

    // =========================================================================
    // Tag preprocessing tests
    // =========================================================================

    /// A `{% test %}` block is removed together with its body, while SQL
    /// outside the block is kept.
    #[test]
    fn preprocess_removes_test_blocks() {
        let template = r#"{% test my_test(model) %}
SELECT * FROM {{ model }} WHERE id IS NULL
{% endtest %}

SELECT * FROM users"#;

        let result = preprocess_dbt_tags(template);
        assert!(!result.contains("test my_test"));
        assert!(!result.contains("endtest"));
        assert!(result.contains("SELECT * FROM users"));
    }

    /// Test blocks written with whitespace-control delimiters (`{%-`, `-%}`)
    /// are removed as well.
    #[test]
    fn preprocess_removes_test_blocks_with_whitespace_control() {
        let template = r#"{%- test not_null(model, column_name) -%}
SELECT * FROM {{ model }} WHERE {{ column_name }} IS NULL
{%- endtest -%}
SELECT 1"#;

        let result = preprocess_dbt_tags(template);
        assert!(!result.contains("test not_null"));
        assert!(result.contains("SELECT 1"));
    }

    /// For a `{% snapshot %}` block only the tags are stripped; the body
    /// remains in the output.
    #[test]
    fn preprocess_keeps_snapshot_content() {
        let template = r#"{% snapshot orders_snapshot %}
SELECT * FROM orders
{% endsnapshot %}"#;

        let result = preprocess_dbt_tags(template);
        assert!(!result.contains("snapshot orders_snapshot"));
        assert!(!result.contains("endsnapshot"));
        assert!(result.contains("SELECT * FROM orders"));
    }

    /// Several test and snapshot blocks in one template are each handled
    /// independently.
    #[test]
    fn preprocess_handles_multiple_blocks() {
        let template = r#"{% test test1() %}test sql{% endtest %}
{% snapshot snap1 %}SELECT 1{% endsnapshot %}
{% test test2() %}more test sql{% endtest %}
SELECT * FROM final"#;

        let result = preprocess_dbt_tags(template);
        assert!(!result.contains("test1"));
        assert!(!result.contains("test2"));
        assert!(result.contains("SELECT 1")); // snapshot content preserved
        assert!(result.contains("SELECT * FROM final"));
    }

    // =========================================================================
    // dbt rendering tests (preprocessing + rendering)
    // =========================================================================

    /// `render_dbt` drops the test block before rendering, so the `ref()`
    /// inside it never reaches the output.
    #[test]
    fn dbt_render_with_test_block() {
        // Full integration: test block should be stripped before rendering
        let ctx = HashMap::new();
        let template = r#"{% test my_test(model) %}
SELECT * FROM {{ ref('test_model') }}
{% endtest %}

SELECT * FROM {{ ref('users') }}"#;

        let result = render_dbt(template, &ctx).unwrap();
        assert!(!result.contains("test_model"));
        assert!(result.contains("users"));
    }

    /// `render_dbt` unwraps a snapshot block and renders its body; the
    /// `config(...)` call must not leave the word "snapshot" in the output.
    #[test]
    fn dbt_render_with_snapshot_block() {
        let ctx = HashMap::new();
        let template = r#"{% snapshot my_snapshot %}
{{ config(unique_key='id') }}
SELECT * FROM {{ ref('source_table') }}
{% endsnapshot %}"#;

        let result = render_dbt(template, &ctx).unwrap();
        assert!(result.contains("SELECT * FROM source_table"));
        assert!(!result.contains("snapshot"));
    }
}