alopex_cli/commands/
sql.rs

1//! SQL Command - SQL query execution
2//!
3//! Supports: query execution, file-based queries
4
5use std::collections::HashSet;
6use std::fs;
7use std::io::{self, Read, Write};
8
9use alopex_embedded::Database;
10
11use crate::batch::BatchMode;
12use crate::cli::SqlCommand;
13use crate::client::http::{ClientError, HttpClient};
14use crate::error::{CliError, Result};
15use crate::models::{Column, DataType, Row, Value};
16use crate::output::formatter::Formatter;
17use crate::streaming::timeout::parse_deadline;
18use crate::streaming::{CancelSignal, Deadline, StreamingWriter, WriteStatus};
19use crate::tui::{is_tty, TuiApp};
20use futures_util::StreamExt;
21
22#[doc(hidden)]
23pub struct SqlExecutionOptions<'a> {
24    pub limit: Option<usize>,
25    pub quiet: bool,
26    pub cancel: &'a CancelSignal,
27    pub deadline: &'a Deadline,
28}
29
30/// Execute a SQL command with dynamic column detection.
31///
32/// This function creates the StreamingWriter internally based on the query result type,
33/// ensuring that SELECT queries use the correct column headers.
34///
35/// # Arguments
36///
37/// * `db` - The database instance.
38/// * `cmd` - The SQL command to execute.
39/// * `writer` - The output writer.
40/// * `formatter` - The formatter to use.
41/// * `limit` - Optional row limit.
42/// * `quiet` - Whether to suppress warnings.
43pub fn execute_with_formatter<W: Write>(
44    db: &Database,
45    cmd: SqlCommand,
46    batch_mode: &BatchMode,
47    writer: &mut W,
48    formatter: Box<dyn Formatter>,
49    limit: Option<usize>,
50    quiet: bool,
51) -> Result<()> {
52    let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
53    let cancel = CancelSignal::new();
54
55    execute_with_formatter_control(
56        db,
57        cmd,
58        batch_mode,
59        writer,
60        formatter,
61        SqlExecutionOptions {
62            limit,
63            quiet,
64            cancel: &cancel,
65            deadline: &deadline,
66        },
67    )
68}
69
70#[doc(hidden)]
71pub fn execute_with_formatter_control<W: Write>(
72    db: &Database,
73    cmd: SqlCommand,
74    batch_mode: &BatchMode,
75    writer: &mut W,
76    formatter: Box<dyn Formatter>,
77    options: SqlExecutionOptions<'_>,
78) -> Result<()> {
79    let sql = cmd.resolve_query(batch_mode)?;
80    let effective_limit = merge_limit(options.limit, cmd.max_rows);
81    let options = SqlExecutionOptions {
82        limit: effective_limit,
83        ..options
84    };
85
86    if cmd.tui {
87        return execute_tui_local_or_fallback(db, &sql, writer, formatter, options);
88    }
89
90    execute_sql_with_formatter(db, &sql, writer, formatter, &options)
91}
92
93fn is_select_query(sql: &str) -> Result<bool> {
94    use alopex_sql::{AlopexDialect, Parser, StatementKind};
95
96    let dialect = AlopexDialect;
97    let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
98    Ok(stmts.len() == 1
99        && matches!(
100            stmts.first().map(|s| &s.kind),
101            Some(StatementKind::Select(_))
102        ))
103}
104
105/// Execute a SQL command against a remote server using HttpClient.
106pub async fn execute_remote_with_formatter<W: Write>(
107    client: &HttpClient,
108    cmd: &SqlCommand,
109    batch_mode: &BatchMode,
110    writer: &mut W,
111    formatter: Box<dyn Formatter>,
112    limit: Option<usize>,
113    quiet: bool,
114) -> Result<()> {
115    let effective_limit = merge_limit(limit, cmd.max_rows);
116    let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
117    let cancel = CancelSignal::new();
118    let options = SqlExecutionOptions {
119        limit: effective_limit,
120        quiet,
121        cancel: &cancel,
122        deadline: &deadline,
123    };
124
125    execute_remote_with_formatter_control(client, cmd, batch_mode, writer, formatter, options).await
126}
127
128#[doc(hidden)]
129pub async fn execute_remote_with_formatter_control<W: Write>(
130    client: &HttpClient,
131    cmd: &SqlCommand,
132    batch_mode: &BatchMode,
133    writer: &mut W,
134    formatter: Box<dyn Formatter>,
135    options: SqlExecutionOptions<'_>,
136) -> Result<()> {
137    let sql = cmd.resolve_query(batch_mode)?;
138    if cmd.tui {
139        return execute_tui_remote_or_fallback(client, &sql, cmd, writer, formatter, options).await;
140    }
141    execute_remote_with_formatter_impl(client, &sql, cmd, writer, formatter, &options).await
142}
143
144async fn execute_remote_with_formatter_impl<W: Write>(
145    client: &HttpClient,
146    sql: &str,
147    cmd: &SqlCommand,
148    writer: &mut W,
149    formatter: Box<dyn Formatter>,
150    options: &SqlExecutionOptions<'_>,
151) -> Result<()> {
152    if is_select_query(sql)? && formatter.supports_streaming() {
153        return execute_remote_streaming(
154            client,
155            sql,
156            writer,
157            formatter,
158            options,
159            cmd.fetch_size,
160            cmd.max_rows,
161        )
162        .await;
163    }
164
165    let request = RemoteSqlRequest {
166        sql: sql.to_string(),
167        streaming: false,
168        fetch_size: cmd.fetch_size,
169        max_rows: cmd.max_rows,
170    };
171    let response: RemoteSqlResponse = tokio::select! {
172        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
173            match result {
174                Ok(value) => value.map_err(map_client_error)?,
175                Err(_) => {
176                    let _ = send_cancel_request(client).await;
177                    return Err(CliError::Timeout(format!(
178                        "deadline exceeded after {}",
179                        humantime::format_duration(options.deadline.duration())
180                    )));
181                }
182            }
183        }
184        _ = options.cancel.wait() => {
185            let _ = send_cancel_request(client).await;
186            return Err(CliError::Cancelled);
187        }
188    };
189
190    if response.columns.is_empty() {
191        if options.quiet {
192            return Ok(());
193        }
194        let message = match response.affected_rows {
195            Some(count) => format!("{count} row(s) affected"),
196            None => "Operation completed successfully".to_string(),
197        };
198        let columns = sql_status_columns();
199        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
200            .with_quiet(options.quiet);
201        streaming_writer.prepare(Some(1))?;
202        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
203        streaming_writer.write_row(row)?;
204        return streaming_writer.finish();
205    }
206
207    let columns: Vec<Column> = response
208        .columns
209        .iter()
210        .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
211        .collect();
212    let mut streaming_writer =
213        StreamingWriter::new(writer, formatter, columns, options.limit).with_quiet(options.quiet);
214    streaming_writer.prepare(Some(response.rows.len()))?;
215    for row in response.rows {
216        if options.cancel.is_cancelled() {
217            let _ = send_cancel_request(client).await;
218            return Err(CliError::Cancelled);
219        }
220        options.deadline.check()?;
221        let values = row.into_iter().map(remote_value_to_value).collect();
222        match streaming_writer.write_row(Row::new(values))? {
223            WriteStatus::LimitReached => break,
224            WriteStatus::Continue => {}
225        }
226    }
227    streaming_writer.finish()
228}
229
230fn execute_tui_local_or_fallback<W: Write>(
231    db: &Database,
232    sql: &str,
233    writer: &mut W,
234    formatter: Box<dyn Formatter>,
235    options: SqlExecutionOptions<'_>,
236) -> Result<()> {
237    if !is_tty() {
238        if !options.quiet {
239            eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
240        }
241        return execute_sql_with_formatter(db, sql, writer, formatter, &options);
242    }
243
244    match execute_tui_local(db, sql, &options) {
245        Ok(()) => Ok(()),
246        Err(err) => {
247            if !options.quiet {
248                eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
249            }
250            execute_sql_with_formatter(db, sql, writer, formatter, &options)
251        }
252    }
253}
254
255fn execute_tui_local(db: &Database, sql: &str, options: &SqlExecutionOptions<'_>) -> Result<()> {
256    use alopex_sql::ExecutionResult;
257
258    options.deadline.check()?;
259    let result = db.execute_sql(sql)?;
260    options.deadline.check()?;
261
262    let (columns, rows) = match result {
263        ExecutionResult::Success => {
264            let columns = sql_status_columns();
265            let row = Row::new(vec![
266                Value::Text("OK".to_string()),
267                Value::Text("Operation completed successfully".to_string()),
268            ]);
269            (columns, vec![row])
270        }
271        ExecutionResult::RowsAffected(count) => {
272            let columns = sql_status_columns();
273            let row = Row::new(vec![
274                Value::Text("OK".to_string()),
275                Value::Text(format!("{count} row(s) affected")),
276            ]);
277            (columns, vec![row])
278        }
279        ExecutionResult::Query(query_result) => {
280            let columns = query_result
281                .columns
282                .iter()
283                .map(|col| Column::new(&col.name, DataType::Text))
284                .collect::<Vec<_>>();
285            let mut rows = Vec::with_capacity(query_result.rows.len());
286            for sql_row in query_result.rows {
287                let values = sql_row.into_iter().map(sql_value_to_value).collect();
288                rows.push(Row::new(values));
289            }
290            if let Some(limit) = options.limit {
291                rows.truncate(limit);
292            }
293            (columns, rows)
294        }
295    };
296
297    let app = TuiApp::new(columns, rows, "local", false);
298    app.run()
299}
300
301async fn execute_tui_remote_or_fallback<W: Write>(
302    client: &HttpClient,
303    sql: &str,
304    cmd: &SqlCommand,
305    writer: &mut W,
306    formatter: Box<dyn Formatter>,
307    options: SqlExecutionOptions<'_>,
308) -> Result<()> {
309    if !is_tty() {
310        if !options.quiet {
311            eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
312        }
313        return execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options)
314            .await;
315    }
316
317    match execute_tui_remote(client, sql, cmd, &options).await {
318        Ok(()) => Ok(()),
319        Err(err) => {
320            if !options.quiet {
321                eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
322            }
323            execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options).await
324        }
325    }
326}
327
328async fn execute_tui_remote(
329    client: &HttpClient,
330    sql: &str,
331    cmd: &SqlCommand,
332    options: &SqlExecutionOptions<'_>,
333) -> Result<()> {
334    if is_select_query(sql)? {
335        let (columns, rows) =
336            collect_remote_streaming_rows(client, sql, options, cmd.fetch_size, cmd.max_rows)
337                .await?;
338        let app = TuiApp::new(columns, rows, "server", false);
339        return app.run();
340    }
341
342    let request = RemoteSqlRequest {
343        sql: sql.to_string(),
344        streaming: false,
345        fetch_size: cmd.fetch_size,
346        max_rows: cmd.max_rows,
347    };
348
349    let response: RemoteSqlResponse = tokio::select! {
350        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
351            match result {
352                Ok(value) => value.map_err(map_client_error)?,
353                Err(_) => {
354                    let _ = send_cancel_request(client).await;
355                    return Err(CliError::Timeout(format!(
356                        "deadline exceeded after {}",
357                        humantime::format_duration(options.deadline.duration())
358                    )));
359                }
360            }
361        }
362        _ = options.cancel.wait() => {
363            let _ = send_cancel_request(client).await;
364            return Err(CliError::Cancelled);
365        }
366    };
367
368    let (columns, rows) = if response.columns.is_empty() {
369        let columns = sql_status_columns();
370        let message = match response.affected_rows {
371            Some(count) => format!("{count} row(s) affected"),
372            None => "Operation completed successfully".to_string(),
373        };
374        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
375        (columns, vec![row])
376    } else {
377        let columns: Vec<Column> = response
378            .columns
379            .iter()
380            .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
381            .collect();
382        let mut rows = response
383            .rows
384            .into_iter()
385            .map(|row| Row::new(row.into_iter().map(remote_value_to_value).collect()))
386            .collect::<Vec<_>>();
387        if let Some(limit) = options.limit {
388            rows.truncate(limit);
389        }
390        (columns, rows)
391    };
392
393    let app = TuiApp::new(columns, rows, "server", false);
394    app.run()
395}
396
397async fn collect_remote_streaming_rows(
398    client: &HttpClient,
399    sql: &str,
400    options: &SqlExecutionOptions<'_>,
401    fetch_size: Option<usize>,
402    max_rows: Option<usize>,
403) -> Result<(Vec<Column>, Vec<Row>)> {
404    let request = RemoteSqlRequest {
405        sql: sql.to_string(),
406        streaming: true,
407        fetch_size,
408        max_rows,
409    };
410
411    let response = tokio::select! {
412        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
413            match result {
414                Ok(value) => value.map_err(map_client_error)?,
415                Err(_) => {
416                    let _ = send_cancel_request(client).await;
417                    return Err(CliError::Timeout(format!(
418                        "deadline exceeded after {}",
419                        humantime::format_duration(options.deadline.duration())
420                    )));
421                }
422            }
423        }
424        _ = options.cancel.wait() => {
425            let _ = send_cancel_request(client).await;
426            return Err(CliError::Cancelled);
427        }
428    };
429
430    let mut stream = response.bytes_stream();
431    let mut buffer: Vec<u8> = Vec::new();
432    let mut pos: usize = 0;
433    let mut done = false;
434    let mut saw_array_start = false;
435    let mut columns: Option<Vec<String>> = None;
436    let mut column_set: Option<HashSet<String>> = None;
437    let mut rows: Vec<Row> = Vec::new();
438
439    while !done {
440        if options.cancel.is_cancelled() {
441            let _ = send_cancel_request(client).await;
442            return Err(CliError::Cancelled);
443        }
444        if let Err(err) = options.deadline.check() {
445            let _ = send_cancel_request(client).await;
446            return Err(err);
447        }
448
449        let next = tokio::select! {
450            _ = options.cancel.wait() => {
451                let _ = send_cancel_request(client).await;
452                return Err(CliError::Cancelled);
453            }
454            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
455                match result {
456                    Ok(value) => value,
457                    Err(_) => {
458                        let _ = send_cancel_request(client).await;
459                        return Err(CliError::Timeout(format!(
460                            "deadline exceeded after {}",
461                            humantime::format_duration(options.deadline.duration())
462                        )));
463                    }
464                }
465            }
466        };
467
468        let chunk = match next {
469            Some(chunk) => chunk,
470            None => break,
471        };
472
473        let bytes = match chunk {
474            Ok(bytes) => bytes,
475            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
476        };
477
478        buffer.extend_from_slice(&bytes);
479
480        loop {
481            skip_whitespace(&buffer, &mut pos);
482            if pos >= buffer.len() {
483                break;
484            }
485
486            if !saw_array_start {
487                if buffer[pos] != b'[' {
488                    return Err(CliError::InvalidArgument(
489                        "Invalid streaming response: expected JSON array".into(),
490                    ));
491                }
492                pos += 1;
493                saw_array_start = true;
494                continue;
495            }
496
497            skip_whitespace(&buffer, &mut pos);
498            if pos >= buffer.len() {
499                break;
500            }
501
502            if buffer[pos] == b']' {
503                pos += 1;
504                done = true;
505                break;
506            }
507
508            let slice = &buffer[pos..];
509            let mut stream =
510                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
511            let value = match stream.next() {
512                Some(Ok(value)) => value,
513                Some(Err(err)) if err.is_eof() => break,
514                Some(Err(err)) => return Err(CliError::Json(err)),
515                None => break,
516            };
517            pos = pos.saturating_add(stream.byte_offset());
518
519            let object = value.as_object().ok_or_else(|| {
520                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
521            })?;
522
523            if columns.is_none() {
524                let names: Vec<String> = object.keys().cloned().collect();
525                let set: HashSet<String> = names.iter().cloned().collect();
526                if names.is_empty() {
527                    return Err(CliError::InvalidArgument(
528                        "Invalid streaming row: empty object".into(),
529                    ));
530                }
531                columns = Some(names);
532                column_set = Some(set);
533            }
534
535            let names = columns
536                .as_ref()
537                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
538            let set = column_set
539                .as_ref()
540                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;
541
542            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
543                return Err(CliError::InvalidArgument(
544                    "Invalid streaming row: column mismatch".into(),
545                ));
546            }
547
548            let values = names
549                .iter()
550                .map(|name| {
551                    object.get(name).ok_or_else(|| {
552                        CliError::InvalidArgument(format!(
553                            "Invalid streaming row: missing column '{name}'"
554                        ))
555                    })
556                })
557                .map(|value| value.and_then(json_value_to_value))
558                .collect::<Result<Vec<_>>>()?;
559            rows.push(Row::new(values));
560
561            if let Some(limit) = options.limit {
562                if rows.len() >= limit {
563                    let _ = send_cancel_request(client).await;
564                    done = true;
565                    break;
566                }
567            }
568
569            skip_whitespace(&buffer, &mut pos);
570            if pos >= buffer.len() {
571                break;
572            }
573            match buffer[pos] {
574                b',' => {
575                    pos += 1;
576                }
577                b']' => {
578                    pos += 1;
579                    done = true;
580                    break;
581                }
582                _ => {
583                    return Err(CliError::InvalidArgument(
584                        "Invalid streaming response: expected ',' or ']'".into(),
585                    ))
586                }
587            }
588        }
589
590        if pos > 0 {
591            buffer.drain(..pos);
592            pos = 0;
593        }
594    }
595
596    if done {
597        if has_non_whitespace(&buffer) {
598            return Err(CliError::InvalidArgument(
599                "Invalid streaming response: unexpected trailing data".into(),
600            ));
601        }
602        buffer.clear();
603        loop {
604            let next = tokio::select! {
605                _ = options.cancel.wait() => {
606                    let _ = send_cancel_request(client).await;
607                    return Err(CliError::Cancelled);
608                }
609                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
610                    match result {
611                        Ok(value) => value,
612                        Err(_) => {
613                            let _ = send_cancel_request(client).await;
614                            return Err(CliError::Timeout(format!(
615                                "deadline exceeded after {}",
616                                humantime::format_duration(options.deadline.duration())
617                            )));
618                        }
619                    }
620                }
621            };
622
623            let chunk = match next {
624                Some(chunk) => chunk,
625                None => break,
626            };
627
628            let bytes = match chunk {
629                Ok(bytes) => bytes,
630                Err(err) => {
631                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
632                }
633            };
634
635            if has_non_whitespace(&bytes) {
636                return Err(CliError::InvalidArgument(
637                    "Invalid streaming response: unexpected trailing data".into(),
638                ));
639            }
640        }
641    } else {
642        skip_whitespace(&buffer, &mut pos);
643        if pos < buffer.len() {
644            return Err(CliError::InvalidArgument(
645                "Invalid streaming response: unexpected trailing data".into(),
646            ));
647        }
648        return Err(CliError::InvalidArgument(
649            "Invalid streaming response: unexpected end of stream".into(),
650        ));
651    }
652
653    let columns = columns
654        .unwrap_or_default()
655        .into_iter()
656        .map(|name| Column::new(name, DataType::Text))
657        .collect();
658    Ok((columns, rows))
659}
660
661async fn execute_remote_streaming<W: Write>(
662    client: &HttpClient,
663    sql: &str,
664    writer: &mut W,
665    formatter: Box<dyn Formatter>,
666    options: &SqlExecutionOptions<'_>,
667    fetch_size: Option<usize>,
668    max_rows: Option<usize>,
669) -> Result<()> {
670    let request = RemoteSqlRequest {
671        sql: sql.to_string(),
672        streaming: true,
673        fetch_size,
674        max_rows,
675    };
676
677    let response = tokio::select! {
678        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
679            match result {
680                Ok(value) => value.map_err(map_client_error)?,
681                Err(_) => {
682                    let _ = send_cancel_request(client).await;
683                    return Err(CliError::Timeout(format!(
684                        "deadline exceeded after {}",
685                        humantime::format_duration(options.deadline.duration())
686                    )));
687                }
688            }
689        }
690        _ = options.cancel.wait() => {
691            let _ = send_cancel_request(client).await;
692            return Err(CliError::Cancelled);
693        }
694    };
695
696    let mut stream = response.bytes_stream();
697    let mut buffer: Vec<u8> = Vec::new();
698    let mut pos: usize = 0;
699    let mut streaming_writer: Option<StreamingWriter<&mut W>> = None;
700    let mut formatter = Some(formatter);
701    let mut columns: Option<Vec<String>> = None;
702    let mut column_set: Option<HashSet<String>> = None;
703    let mut done = false;
704    let mut saw_array_start = false;
705
706    while !done {
707        if options.cancel.is_cancelled() {
708            let _ = send_cancel_request(client).await;
709            return Err(CliError::Cancelled);
710        }
711        if let Err(err) = options.deadline.check() {
712            let _ = send_cancel_request(client).await;
713            return Err(err);
714        }
715
716        let next = tokio::select! {
717            _ = options.cancel.wait() => {
718                let _ = send_cancel_request(client).await;
719                return Err(CliError::Cancelled);
720            }
721            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
722                match result {
723                    Ok(value) => value,
724                    Err(_) => {
725                        let _ = send_cancel_request(client).await;
726                        return Err(CliError::Timeout(format!(
727                            "deadline exceeded after {}",
728                            humantime::format_duration(options.deadline.duration())
729                        )));
730                    }
731                }
732            }
733        };
734
735        let chunk = match next {
736            Some(chunk) => chunk,
737            None => break,
738        };
739
740        let bytes = match chunk {
741            Ok(bytes) => bytes,
742            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
743        };
744
745        buffer.extend_from_slice(&bytes);
746
747        loop {
748            skip_whitespace(&buffer, &mut pos);
749            if pos >= buffer.len() {
750                break;
751            }
752
753            if !saw_array_start {
754                if buffer[pos] != b'[' {
755                    return Err(CliError::InvalidArgument(
756                        "Invalid streaming response: expected JSON array".into(),
757                    ));
758                }
759                pos += 1;
760                saw_array_start = true;
761                continue;
762            }
763
764            skip_whitespace(&buffer, &mut pos);
765            if pos >= buffer.len() {
766                break;
767            }
768
769            if buffer[pos] == b']' {
770                pos += 1;
771                done = true;
772                break;
773            }
774
775            let slice = &buffer[pos..];
776            let mut stream =
777                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
778            let value = match stream.next() {
779                Some(Ok(value)) => value,
780                Some(Err(err)) if err.is_eof() => break,
781                Some(Err(err)) => return Err(CliError::Json(err)),
782                None => break,
783            };
784            pos = pos.saturating_add(stream.byte_offset());
785
786            let object = value.as_object().ok_or_else(|| {
787                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
788            })?;
789
790            if columns.is_none() {
791                let names: Vec<String> = object.keys().cloned().collect();
792                let set: HashSet<String> = names.iter().cloned().collect();
793                if names.is_empty() {
794                    return Err(CliError::InvalidArgument(
795                        "Invalid streaming row: empty object".into(),
796                    ));
797                }
798                let cols = names
799                    .iter()
800                    .map(|name| Column::new(name, DataType::Text))
801                    .collect::<Vec<_>>();
802                let formatter = formatter
803                    .take()
804                    .ok_or_else(|| CliError::InvalidArgument("Missing formatter".into()))?;
805                let mut writer = StreamingWriter::new(&mut *writer, formatter, cols, options.limit)
806                    .with_quiet(options.quiet);
807                writer.prepare(None)?;
808                streaming_writer = Some(writer);
809                columns = Some(names);
810                column_set = Some(set);
811            }
812
813            let names = columns
814                .as_ref()
815                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
816            let set = column_set
817                .as_ref()
818                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;
819
820            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
821                return Err(CliError::InvalidArgument(
822                    "Invalid streaming row: column mismatch".into(),
823                ));
824            }
825
826            let values = names
827                .iter()
828                .map(|name| {
829                    object.get(name).ok_or_else(|| {
830                        CliError::InvalidArgument(format!(
831                            "Invalid streaming row: missing column '{name}'"
832                        ))
833                    })
834                })
835                .map(|value| value.and_then(json_value_to_value))
836                .collect::<Result<Vec<_>>>()?;
837
838            if let Some(writer) = streaming_writer.as_mut() {
839                match writer.write_row(Row::new(values))? {
840                    WriteStatus::LimitReached => {
841                        let _ = send_cancel_request(client).await;
842                        return writer.finish();
843                    }
844                    WriteStatus::Continue => {}
845                }
846            }
847
848            skip_whitespace(&buffer, &mut pos);
849            if pos >= buffer.len() {
850                break;
851            }
852            match buffer[pos] {
853                b',' => {
854                    pos += 1;
855                }
856                b']' => {
857                    pos += 1;
858                    done = true;
859                    break;
860                }
861                _ => {
862                    return Err(CliError::InvalidArgument(
863                        "Invalid streaming response: expected ',' or ']'".into(),
864                    ))
865                }
866            }
867        }
868
869        if pos > 0 {
870            buffer.drain(..pos);
871            pos = 0;
872        }
873    }
874
875    if done {
876        if has_non_whitespace(&buffer) {
877            return Err(CliError::InvalidArgument(
878                "Invalid streaming response: unexpected trailing data".into(),
879            ));
880        }
881        buffer.clear();
882        loop {
883            let next = tokio::select! {
884                _ = options.cancel.wait() => {
885                    let _ = send_cancel_request(client).await;
886                    return Err(CliError::Cancelled);
887                }
888                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
889                    match result {
890                        Ok(value) => value,
891                        Err(_) => {
892                            let _ = send_cancel_request(client).await;
893                            return Err(CliError::Timeout(format!(
894                                "deadline exceeded after {}",
895                                humantime::format_duration(options.deadline.duration())
896                            )));
897                        }
898                    }
899                }
900            };
901
902            let chunk = match next {
903                Some(chunk) => chunk,
904                None => break,
905            };
906
907            let bytes = match chunk {
908                Ok(bytes) => bytes,
909                Err(err) => {
910                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
911                }
912            };
913
914            if has_non_whitespace(&bytes) {
915                return Err(CliError::InvalidArgument(
916                    "Invalid streaming response: unexpected trailing data".into(),
917                ));
918            }
919        }
920    } else {
921        skip_whitespace(&buffer, &mut pos);
922        if pos < buffer.len() {
923            return Err(CliError::InvalidArgument(
924                "Invalid streaming response: unexpected trailing data".into(),
925            ));
926        }
927        return Err(CliError::InvalidArgument(
928            "Invalid streaming response: unexpected end of stream".into(),
929        ));
930    }
931
932    if let Some(mut writer) = streaming_writer {
933        return writer.finish();
934    }
935
936    if done && saw_array_start {
937        if let Some(formatter) = formatter.take() {
938            let mut writer =
939                StreamingWriter::new(&mut *writer, formatter, Vec::new(), options.limit)
940                    .with_quiet(options.quiet);
941            writer.prepare(None)?;
942            return writer.finish();
943        }
944    }
945
946    Ok(())
947}
948
/// Advance `pos` past any run of JSON whitespace (space, LF, CR, tab) in `buffer`.
///
/// `pos` is left untouched when it already points at a non-whitespace byte or
/// lies at/after the end of the buffer.
fn skip_whitespace(buffer: &[u8], pos: &mut usize) {
    // An out-of-range position yields an empty tail, so nothing is skipped.
    let tail = buffer.get(*pos..).unwrap_or(&[]);
    let skipped = tail
        .iter()
        .take_while(|&&byte| matches!(byte, b' ' | b'\n' | b'\r' | b'\t'))
        .count();
    *pos += skipped;
}
957
/// Report whether `buffer` contains at least one byte that is not JSON
/// whitespace (space, LF, CR, tab). An empty buffer yields `false`.
fn has_non_whitespace(buffer: &[u8]) -> bool {
    let only_whitespace = buffer
        .iter()
        .all(|&byte| matches!(byte, b' ' | b'\n' | b'\r' | b'\t'));
    !only_whitespace
}
963
964fn json_value_to_value(value: &serde_json::Value) -> Result<Value> {
965    match value {
966        serde_json::Value::Null => Ok(Value::Null),
967        serde_json::Value::Bool(value) => Ok(Value::Bool(*value)),
968        serde_json::Value::Number(value) => {
969            if let Some(value) = value.as_i64() {
970                Ok(Value::Int(value))
971            } else if let Some(value) = value.as_f64() {
972                Ok(Value::Float(value))
973            } else {
974                Err(CliError::InvalidArgument(
975                    "Invalid numeric value in streaming row".into(),
976                ))
977            }
978        }
979        serde_json::Value::String(value) => Ok(Value::Text(value.clone())),
980        serde_json::Value::Array(values) => {
981            let mut vector = Vec::with_capacity(values.len());
982            for entry in values {
983                let number = entry.as_f64().ok_or_else(|| {
984                    CliError::InvalidArgument("Invalid vector value in streaming row".into())
985                })?;
986                vector.push(number as f32);
987            }
988            Ok(Value::Vector(vector))
989        }
990        serde_json::Value::Object(_) => Err(CliError::InvalidArgument(
991            "Invalid streaming row: nested objects are not supported".into(),
992        )),
993    }
994}
995
996/// Legacy execute function for backward compatibility with tests.
997#[allow(dead_code)]
998pub fn execute<W: Write>(
999    db: &Database,
1000    cmd: SqlCommand,
1001    batch_mode: &BatchMode,
1002    writer: &mut StreamingWriter<W>,
1003) -> Result<()> {
1004    let sql = cmd.resolve_query(batch_mode)?;
1005
1006    execute_sql(db, &sql, writer)
1007}
1008
1009impl SqlCommand {
1010    /// Resolve the SQL query source (argument, file, or stdin).
1011    pub fn resolve_query(&self, batch_mode: &BatchMode) -> Result<String> {
1012        match (&self.query, &self.file) {
1013            (Some(query), None) => Ok(query.clone()),
1014            (None, Some(file)) => fs::read_to_string(file).map_err(|e| {
1015                CliError::InvalidArgument(format!("Failed to read SQL file '{}': {}", file, e))
1016            }),
1017            (None, None) if !batch_mode.is_tty => {
1018                let mut buf = String::new();
1019                io::stdin().read_to_string(&mut buf)?;
1020                Ok(buf)
1021            }
1022            (None, None) => Err(CliError::NoQueryProvided),
1023            (Some(_), Some(_)) => Err(CliError::InvalidArgument(
1024                "Cannot specify both query and file".to_string(),
1025            )),
1026        }
1027    }
1028}
1029
1030/// Execute SQL and write results.
1031fn execute_sql<W: Write>(db: &Database, sql: &str, writer: &mut StreamingWriter<W>) -> Result<()> {
1032    use alopex_embedded::SqlResult;
1033
1034    let result = db.execute_sql(sql)?;
1035
1036    match result {
1037        SqlResult::Success => {
1038            // DDL success - output simple status
1039            writer.prepare(Some(1))?;
1040            let row = Row::new(vec![
1041                Value::Text("OK".to_string()),
1042                Value::Text("Operation completed successfully".to_string()),
1043            ]);
1044            writer.write_row(row)?;
1045            writer.finish()?;
1046        }
1047        SqlResult::RowsAffected(count) => {
1048            // DML success - output affected rows count
1049            writer.prepare(Some(1))?;
1050            let row = Row::new(vec![
1051                Value::Text("OK".to_string()),
1052                Value::Text(format!("{} row(s) affected", count)),
1053            ]);
1054            writer.write_row(row)?;
1055            writer.finish()?;
1056        }
1057        SqlResult::Query(query_result) => {
1058            // SELECT result - output rows
1059            let row_count = query_result.rows.len();
1060            writer.prepare(Some(row_count))?;
1061
1062            for sql_row in query_result.rows {
1063                let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1064                let row = Row::new(values);
1065
1066                match writer.write_row(row)? {
1067                    WriteStatus::LimitReached => break,
1068                    WriteStatus::Continue => {}
1069                }
1070            }
1071
1072            writer.finish()?;
1073        }
1074    }
1075
1076    Ok(())
1077}
1078
1079/// Execute SQL with formatter, dynamically determining columns from query result.
1080///
1081/// This function executes the SQL using the streaming API for FR-7 compliance,
1082/// then creates the StreamingWriter with the correct columns based on the result type
1083/// (status columns for DDL/DML, query result columns for SELECT).
1084///
1085/// FR-7 Compliance: Uses SQL parser to detect SELECT queries instead of heuristic.
1086/// This properly handles:
1087/// - WITH clauses (CTEs)
1088/// - Leading comments
1089/// - Complex query structures
1090fn execute_sql_with_formatter<W: Write>(
1091    db: &Database,
1092    sql: &str,
1093    writer: &mut W,
1094    formatter: Box<dyn Formatter>,
1095    options: &SqlExecutionOptions<'_>,
1096) -> Result<()> {
1097    use alopex_sql::{AlopexDialect, Parser, StatementKind};
1098
1099    // FR-7: Use parser to detect SELECT instead of starts_with("SELECT") heuristic
1100    // This correctly handles WITH clauses, leading comments, and complex query structures
1101    let dialect = AlopexDialect;
1102    let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
1103
1104    let is_select = stmts.len() == 1
1105        && matches!(
1106            stmts.first().map(|s| &s.kind),
1107            Some(StatementKind::Select(_))
1108        );
1109
1110    if is_select {
1111        // SELECT: use streaming path (FR-7)
1112        execute_sql_select_streaming(db, sql, writer, formatter, options)
1113    } else {
1114        // DDL/DML: use standard path
1115        execute_sql_ddl_dml(db, sql, writer, formatter, options)
1116    }
1117}
1118
1119/// Execute SELECT query with streaming callback (FR-7).
1120///
1121/// This function uses `execute_sql_with_rows` for true streaming output.
1122/// The callback receives rows one at a time from the iterator, and the
1123/// transaction is kept alive during streaming.
1124fn execute_sql_select_streaming<W: Write>(
1125    db: &Database,
1126    sql: &str,
1127    writer: &mut W,
1128    formatter: Box<dyn Formatter>,
1129    options: &SqlExecutionOptions<'_>,
1130) -> Result<()> {
1131    use alopex_embedded::StreamingQueryResult;
1132    use std::sync::atomic::{AtomicBool, Ordering};
1133    use std::sync::Arc;
1134
1135    // Helper to convert CliError to alopex_embedded::Error for callback
1136    fn cli_err_to_embedded(e: crate::error::CliError) -> alopex_embedded::Error {
1137        alopex_embedded::Error::Sql(alopex_sql::SqlError::Execution {
1138            message: e.to_string(),
1139            code: "ALOPEX-C001",
1140        })
1141    }
1142
1143    let cancelled = Arc::new(AtomicBool::new(false));
1144    let timed_out = Arc::new(AtomicBool::new(false));
1145    let cancel_flag = cancelled.clone();
1146    let timeout_flag = timed_out.clone();
1147
1148    let result = db.execute_sql_with_rows(sql, |mut rows| {
1149        // FR-7: SELECT result - stream rows directly from iterator while transaction is alive
1150        let columns = columns_from_streaming_rows(&rows);
1151        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
1152            .with_quiet(options.quiet);
1153
1154        // FR-7: Use None for row count hint to support true streaming output
1155        streaming_writer
1156            .prepare(None)
1157            .map_err(cli_err_to_embedded)?;
1158
1159        if let Err(err) = options.deadline.check() {
1160            timeout_flag.store(true, Ordering::SeqCst);
1161            return Err(cli_err_to_embedded(err));
1162        }
1163
1164        // Consume iterator row by row for true streaming
1165        while let Ok(Some(sql_row)) = rows.next_row() {
1166            if options.cancel.is_cancelled() {
1167                cancel_flag.store(true, Ordering::SeqCst);
1168                return Err(cli_err_to_embedded(CliError::Cancelled));
1169            }
1170            if let Err(err) = options.deadline.check() {
1171                timeout_flag.store(true, Ordering::SeqCst);
1172                return Err(cli_err_to_embedded(err));
1173            }
1174            let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1175            let row = Row::new(values);
1176
1177            match streaming_writer
1178                .write_row(row)
1179                .map_err(cli_err_to_embedded)?
1180            {
1181                WriteStatus::LimitReached => break,
1182                WriteStatus::Continue => {}
1183            }
1184        }
1185
1186        streaming_writer.finish().map_err(cli_err_to_embedded)?;
1187        Ok(())
1188    });
1189
1190    let result = match result {
1191        Ok(value) => value,
1192        Err(err) => {
1193            if cancelled.load(Ordering::SeqCst) {
1194                return Err(CliError::Cancelled);
1195            }
1196            if timed_out.load(Ordering::SeqCst) {
1197                return Err(CliError::Timeout(format!(
1198                    "deadline exceeded after {}",
1199                    humantime::format_duration(options.deadline.duration())
1200                )));
1201            }
1202            return Err(CliError::Database(err));
1203        }
1204    };
1205
1206    match result {
1207        StreamingQueryResult::QueryProcessed(()) => Ok(()),
1208        StreamingQueryResult::Success | StreamingQueryResult::RowsAffected(_) => {
1209            // Unexpected: SELECT should not return these
1210            Ok(())
1211        }
1212    }
1213}
1214
/// Serializable payload describing a SQL statement for a remote server.
///
/// NOTE(review): the endpoint this is sent to is not visible in this section.
#[derive(serde::Serialize)]
struct RemoteSqlRequest {
    /// SQL text to execute.
    sql: String,
    /// Presumably asks the server for a streamed response — confirm against the server API.
    // NOTE(review): `#[serde(default)]` only affects deserialization; this struct
    // derives `Serialize` only, so the attribute looks like a no-op here.
    #[serde(default)]
    streaming: bool,
    /// Optional fetch size; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    fetch_size: Option<usize>,
    /// Optional row cap; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    max_rows: Option<usize>,
}
1225
/// Column metadata as deserialized from a remote SQL response.
#[derive(serde::Deserialize)]
struct RemoteColumnInfo {
    /// Column name.
    name: String,
    /// Column type as a string (presumably an SQL type name such as those
    /// recognised by `data_type_from_string` — confirm against the server).
    data_type: String,
}
1231
/// Deserialized shape of a remote SQL response: column metadata, row values
/// and an optional affected-row count.
#[derive(serde::Deserialize)]
struct RemoteSqlResponse {
    /// Metadata for each result column.
    columns: Vec<RemoteColumnInfo>,
    /// Result rows, each a list of SQL values.
    rows: Vec<Vec<alopex_sql::storage::SqlValue>>,
    /// Affected-row count; presumably set for DML statements only — confirm against the server.
    affected_rows: Option<u64>,
}
1238
1239fn map_client_error(err: ClientError) -> CliError {
1240    match err {
1241        ClientError::Request { source, .. } => {
1242            CliError::ServerConnection(format!("request failed: {source}"))
1243        }
1244        ClientError::InvalidUrl(message) => CliError::InvalidArgument(message),
1245        ClientError::Build(message) => CliError::InvalidArgument(message),
1246        ClientError::Auth(err) => CliError::InvalidArgument(err.to_string()),
1247        ClientError::HttpStatus { status, body } => {
1248            CliError::InvalidArgument(format!("Server error: HTTP {} - {}", status.as_u16(), body))
1249        }
1250    }
1251}
1252
1253async fn send_cancel_request(client: &HttpClient) -> Result<()> {
1254    #[derive(serde::Serialize)]
1255    struct CancelRequest {}
1256
1257    let request = CancelRequest {};
1258    let _: serde_json::Value = client
1259        .post_json("api/sql/cancel", &request)
1260        .await
1261        .map_err(map_client_error)?;
1262    Ok(())
1263}
1264
/// Combine two optional row limits into the effective one.
///
/// When both are present the smaller wins; when only one is present it is
/// used as-is; when neither is present there is no limit.
fn merge_limit(limit: Option<usize>, max_rows: Option<usize>) -> Option<usize> {
    // Each `Option` iterates over zero or one value, so `min` over the chain
    // yields the tighter bound, or `None` when both are absent.
    limit.into_iter().chain(max_rows).min()
}
1272
1273/// Execute DDL/DML query (non-SELECT statements).
1274///
1275/// This function handles CREATE, DROP, INSERT, UPDATE, DELETE and other
1276/// non-SELECT statements. It outputs status messages (OK, rows affected).
1277fn execute_sql_ddl_dml<W: Write>(
1278    db: &Database,
1279    sql: &str,
1280    writer: &mut W,
1281    formatter: Box<dyn Formatter>,
1282    options: &SqlExecutionOptions<'_>,
1283) -> Result<()> {
1284    use alopex_sql::ExecutionResult;
1285
1286    options.deadline.check()?;
1287    let result = db.execute_sql(sql)?;
1288    options.deadline.check()?;
1289
1290    match result {
1291        ExecutionResult::Success => {
1292            // DDL success - suppress status output in quiet mode
1293            if !options.quiet {
1294                let columns = sql_status_columns();
1295                let mut streaming_writer =
1296                    StreamingWriter::new(writer, formatter, columns, options.limit)
1297                        .with_quiet(options.quiet);
1298                streaming_writer.prepare(Some(1))?;
1299                let row = Row::new(vec![
1300                    Value::Text("OK".to_string()),
1301                    Value::Text("Operation completed successfully".to_string()),
1302                ]);
1303                streaming_writer.write_row(row)?;
1304                streaming_writer.finish()?;
1305            }
1306        }
1307        ExecutionResult::RowsAffected(count) => {
1308            // DML success - suppress status output in quiet mode
1309            if !options.quiet {
1310                let columns = sql_status_columns();
1311                let mut streaming_writer =
1312                    StreamingWriter::new(writer, formatter, columns, options.limit)
1313                        .with_quiet(options.quiet);
1314                streaming_writer.prepare(Some(1))?;
1315                let row = Row::new(vec![
1316                    Value::Text("OK".to_string()),
1317                    Value::Text(format!("{} row(s) affected", count)),
1318                ]);
1319                streaming_writer.write_row(row)?;
1320                streaming_writer.finish()?;
1321            }
1322        }
1323        ExecutionResult::Query(query_result) => {
1324            // Unexpected: non-SELECT should not return Query result
1325            // But handle it gracefully by outputting the result
1326            let columns = columns_from_query_result(&query_result);
1327            let mut streaming_writer =
1328                StreamingWriter::new(writer, formatter, columns, options.limit)
1329                    .with_quiet(options.quiet);
1330            streaming_writer.prepare(Some(query_result.rows.len()))?;
1331            for sql_row in query_result.rows {
1332                let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1333                let row = Row::new(values);
1334                match streaming_writer.write_row(row)? {
1335                    WriteStatus::LimitReached => break,
1336                    WriteStatus::Continue => {}
1337                }
1338            }
1339            streaming_writer.finish()?;
1340        }
1341    }
1342
1343    Ok(())
1344}
1345
1346/// Convert alopex_sql::SqlValue to our Value type.
1347fn sql_value_to_value(sql_value: alopex_sql::SqlValue) -> Value {
1348    use alopex_sql::SqlValue;
1349
1350    match sql_value {
1351        SqlValue::Null => Value::Null,
1352        SqlValue::Integer(i) => Value::Int(i as i64),
1353        SqlValue::BigInt(i) => Value::Int(i),
1354        SqlValue::Float(f) => Value::Float(f as f64),
1355        SqlValue::Double(f) => Value::Float(f),
1356        SqlValue::Text(s) => Value::Text(s),
1357        SqlValue::Blob(b) => Value::Bytes(b),
1358        SqlValue::Boolean(b) => Value::Bool(b),
1359        SqlValue::Timestamp(ts) => {
1360            // Format timestamp as ISO 8601 string
1361            Value::Text(format!("{}", ts))
1362        }
1363        SqlValue::Vector(v) => Value::Vector(v),
1364    }
1365}
1366
1367fn remote_value_to_value(sql_value: alopex_sql::storage::SqlValue) -> Value {
1368    use alopex_sql::storage::SqlValue;
1369
1370    match sql_value {
1371        SqlValue::Null => Value::Null,
1372        SqlValue::Integer(i) => Value::Int(i as i64),
1373        SqlValue::BigInt(i) => Value::Int(i),
1374        SqlValue::Float(f) => Value::Float(f as f64),
1375        SqlValue::Double(f) => Value::Float(f),
1376        SqlValue::Text(s) => Value::Text(s),
1377        SqlValue::Blob(b) => Value::Bytes(b),
1378        SqlValue::Boolean(b) => Value::Bool(b),
1379        SqlValue::Timestamp(ts) => Value::Text(ts.to_string()),
1380        SqlValue::Vector(v) => Value::Vector(v),
1381    }
1382}
1383
1384fn data_type_from_string(value: &str) -> DataType {
1385    let upper = value.to_ascii_uppercase();
1386    if upper.starts_with("INT") || upper.starts_with("BIGINT") {
1387        DataType::Int
1388    } else if upper.starts_with("FLOAT") || upper.starts_with("DOUBLE") {
1389        DataType::Float
1390    } else if upper.starts_with("BLOB") {
1391        DataType::Bytes
1392    } else if upper.starts_with("BOOLEAN") {
1393        DataType::Bool
1394    } else if upper.starts_with("VECTOR") {
1395        DataType::Vector
1396    } else {
1397        DataType::Text
1398    }
1399}
1400
1401/// Convert alopex_sql::executor::ColumnInfo to our Column type.
1402fn sql_column_to_column(col: &alopex_sql::executor::ColumnInfo) -> Column {
1403    use alopex_sql::planner::ResolvedType;
1404
1405    let data_type = match &col.data_type {
1406        ResolvedType::Integer | ResolvedType::BigInt => DataType::Int,
1407        ResolvedType::Float | ResolvedType::Double => DataType::Float,
1408        ResolvedType::Text => DataType::Text,
1409        ResolvedType::Blob => DataType::Bytes,
1410        ResolvedType::Boolean => DataType::Bool,
1411        ResolvedType::Timestamp => DataType::Text, // Display as text
1412        ResolvedType::Vector { .. } => DataType::Vector,
1413        ResolvedType::Null => DataType::Text, // Fallback
1414    };
1415
1416    Column::new(&col.name, data_type)
1417}
1418
1419/// Create columns from SQL query result.
1420#[allow(dead_code)] // Used by tests with legacy execute_sql function
1421fn columns_from_query_result(query_result: &alopex_sql::executor::QueryResult) -> Vec<Column> {
1422    query_result
1423        .columns
1424        .iter()
1425        .map(sql_column_to_column)
1426        .collect()
1427}
1428
1429/// Create columns from streaming query result iterator (FR-7).
1430#[allow(dead_code)] // Kept for potential future use with old streaming API
1431fn columns_from_streaming_result(
1432    query_iter: &alopex_embedded::QueryRowIterator<'_>,
1433) -> Vec<Column> {
1434    query_iter
1435        .columns()
1436        .iter()
1437        .map(sql_column_to_column)
1438        .collect()
1439}
1440
1441/// Create columns from callback-based streaming rows (FR-7).
1442fn columns_from_streaming_rows(rows: &alopex_embedded::StreamingRows<'_>) -> Vec<Column> {
1443    rows.columns().iter().map(sql_column_to_column).collect()
1444}
1445
1446/// Create columns for status output.
1447pub fn sql_status_columns() -> Vec<Column> {
1448    vec![
1449        Column::new("status", DataType::Text),
1450        Column::new("message", DataType::Text),
1451    ]
1452}
1453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::batch::BatchModeSource;
    use crate::output::jsonl::JsonlFormatter;

    /// Fresh in-memory database for a single test.
    fn create_test_db() -> Database {
        Database::open_in_memory().unwrap()
    }

    /// JSONL writer over `output` using the status columns.
    fn create_status_writer(output: &mut Vec<u8>) -> StreamingWriter<&mut Vec<u8>> {
        create_query_writer(output, sql_status_columns())
    }

    /// JSONL writer over `output` using caller-supplied columns and no row limit.
    fn create_query_writer(
        output: &mut Vec<u8>,
        columns: Vec<Column>,
    ) -> StreamingWriter<&mut Vec<u8>> {
        let formatter = Box::new(JsonlFormatter::new());
        StreamingWriter::new(output, formatter, columns, None)
    }

    /// Batch mode with the TTY flag set, so `resolve_query` never touches stdin.
    fn default_batch_mode() -> BatchMode {
        BatchMode {
            is_batch: false,
            is_tty: true,
            source: BatchModeSource::Default,
        }
    }

    /// Run `sql` through `execute_sql` with status columns and return the captured output.
    fn run_status_sql(db: &Database, sql: &str) -> Result<String> {
        let mut output = Vec::new();
        {
            let mut writer = create_status_writer(&mut output);
            execute_sql(db, sql, &mut writer)?;
        }
        Ok(String::from_utf8(output).unwrap())
    }

    /// Run `sql` through `execute_sql` with explicit columns and return the captured output.
    fn run_query_sql(db: &Database, sql: &str, columns: Vec<Column>) -> Result<String> {
        let mut output = Vec::new();
        {
            let mut writer = create_query_writer(&mut output, columns);
            execute_sql(db, sql, &mut writer)?;
        }
        Ok(String::from_utf8(output).unwrap())
    }

    /// Build a `SqlCommand` with only the query source fields populated.
    fn sql_command(query: Option<&str>, file: Option<String>) -> SqlCommand {
        SqlCommand {
            query: query.map(String::from),
            file,
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        }
    }

    #[test]
    fn test_create_table() {
        let db = create_test_db();

        let result = run_status_sql(
            &db,
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
        )
        .unwrap();

        assert!(result.contains("OK"));
    }

    #[test]
    fn test_insert_and_select() {
        let db = create_test_db();

        // Create table
        run_status_sql(
            &db,
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
        )
        .unwrap();

        // Insert
        let inserted =
            run_status_sql(&db, "INSERT INTO users (id, name) VALUES (1, 'Alice');").unwrap();
        assert!(inserted.contains("row(s) affected"));

        // Select - the writer needs columns matching the query result
        let columns = vec![
            Column::new("id", DataType::Int),
            Column::new("name", DataType::Text),
        ];
        let selected = run_query_sql(&db, "SELECT id, name FROM users;", columns).unwrap();
        assert!(selected.contains("Alice"));
    }

    #[test]
    fn test_syntax_error() {
        let db = create_test_db();

        let result = run_status_sql(&db, "CREATE TABEL invalid_syntax;");
        assert!(result.is_err());
    }

    #[test]
    fn test_multiple_statements() {
        let db = create_test_db();

        run_status_sql(
            &db,
            "CREATE TABLE t (id INTEGER PRIMARY KEY); INSERT INTO t (id) VALUES (1);",
        )
        .unwrap();

        // Verify the table exists and has data
        let columns = vec![Column::new("id", DataType::Int)];
        let selected = run_query_sql(&db, "SELECT id FROM t;", columns).unwrap();
        assert!(selected.contains("1"));
    }

    #[test]
    fn test_sql_value_conversion() {
        use alopex_sql::SqlValue;

        let null = sql_value_to_value(SqlValue::Null);
        assert!(matches!(null, Value::Null));

        let integer = sql_value_to_value(SqlValue::Integer(42));
        assert!(matches!(integer, Value::Int(42)));

        let big_int = sql_value_to_value(SqlValue::BigInt(100));
        assert!(matches!(big_int, Value::Int(100)));

        let boolean = sql_value_to_value(SqlValue::Boolean(true));
        assert!(matches!(boolean, Value::Bool(true)));

        let text = sql_value_to_value(SqlValue::Text("hello".to_string()));
        assert!(matches!(text, Value::Text(s) if s == "hello"));
    }

    #[test]
    fn resolve_query_from_argument() {
        let cmd = sql_command(Some("SELECT 1"), None);

        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
        assert_eq!(sql, "SELECT 1");
    }

    #[test]
    fn resolve_query_from_file() {
        let mut file = tempfile::NamedTempFile::new().unwrap();
        writeln!(file, "SELECT * FROM users").unwrap();

        let cmd = sql_command(None, Some(file.path().display().to_string()));

        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
        assert_eq!(sql, "SELECT * FROM users\n");
    }

    #[test]
    fn resolve_query_returns_no_query_error() {
        let cmd = sql_command(None, None);

        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
        assert!(matches!(err, CliError::NoQueryProvided));
    }

    #[test]
    fn resolve_query_rejects_query_and_file() {
        let cmd = sql_command(Some("SELECT 1"), Some("query.sql".into()));

        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
        assert!(matches!(
            err,
            CliError::InvalidArgument(msg) if msg == "Cannot specify both query and file"
        ));
    }
}