robin-sparkless 4.0.0

PySpark-like DataFrame API in Rust on Polars; no JVM.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
"""
Tests for issue #297: Join with different case columns and select with third case.

PySpark allows selecting columns with a different case when multiple columns
with different cases exist after a join. It picks the first matching column.
"""

import os

import pytest

from tests.fixtures.spark_imports import get_spark_imports

_imports = get_spark_imports()
SparkSession = _imports.SparkSession
F = _imports.F


class TestIssue297JoinDifferentCaseSelect:
    """Test join with different case columns and select with third case."""

    @pytest.mark.skipif(
        (
            os.environ.get("SPARKLESS_TEST_BACKEND")
            or os.environ.get("MOCK_SPARK_TEST_BACKEND")
            or ""
        )
        .strip()
        .lower()
        == "pyspark",
        reason="Skipped in PySpark mode (driver/worker Python version mismatch with pytest-xdist)",
    )
    def test_join_different_case_select_third_case(self):
        """Test joining DataFrames with different case keys and selecting with third case."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            # Create two dataframes with different case keys
            df1 = spark.createDataFrame(
                [
                    {"name": "Alice", "Value1": 1},
                    {"name": "Bob", "Value1": 2},
                ]
            )

            df2 = spark.createDataFrame(
                [
                    {"NAME": "Alice", "Value2": 1},
                    {"NAME": "Bob", "Value2": 2},
                ]
            )

            # Join the dataframes and apply select with different case
            df = df1.join(df2, on="Name", how="left").select("NaMe", "Value1", "Value2")

            # Should not raise an exception - PySpark picks the first match
            result = df.collect()

            # Verify results
            assert len(result) == 2
            assert result[0]["NaMe"] == "Alice"
            assert result[0]["Value1"] == 1
            assert result[0]["Value2"] == 1
            assert result[1]["NaMe"] == "Bob"
            assert result[1]["Value1"] == 2
            assert result[1]["Value2"] == 2

            # Verify column names in result
            assert "NaMe" in df.columns
            assert "Value1" in df.columns
            assert "Value2" in df.columns
        finally:
            spark.stop()

    def test_join_different_case_select_left_column(self):
        """Test that selecting with different case picks the left DataFrame's column."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame([{"name": "Alice", "value": 1}])
            df2 = spark.createDataFrame([{"NAME": "Bob", "value": 2}])

            # Join on different case column names; use qualified columns to avoid ambiguous "value"
            df = df1.join(df2, df1["name"] == df2["NAME"], "left")
            result = df.select(df1["name"].alias("NaMe"), df1["value"]).collect()

            assert len(result) == 1
            assert result[0]["NaMe"] == "Alice"
            assert result[0]["value"] == 1
        finally:
            spark.stop()

    def test_join_same_case_no_ambiguity(self):
        """Test that joins with same case columns work normally."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame([{"name": "Alice", "value1": 1}])
            df2 = spark.createDataFrame(
                [{"name": "Alice", "value2": 2}]
            )  # Same name to match

            df = df1.join(df2, on="name", how="left")
            result = df.select("name", "value1", "value2").collect()

            assert len(result) == 1
            assert result[0]["name"] == "Alice"
            assert result[0]["value1"] == 1
            assert result[0]["value2"] == 2
        finally:
            spark.stop()

    def test_different_join_types(self):
        """Test that the fix works with different join types."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame(
                [
                    {"name": "Alice", "id": 1},
                    {"name": "Bob", "id": 2},
                    {"name": "Charlie", "id": 3},
                ]
            )
            df2 = spark.createDataFrame(
                [
                    {"NAME": "Alice", "score": 100},
                    {"NAME": "Bob", "score": 200},
                    {"NAME": "David", "score": 300},
                ]
            )

            # Test inner join
            inner_df = df1.join(df2, on="Name", how="inner").select(
                "NaMe", "id", "score"
            )
            inner_result = inner_df.collect()
            assert len(inner_result) == 2  # Only matching rows
            assert inner_result[0]["NaMe"] == "Alice"

            # Test left join
            left_df = df1.join(df2, on="Name", how="left").select("NaMe", "id", "score")
            left_result = left_df.collect()
            assert len(left_result) == 3  # All left rows
            assert left_result[2]["NaMe"] == "Charlie"  # No match, should be None
            assert left_result[2]["score"] is None

            # Test right join
            right_df = df1.join(df2, on="Name", how="right").select(
                "NaMe", "id", "score"
            )
            right_result = right_df.collect()
            assert len(right_result) == 3  # All right rows
            # "NaMe" picks first match case-insensitively. PySpark returns "David"
            # (from right's NAME) for the David row - column resolution varies by implementation.
            david_row = next((r for r in right_result if r["score"] == 300), None)
            assert david_row is not None
            assert david_row["NaMe"] == "David"
            assert david_row["id"] is None  # No match in left

            # Test outer join
            outer_df = df1.join(df2, on="Name", how="outer").select(
                "NaMe", "id", "score"
            )
            outer_result = outer_df.collect()
            assert len(outer_result) == 4  # All rows from both sides
        finally:
            spark.stop()

    def test_multiple_ambiguous_columns(self):
        """Test selecting multiple columns (use qualified refs to avoid ambiguous AgE/CiTy)."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame(
                [
                    {"name": "Alice", "age": 25, "city": "NYC"},
                    {"name": "Bob", "age": 30, "city": "LA"},
                ]
            )
            df2 = spark.createDataFrame(
                [
                    {"NAME": "Alice", "AGE": 25, "CITY": "NYC"},
                    {"NAME": "Bob", "AGE": 30, "CITY": "LA"},
                ]
            )

            df = df1.join(df2, df1["name"] == df2["NAME"], "left")
            result = df.select(
                df1["name"].alias("NaMe"),
                df1["age"].alias("AgE"),
                df1["city"].alias("CiTy"),
                df1["age"],
                df1["city"],
            ).collect()

            assert len(result) == 2
            assert result[0]["NaMe"] == "Alice"
            assert result[0]["AgE"] == 25
            assert result[0]["CiTy"] == "NYC"
            assert result[0]["age"] == 25
            assert result[0]["city"] == "NYC"
        finally:
            spark.stop()

    def test_chained_operations_after_select(self):
        """Test that operations after select work correctly with ambiguous columns."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame(
                [
                    {"name": "Alice", "value": 10},
                    {"name": "Bob", "value": 20},
                    {"name": "Charlie", "value": 30},
                ]
            )
            df2 = spark.createDataFrame(
                [
                    {"NAME": "Alice", "score": 100},
                    {"NAME": "Bob", "score": 200},
                ]
            )

            # Join, select with different case, then filter
            df = (
                df1.join(df2, on="Name", how="left")
                .select("NaMe", "value", "score")
                .filter(F.col("value") > 15)
            )

            result = df.collect()
            assert len(result) == 2
            assert result[0]["NaMe"] == "Bob"
            assert result[1]["NaMe"] == "Charlie"

            # Test orderBy
            ordered_df = (
                df1.join(df2, on="Name", how="left")
                .select("NaMe", "value")
                .orderBy(F.desc("value"))
            )
            ordered_result = ordered_df.collect()
            assert ordered_result[0]["NaMe"] == "Charlie"
            assert ordered_result[0]["value"] == 30
        finally:
            spark.stop()

    def test_groupby_after_select_with_ambiguous_column(self):
        """Test groupBy works after selecting ambiguous column."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame(
                [
                    {"name": "Alice", "dept": "IT", "salary": 100},
                    {"name": "Bob", "dept": "IT", "salary": 200},
                    {"name": "Charlie", "dept": "HR", "salary": 150},
                ]
            )
            df2 = spark.createDataFrame(
                [
                    {"NAME": "Alice", "bonus": 10},
                    {"NAME": "Bob", "bonus": 20},
                    {"NAME": "Charlie", "bonus": 15},
                ]
            )

            # Join, select with different case, then groupBy
            df = (
                df1.join(df2, on="Name", how="left")
                .select("NaMe", "dept", "salary", "bonus")
                .groupBy("dept")
                .agg(
                    F.sum("salary").alias("total_salary"),
                    F.sum("bonus").alias("total_bonus"),
                )
            )

            result = df.collect()
            assert len(result) == 2
            # Verify we can still access the grouped column
            dept_names = [row["dept"] for row in result]
            assert "IT" in dept_names
            assert "HR" in dept_names
        finally:
            spark.stop()

    def test_single_match_preserves_original_name(self):
        """Test that single match preserves original column name (not requested name)."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            # Create DataFrame with only one column (no ambiguity)
            df = spark.createDataFrame([{"Name": "Alice", "Age": 25}])

            # Select with different case - should use original name
            result = df.select("name", "age").collect()

            assert len(result) == 1
            # When there's only one match, original column name is preserved
            assert (
                "Name" in df.select("name").columns
                or "name" in df.select("name").columns
            )
            # The value should be correct
            row = result[0]
            # Check that we can access the data (column name might be Name or name depending on implementation)
            assert row[df.select("name").columns[0]] == "Alice"
        finally:
            spark.stop()

    def test_multiple_matches_uses_requested_name(self):
        """Test that multiple matches use the requested column name."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame([{"name": "Alice"}])
            df2 = spark.createDataFrame([{"NAME": "Bob"}])

            # After join, both "name" and "NAME" exist
            df = df1.join(df2, on="Name", how="left")

            # Select with different case - should use requested name
            result = df.select("NaMe").collect()

            assert len(result) == 1
            # Output column should be "NaMe" (requested name)
            assert "NaMe" in df.select("NaMe").columns
            # Value should come from first match ("name" from left DataFrame)
            assert result[0]["NaMe"] == "Alice"
        finally:
            spark.stop()

    def test_empty_dataframes(self):
        """Test that the fix works with empty DataFrames."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            StructType = _imports.StructType
            StructField = _imports.StructField
            StringType = _imports.StringType
            IntegerType = _imports.IntegerType

            schema1 = StructType(
                [
                    StructField("name", StringType()),
                    StructField("value", IntegerType()),
                ]
            )
            schema2 = StructType(
                [
                    StructField("NAME", StringType()),
                    StructField("score", IntegerType()),
                ]
            )

            df1 = spark.createDataFrame([], schema=schema1)
            df2 = spark.createDataFrame([], schema=schema2)

            # Join empty DataFrames and select with different case
            df = df1.join(df2, on="Name", how="left").select("NaMe", "value", "score")

            result = df.collect()
            assert len(result) == 0
            # Verify columns exist even if empty
            assert "NaMe" in df.columns or len(df.columns) > 0
        finally:
            spark.stop()

    def test_null_values_in_joined_columns(self):
        """Test that null values are handled correctly."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame(
                [
                    {"name": "Alice", "value": 1},
                    {"name": None, "value": 2},
                ]
            )
            df2 = spark.createDataFrame(
                [
                    {"NAME": "Alice", "score": 100},
                    {"NAME": None, "score": 200},
                ]
            )

            # Join and select with different case
            df = df1.join(df2, on="Name", how="left").select("NaMe", "value", "score")

            result = df.collect()
            assert len(result) == 2
            # First row should match
            assert result[0]["NaMe"] == "Alice"
            # Second row might have nulls depending on join behavior
        finally:
            spark.stop()

    def test_different_case_variations(self):
        """Test various case combinations."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame([{"name": "Alice"}])
            df2 = spark.createDataFrame([{"NAME": "Bob"}])

            df = df1.join(df2, on="Name", how="left")

            # Test various case variations
            test_cases = ["NaMe", "nAmE", "NAME", "name", "Name"]
            for case_variant in test_cases:
                result = df.select(case_variant).collect()
                assert len(result) == 1
                # All should work and pick the first match
                assert result[0][case_variant] == "Alice"
        finally:
            spark.stop()

    def test_with_column_after_select(self):
        """Test withColumn works after selecting ambiguous column."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame([{"name": "Alice", "value": 10}])
            df2 = spark.createDataFrame(
                [{"NAME": "Alice", "score": 20}]
            )  # Match on Alice

            # Join, select with different case, then add column
            df = (
                df1.join(df2, on="Name", how="left")
                .select("NaMe", "value", "score")
                .withColumn("total", F.col("value") + F.col("score"))
            )

            result = df.collect()
            assert len(result) == 1
            assert "total" in df.columns
            assert result[0]["total"] == 30
            assert result[0]["NaMe"] == "Alice"
        finally:
            spark.stop()

    def test_drop_after_select(self):
        """Test drop works after selecting ambiguous column."""
        spark = SparkSession.builder.appName("issue-297").getOrCreate()
        try:
            df1 = spark.createDataFrame([{"name": "Alice", "value": 10, "extra": 1}])
            df2 = spark.createDataFrame([{"NAME": "Bob", "score": 20}])

            # Join, select with different case, then drop
            df = (
                df1.join(df2, on="Name", how="left")
                .select("NaMe", "value", "score", "extra")
                .drop("extra")
            )

            result = df.collect()
            assert len(result) == 1
            assert "NaMe" in df.columns
            assert "extra" not in df.columns
        finally:
            spark.stop()