alopex_cli/commands/
sql.rs

1//! SQL Command - SQL query execution
2//!
3//! Supports: query execution, file-based queries
4
5use std::collections::HashSet;
6use std::fs;
7use std::io::{self, Read, Write};
8
9use alopex_embedded::Database;
10
11use crate::batch::BatchMode;
12use crate::cli::SqlCommand;
13use crate::client::http::{ClientError, HttpClient};
14use crate::error::{CliError, Result};
15use crate::models::{Column, DataType, Row, Value};
16use crate::output::formatter::Formatter;
17use crate::streaming::timeout::parse_deadline;
18use crate::streaming::{CancelSignal, Deadline, StreamingWriter, WriteStatus};
19use crate::tui::{is_tty, TuiApp};
20use crate::ui::mode::UiMode;
21use futures_util::StreamExt;
22
23#[doc(hidden)]
24pub struct SqlExecutionOptions<'a> {
25    pub limit: Option<usize>,
26    pub quiet: bool,
27    pub cancel: &'a CancelSignal,
28    pub deadline: &'a Deadline,
29    pub admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
30}
31
32/// Execute a SQL command with dynamic column detection.
33///
34/// This function creates the StreamingWriter internally based on the query result type,
35/// ensuring that SELECT queries use the correct column headers.
36///
37/// # Arguments
38///
39/// * `db` - The database instance.
40/// * `cmd` - The SQL command to execute.
41/// * `writer` - The output writer.
42/// * `formatter` - The formatter to use.
43/// * `limit` - Optional row limit.
44/// * `quiet` - Whether to suppress warnings.
45#[allow(clippy::too_many_arguments)]
46pub fn execute_with_formatter<'a, W: Write>(
47    db: &Database,
48    cmd: SqlCommand,
49    batch_mode: &BatchMode,
50    ui_mode: UiMode,
51    writer: &mut W,
52    formatter: Box<dyn Formatter>,
53    admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
54    limit: Option<usize>,
55    quiet: bool,
56) -> Result<()> {
57    let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
58    let cancel = CancelSignal::new();
59
60    execute_with_formatter_control(
61        db,
62        cmd,
63        batch_mode,
64        ui_mode,
65        writer,
66        formatter,
67        SqlExecutionOptions {
68            limit,
69            quiet,
70            cancel: &cancel,
71            deadline: &deadline,
72            admin_launcher,
73        },
74    )
75}
76
77#[doc(hidden)]
78pub fn execute_with_formatter_control<W: Write>(
79    db: &Database,
80    cmd: SqlCommand,
81    batch_mode: &BatchMode,
82    ui_mode: UiMode,
83    writer: &mut W,
84    formatter: Box<dyn Formatter>,
85    mut options: SqlExecutionOptions<'_>,
86) -> Result<()> {
87    let sql = cmd.resolve_query(batch_mode)?;
88    let effective_limit = merge_limit(options.limit, cmd.max_rows);
89    options.limit = effective_limit;
90
91    if ui_mode == UiMode::Tui {
92        return execute_tui_local_or_fallback(db, &sql, writer, formatter, options);
93    }
94
95    execute_sql_with_formatter(db, &sql, writer, formatter, &options)
96}
97
98fn is_select_query(sql: &str) -> Result<bool> {
99    use alopex_sql::{AlopexDialect, Parser, StatementKind};
100
101    let dialect = AlopexDialect;
102    let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
103    Ok(stmts.len() == 1
104        && matches!(
105            stmts.first().map(|s| &s.kind),
106            Some(StatementKind::Select(_))
107        ))
108}
109
110/// Execute a SQL command against a remote server using HttpClient.
111#[allow(clippy::too_many_arguments)]
112pub async fn execute_remote_with_formatter<'a, W: Write>(
113    client: &HttpClient,
114    cmd: &SqlCommand,
115    batch_mode: &BatchMode,
116    ui_mode: UiMode,
117    writer: &mut W,
118    formatter: Box<dyn Formatter>,
119    admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
120    limit: Option<usize>,
121    quiet: bool,
122) -> Result<()> {
123    let effective_limit = merge_limit(limit, cmd.max_rows);
124    let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
125    let cancel = CancelSignal::new();
126    let options = SqlExecutionOptions {
127        limit: effective_limit,
128        quiet,
129        cancel: &cancel,
130        deadline: &deadline,
131        admin_launcher,
132    };
133
134    execute_remote_with_formatter_control(
135        client, cmd, batch_mode, ui_mode, writer, formatter, options,
136    )
137    .await
138}
139
140fn sql_context_message(sql: &str) -> String {
141    let condensed = sql.split_whitespace().collect::<Vec<_>>().join(" ");
142    let max_len = 200;
143    if condensed.chars().count() > max_len {
144        let truncated: String = condensed.chars().take(max_len).collect();
145        format!("SQL: {truncated}...")
146    } else {
147        format!("SQL: {condensed}")
148    }
149}
150
151#[doc(hidden)]
152pub async fn execute_remote_with_formatter_control<W: Write>(
153    client: &HttpClient,
154    cmd: &SqlCommand,
155    batch_mode: &BatchMode,
156    ui_mode: UiMode,
157    writer: &mut W,
158    formatter: Box<dyn Formatter>,
159    options: SqlExecutionOptions<'_>,
160) -> Result<()> {
161    let sql = cmd.resolve_query(batch_mode)?;
162    if ui_mode == UiMode::Tui {
163        return execute_tui_remote_or_fallback(client, &sql, cmd, writer, formatter, options).await;
164    }
165    execute_remote_with_formatter_impl(client, &sql, cmd, writer, formatter, &options).await
166}
167
168async fn execute_remote_with_formatter_impl<W: Write>(
169    client: &HttpClient,
170    sql: &str,
171    cmd: &SqlCommand,
172    writer: &mut W,
173    formatter: Box<dyn Formatter>,
174    options: &SqlExecutionOptions<'_>,
175) -> Result<()> {
176    if is_select_query(sql)? && formatter.supports_streaming() {
177        return execute_remote_streaming(
178            client,
179            sql,
180            writer,
181            formatter,
182            options,
183            cmd.fetch_size,
184            cmd.max_rows,
185        )
186        .await;
187    }
188
189    let request = RemoteSqlRequest {
190        sql: sql.to_string(),
191        streaming: false,
192        fetch_size: cmd.fetch_size,
193        max_rows: cmd.max_rows,
194    };
195    let response: RemoteSqlResponse = tokio::select! {
196        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
197            match result {
198                Ok(value) => value.map_err(map_client_error)?,
199                Err(_) => {
200                    let _ = send_cancel_request(client).await;
201                    return Err(CliError::Timeout(format!(
202                        "deadline exceeded after {}",
203                        humantime::format_duration(options.deadline.duration())
204                    )));
205                }
206            }
207        }
208        _ = options.cancel.wait() => {
209            let _ = send_cancel_request(client).await;
210            return Err(CliError::Cancelled);
211        }
212    };
213
214    if response.columns.is_empty() {
215        if options.quiet {
216            return Ok(());
217        }
218        let message = match response.affected_rows {
219            Some(count) => format!("{count} row(s) affected"),
220            None => "Operation completed successfully".to_string(),
221        };
222        let columns = sql_status_columns();
223        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
224            .with_quiet(options.quiet);
225        streaming_writer.prepare(Some(1))?;
226        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
227        streaming_writer.write_row(row)?;
228        return streaming_writer.finish();
229    }
230
231    let columns: Vec<Column> = response
232        .columns
233        .iter()
234        .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
235        .collect();
236    let mut streaming_writer =
237        StreamingWriter::new(writer, formatter, columns, options.limit).with_quiet(options.quiet);
238    streaming_writer.prepare(Some(response.rows.len()))?;
239    for row in response.rows {
240        if options.cancel.is_cancelled() {
241            let _ = send_cancel_request(client).await;
242            return Err(CliError::Cancelled);
243        }
244        options.deadline.check()?;
245        let values = row.into_iter().map(remote_value_to_value).collect();
246        match streaming_writer.write_row(Row::new(values))? {
247            WriteStatus::LimitReached => break,
248            WriteStatus::Continue => {}
249        }
250    }
251    streaming_writer.finish()
252}
253
254fn execute_tui_local_or_fallback<'a, W: Write>(
255    db: &Database,
256    sql: &str,
257    writer: &mut W,
258    formatter: Box<dyn Formatter>,
259    mut options: SqlExecutionOptions<'a>,
260) -> Result<()> {
261    if !is_tty() {
262        if !options.quiet {
263            eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
264        }
265        return execute_sql_with_formatter(db, sql, writer, formatter, &options);
266    }
267
268    let admin_launcher = options.admin_launcher.take();
269    match execute_tui_local(db, sql, &options, admin_launcher) {
270        Ok(()) => Ok(()),
271        Err(err) => {
272            if !options.quiet {
273                eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
274            }
275            execute_sql_with_formatter(db, sql, writer, formatter, &options)
276        }
277    }
278}
279
280fn execute_tui_local<'a>(
281    db: &Database,
282    sql: &str,
283    options: &SqlExecutionOptions<'a>,
284    admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
285) -> Result<()> {
286    use alopex_sql::ExecutionResult;
287
288    options.deadline.check()?;
289    let result = db.execute_sql(sql)?;
290    options.deadline.check()?;
291
292    let (columns, rows) = match result {
293        ExecutionResult::Success => {
294            let columns = sql_status_columns();
295            let row = Row::new(vec![
296                Value::Text("OK".to_string()),
297                Value::Text("Operation completed successfully".to_string()),
298            ]);
299            (columns, vec![row])
300        }
301        ExecutionResult::RowsAffected(count) => {
302            let columns = sql_status_columns();
303            let row = Row::new(vec![
304                Value::Text("OK".to_string()),
305                Value::Text(format!("{count} row(s) affected")),
306            ]);
307            (columns, vec![row])
308        }
309        ExecutionResult::Query(query_result) => {
310            let columns = query_result
311                .columns
312                .iter()
313                .map(|col| Column::new(&col.name, DataType::Text))
314                .collect::<Vec<_>>();
315            let mut rows = Vec::with_capacity(query_result.rows.len());
316            for sql_row in query_result.rows {
317                let values = sql_row.into_iter().map(sql_value_to_value).collect();
318                rows.push(Row::new(values));
319            }
320            if let Some(limit) = options.limit {
321                rows.truncate(limit);
322            }
323            (columns, rows)
324        }
325    };
326
327    let app = TuiApp::new(columns, rows, "local", false)
328        .with_context_message(Some(sql_context_message(sql)))
329        .with_admin_launcher(admin_launcher);
330    app.run()
331}
332
333async fn execute_tui_remote_or_fallback<'a, W: Write>(
334    client: &HttpClient,
335    sql: &str,
336    cmd: &SqlCommand,
337    writer: &mut W,
338    formatter: Box<dyn Formatter>,
339    mut options: SqlExecutionOptions<'a>,
340) -> Result<()> {
341    if !is_tty() {
342        if !options.quiet {
343            eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
344        }
345        return execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options)
346            .await;
347    }
348
349    let admin_launcher = options.admin_launcher.take();
350    match execute_tui_remote(client, sql, cmd, &options, admin_launcher).await {
351        Ok(()) => Ok(()),
352        Err(err) => {
353            if !options.quiet {
354                eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
355            }
356            execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options).await
357        }
358    }
359}
360
361async fn execute_tui_remote<'a>(
362    client: &HttpClient,
363    sql: &str,
364    cmd: &SqlCommand,
365    options: &SqlExecutionOptions<'a>,
366    admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
367) -> Result<()> {
368    if is_select_query(sql)? {
369        let (columns, rows) =
370            collect_remote_streaming_rows(client, sql, options, cmd.fetch_size, cmd.max_rows)
371                .await?;
372        let app = TuiApp::new(columns, rows, "server", false)
373            .with_context_message(Some(sql_context_message(sql)))
374            .with_admin_launcher(admin_launcher);
375        return app.run();
376    }
377
378    let request = RemoteSqlRequest {
379        sql: sql.to_string(),
380        streaming: false,
381        fetch_size: cmd.fetch_size,
382        max_rows: cmd.max_rows,
383    };
384
385    let response: RemoteSqlResponse = tokio::select! {
386        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
387            match result {
388                Ok(value) => value.map_err(map_client_error)?,
389                Err(_) => {
390                    let _ = send_cancel_request(client).await;
391                    return Err(CliError::Timeout(format!(
392                        "deadline exceeded after {}",
393                        humantime::format_duration(options.deadline.duration())
394                    )));
395                }
396            }
397        }
398        _ = options.cancel.wait() => {
399            let _ = send_cancel_request(client).await;
400            return Err(CliError::Cancelled);
401        }
402    };
403
404    let (columns, rows) = if response.columns.is_empty() {
405        let columns = sql_status_columns();
406        let message = match response.affected_rows {
407            Some(count) => format!("{count} row(s) affected"),
408            None => "Operation completed successfully".to_string(),
409        };
410        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
411        (columns, vec![row])
412    } else {
413        let columns: Vec<Column> = response
414            .columns
415            .iter()
416            .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
417            .collect();
418        let mut rows = response
419            .rows
420            .into_iter()
421            .map(|row| Row::new(row.into_iter().map(remote_value_to_value).collect()))
422            .collect::<Vec<_>>();
423        if let Some(limit) = options.limit {
424            rows.truncate(limit);
425        }
426        (columns, rows)
427    };
428
429    let app = TuiApp::new(columns, rows, "server", false)
430        .with_context_message(Some(sql_context_message(sql)))
431        .with_admin_launcher(admin_launcher);
432    app.run()
433}
434
435async fn collect_remote_streaming_rows(
436    client: &HttpClient,
437    sql: &str,
438    options: &SqlExecutionOptions<'_>,
439    fetch_size: Option<usize>,
440    max_rows: Option<usize>,
441) -> Result<(Vec<Column>, Vec<Row>)> {
442    let request = RemoteSqlRequest {
443        sql: sql.to_string(),
444        streaming: true,
445        fetch_size,
446        max_rows,
447    };
448
449    let response = tokio::select! {
450        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
451            match result {
452                Ok(value) => value.map_err(map_client_error)?,
453                Err(_) => {
454                    let _ = send_cancel_request(client).await;
455                    return Err(CliError::Timeout(format!(
456                        "deadline exceeded after {}",
457                        humantime::format_duration(options.deadline.duration())
458                    )));
459                }
460            }
461        }
462        _ = options.cancel.wait() => {
463            let _ = send_cancel_request(client).await;
464            return Err(CliError::Cancelled);
465        }
466    };
467
468    let mut stream = response.bytes_stream();
469    let mut buffer: Vec<u8> = Vec::new();
470    let mut pos: usize = 0;
471    let mut done = false;
472    let mut saw_array_start = false;
473    let mut columns: Option<Vec<String>> = None;
474    let mut column_set: Option<HashSet<String>> = None;
475    let mut rows: Vec<Row> = Vec::new();
476
477    while !done {
478        if options.cancel.is_cancelled() {
479            let _ = send_cancel_request(client).await;
480            return Err(CliError::Cancelled);
481        }
482        if let Err(err) = options.deadline.check() {
483            let _ = send_cancel_request(client).await;
484            return Err(err);
485        }
486
487        let next = tokio::select! {
488            _ = options.cancel.wait() => {
489                let _ = send_cancel_request(client).await;
490                return Err(CliError::Cancelled);
491            }
492            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
493                match result {
494                    Ok(value) => value,
495                    Err(_) => {
496                        let _ = send_cancel_request(client).await;
497                        return Err(CliError::Timeout(format!(
498                            "deadline exceeded after {}",
499                            humantime::format_duration(options.deadline.duration())
500                        )));
501                    }
502                }
503            }
504        };
505
506        let chunk = match next {
507            Some(chunk) => chunk,
508            None => break,
509        };
510
511        let bytes = match chunk {
512            Ok(bytes) => bytes,
513            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
514        };
515
516        buffer.extend_from_slice(&bytes);
517
518        loop {
519            skip_whitespace(&buffer, &mut pos);
520            if pos >= buffer.len() {
521                break;
522            }
523
524            if !saw_array_start {
525                if buffer[pos] != b'[' {
526                    return Err(CliError::InvalidArgument(
527                        "Invalid streaming response: expected JSON array".into(),
528                    ));
529                }
530                pos += 1;
531                saw_array_start = true;
532                continue;
533            }
534
535            skip_whitespace(&buffer, &mut pos);
536            if pos >= buffer.len() {
537                break;
538            }
539
540            if buffer[pos] == b']' {
541                pos += 1;
542                done = true;
543                break;
544            }
545
546            let slice = &buffer[pos..];
547            let mut stream =
548                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
549            let value = match stream.next() {
550                Some(Ok(value)) => value,
551                Some(Err(err)) if err.is_eof() => break,
552                Some(Err(err)) => return Err(CliError::Json(err)),
553                None => break,
554            };
555            pos = pos.saturating_add(stream.byte_offset());
556
557            let object = value.as_object().ok_or_else(|| {
558                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
559            })?;
560
561            if columns.is_none() {
562                let names: Vec<String> = object.keys().cloned().collect();
563                let set: HashSet<String> = names.iter().cloned().collect();
564                if names.is_empty() {
565                    return Err(CliError::InvalidArgument(
566                        "Invalid streaming row: empty object".into(),
567                    ));
568                }
569                columns = Some(names);
570                column_set = Some(set);
571            }
572
573            let names = columns
574                .as_ref()
575                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
576            let set = column_set
577                .as_ref()
578                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;
579
580            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
581                return Err(CliError::InvalidArgument(
582                    "Invalid streaming row: column mismatch".into(),
583                ));
584            }
585
586            let values = names
587                .iter()
588                .map(|name| {
589                    object.get(name).ok_or_else(|| {
590                        CliError::InvalidArgument(format!(
591                            "Invalid streaming row: missing column '{name}'"
592                        ))
593                    })
594                })
595                .map(|value| value.and_then(json_value_to_value))
596                .collect::<Result<Vec<_>>>()?;
597            rows.push(Row::new(values));
598
599            if let Some(limit) = options.limit {
600                if rows.len() >= limit {
601                    let _ = send_cancel_request(client).await;
602                    done = true;
603                    break;
604                }
605            }
606
607            skip_whitespace(&buffer, &mut pos);
608            if pos >= buffer.len() {
609                break;
610            }
611            match buffer[pos] {
612                b',' => {
613                    pos += 1;
614                }
615                b']' => {
616                    pos += 1;
617                    done = true;
618                    break;
619                }
620                _ => {
621                    return Err(CliError::InvalidArgument(
622                        "Invalid streaming response: expected ',' or ']'".into(),
623                    ))
624                }
625            }
626        }
627
628        if pos > 0 {
629            buffer.drain(..pos);
630            pos = 0;
631        }
632    }
633
634    if done {
635        if has_non_whitespace(&buffer) {
636            return Err(CliError::InvalidArgument(
637                "Invalid streaming response: unexpected trailing data".into(),
638            ));
639        }
640        buffer.clear();
641        loop {
642            let next = tokio::select! {
643                _ = options.cancel.wait() => {
644                    let _ = send_cancel_request(client).await;
645                    return Err(CliError::Cancelled);
646                }
647                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
648                    match result {
649                        Ok(value) => value,
650                        Err(_) => {
651                            let _ = send_cancel_request(client).await;
652                            return Err(CliError::Timeout(format!(
653                                "deadline exceeded after {}",
654                                humantime::format_duration(options.deadline.duration())
655                            )));
656                        }
657                    }
658                }
659            };
660
661            let chunk = match next {
662                Some(chunk) => chunk,
663                None => break,
664            };
665
666            let bytes = match chunk {
667                Ok(bytes) => bytes,
668                Err(err) => {
669                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
670                }
671            };
672
673            if has_non_whitespace(&bytes) {
674                return Err(CliError::InvalidArgument(
675                    "Invalid streaming response: unexpected trailing data".into(),
676                ));
677            }
678        }
679    } else {
680        skip_whitespace(&buffer, &mut pos);
681        if pos < buffer.len() {
682            return Err(CliError::InvalidArgument(
683                "Invalid streaming response: unexpected trailing data".into(),
684            ));
685        }
686        return Err(CliError::InvalidArgument(
687            "Invalid streaming response: unexpected end of stream".into(),
688        ));
689    }
690
691    let columns = columns
692        .unwrap_or_default()
693        .into_iter()
694        .map(|name| Column::new(name, DataType::Text))
695        .collect();
696    Ok((columns, rows))
697}
698
699async fn execute_remote_streaming<W: Write>(
700    client: &HttpClient,
701    sql: &str,
702    writer: &mut W,
703    formatter: Box<dyn Formatter>,
704    options: &SqlExecutionOptions<'_>,
705    fetch_size: Option<usize>,
706    max_rows: Option<usize>,
707) -> Result<()> {
708    let request = RemoteSqlRequest {
709        sql: sql.to_string(),
710        streaming: true,
711        fetch_size,
712        max_rows,
713    };
714
715    let response = tokio::select! {
716        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
717            match result {
718                Ok(value) => value.map_err(map_client_error)?,
719                Err(_) => {
720                    let _ = send_cancel_request(client).await;
721                    return Err(CliError::Timeout(format!(
722                        "deadline exceeded after {}",
723                        humantime::format_duration(options.deadline.duration())
724                    )));
725                }
726            }
727        }
728        _ = options.cancel.wait() => {
729            let _ = send_cancel_request(client).await;
730            return Err(CliError::Cancelled);
731        }
732    };
733
734    let mut stream = response.bytes_stream();
735    let mut buffer: Vec<u8> = Vec::new();
736    let mut pos: usize = 0;
737    let mut streaming_writer: Option<StreamingWriter<&mut W>> = None;
738    let mut formatter = Some(formatter);
739    let mut columns: Option<Vec<String>> = None;
740    let mut column_set: Option<HashSet<String>> = None;
741    let mut done = false;
742    let mut saw_array_start = false;
743
744    while !done {
745        if options.cancel.is_cancelled() {
746            let _ = send_cancel_request(client).await;
747            return Err(CliError::Cancelled);
748        }
749        if let Err(err) = options.deadline.check() {
750            let _ = send_cancel_request(client).await;
751            return Err(err);
752        }
753
754        let next = tokio::select! {
755            _ = options.cancel.wait() => {
756                let _ = send_cancel_request(client).await;
757                return Err(CliError::Cancelled);
758            }
759            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
760                match result {
761                    Ok(value) => value,
762                    Err(_) => {
763                        let _ = send_cancel_request(client).await;
764                        return Err(CliError::Timeout(format!(
765                            "deadline exceeded after {}",
766                            humantime::format_duration(options.deadline.duration())
767                        )));
768                    }
769                }
770            }
771        };
772
773        let chunk = match next {
774            Some(chunk) => chunk,
775            None => break,
776        };
777
778        let bytes = match chunk {
779            Ok(bytes) => bytes,
780            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
781        };
782
783        buffer.extend_from_slice(&bytes);
784
785        loop {
786            skip_whitespace(&buffer, &mut pos);
787            if pos >= buffer.len() {
788                break;
789            }
790
791            if !saw_array_start {
792                if buffer[pos] != b'[' {
793                    return Err(CliError::InvalidArgument(
794                        "Invalid streaming response: expected JSON array".into(),
795                    ));
796                }
797                pos += 1;
798                saw_array_start = true;
799                continue;
800            }
801
802            skip_whitespace(&buffer, &mut pos);
803            if pos >= buffer.len() {
804                break;
805            }
806
807            if buffer[pos] == b']' {
808                pos += 1;
809                done = true;
810                break;
811            }
812
813            let slice = &buffer[pos..];
814            let mut stream =
815                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
816            let value = match stream.next() {
817                Some(Ok(value)) => value,
818                Some(Err(err)) if err.is_eof() => break,
819                Some(Err(err)) => return Err(CliError::Json(err)),
820                None => break,
821            };
822            pos = pos.saturating_add(stream.byte_offset());
823
824            let object = value.as_object().ok_or_else(|| {
825                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
826            })?;
827
828            if columns.is_none() {
829                let names: Vec<String> = object.keys().cloned().collect();
830                let set: HashSet<String> = names.iter().cloned().collect();
831                if names.is_empty() {
832                    return Err(CliError::InvalidArgument(
833                        "Invalid streaming row: empty object".into(),
834                    ));
835                }
836                let cols = names
837                    .iter()
838                    .map(|name| Column::new(name, DataType::Text))
839                    .collect::<Vec<_>>();
840                let formatter = formatter
841                    .take()
842                    .ok_or_else(|| CliError::InvalidArgument("Missing formatter".into()))?;
843                let mut writer = StreamingWriter::new(&mut *writer, formatter, cols, options.limit)
844                    .with_quiet(options.quiet);
845                writer.prepare(None)?;
846                streaming_writer = Some(writer);
847                columns = Some(names);
848                column_set = Some(set);
849            }
850
851            let names = columns
852                .as_ref()
853                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
854            let set = column_set
855                .as_ref()
856                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;
857
858            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
859                return Err(CliError::InvalidArgument(
860                    "Invalid streaming row: column mismatch".into(),
861                ));
862            }
863
864            let values = names
865                .iter()
866                .map(|name| {
867                    object.get(name).ok_or_else(|| {
868                        CliError::InvalidArgument(format!(
869                            "Invalid streaming row: missing column '{name}'"
870                        ))
871                    })
872                })
873                .map(|value| value.and_then(json_value_to_value))
874                .collect::<Result<Vec<_>>>()?;
875
876            if let Some(writer) = streaming_writer.as_mut() {
877                match writer.write_row(Row::new(values))? {
878                    WriteStatus::LimitReached => {
879                        let _ = send_cancel_request(client).await;
880                        return writer.finish();
881                    }
882                    WriteStatus::Continue => {}
883                }
884            }
885
886            skip_whitespace(&buffer, &mut pos);
887            if pos >= buffer.len() {
888                break;
889            }
890            match buffer[pos] {
891                b',' => {
892                    pos += 1;
893                }
894                b']' => {
895                    pos += 1;
896                    done = true;
897                    break;
898                }
899                _ => {
900                    return Err(CliError::InvalidArgument(
901                        "Invalid streaming response: expected ',' or ']'".into(),
902                    ))
903                }
904            }
905        }
906
907        if pos > 0 {
908            buffer.drain(..pos);
909            pos = 0;
910        }
911    }
912
913    if done {
914        if has_non_whitespace(&buffer) {
915            return Err(CliError::InvalidArgument(
916                "Invalid streaming response: unexpected trailing data".into(),
917            ));
918        }
919        buffer.clear();
920        loop {
921            let next = tokio::select! {
922                _ = options.cancel.wait() => {
923                    let _ = send_cancel_request(client).await;
924                    return Err(CliError::Cancelled);
925                }
926                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
927                    match result {
928                        Ok(value) => value,
929                        Err(_) => {
930                            let _ = send_cancel_request(client).await;
931                            return Err(CliError::Timeout(format!(
932                                "deadline exceeded after {}",
933                                humantime::format_duration(options.deadline.duration())
934                            )));
935                        }
936                    }
937                }
938            };
939
940            let chunk = match next {
941                Some(chunk) => chunk,
942                None => break,
943            };
944
945            let bytes = match chunk {
946                Ok(bytes) => bytes,
947                Err(err) => {
948                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
949                }
950            };
951
952            if has_non_whitespace(&bytes) {
953                return Err(CliError::InvalidArgument(
954                    "Invalid streaming response: unexpected trailing data".into(),
955                ));
956            }
957        }
958    } else {
959        skip_whitespace(&buffer, &mut pos);
960        if pos < buffer.len() {
961            return Err(CliError::InvalidArgument(
962                "Invalid streaming response: unexpected trailing data".into(),
963            ));
964        }
965        return Err(CliError::InvalidArgument(
966            "Invalid streaming response: unexpected end of stream".into(),
967        ));
968    }
969
970    if let Some(mut writer) = streaming_writer {
971        return writer.finish();
972    }
973
974    if done && saw_array_start {
975        if let Some(formatter) = formatter.take() {
976            let mut writer =
977                StreamingWriter::new(&mut *writer, formatter, Vec::new(), options.limit)
978                    .with_quiet(options.quiet);
979            writer.prepare(None)?;
980            return writer.finish();
981        }
982    }
983
984    Ok(())
985}
986
987fn skip_whitespace(buffer: &[u8], pos: &mut usize) {
988    while *pos < buffer.len() {
989        match buffer[*pos] {
990            b' ' | b'\n' | b'\r' | b'\t' => *pos += 1,
991            _ => break,
992        }
993    }
994}
995
996fn has_non_whitespace(buffer: &[u8]) -> bool {
997    buffer
998        .iter()
999        .any(|byte| !matches!(byte, b' ' | b'\n' | b'\r' | b'\t'))
1000}
1001
1002fn json_value_to_value(value: &serde_json::Value) -> Result<Value> {
1003    match value {
1004        serde_json::Value::Null => Ok(Value::Null),
1005        serde_json::Value::Bool(value) => Ok(Value::Bool(*value)),
1006        serde_json::Value::Number(value) => {
1007            if let Some(value) = value.as_i64() {
1008                Ok(Value::Int(value))
1009            } else if let Some(value) = value.as_f64() {
1010                Ok(Value::Float(value))
1011            } else {
1012                Err(CliError::InvalidArgument(
1013                    "Invalid numeric value in streaming row".into(),
1014                ))
1015            }
1016        }
1017        serde_json::Value::String(value) => Ok(Value::Text(value.clone())),
1018        serde_json::Value::Array(values) => {
1019            let mut vector = Vec::with_capacity(values.len());
1020            for entry in values {
1021                let number = entry.as_f64().ok_or_else(|| {
1022                    CliError::InvalidArgument("Invalid vector value in streaming row".into())
1023                })?;
1024                vector.push(number as f32);
1025            }
1026            Ok(Value::Vector(vector))
1027        }
1028        serde_json::Value::Object(_) => Err(CliError::InvalidArgument(
1029            "Invalid streaming row: nested objects are not supported".into(),
1030        )),
1031    }
1032}
1033
1034/// Legacy execute function for backward compatibility with tests.
1035#[allow(dead_code)]
1036pub fn execute<W: Write>(
1037    db: &Database,
1038    cmd: SqlCommand,
1039    batch_mode: &BatchMode,
1040    writer: &mut StreamingWriter<W>,
1041) -> Result<()> {
1042    let sql = cmd.resolve_query(batch_mode)?;
1043
1044    execute_sql(db, &sql, writer)
1045}
1046
1047impl SqlCommand {
1048    /// Resolve the SQL query source (argument, file, or stdin).
1049    pub fn resolve_query(&self, batch_mode: &BatchMode) -> Result<String> {
1050        match (&self.query, &self.file) {
1051            (Some(query), None) => Ok(query.clone()),
1052            (None, Some(file)) => fs::read_to_string(file).map_err(|e| {
1053                CliError::InvalidArgument(format!("Failed to read SQL file '{}': {}", file, e))
1054            }),
1055            (None, None) if !batch_mode.is_tty => {
1056                let mut buf = String::new();
1057                io::stdin().read_to_string(&mut buf)?;
1058                Ok(buf)
1059            }
1060            (None, None) => Err(CliError::NoQueryProvided),
1061            (Some(_), Some(_)) => Err(CliError::InvalidArgument(
1062                "Cannot specify both query and file".to_string(),
1063            )),
1064        }
1065    }
1066}
1067
1068/// Execute SQL and write results.
1069fn execute_sql<W: Write>(db: &Database, sql: &str, writer: &mut StreamingWriter<W>) -> Result<()> {
1070    use alopex_embedded::SqlResult;
1071
1072    let result = db.execute_sql(sql)?;
1073
1074    match result {
1075        SqlResult::Success => {
1076            // DDL success - output simple status
1077            writer.prepare(Some(1))?;
1078            let row = Row::new(vec![
1079                Value::Text("OK".to_string()),
1080                Value::Text("Operation completed successfully".to_string()),
1081            ]);
1082            writer.write_row(row)?;
1083            writer.finish()?;
1084        }
1085        SqlResult::RowsAffected(count) => {
1086            // DML success - output affected rows count
1087            writer.prepare(Some(1))?;
1088            let row = Row::new(vec![
1089                Value::Text("OK".to_string()),
1090                Value::Text(format!("{} row(s) affected", count)),
1091            ]);
1092            writer.write_row(row)?;
1093            writer.finish()?;
1094        }
1095        SqlResult::Query(query_result) => {
1096            // SELECT result - output rows
1097            let row_count = query_result.rows.len();
1098            writer.prepare(Some(row_count))?;
1099
1100            for sql_row in query_result.rows {
1101                let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1102                let row = Row::new(values);
1103
1104                match writer.write_row(row)? {
1105                    WriteStatus::LimitReached => break,
1106                    WriteStatus::Continue => {}
1107                }
1108            }
1109
1110            writer.finish()?;
1111        }
1112    }
1113
1114    Ok(())
1115}
1116
1117/// Execute SQL with formatter, dynamically determining columns from query result.
1118///
1119/// This function executes the SQL using the streaming API for FR-7 compliance,
1120/// then creates the StreamingWriter with the correct columns based on the result type
1121/// (status columns for DDL/DML, query result columns for SELECT).
1122///
1123/// FR-7 Compliance: Uses SQL parser to detect SELECT queries instead of heuristic.
1124/// This properly handles:
1125/// - WITH clauses (CTEs)
1126/// - Leading comments
1127/// - Complex query structures
1128fn execute_sql_with_formatter<W: Write>(
1129    db: &Database,
1130    sql: &str,
1131    writer: &mut W,
1132    formatter: Box<dyn Formatter>,
1133    options: &SqlExecutionOptions<'_>,
1134) -> Result<()> {
1135    use alopex_sql::{AlopexDialect, Parser, StatementKind};
1136
1137    // FR-7: Use parser to detect SELECT instead of starts_with("SELECT") heuristic
1138    // This correctly handles WITH clauses, leading comments, and complex query structures
1139    let dialect = AlopexDialect;
1140    let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
1141
1142    let is_select = stmts.len() == 1
1143        && matches!(
1144            stmts.first().map(|s| &s.kind),
1145            Some(StatementKind::Select(_))
1146        );
1147
1148    if is_select {
1149        // SELECT: use streaming path (FR-7)
1150        execute_sql_select_streaming(db, sql, writer, formatter, options)
1151    } else {
1152        // DDL/DML: use standard path
1153        execute_sql_ddl_dml(db, sql, writer, formatter, options)
1154    }
1155}
1156
1157/// Execute SELECT query with streaming callback (FR-7).
1158///
1159/// This function uses `execute_sql_with_rows` for true streaming output.
1160/// The callback receives rows one at a time from the iterator, and the
1161/// transaction is kept alive during streaming.
1162fn execute_sql_select_streaming<W: Write>(
1163    db: &Database,
1164    sql: &str,
1165    writer: &mut W,
1166    formatter: Box<dyn Formatter>,
1167    options: &SqlExecutionOptions<'_>,
1168) -> Result<()> {
1169    use alopex_embedded::StreamingQueryResult;
1170    use std::sync::atomic::{AtomicBool, Ordering};
1171    use std::sync::Arc;
1172
1173    // Helper to convert CliError to alopex_embedded::Error for callback
1174    fn cli_err_to_embedded(e: crate::error::CliError) -> alopex_embedded::Error {
1175        alopex_embedded::Error::Sql(alopex_sql::SqlError::Execution {
1176            message: e.to_string(),
1177            code: "ALOPEX-C001",
1178        })
1179    }
1180
1181    let cancelled = Arc::new(AtomicBool::new(false));
1182    let timed_out = Arc::new(AtomicBool::new(false));
1183    let cancel_flag = cancelled.clone();
1184    let timeout_flag = timed_out.clone();
1185
1186    let result = db.execute_sql_with_rows(sql, |mut rows| {
1187        // FR-7: SELECT result - stream rows directly from iterator while transaction is alive
1188        let columns = columns_from_streaming_rows(&rows);
1189        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
1190            .with_quiet(options.quiet);
1191
1192        // FR-7: Use None for row count hint to support true streaming output
1193        streaming_writer
1194            .prepare(None)
1195            .map_err(cli_err_to_embedded)?;
1196
1197        if let Err(err) = options.deadline.check() {
1198            timeout_flag.store(true, Ordering::SeqCst);
1199            return Err(cli_err_to_embedded(err));
1200        }
1201
1202        // Consume iterator row by row for true streaming
1203        while let Ok(Some(sql_row)) = rows.next_row() {
1204            if options.cancel.is_cancelled() {
1205                cancel_flag.store(true, Ordering::SeqCst);
1206                return Err(cli_err_to_embedded(CliError::Cancelled));
1207            }
1208            if let Err(err) = options.deadline.check() {
1209                timeout_flag.store(true, Ordering::SeqCst);
1210                return Err(cli_err_to_embedded(err));
1211            }
1212            let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1213            let row = Row::new(values);
1214
1215            match streaming_writer
1216                .write_row(row)
1217                .map_err(cli_err_to_embedded)?
1218            {
1219                WriteStatus::LimitReached => break,
1220                WriteStatus::Continue => {}
1221            }
1222        }
1223
1224        streaming_writer.finish().map_err(cli_err_to_embedded)?;
1225        Ok(())
1226    });
1227
1228    let result = match result {
1229        Ok(value) => value,
1230        Err(err) => {
1231            if cancelled.load(Ordering::SeqCst) {
1232                return Err(CliError::Cancelled);
1233            }
1234            if timed_out.load(Ordering::SeqCst) {
1235                return Err(CliError::Timeout(format!(
1236                    "deadline exceeded after {}",
1237                    humantime::format_duration(options.deadline.duration())
1238                )));
1239            }
1240            return Err(CliError::Database(err));
1241        }
1242    };
1243
1244    match result {
1245        StreamingQueryResult::QueryProcessed(()) => Ok(()),
1246        StreamingQueryResult::Success | StreamingQueryResult::RowsAffected(_) => {
1247            // Unexpected: SELECT should not return these
1248            Ok(())
1249        }
1250    }
1251}
1252
1253#[derive(serde::Serialize)]
1254struct RemoteSqlRequest {
1255    sql: String,
1256    #[serde(default)]
1257    streaming: bool,
1258    #[serde(skip_serializing_if = "Option::is_none")]
1259    fetch_size: Option<usize>,
1260    #[serde(skip_serializing_if = "Option::is_none")]
1261    max_rows: Option<usize>,
1262}
1263
1264#[derive(serde::Deserialize)]
1265struct RemoteColumnInfo {
1266    name: String,
1267    data_type: String,
1268}
1269
1270#[derive(serde::Deserialize)]
1271struct RemoteSqlResponse {
1272    columns: Vec<RemoteColumnInfo>,
1273    rows: Vec<Vec<alopex_sql::storage::SqlValue>>,
1274    affected_rows: Option<u64>,
1275}
1276
1277fn map_client_error(err: ClientError) -> CliError {
1278    match err {
1279        ClientError::Request { source, .. } => {
1280            CliError::ServerConnection(format!("request failed: {source}"))
1281        }
1282        ClientError::InvalidUrl(message) => CliError::InvalidArgument(message),
1283        ClientError::Build(message) => CliError::InvalidArgument(message),
1284        ClientError::Auth(err) => CliError::InvalidArgument(err.to_string()),
1285        ClientError::HttpStatus { status, body } => {
1286            CliError::InvalidArgument(format!("Server error: HTTP {} - {}", status.as_u16(), body))
1287        }
1288    }
1289}
1290
1291async fn send_cancel_request(client: &HttpClient) -> Result<()> {
1292    #[derive(serde::Serialize)]
1293    struct CancelRequest {}
1294
1295    let request = CancelRequest {};
1296    let _: serde_json::Value = client
1297        .post_json("api/sql/cancel", &request)
1298        .await
1299        .map_err(map_client_error)?;
1300    Ok(())
1301}
1302
1303fn merge_limit(limit: Option<usize>, max_rows: Option<usize>) -> Option<usize> {
1304    match (limit, max_rows) {
1305        (Some(a), Some(b)) => Some(a.min(b)),
1306        (Some(value), None) | (None, Some(value)) => Some(value),
1307        (None, None) => None,
1308    }
1309}
1310
1311/// Execute DDL/DML query (non-SELECT statements).
1312///
1313/// This function handles CREATE, DROP, INSERT, UPDATE, DELETE and other
1314/// non-SELECT statements. It outputs status messages (OK, rows affected).
1315fn execute_sql_ddl_dml<W: Write>(
1316    db: &Database,
1317    sql: &str,
1318    writer: &mut W,
1319    formatter: Box<dyn Formatter>,
1320    options: &SqlExecutionOptions<'_>,
1321) -> Result<()> {
1322    use alopex_sql::ExecutionResult;
1323
1324    options.deadline.check()?;
1325    let result = db.execute_sql(sql)?;
1326    options.deadline.check()?;
1327
1328    match result {
1329        ExecutionResult::Success => {
1330            // DDL success - suppress status output in quiet mode
1331            if !options.quiet {
1332                let columns = sql_status_columns();
1333                let mut streaming_writer =
1334                    StreamingWriter::new(writer, formatter, columns, options.limit)
1335                        .with_quiet(options.quiet);
1336                streaming_writer.prepare(Some(1))?;
1337                let row = Row::new(vec![
1338                    Value::Text("OK".to_string()),
1339                    Value::Text("Operation completed successfully".to_string()),
1340                ]);
1341                streaming_writer.write_row(row)?;
1342                streaming_writer.finish()?;
1343            }
1344        }
1345        ExecutionResult::RowsAffected(count) => {
1346            // DML success - suppress status output in quiet mode
1347            if !options.quiet {
1348                let columns = sql_status_columns();
1349                let mut streaming_writer =
1350                    StreamingWriter::new(writer, formatter, columns, options.limit)
1351                        .with_quiet(options.quiet);
1352                streaming_writer.prepare(Some(1))?;
1353                let row = Row::new(vec![
1354                    Value::Text("OK".to_string()),
1355                    Value::Text(format!("{} row(s) affected", count)),
1356                ]);
1357                streaming_writer.write_row(row)?;
1358                streaming_writer.finish()?;
1359            }
1360        }
1361        ExecutionResult::Query(query_result) => {
1362            // Unexpected: non-SELECT should not return Query result
1363            // But handle it gracefully by outputting the result
1364            let columns = columns_from_query_result(&query_result);
1365            let mut streaming_writer =
1366                StreamingWriter::new(writer, formatter, columns, options.limit)
1367                    .with_quiet(options.quiet);
1368            streaming_writer.prepare(Some(query_result.rows.len()))?;
1369            for sql_row in query_result.rows {
1370                let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1371                let row = Row::new(values);
1372                match streaming_writer.write_row(row)? {
1373                    WriteStatus::LimitReached => break,
1374                    WriteStatus::Continue => {}
1375                }
1376            }
1377            streaming_writer.finish()?;
1378        }
1379    }
1380
1381    Ok(())
1382}
1383
1384/// Convert alopex_sql::SqlValue to our Value type.
1385fn sql_value_to_value(sql_value: alopex_sql::SqlValue) -> Value {
1386    use alopex_sql::SqlValue;
1387
1388    match sql_value {
1389        SqlValue::Null => Value::Null,
1390        SqlValue::Integer(i) => Value::Int(i as i64),
1391        SqlValue::BigInt(i) => Value::Int(i),
1392        SqlValue::Float(f) => Value::Float(f as f64),
1393        SqlValue::Double(f) => Value::Float(f),
1394        SqlValue::Text(s) => Value::Text(s),
1395        SqlValue::Blob(b) => Value::Bytes(b),
1396        SqlValue::Boolean(b) => Value::Bool(b),
1397        SqlValue::Timestamp(ts) => {
1398            // Format timestamp as ISO 8601 string
1399            Value::Text(format!("{}", ts))
1400        }
1401        SqlValue::Vector(v) => Value::Vector(v),
1402    }
1403}
1404
1405fn remote_value_to_value(sql_value: alopex_sql::storage::SqlValue) -> Value {
1406    use alopex_sql::storage::SqlValue;
1407
1408    match sql_value {
1409        SqlValue::Null => Value::Null,
1410        SqlValue::Integer(i) => Value::Int(i as i64),
1411        SqlValue::BigInt(i) => Value::Int(i),
1412        SqlValue::Float(f) => Value::Float(f as f64),
1413        SqlValue::Double(f) => Value::Float(f),
1414        SqlValue::Text(s) => Value::Text(s),
1415        SqlValue::Blob(b) => Value::Bytes(b),
1416        SqlValue::Boolean(b) => Value::Bool(b),
1417        SqlValue::Timestamp(ts) => Value::Text(ts.to_string()),
1418        SqlValue::Vector(v) => Value::Vector(v),
1419    }
1420}
1421
1422fn data_type_from_string(value: &str) -> DataType {
1423    let upper = value.to_ascii_uppercase();
1424    if upper.starts_with("INT") || upper.starts_with("BIGINT") {
1425        DataType::Int
1426    } else if upper.starts_with("FLOAT") || upper.starts_with("DOUBLE") {
1427        DataType::Float
1428    } else if upper.starts_with("BLOB") {
1429        DataType::Bytes
1430    } else if upper.starts_with("BOOLEAN") {
1431        DataType::Bool
1432    } else if upper.starts_with("VECTOR") {
1433        DataType::Vector
1434    } else {
1435        DataType::Text
1436    }
1437}
1438
1439/// Convert alopex_sql::executor::ColumnInfo to our Column type.
1440fn sql_column_to_column(col: &alopex_sql::executor::ColumnInfo) -> Column {
1441    use alopex_sql::planner::ResolvedType;
1442
1443    let data_type = match &col.data_type {
1444        ResolvedType::Integer | ResolvedType::BigInt => DataType::Int,
1445        ResolvedType::Float | ResolvedType::Double => DataType::Float,
1446        ResolvedType::Text => DataType::Text,
1447        ResolvedType::Blob => DataType::Bytes,
1448        ResolvedType::Boolean => DataType::Bool,
1449        ResolvedType::Timestamp => DataType::Text, // Display as text
1450        ResolvedType::Vector { .. } => DataType::Vector,
1451        ResolvedType::Null => DataType::Text, // Fallback
1452    };
1453
1454    Column::new(&col.name, data_type)
1455}
1456
1457/// Create columns from SQL query result.
1458#[allow(dead_code)] // Used by tests with legacy execute_sql function
1459fn columns_from_query_result(query_result: &alopex_sql::executor::QueryResult) -> Vec<Column> {
1460    query_result
1461        .columns
1462        .iter()
1463        .map(sql_column_to_column)
1464        .collect()
1465}
1466
1467/// Create columns from streaming query result iterator (FR-7).
1468#[allow(dead_code)] // Kept for potential future use with old streaming API
1469fn columns_from_streaming_result(
1470    query_iter: &alopex_embedded::QueryRowIterator<'_>,
1471) -> Vec<Column> {
1472    query_iter
1473        .columns()
1474        .iter()
1475        .map(sql_column_to_column)
1476        .collect()
1477}
1478
1479/// Create columns from callback-based streaming rows (FR-7).
1480fn columns_from_streaming_rows(rows: &alopex_embedded::StreamingRows<'_>) -> Vec<Column> {
1481    rows.columns().iter().map(sql_column_to_column).collect()
1482}
1483
1484/// Create columns for status output.
1485pub fn sql_status_columns() -> Vec<Column> {
1486    vec![
1487        Column::new("status", DataType::Text),
1488        Column::new("message", DataType::Text),
1489    ]
1490}
1491
1492#[cfg(test)]
1493mod tests {
1494    use super::*;
1495    use crate::batch::BatchModeSource;
1496    use crate::output::jsonl::JsonlFormatter;
1497
1498    fn create_test_db() -> Database {
1499        Database::open_in_memory().unwrap()
1500    }
1501
1502    fn create_status_writer(output: &mut Vec<u8>) -> StreamingWriter<&mut Vec<u8>> {
1503        let formatter = Box::new(JsonlFormatter::new());
1504        let columns = sql_status_columns();
1505        StreamingWriter::new(output, formatter, columns, None)
1506    }
1507
1508    fn create_query_writer(
1509        output: &mut Vec<u8>,
1510        columns: Vec<Column>,
1511    ) -> StreamingWriter<&mut Vec<u8>> {
1512        let formatter = Box::new(JsonlFormatter::new());
1513        StreamingWriter::new(output, formatter, columns, None)
1514    }
1515
1516    fn default_batch_mode() -> BatchMode {
1517        BatchMode {
1518            is_batch: false,
1519            is_tty: true,
1520            source: BatchModeSource::Default,
1521        }
1522    }
1523
1524    #[test]
1525    fn test_create_table() {
1526        let db = create_test_db();
1527
1528        let mut output = Vec::new();
1529        {
1530            let mut writer = create_status_writer(&mut output);
1531            execute_sql(
1532                &db,
1533                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
1534                &mut writer,
1535            )
1536            .unwrap();
1537        }
1538
1539        let result = String::from_utf8(output).unwrap();
1540        assert!(result.contains("OK"));
1541    }
1542
1543    #[test]
1544    fn test_insert_and_select() {
1545        let db = create_test_db();
1546
1547        // Create table
1548        {
1549            let mut output = Vec::new();
1550            let mut writer = create_status_writer(&mut output);
1551            execute_sql(
1552                &db,
1553                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
1554                &mut writer,
1555            )
1556            .unwrap();
1557        }
1558
1559        // Insert
1560        {
1561            let mut output = Vec::new();
1562            let mut writer = create_status_writer(&mut output);
1563            execute_sql(
1564                &db,
1565                "INSERT INTO users (id, name) VALUES (1, 'Alice');",
1566                &mut writer,
1567            )
1568            .unwrap();
1569            let result = String::from_utf8(output).unwrap();
1570            assert!(result.contains("row(s) affected"));
1571        }
1572
1573        // Select - we need columns from the query result
1574        {
1575            let mut output = Vec::new();
1576            let columns = vec![
1577                Column::new("id", DataType::Int),
1578                Column::new("name", DataType::Text),
1579            ];
1580            let mut writer = create_query_writer(&mut output, columns);
1581            execute_sql(&db, "SELECT id, name FROM users;", &mut writer).unwrap();
1582            let result = String::from_utf8(output).unwrap();
1583            assert!(result.contains("Alice"));
1584        }
1585    }
1586
1587    #[test]
1588    fn test_syntax_error() {
1589        let db = create_test_db();
1590
1591        let mut output = Vec::new();
1592        let mut writer = create_status_writer(&mut output);
1593        let result = execute_sql(&db, "CREATE TABEL invalid_syntax;", &mut writer);
1594        assert!(result.is_err());
1595    }
1596
1597    #[test]
1598    fn test_multiple_statements() {
1599        let db = create_test_db();
1600
1601        let mut output = Vec::new();
1602        {
1603            let mut writer = create_status_writer(&mut output);
1604            execute_sql(
1605                &db,
1606                "CREATE TABLE t (id INTEGER PRIMARY KEY); INSERT INTO t (id) VALUES (1);",
1607                &mut writer,
1608            )
1609            .unwrap();
1610        }
1611
1612        // Verify the table exists and has data
1613        {
1614            let mut output = Vec::new();
1615            let columns = vec![Column::new("id", DataType::Int)];
1616            let mut writer = create_query_writer(&mut output, columns);
1617            execute_sql(&db, "SELECT id FROM t;", &mut writer).unwrap();
1618            let result = String::from_utf8(output).unwrap();
1619            assert!(result.contains("1"));
1620        }
1621    }
1622
1623    #[test]
1624    fn test_sql_value_conversion() {
1625        use alopex_sql::SqlValue;
1626
1627        assert!(matches!(sql_value_to_value(SqlValue::Null), Value::Null));
1628        assert!(matches!(
1629            sql_value_to_value(SqlValue::Integer(42)),
1630            Value::Int(42)
1631        ));
1632        assert!(matches!(
1633            sql_value_to_value(SqlValue::BigInt(100)),
1634            Value::Int(100)
1635        ));
1636        assert!(matches!(
1637            sql_value_to_value(SqlValue::Boolean(true)),
1638            Value::Bool(true)
1639        ));
1640        assert!(
1641            matches!(sql_value_to_value(SqlValue::Text("hello".to_string())), Value::Text(s) if s == "hello")
1642        );
1643    }
1644
1645    #[test]
1646    fn resolve_query_from_argument() {
1647        let cmd = SqlCommand {
1648            query: Some("SELECT 1".to_string()),
1649            file: None,
1650            fetch_size: None,
1651            max_rows: None,
1652            deadline: None,
1653            tui: false,
1654        };
1655
1656        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
1657        assert_eq!(sql, "SELECT 1");
1658    }
1659
1660    #[test]
1661    fn resolve_query_from_file() {
1662        let mut file = tempfile::NamedTempFile::new().unwrap();
1663        writeln!(file, "SELECT * FROM users").unwrap();
1664
1665        let cmd = SqlCommand {
1666            query: None,
1667            file: Some(file.path().display().to_string()),
1668            fetch_size: None,
1669            max_rows: None,
1670            deadline: None,
1671            tui: false,
1672        };
1673
1674        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
1675        assert_eq!(sql, "SELECT * FROM users\n");
1676    }
1677
1678    #[test]
1679    fn resolve_query_returns_no_query_error() {
1680        let cmd = SqlCommand {
1681            query: None,
1682            file: None,
1683            fetch_size: None,
1684            max_rows: None,
1685            deadline: None,
1686            tui: false,
1687        };
1688
1689        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
1690        assert!(matches!(err, CliError::NoQueryProvided));
1691    }
1692
1693    #[test]
1694    fn resolve_query_rejects_query_and_file() {
1695        let cmd = SqlCommand {
1696            query: Some("SELECT 1".to_string()),
1697            file: Some("query.sql".into()),
1698            fetch_size: None,
1699            max_rows: None,
1700            deadline: None,
1701            tui: false,
1702        };
1703
1704        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
1705        assert!(matches!(
1706            err,
1707            CliError::InvalidArgument(msg) if msg == "Cannot specify both query and file"
1708        ));
1709    }
1710}