1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
Tests for issue #169: to_timestamp() followed by drop() and materialization.
Uses only the SparkSession and F objects obtained from get_imports().
"""
from datetime import datetime, timedelta
from sparkless.testing import get_imports
# Fetch the import bundle once, then expose the session class and the
# functions namespace (F) under the names the tests below use.
_imports = get_imports()
SparkSession, F = _imports.SparkSession, _imports.F
class TestIssue169ToTimestampDropError:
    """Regression tests for issue #169.

    Every test follows the same pipeline: clean a timestamp string column
    with ``regexp_replace``, parse it with ``to_timestamp()``, drop the
    intermediate column(s), and then materialize the result.
    """

    def test_to_timestamp_drop_materialize_basic(self):
        """Run the issue #169 reproduction on 150 generated rows."""
        spark = SparkSession.builder.appName("test_issue_169").getOrCreate()
        try:
            # One record per index, each carrying an ISO-formatted timestamp string.
            records = [
                {
                    "lab_id": f"LAB-{idx:08d}",
                    "test_date": (
                        datetime.now() - timedelta(days=idx % 365)
                    ).isoformat(),
                }
                for idx in range(150)
            ]
            bronze_df = spark.createDataFrame(records, ["lab_id", "test_date"])

            # Same steps as the issue report, one transformation at a time.
            # The regex removes any ".<digits>" fragment so the string fits
            # the parse pattern used below.
            with_clean = bronze_df.withColumn(
                "test_date_clean",
                F.regexp_replace(F.col("test_date"), r"\.\d+", ""),
            )
            with_parsed = with_clean.withColumn(
                "test_date_parsed",
                F.to_timestamp(F.col("test_date_clean"), "yyyy-MM-dd'T'HH:mm:ss"),
            )
            without_clean = with_parsed.drop("test_date_clean")
            silver_df = without_clean.select("lab_id", "test_date_parsed")

            # Materialization is the step that failed before the fix.
            assert silver_df.count() == 150

            # Check the collected rows: both fields present, timestamp parsed.
            collected = silver_df.collect()
            assert len(collected) == 150
            for record in collected:
                assert "lab_id" in record
                assert "test_date_parsed" in record
                assert isinstance(record["test_date_parsed"], datetime)
        finally:
            spark.stop()

    def test_to_timestamp_drop_multiple_columns(self):
        """Drop two columns at once after to_timestamp(), then materialize."""
        spark = SparkSession.builder.appName(
            "test_issue_169_multiple_drops"
        ).getOrCreate()
        try:
            first = {
                "id": 1,
                "timestamp_str": "2024-01-01T10:00:00",
                "extra_col": "test",
            }
            second = {
                "id": 2,
                "timestamp_str": "2024-01-02T11:00:00",
                "extra_col": "test2",
            }
            source_df = spark.createDataFrame([first, second])

            with_clean = source_df.withColumn(
                "ts_clean",
                F.regexp_replace(F.col("timestamp_str"), r"\.\d+", ""),
            )
            with_parsed = with_clean.withColumn(
                "timestamp",
                F.to_timestamp(F.col("ts_clean"), "yyyy-MM-dd'T'HH:mm:ss"),
            )
            trimmed = with_parsed.drop("ts_clean", "extra_col")
            result = trimmed.select("id", "timestamp")

            # Both count() and collect() must succeed.
            assert result.count() == 2
            collected = result.collect()
            assert len(collected) == 2
            for record in collected:
                assert isinstance(record["timestamp"], datetime)
        finally:
            spark.stop()

    def test_to_timestamp_drop_with_select(self):
        """Materialize a to_timestamp() + drop() + select() chain via collect()."""
        spark = SparkSession.builder.appName("test_issue_169_select").getOrCreate()
        try:
            source_df = spark.createDataFrame(
                [
                    {"id": 1, "ts_str": "2024-01-01T10:00:00"},
                    {"id": 2, "ts_str": "2024-01-02T11:00:00"},
                ]
            )

            with_clean = source_df.withColumn(
                "ts_clean", F.regexp_replace(F.col("ts_str"), r"\.\d+", "")
            )
            with_parsed = with_clean.withColumn(
                "ts", F.to_timestamp(F.col("ts_clean"), "yyyy-MM-dd'T'HH:mm:ss")
            )
            result = with_parsed.drop("ts_clean").select("id", "ts")

            # collect() forces materialization.
            collected = result.collect()
            assert len(collected) == 2
        finally:
            spark.stop()

    def test_to_timestamp_drop_with_filter(self):
        """Materialize a to_timestamp() + drop() + filter() chain via count()."""
        spark = SparkSession.builder.appName("test_issue_169_filter").getOrCreate()
        try:
            source_df = spark.createDataFrame(
                [
                    {"id": 1, "ts_str": "2024-01-01T10:00:00"},
                    {"id": 2, "ts_str": "2024-01-02T11:00:00"},
                ]
            )

            with_clean = source_df.withColumn(
                "ts_clean", F.regexp_replace(F.col("ts_str"), r"\.\d+", "")
            )
            with_parsed = with_clean.withColumn(
                "ts", F.to_timestamp(F.col("ts_clean"), "yyyy-MM-dd'T'HH:mm:ss")
            )
            without_clean = with_parsed.drop("ts_clean")
            result = without_clean.filter(F.col("id") > 1)

            # Only the row with id == 2 passes the filter.
            assert result.count() == 1
        finally:
            spark.stop()