1use std::collections::HashSet;
6use std::fs;
7use std::io::{self, Read, Write};
8
9use alopex_embedded::Database;
10
11use crate::batch::BatchMode;
12use crate::cli::SqlCommand;
13use crate::client::http::{ClientError, HttpClient};
14use crate::error::{CliError, Result};
15use crate::models::{Column, DataType, Row, Value};
16use crate::output::formatter::Formatter;
17use crate::streaming::timeout::parse_deadline;
18use crate::streaming::{CancelSignal, Deadline, StreamingWriter, WriteStatus};
19use crate::tui::{is_tty, TuiApp};
20use futures_util::StreamExt;
21
/// Shared execution controls threaded through every SQL execution path.
#[doc(hidden)]
pub struct SqlExecutionOptions<'a> {
    /// Maximum number of rows to emit, if any (already merged with `--max-rows`).
    pub limit: Option<usize>,
    /// Suppress informational/status output.
    pub quiet: bool,
    /// Cooperative cancellation signal, checked between rows/chunks.
    pub cancel: &'a CancelSignal,
    /// Wall-clock deadline enforced while executing and streaming.
    pub deadline: &'a Deadline,
}
29
30pub fn execute_with_formatter<W: Write>(
44 db: &Database,
45 cmd: SqlCommand,
46 batch_mode: &BatchMode,
47 writer: &mut W,
48 formatter: Box<dyn Formatter>,
49 limit: Option<usize>,
50 quiet: bool,
51) -> Result<()> {
52 let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
53 let cancel = CancelSignal::new();
54
55 execute_with_formatter_control(
56 db,
57 cmd,
58 batch_mode,
59 writer,
60 formatter,
61 SqlExecutionOptions {
62 limit,
63 quiet,
64 cancel: &cancel,
65 deadline: &deadline,
66 },
67 )
68}
69
70#[doc(hidden)]
71pub fn execute_with_formatter_control<W: Write>(
72 db: &Database,
73 cmd: SqlCommand,
74 batch_mode: &BatchMode,
75 writer: &mut W,
76 formatter: Box<dyn Formatter>,
77 options: SqlExecutionOptions<'_>,
78) -> Result<()> {
79 let sql = cmd.resolve_query(batch_mode)?;
80 let effective_limit = merge_limit(options.limit, cmd.max_rows);
81 let options = SqlExecutionOptions {
82 limit: effective_limit,
83 ..options
84 };
85
86 if cmd.tui {
87 return execute_tui_local_or_fallback(db, &sql, writer, formatter, options);
88 }
89
90 execute_sql_with_formatter(db, &sql, writer, formatter, &options)
91}
92
93fn is_select_query(sql: &str) -> Result<bool> {
94 use alopex_sql::{AlopexDialect, Parser, StatementKind};
95
96 let dialect = AlopexDialect;
97 let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
98 Ok(stmts.len() == 1
99 && matches!(
100 stmts.first().map(|s| &s.kind),
101 Some(StatementKind::Select(_))
102 ))
103}
104
105pub async fn execute_remote_with_formatter<W: Write>(
107 client: &HttpClient,
108 cmd: &SqlCommand,
109 batch_mode: &BatchMode,
110 writer: &mut W,
111 formatter: Box<dyn Formatter>,
112 limit: Option<usize>,
113 quiet: bool,
114) -> Result<()> {
115 let effective_limit = merge_limit(limit, cmd.max_rows);
116 let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
117 let cancel = CancelSignal::new();
118 let options = SqlExecutionOptions {
119 limit: effective_limit,
120 quiet,
121 cancel: &cancel,
122 deadline: &deadline,
123 };
124
125 execute_remote_with_formatter_control(client, cmd, batch_mode, writer, formatter, options).await
126}
127
128#[doc(hidden)]
129pub async fn execute_remote_with_formatter_control<W: Write>(
130 client: &HttpClient,
131 cmd: &SqlCommand,
132 batch_mode: &BatchMode,
133 writer: &mut W,
134 formatter: Box<dyn Formatter>,
135 options: SqlExecutionOptions<'_>,
136) -> Result<()> {
137 let sql = cmd.resolve_query(batch_mode)?;
138 if cmd.tui {
139 return execute_tui_remote_or_fallback(client, &sql, cmd, writer, formatter, options).await;
140 }
141 execute_remote_with_formatter_impl(client, &sql, cmd, writer, formatter, &options).await
142}
143
/// Execute `sql` on the remote server and render the response with `formatter`.
///
/// Single SELECT statements with a streaming-capable formatter take the
/// incremental streaming path; everything else issues one buffered
/// `api/sql/query` request. Cancellation and the deadline are honored, with a
/// best-effort server-side cancel request sent before bailing out.
async fn execute_remote_with_formatter_impl<W: Write>(
    client: &HttpClient,
    sql: &str,
    cmd: &SqlCommand,
    writer: &mut W,
    formatter: Box<dyn Formatter>,
    options: &SqlExecutionOptions<'_>,
) -> Result<()> {
    if is_select_query(sql)? && formatter.supports_streaming() {
        return execute_remote_streaming(
            client,
            sql,
            writer,
            formatter,
            options,
            cmd.fetch_size,
            cmd.max_rows,
        )
        .await;
    }

    // Buffered (non-streaming) request.
    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: false,
        fetch_size: cmd.fetch_size,
        max_rows: cmd.max_rows,
    };
    // Race the HTTP round trip against cancellation and the deadline.
    let response: RemoteSqlResponse = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    // Timed out: ask the server to stop working on the query.
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    // An empty column list means a DDL/DML result: emit one status row
    // (suppressed entirely in quiet mode).
    if response.columns.is_empty() {
        if options.quiet {
            return Ok(());
        }
        let message = match response.affected_rows {
            Some(count) => format!("{count} row(s) affected"),
            None => "Operation completed successfully".to_string(),
        };
        let columns = sql_status_columns();
        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
            .with_quiet(options.quiet);
        streaming_writer.prepare(Some(1))?;
        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
        streaming_writer.write_row(row)?;
        return streaming_writer.finish();
    }

    // Query result: convert remote column/value types and write them out,
    // re-checking cancel/deadline between rows.
    let columns: Vec<Column> = response
        .columns
        .iter()
        .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
        .collect();
    let mut streaming_writer =
        StreamingWriter::new(writer, formatter, columns, options.limit).with_quiet(options.quiet);
    streaming_writer.prepare(Some(response.rows.len()))?;
    for row in response.rows {
        if options.cancel.is_cancelled() {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
        options.deadline.check()?;
        let values = row.into_iter().map(remote_value_to_value).collect();
        match streaming_writer.write_row(Row::new(values))? {
            WriteStatus::LimitReached => break,
            WriteStatus::Continue => {}
        }
    }
    streaming_writer.finish()
}
229
230fn execute_tui_local_or_fallback<W: Write>(
231 db: &Database,
232 sql: &str,
233 writer: &mut W,
234 formatter: Box<dyn Formatter>,
235 options: SqlExecutionOptions<'_>,
236) -> Result<()> {
237 if !is_tty() {
238 if !options.quiet {
239 eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
240 }
241 return execute_sql_with_formatter(db, sql, writer, formatter, &options);
242 }
243
244 match execute_tui_local(db, sql, &options) {
245 Ok(()) => Ok(()),
246 Err(err) => {
247 if !options.quiet {
248 eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
249 }
250 execute_sql_with_formatter(db, sql, writer, formatter, &options)
251 }
252 }
253}
254
255fn execute_tui_local(db: &Database, sql: &str, options: &SqlExecutionOptions<'_>) -> Result<()> {
256 use alopex_sql::ExecutionResult;
257
258 options.deadline.check()?;
259 let result = db.execute_sql(sql)?;
260 options.deadline.check()?;
261
262 let (columns, rows) = match result {
263 ExecutionResult::Success => {
264 let columns = sql_status_columns();
265 let row = Row::new(vec![
266 Value::Text("OK".to_string()),
267 Value::Text("Operation completed successfully".to_string()),
268 ]);
269 (columns, vec![row])
270 }
271 ExecutionResult::RowsAffected(count) => {
272 let columns = sql_status_columns();
273 let row = Row::new(vec![
274 Value::Text("OK".to_string()),
275 Value::Text(format!("{count} row(s) affected")),
276 ]);
277 (columns, vec![row])
278 }
279 ExecutionResult::Query(query_result) => {
280 let columns = query_result
281 .columns
282 .iter()
283 .map(|col| Column::new(&col.name, DataType::Text))
284 .collect::<Vec<_>>();
285 let mut rows = Vec::with_capacity(query_result.rows.len());
286 for sql_row in query_result.rows {
287 let values = sql_row.into_iter().map(sql_value_to_value).collect();
288 rows.push(Row::new(values));
289 }
290 if let Some(limit) = options.limit {
291 rows.truncate(limit);
292 }
293 (columns, rows)
294 }
295 };
296
297 let app = TuiApp::new(columns, rows, "local", false);
298 app.run()
299}
300
301async fn execute_tui_remote_or_fallback<W: Write>(
302 client: &HttpClient,
303 sql: &str,
304 cmd: &SqlCommand,
305 writer: &mut W,
306 formatter: Box<dyn Formatter>,
307 options: SqlExecutionOptions<'_>,
308) -> Result<()> {
309 if !is_tty() {
310 if !options.quiet {
311 eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
312 }
313 return execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options)
314 .await;
315 }
316
317 match execute_tui_remote(client, sql, cmd, &options).await {
318 Ok(()) => Ok(()),
319 Err(err) => {
320 if !options.quiet {
321 eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
322 }
323 execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options).await
324 }
325 }
326}
327
/// Execute `sql` remotely and present the result in the interactive TUI.
///
/// SELECT statements are fetched through the streaming endpoint and buffered
/// into memory first (the TUI needs the whole result set); other statements
/// issue one buffered request and are shown as a single status row.
async fn execute_tui_remote(
    client: &HttpClient,
    sql: &str,
    cmd: &SqlCommand,
    options: &SqlExecutionOptions<'_>,
) -> Result<()> {
    if is_select_query(sql)? {
        let (columns, rows) =
            collect_remote_streaming_rows(client, sql, options, cmd.fetch_size, cmd.max_rows)
                .await?;
        let app = TuiApp::new(columns, rows, "server", false);
        return app.run();
    }

    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: false,
        fetch_size: cmd.fetch_size,
        max_rows: cmd.max_rows,
    };

    // Race the HTTP round trip against cancellation and the deadline.
    let response: RemoteSqlResponse = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    // Timed out: ask the server to stop working on the query.
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    // Empty column list means DDL/DML: synthesize a status row. Otherwise
    // convert the remote types and apply the client-side row limit.
    let (columns, rows) = if response.columns.is_empty() {
        let columns = sql_status_columns();
        let message = match response.affected_rows {
            Some(count) => format!("{count} row(s) affected"),
            None => "Operation completed successfully".to_string(),
        };
        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
        (columns, vec![row])
    } else {
        let columns: Vec<Column> = response
            .columns
            .iter()
            .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
            .collect();
        let mut rows = response
            .rows
            .into_iter()
            .map(|row| Row::new(row.into_iter().map(remote_value_to_value).collect()))
            .collect::<Vec<_>>();
        if let Some(limit) = options.limit {
            rows.truncate(limit);
        }
        (columns, rows)
    };

    let app = TuiApp::new(columns, rows, "server", false);
    app.run()
}
396
397async fn collect_remote_streaming_rows(
398 client: &HttpClient,
399 sql: &str,
400 options: &SqlExecutionOptions<'_>,
401 fetch_size: Option<usize>,
402 max_rows: Option<usize>,
403) -> Result<(Vec<Column>, Vec<Row>)> {
404 let request = RemoteSqlRequest {
405 sql: sql.to_string(),
406 streaming: true,
407 fetch_size,
408 max_rows,
409 };
410
411 let response = tokio::select! {
412 result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
413 match result {
414 Ok(value) => value.map_err(map_client_error)?,
415 Err(_) => {
416 let _ = send_cancel_request(client).await;
417 return Err(CliError::Timeout(format!(
418 "deadline exceeded after {}",
419 humantime::format_duration(options.deadline.duration())
420 )));
421 }
422 }
423 }
424 _ = options.cancel.wait() => {
425 let _ = send_cancel_request(client).await;
426 return Err(CliError::Cancelled);
427 }
428 };
429
430 let mut stream = response.bytes_stream();
431 let mut buffer: Vec<u8> = Vec::new();
432 let mut pos: usize = 0;
433 let mut done = false;
434 let mut saw_array_start = false;
435 let mut columns: Option<Vec<String>> = None;
436 let mut column_set: Option<HashSet<String>> = None;
437 let mut rows: Vec<Row> = Vec::new();
438
439 while !done {
440 if options.cancel.is_cancelled() {
441 let _ = send_cancel_request(client).await;
442 return Err(CliError::Cancelled);
443 }
444 if let Err(err) = options.deadline.check() {
445 let _ = send_cancel_request(client).await;
446 return Err(err);
447 }
448
449 let next = tokio::select! {
450 _ = options.cancel.wait() => {
451 let _ = send_cancel_request(client).await;
452 return Err(CliError::Cancelled);
453 }
454 result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
455 match result {
456 Ok(value) => value,
457 Err(_) => {
458 let _ = send_cancel_request(client).await;
459 return Err(CliError::Timeout(format!(
460 "deadline exceeded after {}",
461 humantime::format_duration(options.deadline.duration())
462 )));
463 }
464 }
465 }
466 };
467
468 let chunk = match next {
469 Some(chunk) => chunk,
470 None => break,
471 };
472
473 let bytes = match chunk {
474 Ok(bytes) => bytes,
475 Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
476 };
477
478 buffer.extend_from_slice(&bytes);
479
480 loop {
481 skip_whitespace(&buffer, &mut pos);
482 if pos >= buffer.len() {
483 break;
484 }
485
486 if !saw_array_start {
487 if buffer[pos] != b'[' {
488 return Err(CliError::InvalidArgument(
489 "Invalid streaming response: expected JSON array".into(),
490 ));
491 }
492 pos += 1;
493 saw_array_start = true;
494 continue;
495 }
496
497 skip_whitespace(&buffer, &mut pos);
498 if pos >= buffer.len() {
499 break;
500 }
501
502 if buffer[pos] == b']' {
503 pos += 1;
504 done = true;
505 break;
506 }
507
508 let slice = &buffer[pos..];
509 let mut stream =
510 serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
511 let value = match stream.next() {
512 Some(Ok(value)) => value,
513 Some(Err(err)) if err.is_eof() => break,
514 Some(Err(err)) => return Err(CliError::Json(err)),
515 None => break,
516 };
517 pos = pos.saturating_add(stream.byte_offset());
518
519 let object = value.as_object().ok_or_else(|| {
520 CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
521 })?;
522
523 if columns.is_none() {
524 let names: Vec<String> = object.keys().cloned().collect();
525 let set: HashSet<String> = names.iter().cloned().collect();
526 if names.is_empty() {
527 return Err(CliError::InvalidArgument(
528 "Invalid streaming row: empty object".into(),
529 ));
530 }
531 columns = Some(names);
532 column_set = Some(set);
533 }
534
535 let names = columns
536 .as_ref()
537 .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
538 let set = column_set
539 .as_ref()
540 .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;
541
542 if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
543 return Err(CliError::InvalidArgument(
544 "Invalid streaming row: column mismatch".into(),
545 ));
546 }
547
548 let values = names
549 .iter()
550 .map(|name| {
551 object.get(name).ok_or_else(|| {
552 CliError::InvalidArgument(format!(
553 "Invalid streaming row: missing column '{name}'"
554 ))
555 })
556 })
557 .map(|value| value.and_then(json_value_to_value))
558 .collect::<Result<Vec<_>>>()?;
559 rows.push(Row::new(values));
560
561 if let Some(limit) = options.limit {
562 if rows.len() >= limit {
563 let _ = send_cancel_request(client).await;
564 done = true;
565 break;
566 }
567 }
568
569 skip_whitespace(&buffer, &mut pos);
570 if pos >= buffer.len() {
571 break;
572 }
573 match buffer[pos] {
574 b',' => {
575 pos += 1;
576 }
577 b']' => {
578 pos += 1;
579 done = true;
580 break;
581 }
582 _ => {
583 return Err(CliError::InvalidArgument(
584 "Invalid streaming response: expected ',' or ']'".into(),
585 ))
586 }
587 }
588 }
589
590 if pos > 0 {
591 buffer.drain(..pos);
592 pos = 0;
593 }
594 }
595
596 if done {
597 if has_non_whitespace(&buffer) {
598 return Err(CliError::InvalidArgument(
599 "Invalid streaming response: unexpected trailing data".into(),
600 ));
601 }
602 buffer.clear();
603 loop {
604 let next = tokio::select! {
605 _ = options.cancel.wait() => {
606 let _ = send_cancel_request(client).await;
607 return Err(CliError::Cancelled);
608 }
609 result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
610 match result {
611 Ok(value) => value,
612 Err(_) => {
613 let _ = send_cancel_request(client).await;
614 return Err(CliError::Timeout(format!(
615 "deadline exceeded after {}",
616 humantime::format_duration(options.deadline.duration())
617 )));
618 }
619 }
620 }
621 };
622
623 let chunk = match next {
624 Some(chunk) => chunk,
625 None => break,
626 };
627
628 let bytes = match chunk {
629 Ok(bytes) => bytes,
630 Err(err) => {
631 return Err(CliError::ServerConnection(format!("request failed: {err}")))
632 }
633 };
634
635 if has_non_whitespace(&bytes) {
636 return Err(CliError::InvalidArgument(
637 "Invalid streaming response: unexpected trailing data".into(),
638 ));
639 }
640 }
641 } else {
642 skip_whitespace(&buffer, &mut pos);
643 if pos < buffer.len() {
644 return Err(CliError::InvalidArgument(
645 "Invalid streaming response: unexpected trailing data".into(),
646 ));
647 }
648 return Err(CliError::InvalidArgument(
649 "Invalid streaming response: unexpected end of stream".into(),
650 ));
651 }
652
653 let columns = columns
654 .unwrap_or_default()
655 .into_iter()
656 .map(|name| Column::new(name, DataType::Text))
657 .collect();
658 Ok((columns, rows))
659}
660
/// Stream a SELECT's rows from the server directly into `formatter`.
///
/// The response body is a JSON array of row objects parsed incrementally from
/// chunked bytes. The writer (and its header) is created lazily from the first
/// row's keys; all columns are typed Text since the stream carries no type
/// metadata. Cancel/deadline are enforced between chunks, and when the writer
/// reports its row limit the server is asked to cancel and output finalizes
/// early.
async fn execute_remote_streaming<W: Write>(
    client: &HttpClient,
    sql: &str,
    writer: &mut W,
    formatter: Box<dyn Formatter>,
    options: &SqlExecutionOptions<'_>,
    fetch_size: Option<usize>,
    max_rows: Option<usize>,
) -> Result<()> {
    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: true,
        fetch_size,
        max_rows,
    };

    // Race connection setup against cancellation and the deadline.
    let response = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    let mut stream = response.bytes_stream();
    let mut buffer: Vec<u8> = Vec::new();
    let mut pos: usize = 0;
    // Writer and formatter are created/consumed lazily on the first row.
    let mut streaming_writer: Option<StreamingWriter<&mut W>> = None;
    let mut formatter = Some(formatter);
    let mut columns: Option<Vec<String>> = None;
    let mut column_set: Option<HashSet<String>> = None;
    let mut done = false;
    let mut saw_array_start = false;

    while !done {
        if options.cancel.is_cancelled() {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
        if let Err(err) = options.deadline.check() {
            let _ = send_cancel_request(client).await;
            return Err(err);
        }

        // Await the next body chunk, still subject to cancel/deadline.
        let next = tokio::select! {
            _ = options.cancel.wait() => {
                let _ = send_cancel_request(client).await;
                return Err(CliError::Cancelled);
            }
            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                match result {
                    Ok(value) => value,
                    Err(_) => {
                        let _ = send_cancel_request(client).await;
                        return Err(CliError::Timeout(format!(
                            "deadline exceeded after {}",
                            humantime::format_duration(options.deadline.duration())
                        )));
                    }
                }
            }
        };

        let chunk = match next {
            Some(chunk) => chunk,
            None => break,
        };

        let bytes = match chunk {
            Ok(bytes) => bytes,
            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
        };

        buffer.extend_from_slice(&bytes);

        // Parse as many complete JSON values as the buffer currently holds.
        loop {
            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }

            if !saw_array_start {
                if buffer[pos] != b'[' {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming response: expected JSON array".into(),
                    ));
                }
                pos += 1;
                saw_array_start = true;
                continue;
            }

            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }

            if buffer[pos] == b']' {
                pos += 1;
                done = true;
                break;
            }

            // Decode exactly one JSON value. An EOF error means the value is
            // split across chunks, so wait for more bytes.
            let slice = &buffer[pos..];
            let mut stream =
                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
            let value = match stream.next() {
                Some(Ok(value)) => value,
                Some(Err(err)) if err.is_eof() => break,
                Some(Err(err)) => return Err(CliError::Json(err)),
                None => break,
            };
            pos = pos.saturating_add(stream.byte_offset());

            let object = value.as_object().ok_or_else(|| {
                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
            })?;

            // The first row fixes the column names, creates the writer, and
            // emits the header (prepare).
            if columns.is_none() {
                let names: Vec<String> = object.keys().cloned().collect();
                let set: HashSet<String> = names.iter().cloned().collect();
                if names.is_empty() {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming row: empty object".into(),
                    ));
                }
                let cols = names
                    .iter()
                    .map(|name| Column::new(name, DataType::Text))
                    .collect::<Vec<_>>();
                let formatter = formatter
                    .take()
                    .ok_or_else(|| CliError::InvalidArgument("Missing formatter".into()))?;
                let mut writer = StreamingWriter::new(&mut *writer, formatter, cols, options.limit)
                    .with_quiet(options.quiet);
                writer.prepare(None)?;
                streaming_writer = Some(writer);
                columns = Some(names);
                column_set = Some(set);
            }

            let names = columns
                .as_ref()
                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
            let set = column_set
                .as_ref()
                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;

            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming row: column mismatch".into(),
                ));
            }

            let values = names
                .iter()
                .map(|name| {
                    object.get(name).ok_or_else(|| {
                        CliError::InvalidArgument(format!(
                            "Invalid streaming row: missing column '{name}'"
                        ))
                    })
                })
                .map(|value| value.and_then(json_value_to_value))
                .collect::<Result<Vec<_>>>()?;

            if let Some(writer) = streaming_writer.as_mut() {
                match writer.write_row(Row::new(values))? {
                    WriteStatus::LimitReached => {
                        // Row limit hit: stop the server and finalize output.
                        let _ = send_cancel_request(client).await;
                        return writer.finish();
                    }
                    WriteStatus::Continue => {}
                }
            }

            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }
            match buffer[pos] {
                b',' => {
                    pos += 1;
                }
                b']' => {
                    pos += 1;
                    done = true;
                    break;
                }
                _ => {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming response: expected ',' or ']'".into(),
                    ))
                }
            }
        }

        // Drop consumed bytes so the buffer does not grow without bound.
        if pos > 0 {
            buffer.drain(..pos);
            pos = 0;
        }
    }

    if done {
        // The array closed cleanly: anything left in the buffer or still
        // arriving on the wire is unexpected.
        if has_non_whitespace(&buffer) {
            return Err(CliError::InvalidArgument(
                "Invalid streaming response: unexpected trailing data".into(),
            ));
        }
        buffer.clear();
        loop {
            let next = tokio::select! {
                _ = options.cancel.wait() => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Cancelled);
                }
                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                    match result {
                        Ok(value) => value,
                        Err(_) => {
                            let _ = send_cancel_request(client).await;
                            return Err(CliError::Timeout(format!(
                                "deadline exceeded after {}",
                                humantime::format_duration(options.deadline.duration())
                            )));
                        }
                    }
                }
            };

            let chunk = match next {
                Some(chunk) => chunk,
                None => break,
            };

            let bytes = match chunk {
                Ok(bytes) => bytes,
                Err(err) => {
                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
                }
            };

            if has_non_whitespace(&bytes) {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming response: unexpected trailing data".into(),
                ));
            }
        }
    } else {
        // The connection ended before the closing ']'.
        skip_whitespace(&buffer, &mut pos);
        if pos < buffer.len() {
            return Err(CliError::InvalidArgument(
                "Invalid streaming response: unexpected trailing data".into(),
            ));
        }
        return Err(CliError::InvalidArgument(
            "Invalid streaming response: unexpected end of stream".into(),
        ));
    }

    if let Some(mut writer) = streaming_writer {
        return writer.finish();
    }

    // Empty result set: no row ever arrived, so emit just an empty table.
    if done && saw_array_start {
        if let Some(formatter) = formatter.take() {
            let mut writer =
                StreamingWriter::new(&mut *writer, formatter, Vec::new(), options.limit)
                    .with_quiet(options.quiet);
            writer.prepare(None)?;
            return writer.finish();
        }
    }

    Ok(())
}
948
949fn skip_whitespace(buffer: &[u8], pos: &mut usize) {
950 while *pos < buffer.len() {
951 match buffer[*pos] {
952 b' ' | b'\n' | b'\r' | b'\t' => *pos += 1,
953 _ => break,
954 }
955 }
956}
957
/// Returns `true` when `buffer` contains any byte other than JSON
/// insignificant whitespace (space, LF, CR, tab).
fn has_non_whitespace(buffer: &[u8]) -> bool {
    !buffer
        .iter()
        .all(|byte| matches!(byte, b' ' | b'\n' | b'\r' | b'\t'))
}
963
964fn json_value_to_value(value: &serde_json::Value) -> Result<Value> {
965 match value {
966 serde_json::Value::Null => Ok(Value::Null),
967 serde_json::Value::Bool(value) => Ok(Value::Bool(*value)),
968 serde_json::Value::Number(value) => {
969 if let Some(value) = value.as_i64() {
970 Ok(Value::Int(value))
971 } else if let Some(value) = value.as_f64() {
972 Ok(Value::Float(value))
973 } else {
974 Err(CliError::InvalidArgument(
975 "Invalid numeric value in streaming row".into(),
976 ))
977 }
978 }
979 serde_json::Value::String(value) => Ok(Value::Text(value.clone())),
980 serde_json::Value::Array(values) => {
981 let mut vector = Vec::with_capacity(values.len());
982 for entry in values {
983 let number = entry.as_f64().ok_or_else(|| {
984 CliError::InvalidArgument("Invalid vector value in streaming row".into())
985 })?;
986 vector.push(number as f32);
987 }
988 Ok(Value::Vector(vector))
989 }
990 serde_json::Value::Object(_) => Err(CliError::InvalidArgument(
991 "Invalid streaming row: nested objects are not supported".into(),
992 )),
993 }
994}
995
996#[allow(dead_code)]
998pub fn execute<W: Write>(
999 db: &Database,
1000 cmd: SqlCommand,
1001 batch_mode: &BatchMode,
1002 writer: &mut StreamingWriter<W>,
1003) -> Result<()> {
1004 let sql = cmd.resolve_query(batch_mode)?;
1005
1006 execute_sql(db, &sql, writer)
1007}
1008
1009impl SqlCommand {
1010 pub fn resolve_query(&self, batch_mode: &BatchMode) -> Result<String> {
1012 match (&self.query, &self.file) {
1013 (Some(query), None) => Ok(query.clone()),
1014 (None, Some(file)) => fs::read_to_string(file).map_err(|e| {
1015 CliError::InvalidArgument(format!("Failed to read SQL file '{}': {}", file, e))
1016 }),
1017 (None, None) if !batch_mode.is_tty => {
1018 let mut buf = String::new();
1019 io::stdin().read_to_string(&mut buf)?;
1020 Ok(buf)
1021 }
1022 (None, None) => Err(CliError::NoQueryProvided),
1023 (Some(_), Some(_)) => Err(CliError::InvalidArgument(
1024 "Cannot specify both query and file".to_string(),
1025 )),
1026 }
1027 }
1028}
1029
1030fn execute_sql<W: Write>(db: &Database, sql: &str, writer: &mut StreamingWriter<W>) -> Result<()> {
1032 use alopex_embedded::SqlResult;
1033
1034 let result = db.execute_sql(sql)?;
1035
1036 match result {
1037 SqlResult::Success => {
1038 writer.prepare(Some(1))?;
1040 let row = Row::new(vec![
1041 Value::Text("OK".to_string()),
1042 Value::Text("Operation completed successfully".to_string()),
1043 ]);
1044 writer.write_row(row)?;
1045 writer.finish()?;
1046 }
1047 SqlResult::RowsAffected(count) => {
1048 writer.prepare(Some(1))?;
1050 let row = Row::new(vec![
1051 Value::Text("OK".to_string()),
1052 Value::Text(format!("{} row(s) affected", count)),
1053 ]);
1054 writer.write_row(row)?;
1055 writer.finish()?;
1056 }
1057 SqlResult::Query(query_result) => {
1058 let row_count = query_result.rows.len();
1060 writer.prepare(Some(row_count))?;
1061
1062 for sql_row in query_result.rows {
1063 let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1064 let row = Row::new(values);
1065
1066 match writer.write_row(row)? {
1067 WriteStatus::LimitReached => break,
1068 WriteStatus::Continue => {}
1069 }
1070 }
1071
1072 writer.finish()?;
1073 }
1074 }
1075
1076 Ok(())
1077}
1078
1079fn execute_sql_with_formatter<W: Write>(
1091 db: &Database,
1092 sql: &str,
1093 writer: &mut W,
1094 formatter: Box<dyn Formatter>,
1095 options: &SqlExecutionOptions<'_>,
1096) -> Result<()> {
1097 use alopex_sql::{AlopexDialect, Parser, StatementKind};
1098
1099 let dialect = AlopexDialect;
1102 let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
1103
1104 let is_select = stmts.len() == 1
1105 && matches!(
1106 stmts.first().map(|s| &s.kind),
1107 Some(StatementKind::Select(_))
1108 );
1109
1110 if is_select {
1111 execute_sql_select_streaming(db, sql, writer, formatter, options)
1113 } else {
1114 execute_sql_ddl_dml(db, sql, writer, formatter, options)
1116 }
1117}
1118
/// Stream a local SELECT's rows through `formatter` without materializing the
/// full result set.
///
/// Cancellation and timeout are detected inside the row callback and recorded
/// in shared atomic flags, because the embedded API only surfaces its own
/// error type across the callback boundary; the flags let the generic
/// execution error be mapped back to the precise CLI error afterwards.
fn execute_sql_select_streaming<W: Write>(
    db: &Database,
    sql: &str,
    writer: &mut W,
    formatter: Box<dyn Formatter>,
    options: &SqlExecutionOptions<'_>,
) -> Result<()> {
    use alopex_embedded::StreamingQueryResult;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    // Wrap a CLI error in the embedded error type so it can cross the
    // `execute_sql_with_rows` callback boundary.
    fn cli_err_to_embedded(e: crate::error::CliError) -> alopex_embedded::Error {
        alopex_embedded::Error::Sql(alopex_sql::SqlError::Execution {
            message: e.to_string(),
            code: "ALOPEX-C001",
        })
    }

    let cancelled = Arc::new(AtomicBool::new(false));
    let timed_out = Arc::new(AtomicBool::new(false));
    let cancel_flag = cancelled.clone();
    let timeout_flag = timed_out.clone();

    let result = db.execute_sql_with_rows(sql, |mut rows| {
        let columns = columns_from_streaming_rows(&rows);
        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
            .with_quiet(options.quiet);

        streaming_writer
            .prepare(None)
            .map_err(cli_err_to_embedded)?;

        if let Err(err) = options.deadline.check() {
            timeout_flag.store(true, Ordering::SeqCst);
            return Err(cli_err_to_embedded(err));
        }

        // NOTE(review): `while let Ok(Some(..))` ends the loop silently when
        // `next_row()` returns an error rather than propagating it — confirm
        // that swallowing row-iteration errors here is intentional.
        while let Ok(Some(sql_row)) = rows.next_row() {
            if options.cancel.is_cancelled() {
                cancel_flag.store(true, Ordering::SeqCst);
                return Err(cli_err_to_embedded(CliError::Cancelled));
            }
            if let Err(err) = options.deadline.check() {
                timeout_flag.store(true, Ordering::SeqCst);
                return Err(cli_err_to_embedded(err));
            }
            let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
            let row = Row::new(values);

            match streaming_writer
                .write_row(row)
                .map_err(cli_err_to_embedded)?
            {
                WriteStatus::LimitReached => break,
                WriteStatus::Continue => {}
            }
        }

        streaming_writer.finish().map_err(cli_err_to_embedded)?;
        Ok(())
    });

    // Map the embedded error back to the precise CLI error via the flags set
    // inside the callback.
    let result = match result {
        Ok(value) => value,
        Err(err) => {
            if cancelled.load(Ordering::SeqCst) {
                return Err(CliError::Cancelled);
            }
            if timed_out.load(Ordering::SeqCst) {
                return Err(CliError::Timeout(format!(
                    "deadline exceeded after {}",
                    humantime::format_duration(options.deadline.duration())
                )));
            }
            return Err(CliError::Database(err));
        }
    };

    match result {
        StreamingQueryResult::QueryProcessed(()) => Ok(()),
        StreamingQueryResult::Success | StreamingQueryResult::RowsAffected(_) => {
            // Callers gate this path on is_select, so non-query results are
            // not expected here; treat them as success.
            Ok(())
        }
    }
}
1214
1215#[derive(serde::Serialize)]
1216struct RemoteSqlRequest {
1217 sql: String,
1218 #[serde(default)]
1219 streaming: bool,
1220 #[serde(skip_serializing_if = "Option::is_none")]
1221 fetch_size: Option<usize>,
1222 #[serde(skip_serializing_if = "Option::is_none")]
1223 max_rows: Option<usize>,
1224}
1225
/// One column descriptor in a remote SQL response.
#[derive(serde::Deserialize)]
struct RemoteColumnInfo {
    // Column name as reported by the server.
    name: String,
    // Server-side type name as free text — presumably mapped through
    // `data_type_from_string` in the remote execution path; confirm there.
    data_type: String,
}
1231
/// JSON response body from the server's SQL endpoint.
#[derive(serde::Deserialize)]
struct RemoteSqlResponse {
    // Column metadata for the result set.
    columns: Vec<RemoteColumnInfo>,
    // Row data in the storage-layer value representation.
    rows: Vec<Vec<alopex_sql::storage::SqlValue>>,
    // NOTE(review): presumably the DML row count, `None` for plain queries —
    // confirm against the server's response schema.
    affected_rows: Option<u64>,
}
1238
1239fn map_client_error(err: ClientError) -> CliError {
1240 match err {
1241 ClientError::Request { source, .. } => {
1242 CliError::ServerConnection(format!("request failed: {source}"))
1243 }
1244 ClientError::InvalidUrl(message) => CliError::InvalidArgument(message),
1245 ClientError::Build(message) => CliError::InvalidArgument(message),
1246 ClientError::Auth(err) => CliError::InvalidArgument(err.to_string()),
1247 ClientError::HttpStatus { status, body } => {
1248 CliError::InvalidArgument(format!("Server error: HTTP {} - {}", status.as_u16(), body))
1249 }
1250 }
1251}
1252
1253async fn send_cancel_request(client: &HttpClient) -> Result<()> {
1254 #[derive(serde::Serialize)]
1255 struct CancelRequest {}
1256
1257 let request = CancelRequest {};
1258 let _: serde_json::Value = client
1259 .post_json("api/sql/cancel", &request)
1260 .await
1261 .map_err(map_client_error)?;
1262 Ok(())
1263}
1264
/// Combines the CLI-level row limit with the command's max-rows cap.
///
/// When both bounds are present the stricter (smaller) one wins; when only
/// one is present it is used as-is; with neither, output is unbounded.
fn merge_limit(limit: Option<usize>, max_rows: Option<usize>) -> Option<usize> {
    // `chain` yields zero, one, or two bounds; `min` picks the strictest
    // and naturally returns `None` when no bound was supplied.
    limit.into_iter().chain(max_rows).min()
}
1272
1273fn execute_sql_ddl_dml<W: Write>(
1278 db: &Database,
1279 sql: &str,
1280 writer: &mut W,
1281 formatter: Box<dyn Formatter>,
1282 options: &SqlExecutionOptions<'_>,
1283) -> Result<()> {
1284 use alopex_sql::ExecutionResult;
1285
1286 options.deadline.check()?;
1287 let result = db.execute_sql(sql)?;
1288 options.deadline.check()?;
1289
1290 match result {
1291 ExecutionResult::Success => {
1292 if !options.quiet {
1294 let columns = sql_status_columns();
1295 let mut streaming_writer =
1296 StreamingWriter::new(writer, formatter, columns, options.limit)
1297 .with_quiet(options.quiet);
1298 streaming_writer.prepare(Some(1))?;
1299 let row = Row::new(vec![
1300 Value::Text("OK".to_string()),
1301 Value::Text("Operation completed successfully".to_string()),
1302 ]);
1303 streaming_writer.write_row(row)?;
1304 streaming_writer.finish()?;
1305 }
1306 }
1307 ExecutionResult::RowsAffected(count) => {
1308 if !options.quiet {
1310 let columns = sql_status_columns();
1311 let mut streaming_writer =
1312 StreamingWriter::new(writer, formatter, columns, options.limit)
1313 .with_quiet(options.quiet);
1314 streaming_writer.prepare(Some(1))?;
1315 let row = Row::new(vec![
1316 Value::Text("OK".to_string()),
1317 Value::Text(format!("{} row(s) affected", count)),
1318 ]);
1319 streaming_writer.write_row(row)?;
1320 streaming_writer.finish()?;
1321 }
1322 }
1323 ExecutionResult::Query(query_result) => {
1324 let columns = columns_from_query_result(&query_result);
1327 let mut streaming_writer =
1328 StreamingWriter::new(writer, formatter, columns, options.limit)
1329 .with_quiet(options.quiet);
1330 streaming_writer.prepare(Some(query_result.rows.len()))?;
1331 for sql_row in query_result.rows {
1332 let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1333 let row = Row::new(values);
1334 match streaming_writer.write_row(row)? {
1335 WriteStatus::LimitReached => break,
1336 WriteStatus::Continue => {}
1337 }
1338 }
1339 streaming_writer.finish()?;
1340 }
1341 }
1342
1343 Ok(())
1344}
1345
1346fn sql_value_to_value(sql_value: alopex_sql::SqlValue) -> Value {
1348 use alopex_sql::SqlValue;
1349
1350 match sql_value {
1351 SqlValue::Null => Value::Null,
1352 SqlValue::Integer(i) => Value::Int(i as i64),
1353 SqlValue::BigInt(i) => Value::Int(i),
1354 SqlValue::Float(f) => Value::Float(f as f64),
1355 SqlValue::Double(f) => Value::Float(f),
1356 SqlValue::Text(s) => Value::Text(s),
1357 SqlValue::Blob(b) => Value::Bytes(b),
1358 SqlValue::Boolean(b) => Value::Bool(b),
1359 SqlValue::Timestamp(ts) => {
1360 Value::Text(format!("{}", ts))
1362 }
1363 SqlValue::Vector(v) => Value::Vector(v),
1364 }
1365}
1366
1367fn remote_value_to_value(sql_value: alopex_sql::storage::SqlValue) -> Value {
1368 use alopex_sql::storage::SqlValue;
1369
1370 match sql_value {
1371 SqlValue::Null => Value::Null,
1372 SqlValue::Integer(i) => Value::Int(i as i64),
1373 SqlValue::BigInt(i) => Value::Int(i),
1374 SqlValue::Float(f) => Value::Float(f as f64),
1375 SqlValue::Double(f) => Value::Float(f),
1376 SqlValue::Text(s) => Value::Text(s),
1377 SqlValue::Blob(b) => Value::Bytes(b),
1378 SqlValue::Boolean(b) => Value::Bool(b),
1379 SqlValue::Timestamp(ts) => Value::Text(ts.to_string()),
1380 SqlValue::Vector(v) => Value::Vector(v),
1381 }
1382}
1383
1384fn data_type_from_string(value: &str) -> DataType {
1385 let upper = value.to_ascii_uppercase();
1386 if upper.starts_with("INT") || upper.starts_with("BIGINT") {
1387 DataType::Int
1388 } else if upper.starts_with("FLOAT") || upper.starts_with("DOUBLE") {
1389 DataType::Float
1390 } else if upper.starts_with("BLOB") {
1391 DataType::Bytes
1392 } else if upper.starts_with("BOOLEAN") {
1393 DataType::Bool
1394 } else if upper.starts_with("VECTOR") {
1395 DataType::Vector
1396 } else {
1397 DataType::Text
1398 }
1399}
1400
1401fn sql_column_to_column(col: &alopex_sql::executor::ColumnInfo) -> Column {
1403 use alopex_sql::planner::ResolvedType;
1404
1405 let data_type = match &col.data_type {
1406 ResolvedType::Integer | ResolvedType::BigInt => DataType::Int,
1407 ResolvedType::Float | ResolvedType::Double => DataType::Float,
1408 ResolvedType::Text => DataType::Text,
1409 ResolvedType::Blob => DataType::Bytes,
1410 ResolvedType::Boolean => DataType::Bool,
1411 ResolvedType::Timestamp => DataType::Text, ResolvedType::Vector { .. } => DataType::Vector,
1413 ResolvedType::Null => DataType::Text, };
1415
1416 Column::new(&col.name, data_type)
1417}
1418
1419#[allow(dead_code)] fn columns_from_query_result(query_result: &alopex_sql::executor::QueryResult) -> Vec<Column> {
1422 query_result
1423 .columns
1424 .iter()
1425 .map(sql_column_to_column)
1426 .collect()
1427}
1428
/// Builds the CLI column list from an embedded streaming query iterator.
///
/// Currently unreferenced in this file (hence the `allow`); presumably kept
/// for an alternative streaming code path — TODO confirm before deleting.
#[allow(dead_code)] fn columns_from_streaming_result(
    query_iter: &alopex_embedded::QueryRowIterator<'_>,
) -> Vec<Column> {
    query_iter
        .columns()
        .iter()
        .map(sql_column_to_column)
        .collect()
}
1440
/// Builds the CLI column list from the metadata of embedded `StreamingRows`.
fn columns_from_streaming_rows(rows: &alopex_embedded::StreamingRows<'_>) -> Vec<Column> {
    rows.columns().iter().map(sql_column_to_column).collect()
}
1445
/// Column schema for status output rows: a short `status` code (e.g. "OK")
/// plus a human-readable `message`.
pub fn sql_status_columns() -> Vec<Column> {
    vec![
        Column::new("status", DataType::Text),
        Column::new("message", DataType::Text),
    ]
}
1453
#[cfg(test)]
mod tests {
    //! Unit tests for local SQL execution output and `SqlCommand` query
    //! resolution. `execute_sql` is defined elsewhere in this file.

    use super::*;
    use crate::batch::BatchModeSource;
    use crate::output::jsonl::JsonlFormatter;

    // Fresh in-memory database per test; unwrap is acceptable in tests.
    fn create_test_db() -> Database {
        Database::open_in_memory().unwrap()
    }

    // Writer preconfigured with the status-row schema and a JSONL formatter.
    fn create_status_writer(output: &mut Vec<u8>) -> StreamingWriter<&mut Vec<u8>> {
        let formatter = Box::new(JsonlFormatter::new());
        let columns = sql_status_columns();
        StreamingWriter::new(output, formatter, columns, None)
    }

    // Writer with caller-supplied result columns and a JSONL formatter.
    fn create_query_writer(
        output: &mut Vec<u8>,
        columns: Vec<Column>,
    ) -> StreamingWriter<&mut Vec<u8>> {
        let formatter = Box::new(JsonlFormatter::new());
        StreamingWriter::new(output, formatter, columns, None)
    }

    // Interactive (non-batch, TTY) mode used by the `resolve_query` tests.
    fn default_batch_mode() -> BatchMode {
        BatchMode {
            is_batch: false,
            is_tty: true,
            source: BatchModeSource::Default,
        }
    }

    // DDL success should emit an "OK" status row.
    #[test]
    fn test_create_table() {
        let db = create_test_db();

        let mut output = Vec::new();
        {
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
                &mut writer,
            )
            .unwrap();
        }

        let result = String::from_utf8(output).unwrap();
        assert!(result.contains("OK"));
    }

    // Round-trip: DDL, then DML (row count reported), then SELECT sees the data.
    #[test]
    fn test_insert_and_select() {
        let db = create_test_db();

        {
            let mut output = Vec::new();
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
                &mut writer,
            )
            .unwrap();
        }

        {
            let mut output = Vec::new();
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "INSERT INTO users (id, name) VALUES (1, 'Alice');",
                &mut writer,
            )
            .unwrap();
            let result = String::from_utf8(output).unwrap();
            assert!(result.contains("row(s) affected"));
        }

        {
            let mut output = Vec::new();
            let columns = vec![
                Column::new("id", DataType::Int),
                Column::new("name", DataType::Text),
            ];
            let mut writer = create_query_writer(&mut output, columns);
            execute_sql(&db, "SELECT id, name FROM users;", &mut writer).unwrap();
            let result = String::from_utf8(output).unwrap();
            assert!(result.contains("Alice"));
        }
    }

    // A malformed statement ("TABEL") must surface as an error, not a panic.
    #[test]
    fn test_syntax_error() {
        let db = create_test_db();

        let mut output = Vec::new();
        let mut writer = create_status_writer(&mut output);
        let result = execute_sql(&db, "CREATE TABEL invalid_syntax;", &mut writer);
        assert!(result.is_err());
    }

    // Multiple semicolon-separated statements execute in order.
    #[test]
    fn test_multiple_statements() {
        let db = create_test_db();

        let mut output = Vec::new();
        {
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "CREATE TABLE t (id INTEGER PRIMARY KEY); INSERT INTO t (id) VALUES (1);",
                &mut writer,
            )
            .unwrap();
        }

        {
            let mut output = Vec::new();
            let columns = vec![Column::new("id", DataType::Int)];
            let mut writer = create_query_writer(&mut output, columns);
            execute_sql(&db, "SELECT id FROM t;", &mut writer).unwrap();
            let result = String::from_utf8(output).unwrap();
            assert!(result.contains("1"));
        }
    }

    // Spot-checks the SqlValue -> Value conversions used for query output.
    #[test]
    fn test_sql_value_conversion() {
        use alopex_sql::SqlValue;

        assert!(matches!(sql_value_to_value(SqlValue::Null), Value::Null));
        assert!(matches!(
            sql_value_to_value(SqlValue::Integer(42)),
            Value::Int(42)
        ));
        assert!(matches!(
            sql_value_to_value(SqlValue::BigInt(100)),
            Value::Int(100)
        ));
        assert!(matches!(
            sql_value_to_value(SqlValue::Boolean(true)),
            Value::Bool(true)
        ));
        assert!(
            matches!(sql_value_to_value(SqlValue::Text("hello".to_string())), Value::Text(s) if s == "hello")
        );
    }

    // An inline query argument is passed through verbatim.
    #[test]
    fn resolve_query_from_argument() {
        let cmd = SqlCommand {
            query: Some("SELECT 1".to_string()),
            file: None,
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
        assert_eq!(sql, "SELECT 1");
    }

    // A file source yields the file's text, trailing newline included.
    #[test]
    fn resolve_query_from_file() {
        let mut file = tempfile::NamedTempFile::new().unwrap();
        writeln!(file, "SELECT * FROM users").unwrap();

        let cmd = SqlCommand {
            query: None,
            file: Some(file.path().display().to_string()),
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
        assert_eq!(sql, "SELECT * FROM users\n");
    }

    // Neither query nor file provided -> NoQueryProvided.
    #[test]
    fn resolve_query_returns_no_query_error() {
        let cmd = SqlCommand {
            query: None,
            file: None,
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
        assert!(matches!(err, CliError::NoQueryProvided));
    }

    // Supplying both query and file is rejected with a specific message.
    #[test]
    fn resolve_query_rejects_query_and_file() {
        let cmd = SqlCommand {
            query: Some("SELECT 1".to_string()),
            file: Some("query.sql".into()),
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
        assert!(matches!(
            err,
            CliError::InvalidArgument(msg) if msg == "Cannot specify both query and file"
        ));
    }
}