from tests.tools.parity_base import ParityTestBase
class TestTableAppendPersistence(ParityTestBase):
    """Check that rows appended with ``saveAsTable`` are visible on the next read.

    Every test writes to a table with ``DataFrameWriter.saveAsTable`` and then
    immediately re-reads it through ``spark.table``. The failure message of the
    first test ties this behaviour to issue #112.

    NOTE(review): ``spark`` and ``table_prefix`` are presumably pytest fixtures
    supplied by the surrounding suite -- confirm against the conftest.
    """

    @staticmethod
    def _create_test_schema(spark, table_prefix: str) -> str:
        """Create the schema for *table_prefix* if missing and return its name.

        The schema name is ``sch_`` followed by the prefix with every ``-``
        replaced by ``_`` (presumably because hyphens are not valid in an
        unquoted SQL identifier).
        """
        schema = f"sch_{table_prefix.replace('-', '_')}"
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema}")
        return schema

    def test_append_data_visible_immediately(self, spark, table_prefix):
        """Overwrite a table with one row, append one more, expect ids {1, 2}."""
        schema = self._create_test_schema(spark, table_prefix)
        table = f"{schema}.test_table"

        data1 = [{"id": 1, "name": "test1"}]
        df1 = spark.createDataFrame(data1, "id int, name string")
        df1.write.mode("overwrite").saveAsTable(table)
        result1 = spark.table(table)
        assert result1.count() == 1, "Initial table should have 1 row"

        data2 = [{"id": 2, "name": "test2"}]
        df2 = spark.createDataFrame(data2, "id int, name string")
        df2.write.mode("append").saveAsTable(table)

        # Re-read the table rather than reusing result1, so the assertion
        # exercises a fresh lookup after the append.
        result2 = spark.table(table)
        count = result2.count()
        assert count == 2, (
            f"Table should have 2 rows after append, got {count}. "
            "This verifies fix for issue #112."
        )

        rows = result2.collect()
        assert len(rows) == 2
        # Row order is not assumed; with exactly two rows, set equality pins
        # both ids without per-index membership checks.
        assert {row["id"] for row in rows} == {1, 2}

    def test_append_to_new_table(self, spark, table_prefix):
        """Append twice to a table that is first created by the append itself."""
        schema = self._create_test_schema(spark, table_prefix)
        table = f"{schema}.new_table"

        data1 = [{"id": 1, "name": "test1"}]
        df1 = spark.createDataFrame(data1, "id int, name string")
        # No prior overwrite: the first append is expected to create the table.
        df1.write.mode("append").saveAsTable(table)
        result = spark.table(table)
        assert result.count() == 1, "New table created by append should have 1 row"

        data2 = [{"id": 2, "name": "test2"}]
        df2 = spark.createDataFrame(data2, "id int, name string")
        df2.write.mode("append").saveAsTable(table)
        result2 = spark.table(table)
        assert result2.count() == 2, "Table should have 2 rows after second append"

    def test_multiple_append_operations(self, spark, table_prefix):
        """Seed one row, append four more one at a time, checking after each."""
        schema = self._create_test_schema(spark, table_prefix)
        table = f"{schema}.multi_append"

        data1 = [{"id": 1, "value": "a"}]
        df1 = spark.createDataFrame(data1, "id int, value string")
        df1.write.mode("overwrite").saveAsTable(table)

        # ids 2..5 are appended in order, so after appending id ``i`` the table
        # should hold exactly ``i`` rows.
        for i in range(2, 6):
            data = [{"id": i, "value": chr(ord("a") + i - 1)}]
            df = spark.createDataFrame(data, "id int, value string")
            df.write.mode("append").saveAsTable(table)

            # Count once so the failure message reports the very value that
            # failed the comparison instead of re-running the query.
            actual = spark.table(table).count()
            assert actual == i, (
                f"After {i - 1} appends, table should have {i} rows, got {actual}"
            )

        result = spark.table(table)
        assert result.count() == 5, "Final table should have 5 rows"
        rows = result.collect()
        ids = {row["id"] for row in rows}
        assert ids == {1, 2, 3, 4, 5}, "All rows should be present"