1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
"""Tests for issue #188: String concatenation cache handling edge cases.
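Covers ``+`` applied to string columns, which the assertions in this module expect
to yield null for non-numeric strings (PySpark semantics), and ``F.concat()``, both
with and without DataFrame caching.
Spark names are obtained only through ``get_spark_imports`` from the test fixtures.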
"""
import pytest
from tests.fixtures.spark_imports import get_spark_imports
# Resolve the Spark entry points through the shared fixture helper instead of
# importing a Spark package directly in this module.
_imports = get_spark_imports()
SparkSession, F = _imports.SparkSession, _imports.F
class TestStringConcatenationCacheEdgeCases:
    """Test edge cases for string concatenation cache handling.

    Most tests build a column with ``+`` on string columns, cache the
    DataFrame, force materialization with ``count()``, and then check the
    collected values. The expected values follow PySpark semantics: ``+`` on
    string columns is treated as numeric addition, so non-numeric strings
    produce null. ``F.concat()`` is the variant expected to really concatenate.
    """

    @pytest.fixture
    def spark(self, request):
        """Use conftest spark fixture."""
        # Delegates to whatever fixture is registered under the name "spark"
        # (presumably the one in conftest -- it is not defined in this module).
        return request.getfixturevalue("spark")

    @staticmethod
    def _cache_and_materialize(df):
        """Cache ``df``, force materialization, and return the cached DataFrame.

        ``count()`` is called only for its side effect of evaluating the cached
        plan; its result is discarded.
        """
        cached = df.cache()
        cached.count()  # Force materialization
        return cached

    def test_string_concat_with_empty_strings(self, spark):
        """Test string concatenation with empty strings in cached DataFrame."""
        df = spark.createDataFrame([("", ""), ("a", ""), ("", "b")], ["col1", "col2"])

        # String concatenation with + operator
        df2 = df.withColumn("concat", F.col("col1") + F.col("col2"))
        results = self._cache_and_materialize(df2).collect()

        # In PySpark, using + with string columns attempts numeric addition;
        # non-numeric strings are cast to null, so all results are None.
        assert [result["concat"] for result in results] == [None, None, None], (
            "String-like addition with + yields nulls for non-numeric strings when cached"
        )

    def test_string_concat_with_none_values(self, spark):
        """Test string concatenation with None values in cached DataFrame."""
        df = spark.createDataFrame(
            [("a", None), (None, "b"), (None, None)], ["col1", "col2"]
        )

        # String concatenation with + operator
        df2 = df.withColumn("concat", F.col("col1") + F.col("col2"))
        results = self._cache_and_materialize(df2).collect()

        # String concat with None: None+anything => None (null propagation).
        # Comparing the whole list also verifies that exactly three rows came
        # back and shows every value on failure.
        assert [result["concat"] for result in results] == [None, None, None], (
            "String concatenation with None values when cached"
        )

    def test_nested_string_concat(self, spark):
        """Test nested string concatenation operations in cached DataFrame."""
        df = spark.createDataFrame([("a", "b", "c")], ["col1", "col2", "col3"])

        # Nested string concatenation: (col1 + col2) + col3
        df2 = df.withColumn("concat", (F.col("col1") + F.col("col2")) + F.col("col3"))
        result = self._cache_and_materialize(df2).collect()[0]

        # In PySpark, nested + on string columns still performs numeric-style
        # addition; here it yields null.
        assert result["concat"] is None, "Nested string addition yields null in PySpark"

    def test_string_concat_vs_numeric_addition(self, spark):
        """Test that numeric addition is NOT treated as string concatenation."""
        df = spark.createDataFrame([(1, 2), (3, 4)], ["col1", "col2"])

        # Numeric addition with + operator (should NOT be affected by cache)
        df2 = df.withColumn("sum", F.col("col1") + F.col("col2"))
        results = self._cache_and_materialize(df2).collect()

        # Numeric addition should work correctly (not None). Rows are checked
        # positionally, in the order they were created.
        assert [result["sum"] for result in results] == [3, 7], (
            "Numeric addition should not be affected by cache"
        )

    def test_string_concat_with_literal(self, spark):
        """Test string concatenation with literal strings in cached DataFrame."""
        df = spark.createDataFrame([("John",)], ["name"])

        # String concatenation with literal
        df2 = df.withColumn("greeting", F.col("name") + F.lit(" Doe"))
        result = self._cache_and_materialize(df2).collect()[0]

        # In PySpark, string + literal also attempts numeric addition and
        # produces null for non-numeric strings.
        assert result["greeting"] is None, (
            "String + literal yields null for non-numeric strings in PySpark"
        )

    def test_multiple_string_concat_columns(self, spark):
        """Test multiple string concatenation columns in cached DataFrame."""
        df = spark.createDataFrame(
            [("a", "b", "c", "d")], ["col1", "col2", "col3", "col4"]
        )

        # Multiple string concatenation columns
        df2 = df.withColumn("concat1", F.col("col1") + F.col("col2")).withColumn(
            "concat2", F.col("col3") + F.col("col4")
        )
        result = self._cache_and_materialize(df2).collect()[0]

        # Multiple string concat columns are all null when using + on strings.
        assert result["concat1"] is None, "First string concat column yields null"
        assert result["concat2"] is None, "Second string concat column yields null"

    def test_string_concat_with_select(self, spark):
        """Test string concatenation followed by select in cached DataFrame."""
        df = spark.createDataFrame([("a", "b")], ["col1", "col2"])

        # String concatenation with + operator
        df2 = df.withColumn("concat", F.col("col1") + F.col("col2"))

        # Select the concatenated column, then cache the projected DataFrame
        df3 = df2.select("concat")
        result = self._cache_and_materialize(df3).collect()[0]

        # String concat in select yields null when using + on strings.
        assert result["concat"] is None, "String addition in select yields null"

    def test_string_concat_chained_operations(self, spark):
        """Test string concatenation with chained operations in cached DataFrame."""
        df = spark.createDataFrame([("a", "b", "c")], ["col1", "col2", "col3"])

        # Chain multiple operations
        df2 = (
            df.withColumn("concat", F.col("col1") + F.col("col2"))
            .filter(F.col("concat").isNotNull())
            .withColumn("full", F.col("concat") + F.col("col3"))
        )
        results = self._cache_and_materialize(df2).collect()

        # With PySpark semantics, the initial + on strings yields null, so the
        # filter on isNotNull() drops all rows.
        assert results == [], (
            "Chained operations yield no rows under PySpark + semantics"
        )

    def test_string_concat_without_caching(self, spark):
        """Test that + on string columns yields null even without caching."""
        df = spark.createDataFrame([("a", "b")], ["col1", "col2"])

        # String concatenation with + operator (NOT cached)
        df2 = df.withColumn("concat", F.col("col1") + F.col("col2"))
        result = df2.collect()[0]

        # Even without caching, + on string columns behaves as numeric addition
        # and yields null for non-numeric strings.
        assert result["concat"] is None, (
            "String addition with + yields null for non-numeric strings without caching"
        )

    def test_concat_function_with_caching(self, spark):
        """Test that F.concat() works correctly even with caching."""
        df = spark.createDataFrame([("a", "b")], ["col1", "col2"])

        # Using F.concat() (recommended approach)
        df2 = df.withColumn("concat", F.concat(F.col("col1"), F.col("col2")))
        result = self._cache_and_materialize(df2).collect()[0]

        # F.concat() should work correctly even when cached
        assert result["concat"] == "ab", (
            "F.concat() should work correctly even when DataFrame is cached"
        )