Skip to main content

alopex_cli/commands/
sql.rs

1//! SQL Command - SQL query execution
2//!
3//! Supports: query execution, file-based queries
4
5use std::collections::HashSet;
6use std::fs;
7use std::io::{self, Read, Write};
8
9use alopex_embedded::Database;
10
11use crate::batch::BatchMode;
12use crate::cli::SqlCommand;
13use crate::client::http::{ClientError, HttpClient};
14use crate::error::{CliError, Result};
15use crate::models::{Column, DataType, Row, Value};
16use crate::output::formatter::Formatter;
17use crate::streaming::timeout::parse_deadline;
18use crate::streaming::{CancelSignal, Deadline, StreamingWriter, WriteStatus};
19use crate::tui::{is_tty, TuiApp};
20use crate::ui::mode::UiMode;
21use futures_util::StreamExt;
22use reqwest::Response;
23
/// Per-invocation controls threaded through the local and remote SQL execution paths.
#[doc(hidden)]
pub struct SqlExecutionOptions<'a> {
    /// Optional cap on the number of result rows; forwarded to the streaming writer and used to
    /// truncate the row sets collected for the TUI.
    pub limit: Option<usize>,
    /// Suppresses the fallback warnings printed to stderr and is forwarded to the streaming
    /// writer via `with_quiet`.
    pub quiet: bool,
    /// Cancellation signal: polled between rows and raced against in-flight remote requests.
    pub cancel: &'a CancelSignal,
    /// Deadline for the whole command: checked during execution and used to bound remote
    /// requests by its remaining time.
    pub deadline: &'a Deadline,
    /// Optional callback handed to the TUI through `with_admin_launcher`; in the code visible
    /// here it is only consumed by the TUI paths.
    pub admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
}
32
33/// Execute a SQL command with dynamic column detection.
34///
35/// This function creates the StreamingWriter internally based on the query result type,
36/// ensuring that SELECT queries use the correct column headers.
37///
38/// # Arguments
39///
40/// * `db` - The database instance.
41/// * `cmd` - The SQL command to execute.
42/// * `writer` - The output writer.
43/// * `formatter` - The formatter to use.
44/// * `limit` - Optional row limit.
45/// * `quiet` - Whether to suppress warnings.
46#[allow(clippy::too_many_arguments)]
47pub fn execute_with_formatter<'a, W: Write>(
48    db: &Database,
49    cmd: SqlCommand,
50    batch_mode: &BatchMode,
51    ui_mode: UiMode,
52    writer: &mut W,
53    formatter: Box<dyn Formatter>,
54    admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
55    limit: Option<usize>,
56    quiet: bool,
57) -> Result<()> {
58    let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
59    let cancel = CancelSignal::new();
60
61    execute_with_formatter_control(
62        db,
63        cmd,
64        batch_mode,
65        ui_mode,
66        writer,
67        formatter,
68        SqlExecutionOptions {
69            limit,
70            quiet,
71            cancel: &cancel,
72            deadline: &deadline,
73            admin_launcher,
74        },
75    )
76}
77
78#[doc(hidden)]
79pub fn execute_with_formatter_control<W: Write>(
80    db: &Database,
81    cmd: SqlCommand,
82    batch_mode: &BatchMode,
83    ui_mode: UiMode,
84    writer: &mut W,
85    formatter: Box<dyn Formatter>,
86    mut options: SqlExecutionOptions<'_>,
87) -> Result<()> {
88    let sql = cmd.resolve_query(batch_mode)?;
89    let effective_limit = merge_limit(options.limit, cmd.max_rows);
90    options.limit = effective_limit;
91
92    if ui_mode == UiMode::Tui {
93        return execute_tui_local_or_fallback(db, &sql, writer, formatter, options);
94    }
95
96    execute_sql_with_formatter(db, &sql, writer, formatter, &options)
97}
98
99fn is_select_query(sql: &str) -> Result<bool> {
100    use alopex_sql::{AlopexDialect, Parser, StatementKind};
101
102    let dialect = AlopexDialect;
103    let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
104    Ok(stmts.len() == 1
105        && matches!(
106            stmts.first().map(|s| &s.kind),
107            Some(StatementKind::Select(_))
108        ))
109}
110
111/// Execute a SQL command against a remote server using HttpClient.
112#[allow(clippy::too_many_arguments)]
113pub async fn execute_remote_with_formatter<'a, W: Write>(
114    client: &HttpClient,
115    cmd: &SqlCommand,
116    batch_mode: &BatchMode,
117    ui_mode: UiMode,
118    writer: &mut W,
119    formatter: Box<dyn Formatter>,
120    admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
121    limit: Option<usize>,
122    quiet: bool,
123) -> Result<()> {
124    let effective_limit = merge_limit(limit, cmd.max_rows);
125    let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
126    let cancel = CancelSignal::new();
127    let options = SqlExecutionOptions {
128        limit: effective_limit,
129        quiet,
130        cancel: &cancel,
131        deadline: &deadline,
132        admin_launcher,
133    };
134
135    execute_remote_with_formatter_control(
136        client, cmd, batch_mode, ui_mode, writer, formatter, options,
137    )
138    .await
139}
140
/// Build the one-line `SQL: …` context label shown alongside query results.
///
/// Runs of whitespace (including newlines) collapse to single spaces. Statements longer than
/// 200 characters are cut at 200 characters and suffixed with `...`.
fn sql_context_message(sql: &str) -> String {
    const MAX_CHARS: usize = 200;

    let mut condensed = String::with_capacity(sql.len());
    for word in sql.split_whitespace() {
        if !condensed.is_empty() {
            condensed.push(' ');
        }
        condensed.push_str(word);
    }

    // The byte offset of character number MAX_CHARS exists only when the text is longer than
    // the cap; slicing there keeps exactly MAX_CHARS characters on a valid UTF-8 boundary.
    match condensed.char_indices().nth(MAX_CHARS) {
        Some((cut, _)) => {
            let truncated = &condensed[..cut];
            format!("SQL: {truncated}...")
        }
        None => format!("SQL: {condensed}"),
    }
}
151
152#[doc(hidden)]
153pub async fn execute_remote_with_formatter_control<W: Write>(
154    client: &HttpClient,
155    cmd: &SqlCommand,
156    batch_mode: &BatchMode,
157    ui_mode: UiMode,
158    writer: &mut W,
159    formatter: Box<dyn Formatter>,
160    options: SqlExecutionOptions<'_>,
161) -> Result<()> {
162    let sql = cmd.resolve_query(batch_mode)?;
163    if ui_mode == UiMode::Tui {
164        return execute_tui_remote_or_fallback(client, &sql, cmd, writer, formatter, options).await;
165    }
166    execute_remote_with_formatter_impl(client, &sql, cmd, writer, formatter, &options).await
167}
168
169async fn execute_remote_with_formatter_impl<W: Write>(
170    client: &HttpClient,
171    sql: &str,
172    cmd: &SqlCommand,
173    writer: &mut W,
174    formatter: Box<dyn Formatter>,
175    options: &SqlExecutionOptions<'_>,
176) -> Result<()> {
177    if is_select_query(sql)? && formatter.supports_streaming() {
178        return execute_remote_streaming(
179            client,
180            sql,
181            writer,
182            formatter,
183            options,
184            cmd.fetch_size,
185            cmd.max_rows,
186        )
187        .await;
188    }
189
190    let request = RemoteSqlRequest {
191        sql: sql.to_string(),
192        streaming: false,
193        fetch_size: cmd.fetch_size,
194        max_rows: cmd.max_rows,
195    };
196    let response: RemoteSqlResponse = tokio::select! {
197        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
198            match result {
199                Ok(value) => value.map_err(map_client_error)?,
200                Err(_) => {
201                    let _ = send_cancel_request(client).await;
202                    return Err(CliError::Timeout(format!(
203                        "deadline exceeded after {}",
204                        humantime::format_duration(options.deadline.duration())
205                    )));
206                }
207            }
208        }
209        _ = options.cancel.wait() => {
210            let _ = send_cancel_request(client).await;
211            return Err(CliError::Cancelled);
212        }
213    };
214
215    if response.columns.is_empty() {
216        if options.quiet {
217            return Ok(());
218        }
219        let message = match response.affected_rows {
220            Some(count) => format!("{count} row(s) affected"),
221            None => "Operation completed successfully".to_string(),
222        };
223        let columns = sql_status_columns();
224        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
225            .with_quiet(options.quiet);
226        streaming_writer.prepare(Some(1))?;
227        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
228        streaming_writer.write_row(row)?;
229        return streaming_writer.finish();
230    }
231
232    let columns: Vec<Column> = response
233        .columns
234        .iter()
235        .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
236        .collect();
237    let mut streaming_writer =
238        StreamingWriter::new(writer, formatter, columns, options.limit).with_quiet(options.quiet);
239    streaming_writer.prepare(Some(response.rows.len()))?;
240    for row in response.rows {
241        if options.cancel.is_cancelled() {
242            let _ = send_cancel_request(client).await;
243            return Err(CliError::Cancelled);
244        }
245        options.deadline.check()?;
246        let values = row.into_iter().map(remote_value_to_value).collect();
247        match streaming_writer.write_row(Row::new(values))? {
248            WriteStatus::LimitReached => break,
249            WriteStatus::Continue => {}
250        }
251    }
252    streaming_writer.finish()
253}
254
255fn execute_tui_local_or_fallback<'a, W: Write>(
256    db: &Database,
257    sql: &str,
258    writer: &mut W,
259    formatter: Box<dyn Formatter>,
260    mut options: SqlExecutionOptions<'a>,
261) -> Result<()> {
262    if !is_tty() {
263        if !options.quiet {
264            eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
265        }
266        return execute_sql_with_formatter(db, sql, writer, formatter, &options);
267    }
268
269    let admin_launcher = options.admin_launcher.take();
270    match execute_tui_local(db, sql, &options, admin_launcher) {
271        Ok(()) => Ok(()),
272        Err(err) => {
273            if !options.quiet {
274                eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
275            }
276            execute_sql_with_formatter(db, sql, writer, formatter, &options)
277        }
278    }
279}
280
281fn execute_tui_local<'a>(
282    db: &Database,
283    sql: &str,
284    options: &SqlExecutionOptions<'a>,
285    admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
286) -> Result<()> {
287    use alopex_sql::ExecutionResult;
288
289    options.deadline.check()?;
290    let result = db.execute_sql(sql)?;
291    options.deadline.check()?;
292
293    let (columns, rows) = match result {
294        ExecutionResult::Success => {
295            let columns = sql_status_columns();
296            let row = Row::new(vec![
297                Value::Text("OK".to_string()),
298                Value::Text("Operation completed successfully".to_string()),
299            ]);
300            (columns, vec![row])
301        }
302        ExecutionResult::RowsAffected(count) => {
303            let columns = sql_status_columns();
304            let row = Row::new(vec![
305                Value::Text("OK".to_string()),
306                Value::Text(format!("{count} row(s) affected")),
307            ]);
308            (columns, vec![row])
309        }
310        ExecutionResult::Query(query_result) => {
311            let columns = query_result
312                .columns
313                .iter()
314                .map(|col| Column::new(&col.name, DataType::Text))
315                .collect::<Vec<_>>();
316            let mut rows = Vec::with_capacity(query_result.rows.len());
317            for sql_row in query_result.rows {
318                let values = sql_row.into_iter().map(sql_value_to_value).collect();
319                rows.push(Row::new(values));
320            }
321            if let Some(limit) = options.limit {
322                rows.truncate(limit);
323            }
324            (columns, rows)
325        }
326    };
327
328    let app = TuiApp::new(columns, rows, "local", false)
329        .with_context_message(Some(sql_context_message(sql)))
330        .with_admin_launcher(admin_launcher);
331    app.run()
332}
333
334async fn execute_tui_remote_or_fallback<'a, W: Write>(
335    client: &HttpClient,
336    sql: &str,
337    cmd: &SqlCommand,
338    writer: &mut W,
339    formatter: Box<dyn Formatter>,
340    mut options: SqlExecutionOptions<'a>,
341) -> Result<()> {
342    if !is_tty() {
343        if !options.quiet {
344            eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
345        }
346        return execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options)
347            .await;
348    }
349
350    let admin_launcher = options.admin_launcher.take();
351    match execute_tui_remote(client, sql, cmd, &options, admin_launcher).await {
352        Ok(()) => Ok(()),
353        Err(err) => {
354            if !options.quiet {
355                eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
356            }
357            execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options).await
358        }
359    }
360}
361
362async fn execute_tui_remote<'a>(
363    client: &HttpClient,
364    sql: &str,
365    cmd: &SqlCommand,
366    options: &SqlExecutionOptions<'a>,
367    admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
368) -> Result<()> {
369    if is_select_query(sql)? {
370        let (columns, rows) =
371            collect_remote_streaming_rows(client, sql, options, cmd.fetch_size, cmd.max_rows)
372                .await?;
373        let app = TuiApp::new(columns, rows, "server", false)
374            .with_context_message(Some(sql_context_message(sql)))
375            .with_admin_launcher(admin_launcher);
376        return app.run();
377    }
378
379    let request = RemoteSqlRequest {
380        sql: sql.to_string(),
381        streaming: false,
382        fetch_size: cmd.fetch_size,
383        max_rows: cmd.max_rows,
384    };
385
386    let response: RemoteSqlResponse = tokio::select! {
387        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
388            match result {
389                Ok(value) => value.map_err(map_client_error)?,
390                Err(_) => {
391                    let _ = send_cancel_request(client).await;
392                    return Err(CliError::Timeout(format!(
393                        "deadline exceeded after {}",
394                        humantime::format_duration(options.deadline.duration())
395                    )));
396                }
397            }
398        }
399        _ = options.cancel.wait() => {
400            let _ = send_cancel_request(client).await;
401            return Err(CliError::Cancelled);
402        }
403    };
404
405    let (columns, rows) = if response.columns.is_empty() {
406        let columns = sql_status_columns();
407        let message = match response.affected_rows {
408            Some(count) => format!("{count} row(s) affected"),
409            None => "Operation completed successfully".to_string(),
410        };
411        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
412        (columns, vec![row])
413    } else {
414        let columns: Vec<Column> = response
415            .columns
416            .iter()
417            .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
418            .collect();
419        let mut rows = response
420            .rows
421            .into_iter()
422            .map(|row| Row::new(row.into_iter().map(remote_value_to_value).collect()))
423            .collect::<Vec<_>>();
424        if let Some(limit) = options.limit {
425            rows.truncate(limit);
426        }
427        (columns, rows)
428    };
429
430    let app = TuiApp::new(columns, rows, "server", false)
431        .with_context_message(Some(sql_context_message(sql)))
432        .with_admin_launcher(admin_launcher);
433    app.run()
434}
435
436async fn collect_remote_streaming_rows(
437    client: &HttpClient,
438    sql: &str,
439    options: &SqlExecutionOptions<'_>,
440    fetch_size: Option<usize>,
441    max_rows: Option<usize>,
442) -> Result<(Vec<Column>, Vec<Row>)> {
443    let request = RemoteSqlRequest {
444        sql: sql.to_string(),
445        streaming: true,
446        fetch_size,
447        max_rows,
448    };
449
450    let response = tokio::select! {
451        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
452            match result {
453                Ok(value) => value.map_err(map_client_error)?,
454                Err(_) => {
455                    let _ = send_cancel_request(client).await;
456                    return Err(CliError::Timeout(format!(
457                        "deadline exceeded after {}",
458                        humantime::format_duration(options.deadline.duration())
459                    )));
460                }
461            }
462        }
463        _ = options.cancel.wait() => {
464            let _ = send_cancel_request(client).await;
465            return Err(CliError::Cancelled);
466        }
467    };
468
469    if let Some(content_type) = response
470        .headers()
471        .get(reqwest::header::CONTENT_TYPE)
472        .and_then(|value| value.to_str().ok())
473    {
474        if content_type.starts_with("application/jsonl") {
475            return collect_remote_jsonl_rows(client, response, options).await;
476        }
477    }
478
479    let mut stream = response.bytes_stream();
480    let mut buffer: Vec<u8> = Vec::new();
481    let mut pos: usize = 0;
482    let mut done = false;
483    let mut saw_array_start = false;
484    let mut columns: Option<Vec<String>> = None;
485    let mut column_set: Option<HashSet<String>> = None;
486    let mut rows: Vec<Row> = Vec::new();
487
488    while !done {
489        if options.cancel.is_cancelled() {
490            let _ = send_cancel_request(client).await;
491            return Err(CliError::Cancelled);
492        }
493        if let Err(err) = options.deadline.check() {
494            let _ = send_cancel_request(client).await;
495            return Err(err);
496        }
497
498        let next = tokio::select! {
499            _ = options.cancel.wait() => {
500                let _ = send_cancel_request(client).await;
501                return Err(CliError::Cancelled);
502            }
503            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
504                match result {
505                    Ok(value) => value,
506                    Err(_) => {
507                        let _ = send_cancel_request(client).await;
508                        return Err(CliError::Timeout(format!(
509                            "deadline exceeded after {}",
510                            humantime::format_duration(options.deadline.duration())
511                        )));
512                    }
513                }
514            }
515        };
516
517        let chunk = match next {
518            Some(chunk) => chunk,
519            None => break,
520        };
521
522        let bytes = match chunk {
523            Ok(bytes) => bytes,
524            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
525        };
526
527        buffer.extend_from_slice(&bytes);
528
529        loop {
530            skip_whitespace(&buffer, &mut pos);
531            if pos >= buffer.len() {
532                break;
533            }
534
535            if !saw_array_start {
536                if buffer[pos] != b'[' {
537                    return Err(CliError::InvalidArgument(
538                        "Invalid streaming response: expected JSON array".into(),
539                    ));
540                }
541                pos += 1;
542                saw_array_start = true;
543                continue;
544            }
545
546            skip_whitespace(&buffer, &mut pos);
547            if pos >= buffer.len() {
548                break;
549            }
550
551            if buffer[pos] == b']' {
552                pos += 1;
553                done = true;
554                break;
555            }
556
557            let slice = &buffer[pos..];
558            let mut stream =
559                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
560            let value = match stream.next() {
561                Some(Ok(value)) => value,
562                Some(Err(err)) if err.is_eof() => break,
563                Some(Err(err)) => return Err(CliError::Json(err)),
564                None => break,
565            };
566            pos = pos.saturating_add(stream.byte_offset());
567
568            let object = value.as_object().ok_or_else(|| {
569                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
570            })?;
571
572            if columns.is_none() {
573                let names: Vec<String> = object.keys().cloned().collect();
574                let set: HashSet<String> = names.iter().cloned().collect();
575                if names.is_empty() {
576                    return Err(CliError::InvalidArgument(
577                        "Invalid streaming row: empty object".into(),
578                    ));
579                }
580                columns = Some(names);
581                column_set = Some(set);
582            }
583
584            let names = columns
585                .as_ref()
586                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
587            let set = column_set
588                .as_ref()
589                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;
590
591            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
592                return Err(CliError::InvalidArgument(
593                    "Invalid streaming row: column mismatch".into(),
594                ));
595            }
596
597            let values = names
598                .iter()
599                .map(|name| {
600                    object.get(name).ok_or_else(|| {
601                        CliError::InvalidArgument(format!(
602                            "Invalid streaming row: missing column '{name}'"
603                        ))
604                    })
605                })
606                .map(|value| value.and_then(json_value_to_value))
607                .collect::<Result<Vec<_>>>()?;
608            rows.push(Row::new(values));
609
610            if let Some(limit) = options.limit {
611                if rows.len() >= limit {
612                    let _ = send_cancel_request(client).await;
613                    done = true;
614                    break;
615                }
616            }
617
618            skip_whitespace(&buffer, &mut pos);
619            if pos >= buffer.len() {
620                break;
621            }
622            match buffer[pos] {
623                b',' => {
624                    pos += 1;
625                }
626                b']' => {
627                    pos += 1;
628                    done = true;
629                    break;
630                }
631                _ => {
632                    return Err(CliError::InvalidArgument(
633                        "Invalid streaming response: expected ',' or ']'".into(),
634                    ))
635                }
636            }
637        }
638
639        if pos > 0 {
640            buffer.drain(..pos);
641            pos = 0;
642        }
643    }
644
645    if done {
646        if has_non_whitespace(&buffer) {
647            return Err(CliError::InvalidArgument(
648                "Invalid streaming response: unexpected trailing data".into(),
649            ));
650        }
651        buffer.clear();
652        loop {
653            let next = tokio::select! {
654                _ = options.cancel.wait() => {
655                    let _ = send_cancel_request(client).await;
656                    return Err(CliError::Cancelled);
657                }
658                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
659                    match result {
660                        Ok(value) => value,
661                        Err(_) => {
662                            let _ = send_cancel_request(client).await;
663                            return Err(CliError::Timeout(format!(
664                                "deadline exceeded after {}",
665                                humantime::format_duration(options.deadline.duration())
666                            )));
667                        }
668                    }
669                }
670            };
671
672            let chunk = match next {
673                Some(chunk) => chunk,
674                None => break,
675            };
676
677            let bytes = match chunk {
678                Ok(bytes) => bytes,
679                Err(err) => {
680                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
681                }
682            };
683
684            if has_non_whitespace(&bytes) {
685                return Err(CliError::InvalidArgument(
686                    "Invalid streaming response: unexpected trailing data".into(),
687                ));
688            }
689        }
690    } else {
691        skip_whitespace(&buffer, &mut pos);
692        if pos < buffer.len() {
693            return Err(CliError::InvalidArgument(
694                "Invalid streaming response: unexpected trailing data".into(),
695            ));
696        }
697        return Err(CliError::InvalidArgument(
698            "Invalid streaming response: unexpected end of stream".into(),
699        ));
700    }
701
702    let columns = columns
703        .unwrap_or_default()
704        .into_iter()
705        .map(|name| Column::new(name, DataType::Text))
706        .collect();
707    Ok((columns, rows))
708}
709
710#[allow(dead_code)]
711async fn collect_remote_non_streaming_rows(
712    client: &HttpClient,
713    sql: &str,
714    options: &SqlExecutionOptions<'_>,
715    fetch_size: Option<usize>,
716    max_rows: Option<usize>,
717) -> Result<(Vec<Column>, Vec<Row>)> {
718    let request = RemoteSqlRequest {
719        sql: sql.to_string(),
720        streaming: false,
721        fetch_size,
722        max_rows,
723    };
724    let response: RemoteSqlResponse = tokio::select! {
725        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
726            match result {
727                Ok(value) => value.map_err(map_client_error)?,
728                Err(_) => {
729                    let _ = send_cancel_request(client).await;
730                    return Err(CliError::Timeout(format!(
731                        "deadline exceeded after {}",
732                        humantime::format_duration(options.deadline.duration())
733                    )));
734                }
735            }
736        }
737        _ = options.cancel.wait() => {
738            let _ = send_cancel_request(client).await;
739            return Err(CliError::Cancelled);
740        }
741    };
742
743    if response.columns.is_empty() {
744        let message = match response.affected_rows {
745            Some(count) => format!("{count} row(s) affected"),
746            None => "Operation completed successfully".to_string(),
747        };
748        let columns = sql_status_columns();
749        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
750        return Ok((columns, vec![row]));
751    }
752
753    let columns = response
754        .columns
755        .iter()
756        .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
757        .collect::<Vec<_>>();
758    let mut rows = response
759        .rows
760        .into_iter()
761        .map(|row| Row::new(row.into_iter().map(remote_value_to_value).collect()))
762        .collect::<Vec<_>>();
763    if let Some(limit) = options.limit {
764        rows.truncate(limit);
765    }
766    Ok((columns, rows))
767}
768
769async fn execute_remote_streaming<W: Write>(
770    client: &HttpClient,
771    sql: &str,
772    writer: &mut W,
773    formatter: Box<dyn Formatter>,
774    options: &SqlExecutionOptions<'_>,
775    fetch_size: Option<usize>,
776    max_rows: Option<usize>,
777) -> Result<()> {
778    let request = RemoteSqlRequest {
779        sql: sql.to_string(),
780        streaming: true,
781        fetch_size,
782        max_rows,
783    };
784
785    let response = tokio::select! {
786        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
787            match result {
788                Ok(value) => value.map_err(map_client_error)?,
789                Err(_) => {
790                    let _ = send_cancel_request(client).await;
791                    return Err(CliError::Timeout(format!(
792                        "deadline exceeded after {}",
793                        humantime::format_duration(options.deadline.duration())
794                    )));
795                }
796            }
797        }
798        _ = options.cancel.wait() => {
799            let _ = send_cancel_request(client).await;
800            return Err(CliError::Cancelled);
801        }
802    };
803
804    if let Some(content_type) = response
805        .headers()
806        .get(reqwest::header::CONTENT_TYPE)
807        .and_then(|value| value.to_str().ok())
808    {
809        if content_type.starts_with("application/jsonl") {
810            return execute_remote_jsonl_streaming(client, response, writer, formatter, options)
811                .await;
812        }
813    }
814
815    let mut stream = response.bytes_stream();
816    let mut buffer: Vec<u8> = Vec::new();
817    let mut pos: usize = 0;
818    let mut streaming_writer: Option<StreamingWriter<&mut W>> = None;
819    let mut formatter = Some(formatter);
820    let mut columns: Option<Vec<String>> = None;
821    let mut column_set: Option<HashSet<String>> = None;
822    let mut done = false;
823    let mut saw_array_start = false;
824
825    while !done {
826        if options.cancel.is_cancelled() {
827            let _ = send_cancel_request(client).await;
828            return Err(CliError::Cancelled);
829        }
830        if let Err(err) = options.deadline.check() {
831            let _ = send_cancel_request(client).await;
832            return Err(err);
833        }
834
835        let next = tokio::select! {
836            _ = options.cancel.wait() => {
837                let _ = send_cancel_request(client).await;
838                return Err(CliError::Cancelled);
839            }
840            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
841                match result {
842                    Ok(value) => value,
843                    Err(_) => {
844                        let _ = send_cancel_request(client).await;
845                        return Err(CliError::Timeout(format!(
846                            "deadline exceeded after {}",
847                            humantime::format_duration(options.deadline.duration())
848                        )));
849                    }
850                }
851            }
852        };
853
854        let chunk = match next {
855            Some(chunk) => chunk,
856            None => break,
857        };
858
859        let bytes = match chunk {
860            Ok(bytes) => bytes,
861            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
862        };
863
864        buffer.extend_from_slice(&bytes);
865
866        loop {
867            skip_whitespace(&buffer, &mut pos);
868            if pos >= buffer.len() {
869                break;
870            }
871
872            if !saw_array_start {
873                if buffer[pos] != b'[' {
874                    return Err(CliError::InvalidArgument(
875                        "Invalid streaming response: expected JSON array".into(),
876                    ));
877                }
878                pos += 1;
879                saw_array_start = true;
880                continue;
881            }
882
883            skip_whitespace(&buffer, &mut pos);
884            if pos >= buffer.len() {
885                break;
886            }
887
888            if buffer[pos] == b']' {
889                pos += 1;
890                done = true;
891                break;
892            }
893
894            let slice = &buffer[pos..];
895            let mut stream =
896                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
897            let value = match stream.next() {
898                Some(Ok(value)) => value,
899                Some(Err(err)) if err.is_eof() => break,
900                Some(Err(err)) => return Err(CliError::Json(err)),
901                None => break,
902            };
903            pos = pos.saturating_add(stream.byte_offset());
904
905            let object = value.as_object().ok_or_else(|| {
906                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
907            })?;
908
909            if columns.is_none() {
910                let names: Vec<String> = object.keys().cloned().collect();
911                let set: HashSet<String> = names.iter().cloned().collect();
912                if names.is_empty() {
913                    return Err(CliError::InvalidArgument(
914                        "Invalid streaming row: empty object".into(),
915                    ));
916                }
917                let cols = names
918                    .iter()
919                    .map(|name| Column::new(name, DataType::Text))
920                    .collect::<Vec<_>>();
921                let formatter = formatter
922                    .take()
923                    .ok_or_else(|| CliError::InvalidArgument("Missing formatter".into()))?;
924                let mut writer = StreamingWriter::new(&mut *writer, formatter, cols, options.limit)
925                    .with_quiet(options.quiet);
926                writer.prepare(None)?;
927                streaming_writer = Some(writer);
928                columns = Some(names);
929                column_set = Some(set);
930            }
931
932            let names = columns
933                .as_ref()
934                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
935            let set = column_set
936                .as_ref()
937                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;
938
939            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
940                return Err(CliError::InvalidArgument(
941                    "Invalid streaming row: column mismatch".into(),
942                ));
943            }
944
945            let values = names
946                .iter()
947                .map(|name| {
948                    object.get(name).ok_or_else(|| {
949                        CliError::InvalidArgument(format!(
950                            "Invalid streaming row: missing column '{name}'"
951                        ))
952                    })
953                })
954                .map(|value| value.and_then(json_value_to_value))
955                .collect::<Result<Vec<_>>>()?;
956
957            if let Some(writer) = streaming_writer.as_mut() {
958                match writer.write_row(Row::new(values))? {
959                    WriteStatus::LimitReached => {
960                        let _ = send_cancel_request(client).await;
961                        return writer.finish();
962                    }
963                    WriteStatus::Continue => {}
964                }
965            }
966
967            skip_whitespace(&buffer, &mut pos);
968            if pos >= buffer.len() {
969                break;
970            }
971            match buffer[pos] {
972                b',' => {
973                    pos += 1;
974                }
975                b']' => {
976                    pos += 1;
977                    done = true;
978                    break;
979                }
980                _ => {
981                    return Err(CliError::InvalidArgument(
982                        "Invalid streaming response: expected ',' or ']'".into(),
983                    ))
984                }
985            }
986        }
987
988        if pos > 0 {
989            buffer.drain(..pos);
990            pos = 0;
991        }
992    }
993
994    if done {
995        if has_non_whitespace(&buffer) {
996            return Err(CliError::InvalidArgument(
997                "Invalid streaming response: unexpected trailing data".into(),
998            ));
999        }
1000        buffer.clear();
1001        loop {
1002            let next = tokio::select! {
1003                _ = options.cancel.wait() => {
1004                    let _ = send_cancel_request(client).await;
1005                    return Err(CliError::Cancelled);
1006                }
1007                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
1008                    match result {
1009                        Ok(value) => value,
1010                        Err(_) => {
1011                            let _ = send_cancel_request(client).await;
1012                            return Err(CliError::Timeout(format!(
1013                                "deadline exceeded after {}",
1014                                humantime::format_duration(options.deadline.duration())
1015                            )));
1016                        }
1017                    }
1018                }
1019            };
1020
1021            let chunk = match next {
1022                Some(chunk) => chunk,
1023                None => break,
1024            };
1025
1026            let bytes = match chunk {
1027                Ok(bytes) => bytes,
1028                Err(err) => {
1029                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
1030                }
1031            };
1032
1033            if has_non_whitespace(&bytes) {
1034                return Err(CliError::InvalidArgument(
1035                    "Invalid streaming response: unexpected trailing data".into(),
1036                ));
1037            }
1038        }
1039    } else {
1040        skip_whitespace(&buffer, &mut pos);
1041        if pos < buffer.len() {
1042            return Err(CliError::InvalidArgument(
1043                "Invalid streaming response: unexpected trailing data".into(),
1044            ));
1045        }
1046        return Err(CliError::InvalidArgument(
1047            "Invalid streaming response: unexpected end of stream".into(),
1048        ));
1049    }
1050
1051    if let Some(mut writer) = streaming_writer {
1052        return writer.finish();
1053    }
1054
1055    if done && saw_array_start {
1056        if let Some(formatter) = formatter.take() {
1057            let mut writer =
1058                StreamingWriter::new(&mut *writer, formatter, Vec::new(), options.limit)
1059                    .with_quiet(options.quiet);
1060            writer.prepare(None)?;
1061            return writer.finish();
1062        }
1063    }
1064
1065    Ok(())
1066}
1067
1068async fn collect_remote_jsonl_rows(
1069    client: &HttpClient,
1070    response: Response,
1071    options: &SqlExecutionOptions<'_>,
1072) -> Result<(Vec<Column>, Vec<Row>)> {
1073    let mut stream = response.bytes_stream();
1074    let mut buffer: Vec<u8> = Vec::new();
1075    let mut columns: Option<Vec<Column>> = None;
1076    let mut rows: Vec<Row> = Vec::new();
1077    let mut done = false;
1078
1079    while !done {
1080        if options.cancel.is_cancelled() {
1081            let _ = send_cancel_request(client).await;
1082            return Err(CliError::Cancelled);
1083        }
1084        if let Err(err) = options.deadline.check() {
1085            let _ = send_cancel_request(client).await;
1086            return Err(err);
1087        }
1088
1089        let next = tokio::select! {
1090            _ = options.cancel.wait() => {
1091                let _ = send_cancel_request(client).await;
1092                return Err(CliError::Cancelled);
1093            }
1094            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
1095                match result {
1096                    Ok(value) => value,
1097                    Err(_) => {
1098                        let _ = send_cancel_request(client).await;
1099                        return Err(CliError::Timeout(format!(
1100                            "deadline exceeded after {}",
1101                            humantime::format_duration(options.deadline.duration())
1102                        )));
1103                    }
1104                }
1105            }
1106        };
1107
1108        let chunk = match next {
1109            Some(chunk) => chunk,
1110            None => break,
1111        };
1112
1113        let bytes = match chunk {
1114            Ok(bytes) => bytes,
1115            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
1116        };
1117
1118        buffer.extend_from_slice(&bytes);
1119
1120        while let Some(newline) = buffer.iter().position(|&b| b == b'\n') {
1121            let line = buffer.drain(..=newline).collect::<Vec<u8>>();
1122            if let Some(item) = parse_jsonl_line(&line)? {
1123                if let Some(error) = item.error {
1124                    return Err(CliError::InvalidArgument(format!(
1125                        "Server error: {}",
1126                        error.message
1127                    )));
1128                }
1129                if item.done {
1130                    done = true;
1131                    break;
1132                }
1133                let row = item.row.ok_or_else(|| {
1134                    CliError::InvalidArgument("Invalid streaming response: missing row".into())
1135                })?;
1136                if columns.is_none() {
1137                    columns = Some(default_stream_columns(row.len()));
1138                }
1139                rows.push(Row::new(
1140                    row.into_iter().map(remote_value_to_value).collect(),
1141                ));
1142                if let Some(limit) = options.limit {
1143                    if rows.len() >= limit {
1144                        let _ = send_cancel_request(client).await;
1145                        done = true;
1146                        break;
1147                    }
1148                }
1149            }
1150        }
1151    }
1152
1153    if !done {
1154        if let Some(item) = parse_jsonl_line(&buffer)? {
1155            if let Some(error) = item.error {
1156                return Err(CliError::InvalidArgument(format!(
1157                    "Server error: {}",
1158                    error.message
1159                )));
1160            }
1161            if item.done {
1162                done = true;
1163            } else if let Some(row) = item.row {
1164                if columns.is_none() {
1165                    columns = Some(default_stream_columns(row.len()));
1166                }
1167                rows.push(Row::new(
1168                    row.into_iter().map(remote_value_to_value).collect(),
1169                ));
1170                if let Some(limit) = options.limit {
1171                    if rows.len() >= limit {
1172                        let _ = send_cancel_request(client).await;
1173                        done = true;
1174                    }
1175                }
1176            } else {
1177                return Err(CliError::InvalidArgument(
1178                    "Invalid streaming response: missing row".into(),
1179                ));
1180            }
1181        }
1182    }
1183
1184    if !done {
1185        return Err(CliError::InvalidArgument(
1186            "Invalid streaming response: unexpected end of stream".into(),
1187        ));
1188    }
1189
1190    Ok((columns.unwrap_or_default(), rows))
1191}
1192
1193async fn execute_remote_jsonl_streaming<W: Write>(
1194    client: &HttpClient,
1195    response: Response,
1196    writer: &mut W,
1197    formatter: Box<dyn Formatter>,
1198    options: &SqlExecutionOptions<'_>,
1199) -> Result<()> {
1200    let mut stream = response.bytes_stream();
1201    let mut buffer: Vec<u8> = Vec::new();
1202    let mut streaming_writer: Option<StreamingWriter<&mut W>> = None;
1203    let mut formatter = Some(formatter);
1204    let mut done = false;
1205
1206    while !done {
1207        if options.cancel.is_cancelled() {
1208            let _ = send_cancel_request(client).await;
1209            return Err(CliError::Cancelled);
1210        }
1211        if let Err(err) = options.deadline.check() {
1212            let _ = send_cancel_request(client).await;
1213            return Err(err);
1214        }
1215
1216        let next = tokio::select! {
1217            _ = options.cancel.wait() => {
1218                let _ = send_cancel_request(client).await;
1219                return Err(CliError::Cancelled);
1220            }
1221            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
1222                match result {
1223                    Ok(value) => value,
1224                    Err(_) => {
1225                        let _ = send_cancel_request(client).await;
1226                        return Err(CliError::Timeout(format!(
1227                            "deadline exceeded after {}",
1228                            humantime::format_duration(options.deadline.duration())
1229                        )));
1230                    }
1231                }
1232            }
1233        };
1234
1235        let chunk = match next {
1236            Some(chunk) => chunk,
1237            None => break,
1238        };
1239
1240        let bytes = match chunk {
1241            Ok(bytes) => bytes,
1242            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
1243        };
1244
1245        buffer.extend_from_slice(&bytes);
1246
1247        while let Some(newline) = buffer.iter().position(|&b| b == b'\n') {
1248            let line = buffer.drain(..=newline).collect::<Vec<u8>>();
1249            if let Some(item) = parse_jsonl_line(&line)? {
1250                if let Some(error) = item.error {
1251                    return Err(CliError::InvalidArgument(format!(
1252                        "Server error: {}",
1253                        error.message
1254                    )));
1255                }
1256                if item.done {
1257                    done = true;
1258                    break;
1259                }
1260                let row = item.row.ok_or_else(|| {
1261                    CliError::InvalidArgument("Invalid streaming response: missing row".into())
1262                })?;
1263                if streaming_writer.is_none() {
1264                    let columns = default_stream_columns(row.len());
1265                    let formatter = formatter
1266                        .take()
1267                        .ok_or_else(|| CliError::InvalidArgument("Missing formatter".into()))?;
1268                    let mut writer =
1269                        StreamingWriter::new(&mut *writer, formatter, columns, options.limit)
1270                            .with_quiet(options.quiet);
1271                    writer.prepare(None)?;
1272                    streaming_writer = Some(writer);
1273                }
1274                let values = row.into_iter().map(remote_value_to_value).collect();
1275                if let Some(writer) = streaming_writer.as_mut() {
1276                    match writer.write_row(Row::new(values))? {
1277                        WriteStatus::LimitReached => {
1278                            let _ = send_cancel_request(client).await;
1279                            done = true;
1280                            break;
1281                        }
1282                        WriteStatus::Continue => {}
1283                    }
1284                }
1285            }
1286        }
1287    }
1288
1289    if !done {
1290        if let Some(item) = parse_jsonl_line(&buffer)? {
1291            if let Some(error) = item.error {
1292                return Err(CliError::InvalidArgument(format!(
1293                    "Server error: {}",
1294                    error.message
1295                )));
1296            }
1297            if item.done {
1298                done = true;
1299            } else if let Some(row) = item.row {
1300                if streaming_writer.is_none() {
1301                    let columns = default_stream_columns(row.len());
1302                    let formatter = formatter
1303                        .take()
1304                        .ok_or_else(|| CliError::InvalidArgument("Missing formatter".into()))?;
1305                    let mut writer =
1306                        StreamingWriter::new(&mut *writer, formatter, columns, options.limit)
1307                            .with_quiet(options.quiet);
1308                    writer.prepare(None)?;
1309                    streaming_writer = Some(writer);
1310                }
1311                let values = row.into_iter().map(remote_value_to_value).collect();
1312                if let Some(writer) = streaming_writer.as_mut() {
1313                    match writer.write_row(Row::new(values))? {
1314                        WriteStatus::LimitReached => {
1315                            let _ = send_cancel_request(client).await;
1316                            done = true;
1317                        }
1318                        WriteStatus::Continue => {}
1319                    }
1320                }
1321            } else {
1322                return Err(CliError::InvalidArgument(
1323                    "Invalid streaming response: missing row".into(),
1324                ));
1325            }
1326        }
1327    }
1328
1329    if !done {
1330        return Err(CliError::InvalidArgument(
1331            "Invalid streaming response: unexpected end of stream".into(),
1332        ));
1333    }
1334
1335    if let Some(mut writer) = streaming_writer {
1336        return writer.finish();
1337    }
1338
1339    if let Some(formatter) = formatter.take() {
1340        let mut writer = StreamingWriter::new(&mut *writer, formatter, Vec::new(), options.limit)
1341            .with_quiet(options.quiet);
1342        writer.prepare(None)?;
1343        return writer.finish();
1344    }
1345
1346    Ok(())
1347}
1348
1349fn default_stream_columns(count: usize) -> Vec<Column> {
1350    (0..count)
1351        .map(|idx| Column::new(format!("col{}", idx + 1), DataType::Text))
1352        .collect()
1353}
1354
1355fn parse_jsonl_line(line: &[u8]) -> Result<Option<RemoteStreamItem>> {
1356    if line
1357        .iter()
1358        .all(|b| matches!(b, b' ' | b'\n' | b'\r' | b'\t'))
1359    {
1360        return Ok(None);
1361    }
1362    let text = std::str::from_utf8(line)
1363        .map_err(|err| CliError::InvalidArgument(format!("Invalid UTF-8: {err}")))?;
1364    let trimmed = text.trim();
1365    if trimmed.is_empty() {
1366        return Ok(None);
1367    }
1368    let item = serde_json::from_str::<RemoteStreamItem>(trimmed)?;
1369    Ok(Some(item))
1370}
1371
/// Advances `pos` past any run of JSON whitespace (space, `\n`, `\r`, `\t`)
/// starting at `buffer[*pos]`.
///
/// `pos` is left untouched when it already points at a non-whitespace byte or
/// lies at/after the end of `buffer`.
fn skip_whitespace(buffer: &[u8], pos: &mut usize) {
    let run = buffer.get(*pos..).map_or(0, |rest| {
        rest.iter()
            .take_while(|byte| matches!(**byte, b' ' | b'\n' | b'\r' | b'\t'))
            .count()
    });
    *pos += run;
}
1380
/// Reports whether `buffer` contains any byte other than JSON whitespace
/// (space, `\n`, `\r`, `\t`). An empty buffer yields `false`.
fn has_non_whitespace(buffer: &[u8]) -> bool {
    for &byte in buffer {
        match byte {
            b' ' | b'\n' | b'\r' | b'\t' => continue,
            _ => return true,
        }
    }
    false
}
1386
1387fn json_value_to_value(value: &serde_json::Value) -> Result<Value> {
1388    match value {
1389        serde_json::Value::Null => Ok(Value::Null),
1390        serde_json::Value::Bool(value) => Ok(Value::Bool(*value)),
1391        serde_json::Value::Number(value) => {
1392            if let Some(value) = value.as_i64() {
1393                Ok(Value::Int(value))
1394            } else if let Some(value) = value.as_f64() {
1395                Ok(Value::Float(value))
1396            } else {
1397                Err(CliError::InvalidArgument(
1398                    "Invalid numeric value in streaming row".into(),
1399                ))
1400            }
1401        }
1402        serde_json::Value::String(value) => Ok(Value::Text(value.clone())),
1403        serde_json::Value::Array(values) => {
1404            let mut vector = Vec::with_capacity(values.len());
1405            for entry in values {
1406                let number = entry.as_f64().ok_or_else(|| {
1407                    CliError::InvalidArgument("Invalid vector value in streaming row".into())
1408                })?;
1409                vector.push(number as f32);
1410            }
1411            Ok(Value::Vector(vector))
1412        }
1413        serde_json::Value::Object(_) => Err(CliError::InvalidArgument(
1414            "Invalid streaming row: nested objects are not supported".into(),
1415        )),
1416    }
1417}
1418
1419/// Legacy execute function for backward compatibility with tests.
1420#[allow(dead_code)]
1421pub fn execute<W: Write>(
1422    db: &Database,
1423    cmd: SqlCommand,
1424    batch_mode: &BatchMode,
1425    writer: &mut StreamingWriter<W>,
1426) -> Result<()> {
1427    let sql = cmd.resolve_query(batch_mode)?;
1428
1429    execute_sql(db, &sql, writer)
1430}
1431
1432impl SqlCommand {
1433    /// Resolve the SQL query source (argument, file, or stdin).
1434    pub fn resolve_query(&self, batch_mode: &BatchMode) -> Result<String> {
1435        match (&self.query, &self.file) {
1436            (Some(query), None) => Ok(query.clone()),
1437            (None, Some(file)) => fs::read_to_string(file).map_err(|e| {
1438                CliError::InvalidArgument(format!("Failed to read SQL file '{}': {}", file, e))
1439            }),
1440            (None, None) if !batch_mode.is_tty => {
1441                let mut buf = String::new();
1442                io::stdin().read_to_string(&mut buf)?;
1443                Ok(buf)
1444            }
1445            (None, None) => Err(CliError::NoQueryProvided),
1446            (Some(_), Some(_)) => Err(CliError::InvalidArgument(
1447                "Cannot specify both query and file".to_string(),
1448            )),
1449        }
1450    }
1451}
1452
1453/// Execute SQL and write results.
1454fn execute_sql<W: Write>(db: &Database, sql: &str, writer: &mut StreamingWriter<W>) -> Result<()> {
1455    use alopex_embedded::SqlResult;
1456
1457    let result = db.execute_sql(sql)?;
1458
1459    match result {
1460        SqlResult::Success => {
1461            // DDL success - output simple status
1462            writer.prepare(Some(1))?;
1463            let row = Row::new(vec![
1464                Value::Text("OK".to_string()),
1465                Value::Text("Operation completed successfully".to_string()),
1466            ]);
1467            writer.write_row(row)?;
1468            writer.finish()?;
1469        }
1470        SqlResult::RowsAffected(count) => {
1471            // DML success - output affected rows count
1472            writer.prepare(Some(1))?;
1473            let row = Row::new(vec![
1474                Value::Text("OK".to_string()),
1475                Value::Text(format!("{} row(s) affected", count)),
1476            ]);
1477            writer.write_row(row)?;
1478            writer.finish()?;
1479        }
1480        SqlResult::Query(query_result) => {
1481            // SELECT result - output rows
1482            let row_count = query_result.rows.len();
1483            writer.prepare(Some(row_count))?;
1484
1485            for sql_row in query_result.rows {
1486                let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1487                let row = Row::new(values);
1488
1489                match writer.write_row(row)? {
1490                    WriteStatus::LimitReached => break,
1491                    WriteStatus::Continue => {}
1492                }
1493            }
1494
1495            writer.finish()?;
1496        }
1497    }
1498
1499    Ok(())
1500}
1501
1502/// Execute SQL with formatter, dynamically determining columns from query result.
1503///
1504/// This function executes the SQL using the streaming API for FR-7 compliance,
1505/// then creates the StreamingWriter with the correct columns based on the result type
1506/// (status columns for DDL/DML, query result columns for SELECT).
1507///
1508/// FR-7 Compliance: Uses SQL parser to detect SELECT queries instead of heuristic.
1509/// This properly handles:
1510/// - WITH clauses (CTEs)
1511/// - Leading comments
1512/// - Complex query structures
1513fn execute_sql_with_formatter<W: Write>(
1514    db: &Database,
1515    sql: &str,
1516    writer: &mut W,
1517    formatter: Box<dyn Formatter>,
1518    options: &SqlExecutionOptions<'_>,
1519) -> Result<()> {
1520    use alopex_sql::{AlopexDialect, Parser, StatementKind};
1521
1522    // FR-7: Use parser to detect SELECT instead of starts_with("SELECT") heuristic
1523    // This correctly handles WITH clauses, leading comments, and complex query structures
1524    let dialect = AlopexDialect;
1525    let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
1526
1527    let is_select = stmts.len() == 1
1528        && matches!(
1529            stmts.first().map(|s| &s.kind),
1530            Some(StatementKind::Select(_))
1531        );
1532
1533    if is_select {
1534        // SELECT: use streaming path (FR-7)
1535        execute_sql_select_streaming(db, sql, writer, formatter, options)
1536    } else {
1537        // DDL/DML: use standard path
1538        execute_sql_ddl_dml(db, sql, writer, formatter, options)
1539    }
1540}
1541
1542/// Execute SELECT query with streaming callback (FR-7).
1543///
1544/// This function uses `execute_sql_with_rows` for true streaming output.
1545/// The callback receives rows one at a time from the iterator, and the
1546/// transaction is kept alive during streaming.
1547fn execute_sql_select_streaming<W: Write>(
1548    db: &Database,
1549    sql: &str,
1550    writer: &mut W,
1551    formatter: Box<dyn Formatter>,
1552    options: &SqlExecutionOptions<'_>,
1553) -> Result<()> {
1554    use alopex_embedded::StreamingQueryResult;
1555    use std::sync::atomic::{AtomicBool, Ordering};
1556    use std::sync::Arc;
1557
1558    // Helper to convert CliError to alopex_embedded::Error for callback
1559    fn cli_err_to_embedded(e: crate::error::CliError) -> alopex_embedded::Error {
1560        alopex_embedded::Error::Sql(alopex_sql::SqlError::Execution {
1561            message: e.to_string(),
1562            code: "ALOPEX-C001",
1563        })
1564    }
1565
1566    let cancelled = Arc::new(AtomicBool::new(false));
1567    let timed_out = Arc::new(AtomicBool::new(false));
1568    let cancel_flag = cancelled.clone();
1569    let timeout_flag = timed_out.clone();
1570
1571    let result = db.execute_sql_with_rows(sql, |mut rows| {
1572        // FR-7: SELECT result - stream rows directly from iterator while transaction is alive
1573        let columns = columns_from_streaming_rows(&rows);
1574        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
1575            .with_quiet(options.quiet);
1576
1577        // FR-7: Use None for row count hint to support true streaming output
1578        streaming_writer
1579            .prepare(None)
1580            .map_err(cli_err_to_embedded)?;
1581
1582        if let Err(err) = options.deadline.check() {
1583            timeout_flag.store(true, Ordering::SeqCst);
1584            return Err(cli_err_to_embedded(err));
1585        }
1586
1587        // Consume iterator row by row for true streaming
1588        while let Ok(Some(sql_row)) = rows.next_row() {
1589            if options.cancel.is_cancelled() {
1590                cancel_flag.store(true, Ordering::SeqCst);
1591                return Err(cli_err_to_embedded(CliError::Cancelled));
1592            }
1593            if let Err(err) = options.deadline.check() {
1594                timeout_flag.store(true, Ordering::SeqCst);
1595                return Err(cli_err_to_embedded(err));
1596            }
1597            let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1598            let row = Row::new(values);
1599
1600            match streaming_writer
1601                .write_row(row)
1602                .map_err(cli_err_to_embedded)?
1603            {
1604                WriteStatus::LimitReached => break,
1605                WriteStatus::Continue => {}
1606            }
1607        }
1608
1609        streaming_writer.finish().map_err(cli_err_to_embedded)?;
1610        Ok(())
1611    });
1612
1613    let result = match result {
1614        Ok(value) => value,
1615        Err(err) => {
1616            if cancelled.load(Ordering::SeqCst) {
1617                return Err(CliError::Cancelled);
1618            }
1619            if timed_out.load(Ordering::SeqCst) {
1620                return Err(CliError::Timeout(format!(
1621                    "deadline exceeded after {}",
1622                    humantime::format_duration(options.deadline.duration())
1623                )));
1624            }
1625            return Err(CliError::Database(err));
1626        }
1627    };
1628
1629    match result {
1630        StreamingQueryResult::QueryProcessed(()) => Ok(()),
1631        StreamingQueryResult::Success | StreamingQueryResult::RowsAffected(_) => {
1632            // Unexpected: SELECT should not return these
1633            Ok(())
1634        }
1635    }
1636}
1637
1638#[derive(serde::Serialize)]
1639struct RemoteSqlRequest {
1640    sql: String,
1641    #[serde(default)]
1642    streaming: bool,
1643    #[serde(skip_serializing_if = "Option::is_none")]
1644    fetch_size: Option<usize>,
1645    #[serde(skip_serializing_if = "Option::is_none")]
1646    max_rows: Option<usize>,
1647}
1648
1649#[derive(serde::Deserialize)]
1650struct RemoteColumnInfo {
1651    name: String,
1652    data_type: String,
1653}
1654
1655#[derive(serde::Deserialize)]
1656struct RemoteSqlResponse {
1657    columns: Vec<RemoteColumnInfo>,
1658    rows: Vec<Vec<alopex_sql::storage::SqlValue>>,
1659    affected_rows: Option<u64>,
1660}
1661
1662#[derive(serde::Deserialize)]
1663struct RemoteStreamItem {
1664    row: Option<Vec<alopex_sql::storage::SqlValue>>,
1665    error: Option<RemoteStreamError>,
1666    #[serde(default)]
1667    done: bool,
1668}
1669
1670#[derive(serde::Deserialize)]
1671struct RemoteStreamError {
1672    message: String,
1673}
1674
1675fn map_client_error(err: ClientError) -> CliError {
1676    match err {
1677        ClientError::Request { source, .. } => {
1678            CliError::ServerConnection(format!("request failed: {source}"))
1679        }
1680        ClientError::InvalidUrl(message) => CliError::InvalidArgument(message),
1681        ClientError::Build(message) => CliError::InvalidArgument(message),
1682        ClientError::Auth(err) => CliError::InvalidArgument(err.to_string()),
1683        ClientError::HttpStatus { status, body } => {
1684            CliError::InvalidArgument(format!("Server error: HTTP {} - {}", status.as_u16(), body))
1685        }
1686    }
1687}
1688
1689async fn send_cancel_request(client: &HttpClient) -> Result<()> {
1690    #[derive(serde::Serialize)]
1691    struct CancelRequest {}
1692
1693    let request = CancelRequest {};
1694    let _: serde_json::Value = client
1695        .post_json("api/sql/cancel", &request)
1696        .await
1697        .map_err(map_client_error)?;
1698    Ok(())
1699}
1700
/// Combines two optional row caps into the effective one.
///
/// When both are present the smaller wins; when only one is present it is
/// used as is; when neither is present there is no cap.
fn merge_limit(limit: Option<usize>, max_rows: Option<usize>) -> Option<usize> {
    match limit.zip(max_rows) {
        Some((requested, ceiling)) => Some(requested.min(ceiling)),
        None => limit.or(max_rows),
    }
}
1708
1709/// Execute DDL/DML query (non-SELECT statements).
1710///
1711/// This function handles CREATE, DROP, INSERT, UPDATE, DELETE and other
1712/// non-SELECT statements. It outputs status messages (OK, rows affected).
1713fn execute_sql_ddl_dml<W: Write>(
1714    db: &Database,
1715    sql: &str,
1716    writer: &mut W,
1717    formatter: Box<dyn Formatter>,
1718    options: &SqlExecutionOptions<'_>,
1719) -> Result<()> {
1720    use alopex_sql::ExecutionResult;
1721
1722    options.deadline.check()?;
1723    let result = db.execute_sql(sql)?;
1724    options.deadline.check()?;
1725
1726    match result {
1727        ExecutionResult::Success => {
1728            // DDL success - suppress status output in quiet mode
1729            if !options.quiet {
1730                let columns = sql_status_columns();
1731                let mut streaming_writer =
1732                    StreamingWriter::new(writer, formatter, columns, options.limit)
1733                        .with_quiet(options.quiet);
1734                streaming_writer.prepare(Some(1))?;
1735                let row = Row::new(vec![
1736                    Value::Text("OK".to_string()),
1737                    Value::Text("Operation completed successfully".to_string()),
1738                ]);
1739                streaming_writer.write_row(row)?;
1740                streaming_writer.finish()?;
1741            }
1742        }
1743        ExecutionResult::RowsAffected(count) => {
1744            // DML success - suppress status output in quiet mode
1745            if !options.quiet {
1746                let columns = sql_status_columns();
1747                let mut streaming_writer =
1748                    StreamingWriter::new(writer, formatter, columns, options.limit)
1749                        .with_quiet(options.quiet);
1750                streaming_writer.prepare(Some(1))?;
1751                let row = Row::new(vec![
1752                    Value::Text("OK".to_string()),
1753                    Value::Text(format!("{} row(s) affected", count)),
1754                ]);
1755                streaming_writer.write_row(row)?;
1756                streaming_writer.finish()?;
1757            }
1758        }
1759        ExecutionResult::Query(query_result) => {
1760            // Unexpected: non-SELECT should not return Query result
1761            // But handle it gracefully by outputting the result
1762            let columns = columns_from_query_result(&query_result);
1763            let mut streaming_writer =
1764                StreamingWriter::new(writer, formatter, columns, options.limit)
1765                    .with_quiet(options.quiet);
1766            streaming_writer.prepare(Some(query_result.rows.len()))?;
1767            for sql_row in query_result.rows {
1768                let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1769                let row = Row::new(values);
1770                match streaming_writer.write_row(row)? {
1771                    WriteStatus::LimitReached => break,
1772                    WriteStatus::Continue => {}
1773                }
1774            }
1775            streaming_writer.finish()?;
1776        }
1777    }
1778
1779    Ok(())
1780}
1781
1782/// Convert alopex_sql::SqlValue to our Value type.
1783fn sql_value_to_value(sql_value: alopex_sql::SqlValue) -> Value {
1784    use alopex_sql::SqlValue;
1785
1786    match sql_value {
1787        SqlValue::Null => Value::Null,
1788        SqlValue::Integer(i) => Value::Int(i as i64),
1789        SqlValue::BigInt(i) => Value::Int(i),
1790        SqlValue::Float(f) => Value::Float(f as f64),
1791        SqlValue::Double(f) => Value::Float(f),
1792        SqlValue::Text(s) => Value::Text(s),
1793        SqlValue::Blob(b) => Value::Bytes(b),
1794        SqlValue::Boolean(b) => Value::Bool(b),
1795        SqlValue::Timestamp(ts) => {
1796            // Format timestamp as ISO 8601 string
1797            Value::Text(format!("{}", ts))
1798        }
1799        SqlValue::Vector(v) => Value::Vector(v),
1800    }
1801}
1802
1803fn remote_value_to_value(sql_value: alopex_sql::storage::SqlValue) -> Value {
1804    use alopex_sql::storage::SqlValue;
1805
1806    match sql_value {
1807        SqlValue::Null => Value::Null,
1808        SqlValue::Integer(i) => Value::Int(i as i64),
1809        SqlValue::BigInt(i) => Value::Int(i),
1810        SqlValue::Float(f) => Value::Float(f as f64),
1811        SqlValue::Double(f) => Value::Float(f),
1812        SqlValue::Text(s) => Value::Text(s),
1813        SqlValue::Blob(b) => Value::Bytes(b),
1814        SqlValue::Boolean(b) => Value::Bool(b),
1815        SqlValue::Timestamp(ts) => Value::Text(ts.to_string()),
1816        SqlValue::Vector(v) => Value::Vector(v),
1817    }
1818}
1819
1820fn data_type_from_string(value: &str) -> DataType {
1821    let upper = value.to_ascii_uppercase();
1822    if upper.starts_with("INT") || upper.starts_with("BIGINT") {
1823        DataType::Int
1824    } else if upper.starts_with("FLOAT") || upper.starts_with("DOUBLE") {
1825        DataType::Float
1826    } else if upper.starts_with("BLOB") {
1827        DataType::Bytes
1828    } else if upper.starts_with("BOOLEAN") {
1829        DataType::Bool
1830    } else if upper.starts_with("VECTOR") {
1831        DataType::Vector
1832    } else {
1833        DataType::Text
1834    }
1835}
1836
1837/// Convert alopex_sql::executor::ColumnInfo to our Column type.
1838fn sql_column_to_column(col: &alopex_sql::executor::ColumnInfo) -> Column {
1839    use alopex_sql::planner::ResolvedType;
1840
1841    let data_type = match &col.data_type {
1842        ResolvedType::Integer | ResolvedType::BigInt => DataType::Int,
1843        ResolvedType::Float | ResolvedType::Double => DataType::Float,
1844        ResolvedType::Text => DataType::Text,
1845        ResolvedType::Blob => DataType::Bytes,
1846        ResolvedType::Boolean => DataType::Bool,
1847        ResolvedType::Timestamp => DataType::Text, // Display as text
1848        ResolvedType::Vector { .. } => DataType::Vector,
1849        ResolvedType::Null => DataType::Text, // Fallback
1850    };
1851
1852    Column::new(&col.name, data_type)
1853}
1854
1855/// Create columns from SQL query result.
1856#[allow(dead_code)] // Used by tests with legacy execute_sql function
1857fn columns_from_query_result(query_result: &alopex_sql::executor::QueryResult) -> Vec<Column> {
1858    query_result
1859        .columns
1860        .iter()
1861        .map(sql_column_to_column)
1862        .collect()
1863}
1864
1865/// Create columns from streaming query result iterator (FR-7).
1866#[allow(dead_code)] // Kept for potential future use with old streaming API
1867fn columns_from_streaming_result(
1868    query_iter: &alopex_embedded::QueryRowIterator<'_>,
1869) -> Vec<Column> {
1870    query_iter
1871        .columns()
1872        .iter()
1873        .map(sql_column_to_column)
1874        .collect()
1875}
1876
1877/// Create columns from callback-based streaming rows (FR-7).
1878fn columns_from_streaming_rows(rows: &alopex_embedded::StreamingRows<'_>) -> Vec<Column> {
1879    rows.columns().iter().map(sql_column_to_column).collect()
1880}
1881
1882/// Create columns for status output.
1883pub fn sql_status_columns() -> Vec<Column> {
1884    vec![
1885        Column::new("status", DataType::Text),
1886        Column::new("message", DataType::Text),
1887    ]
1888}
1889
#[cfg(test)]
mod tests {
    use super::*;
    use crate::batch::BatchModeSource;
    use crate::output::jsonl::JsonlFormatter;

    /// Open a throwaway in-memory database.
    fn create_test_db() -> Database {
        Database::open_in_memory().unwrap()
    }

    /// JSONL writer over `output` with caller-supplied columns and no row limit.
    fn create_query_writer(
        output: &mut Vec<u8>,
        columns: Vec<Column>,
    ) -> StreamingWriter<&mut Vec<u8>> {
        StreamingWriter::new(output, Box::new(JsonlFormatter::new()), columns, None)
    }

    /// JSONL writer over `output` using the status column layout.
    fn create_status_writer(output: &mut Vec<u8>) -> StreamingWriter<&mut Vec<u8>> {
        create_query_writer(output, sql_status_columns())
    }

    /// Run `sql` through `execute_sql` with a JSONL writer using `columns`
    /// and hand back the raw bytes written. Panics if execution fails.
    fn run_sql(db: &Database, sql: &str, columns: Vec<Column>) -> Vec<u8> {
        let mut captured = Vec::new();
        {
            let mut writer = create_query_writer(&mut captured, columns);
            execute_sql(db, sql, &mut writer).unwrap();
        }
        captured
    }

    /// Batch mode describing an interactive terminal with default settings.
    fn default_batch_mode() -> BatchMode {
        BatchMode {
            is_batch: false,
            is_tty: true,
            source: BatchModeSource::Default,
        }
    }

    /// `SqlCommand` with the given query/file and every other option unset.
    fn sql_command(query: Option<&str>, file: Option<String>) -> SqlCommand {
        SqlCommand {
            query: query.map(String::from),
            file,
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        }
    }

    #[test]
    fn test_create_table() {
        let db = create_test_db();

        let raw = run_sql(
            &db,
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
            sql_status_columns(),
        );

        let rendered = String::from_utf8(raw).unwrap();
        assert!(rendered.contains("OK"));
    }

    #[test]
    fn test_insert_and_select() {
        let db = create_test_db();

        // Create table
        let _ = run_sql(
            &db,
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
            sql_status_columns(),
        );

        // Insert
        let inserted = run_sql(
            &db,
            "INSERT INTO users (id, name) VALUES (1, 'Alice');",
            sql_status_columns(),
        );
        let inserted = String::from_utf8(inserted).unwrap();
        assert!(inserted.contains("row(s) affected"));

        // Select - the writer needs the columns of the query result
        let selected = run_sql(
            &db,
            "SELECT id, name FROM users;",
            vec![
                Column::new("id", DataType::Int),
                Column::new("name", DataType::Text),
            ],
        );
        let selected = String::from_utf8(selected).unwrap();
        assert!(selected.contains("Alice"));
    }

    #[test]
    fn test_syntax_error() {
        let db = create_test_db();

        let mut sink = Vec::new();
        let mut writer = create_status_writer(&mut sink);
        let outcome = execute_sql(&db, "CREATE TABEL invalid_syntax;", &mut writer);
        assert!(outcome.is_err());
    }

    #[test]
    fn test_multiple_statements() {
        let db = create_test_db();

        let _ = run_sql(
            &db,
            "CREATE TABLE t (id INTEGER PRIMARY KEY); INSERT INTO t (id) VALUES (1);",
            sql_status_columns(),
        );

        // Verify the table exists and has data
        let selected = run_sql(
            &db,
            "SELECT id FROM t;",
            vec![Column::new("id", DataType::Int)],
        );
        let selected = String::from_utf8(selected).unwrap();
        assert!(selected.contains("1"));
    }

    #[test]
    fn test_sql_value_conversion() {
        use alopex_sql::SqlValue;

        let null = sql_value_to_value(SqlValue::Null);
        assert!(matches!(null, Value::Null));

        let integer = sql_value_to_value(SqlValue::Integer(42));
        assert!(matches!(integer, Value::Int(42)));

        let big_int = sql_value_to_value(SqlValue::BigInt(100));
        assert!(matches!(big_int, Value::Int(100)));

        let boolean = sql_value_to_value(SqlValue::Boolean(true));
        assert!(matches!(boolean, Value::Bool(true)));

        let text = sql_value_to_value(SqlValue::Text("hello".to_string()));
        assert!(matches!(text, Value::Text(s) if s == "hello"));
    }

    #[test]
    fn resolve_query_from_argument() {
        let cmd = sql_command(Some("SELECT 1"), None);

        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
        assert_eq!(sql, "SELECT 1");
    }

    #[test]
    fn resolve_query_from_file() {
        // The temp file must stay alive until the query has been resolved.
        let mut script = tempfile::NamedTempFile::new().unwrap();
        writeln!(script, "SELECT * FROM users").unwrap();

        let cmd = sql_command(None, Some(script.path().display().to_string()));

        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
        assert_eq!(sql, "SELECT * FROM users\n");
    }

    #[test]
    fn resolve_query_returns_no_query_error() {
        let cmd = sql_command(None, None);

        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
        assert!(matches!(err, CliError::NoQueryProvided));
    }

    #[test]
    fn resolve_query_rejects_query_and_file() {
        let cmd = sql_command(Some("SELECT 1"), Some("query.sql".to_string()));

        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
        assert!(matches!(
            err,
            CliError::InvalidArgument(msg) if msg == "Cannot specify both query and file"
        ));
    }
}