from __future__ import annotations
from datetime import date, datetime
import pytest
from tests.utils import _row_to_dict, assert_rows_equal
def test_create_data_frame_list_of_dicts_schema_inferred(spark) -> None:
data = [{"name": "Alice", "age": 25}, {"name": "Bob", "age": 30}]
df = spark.createDataFrame(data)
rows = df.collect()
assert len(rows) == 2
assert rows[0]["name"] == "Alice" and rows[0]["age"] == 25
assert rows[1]["name"] == "Bob" and rows[1]["age"] == 30
def test_create_data_frame_list_of_tuples_with_column_names(spark) -> None:
data = [("Alice", 25), ("Bob", 30)]
df = spark.createDataFrame(data, ["name", "age"])
rows = df.collect()
assert len(rows) == 2
assert rows[0]["name"] == "Alice" and rows[0]["age"] == 25
assert rows[1]["name"] == "Bob" and rows[1]["age"] == 30
def test_create_data_frame_list_of_tuples_schema_inferred_default_names(spark) -> None:
data = [("Alice", 25), ("Bob", 30)]
df = spark.createDataFrame(data)
rows = df.collect()
assert len(rows) == 2
assert rows[0]["_1"] == "Alice" and rows[0]["_2"] == 25
assert rows[1]["_1"] == "Bob" and rows[1]["_2"] == 30
def test_create_data_frame_schema_list_of_tuples_name_type(spark) -> None:
data = [("Alice", 25), ("Bob", 30)]
df = spark.createDataFrame(data, "name string, age bigint")
rows = df.collect()
assert len(rows) == 2
assert rows[0]["name"] == "Alice" and rows[0]["age"] == 25
def test_create_data_frame_empty_data_no_schema(spark) -> None:
with pytest.raises(Exception, match="CANNOT_INFER_EMPTY_SCHEMA"):
spark.createDataFrame([])
def test_create_data_frame_empty_data_with_schema(spark) -> None:
with pytest.raises(Exception, match="CANNOT_INFER_EMPTY_SCHEMA"):
spark.createDataFrame([], ["name", "age"])
def test_create_data_frame_dict_order_from_schema(spark) -> None:
data = [{"age": 25, "name": "Alice"}, {"age": 30, "name": "Bob"}]
df = spark.createDataFrame(data, ["name", "age"])
assert list(df.columns) == ["name", "age"]
assert df.count() == 2
def test_create_data_frame_tuples_with_column_names(spark) -> None:
data = [(1, 25, "Alice"), (2, 30, "Bob")]
df = spark.createDataFrame(data, ["id", "age", "name"])
actual = [_row_to_dict(r) for r in df.collect()]
expected = [
{"id": 1, "age": 25, "name": "Alice"},
{"id": 2, "age": 30, "name": "Bob"},
]
assert_rows_equal(actual, expected, order_matters=True)
def test_create_data_frame_ddl_schema_with_data(spark) -> None:
data = [("Alice", 25), ("Bob", 30)]
df = spark.createDataFrame(data, "name: string, age: int")
rows = df.collect()
assert len(rows) == 2
assert rows[0]["name"] == "Alice" and rows[0]["age"] == 25
assert rows[1]["name"] == "Bob" and rows[1]["age"] == 30
def test_create_data_frame_ddl_schema_space_separated(spark) -> None:
data = [("x", 1)]
df = spark.createDataFrame(data, "a string, b int")
rows = df.collect()
assert rows[0]["a"] == "x" and rows[0]["b"] == 1
def test_create_data_frame_empty_data_with_ddl_schema(spark) -> None:
df = spark.createDataFrame([], "name: string, age: int")
assert df.collect() == []
assert list(df.columns) == ["name", "age"]
def test_create_data_frame_accepts_sampling_ratio_none(spark) -> None:
data = [("a", 1)]
df = spark.createDataFrame(data, ["x", "y"], samplingRatio=None)
actual = [_row_to_dict(r) for r in df.collect()]
assert_rows_equal(actual, [{"x": "a", "y": 1}], order_matters=True)
def test_create_data_frame_accepts_verify_schema_true(spark) -> None:
data = [("Alice", 25)]
df = spark.createDataFrame(data, ["name", "age"], verifySchema=True)
actual = [_row_to_dict(r) for r in df.collect()]
assert_rows_equal(actual, [{"name": "Alice", "age": 25}], order_matters=True)
def test_create_data_frame_accepts_verify_schema_false(spark) -> None:
data = [("Bob", 30)]
df = spark.createDataFrame(data, ["name", "age"], verifySchema=False)
actual = [_row_to_dict(r) for r in df.collect()]
assert_rows_equal(actual, [{"name": "Bob", "age": 30}], order_matters=True)
def test_create_data_frame_single_row_dict(spark) -> None:
df = spark.createDataFrame([{"id": 1, "name": "Only"}])
actual = [_row_to_dict(r) for r in df.collect()]
assert_rows_equal(actual, [{"id": 1, "name": "Only"}], order_matters=True)
def test_create_data_frame_single_row_list(spark) -> None:
df = spark.createDataFrame([(10, "x")], ["num", "label"])
actual = [_row_to_dict(r) for r in df.collect()]
assert_rows_equal(actual, [{"num": 10, "label": "x"}], order_matters=True)
def test_create_data_frame_types_none_int_float_bool_str(spark) -> None:
data = [
{"a": None, "b": 42, "c": 3.14, "d": True, "e": "hello"},
{"a": 1, "b": None, "c": None, "d": False, "e": ""},
]
df = spark.createDataFrame(data)
rows = df.collect()
assert rows[0]["a"] is None and rows[0]["b"] == 42 and rows[0]["c"] == 3.14
assert rows[0]["d"] is True and rows[0]["e"] == "hello"
assert rows[1]["a"] == 1 and rows[1]["b"] is None and rows[1]["e"] == ""
def test_create_data_frame_types_date_and_datetime(spark) -> None:
data = [
{"id": 1, "d": date(2025, 2, 10), "ts": datetime(2025, 2, 10, 12, 0, 0)},
{"id": 2, "d": None, "ts": None},
]
df = spark.createDataFrame(data, "id bigint, d date, ts timestamp")
rows = df.collect()
assert len(rows) == 2
assert rows[0]["id"] == 1 and rows[0]["d"] is not None and rows[0]["ts"] is not None
assert rows[1]["id"] == 2 and rows[1]["d"] is None and rows[1]["ts"] is None
def test_create_data_frame_types_list_and_struct(spark) -> None:
data = [
{"id": 1, "arr": [1, 2, 3], "nested": {"k": "v"}},
{"id": 2, "arr": [], "nested": None},
]
df = spark.createDataFrame(
data, "id bigint, arr array<bigint>, nested struct<k:string>"
)
rows = df.collect()
assert len(rows) == 2
assert (
rows[0]["id"] == 1
and rows[0]["arr"] == [1, 2, 3]
and rows[0]["nested"]["k"] == "v"
)
assert rows[1]["id"] == 2 and rows[1]["arr"] == []
def test_create_data_frame_dict_row_missing_key_fills_none(spark) -> None:
data = [{"name": "Alice", "age": 25}, {"name": "Bob"}]
df = spark.createDataFrame(data, ["name", "age"])
rows = df.collect()
assert len(rows) == 2
def test_create_data_frame_dict_column_order_from_schema_not_insertion(spark) -> None:
data = [{"z": 3, "a": 1, "m": 2}]
df = spark.createDataFrame(data, ["a", "m", "z"])
row = _row_to_dict(df.collect()[0])
assert list(row.keys()) == ["a", "m", "z"]
assert row["a"] == 1 and row["m"] == 2 and row["z"] == 3
def test_create_data_frame_many_columns(spark) -> None:
n = 15
names = [f"c{i}" for i in range(n)]
row = tuple(i for i in range(n))
df = spark.createDataFrame([row], names)
out = df.collect()[0]
for i in range(n):
assert out[names[i]] == i
def test_create_data_frame_unicode_column_names(spark) -> None:
data = [{"name": "Alice", "âge": 25}]
df = spark.createDataFrame(data, ["name", "âge"])
actual = [_row_to_dict(r) for r in df.collect()]
assert_rows_equal(actual, [{"name": "Alice", "âge": 25}], order_matters=True)
def test_create_data_frame_invalid_row_type_raises(spark) -> None:
data = [{"a": 1}, 42]
with pytest.raises(Exception):
spark.createDataFrame(data)
def test_create_data_frame_mixed_dict_and_list_rows_raises(spark) -> None:
data = [{"a": 1}, (2,)]
df = spark.createDataFrame(data)
with pytest.raises(Exception):
df.count()
def test_create_data_frame_invalid_schema_type_raises(spark) -> None:
data = [("a", 1)]
try:
spark.createDataFrame(data, schema=123)
assert False, "expected TypeError"
except TypeError:
pass