streamling-e2e 0.1.0

End-to-end tests for streamling
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
//! SQL transform e2e tests.
//!
//! These tests verify SQL transform behavior including:
//! - _gs_op column propagation and preservation
//! - UNION query handling
//! - Flink-compatible string functions
//! - SQL filter metrics
//!
//! Ported from crates/streamling/tests/pipeline.rs

use serde::Serialize;
use streamling_e2e::{init_tracing, PipelineOpts, TestContext, TestContextOptions};

// ============================================================================
// Test Record Types
// ============================================================================

/// Test record matching the standard test schema (block, id, data).
///
/// Instances are built by `create_test_records` and handed to
/// `produce_avro_records`; the field names and types mirror the fields
/// declared in `TEST_SCHEMA`.
#[derive(Debug, Clone, Serialize)]
struct TestRecord {
    /// Numeric `block` value (declared as `long` in `TEST_SCHEMA`).
    block: i64,
    /// Record identifier; the pipelines in this file use `id` as `primary_key`.
    id: String,
    /// Free-form string payload.
    data: String,
}

/// JSON schema text passed to `register_schema` before records are produced.
///
/// Declares a record named `TestMessage` with three fields: `block` (long),
/// `id` (string) and `data` (string), matching the fields of `TestRecord`.
const TEST_SCHEMA: &str = r#"{
    "type": "record",
    "name": "TestMessage",
    "fields": [
        {"name": "block", "type": "long"},
        {"name": "id", "type": "string"},
        {"name": "data", "type": "string"}
    ]
}"#;

// ============================================================================
// Helper functions
// ============================================================================

/// Builds `count` sequential test records, numbered from 1 through `count`.
///
/// Record `n` has `block = n`, `id = "id_<n>"` and `data = "data<n>"`.
/// Returns an empty vector when `count` is 0.
fn create_test_records(count: usize) -> Vec<TestRecord> {
    let mut records = Vec::with_capacity(count);
    for seq in 1..=count {
        let id = format!("id_{}", seq);
        let data = format!("data{}", seq);
        records.push(TestRecord {
            block: seq as i64,
            id,
            data,
        });
    }
    records
}

// ============================================================================
// Scenario 1: _gs_op propagation tests
// ============================================================================

/// Verifies that `_gs_op` still shows up in the output when the SQL transform's
/// SELECT list leaves it out.
#[tokio::test]
async fn test_sql_transform_propagates_gs_op_when_missing() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");

    // Seed the topic: schema first, then ten generated records.
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let seeded = create_test_records(10);
    ctx.kafka
        .produce_avro_records(&seeded)
        .await
        .expect("Failed to produce records");

    // Kafka source -> SQL transform that selects only `id, data` -> print sink.
    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id

transforms:
  sql_transform:
    type: sql
    sql: "SELECT id, data FROM kafka_source"
    primary_key: id

sinks:
  print_sink:
    type: print
    from: sql_transform
    sample_every: 1
"#,
        topic = ctx.kafka_topic
    );

    let opts = PipelineOpts::new().record_limit(10);
    let captured = ctx
        .run_pipeline_with_capture(&pipeline_yaml, opts)
        .await
        .expect("Pipeline should complete successfully");

    // The column must be in the captured output despite not being selected.
    let has_gs_op = captured.has_column("_gs_op");
    assert!(
        has_gs_op,
        "_gs_op should be present in output schema even when not explicitly selected. Got columns: {:?}",
        captured.column_names()
    );

    // One captured row per produced record.
    let row_count = captured.len();
    assert_eq!(row_count, 10, "Should have processed 10 records");
}

/// Verifies that a SQL transform which selects `_gs_op` explicitly ends up with
/// exactly one `_gs_op` column in its output (no duplicate is added).
#[tokio::test]
async fn test_sql_transform_preserves_existing_gs_op() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");

    // Seed the topic: schema first, then ten generated records.
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let seeded = create_test_records(10);
    ctx.kafka
        .produce_avro_records(&seeded)
        .await
        .expect("Failed to produce records");

    // The SELECT list names `_gs_op` itself this time.
    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id

transforms:
  sql_transform:
    type: sql
    sql: "SELECT id, data, _gs_op FROM kafka_source"
    primary_key: id

sinks:
  print_sink:
    type: print
    from: sql_transform
    sample_every: 1
"#,
        topic = ctx.kafka_topic
    );

    let opts = PipelineOpts::new().record_limit(10);
    let captured = ctx
        .run_pipeline_with_capture(&pipeline_yaml, opts)
        .await
        .expect("Pipeline should complete successfully");

    // Count how many output columns carry the `_gs_op` name; it must be one.
    let schema_columns = captured.column_names();
    let occurrences = schema_columns
        .iter()
        .filter(|name| *name == "_gs_op")
        .count();
    assert_eq!(
        occurrences, 1,
        "_gs_op should appear exactly once in schema"
    );
}

// ============================================================================
// Scenario 2: UNION query _gs_op handling
// ============================================================================

/// Verifies that `_gs_op` is present in the output of a UNION ALL query whose
/// branches both omit it, and that the union yields twice the input rows.
#[tokio::test]
async fn test_sql_union_propagates_gs_op_when_missing() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");

    // Seed the topic: schema first, then ten generated records.
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let seeded = create_test_records(10);
    ctx.kafka
        .produce_avro_records(&seeded)
        .await
        .expect("Failed to produce records");

    // Neither side of the UNION ALL selects `_gs_op`.
    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id

transforms:
  sql_transform:
    type: sql
    sql: "SELECT id, data FROM kafka_source UNION ALL SELECT id, data FROM kafka_source"
    primary_key: id

sinks:
  print_sink:
    type: print
    from: sql_transform
    sample_every: 1
"#,
        topic = ctx.kafka_topic
    );

    // Both branches read the same 10 records, so the limit is 10 * 2.
    let opts = PipelineOpts::new().record_limit(20);
    let captured = ctx
        .run_pipeline_with_capture(&pipeline_yaml, opts)
        .await
        .expect("Pipeline should complete successfully");

    // The column must be in the captured output despite not being selected.
    let has_gs_op = captured.has_column("_gs_op");
    assert!(
        has_gs_op,
        "_gs_op should be present in output schema for UNION"
    );

    // Each source record is emitted once per branch.
    let row_count = captured.len();
    assert_eq!(row_count, 20, "UNION ALL should produce 20 records");
}

/// Verifies that a UNION ALL query whose branches both select `_gs_op` ends up
/// with exactly one `_gs_op` column in its output.
#[tokio::test]
async fn test_sql_union_preserves_existing_gs_op() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");

    // Seed the topic: schema first, then ten generated records.
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let seeded = create_test_records(10);
    ctx.kafka
        .produce_avro_records(&seeded)
        .await
        .expect("Failed to produce records");

    // Both sides of the UNION ALL list `_gs_op` explicitly.
    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id

transforms:
  sql_transform:
    type: sql
    sql: "SELECT id, data, _gs_op FROM kafka_source UNION ALL SELECT id, data, _gs_op FROM kafka_source"
    primary_key: id

sinks:
  print_sink:
    type: print
    from: sql_transform
    sample_every: 1
"#,
        topic = ctx.kafka_topic
    );

    let opts = PipelineOpts::new().record_limit(20);
    let captured = ctx
        .run_pipeline_with_capture(&pipeline_yaml, opts)
        .await
        .expect("Pipeline should complete successfully");

    // Count how many output columns carry the `_gs_op` name; it must be one.
    let schema_columns = captured.column_names();
    let occurrences = schema_columns
        .iter()
        .filter(|name| *name == "_gs_op")
        .count();
    assert_eq!(
        occurrences, 1,
        "_gs_op should appear exactly once in schema for UNION"
    );
}

// ============================================================================
// Scenario 3: Flink string function compatibility
// ============================================================================

/// Test that Flink-compatible string functions work in SQL transforms.
///
/// Runs a SQL transform that calls `charLength`, `TRANSLATE` and `REGEXP`,
/// checks that the three computed columns appear in the output, and
/// cross-checks `charLength(data)` against the character count of `data`.
#[tokio::test]
async fn test_sql_transform_uses_flink_string_functions() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");

    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let records = create_test_records(10);
    ctx.kafka
        .produce_avro_records(&records)
        .await
        .expect("Failed to produce records");

    // Pipeline: SQL transform using Flink string functions
    let pipeline = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id

transforms:
  sql_transform:
    type: sql
    sql: |
      SELECT
        id,
        data,
        _gs_op,
        charLength(data) AS data_len,
        TRANSLATE(data || 'a', 'a', 'z') AS translated,
        REGEXP(data, '^data') AS matches_prefix
      FROM kafka_source
    primary_key: id

sinks:
  print_sink:
    type: print
    from: sql_transform
    sample_every: 1
"#,
        topic = ctx.kafka_topic
    );

    let output = ctx
        .run_pipeline_with_capture(&pipeline, PipelineOpts::new().record_limit(10))
        .await
        .expect("Pipeline should complete successfully");

    // Verify computed columns exist
    assert!(
        output.has_column("data_len"),
        "data_len column should exist"
    );
    assert!(
        output.has_column("translated"),
        "translated column should exist"
    );
    assert!(
        output.has_column("matches_prefix"),
        "matches_prefix column should exist"
    );

    // Verify we got results
    assert!(!output.is_empty(), "Should have processed records");

    // Verify charLength works correctly.
    // NOTE(review): rows where `data` or `data_len` cannot be read with the
    // expected type are skipped, so this check can pass without verifying
    // anything — confirm the captured value types and consider tightening it.
    for row in output.rows() {
        if let Some(data) = row.data.get("data").and_then(|v| v.as_str()) {
            if let Some(data_len) = row.data.get("data_len").and_then(|v| v.as_i64()) {
                // charLength counts characters, so compare against the number
                // of `char`s rather than `str::len()`, which is a byte count
                // and would disagree for non-ASCII data.
                assert_eq!(
                    data_len as usize,
                    data.chars().count(),
                    "charLength should match actual string length"
                );
            }
        }
    }
}

// ============================================================================
// Scenario 4: SQL filter with metrics verification
// ============================================================================

/// Runs a SQL transform with a `WHERE block % 2 = 0` filter over 100 records and
/// checks that both the input-row and output-row metrics for the transform
/// become available in Prometheus.
///
/// NOTE(review): both metrics are only waited on with the same threshold (50);
/// the test does not itself compare the input count against the output count.
#[tokio::test]
async fn test_pipeline_sql_filter_diff_in_input_output_rows() {
    use streamling_e2e::resources::PrometheusResource;

    init_tracing();

    // This scenario needs the Prometheus resource enabled on the context.
    let ctx_options = TestContextOptions::new().with_prometheus();
    let ctx = TestContext::with_options(ctx_options)
        .await
        .expect("Failed to create test context");

    let prometheus = ctx
        .prometheus
        .as_ref()
        .expect("Prometheus should be available");

    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    // 100 records, so `block` runs from 1 through 100.
    let seeded = create_test_records(100);
    ctx.kafka
        .produce_avro_records(&seeded)
        .await
        .expect("Failed to produce records");

    // Only even `block` values satisfy the filter (50 of the 100 records).
    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id

transforms:
  sql_transform:
    type: sql
    sql: "SELECT id, data, _gs_op FROM kafka_source WHERE block % 2 = 0"
    primary_key: id

sinks:
  blackhole_sink:
    type: blackhole
    from: sql_transform
"#,
        topic = ctx.kafka_topic
    );

    // The record limit matches the number of rows expected to pass the filter.
    let opts = PipelineOpts::new().record_limit(50);
    let _run_status = ctx
        .run_pipeline_with_opts(&pipeline_yaml, opts)
        .await
        .expect("Pipeline should complete successfully");

    // Metric queries scoped to the transform and to this test's id.
    let test_id = Some(&ctx.test_id);
    let input_query = PrometheusResource::input_rows_query("sql_transform", test_id);
    let output_query = PrometheusResource::output_rows_query("sql_transform", test_id);

    // Input side: wait for the metric to reach 50.
    // Presumably the trailing arguments are (minimum, attempts, delay in ms) —
    // TODO confirm against `wait_for_metric_at_least`.
    let input_result = prometheus
        .wait_for_metric_at_least(&input_query, 50, 10, 500)
        .await;
    assert!(
        input_result.is_ok(),
        "Should have input rows metric for sql_transform: {:?}",
        input_result
    );

    // Output side: same wait, same threshold.
    let output_result = prometheus
        .wait_for_metric_at_least(&output_query, 50, 10, 500)
        .await;
    assert!(
        output_result.is_ok(),
        "Should have output rows metric for sql_transform: {:?}",
        output_result
    );
}

// ============================================================================
// Scenario 5: Chained SQL transforms with comments containing apostrophes
// ============================================================================

/// Regression test for apostrophes inside SQL comments (e.g. `-- don't`).
///
/// Such comments used to confuse the hand-rolled string-literal stripper that
/// feeds the topology sort, so downstream transforms failed with
/// "table not found". Three chained transforms are run here and the final
/// output is checked for the columns computed by the earlier steps.
#[tokio::test]
async fn test_chained_sql_transforms_with_comment_apostrophes() {
    init_tracing();

    let ctx = TestContext::new()
        .await
        .expect("Failed to create test context");

    // Seed the topic: schema first, then ten generated records.
    ctx.kafka
        .register_schema(TEST_SCHEMA)
        .await
        .expect("Failed to register schema");

    let seeded = create_test_records(10);
    ctx.kafka
        .produce_avro_records(&seeded)
        .await
        .expect("Failed to produce records");

    // step1 -> step2 -> step3. The SQL of step1 and step2 carries comments
    // with apostrophes; the dependency order between the steps must still be
    // resolved for the pipeline to run.
    let pipeline_yaml = format!(
        r#"
sources:
  kafka_source:
    type: kafka
    topic: {topic}
    primary_key: id

transforms:
  step1:
    type: sql
    sql: |
      SELECT
        id,
        block,
        -- don't remove this comment — it has apostrophes
        CONCAT('prefix_', data) AS prefixed_data
      FROM kafka_source
    primary_key: id

  step2:
    type: sql
    sql: |
      SELECT
        id,
        block,
        -- it's important that this comment doesn't break parsing
        prefixed_data,
        block * 2 AS double_block
      FROM step1
      WHERE block > 0
    primary_key: id

  step3:
    type: sql
    sql: "SELECT id, double_block, prefixed_data FROM step2"
    primary_key: id

sinks:
  print_sink:
    type: print
    from: step3
    sample_every: 1
"#,
        topic = ctx.kafka_topic
    );

    let opts = PipelineOpts::new().record_limit(10);
    let captured = ctx
        .run_pipeline_with_capture(&pipeline_yaml, opts)
        .await
        .expect("Pipeline should complete — apostrophes in comments must not break topology sort");

    // Columns computed in step2 and step1 must both reach the sink.
    let has_double_block = captured.has_column("double_block");
    assert!(
        has_double_block,
        "step2's computed column should propagate through step3"
    );
    let has_prefixed_data = captured.has_column("prefixed_data");
    assert!(
        has_prefixed_data,
        "step1's computed column should propagate through"
    );

    // Every seeded block value is > 0, so step2's filter keeps all rows.
    let row_count = captured.len();
    assert_eq!(row_count, 10, "Should have processed all 10 records");
}