import pytest
from tests.tools.parity_base import ParityTestBase
from sparkless.testing import Mode, get_mode
def _is_pyspark_mode() -> bool:
    """Report whether the backend chosen by ``get_mode()`` is PySpark.

    Returns:
        True if ``get_mode()`` equals ``Mode.PYSPARK``, otherwise False.
    """
    return get_mode() == Mode.PYSPARK
class TestSQLDMLParity(ParityTestBase):
    """Parity tests for SQL DML statements (INSERT, UPDATE, DELETE).

    Each test creates its own table(s) with ``saveAsTable``, runs the DML
    statement under test through ``spark.sql`` and checks the resulting
    table contents. Tables are dropped in a ``finally`` block so that a
    failing assertion or SQL error does not leave tables behind for later
    tests that share the same session.
    """

    @staticmethod
    def _drop_dml_tables(spark, *table_names):
        """Issue ``DROP TABLE IF EXISTS`` for each given table name."""
        for table_name in table_names:
            spark.sql(f"DROP TABLE IF EXISTS {table_name}")

    def test_insert_into_table(self, spark):
        """INSERT INTO ... VALUES appends a row to an existing table."""
        try:
            data = [("Alice", 25)]
            df = spark.createDataFrame(data, ["name", "age"])
            df.write.mode("overwrite").saveAsTable("insert_test")
            spark.sql("INSERT INTO insert_test VALUES ('Bob', 30)")
            result = spark.sql("SELECT * FROM insert_test ORDER BY name")
            rows = result.collect()
            assert len(rows) == 2
            assert rows[0]["name"] == "Alice"
            assert rows[1]["name"] == "Bob"
        finally:
            self._drop_dml_tables(spark, "insert_test")

    def test_insert_into_specific_columns(self, spark):
        """INSERT with an explicit column list leaves omitted columns NULL."""
        try:
            data = [("Alice", 25, "IT")]
            df = spark.createDataFrame(data, ["name", "age", "dept"])
            df.write.mode("overwrite").saveAsTable("insert_specific")
            spark.sql("INSERT INTO insert_specific (name, age) VALUES ('Bob', 30)")
            result = spark.sql("SELECT * FROM insert_specific ORDER BY name")
            rows = result.collect()
            assert len(rows) == 2
            assert rows[0]["name"] == "Alice"
            assert rows[1]["name"] == "Bob"
            assert rows[1]["age"] == 30
            # 'dept' was not in the INSERT column list, so it must be NULL.
            assert rows[1]["dept"] is None
        finally:
            self._drop_dml_tables(spark, "insert_specific")

    def test_insert_multiple_values(self, spark):
        """A single INSERT with several VALUES tuples appends all of them."""
        try:
            data = [("Alice", 25)]
            df = spark.createDataFrame(data, ["name", "age"])
            df.write.mode("overwrite").saveAsTable("insert_multi")
            spark.sql("INSERT INTO insert_multi VALUES ('Bob', 30), ('Charlie', 35)")
            result = spark.sql("SELECT * FROM insert_multi ORDER BY name")
            rows = result.collect()
            assert len(rows) == 3
            assert [row["name"] for row in rows] == ["Alice", "Bob", "Charlie"]
            assert [row["age"] for row in rows] == [25, 30, 35]
        finally:
            self._drop_dml_tables(spark, "insert_multi")

    @pytest.mark.skipif(
        _is_pyspark_mode(),
        reason="UPDATE TABLE is not supported in PySpark - this is a sparkless-specific feature",
    )
    def test_update_table(self, spark):
        """UPDATE ... WHERE changes only the matching row."""
        try:
            data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
            df = spark.createDataFrame(data, ["name", "age"])
            df.write.mode("overwrite").saveAsTable("update_test")
            spark.sql("UPDATE update_test SET age = 26 WHERE name = 'Alice'")
            result = spark.sql("SELECT * FROM update_test WHERE name = 'Alice'")
            rows = result.collect()
            assert len(rows) == 1
            assert rows[0]["age"] == 26
            # Rows outside the WHERE clause must be left untouched.
            all_rows = spark.sql("SELECT * FROM update_test ORDER BY name").collect()
            assert [row["age"] for row in all_rows] == [26, 30, 35]
        finally:
            self._drop_dml_tables(spark, "update_test")

    @pytest.mark.skipif(
        _is_pyspark_mode(),
        reason="UPDATE TABLE is not supported in PySpark - this is a sparkless-specific feature",
    )
    def test_update_multiple_columns(self, spark):
        """UPDATE can assign several columns in one SET clause."""
        try:
            data = [("Alice", 25, "IT")]
            df = spark.createDataFrame(data, ["name", "age", "dept"])
            df.write.mode("overwrite").saveAsTable("update_multi")
            spark.sql("UPDATE update_multi SET age = 26, dept = 'HR' WHERE name = 'Alice'")
            result = spark.sql("SELECT * FROM update_multi WHERE name = 'Alice'")
            rows = result.collect()
            assert len(rows) == 1
            assert rows[0]["age"] == 26
            assert rows[0]["dept"] == "HR"
        finally:
            self._drop_dml_tables(spark, "update_multi")

    @pytest.mark.skipif(
        _is_pyspark_mode(),
        reason="DELETE FROM TABLE is not supported in PySpark - this is a sparkless-specific feature",
    )
    def test_delete_from_table(self, spark):
        """DELETE FROM ... WHERE removes only the matching rows."""
        try:
            data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
            df = spark.createDataFrame(data, ["name", "age"])
            df.write.mode("overwrite").saveAsTable("delete_test")
            spark.sql("DELETE FROM delete_test WHERE age > 30")
            result = spark.sql("SELECT * FROM delete_test ORDER BY name")
            rows = result.collect()
            assert len(rows) == 2
            assert rows[0]["name"] == "Alice"
            assert rows[1]["name"] == "Bob"
        finally:
            self._drop_dml_tables(spark, "delete_test")

    @pytest.mark.skipif(
        _is_pyspark_mode(),
        reason="DELETE FROM TABLE is not supported in PySpark - this is a sparkless-specific feature",
    )
    def test_delete_all_rows(self, spark):
        """DELETE FROM without a WHERE clause empties the table."""
        try:
            data = [("Alice", 25), ("Bob", 30)]
            df = spark.createDataFrame(data, ["name", "age"])
            df.write.mode("overwrite").saveAsTable("delete_all")
            spark.sql("DELETE FROM delete_all")
            result = spark.sql("SELECT * FROM delete_all")
            assert result.count() == 0
        finally:
            self._drop_dml_tables(spark, "delete_all")

    def test_insert_from_select(self, spark):
        """INSERT INTO ... SELECT copies the filtered rows into the target."""
        try:
            data = [("Alice", 25, "IT"), ("Bob", 30, "HR"), ("Charlie", 35, "IT")]
            df = spark.createDataFrame(data, ["name", "age", "dept"])
            df.write.mode("overwrite").saveAsTable("source_table")
            empty_df = spark.createDataFrame([], "name string, age int")
            empty_df.write.mode("overwrite").saveAsTable("target_table")
            spark.sql(
                "INSERT INTO target_table SELECT name, age FROM source_table WHERE dept = 'IT'"
            )
            result = spark.sql("SELECT * FROM target_table ORDER BY name")
            rows = result.collect()
            assert len(rows) == 2
            # Only the two 'IT' rows from the source should have been copied.
            assert [row["name"] for row in rows] == ["Alice", "Charlie"]
            assert [row["age"] for row in rows] == [25, 35]
        finally:
            self._drop_dml_tables(spark, "source_table", "target_table")