import pytest
from tests.tools.parity_base import ParityTestBase
from sparkless.testing import Mode, get_mode
def _is_pyspark_mode() -> bool:
    """Report whether the active parity-test backend is real PySpark.

    Used as a ``pytest.mark.skipif`` condition for cases that real PySpark
    cannot run in its default configuration.
    """
    pyspark_selected = get_mode() == Mode.PYSPARK
    return pyspark_selected
class TestSQLDDLParity(ParityTestBase):
    """Parity tests for SQL DDL statements on databases, schemas and tables.

    Each test uses its own distinctly named catalog objects and drops them in
    a ``finally`` block, so a failing assertion cannot leak databases, tables
    or a changed current database into later tests sharing the session.
    """

    @staticmethod
    def _database_names(spark) -> set:
        """Return the names of all databases currently listed by the catalog."""
        return {db.name for db in spark.catalog.listDatabases()}

    def test_create_database(self, spark):
        """CREATE DATABASE registers the new database in the catalog."""
        try:
            spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
            assert "test_db" in self._database_names(spark)
        finally:
            spark.sql("DROP DATABASE IF EXISTS test_db")

    def test_create_database_if_not_exists(self, spark):
        """Repeating CREATE DATABASE IF NOT EXISTS does not raise."""
        try:
            spark.sql("CREATE DATABASE IF NOT EXISTS test_db2")
            # Second create must be a no-op because of IF NOT EXISTS.
            spark.sql("CREATE DATABASE IF NOT EXISTS test_db2")
            assert "test_db2" in self._database_names(spark)
        finally:
            spark.sql("DROP DATABASE IF EXISTS test_db2")

    def test_drop_database(self, spark):
        """DROP DATABASE removes the database from the catalog listing."""
        spark.sql("CREATE DATABASE IF NOT EXISTS test_db3")
        spark.sql("DROP DATABASE IF EXISTS test_db3")
        assert "test_db3" not in self._database_names(spark)

    def test_create_table_from_dataframe(self, spark):
        """saveAsTable creates a table that SQL can query."""
        data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
        df = spark.createDataFrame(data, ["name", "age"])
        try:
            df.write.mode("overwrite").saveAsTable("test_users")
            assert spark.catalog.tableExists("test_users")
            result = spark.sql("SELECT * FROM test_users")
            assert result.count() == 3
        finally:
            spark.sql("DROP TABLE IF EXISTS test_users")

    @pytest.mark.skipif(
        _is_pyspark_mode(),
        reason="CREATE TABLE AS SELECT requires Hive support in PySpark, which is not enabled by default",
    )
    def test_create_table_with_select(self, spark):
        """CREATE TABLE ... AS SELECT materialises only the filtered rows."""
        data = [("Alice", 25, "IT"), ("Bob", 30, "HR"), ("Charlie", 35, "IT")]
        df = spark.createDataFrame(data, ["name", "age", "dept"])
        try:
            df.write.mode("overwrite").saveAsTable("employees")
            spark.sql(
                "CREATE TABLE IF NOT EXISTS it_employees AS SELECT name, age FROM employees WHERE dept = 'IT'"
            )
            assert spark.catalog.tableExists("it_employees")
            result = spark.sql("SELECT * FROM it_employees")
            # Two of the three source rows have dept = 'IT'.
            assert result.count() == 2
        finally:
            spark.sql("DROP TABLE IF EXISTS employees")
            spark.sql("DROP TABLE IF EXISTS it_employees")

    def test_drop_table(self, spark):
        """DROP TABLE removes an existing table from the catalog."""
        df = spark.createDataFrame([("Alice", 25)], ["name", "age"])
        try:
            df.write.mode("overwrite").saveAsTable("temp_table")
            assert spark.catalog.tableExists("temp_table")
            spark.sql("DROP TABLE IF EXISTS temp_table")
            assert not spark.catalog.tableExists("temp_table")
        finally:
            # Redundant on success; guarantees cleanup if an assert fails.
            spark.sql("DROP TABLE IF EXISTS temp_table")

    def test_drop_table_if_exists(self, spark):
        """DROP TABLE IF EXISTS is a no-op for missing tables and drops existing ones."""
        # Must not raise even though the table was never created.
        spark.sql("DROP TABLE IF EXISTS non_existent_table")
        df = spark.createDataFrame([("Alice", 25)], ["name", "age"])
        try:
            df.write.mode("overwrite").saveAsTable("temp_table2")
            spark.sql("DROP TABLE IF EXISTS temp_table2")
            assert not spark.catalog.tableExists("temp_table2")
        finally:
            spark.sql("DROP TABLE IF EXISTS temp_table2")

    def test_create_schema(self, spark):
        """CREATE SCHEMA is accepted as a synonym of CREATE DATABASE."""
        try:
            spark.sql("CREATE SCHEMA IF NOT EXISTS test_schema")
            assert "test_schema" in self._database_names(spark)
        finally:
            spark.sql("DROP SCHEMA IF EXISTS test_schema")

    def test_set_current_database(self, spark):
        """Unqualified saveAsTable writes into the database set as current."""
        previous_db = spark.catalog.currentDatabase()
        try:
            spark.sql("CREATE DATABASE IF NOT EXISTS test_current_db")
            spark.catalog.setCurrentDatabase("test_current_db")
            df = spark.createDataFrame([("Alice", 25)], ["name", "age"])
            df.write.mode("overwrite").saveAsTable("current_db_table")
            assert spark.catalog.tableExists("current_db_table", "test_current_db")
        finally:
            # Restore the session's current database before dropping
            # test_current_db; otherwise later tests sharing this session
            # would resolve unqualified names against a dropped database.
            spark.catalog.setCurrentDatabase(previous_db)
            spark.sql("DROP TABLE IF EXISTS test_current_db.current_db_table")
            spark.sql("DROP DATABASE IF EXISTS test_current_db")

    def test_table_in_specific_database(self, spark, tmp_path):
        """saveAsTable accepts a database-qualified name together with a path."""
        # A per-test directory from pytest's tmp_path fixture replaces a fixed
        # /tmp path, so concurrent or repeated runs do not share table data and
        # the test does not depend on /tmp existing on the platform.
        table_path = str(tmp_path / "specific_table")
        try:
            spark.sql("CREATE DATABASE IF NOT EXISTS test_db_specific")
            df = spark.createDataFrame([("Alice", 25)], ["name", "age"])
            df.write.mode("overwrite").option("path", table_path).saveAsTable(
                "test_db_specific.specific_table"
            )
            assert spark.catalog.tableExists("specific_table", "test_db_specific")
        finally:
            spark.sql("DROP TABLE IF EXISTS test_db_specific.specific_table")
            spark.sql("DROP DATABASE IF EXISTS test_db_specific")