1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
Test for issue #158: 'cannot resolve' error when referencing dropped columns in select() and filter().
Verifies that referencing a dropped column raises an error with a consistent message.
Uses get_imports from fixture only.
"""
import pytest
from sparkless.testing import get_imports
# Resolve the session class and the functions namespace through get_imports()
# rather than importing them directly, so the tests only use what it hands back.
_imports = get_imports()
SparkSession, F = _imports.SparkSession, _imports.F
class TestIssue158DroppedColumnError:
    """Regression tests for issue #158: referencing a column that was projected away."""

    @staticmethod
    def _frame_without_col2(app_name="test"):
        """Build a one-row ``col1``/``col2`` frame and return it narrowed to ``col1``.

        Args:
            app_name: Application name passed to the session builder.

        Returns:
            The DataFrame produced by ``select("col1")`` on the two-column frame.
        """
        session = SparkSession.builder.appName(app_name).getOrCreate()
        two_columns = session.createDataFrame([("a", "b")], ["col1", "col2"])
        return two_columns.select("col1")

    @staticmethod
    def _check_unresolved_message(exc_info, missing, remaining):
        """Assert that the captured exception reads like an unresolved-column error.

        The expected wording follows the PySpark message, which includes
        unresolved_column.with_suggestion and the phrase "cannot be resolved".

        Args:
            exc_info: The ``pytest.raises`` context result holding the exception.
            missing: Name of the dropped column that was referenced.
            remaining: Names of surviving columns; at least one must be mentioned.
        """
        # Compare case-insensitively: every expected fragment is lowercase.
        message = str(exc_info.value).lower()
        assert "cannot be resolved" in message
        assert "unresolved_column" in message
        assert missing in message
        assert any(name in message for name in remaining)

    def test_select_dropped_column_raises_consistent_error(self):
        """Selecting a column dropped by an earlier projection raises the expected error."""
        session = SparkSession.builder.appName("test").getOrCreate()

        rows = [("imp_001", "2024-01-15T10:30:45.123456", "campaign_1")]
        header = ["impression_id", "impression_date", "campaign_id"]
        source = session.createDataFrame(rows, header)

        # Strip the fractional seconds, then parse what is left as a timestamp.
        without_fraction = F.regexp_replace(
            F.col("impression_date"), r"\.\d+", ""
        ).cast("string")
        parsed = F.to_timestamp(without_fraction, "yyyy-MM-dd'T'HH:mm:ss")
        enriched = source.withColumn("impression_date_parsed", parsed)

        # Keep the new column only; the original 'impression_date' is dropped here.
        projected = enriched.select(
            "impression_id",
            "campaign_id",
            "impression_date_parsed",
        )

        # Sanity-check the projection before exercising the failure path.
        assert "impression_date" not in projected.columns
        assert "impression_date_parsed" in projected.columns

        with pytest.raises(Exception) as exc_info:
            projected.select("impression_date")

        self._check_unresolved_message(
            exc_info, "impression_date", ("impression_id", "campaign_id")
        )

    def test_select_dropped_column_with_f_col(self):
        """Selecting a dropped column through F.col() raises the expected error."""
        narrowed = self._frame_without_col2()

        # The failure may surface at select() or only once collect() runs.
        with pytest.raises(Exception) as exc_info:
            narrowed.select(F.col("col2")).collect()

        self._check_unresolved_message(exc_info, "col2", ("col1",))

    def test_filter_dropped_column_behavior_matches_pyspark(self):
        """Filtering on a dropped column is accepted, matching PySpark behavior."""
        narrowed = self._frame_without_col2()

        # In PySpark a filter that references a previously-dropped column is
        # still allowed: the filter is pushed below the projection and the
        # result retains only the projected columns.
        filtered = narrowed.filter(F.col("col2").isNotNull())
        collected = filtered.collect()

        assert filtered.columns == ["col1"]
        assert len(collected) == 1
        assert collected[0]["col1"] == "a"

    def test_minimal_reproduction(self):
        """Smallest reproduction of the bug: select a column that is no longer there."""
        narrowed = self._frame_without_col2("minimal_repro")

        with pytest.raises(Exception) as exc_info:
            narrowed.select("col2")

        self._check_unresolved_message(exc_info, "col2", ("col1",))