use crate::config::CliConfig;
use crate::project_config::ProjectConfig;
use comfy_table::{presets::NOTHING, Table, TableComponent};
use futures_util::StreamExt;
use serde::Deserialize;
use serde_json::Value;
use std::io::{IsTerminal, Write};
/// Maximum number of rows kept for the interactive table preview when `--all`
/// is not given; rows received beyond this count are tallied as hidden.
const TTY_PREVIEW_ROWS: usize = 1000;
#[allow(clippy::too_many_arguments)]
pub async fn run(
query: &str,
service: Option<&str>,
tenant_override: Option<&str>,
format: Option<&str>,
all: bool,
limit: Option<i64>,
no_pager: bool,
env: Option<&str>,
) -> eyre::Result<()> {
let mut config = CliConfig::load_with_env(env)?;
if let Some(t) = tenant_override {
config.tenant_override = Some(t.to_string());
}
let service_name = resolve_service_name(service)?;
let service_id = config.find_service_id(&service_name).await?;
let effective_query = apply_limit_hint(query, limit);
let client = config.http_client();
let resp = config
.auth_request(
&client,
reqwest::Method::POST,
&format!("{}/api/services/{}/query", config.api_url, service_id),
)
.json(&serde_json::json!({ "sql": effective_query }))
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
eyre::bail!("SQL request failed ({}): {}", status, body);
}
let format = format.unwrap_or("table");
let tty = std::io::stdout().is_terminal();
match (format, tty) {
("jsonl", _) => stream_flat(resp, RowFormat::Jsonl).await,
("csv", _) => stream_flat(resp, RowFormat::Csv).await,
("tsv", _) => stream_flat(resp, RowFormat::Tsv).await,
("table", true) => render_table(resp, all, no_pager).await,
("table", false) => stream_flat(resp, RowFormat::Jsonl).await,
(other, _) => eyre::bail!(
"Unknown --format '{}'. Use one of: table, jsonl, csv, tsv",
other
),
}
}
/// Picks the service name to query.
///
/// An explicitly supplied name always wins. Otherwise the project config is
/// located via `ProjectConfig::find_and_load` and its `service.name` is used.
///
/// # Errors
///
/// Fails when loading the project config fails, when no project config is
/// found, or when the config carries no service name.
fn resolve_service_name(explicit: Option<&str>) -> eyre::Result<String> {
    match explicit {
        Some(name) => Ok(name.to_string()),
        None => {
            let Some(project) = ProjectConfig::find_and_load()? else {
                return Err(eyre::eyre!(
                    "--service not given and no Cufflink.toml found in current directory or any parent."
                ));
            };
            match project.service.name.clone() {
                Some(configured) => Ok(configured),
                None => Err(eyre::eyre!(
                    "Current service has no `name` in Cufflink.toml. Pass --service explicitly."
                )),
            }
        }
    }
}
/// Appends `LIMIT n` to `query` unless the query already mentions a limit.
///
/// * With `limit == None` the query is returned unchanged.
/// * A query containing the standalone word `LIMIT` (any ASCII case, delimited
///   by any whitespace, including newlines and tabs) is returned unchanged.
/// * Otherwise trailing whitespace and semicolons are stripped and
///   ` LIMIT n` is appended.
///
/// Detection is a textual heuristic, not a SQL parse: a `LIMIT` word inside a
/// string literal, comment or subquery also counts as "already present".
fn apply_limit_hint(query: &str, limit: Option<i64>) -> String {
    let Some(n) = limit else {
        return query.to_string();
    };
    // Token-based check so that `...\nlimit 5` or a trailing `limit` is
    // recognised, not only the space-delimited ` LIMIT ` form.
    let has_limit = query
        .split_whitespace()
        .any(|token| token.eq_ignore_ascii_case("LIMIT"));
    if has_limit {
        return query.to_string();
    }
    // Strip semicolons and whitespace together so inputs like `select 1 ; `
    // do not leave stray characters before the appended clause.
    let trimmed = query.trim_end_matches(|c: char| c == ';' || c.is_whitespace());
    format!("{trimmed} LIMIT {n}")
}
/// Metadata for one result column, deserialized from a `columns` frame.
#[derive(Debug, Deserialize)]
struct ColumnMeta {
/// Column name; used for table/CSV/TSV headers and as the JSONL object key.
name: String,
/// Column type name; defaults to an empty string when absent from the frame.
// Only the unit tests in this file read this field, hence the `dead_code`
// allowance for non-test builds.
#[serde(default)]
#[allow(dead_code)]
type_name: String,
}
/// Line-oriented output formats produced by `stream_flat`.
#[derive(Copy, Clone)]
enum RowFormat {
/// One JSON value per line: an object keyed by column name, or a bare array
/// when no column metadata has been received.
Jsonl,
/// Comma-separated values, preceded by a header line of column names.
Csv,
/// Tab-separated values, preceded by a header line of column names.
Tsv,
}
/// One decoded NDJSON frame of the query response (see `parse_frame`).
enum Frame {
/// Column metadata for the rows of the result.
Columns(Vec<ColumnMeta>),
/// The values of a single result row.
Row(Vec<Value>),
/// Summary frame carrying the server's final statistics.
Done {
/// Row count reported by the server (0 when the field is absent).
rows: u64,
/// Whether the server reported the result as truncated.
truncated: bool,
/// Elapsed time in milliseconds as reported by the server.
elapsed_ms: u64,
},
/// Error reported by the server, with its message.
Error(String),
}
fn parse_frame(line: &str) -> eyre::Result<Frame> {
let v: Value = serde_json::from_str(line)
.map_err(|e| eyre::eyre!("Malformed NDJSON frame: {} — line was: {}", e, line))?;
match v.get("type").and_then(|t| t.as_str()) {
Some("columns") => {
let cols: Vec<ColumnMeta> =
serde_json::from_value(v.get("columns").cloned().unwrap_or(Value::Array(vec![])))?;
Ok(Frame::Columns(cols))
}
Some("row") => {
let values: Vec<Value> = v
.get("values")
.and_then(|x| x.as_array())
.cloned()
.unwrap_or_default();
Ok(Frame::Row(values))
}
Some("done") => Ok(Frame::Done {
rows: v.get("rows").and_then(|x| x.as_u64()).unwrap_or(0),
truncated: v
.get("truncated")
.and_then(|x| x.as_bool())
.unwrap_or(false),
elapsed_ms: v.get("elapsed_ms").and_then(|x| x.as_u64()).unwrap_or(0),
}),
Some("error") => {
let msg = v
.get("message")
.and_then(|x| x.as_str())
.unwrap_or("(no message)")
.to_string();
Ok(Frame::Error(msg))
}
other => eyre::bail!("Unknown NDJSON frame type: {:?}", other),
}
}
/// Streams the NDJSON response to stdout row by row in a flat format.
///
/// * `Csv` / `Tsv`: a header line is written when the `columns` frame arrives,
///   then one escaped line per row.
/// * `Jsonl`: each row becomes a JSON object keyed by column name, or a bare
///   JSON array if no column metadata has been seen.
///
/// A truncation notice is written to stderr when the `done` frame reports it.
///
/// # Errors
///
/// Fails on transport errors, non-UTF-8 or malformed frames, stdout write
/// errors, and when the server sends an `error` frame.
async fn stream_flat(resp: reqwest::Response, kind: RowFormat) -> eyre::Result<()> {
    let mut stream = resp.bytes_stream();
    let mut buf: Vec<u8> = Vec::new();
    let stdout = std::io::stdout();
    let mut out = stdout.lock();
    let mut columns: Vec<ColumnMeta> = Vec::new();

    let mut eof = false;
    while !eof {
        match stream.next().await {
            Some(chunk) => {
                let chunk = chunk?;
                buf.extend_from_slice(&chunk);
            }
            None => {
                eof = true;
                // A final frame without a trailing newline would otherwise be
                // dropped silently (possibly hiding an `error` frame), so
                // terminate it and let the loop below process it.
                if !buf.is_empty() {
                    buf.push(b'\n');
                }
            }
        }

        // Frames may be split across chunks: only complete lines are consumed,
        // the remainder stays in `buf` for the next chunk.
        while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
            let line_bytes: Vec<u8> = buf.drain(..=pos).collect();
            let line = std::str::from_utf8(&line_bytes[..line_bytes.len() - 1])?.trim();
            if line.is_empty() {
                continue;
            }
            match parse_frame(line)? {
                Frame::Columns(c) => {
                    match kind {
                        RowFormat::Csv => writeln!(out, "{}", join_header(&c, ','))?,
                        RowFormat::Tsv => writeln!(out, "{}", join_header(&c, '\t'))?,
                        RowFormat::Jsonl => {}
                    }
                    columns = c;
                }
                Frame::Row(values) => match kind {
                    RowFormat::Jsonl => {
                        let obj = if columns.is_empty() {
                            Value::Array(values)
                        } else {
                            // Values are paired with columns positionally; `zip`
                            // stops at the shorter of the two sequences.
                            let mut map = serde_json::Map::with_capacity(columns.len());
                            for (c, v) in columns.iter().zip(values.into_iter()) {
                                map.insert(c.name.clone(), v);
                            }
                            Value::Object(map)
                        };
                        writeln!(out, "{}", serde_json::to_string(&obj)?)?;
                    }
                    RowFormat::Csv => writeln!(out, "{}", encode_row(&values, ','))?,
                    RowFormat::Tsv => writeln!(out, "{}", encode_row(&values, '\t'))?,
                },
                Frame::Done {
                    rows, truncated, ..
                } => {
                    if truncated {
                        eprintln!("(server row cap reached after {rows} rows)");
                    }
                }
                Frame::Error(msg) => eyre::bail!("Server error during streaming: {msg}"),
            }
        }
    }

    // Surface any deferred write error instead of losing it on drop.
    out.flush()?;
    Ok(())
}
/// Collects the NDJSON response and prints it as a text table.
///
/// Without `all`, only the first `TTY_PREVIEW_ROWS` rows are kept and the rest
/// are counted as hidden. With `all` and without `no_pager`, the rendered
/// table is piped through the pager, falling back to plain stdout if the pager
/// cannot be used. Hints about hidden rows, a server-side row cap, and the
/// row count / elapsed time are written to stderr.
///
/// # Errors
///
/// Fails on transport errors, non-UTF-8 or malformed frames, and when the
/// server sends an `error` frame.
async fn render_table(resp: reqwest::Response, all: bool, no_pager: bool) -> eyre::Result<()> {
    let mut stream = resp.bytes_stream();
    let mut buf: Vec<u8> = Vec::new();
    let mut columns: Vec<ColumnMeta> = Vec::new();
    let mut rows: Vec<Vec<Value>> = Vec::new();
    let mut skipped: u64 = 0;
    let mut total: u64 = 0;
    let mut server_truncated = false;
    let mut elapsed_ms: u64 = 0;

    let mut eof = false;
    while !eof {
        match stream.next().await {
            Some(chunk) => {
                let chunk = chunk?;
                buf.extend_from_slice(&chunk);
            }
            None => {
                eof = true;
                // A final frame without a trailing newline would otherwise be
                // dropped silently (possibly hiding an `error` or `done`
                // frame), so terminate it and let the loop below process it.
                if !buf.is_empty() {
                    buf.push(b'\n');
                }
            }
        }

        // Frames may be split across chunks: only complete lines are consumed.
        while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
            let line_bytes: Vec<u8> = buf.drain(..=pos).collect();
            let line = std::str::from_utf8(&line_bytes[..line_bytes.len() - 1])?.trim();
            if line.is_empty() {
                continue;
            }
            match parse_frame(line)? {
                Frame::Columns(c) => columns = c,
                Frame::Row(values) => {
                    total += 1;
                    if all || rows.len() < TTY_PREVIEW_ROWS {
                        rows.push(values);
                    } else {
                        skipped += 1;
                    }
                }
                Frame::Done {
                    rows: server_rows,
                    truncated,
                    elapsed_ms: elapsed,
                } => {
                    server_truncated = truncated;
                    elapsed_ms = elapsed;
                    // Prefer the server's count when it reports one that
                    // differs from what was received.
                    if server_rows > 0 && server_rows != total {
                        total = server_rows;
                    }
                }
                Frame::Error(msg) => eyre::bail!("Server error during streaming: {msg}"),
            }
        }
    }

    if columns.is_empty() && rows.is_empty() {
        println!("(empty result)");
        return Ok(());
    }

    let mut table = Table::new();
    table.load_preset(NOTHING);
    table.set_style(TableComponent::HeaderLines, '-');
    table.set_style(TableComponent::MiddleHeaderIntersections, ' ');
    let header: Vec<String> = columns.iter().map(|c| c.name.clone()).collect();
    table.set_header(header);
    for row in &rows {
        let cells: Vec<String> = row.iter().map(cell_string).collect();
        table.add_row(cells);
    }

    let rendered = format!("{table}");
    if all && !no_pager && !rendered.is_empty() {
        // Pager failure is not fatal: fall back to printing directly.
        pipe_to_pager(&rendered).unwrap_or_else(|_| println!("{rendered}"));
    } else {
        println!("{rendered}");
    }

    let hint = match (skipped > 0, server_truncated) {
        (true, true) => Some(format!(
            "{skipped} rows hidden. Server cap hit at {total}. Use --format csv|jsonl to stream, or reduce the query."
        )),
        (true, false) => Some(format!(
            "{skipped} rows hidden (showing first {TTY_PREVIEW_ROWS}). Use --all, --limit N, or --format csv|jsonl."
        )),
        (false, true) => Some(format!(
            "Server cap hit at {total} rows. Query returned more than the platform allows."
        )),
        (false, false) => None,
    };
    if let Some(h) = hint {
        eprintln!("{h}");
    }
    if elapsed_ms > 0 {
        eprintln!("({total} rows, {elapsed_ms} ms)");
    }
    Ok(())
}
fn cell_string(v: &Value) -> String {
match v {
Value::Null => String::new(),
Value::String(s) => s.clone(),
other => other.to_string(),
}
}
/// Builds the header line for delimited output: the escaped column names
/// joined by `sep`.
fn join_header(cols: &[ColumnMeta], sep: char) -> String {
    let mut line = String::new();
    for (idx, col) in cols.iter().enumerate() {
        if idx > 0 {
            line.push(sep);
        }
        line.push_str(&csv_escape(&col.name, sep));
    }
    line
}
/// Builds one delimited output line: each value is converted with
/// `cell_string`, escaped for `sep`, and the fields are joined by `sep`.
fn encode_row(values: &[Value], sep: char) -> String {
    let mut line = String::new();
    let mut fields = values.iter();
    if let Some(first) = fields.next() {
        line.push_str(&csv_escape(&cell_string(first), sep));
    }
    for value in fields {
        line.push(sep);
        line.push_str(&csv_escape(&cell_string(value), sep));
    }
    line
}
/// Escapes a single field for delimited output.
///
/// A field containing the separator, a double quote, a line feed or a carriage
/// return is wrapped in double quotes with embedded quotes doubled; any other
/// field is returned as-is.
fn csv_escape(s: &str, sep: char) -> String {
    let needs_quoting = s
        .chars()
        .any(|ch| ch == sep || matches!(ch, '"' | '\n' | '\r'));
    if !needs_quoting {
        return s.to_owned();
    }

    let mut quoted = String::with_capacity(s.len() + 2);
    quoted.push('"');
    for ch in s.chars() {
        if ch == '"' {
            // Double each embedded quote.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
/// Feeds `content` to the user's pager and waits for the pager to exit.
///
/// The command comes from the `PAGER` environment variable (split on
/// whitespace into program and arguments), defaulting to `less -SRX`.
///
/// # Errors
///
/// Fails when `PAGER` is set but empty, when the pager cannot be spawned or
/// waited on, or when writing to it fails for a reason other than the pager
/// closing its input early.
fn pipe_to_pager(content: &str) -> eyre::Result<()> {
    let pager = std::env::var("PAGER").unwrap_or_else(|_| "less -SRX".to_string());
    let mut parts = pager.split_whitespace();
    let program = parts
        .next()
        .ok_or_else(|| eyre::eyre!("PAGER env var is empty"))?;
    let args: Vec<&str> = parts.collect();
    let mut child = std::process::Command::new(program)
        .args(&args)
        .stdin(std::process::Stdio::piped())
        .spawn()?;

    let write_result = match child.stdin.take() {
        Some(mut stdin) => {
            let result = stdin.write_all(content.as_bytes());
            // Close the pipe explicitly so the pager sees end-of-input.
            drop(stdin);
            result
        }
        None => Ok(()),
    };

    // Always reap the child, even if the write failed, so no unreaped process
    // is left behind.
    child.wait()?;

    match write_result {
        // The user quit the pager before reading everything: that is a normal
        // way to end paging, not a failure (and must not trigger the caller's
        // fallback of re-printing the whole content).
        Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => Ok(()),
        Err(e) => Err(e.into()),
        Ok(()) => Ok(()),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // A LIMIT clause is appended when the query has none, with a trailing
    // semicolon removed first.
    #[test]
    fn apply_limit_adds_when_absent() {
        let expected = "select * from items LIMIT 10";
        for input in ["select * from items", "select * from items;"] {
            assert_eq!(apply_limit_hint(input, Some(10)), expected);
        }
    }

    // An existing LIMIT clause (any case) leaves the query untouched.
    #[test]
    fn apply_limit_leaves_query_when_present() {
        let lower = "select * from items limit 5";
        let hinted = apply_limit_hint(lower, Some(10));
        assert_eq!(hinted.to_uppercase(), lower.to_uppercase());

        let upper = "SELECT * FROM items LIMIT 5";
        assert_eq!(apply_limit_hint(upper, Some(10)), upper);
    }

    #[test]
    fn parses_columns_frame() {
        let line = r#"{"type":"columns","columns":[{"name":"id","type_name":"uuid"},{"name":"email","type_name":"text"}]}"#;
        let Frame::Columns(cols) = parse_frame(line).unwrap() else {
            panic!("expected Columns");
        };
        assert_eq!(cols.len(), 2);
        assert_eq!(cols[0].name, "id");
        assert_eq!(cols[1].type_name, "text");
    }

    #[test]
    fn parses_row_frame() {
        let line = r#"{"type":"row","values":["abc",42,null,true]}"#;
        let Frame::Row(cells) = parse_frame(line).unwrap() else {
            panic!("expected Row");
        };
        assert_eq!(cells.len(), 4);
        assert_eq!(cells[1].as_i64().unwrap(), 42);
        assert!(cells[2].is_null());
    }

    #[test]
    fn parses_done_frame() {
        let line = r#"{"type":"done","rows":17,"truncated":true,"elapsed_ms":250}"#;
        let Frame::Done {
            rows,
            truncated,
            elapsed_ms,
        } = parse_frame(line).unwrap()
        else {
            panic!("expected Done");
        };
        assert_eq!(rows, 17);
        assert!(truncated);
        assert_eq!(elapsed_ms, 250);
    }

    // Table of (input, separator, expected output).
    #[test]
    fn csv_escape_quotes_embedded_commas() {
        let cases = [
            ("hello", ',', "hello"),
            ("a,b", ',', "\"a,b\""),
            ("a\"b", ',', "\"a\"\"b\""),
            ("multi\nline", ',', "\"multi\nline\""),
            ("a,b", '\t', "a,b"),
            ("a\tb", '\t', "\"a\tb\""),
        ];
        for (input, sep, expected) in cases {
            assert_eq!(csv_escape(input, sep), expected);
        }
    }

    #[test]
    fn cell_string_flattens_null_to_empty() {
        let cases = [
            (Value::Null, ""),
            (Value::String("x".into()), "x"),
            (Value::Bool(true), "true"),
        ];
        for (value, expected) in &cases {
            assert_eq!(cell_string(value), *expected);
        }
    }
}