1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""Tests for issue #173: Validation fails during materialization when replaying operations.
This issue occurs when:
1. A DataFrame has withColumn operations that reference columns
2. Those columns are then dropped via select()
3. Materialization replays the withColumn operations
4. Validation uses the final schema (after select) instead of the schema at queue time
"""
from sparkless.testing import get_imports
_imports = get_imports()
SparkSession = _imports.SparkSession
F = _imports.F
from datetime import datetime, timedelta
class TestIssue173ValidationDuringMaterialization:
    """Regression tests for issue #173 (validation while replaying operations)."""

    def test_validation_during_materialization_with_dropped_columns(self):
        """Filter and count a frame whose derived columns outlive their inputs.

        Scenario exercised:
        - two withColumn calls derive columns from ``timestamp_str`` and ``value``
        - a select() then keeps only ``id`` and the two derived columns
        - filter() followed by count() forces the queued work to be evaluated

        Per issue #173, the replayed withColumn operations have to be validated
        against the schema present when they were queued, not against the
        narrower schema that remains after select().
        """
        row_count = 100
        session = SparkSession.builder.appName("test").getOrCreate()
        try:
            # The timestamp layout below is one that both PySpark 3.5+ and
            # Sparkless parse consistently.
            rows = [
                {
                    "id": f"ID-{n:08d}",
                    "timestamp_str": (
                        datetime.now() - timedelta(days=n % 365)
                    ).strftime("%Y-%m-%dT%H:%M:%S"),
                    "value": n * 10,
                }
                for n in range(row_count)
            ]
            source_df = session.createDataFrame(
                rows, ["id", "timestamp_str", "value"]
            )

            # Both source columns still exist at the moment each withColumn
            # is called, so queuing these operations is expected to succeed.
            with_parsed = source_df.withColumn(
                "timestamp_parsed",
                F.to_timestamp(F.col("timestamp_str"), "yyyy-MM-dd'T'HH:mm:ss"),
            )
            with_doubled = with_parsed.withColumn(
                "value_doubled", F.col("value") * 2
            )
            # After this projection timestamp_str and value are gone.
            projected = with_doubled.select(
                "id", "timestamp_parsed", "value_doubled"
            )

            # Evaluating the filter replays the withColumn operations; the
            # original bug validated them against the post-select schema,
            # which no longer contains timestamp_str.
            id_present = F.col("id").isNotNull()
            timestamp_present = F.col("timestamp_parsed").isNotNull()
            doubled_present = F.col("value_doubled").isNotNull()
            all_present = id_present & timestamp_present & doubled_present

            surviving = projected.filter(all_present)
            assert surviving.count() == row_count
        finally:
            # Always release the session, even when the assertion fails.
            session.stop()