import subprocess
import json
import csv
import os
import tempfile
import pytest
from pathlib import Path
# Repository root: three directory levels above this test file.
PROJECT_ROOT = Path(__file__).parent.parent.parent

# Candidate locations of the sql-cli binary under the build output directory.
_RELEASE_BINARY = PROJECT_ROOT / "target" / "release" / "sql-cli"
_DEBUG_BINARY = PROJECT_ROOT / "target" / "debug" / "sql-cli"

# Use the release build when it exists; otherwise fall back to the debug
# path (which is used as-is, whether or not it exists).
SQL_CLI = _RELEASE_BINARY if _RELEASE_BINARY.exists() else _DEBUG_BINARY
def run_query(csv_file, query):
    """Run ``query`` against ``csv_file`` through the sql-cli binary.

    The binary at ``SQL_CLI`` is invoked with ``-q <query> -o json``. Lines of
    its standard output that start with ``#`` are discarded, as are blank
    lines; the remaining lines are concatenated and decoded as one JSON
    document.

    Args:
        csv_file: Path of the CSV file passed to the CLI as its data source.
        query: SQL text passed via ``-q``.

    Returns:
        The decoded JSON value, or ``[]`` when the CLI printed nothing other
        than ``#`` lines and blank lines.

    Raises:
        RuntimeError: If the CLI exits with a non-zero status; the message
            carries the process's stderr.
        json.JSONDecodeError: If the remaining output is not valid JSON.
    """
    cmd = [str(SQL_CLI), csv_file, "-q", query, "-o", "json"]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Query failed: {result.stderr}")

    # Skip blank lines as well as '#' lines: "".split('\n') yields [''],
    # which would otherwise bypass the empty-output guard below and make
    # json.loads('') raise.
    json_lines = [
        line
        for line in result.stdout.strip().split('\n')
        if line.strip() and not line.startswith('#')
    ]
    if not json_lines:
        return []
    return json.loads(''.join(json_lines))
class TestDeltasFunction:
    """Checks the DELTAS function against a five-row ``series.csv`` fixture."""

    @classmethod
    def setup_class(cls):
        """Write the ``series.csv`` fixture into a fresh temporary directory."""
        # Keep the TemporaryDirectory handle so teardown_class can delete it;
        # temp_dir stays a plain path string as before.
        cls._temp_dir_handle = tempfile.TemporaryDirectory()
        cls.temp_dir = cls._temp_dir_handle.name
        cls.series_file = os.path.join(cls.temp_dir, "series.csv")
        with open(cls.series_file, 'w', newline='') as f:
            csv.writer(f).writerows([
                ['id', 'value', 'price', 'volume'],
                [1, 10, 100.0, 1000],
                [2, 15, 102.5, 1200],
                [3, 12, 101.0, 900],
                [4, 20, 105.0, 1500],
                [5, 18, 104.5, 1400],
            ])

    @classmethod
    def teardown_class(cls):
        """Delete the temporary directory and the fixture inside it."""
        cls._temp_dir_handle.cleanup()

    def test_deltas_single_column(self):
        """DELTAS(value) returns one row holding the formatted delta list."""
        result = run_query(
            self.series_file,
            "SELECT DELTAS(value) as value_deltas FROM series",
        )
        assert len(result) == 1
        # First element is the first value itself, the rest are differences.
        expected = "[10, 5, -3, 8, -2]"
        assert result[0]['value_deltas'] == expected

    def test_deltas_multiple_values(self):
        """Placeholder: makes no assertions yet and passes vacuously."""
        pass

    def test_deltas_float_values(self):
        """DELTAS(price) returns the formatted delta list for float input."""
        result = run_query(
            self.series_file,
            "SELECT DELTAS(price) as price_deltas FROM series",
        )
        assert len(result) == 1
        expected = "[100, 2.5, -1.5, 4, -0.5]"
        assert result[0]['price_deltas'] == expected
class TestSumsFunction:
    """Checks the SUMS function against a five-row ``sums_data.csv`` fixture."""

    @classmethod
    def setup_class(cls):
        """Write the ``sums_data.csv`` fixture into a fresh temporary directory."""
        # Keep the TemporaryDirectory handle so teardown_class can delete it;
        # temp_dir stays a plain path string as before.
        cls._temp_dir_handle = tempfile.TemporaryDirectory()
        cls.temp_dir = cls._temp_dir_handle.name
        cls.data_file = os.path.join(cls.temp_dir, "sums_data.csv")
        with open(cls.data_file, 'w', newline='') as f:
            csv.writer(f).writerows([
                ['id', 'daily_sales', 'returns'],
                [1, 100, 5],
                [2, 150, 10],
                [3, 200, 8],
                [4, 120, 15],
                [5, 180, 12],
            ])

    @classmethod
    def teardown_class(cls):
        """Delete the temporary directory and the fixture inside it."""
        cls._temp_dir_handle.cleanup()

    def test_sums_column(self):
        """SUMS(daily_sales) returns one row holding the running totals."""
        result = run_query(
            self.data_file,
            "SELECT SUMS(daily_sales) as cumulative_sales FROM sums_data",
        )
        assert len(result) == 1
        expected = "[100, 250, 450, 570, 750]"
        assert result[0]['cumulative_sales'] == expected

    def test_sums_literal_values(self):
        """Placeholder: makes no assertions yet and passes vacuously."""
        pass
class TestMovingAverage:
    """Checks the MAVG function against a six-row ``stock.csv`` fixture."""

    @classmethod
    def setup_class(cls):
        """Write the ``stock.csv`` fixture into a fresh temporary directory."""
        # Keep the TemporaryDirectory handle so teardown_class can delete it;
        # temp_dir stays a plain path string as before.
        cls._temp_dir_handle = tempfile.TemporaryDirectory()
        cls.temp_dir = cls._temp_dir_handle.name
        cls.stock_file = os.path.join(cls.temp_dir, "stock.csv")
        with open(cls.stock_file, 'w', newline='') as f:
            csv.writer(f).writerows([
                ['id', 'closing_price'],
                [1, 10],
                [2, 20],
                [3, 30],
                [4, 40],
                [5, 50],
                [6, 60],
            ])

    @classmethod
    def teardown_class(cls):
        """Delete the temporary directory and the fixture inside it."""
        cls._temp_dir_handle.cleanup()

    def test_mavg_default_window(self):
        """MAVG(closing_price) with no window argument returns one row."""
        result = run_query(
            self.stock_file,
            "SELECT MAVG(closing_price) as ma3 FROM stock",
        )
        assert len(result) == 1
        # NOTE(review): these values match a 3-point trailing average with
        # partial windows at the start; confirm that is the CLI's default.
        expected = "[10, 15, 20, 30, 40, 50]"
        assert result[0]['ma3'] == expected

    def test_mavg_with_window(self):
        """Placeholder: makes no assertions yet and passes vacuously."""
        pass
class TestPercentageChange:
    """Checks the PCT_CHANGE function against a five-row ``growth.csv`` fixture."""

    @classmethod
    def setup_class(cls):
        """Write the ``growth.csv`` fixture into a fresh temporary directory."""
        # Keep the TemporaryDirectory handle so teardown_class can delete it;
        # temp_dir stays a plain path string as before.
        cls._temp_dir_handle = tempfile.TemporaryDirectory()
        cls.temp_dir = cls._temp_dir_handle.name
        cls.growth_file = os.path.join(cls.temp_dir, "growth.csv")
        with open(cls.growth_file, 'w', newline='') as f:
            # Each revenue figure is 10% above the previous one.
            csv.writer(f).writerows([
                ['id', 'revenue'],
                [1, 100],
                [2, 110],
                [3, 121],
                [4, 133.1],
                [5, 146.41],
            ])

    @classmethod
    def teardown_class(cls):
        """Delete the temporary directory and the fixture inside it."""
        cls._temp_dir_handle.cleanup()

    def test_pct_change(self):
        """PCT_CHANGE(revenue) returns one row; the first entry is ``null``."""
        result = run_query(
            self.growth_file,
            "SELECT PCT_CHANGE(revenue) as growth_rate FROM growth",
        )
        assert len(result) == 1
        expected = "[null, 10.00%, 10.00%, 10.00%, 10.00%]"
        assert result[0]['growth_rate'] == expected

    def test_pct_change_literals(self):
        """Placeholder: makes no assertions yet and passes vacuously."""
        pass
class TestRankFunction:
    """Checks the RANK function against a five-row ``scores.csv`` fixture."""

    @classmethod
    def setup_class(cls):
        """Write the ``scores.csv`` fixture into a fresh temporary directory."""
        # Keep the TemporaryDirectory handle so teardown_class can delete it;
        # temp_dir stays a plain path string as before.
        cls._temp_dir_handle = tempfile.TemporaryDirectory()
        cls.temp_dir = cls._temp_dir_handle.name
        cls.scores_file = os.path.join(cls.temp_dir, "scores.csv")
        with open(cls.scores_file, 'w', newline='') as f:
            # The score 92 appears twice so that ties are exercised.
            csv.writer(f).writerows([
                ['id', 'score'],
                [1, 85],
                [2, 92],
                [3, 78],
                [4, 92],
                [5, 88],
            ])

    @classmethod
    def teardown_class(cls):
        """Delete the temporary directory and the fixture inside it."""
        cls._temp_dir_handle.cleanup()

    def test_rank_column(self):
        """RANK(score) returns one row of per-row ranks."""
        result = run_query(
            self.scores_file,
            "SELECT RANK(score) as score_rank FROM scores",
        )
        assert len(result) == 1
        # Lowest score gets rank 1; the two tied 92s both get rank 4.
        expected = "[2, 4, 1, 4, 3]"
        assert result[0]['score_rank'] == expected

    def test_rank_literals(self):
        """Placeholder: makes no assertions yet and passes vacuously."""
        pass
class TestCumulativeMaxMin:
    """Checks CUMMAX and CUMMIN against a five-row ``extremes.csv`` fixture."""

    @classmethod
    def setup_class(cls):
        """Write the ``extremes.csv`` fixture into a fresh temporary directory."""
        # Keep the TemporaryDirectory handle so teardown_class can delete it;
        # temp_dir stays a plain path string as before.
        cls._temp_dir_handle = tempfile.TemporaryDirectory()
        cls.temp_dir = cls._temp_dir_handle.name
        cls.extremes_file = os.path.join(cls.temp_dir, "extremes.csv")
        with open(cls.extremes_file, 'w', newline='') as f:
            csv.writer(f).writerows([
                ['id', 'temperature'],
                [1, 20],
                [2, 25],
                [3, 18],
                [4, 30],
                [5, 22],
            ])

    @classmethod
    def teardown_class(cls):
        """Delete the temporary directory and the fixture inside it."""
        cls._temp_dir_handle.cleanup()

    def test_cummax(self):
        """CUMMAX(temperature) returns one row holding the running maximum."""
        result = run_query(
            self.extremes_file,
            "SELECT CUMMAX(temperature) as max_temp FROM extremes",
        )
        assert len(result) == 1
        expected = "[20, 25, 25, 30, 30]"
        assert result[0]['max_temp'] == expected

    def test_cummin(self):
        """CUMMIN(temperature) returns one row holding the running minimum."""
        result = run_query(
            self.extremes_file,
            "SELECT CUMMIN(temperature) as min_temp FROM extremes",
        )
        assert len(result) == 1
        expected = "[20, 20, 18, 18, 18]"
        assert result[0]['min_temp'] == expected

    def test_cummax_literals(self):
        """Placeholder: makes no assertions yet and passes vacuously."""
        pass

    def test_cummin_literals(self):
        """Placeholder: makes no assertions yet and passes vacuously."""
        pass
class TestAnalyticsEdgeCases:
    """Checks DELTAS and SUMS on a single-row ``edge.csv`` fixture."""

    @classmethod
    def setup_class(cls):
        """Write the one-row ``edge.csv`` fixture into a fresh temporary directory."""
        # Keep the TemporaryDirectory handle so teardown_class can delete it;
        # temp_dir stays a plain path string as before.
        cls._temp_dir_handle = tempfile.TemporaryDirectory()
        cls.temp_dir = cls._temp_dir_handle.name
        cls.edge_file = os.path.join(cls.temp_dir, "edge.csv")
        with open(cls.edge_file, 'w', newline='') as f:
            csv.writer(f).writerows([
                ['id', 'val'],
                [1, 42],
            ])

    @classmethod
    def teardown_class(cls):
        """Delete the temporary directory and the fixture inside it."""
        cls._temp_dir_handle.cleanup()

    def test_single_value_deltas(self):
        """DELTAS over a single row returns just that value."""
        result = run_query(
            self.edge_file,
            "SELECT DELTAS(val) as delta FROM edge",
        )
        assert len(result) == 1
        expected = "[42]"
        assert result[0]['delta'] == expected

    def test_single_value_sums(self):
        """SUMS over a single row returns just that value."""
        result = run_query(
            self.edge_file,
            "SELECT SUMS(val) as sum FROM edge",
        )
        assert len(result) == 1
        expected = "[42]"
        assert result[0]['sum'] == expected

    def test_empty_window_mavg(self):
        """Placeholder: makes no assertions yet and passes vacuously."""
        pass

    def test_negative_values(self):
        """Placeholder: makes no assertions yet and passes vacuously."""
        pass
if __name__ == "__main__":
    # Run this file's tests verbosely when executed as a script, and exit with
    # pytest's return code so that failures produce a non-zero exit status.
    raise SystemExit(pytest.main([__file__, "-v"]))