import pytest
from sparkless.testing import get_imports
class TestIssue270TupleDataFrame:
    """Tests for ``createDataFrame`` fed positional (tuple/list) rows plus a StructType.

    Every test receives a ``spark`` argument (presumably a session fixture
    defined outside this file - confirm in the project's conftest) and obtains
    the type namespace from ``get_imports()``, so the type classes are
    whatever that helper returns.
    """

    @staticmethod
    def _name_value_schema(T):
        """Return the two-field ``Name`` (string) / ``Value`` (integer) schema.

        Args:
            T: Namespace returned by ``get_imports()``; must expose
                ``StructType``, ``StructField``, ``StringType`` and
                ``IntegerType``.

        Returns:
            A ``T.StructType`` whose fields are declared with keyword
            arguments (``name=``, ``dataType=``).
        """
        return T.StructType(
            [
                T.StructField(name="Name", dataType=T.StringType()),
                T.StructField(name="Value", dataType=T.IntegerType()),
            ]
        )

    def test_tuple_data_with_structtype_schema(self, spark):
        """Tuple rows with a StructType schema can be shown and collected by name."""
        T = get_imports()
        data = [("Alice", 1), ("Bob", 2)]
        schema = self._name_value_schema(T)

        df = spark.createDataFrame(data=data, schema=schema)
        df.show()

        rows = df.collect()
        assert len(rows) == 2
        assert rows[0]["Name"] == "Alice"
        assert rows[0]["Value"] == 1
        assert rows[1]["Name"] == "Bob"
        assert rows[1]["Value"] == 2

    def test_tuple_data_show_works(self, spark):
        """``show()`` must not fail with a "'tuple' object has no attribute" error."""
        T = get_imports()
        data = [("Alice", 1), ("Bob", 2)]
        schema = self._name_value_schema(T)

        df = spark.createDataFrame(data=data, schema=schema)
        try:
            df.show()
        except AttributeError as e:
            if "'tuple' object has no attribute" in str(e):
                pytest.fail(f"show() failed with tuple error: {e}")
            # Any other AttributeError is still a real failure; re-raise it
            # instead of letting the test pass silently.
            raise

    def test_tuple_data_unionByName_works(self, spark):
        """``unionByName`` of two tuple-built DataFrames keeps every row."""
        T = get_imports()
        data1 = [("Alice", 1), ("Bob", 2)]
        data2 = [("Charlie", 3)]
        schema = self._name_value_schema(T)

        df1 = spark.createDataFrame(data=data1, schema=schema)
        df2 = spark.createDataFrame(data=data2, schema=schema)

        result = df1.unionByName(df2)
        assert result.count() == 3

    def test_tuple_data_operations_work(self, spark):
        """``fillna``, ``replace`` and ``select`` run on a tuple-built DataFrame."""
        T = get_imports()
        data = [("Alice", 1, "IT"), ("Bob", 2, "HR")]
        schema = T.StructType(
            [
                T.StructField(name="Name", dataType=T.StringType()),
                T.StructField(name="Value", dataType=T.IntegerType()),
                T.StructField(name="Dept", dataType=T.StringType()),
            ]
        )
        df = spark.createDataFrame(data=data, schema=schema)

        df_filled = df.fillna({"Value": 0})
        assert df_filled.count() == 2

        df_replaced = df.replace({"IT": "Engineering"})
        assert df_replaced.count() == 2

        result = df.select("Name", "Value").collect()
        assert len(result) == 2

    def test_mixed_tuple_and_dict_data(self, spark):
        """A list mixing one tuple row and one dict row yields two rows."""
        T = get_imports()
        data = [("Alice", 1), {"Name": "Bob", "Value": 2}]
        schema = self._name_value_schema(T)

        df = spark.createDataFrame(data=data, schema=schema)
        assert df.count() == 2

    def test_tuple_data_single_column(self, spark):
        """One-element tuples work with a single-field schema."""
        T = get_imports()
        data = [("Alice",), ("Bob",)]
        schema = T.StructType([T.StructField(name="Name", dataType=T.StringType())])

        df = spark.createDataFrame(data=data, schema=schema)
        assert df.count() == 2
        df.show()

    def test_tuple_data_mismatched_length(self, spark):
        """A tuple shorter than the schema makes ``createDataFrame`` raise."""
        T = get_imports()
        data = [("Alice",), ("Bob", 2)]
        schema = self._name_value_schema(T)

        # The concrete exception class is not pinned; only the message is checked.
        with pytest.raises(Exception) as exc_info:
            spark.createDataFrame(data=data, schema=schema)

        assert (
            "LENGTH_SHOULD_BE_THE_SAME" in str(exc_info.value)
            or "length" in str(exc_info.value).lower()
        )

    def test_tuple_data_empty_schema(self, spark):
        """Two-element tuples against a zero-field schema make creation raise."""
        T = get_imports()
        data = [("Alice", 1), ("Bob", 2)]
        schema = T.StructType([])

        with pytest.raises(Exception) as exc_info:
            spark.createDataFrame(data=data, schema=schema)

        assert (
            "LENGTH_SHOULD_BE_THE_SAME" in str(exc_info.value)
            or "length" in str(exc_info.value).lower()
        )

    def test_list_data_with_structtype_schema(self, spark):
        """Rows given as lists (not tuples) are accepted and can be shown."""
        T = get_imports()
        data = [["Alice", 1], ["Bob", 2]]
        schema = self._name_value_schema(T)

        df = spark.createDataFrame(data=data, schema=schema)
        df.show()

    def test_pyspark_parity_exact_example(self, spark):
        """Run the two-row Name/Value example end to end and check every cell."""
        T = get_imports()
        data = [("Alice", 1), ("Bob", 2)]
        # Spelled out inline (not via the shared helper) so this test remains a
        # self-contained copy of the example it is named after.
        schema = T.StructType(
            [
                T.StructField(name="Name", dataType=T.StringType()),
                T.StructField(name="Value", dataType=T.IntegerType()),
            ]
        )

        df = spark.createDataFrame(data=data, schema=schema)
        df.show()

        rows = df.collect()
        assert len(rows) == 2
        assert rows[0]["Name"] == "Alice"
        assert rows[0]["Value"] == 1
        assert rows[1]["Name"] == "Bob"
        assert rows[1]["Value"] == 2

    def test_tuple_with_none_values(self, spark):
        """``None`` entries inside tuples come back as ``None`` after ``collect()``."""
        T = get_imports()
        data = [("Alice", None), ("Bob", 2), (None, 3)]
        schema = self._name_value_schema(T)

        df = spark.createDataFrame(data=data, schema=schema)
        assert df.count() == 3
        df.show()

        rows = df.collect()
        assert rows[0]["Value"] is None
        assert rows[2]["Name"] is None

    def test_tuple_with_different_data_types(self, spark):
        """Tuples mixing str, int, float and bool values support common operations."""
        T = get_imports()
        data = [
            ("Alice", 25, 75000.50, True, "2024-01-01"),
            ("Bob", 30, 80000.75, False, "2024-02-01"),
        ]
        schema = T.StructType(
            [
                T.StructField(name="Name", dataType=T.StringType()),
                T.StructField(name="Age", dataType=T.IntegerType()),
                T.StructField(name="Salary", dataType=T.DoubleType()),
                T.StructField(name="Active", dataType=T.BooleanType()),
                T.StructField(name="Date", dataType=T.StringType()),
            ]
        )

        df = spark.createDataFrame(data=data, schema=schema)
        assert df.count() == 2

        # Smoke checks only: each call must complete without raising.
        df.show()
        df.select("Name", "Salary").show()
        df.filter(df["Age"] > 25).show()
        df.withColumn("SalaryK", df["Salary"] / 1000).show()

    def test_tuple_data_with_long_schema(self, spark):
        """Ten-element tuples map onto a ten-field integer schema."""
        T = get_imports()
        data = [tuple(range(10)), tuple(range(10, 20))]
        schema = T.StructType(
            [
                T.StructField(name=f"col_{i}", dataType=T.IntegerType())
                for i in range(10)
            ]
        )

        df = spark.createDataFrame(data=data, schema=schema)
        assert df.count() == 2
        assert len(df.columns) == 10
        df.show()

    def test_tuple_data_single_row(self, spark):
        """A single tuple row produces a one-row DataFrame."""
        T = get_imports()
        data = [("Alice", 1)]
        schema = self._name_value_schema(T)

        df = spark.createDataFrame(data=data, schema=schema)
        assert df.count() == 1
        df.show()

    def test_tuple_data_empty_dataframe_with_schema(self, spark):
        """An empty row list with a schema gives zero rows but keeps both columns."""
        T = get_imports()
        data = []
        schema = self._name_value_schema(T)

        df = spark.createDataFrame(data=data, schema=schema)
        assert df.count() == 0
        assert len(df.columns) == 2
        df.show()

    def test_tuple_data_mixed_with_row_objects(self, spark):
        """Tuples and ``Row`` objects can be mixed in the same input list."""
        T = get_imports()
        Row = T.Row
        data = [("Alice", 1), Row(Name="Bob", Value=2), ("Charlie", 3)]
        schema = self._name_value_schema(T)

        df = spark.createDataFrame(data=data, schema=schema)
        assert df.count() == 3
        df.show()

    def test_tuple_data_operations_comprehensive(self, spark):
        """fillna/replace/dropna/groupBy/orderBy/distinct on tuple-built frames."""
        T = get_imports()
        data = [("Alice", 1, "IT"), ("Bob", 2, "HR"), ("Charlie", 3, "IT")]
        schema = T.StructType(
            [
                T.StructField(name="Name", dataType=T.StringType()),
                T.StructField(name="Value", dataType=T.IntegerType()),
                T.StructField(name="Dept", dataType=T.StringType()),
            ]
        )
        df = spark.createDataFrame(data=data, schema=schema)

        df_filled = df.fillna({"Value": 0}, subset=["Value"])
        assert df_filled.count() == 3

        df_replaced = df.replace({"IT": "Engineering"})
        assert df_replaced.count() == 3

        # One of the two rows has a null Value, so dropna must keep exactly one.
        df_with_nulls = spark.createDataFrame(
            [("Alice", None, "IT"), ("Bob", 2, "HR")], schema
        )
        df_dropped = df_with_nulls.dropna(subset=["Value"])
        assert df_dropped.count() == 1

        # The data holds two distinct departments: "IT" and "HR".
        grouped = df.groupBy("Dept").count()
        assert grouped.count() == 2

        ordered = df.orderBy("Value")
        assert ordered.count() == 3

        distinct_depts = df.select("Dept").distinct()
        assert distinct_depts.count() == 2

    def test_tuple_data_union_operations(self, spark):
        """Both ``unionByName`` and ``union`` combine two tuple-built DataFrames."""
        T = get_imports()
        data1 = [("Alice", 1), ("Bob", 2)]
        data2 = [("Charlie", 3), ("Diana", 4)]
        schema = self._name_value_schema(T)

        df1 = spark.createDataFrame(data=data1, schema=schema)
        df2 = spark.createDataFrame(data=data2, schema=schema)

        unioned = df1.unionByName(df2)
        assert unioned.count() == 4

        unioned2 = df1.union(df2)
        assert unioned2.count() == 4

    def test_tuple_data_join_operations(self, spark):
        """An inner join on ``Dept`` between two tuple-built DataFrames matches both rows."""
        T = get_imports()
        employees = [("Alice", 1, "IT"), ("Bob", 2, "HR")]
        departments = [("IT", "Engineering"), ("HR", "Human Resources")]
        emp_schema = T.StructType(
            [
                T.StructField(name="Name", dataType=T.StringType()),
                T.StructField(name="Id", dataType=T.IntegerType()),
                T.StructField(name="Dept", dataType=T.StringType()),
            ]
        )
        dept_schema = T.StructType(
            [
                T.StructField(name="Dept", dataType=T.StringType()),
                T.StructField(name="Name", dataType=T.StringType()),
            ]
        )

        df_emp = spark.createDataFrame(data=employees, schema=emp_schema)
        df_dept = spark.createDataFrame(data=departments, schema=dept_schema)

        joined = df_emp.join(df_dept, "Dept", "inner")
        assert joined.count() == 2

    def test_tuple_data_error_message_matches_pyspark(self, spark):
        """Length-mismatch errors mention "length" and the digits of both sizes."""
        T = get_imports()
        schema = self._name_value_schema(T)

        # Too-short row: a 1-element tuple against the 2-field schema.
        data = [("Alice",), ("Bob", 2)]
        with pytest.raises(Exception) as exc_info:
            spark.createDataFrame(data=data, schema=schema)
        error_msg = str(exc_info.value)
        assert "LENGTH_SHOULD_BE_THE_SAME" in error_msg or "length" in error_msg.lower()
        assert "1" in error_msg
        assert "2" in error_msg

        # Too-long row: a 3-element tuple against the 2-field schema.
        data2 = [("Alice", 1, 100), ("Bob", 2)]
        with pytest.raises(Exception) as exc_info2:
            spark.createDataFrame(data=data2, schema=schema)
        error_msg2 = str(exc_info2.value)
        assert (
            "LENGTH_SHOULD_BE_THE_SAME" in error_msg2 or "length" in error_msg2.lower()
        )
        assert "3" in error_msg2
        assert "2" in error_msg2

    def test_tuple_data_all_operations_from_issue(self, spark):
        """``show``, ``unionByName`` and ``select`` all work on one tuple-built frame."""
        T = get_imports()
        data = [("Alice", 1), ("Bob", 2)]
        schema = self._name_value_schema(T)

        df = spark.createDataFrame(data=data, schema=schema)
        df.show()

        df2 = spark.createDataFrame([("Charlie", 3)], schema)
        unioned = df.unionByName(df2)
        assert unioned.count() == 3

        result = df.select("Name").collect()
        assert len(result) == 2

    def test_tuple_data_with_array_type(self, spark):
        """A tuple element may be a list when the field is an ``ArrayType``."""
        T = get_imports()
        data = [("Alice", [1, 2, 3]), ("Bob", [4, 5])]
        schema = T.StructType(
            [
                T.StructField(name="Name", dataType=T.StringType()),
                T.StructField(name="Values", dataType=T.ArrayType(T.IntegerType())),
            ]
        )

        df = spark.createDataFrame(data=data, schema=schema)
        assert df.count() == 2
        df.show()

    def test_tuple_data_preserves_order(self, spark):
        """Column order of the DataFrame follows the schema's field order."""
        T = get_imports()
        data = [("Alice", 1, "IT")]
        schema = T.StructType(
            [
                T.StructField(name="Name", dataType=T.StringType()),
                T.StructField(name="Id", dataType=T.IntegerType()),
                T.StructField(name="Dept", dataType=T.StringType()),
            ]
        )

        df = spark.createDataFrame(data=data, schema=schema)
        assert df.columns == ["Name", "Id", "Dept"]