1use std::collections::HashSet;
6use std::fs;
7use std::io::{self, Read, Write};
8
9use alopex_embedded::Database;
10
11use crate::batch::BatchMode;
12use crate::cli::SqlCommand;
13use crate::client::http::{ClientError, HttpClient};
14use crate::error::{CliError, Result};
15use crate::models::{Column, DataType, Row, Value};
16use crate::output::formatter::Formatter;
17use crate::streaming::timeout::parse_deadline;
18use crate::streaming::{CancelSignal, Deadline, StreamingWriter, WriteStatus};
19use crate::tui::{is_tty, TuiApp};
20use crate::ui::mode::UiMode;
21use futures_util::StreamExt;
22
/// Shared knobs for a single SQL execution (local or remote).
#[doc(hidden)]
pub struct SqlExecutionOptions<'a> {
    /// Client-side cap on the number of rows emitted/displayed.
    pub limit: Option<usize>,
    /// Suppresses informational output (status rows, fallback warnings).
    pub quiet: bool,
    /// Cooperative cancellation signal, checked between rows/chunks.
    pub cancel: &'a CancelSignal,
    /// Wall-clock deadline; exceeding it aborts with a timeout error.
    pub deadline: &'a Deadline,
    /// Optional callback handed to the TUI via `with_admin_launcher`.
    pub admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
}
31
32#[allow(clippy::too_many_arguments)]
46pub fn execute_with_formatter<'a, W: Write>(
47 db: &Database,
48 cmd: SqlCommand,
49 batch_mode: &BatchMode,
50 ui_mode: UiMode,
51 writer: &mut W,
52 formatter: Box<dyn Formatter>,
53 admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
54 limit: Option<usize>,
55 quiet: bool,
56) -> Result<()> {
57 let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
58 let cancel = CancelSignal::new();
59
60 execute_with_formatter_control(
61 db,
62 cmd,
63 batch_mode,
64 ui_mode,
65 writer,
66 formatter,
67 SqlExecutionOptions {
68 limit,
69 quiet,
70 cancel: &cancel,
71 deadline: &deadline,
72 admin_launcher,
73 },
74 )
75}
76
77#[doc(hidden)]
78pub fn execute_with_formatter_control<W: Write>(
79 db: &Database,
80 cmd: SqlCommand,
81 batch_mode: &BatchMode,
82 ui_mode: UiMode,
83 writer: &mut W,
84 formatter: Box<dyn Formatter>,
85 mut options: SqlExecutionOptions<'_>,
86) -> Result<()> {
87 let sql = cmd.resolve_query(batch_mode)?;
88 let effective_limit = merge_limit(options.limit, cmd.max_rows);
89 options.limit = effective_limit;
90
91 if ui_mode == UiMode::Tui {
92 return execute_tui_local_or_fallback(db, &sql, writer, formatter, options);
93 }
94
95 execute_sql_with_formatter(db, &sql, writer, formatter, &options)
96}
97
98fn is_select_query(sql: &str) -> Result<bool> {
99 use alopex_sql::{AlopexDialect, Parser, StatementKind};
100
101 let dialect = AlopexDialect;
102 let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
103 Ok(stmts.len() == 1
104 && matches!(
105 stmts.first().map(|s| &s.kind),
106 Some(StatementKind::Select(_))
107 ))
108}
109
110#[allow(clippy::too_many_arguments)]
112pub async fn execute_remote_with_formatter<'a, W: Write>(
113 client: &HttpClient,
114 cmd: &SqlCommand,
115 batch_mode: &BatchMode,
116 ui_mode: UiMode,
117 writer: &mut W,
118 formatter: Box<dyn Formatter>,
119 admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
120 limit: Option<usize>,
121 quiet: bool,
122) -> Result<()> {
123 let effective_limit = merge_limit(limit, cmd.max_rows);
124 let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
125 let cancel = CancelSignal::new();
126 let options = SqlExecutionOptions {
127 limit: effective_limit,
128 quiet,
129 cancel: &cancel,
130 deadline: &deadline,
131 admin_launcher,
132 };
133
134 execute_remote_with_formatter_control(
135 client, cmd, batch_mode, ui_mode, writer, formatter, options,
136 )
137 .await
138}
139
/// Builds a compact, single-line "SQL: …" context string.
///
/// All runs of whitespace are collapsed to single spaces; results longer
/// than 200 characters are truncated (char-wise, so multi-byte text stays
/// valid) and suffixed with `...`.
fn sql_context_message(sql: &str) -> String {
    const MAX_LEN: usize = 200;
    let words: Vec<&str> = sql.split_whitespace().collect();
    let condensed = words.join(" ");
    // `nth(MAX_LEN)` is Some iff there are strictly more than MAX_LEN chars;
    // its byte index is the cut point for the first MAX_LEN characters.
    match condensed.char_indices().nth(MAX_LEN) {
        Some((cut, _)) => format!("SQL: {}...", &condensed[..cut]),
        None => format!("SQL: {condensed}"),
    }
}
150
151#[doc(hidden)]
152pub async fn execute_remote_with_formatter_control<W: Write>(
153 client: &HttpClient,
154 cmd: &SqlCommand,
155 batch_mode: &BatchMode,
156 ui_mode: UiMode,
157 writer: &mut W,
158 formatter: Box<dyn Formatter>,
159 options: SqlExecutionOptions<'_>,
160) -> Result<()> {
161 let sql = cmd.resolve_query(batch_mode)?;
162 if ui_mode == UiMode::Tui {
163 return execute_tui_remote_or_fallback(client, &sql, cmd, writer, formatter, options).await;
164 }
165 execute_remote_with_formatter_impl(client, &sql, cmd, writer, formatter, &options).await
166}
167
/// Executes `sql` against the remote server in batch (non-TUI) mode.
///
/// SELECT statements whose formatter supports streaming go through the
/// incremental streaming path; everything else is a single buffered
/// `api/sql/query` request. The buffered call is raced against the cancel
/// signal and the deadline, and a best-effort cancel request is sent to the
/// server on either interruption.
async fn execute_remote_with_formatter_impl<W: Write>(
    client: &HttpClient,
    sql: &str,
    cmd: &SqlCommand,
    writer: &mut W,
    formatter: Box<dyn Formatter>,
    options: &SqlExecutionOptions<'_>,
) -> Result<()> {
    // Streaming fast path: avoids buffering the whole result set.
    if is_select_query(sql)? && formatter.supports_streaming() {
        return execute_remote_streaming(
            client,
            sql,
            writer,
            formatter,
            options,
            cmd.fetch_size,
            cmd.max_rows,
        )
        .await;
    }

    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: false,
        fetch_size: cmd.fetch_size,
        max_rows: cmd.max_rows,
    };
    // Race the HTTP call against cancellation and the deadline.
    let response: RemoteSqlResponse = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    // No columns means a DDL/DML outcome: emit a one-row status table
    // (suppressed entirely in quiet mode).
    if response.columns.is_empty() {
        if options.quiet {
            return Ok(());
        }
        let message = match response.affected_rows {
            Some(count) => format!("{count} row(s) affected"),
            None => "Operation completed successfully".to_string(),
        };
        let columns = sql_status_columns();
        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
            .with_quiet(options.quiet);
        streaming_writer.prepare(Some(1))?;
        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
        streaming_writer.write_row(row)?;
        return streaming_writer.finish();
    }

    // Query result: convert server column/value types and forward each row,
    // checking cancel/deadline between rows and honoring the client limit.
    let columns: Vec<Column> = response
        .columns
        .iter()
        .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
        .collect();
    let mut streaming_writer =
        StreamingWriter::new(writer, formatter, columns, options.limit).with_quiet(options.quiet);
    streaming_writer.prepare(Some(response.rows.len()))?;
    for row in response.rows {
        if options.cancel.is_cancelled() {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
        options.deadline.check()?;
        let values = row.into_iter().map(remote_value_to_value).collect();
        match streaming_writer.write_row(Row::new(values))? {
            WriteStatus::LimitReached => break,
            WriteStatus::Continue => {}
        }
    }
    streaming_writer.finish()
}
253
254fn execute_tui_local_or_fallback<'a, W: Write>(
255 db: &Database,
256 sql: &str,
257 writer: &mut W,
258 formatter: Box<dyn Formatter>,
259 mut options: SqlExecutionOptions<'a>,
260) -> Result<()> {
261 if !is_tty() {
262 if !options.quiet {
263 eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
264 }
265 return execute_sql_with_formatter(db, sql, writer, formatter, &options);
266 }
267
268 let admin_launcher = options.admin_launcher.take();
269 match execute_tui_local(db, sql, &options, admin_launcher) {
270 Ok(()) => Ok(()),
271 Err(err) => {
272 if !options.quiet {
273 eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
274 }
275 execute_sql_with_formatter(db, sql, writer, formatter, &options)
276 }
277 }
278}
279
280fn execute_tui_local<'a>(
281 db: &Database,
282 sql: &str,
283 options: &SqlExecutionOptions<'a>,
284 admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
285) -> Result<()> {
286 use alopex_sql::ExecutionResult;
287
288 options.deadline.check()?;
289 let result = db.execute_sql(sql)?;
290 options.deadline.check()?;
291
292 let (columns, rows) = match result {
293 ExecutionResult::Success => {
294 let columns = sql_status_columns();
295 let row = Row::new(vec![
296 Value::Text("OK".to_string()),
297 Value::Text("Operation completed successfully".to_string()),
298 ]);
299 (columns, vec![row])
300 }
301 ExecutionResult::RowsAffected(count) => {
302 let columns = sql_status_columns();
303 let row = Row::new(vec![
304 Value::Text("OK".to_string()),
305 Value::Text(format!("{count} row(s) affected")),
306 ]);
307 (columns, vec![row])
308 }
309 ExecutionResult::Query(query_result) => {
310 let columns = query_result
311 .columns
312 .iter()
313 .map(|col| Column::new(&col.name, DataType::Text))
314 .collect::<Vec<_>>();
315 let mut rows = Vec::with_capacity(query_result.rows.len());
316 for sql_row in query_result.rows {
317 let values = sql_row.into_iter().map(sql_value_to_value).collect();
318 rows.push(Row::new(values));
319 }
320 if let Some(limit) = options.limit {
321 rows.truncate(limit);
322 }
323 (columns, rows)
324 }
325 };
326
327 let app = TuiApp::new(columns, rows, "local", false)
328 .with_context_message(Some(sql_context_message(sql)))
329 .with_admin_launcher(admin_launcher);
330 app.run()
331}
332
333async fn execute_tui_remote_or_fallback<'a, W: Write>(
334 client: &HttpClient,
335 sql: &str,
336 cmd: &SqlCommand,
337 writer: &mut W,
338 formatter: Box<dyn Formatter>,
339 mut options: SqlExecutionOptions<'a>,
340) -> Result<()> {
341 if !is_tty() {
342 if !options.quiet {
343 eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
344 }
345 return execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options)
346 .await;
347 }
348
349 let admin_launcher = options.admin_launcher.take();
350 match execute_tui_remote(client, sql, cmd, &options, admin_launcher).await {
351 Ok(()) => Ok(()),
352 Err(err) => {
353 if !options.quiet {
354 eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
355 }
356 execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options).await
357 }
358 }
359}
360
/// Runs a remote query and presents the result in the interactive TUI.
///
/// SELECT statements use the streaming endpoint but are collected fully
/// into memory first (the TUI needs the whole result set). Other
/// statements go through a single buffered request whose outcome is
/// rendered as a one-row status table. Both paths race the request against
/// the cancel signal and the deadline, sending a best-effort cancel to the
/// server when interrupted.
async fn execute_tui_remote<'a>(
    client: &HttpClient,
    sql: &str,
    cmd: &SqlCommand,
    options: &SqlExecutionOptions<'a>,
    admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
) -> Result<()> {
    if is_select_query(sql)? {
        // Stream and buffer the SELECT result, then hand it to the TUI.
        let (columns, rows) =
            collect_remote_streaming_rows(client, sql, options, cmd.fetch_size, cmd.max_rows)
                .await?;
        let app = TuiApp::new(columns, rows, "server", false)
            .with_context_message(Some(sql_context_message(sql)))
            .with_admin_launcher(admin_launcher);
        return app.run();
    }

    // Non-SELECT: one buffered, non-streaming request.
    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: false,
        fetch_size: cmd.fetch_size,
        max_rows: cmd.max_rows,
    };

    // Race the HTTP call against cancellation and the deadline.
    let response: RemoteSqlResponse = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    // No columns means a DDL/DML result: synthesize a one-row status table.
    let (columns, rows) = if response.columns.is_empty() {
        let columns = sql_status_columns();
        let message = match response.affected_rows {
            Some(count) => format!("{count} row(s) affected"),
            None => "Operation completed successfully".to_string(),
        };
        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
        (columns, vec![row])
    } else {
        let columns: Vec<Column> = response
            .columns
            .iter()
            .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
            .collect();
        let mut rows = response
            .rows
            .into_iter()
            .map(|row| Row::new(row.into_iter().map(remote_value_to_value).collect()))
            .collect::<Vec<_>>();
        // Client-side truncation of whatever the server sent.
        if let Some(limit) = options.limit {
            rows.truncate(limit);
        }
        (columns, rows)
    };

    let app = TuiApp::new(columns, rows, "server", false)
        .with_context_message(Some(sql_context_message(sql)))
        .with_admin_launcher(admin_launcher);
    app.run()
}
434
/// Fetches a SELECT result from the server's streaming endpoint and buffers
/// it entirely in memory (for TUI display).
///
/// The response body is a single JSON array of row objects, parsed
/// incrementally from the byte stream: `buffer` accumulates raw chunks and
/// `pos` is the parse cursor into it. The key set of the first row object
/// fixes the column list; every later row must expose exactly the same
/// keys. The payload carries no type metadata, so all columns are reported
/// as `Text`.
///
/// Cancellation and the deadline are checked between chunks, and a
/// best-effort cancel request is sent to the server on any early exit.
async fn collect_remote_streaming_rows(
    client: &HttpClient,
    sql: &str,
    options: &SqlExecutionOptions<'_>,
    fetch_size: Option<usize>,
    max_rows: Option<usize>,
) -> Result<(Vec<Column>, Vec<Row>)> {
    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: true,
        fetch_size,
        max_rows,
    };

    // Race the initial request against cancellation and the deadline.
    let response = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    let mut stream = response.bytes_stream();
    let mut buffer: Vec<u8> = Vec::new();
    // Parse cursor into `buffer`.
    let mut pos: usize = 0;
    // Set once the closing ']' has been consumed.
    let mut done = false;
    let mut saw_array_start = false;
    // Ordered column names fixed by the first row object.
    let mut columns: Option<Vec<String>> = None;
    // Same names as a set, for O(1) membership checks per row.
    let mut column_set: Option<HashSet<String>> = None;
    let mut rows: Vec<Row> = Vec::new();

    while !done {
        // Check for interruption between network chunks.
        if options.cancel.is_cancelled() {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
        if let Err(err) = options.deadline.check() {
            let _ = send_cancel_request(client).await;
            return Err(err);
        }

        // Await the next chunk, again racing cancel and deadline.
        let next = tokio::select! {
            _ = options.cancel.wait() => {
                let _ = send_cancel_request(client).await;
                return Err(CliError::Cancelled);
            }
            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                match result {
                    Ok(value) => value,
                    Err(_) => {
                        let _ = send_cancel_request(client).await;
                        return Err(CliError::Timeout(format!(
                            "deadline exceeded after {}",
                            humantime::format_duration(options.deadline.duration())
                        )));
                    }
                }
            }
        };

        let chunk = match next {
            // Stream ended; completeness is validated after the loop.
            Some(chunk) => chunk,
            None => break,
        };

        let bytes = match chunk {
            Ok(bytes) => bytes,
            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
        };

        buffer.extend_from_slice(&bytes);

        // Drain as many complete JSON values as the buffer currently holds.
        loop {
            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break; // need more bytes
            }

            // The body must open with '[' before any rows appear.
            if !saw_array_start {
                if buffer[pos] != b'[' {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming response: expected JSON array".into(),
                    ));
                }
                pos += 1;
                saw_array_start = true;
                continue;
            }

            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }

            // Closing bracket: the (possibly empty) array is complete.
            if buffer[pos] == b']' {
                pos += 1;
                done = true;
                break;
            }

            // Try to decode one row object. An EOF error just means the
            // object is split across chunks, so wait for more bytes.
            let slice = &buffer[pos..];
            let mut stream =
                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
            let value = match stream.next() {
                Some(Ok(value)) => value,
                Some(Err(err)) if err.is_eof() => break,
                Some(Err(err)) => return Err(CliError::Json(err)),
                None => break,
            };
            pos = pos.saturating_add(stream.byte_offset());

            let object = value.as_object().ok_or_else(|| {
                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
            })?;

            // The first row establishes the column names (in key order).
            if columns.is_none() {
                let names: Vec<String> = object.keys().cloned().collect();
                let set: HashSet<String> = names.iter().cloned().collect();
                if names.is_empty() {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming row: empty object".into(),
                    ));
                }
                columns = Some(names);
                column_set = Some(set);
            }

            let names = columns
                .as_ref()
                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
            let set = column_set
                .as_ref()
                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;

            // Every row must carry exactly the established column set.
            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming row: column mismatch".into(),
                ));
            }

            // Extract values in the canonical column order.
            let values = names
                .iter()
                .map(|name| {
                    object.get(name).ok_or_else(|| {
                        CliError::InvalidArgument(format!(
                            "Invalid streaming row: missing column '{name}'"
                        ))
                    })
                })
                .map(|value| value.and_then(json_value_to_value))
                .collect::<Result<Vec<_>>>()?;
            rows.push(Row::new(values));

            // Row limit reached: ask the server to stop and finish early.
            // NOTE(review): rows already buffered past this point will hit
            // the "unexpected trailing data" check below — confirm the
            // limit path cannot race additional rows sitting in `buffer`.
            if let Some(limit) = options.limit {
                if rows.len() >= limit {
                    let _ = send_cancel_request(client).await;
                    done = true;
                    break;
                }
            }

            // After a row we expect either a separator or the array end.
            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }
            match buffer[pos] {
                b',' => {
                    pos += 1;
                }
                b']' => {
                    pos += 1;
                    done = true;
                    break;
                }
                _ => {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming response: expected ',' or ']'".into(),
                    ))
                }
            }
        }

        // Discard consumed bytes so the buffer stays small.
        if pos > 0 {
            buffer.drain(..pos);
            pos = 0;
        }
    }

    if done {
        // Everything after ']' — buffered or still in flight — must be
        // whitespace only.
        if has_non_whitespace(&buffer) {
            return Err(CliError::InvalidArgument(
                "Invalid streaming response: unexpected trailing data".into(),
            ));
        }
        buffer.clear();
        loop {
            let next = tokio::select! {
                _ = options.cancel.wait() => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Cancelled);
                }
                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                    match result {
                        Ok(value) => value,
                        Err(_) => {
                            let _ = send_cancel_request(client).await;
                            return Err(CliError::Timeout(format!(
                                "deadline exceeded after {}",
                                humantime::format_duration(options.deadline.duration())
                            )));
                        }
                    }
                }
            };

            let chunk = match next {
                Some(chunk) => chunk,
                None => break,
            };

            let bytes = match chunk {
                Ok(bytes) => bytes,
                Err(err) => {
                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
                }
            };

            if has_non_whitespace(&bytes) {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming response: unexpected trailing data".into(),
                ));
            }
        }
    } else {
        // The stream ended before the closing ']' was seen.
        skip_whitespace(&buffer, &mut pos);
        if pos < buffer.len() {
            return Err(CliError::InvalidArgument(
                "Invalid streaming response: unexpected trailing data".into(),
            ));
        }
        return Err(CliError::InvalidArgument(
            "Invalid streaming response: unexpected end of stream".into(),
        ));
    }

    // No type metadata in the payload: report every column as Text.
    let columns = columns
        .unwrap_or_default()
        .into_iter()
        .map(|name| Column::new(name, DataType::Text))
        .collect();
    Ok((columns, rows))
}
698
/// Streams a remote SELECT result directly through the formatter.
///
/// Mirrors `collect_remote_streaming_rows`, but instead of buffering rows
/// it lazily creates a `StreamingWriter` once the first row object fixes
/// the column set, then forwards each parsed row immediately. The response
/// body is parsed incrementally as a single JSON array of row objects,
/// with cancel/deadline checks between chunks and a best-effort server
/// cancel on any early exit (including the writer's row limit).
async fn execute_remote_streaming<W: Write>(
    client: &HttpClient,
    sql: &str,
    writer: &mut W,
    formatter: Box<dyn Formatter>,
    options: &SqlExecutionOptions<'_>,
    fetch_size: Option<usize>,
    max_rows: Option<usize>,
) -> Result<()> {
    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: true,
        fetch_size,
        max_rows,
    };

    // Race the initial request against cancellation and the deadline.
    let response = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    let mut stream = response.bytes_stream();
    let mut buffer: Vec<u8> = Vec::new();
    // Parse cursor into `buffer`.
    let mut pos: usize = 0;
    // Created lazily on the first row (columns are unknown until then).
    let mut streaming_writer: Option<StreamingWriter<&mut W>> = None;
    // Held in an Option so it can be moved into the writer exactly once.
    let mut formatter = Some(formatter);
    let mut columns: Option<Vec<String>> = None;
    let mut column_set: Option<HashSet<String>> = None;
    let mut done = false;
    let mut saw_array_start = false;

    while !done {
        // Check for interruption between network chunks.
        if options.cancel.is_cancelled() {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
        if let Err(err) = options.deadline.check() {
            let _ = send_cancel_request(client).await;
            return Err(err);
        }

        // Await the next chunk, again racing cancel and deadline.
        let next = tokio::select! {
            _ = options.cancel.wait() => {
                let _ = send_cancel_request(client).await;
                return Err(CliError::Cancelled);
            }
            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                match result {
                    Ok(value) => value,
                    Err(_) => {
                        let _ = send_cancel_request(client).await;
                        return Err(CliError::Timeout(format!(
                            "deadline exceeded after {}",
                            humantime::format_duration(options.deadline.duration())
                        )));
                    }
                }
            }
        };

        let chunk = match next {
            // Stream ended; completeness is validated after the loop.
            Some(chunk) => chunk,
            None => break,
        };

        let bytes = match chunk {
            Ok(bytes) => bytes,
            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
        };

        buffer.extend_from_slice(&bytes);

        // Drain as many complete JSON values as the buffer currently holds.
        loop {
            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break; // need more bytes
            }

            // The body must open with '[' before any rows appear.
            if !saw_array_start {
                if buffer[pos] != b'[' {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming response: expected JSON array".into(),
                    ));
                }
                pos += 1;
                saw_array_start = true;
                continue;
            }

            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }

            // Closing bracket: the (possibly empty) array is complete.
            if buffer[pos] == b']' {
                pos += 1;
                done = true;
                break;
            }

            // Try to decode one row object. An EOF error just means the
            // object is split across chunks, so wait for more bytes.
            let slice = &buffer[pos..];
            let mut stream =
                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
            let value = match stream.next() {
                Some(Ok(value)) => value,
                Some(Err(err)) if err.is_eof() => break,
                Some(Err(err)) => return Err(CliError::Json(err)),
                None => break,
            };
            pos = pos.saturating_add(stream.byte_offset());

            let object = value.as_object().ok_or_else(|| {
                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
            })?;

            // The first row establishes the columns and creates the writer
            // (all columns typed as Text — the payload has no type info).
            if columns.is_none() {
                let names: Vec<String> = object.keys().cloned().collect();
                let set: HashSet<String> = names.iter().cloned().collect();
                if names.is_empty() {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming row: empty object".into(),
                    ));
                }
                let cols = names
                    .iter()
                    .map(|name| Column::new(name, DataType::Text))
                    .collect::<Vec<_>>();
                let formatter = formatter
                    .take()
                    .ok_or_else(|| CliError::InvalidArgument("Missing formatter".into()))?;
                let mut writer = StreamingWriter::new(&mut *writer, formatter, cols, options.limit)
                    .with_quiet(options.quiet);
                writer.prepare(None)?;
                streaming_writer = Some(writer);
                columns = Some(names);
                column_set = Some(set);
            }

            let names = columns
                .as_ref()
                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
            let set = column_set
                .as_ref()
                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;

            // Every row must carry exactly the established column set.
            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming row: column mismatch".into(),
                ));
            }

            // Extract values in the canonical column order.
            let values = names
                .iter()
                .map(|name| {
                    object.get(name).ok_or_else(|| {
                        CliError::InvalidArgument(format!(
                            "Invalid streaming row: missing column '{name}'"
                        ))
                    })
                })
                .map(|value| value.and_then(json_value_to_value))
                .collect::<Result<Vec<_>>>()?;

            // Forward the row; on the writer's limit, cancel server-side
            // and finish the output immediately.
            if let Some(writer) = streaming_writer.as_mut() {
                match writer.write_row(Row::new(values))? {
                    WriteStatus::LimitReached => {
                        let _ = send_cancel_request(client).await;
                        return writer.finish();
                    }
                    WriteStatus::Continue => {}
                }
            }

            // After a row we expect either a separator or the array end.
            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }
            match buffer[pos] {
                b',' => {
                    pos += 1;
                }
                b']' => {
                    pos += 1;
                    done = true;
                    break;
                }
                _ => {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming response: expected ',' or ']'".into(),
                    ))
                }
            }
        }

        // Discard consumed bytes so the buffer stays small.
        if pos > 0 {
            buffer.drain(..pos);
            pos = 0;
        }
    }

    if done {
        // Everything after ']' — buffered or still in flight — must be
        // whitespace only.
        if has_non_whitespace(&buffer) {
            return Err(CliError::InvalidArgument(
                "Invalid streaming response: unexpected trailing data".into(),
            ));
        }
        buffer.clear();
        loop {
            let next = tokio::select! {
                _ = options.cancel.wait() => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Cancelled);
                }
                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                    match result {
                        Ok(value) => value,
                        Err(_) => {
                            let _ = send_cancel_request(client).await;
                            return Err(CliError::Timeout(format!(
                                "deadline exceeded after {}",
                                humantime::format_duration(options.deadline.duration())
                            )));
                        }
                    }
                }
            };

            let chunk = match next {
                Some(chunk) => chunk,
                None => break,
            };

            let bytes = match chunk {
                Ok(bytes) => bytes,
                Err(err) => {
                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
                }
            };

            if has_non_whitespace(&bytes) {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming response: unexpected trailing data".into(),
                ));
            }
        }
    } else {
        // The stream ended before the closing ']' was seen.
        skip_whitespace(&buffer, &mut pos);
        if pos < buffer.len() {
            return Err(CliError::InvalidArgument(
                "Invalid streaming response: unexpected trailing data".into(),
            ));
        }
        return Err(CliError::InvalidArgument(
            "Invalid streaming response: unexpected end of stream".into(),
        ));
    }

    // Normal completion with at least one row written.
    if let Some(mut writer) = streaming_writer {
        return writer.finish();
    }

    // Empty result set ("[]"): still emit the formatter's empty output.
    if done && saw_array_start {
        if let Some(formatter) = formatter.take() {
            let mut writer =
                StreamingWriter::new(&mut *writer, formatter, Vec::new(), options.limit)
                    .with_quiet(options.quiet);
            writer.prepare(None)?;
            return writer.finish();
        }
    }

    Ok(())
}
986
/// Advances `pos` past any JSON whitespace (space, LF, CR, tab) in
/// `buffer`, stopping at the first other byte or at the end of the buffer.
fn skip_whitespace(buffer: &[u8], pos: &mut usize) {
    while let Some(byte) = buffer.get(*pos) {
        if !matches!(byte, b' ' | b'\n' | b'\r' | b'\t') {
            break;
        }
        *pos += 1;
    }
}
995
/// Returns `true` when `buffer` contains any byte other than JSON
/// whitespace (space, LF, CR, tab).
fn has_non_whitespace(buffer: &[u8]) -> bool {
    !buffer
        .iter()
        .all(|byte| matches!(byte, b' ' | b'\n' | b'\r' | b'\t'))
}
1001
1002fn json_value_to_value(value: &serde_json::Value) -> Result<Value> {
1003 match value {
1004 serde_json::Value::Null => Ok(Value::Null),
1005 serde_json::Value::Bool(value) => Ok(Value::Bool(*value)),
1006 serde_json::Value::Number(value) => {
1007 if let Some(value) = value.as_i64() {
1008 Ok(Value::Int(value))
1009 } else if let Some(value) = value.as_f64() {
1010 Ok(Value::Float(value))
1011 } else {
1012 Err(CliError::InvalidArgument(
1013 "Invalid numeric value in streaming row".into(),
1014 ))
1015 }
1016 }
1017 serde_json::Value::String(value) => Ok(Value::Text(value.clone())),
1018 serde_json::Value::Array(values) => {
1019 let mut vector = Vec::with_capacity(values.len());
1020 for entry in values {
1021 let number = entry.as_f64().ok_or_else(|| {
1022 CliError::InvalidArgument("Invalid vector value in streaming row".into())
1023 })?;
1024 vector.push(number as f32);
1025 }
1026 Ok(Value::Vector(vector))
1027 }
1028 serde_json::Value::Object(_) => Err(CliError::InvalidArgument(
1029 "Invalid streaming row: nested objects are not supported".into(),
1030 )),
1031 }
1032}
1033
1034#[allow(dead_code)]
1036pub fn execute<W: Write>(
1037 db: &Database,
1038 cmd: SqlCommand,
1039 batch_mode: &BatchMode,
1040 writer: &mut StreamingWriter<W>,
1041) -> Result<()> {
1042 let sql = cmd.resolve_query(batch_mode)?;
1043
1044 execute_sql(db, &sql, writer)
1045}
1046
1047impl SqlCommand {
1048 pub fn resolve_query(&self, batch_mode: &BatchMode) -> Result<String> {
1050 match (&self.query, &self.file) {
1051 (Some(query), None) => Ok(query.clone()),
1052 (None, Some(file)) => fs::read_to_string(file).map_err(|e| {
1053 CliError::InvalidArgument(format!("Failed to read SQL file '{}': {}", file, e))
1054 }),
1055 (None, None) if !batch_mode.is_tty => {
1056 let mut buf = String::new();
1057 io::stdin().read_to_string(&mut buf)?;
1058 Ok(buf)
1059 }
1060 (None, None) => Err(CliError::NoQueryProvided),
1061 (Some(_), Some(_)) => Err(CliError::InvalidArgument(
1062 "Cannot specify both query and file".to_string(),
1063 )),
1064 }
1065 }
1066}
1067
1068fn execute_sql<W: Write>(db: &Database, sql: &str, writer: &mut StreamingWriter<W>) -> Result<()> {
1070 use alopex_embedded::SqlResult;
1071
1072 let result = db.execute_sql(sql)?;
1073
1074 match result {
1075 SqlResult::Success => {
1076 writer.prepare(Some(1))?;
1078 let row = Row::new(vec![
1079 Value::Text("OK".to_string()),
1080 Value::Text("Operation completed successfully".to_string()),
1081 ]);
1082 writer.write_row(row)?;
1083 writer.finish()?;
1084 }
1085 SqlResult::RowsAffected(count) => {
1086 writer.prepare(Some(1))?;
1088 let row = Row::new(vec![
1089 Value::Text("OK".to_string()),
1090 Value::Text(format!("{} row(s) affected", count)),
1091 ]);
1092 writer.write_row(row)?;
1093 writer.finish()?;
1094 }
1095 SqlResult::Query(query_result) => {
1096 let row_count = query_result.rows.len();
1098 writer.prepare(Some(row_count))?;
1099
1100 for sql_row in query_result.rows {
1101 let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1102 let row = Row::new(values);
1103
1104 match writer.write_row(row)? {
1105 WriteStatus::LimitReached => break,
1106 WriteStatus::Continue => {}
1107 }
1108 }
1109
1110 writer.finish()?;
1111 }
1112 }
1113
1114 Ok(())
1115}
1116
1117fn execute_sql_with_formatter<W: Write>(
1129 db: &Database,
1130 sql: &str,
1131 writer: &mut W,
1132 formatter: Box<dyn Formatter>,
1133 options: &SqlExecutionOptions<'_>,
1134) -> Result<()> {
1135 use alopex_sql::{AlopexDialect, Parser, StatementKind};
1136
1137 let dialect = AlopexDialect;
1140 let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
1141
1142 let is_select = stmts.len() == 1
1143 && matches!(
1144 stmts.first().map(|s| &s.kind),
1145 Some(StatementKind::Select(_))
1146 );
1147
1148 if is_select {
1149 execute_sql_select_streaming(db, sql, writer, formatter, options)
1151 } else {
1152 execute_sql_ddl_dml(db, sql, writer, formatter, options)
1154 }
1155}
1156
/// Streams a local SELECT through the formatter without buffering all rows.
///
/// `Database::execute_sql_with_rows` drives a callback with a streaming row
/// source; rows are forwarded to a `StreamingWriter` inside it. Because the
/// callback must return the embedded crate's error type, CLI errors are
/// tunnelled through `cli_err_to_embedded`, and two atomic flags record
/// whether a failure was a cancellation or a deadline so it can be
/// re-surfaced as the proper `CliError` afterwards.
fn execute_sql_select_streaming<W: Write>(
    db: &Database,
    sql: &str,
    writer: &mut W,
    formatter: Box<dyn Formatter>,
    options: &SqlExecutionOptions<'_>,
) -> Result<()> {
    use alopex_embedded::StreamingQueryResult;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    // Wraps a CLI error so it can cross the embedded callback boundary.
    fn cli_err_to_embedded(e: crate::error::CliError) -> alopex_embedded::Error {
        alopex_embedded::Error::Sql(alopex_sql::SqlError::Execution {
            message: e.to_string(),
            code: "ALOPEX-C001",
        })
    }

    // Flags distinguishing cancel/timeout from genuine database errors.
    let cancelled = Arc::new(AtomicBool::new(false));
    let timed_out = Arc::new(AtomicBool::new(false));
    let cancel_flag = cancelled.clone();
    let timeout_flag = timed_out.clone();

    let result = db.execute_sql_with_rows(sql, |mut rows| {
        let columns = columns_from_streaming_rows(&rows);
        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
            .with_quiet(options.quiet);

        // Row count is unknown up front in streaming mode.
        streaming_writer
            .prepare(None)
            .map_err(cli_err_to_embedded)?;

        if let Err(err) = options.deadline.check() {
            timeout_flag.store(true, Ordering::SeqCst);
            return Err(cli_err_to_embedded(err));
        }

        // NOTE(review): an Err from next_row() ends this loop silently and
        // the partial result is finished normally — confirm row-source
        // errors are surfaced elsewhere or intended to be best-effort.
        while let Ok(Some(sql_row)) = rows.next_row() {
            // Cancel/deadline are re-checked per row.
            if options.cancel.is_cancelled() {
                cancel_flag.store(true, Ordering::SeqCst);
                return Err(cli_err_to_embedded(CliError::Cancelled));
            }
            if let Err(err) = options.deadline.check() {
                timeout_flag.store(true, Ordering::SeqCst);
                return Err(cli_err_to_embedded(err));
            }
            let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
            let row = Row::new(values);

            match streaming_writer
                .write_row(row)
                .map_err(cli_err_to_embedded)?
            {
                WriteStatus::LimitReached => break,
                WriteStatus::Continue => {}
            }
        }

        streaming_writer.finish().map_err(cli_err_to_embedded)?;
        Ok(())
    });

    // Translate tunnelled failures back into their original CLI errors.
    let result = match result {
        Ok(value) => value,
        Err(err) => {
            if cancelled.load(Ordering::SeqCst) {
                return Err(CliError::Cancelled);
            }
            if timed_out.load(Ordering::SeqCst) {
                return Err(CliError::Timeout(format!(
                    "deadline exceeded after {}",
                    humantime::format_duration(options.deadline.duration())
                )));
            }
            return Err(CliError::Database(err));
        }
    };

    match result {
        StreamingQueryResult::QueryProcessed(()) => Ok(()),
        // Non-query outcomes are unexpected here (the caller pre-checked
        // for a SELECT) and are treated as a no-op success.
        StreamingQueryResult::Success | StreamingQueryResult::RowsAffected(_) => {
            Ok(())
        }
    }
}
1252
1253#[derive(serde::Serialize)]
1254struct RemoteSqlRequest {
1255 sql: String,
1256 #[serde(default)]
1257 streaming: bool,
1258 #[serde(skip_serializing_if = "Option::is_none")]
1259 fetch_size: Option<usize>,
1260 #[serde(skip_serializing_if = "Option::is_none")]
1261 max_rows: Option<usize>,
1262}
1263
/// Column metadata as reported by the remote SQL endpoint.
#[derive(serde::Deserialize)]
struct RemoteColumnInfo {
    /// Column name as reported by the server.
    name: String,
    /// Type name string; mapped onto the CLI's `DataType` via
    /// `data_type_from_string`.
    data_type: String,
}
1269
/// JSON response body from the remote SQL execution endpoint.
#[derive(serde::Deserialize)]
struct RemoteSqlResponse {
    /// Result-set column metadata.
    columns: Vec<RemoteColumnInfo>,
    /// Result rows; empty for non-query statements.
    rows: Vec<Vec<alopex_sql::storage::SqlValue>>,
    /// Affected-row count for DML; `None` for queries.
    affected_rows: Option<u64>,
}
1276
1277fn map_client_error(err: ClientError) -> CliError {
1278 match err {
1279 ClientError::Request { source, .. } => {
1280 CliError::ServerConnection(format!("request failed: {source}"))
1281 }
1282 ClientError::InvalidUrl(message) => CliError::InvalidArgument(message),
1283 ClientError::Build(message) => CliError::InvalidArgument(message),
1284 ClientError::Auth(err) => CliError::InvalidArgument(err.to_string()),
1285 ClientError::HttpStatus { status, body } => {
1286 CliError::InvalidArgument(format!("Server error: HTTP {} - {}", status.as_u16(), body))
1287 }
1288 }
1289}
1290
/// Sends a best-effort cancellation request to the server's SQL endpoint.
///
/// The response body is deserialized as arbitrary JSON and discarded; only
/// transport/HTTP failures are surfaced (via `map_client_error`).
async fn send_cancel_request(client: &HttpClient) -> Result<()> {
    // Empty JSON object payload; the endpoint takes no parameters.
    #[derive(serde::Serialize)]
    struct CancelRequest {}

    let request = CancelRequest {};
    let _: serde_json::Value = client
        .post_json("api/sql/cancel", &request)
        .await
        .map_err(map_client_error)?;
    Ok(())
}
1302
/// Combines the global row limit with the per-command `max_rows`, taking the
/// stricter (smaller) bound when both are present and whichever is set
/// otherwise.
fn merge_limit(limit: Option<usize>, max_rows: Option<usize>) -> Option<usize> {
    match (limit, max_rows) {
        (Some(a), Some(b)) => Some(a.min(b)),
        _ => limit.or(max_rows),
    }
}
1310
1311fn execute_sql_ddl_dml<W: Write>(
1316 db: &Database,
1317 sql: &str,
1318 writer: &mut W,
1319 formatter: Box<dyn Formatter>,
1320 options: &SqlExecutionOptions<'_>,
1321) -> Result<()> {
1322 use alopex_sql::ExecutionResult;
1323
1324 options.deadline.check()?;
1325 let result = db.execute_sql(sql)?;
1326 options.deadline.check()?;
1327
1328 match result {
1329 ExecutionResult::Success => {
1330 if !options.quiet {
1332 let columns = sql_status_columns();
1333 let mut streaming_writer =
1334 StreamingWriter::new(writer, formatter, columns, options.limit)
1335 .with_quiet(options.quiet);
1336 streaming_writer.prepare(Some(1))?;
1337 let row = Row::new(vec![
1338 Value::Text("OK".to_string()),
1339 Value::Text("Operation completed successfully".to_string()),
1340 ]);
1341 streaming_writer.write_row(row)?;
1342 streaming_writer.finish()?;
1343 }
1344 }
1345 ExecutionResult::RowsAffected(count) => {
1346 if !options.quiet {
1348 let columns = sql_status_columns();
1349 let mut streaming_writer =
1350 StreamingWriter::new(writer, formatter, columns, options.limit)
1351 .with_quiet(options.quiet);
1352 streaming_writer.prepare(Some(1))?;
1353 let row = Row::new(vec![
1354 Value::Text("OK".to_string()),
1355 Value::Text(format!("{} row(s) affected", count)),
1356 ]);
1357 streaming_writer.write_row(row)?;
1358 streaming_writer.finish()?;
1359 }
1360 }
1361 ExecutionResult::Query(query_result) => {
1362 let columns = columns_from_query_result(&query_result);
1365 let mut streaming_writer =
1366 StreamingWriter::new(writer, formatter, columns, options.limit)
1367 .with_quiet(options.quiet);
1368 streaming_writer.prepare(Some(query_result.rows.len()))?;
1369 for sql_row in query_result.rows {
1370 let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1371 let row = Row::new(values);
1372 match streaming_writer.write_row(row)? {
1373 WriteStatus::LimitReached => break,
1374 WriteStatus::Continue => {}
1375 }
1376 }
1377 streaming_writer.finish()?;
1378 }
1379 }
1380
1381 Ok(())
1382}
1383
1384fn sql_value_to_value(sql_value: alopex_sql::SqlValue) -> Value {
1386 use alopex_sql::SqlValue;
1387
1388 match sql_value {
1389 SqlValue::Null => Value::Null,
1390 SqlValue::Integer(i) => Value::Int(i as i64),
1391 SqlValue::BigInt(i) => Value::Int(i),
1392 SqlValue::Float(f) => Value::Float(f as f64),
1393 SqlValue::Double(f) => Value::Float(f),
1394 SqlValue::Text(s) => Value::Text(s),
1395 SqlValue::Blob(b) => Value::Bytes(b),
1396 SqlValue::Boolean(b) => Value::Bool(b),
1397 SqlValue::Timestamp(ts) => {
1398 Value::Text(format!("{}", ts))
1400 }
1401 SqlValue::Vector(v) => Value::Vector(v),
1402 }
1403}
1404
1405fn remote_value_to_value(sql_value: alopex_sql::storage::SqlValue) -> Value {
1406 use alopex_sql::storage::SqlValue;
1407
1408 match sql_value {
1409 SqlValue::Null => Value::Null,
1410 SqlValue::Integer(i) => Value::Int(i as i64),
1411 SqlValue::BigInt(i) => Value::Int(i),
1412 SqlValue::Float(f) => Value::Float(f as f64),
1413 SqlValue::Double(f) => Value::Float(f),
1414 SqlValue::Text(s) => Value::Text(s),
1415 SqlValue::Blob(b) => Value::Bytes(b),
1416 SqlValue::Boolean(b) => Value::Bool(b),
1417 SqlValue::Timestamp(ts) => Value::Text(ts.to_string()),
1418 SqlValue::Vector(v) => Value::Vector(v),
1419 }
1420}
1421
1422fn data_type_from_string(value: &str) -> DataType {
1423 let upper = value.to_ascii_uppercase();
1424 if upper.starts_with("INT") || upper.starts_with("BIGINT") {
1425 DataType::Int
1426 } else if upper.starts_with("FLOAT") || upper.starts_with("DOUBLE") {
1427 DataType::Float
1428 } else if upper.starts_with("BLOB") {
1429 DataType::Bytes
1430 } else if upper.starts_with("BOOLEAN") {
1431 DataType::Bool
1432 } else if upper.starts_with("VECTOR") {
1433 DataType::Vector
1434 } else {
1435 DataType::Text
1436 }
1437}
1438
1439fn sql_column_to_column(col: &alopex_sql::executor::ColumnInfo) -> Column {
1441 use alopex_sql::planner::ResolvedType;
1442
1443 let data_type = match &col.data_type {
1444 ResolvedType::Integer | ResolvedType::BigInt => DataType::Int,
1445 ResolvedType::Float | ResolvedType::Double => DataType::Float,
1446 ResolvedType::Text => DataType::Text,
1447 ResolvedType::Blob => DataType::Bytes,
1448 ResolvedType::Boolean => DataType::Bool,
1449 ResolvedType::Timestamp => DataType::Text, ResolvedType::Vector { .. } => DataType::Vector,
1451 ResolvedType::Null => DataType::Text, };
1453
1454 Column::new(&col.name, data_type)
1455}
1456
1457#[allow(dead_code)] fn columns_from_query_result(query_result: &alopex_sql::executor::QueryResult) -> Vec<Column> {
1460 query_result
1461 .columns
1462 .iter()
1463 .map(sql_column_to_column)
1464 .collect()
1465}
1466
1467#[allow(dead_code)] fn columns_from_streaming_result(
1470 query_iter: &alopex_embedded::QueryRowIterator<'_>,
1471) -> Vec<Column> {
1472 query_iter
1473 .columns()
1474 .iter()
1475 .map(sql_column_to_column)
1476 .collect()
1477}
1478
1479fn columns_from_streaming_rows(rows: &alopex_embedded::StreamingRows<'_>) -> Vec<Column> {
1481 rows.columns().iter().map(sql_column_to_column).collect()
1482}
1483
1484pub fn sql_status_columns() -> Vec<Column> {
1486 vec![
1487 Column::new("status", DataType::Text),
1488 Column::new("message", DataType::Text),
1489 ]
1490}
1491
#[cfg(test)]
mod tests {
    use super::*;
    use crate::batch::BatchModeSource;
    use crate::output::jsonl::JsonlFormatter;

    // Opens a fresh in-memory database so each test is isolated.
    fn create_test_db() -> Database {
        Database::open_in_memory().unwrap()
    }

    // Builds a JSONL writer over the standard status/message columns used by
    // DDL/DML output.
    fn create_status_writer(output: &mut Vec<u8>) -> StreamingWriter<&mut Vec<u8>> {
        let formatter = Box::new(JsonlFormatter::new());
        let columns = sql_status_columns();
        StreamingWriter::new(output, formatter, columns, None)
    }

    // Builds a JSONL writer with caller-supplied result columns.
    fn create_query_writer(
        output: &mut Vec<u8>,
        columns: Vec<Column>,
    ) -> StreamingWriter<&mut Vec<u8>> {
        let formatter = Box::new(JsonlFormatter::new());
        StreamingWriter::new(output, formatter, columns, None)
    }

    // Interactive (non-batch, TTY) defaults used by the resolve_query tests.
    fn default_batch_mode() -> BatchMode {
        BatchMode {
            is_batch: false,
            is_tty: true,
            source: BatchModeSource::Default,
        }
    }

    #[test]
    fn test_create_table() {
        let db = create_test_db();

        let mut output = Vec::new();
        {
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
                &mut writer,
            )
            .unwrap();
        }

        // DDL success is reported as an "OK" status row.
        let result = String::from_utf8(output).unwrap();
        assert!(result.contains("OK"));
    }

    #[test]
    fn test_insert_and_select() {
        let db = create_test_db();

        {
            let mut output = Vec::new();
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
                &mut writer,
            )
            .unwrap();
        }

        {
            let mut output = Vec::new();
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "INSERT INTO users (id, name) VALUES (1, 'Alice');",
                &mut writer,
            )
            .unwrap();
            // DML reports an affected-row count rather than a bare "OK".
            let result = String::from_utf8(output).unwrap();
            assert!(result.contains("row(s) affected"));
        }

        {
            let mut output = Vec::new();
            let columns = vec![
                Column::new("id", DataType::Int),
                Column::new("name", DataType::Text),
            ];
            let mut writer = create_query_writer(&mut output, columns);
            execute_sql(&db, "SELECT id, name FROM users;", &mut writer).unwrap();
            // The inserted row should round-trip through the query path.
            let result = String::from_utf8(output).unwrap();
            assert!(result.contains("Alice"));
        }
    }

    #[test]
    fn test_syntax_error() {
        let db = create_test_db();

        // "TABEL" is a typo; the parser must reject it with an error.
        let mut output = Vec::new();
        let mut writer = create_status_writer(&mut output);
        let result = execute_sql(&db, "CREATE TABEL invalid_syntax;", &mut writer);
        assert!(result.is_err());
    }

    #[test]
    fn test_multiple_statements() {
        let db = create_test_db();

        let mut output = Vec::new();
        {
            // Two statements in one string; both should execute.
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "CREATE TABLE t (id INTEGER PRIMARY KEY); INSERT INTO t (id) VALUES (1);",
                &mut writer,
            )
            .unwrap();
        }

        {
            let mut output = Vec::new();
            let columns = vec![Column::new("id", DataType::Int)];
            let mut writer = create_query_writer(&mut output, columns);
            execute_sql(&db, "SELECT id FROM t;", &mut writer).unwrap();
            let result = String::from_utf8(output).unwrap();
            assert!(result.contains("1"));
        }
    }

    #[test]
    fn test_sql_value_conversion() {
        use alopex_sql::SqlValue;

        // Spot-checks for the SqlValue -> Value mapping.
        assert!(matches!(sql_value_to_value(SqlValue::Null), Value::Null));
        assert!(matches!(
            sql_value_to_value(SqlValue::Integer(42)),
            Value::Int(42)
        ));
        assert!(matches!(
            sql_value_to_value(SqlValue::BigInt(100)),
            Value::Int(100)
        ));
        assert!(matches!(
            sql_value_to_value(SqlValue::Boolean(true)),
            Value::Bool(true)
        ));
        assert!(
            matches!(sql_value_to_value(SqlValue::Text("hello".to_string())), Value::Text(s) if s == "hello")
        );
    }

    #[test]
    fn resolve_query_from_argument() {
        // An inline query string is returned verbatim.
        let cmd = SqlCommand {
            query: Some("SELECT 1".to_string()),
            file: None,
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
        assert_eq!(sql, "SELECT 1");
    }

    #[test]
    fn resolve_query_from_file() {
        // Queries read from a file keep their trailing newline.
        let mut file = tempfile::NamedTempFile::new().unwrap();
        writeln!(file, "SELECT * FROM users").unwrap();

        let cmd = SqlCommand {
            query: None,
            file: Some(file.path().display().to_string()),
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
        assert_eq!(sql, "SELECT * FROM users\n");
    }

    #[test]
    fn resolve_query_returns_no_query_error() {
        // Neither an inline query nor a file is an error.
        let cmd = SqlCommand {
            query: None,
            file: None,
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
        assert!(matches!(err, CliError::NoQueryProvided));
    }

    #[test]
    fn resolve_query_rejects_query_and_file() {
        // Supplying both sources is ambiguous and must be rejected.
        let cmd = SqlCommand {
            query: Some("SELECT 1".to_string()),
            file: Some("query.sql".into()),
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
        assert!(matches!(
            err,
            CliError::InvalidArgument(msg) if msg == "Cannot specify both query and file"
        ));
    }
}