import subprocess
import json
import csv
import os
import tempfile
import pytest
from pathlib import Path
PROJECT_ROOT = Path(__file__).parent.parent.parent
SQL_CLI = PROJECT_ROOT / "target" / "release" / "sql-cli"
if not SQL_CLI.exists():
SQL_CLI = PROJECT_ROOT / "target" / "debug" / "sql-cli"
def run_query(csv_file, query):
cmd = [str(SQL_CLI), csv_file, "-q", query, "-o", "json"]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise Exception(f"Query failed: {result.stderr}")
lines = result.stdout.strip().split('\n')
json_lines = [l for l in lines if not l.startswith('#')]
if not json_lines:
return []
return json.loads(''.join(json_lines))
class TestUnnest:
@classmethod
def setup_class(cls):
cls.temp_dir = tempfile.mkdtemp()
cls.fix_file = str(PROJECT_ROOT / "data" / "fix_allocations.csv")
cls.edge_case_file = os.path.join(cls.temp_dir, "unnest_edge_cases.csv")
with open(cls.edge_case_file, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(['id', 'tags', 'values'])
writer.writerow(['1', 'A|B|C', '1,2,3'])
writer.writerow(['2', 'X|Y', '10,20,30']) writer.writerow(['3', 'Z', '100']) writer.writerow(['4', '', ''])
@classmethod
def teardown_class(cls):
import shutil
if os.path.exists(cls.temp_dir):
shutil.rmtree(cls.temp_dir)
def test_single_unnest(self):
query = """
SELECT order_id, UNNEST(accounts, '|') AS account
FROM fix_allocations
WHERE order_id = 'ORD001'
"""
results = run_query(self.fix_file, query)
assert len(results) == 3
assert results[0]['order_id'] == 'ORD001'
assert results[0]['account'] == 'ACC_1'
assert results[1]['account'] == 'ACC_2'
assert results[2]['account'] == 'ACC_3'
def test_multiple_unnest_matching_lengths(self):
query = """
SELECT
order_id,
UNNEST(accounts, '|') AS account,
UNNEST(amounts, ',') AS amount
FROM fix_allocations
WHERE order_id = 'ORD001'
"""
results = run_query(self.fix_file, query)
assert len(results) == 3
assert results[0]['order_id'] == 'ORD001'
assert results[0]['account'] == 'ACC_1'
assert results[0]['amount'] == '200'
assert results[1]['account'] == 'ACC_2'
assert results[1]['amount'] == '200'
assert results[2]['account'] == 'ACC_3'
assert results[2]['amount'] == '200'
def test_multiple_unnest_mismatched_lengths(self):
query = """
SELECT
id,
UNNEST(tags, '|') AS tag,
UNNEST(values, ',') AS value
FROM unnest_edge_cases
WHERE id = '2'
"""
results = run_query(self.edge_case_file, query)
assert len(results) == 3
assert results[0]['tag'] == 'X'
assert results[0]['value'] == '10'
assert results[1]['tag'] == 'Y'
assert results[1]['value'] == '20'
assert results[2]['tag'] is None
assert results[2]['value'] == '30'
def test_unnest_all_rows(self):
query = """
SELECT
order_id,
symbol,
UNNEST(accounts, '|') AS account,
UNNEST(amounts, ',') AS amount
FROM fix_allocations
"""
results = run_query(self.fix_file, query)
assert len(results) == 6
ord001_rows = [r for r in results if r['order_id'] == 'ORD001']
assert len(ord001_rows) == 3
assert all(r['symbol'] == 'ZX5Y' for r in ord001_rows)
ord002_rows = [r for r in results if r['order_id'] == 'ORD002']
assert len(ord002_rows) == 2
assert all(r['symbol'] == 'ABCD' for r in ord002_rows)
def test_unnest_with_order_by(self):
query = """
SELECT
UNNEST(accounts, '|') AS account,
order_id
FROM fix_allocations
ORDER BY account
"""
results = run_query(self.fix_file, query)
assert len(results) == 6
accounts = [r['account'] for r in results]
assert accounts == ['ACC_1', 'ACC_1', 'ACC_2', 'ACC_3', 'ACC_4', 'ACC_5']
def test_unnest_with_where_filter(self):
query = """
SELECT
order_id,
UNNEST(accounts, '|') AS account
FROM fix_allocations
WHERE msg_type = 'AS'
"""
results = run_query(self.fix_file, query)
assert len(results) == 5
assert all(r['order_id'] in ['ORD001', 'ORD002'] for r in results)
def test_unnest_single_item(self):
query = """
SELECT
UNNEST(accounts, '|') AS account,
UNNEST(amounts, ',') AS amount
FROM fix_allocations
WHERE order_id = 'ORD003'
"""
results = run_query(self.fix_file, query)
assert len(results) == 1
assert results[0]['account'] == 'ACC_1'
assert results[0]['amount'] == '1000'
def test_unnest_preserves_regular_columns(self):
query = """
SELECT
msg_type,
order_id,
symbol,
UNNEST(accounts, '|') AS account
FROM fix_allocations
WHERE order_id = 'ORD001'
"""
results = run_query(self.fix_file, query)
assert len(results) == 3
assert all(r['msg_type'] == 'AS' for r in results)
assert all(r['order_id'] == 'ORD001' for r in results)
assert all(r['symbol'] == 'ZX5Y' for r in results)
assert results[0]['account'] == 'ACC_1'
assert results[1]['account'] == 'ACC_2'
assert results[2]['account'] == 'ACC_3'
def test_unnest_different_delimiters(self):
query = """
SELECT
UNNEST(tags, '|') AS tag,
UNNEST(values, ',') AS value
FROM unnest_edge_cases
WHERE id = '1'
"""
results = run_query(self.edge_case_file, query)
assert len(results) == 3
assert results[0]['tag'] == 'A'
assert results[0]['value'] == '1'
assert results[1]['tag'] == 'B'
assert results[1]['value'] == '2'
assert results[2]['tag'] == 'C'
assert results[2]['value'] == '3'
if __name__ == '__main__':
pytest.main([__file__, '-v'])