import subprocess
import sys
import tempfile
import os
def run_query(query, data_file=None):
cmd = ["./target/release/sql-cli"]
if data_file:
cmd.append(data_file)
cmd.extend(["-q", query, "-o", "csv"])
result = subprocess.run(cmd, capture_output=True, text=True)
return result.returncode, result.stdout, result.stderr
def test_simple_nested_cte():
print("Testing simple nested CTE...")
query = """
SELECT * FROM (
WITH inner AS (SELECT 1 as value)
SELECT * FROM inner
) t
"""
returncode, stdout, stderr = run_query(query)
if returncode != 0:
print(f"✗ Simple nested CTE failed: {stderr}")
return False
if "value" in stdout and "1" in stdout:
print("✓ Simple nested CTE works")
return True
else:
print(f"✗ Simple nested CTE returned unexpected output: {stdout}")
return False
def test_nested_cte_with_data():
print("\nTesting nested CTE with data file...")
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
f.write("id,name,value\n")
f.write("1,Alice,100\n")
f.write("2,Bob,200\n")
f.write("3,Charlie,300\n")
data_file = f.name
try:
query = """
SELECT * FROM (
WITH summary AS (
SELECT id, name, value
FROM test
)
SELECT * FROM summary
) t
WHERE value > 150
"""
returncode, stdout, stderr = run_query(query, data_file)
if returncode != 0:
print(f"✗ Nested CTE with data failed: {stderr}")
return False
if "Bob" in stdout and "Charlie" in stdout and "Alice" not in stdout:
print("✓ Nested CTE with data works")
return True
else:
print(f"✗ Unexpected output: {stdout}")
return False
finally:
os.unlink(data_file)
def test_deeply_nested_ctes():
print("\nTesting deeply nested CTEs...")
query = """
SELECT * FROM (
WITH level3 AS (
SELECT * FROM (
WITH level2 AS (
SELECT * FROM (
WITH level1 AS (
SELECT 1 as a, 2 as b
)
SELECT a + b as result FROM level1
) t1
)
SELECT result * 2 as doubled FROM level2
) t2
)
SELECT doubled + 10 as final FROM level3
) t3
"""
returncode, stdout, stderr = run_query(query)
if returncode != 0:
print(f"✗ Deeply nested CTEs failed: {stderr}")
return False
if "final" in stdout and "16" in stdout:
print("✓ Deeply nested CTEs work")
return True
else:
print(f"✗ Unexpected output: {stdout}")
return False
def test_trade_reconciliation_pattern():
print("\nTesting trade reconciliation pattern...")
with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
f.write("DealType,PlatformOrderId,DealId,Environment\n")
f.write("Swap,ORD-001,ORD-001,PROD\n")
f.write("Swap,TT001|ORD-001,TT001,UAT\n")
f.write("Forward,ORD-002,ORD-002,PROD\n")
f.write("Forward,TT002|ORD-002,TT002,UAT\n")
data_file = f.name
try:
query = """
SELECT deal_type, order_id, prod_deal_id
FROM (
WITH extracted AS (
SELECT
DealType as deal_type,
CASE
WHEN CONTAINS(PlatformOrderId, '|') THEN SUBSTRING_AFTER(PlatformOrderId, '|', 1)
ELSE PlatformOrderId
END as order_id,
DealId as prod_deal_id,
Environment
FROM test
)
SELECT * FROM extracted
) t
WHERE Environment = 'PROD'
ORDER BY deal_type
"""
returncode, stdout, stderr = run_query(query, data_file)
if returncode != 0:
print(f"✗ Trade reconciliation pattern failed: {stderr}")
print(f"Query was: {query}")
return False
if "ORD-001" in stdout and "ORD-002" in stdout:
print("✓ Trade reconciliation pattern works")
return True
else:
print(f"✗ Unexpected output: {stdout}")
return False
finally:
os.unlink(data_file)
def main():
print("=" * 60)
print("Testing Nested CTEs and Automatic CTE Hoisting")
print("=" * 60)
print("\nBuilding project...")
result = subprocess.run(["cargo", "build", "--release"], capture_output=True)
if result.returncode != 0:
print("Failed to build project")
sys.exit(1)
tests = [
test_simple_nested_cte,
test_nested_cte_with_data,
test_deeply_nested_ctes,
test_trade_reconciliation_pattern
]
passed = 0
failed = 0
for test in tests:
try:
if test():
passed += 1
else:
failed += 1
except Exception as e:
print(f"✗ Test {test.__name__} raised exception: {e}")
failed += 1
print("\n" + "=" * 60)
print(f"Results: {passed} passed, {failed} failed")
print("=" * 60)
sys.exit(0 if failed == 0 else 1)
if __name__ == "__main__":
main()