1use std::collections::HashSet;
6use std::fs;
7use std::io::{self, Read, Write};
8
9use alopex_embedded::Database;
10
11use crate::batch::BatchMode;
12use crate::cli::SqlCommand;
13use crate::client::http::{ClientError, HttpClient};
14use crate::error::{CliError, Result};
15use crate::models::{Column, DataType, Row, Value};
16use crate::output::formatter::Formatter;
17use crate::streaming::timeout::parse_deadline;
18use crate::streaming::{CancelSignal, Deadline, StreamingWriter, WriteStatus};
19use crate::tui::{is_tty, TuiApp};
20use crate::ui::mode::UiMode;
21use futures_util::StreamExt;
22use reqwest::Response;
23
/// Shared options for the SQL execution entry points (local and remote).
///
/// Borrowed members (`cancel`, `deadline`) are owned by the caller so the
/// same signal/deadline pair can be shared across nested calls.
#[doc(hidden)]
pub struct SqlExecutionOptions<'a> {
    /// Maximum number of rows to emit; `None` means unlimited.
    pub limit: Option<usize>,
    /// When `true`, suppresses status output and fallback warnings.
    pub quiet: bool,
    /// Cooperative cancellation signal polled/awaited during execution.
    pub cancel: &'a CancelSignal,
    /// Wall-clock deadline enforced on requests and streaming reads.
    pub deadline: &'a Deadline,
    /// Optional callback handed to the TUI (see `TuiApp::with_admin_launcher`);
    /// taken out of the options before any batch fallback runs.
    pub admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
}
32
33#[allow(clippy::too_many_arguments)]
47pub fn execute_with_formatter<'a, W: Write>(
48 db: &Database,
49 cmd: SqlCommand,
50 batch_mode: &BatchMode,
51 ui_mode: UiMode,
52 writer: &mut W,
53 formatter: Box<dyn Formatter>,
54 admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
55 limit: Option<usize>,
56 quiet: bool,
57) -> Result<()> {
58 let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
59 let cancel = CancelSignal::new();
60
61 execute_with_formatter_control(
62 db,
63 cmd,
64 batch_mode,
65 ui_mode,
66 writer,
67 formatter,
68 SqlExecutionOptions {
69 limit,
70 quiet,
71 cancel: &cancel,
72 deadline: &deadline,
73 admin_launcher,
74 },
75 )
76}
77
78#[doc(hidden)]
79pub fn execute_with_formatter_control<W: Write>(
80 db: &Database,
81 cmd: SqlCommand,
82 batch_mode: &BatchMode,
83 ui_mode: UiMode,
84 writer: &mut W,
85 formatter: Box<dyn Formatter>,
86 mut options: SqlExecutionOptions<'_>,
87) -> Result<()> {
88 let sql = cmd.resolve_query(batch_mode)?;
89 let effective_limit = merge_limit(options.limit, cmd.max_rows);
90 options.limit = effective_limit;
91
92 if ui_mode == UiMode::Tui {
93 return execute_tui_local_or_fallback(db, &sql, writer, formatter, options);
94 }
95
96 execute_sql_with_formatter(db, &sql, writer, formatter, &options)
97}
98
99fn is_select_query(sql: &str) -> Result<bool> {
100 use alopex_sql::{AlopexDialect, Parser, StatementKind};
101
102 let dialect = AlopexDialect;
103 let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
104 Ok(stmts.len() == 1
105 && matches!(
106 stmts.first().map(|s| &s.kind),
107 Some(StatementKind::Select(_))
108 ))
109}
110
111#[allow(clippy::too_many_arguments)]
113pub async fn execute_remote_with_formatter<'a, W: Write>(
114 client: &HttpClient,
115 cmd: &SqlCommand,
116 batch_mode: &BatchMode,
117 ui_mode: UiMode,
118 writer: &mut W,
119 formatter: Box<dyn Formatter>,
120 admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
121 limit: Option<usize>,
122 quiet: bool,
123) -> Result<()> {
124 let effective_limit = merge_limit(limit, cmd.max_rows);
125 let deadline = Deadline::new(parse_deadline(cmd.deadline.as_deref())?);
126 let cancel = CancelSignal::new();
127 let options = SqlExecutionOptions {
128 limit: effective_limit,
129 quiet,
130 cancel: &cancel,
131 deadline: &deadline,
132 admin_launcher,
133 };
134
135 execute_remote_with_formatter_control(
136 client, cmd, batch_mode, ui_mode, writer, formatter, options,
137 )
138 .await
139}
140
/// Builds a one-line "SQL: ..." context string for display: collapses all
/// whitespace runs (including newlines) into single spaces and truncates the
/// result to 200 characters, appending "..." when anything was cut.
fn sql_context_message(sql: &str) -> String {
    const MAX_LEN: usize = 200;

    // Collapse whitespace by re-joining the whitespace-split words manually.
    let mut condensed = String::with_capacity(sql.len());
    for (i, word) in sql.split_whitespace().enumerate() {
        if i > 0 {
            condensed.push(' ');
        }
        condensed.push_str(word);
    }

    // Find the byte offset of the character *after* the cap; slicing there
    // keeps exactly MAX_LEN characters and stays on a char boundary.
    match condensed.char_indices().nth(MAX_LEN) {
        Some((byte_idx, _)) => format!("SQL: {}...", &condensed[..byte_idx]),
        None => format!("SQL: {condensed}"),
    }
}
151
152#[doc(hidden)]
153pub async fn execute_remote_with_formatter_control<W: Write>(
154 client: &HttpClient,
155 cmd: &SqlCommand,
156 batch_mode: &BatchMode,
157 ui_mode: UiMode,
158 writer: &mut W,
159 formatter: Box<dyn Formatter>,
160 options: SqlExecutionOptions<'_>,
161) -> Result<()> {
162 let sql = cmd.resolve_query(batch_mode)?;
163 if ui_mode == UiMode::Tui {
164 return execute_tui_remote_or_fallback(client, &sql, cmd, writer, formatter, options).await;
165 }
166 execute_remote_with_formatter_impl(client, &sql, cmd, writer, formatter, &options).await
167}
168
/// Runs `sql` on the remote server and writes the result through `formatter`.
///
/// Single-statement SELECTs with a streaming-capable formatter take the
/// streaming path; everything else goes through one buffered request. The
/// request races against cancellation and the deadline; both losing paths
/// send a best-effort cancel to the server before returning an error.
async fn execute_remote_with_formatter_impl<W: Write>(
    client: &HttpClient,
    sql: &str,
    cmd: &SqlCommand,
    writer: &mut W,
    formatter: Box<dyn Formatter>,
    options: &SqlExecutionOptions<'_>,
) -> Result<()> {
    // Streaming only applies to plain SELECTs and formatters that support it.
    if is_select_query(sql)? && formatter.supports_streaming() {
        return execute_remote_streaming(
            client,
            sql,
            writer,
            formatter,
            options,
            cmd.fetch_size,
            cmd.max_rows,
        )
        .await;
    }

    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: false,
        fetch_size: cmd.fetch_size,
        max_rows: cmd.max_rows,
    };
    // Race the request against the deadline and the cancel signal.
    let response: RemoteSqlResponse = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    // Deadline elapsed: tell the server to stop, then report timeout.
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    // No columns => statement result (DDL/DML): emit a synthetic status row
    // unless quiet mode suppresses it entirely.
    if response.columns.is_empty() {
        if options.quiet {
            return Ok(());
        }
        let message = match response.affected_rows {
            Some(count) => format!("{count} row(s) affected"),
            None => "Operation completed successfully".to_string(),
        };
        let columns = sql_status_columns();
        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
            .with_quiet(options.quiet);
        streaming_writer.prepare(Some(1))?;
        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
        streaming_writer.write_row(row)?;
        return streaming_writer.finish();
    }

    // Query result: map remote column/value types and write rows, honoring
    // cancellation, the deadline, and the writer-enforced row limit.
    let columns: Vec<Column> = response
        .columns
        .iter()
        .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
        .collect();
    let mut streaming_writer =
        StreamingWriter::new(writer, formatter, columns, options.limit).with_quiet(options.quiet);
    streaming_writer.prepare(Some(response.rows.len()))?;
    for row in response.rows {
        if options.cancel.is_cancelled() {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
        options.deadline.check()?;
        let values = row.into_iter().map(remote_value_to_value).collect();
        match streaming_writer.write_row(Row::new(values))? {
            WriteStatus::LimitReached => break,
            WriteStatus::Continue => {}
        }
    }
    streaming_writer.finish()
}
254
255fn execute_tui_local_or_fallback<'a, W: Write>(
256 db: &Database,
257 sql: &str,
258 writer: &mut W,
259 formatter: Box<dyn Formatter>,
260 mut options: SqlExecutionOptions<'a>,
261) -> Result<()> {
262 if !is_tty() {
263 if !options.quiet {
264 eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
265 }
266 return execute_sql_with_formatter(db, sql, writer, formatter, &options);
267 }
268
269 let admin_launcher = options.admin_launcher.take();
270 match execute_tui_local(db, sql, &options, admin_launcher) {
271 Ok(()) => Ok(()),
272 Err(err) => {
273 if !options.quiet {
274 eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
275 }
276 execute_sql_with_formatter(db, sql, writer, formatter, &options)
277 }
278 }
279}
280
281fn execute_tui_local<'a>(
282 db: &Database,
283 sql: &str,
284 options: &SqlExecutionOptions<'a>,
285 admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
286) -> Result<()> {
287 use alopex_sql::ExecutionResult;
288
289 options.deadline.check()?;
290 let result = db.execute_sql(sql)?;
291 options.deadline.check()?;
292
293 let (columns, rows) = match result {
294 ExecutionResult::Success => {
295 let columns = sql_status_columns();
296 let row = Row::new(vec![
297 Value::Text("OK".to_string()),
298 Value::Text("Operation completed successfully".to_string()),
299 ]);
300 (columns, vec![row])
301 }
302 ExecutionResult::RowsAffected(count) => {
303 let columns = sql_status_columns();
304 let row = Row::new(vec![
305 Value::Text("OK".to_string()),
306 Value::Text(format!("{count} row(s) affected")),
307 ]);
308 (columns, vec![row])
309 }
310 ExecutionResult::Query(query_result) => {
311 let columns = query_result
312 .columns
313 .iter()
314 .map(|col| Column::new(&col.name, DataType::Text))
315 .collect::<Vec<_>>();
316 let mut rows = Vec::with_capacity(query_result.rows.len());
317 for sql_row in query_result.rows {
318 let values = sql_row.into_iter().map(sql_value_to_value).collect();
319 rows.push(Row::new(values));
320 }
321 if let Some(limit) = options.limit {
322 rows.truncate(limit);
323 }
324 (columns, rows)
325 }
326 };
327
328 let app = TuiApp::new(columns, rows, "local", false)
329 .with_context_message(Some(sql_context_message(sql)))
330 .with_admin_launcher(admin_launcher);
331 app.run()
332}
333
334async fn execute_tui_remote_or_fallback<'a, W: Write>(
335 client: &HttpClient,
336 sql: &str,
337 cmd: &SqlCommand,
338 writer: &mut W,
339 formatter: Box<dyn Formatter>,
340 mut options: SqlExecutionOptions<'a>,
341) -> Result<()> {
342 if !is_tty() {
343 if !options.quiet {
344 eprintln!("Warning: --tui requires a TTY, falling back to batch output.");
345 }
346 return execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options)
347 .await;
348 }
349
350 let admin_launcher = options.admin_launcher.take();
351 match execute_tui_remote(client, sql, cmd, &options, admin_launcher).await {
352 Ok(()) => Ok(()),
353 Err(err) => {
354 if !options.quiet {
355 eprintln!("Warning: TUI failed ({err}); falling back to batch output.");
356 }
357 execute_remote_with_formatter_impl(client, sql, cmd, writer, formatter, &options).await
358 }
359 }
360}
361
/// Executes `sql` on the remote server and presents the result in the TUI.
///
/// SELECT statements are fetched via the streaming endpoint and fully
/// collected before the TUI opens; other statements use one buffered request
/// raced against cancellation and the deadline.
async fn execute_tui_remote<'a>(
    client: &HttpClient,
    sql: &str,
    cmd: &SqlCommand,
    options: &SqlExecutionOptions<'a>,
    admin_launcher: Option<Box<dyn FnMut() -> Result<()> + 'a>>,
) -> Result<()> {
    // SELECT path: stream and collect all rows, then open the TUI.
    if is_select_query(sql)? {
        let (columns, rows) =
            collect_remote_streaming_rows(client, sql, options, cmd.fetch_size, cmd.max_rows)
                .await?;
        let app = TuiApp::new(columns, rows, "server", false)
            .with_context_message(Some(sql_context_message(sql)))
            .with_admin_launcher(admin_launcher);
        return app.run();
    }

    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: false,
        fetch_size: cmd.fetch_size,
        max_rows: cmd.max_rows,
    };

    // Race the request against the deadline and the cancel signal; both
    // losing paths best-effort cancel the server-side query.
    let response: RemoteSqlResponse = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    // No columns => statement result: show a synthetic status table.
    let (columns, rows) = if response.columns.is_empty() {
        let columns = sql_status_columns();
        let message = match response.affected_rows {
            Some(count) => format!("{count} row(s) affected"),
            None => "Operation completed successfully".to_string(),
        };
        let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
        (columns, vec![row])
    } else {
        // Query result: map remote types/values and apply the row limit.
        let columns: Vec<Column> = response
            .columns
            .iter()
            .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
            .collect();
        let mut rows = response
            .rows
            .into_iter()
            .map(|row| Row::new(row.into_iter().map(remote_value_to_value).collect()))
            .collect::<Vec<_>>();
        if let Some(limit) = options.limit {
            rows.truncate(limit);
        }
        (columns, rows)
    };

    let app = TuiApp::new(columns, rows, "server", false)
        .with_context_message(Some(sql_context_message(sql)))
        .with_admin_launcher(admin_launcher);
    app.run()
}
435
/// Issues a streaming query and collects ALL rows into memory (used by the
/// TUI, which needs the full result set up front).
///
/// Handles two wire formats: `application/jsonl` responses are delegated to
/// `collect_remote_jsonl_rows`; otherwise the body is parsed incrementally
/// as one large JSON array of row objects. Column names/order are taken from
/// the first object's keys; every subsequent object must have exactly the
/// same key set. Cancellation and the deadline are checked on every chunk,
/// and every abort path best-effort cancels the server-side query.
async fn collect_remote_streaming_rows(
    client: &HttpClient,
    sql: &str,
    options: &SqlExecutionOptions<'_>,
    fetch_size: Option<usize>,
    max_rows: Option<usize>,
) -> Result<(Vec<Column>, Vec<Row>)> {
    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: true,
        fetch_size,
        max_rows,
    };

    // Race the initial request against the deadline and cancel signal.
    let response = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    // JSONL bodies use a line-oriented protocol handled elsewhere.
    if let Some(content_type) = response
        .headers()
        .get(reqwest::header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
    {
        if content_type.starts_with("application/jsonl") {
            return collect_remote_jsonl_rows(client, response, options).await;
        }
    }

    // Incremental JSON-array parser state:
    //   buffer/pos      - unconsumed bytes and the parse cursor into them
    //   saw_array_start - whether the opening '[' has been consumed
    //   done            - whether the closing ']' (or the row limit) was hit
    //   columns/column_set - names (ordered) and a set for membership checks,
    //                        established from the first row object
    let mut stream = response.bytes_stream();
    let mut buffer: Vec<u8> = Vec::new();
    let mut pos: usize = 0;
    let mut done = false;
    let mut saw_array_start = false;
    let mut columns: Option<Vec<String>> = None;
    let mut column_set: Option<HashSet<String>> = None;
    let mut rows: Vec<Row> = Vec::new();

    while !done {
        if options.cancel.is_cancelled() {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
        if let Err(err) = options.deadline.check() {
            let _ = send_cancel_request(client).await;
            return Err(err);
        }

        // Await the next body chunk, racing cancellation and the deadline.
        let next = tokio::select! {
            _ = options.cancel.wait() => {
                let _ = send_cancel_request(client).await;
                return Err(CliError::Cancelled);
            }
            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                match result {
                    Ok(value) => value,
                    Err(_) => {
                        let _ = send_cancel_request(client).await;
                        return Err(CliError::Timeout(format!(
                            "deadline exceeded after {}",
                            humantime::format_duration(options.deadline.duration())
                        )));
                    }
                }
            }
        };

        // None = end of body; exits with done still false (handled below).
        let chunk = match next {
            Some(chunk) => chunk,
            None => break,
        };

        let bytes = match chunk {
            Ok(bytes) => bytes,
            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
        };

        buffer.extend_from_slice(&bytes);

        // Consume as many complete values from the buffer as possible.
        loop {
            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }

            // First non-space byte of the body must open the array.
            if !saw_array_start {
                if buffer[pos] != b'[' {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming response: expected JSON array".into(),
                    ));
                }
                pos += 1;
                saw_array_start = true;
                continue;
            }

            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }

            // Closing bracket ends the array (also covers an empty array).
            if buffer[pos] == b']' {
                pos += 1;
                done = true;
                break;
            }

            // Parse exactly one JSON value starting at `pos`; an EOF error
            // means the value is still incomplete — wait for more bytes.
            let slice = &buffer[pos..];
            let mut stream =
                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
            let value = match stream.next() {
                Some(Ok(value)) => value,
                Some(Err(err)) if err.is_eof() => break,
                Some(Err(err)) => return Err(CliError::Json(err)),
                None => break,
            };
            pos = pos.saturating_add(stream.byte_offset());

            let object = value.as_object().ok_or_else(|| {
                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
            })?;

            // First row fixes the column names and their order.
            if columns.is_none() {
                let names: Vec<String> = object.keys().cloned().collect();
                let set: HashSet<String> = names.iter().cloned().collect();
                if names.is_empty() {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming row: empty object".into(),
                    ));
                }
                columns = Some(names);
                column_set = Some(set);
            }

            let names = columns
                .as_ref()
                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
            let set = column_set
                .as_ref()
                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;

            // Every row must carry exactly the first row's key set.
            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming row: column mismatch".into(),
                ));
            }

            // Pull values in column order and convert to display values.
            let values = names
                .iter()
                .map(|name| {
                    object.get(name).ok_or_else(|| {
                        CliError::InvalidArgument(format!(
                            "Invalid streaming row: missing column '{name}'"
                        ))
                    })
                })
                .map(|value| value.and_then(json_value_to_value))
                .collect::<Result<Vec<_>>>()?;
            rows.push(Row::new(values));

            // Row limit reached: cancel the server-side query and stop.
            // NOTE(review): if more rows are already buffered (or still in
            // flight) at this point, the trailing-data checks below will
            // return an error instead of the truncated result — confirm
            // whether that is intended.
            if let Some(limit) = options.limit {
                if rows.len() >= limit {
                    let _ = send_cancel_request(client).await;
                    done = true;
                    break;
                }
            }

            // After a row, expect either a separator or the array end.
            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }
            match buffer[pos] {
                b',' => {
                    pos += 1;
                }
                b']' => {
                    pos += 1;
                    done = true;
                    break;
                }
                _ => {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming response: expected ',' or ']'".into(),
                    ))
                }
            }
        }

        // Drop consumed bytes so the buffer does not grow unbounded.
        if pos > 0 {
            buffer.drain(..pos);
            pos = 0;
        }
    }

    if done {
        // Array finished: anything left over (buffered or still streaming)
        // other than whitespace is a protocol violation.
        if has_non_whitespace(&buffer) {
            return Err(CliError::InvalidArgument(
                "Invalid streaming response: unexpected trailing data".into(),
            ));
        }
        buffer.clear();
        loop {
            let next = tokio::select! {
                _ = options.cancel.wait() => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Cancelled);
                }
                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                    match result {
                        Ok(value) => value,
                        Err(_) => {
                            let _ = send_cancel_request(client).await;
                            return Err(CliError::Timeout(format!(
                                "deadline exceeded after {}",
                                humantime::format_duration(options.deadline.duration())
                            )));
                        }
                    }
                }
            };

            let chunk = match next {
                Some(chunk) => chunk,
                None => break,
            };

            let bytes = match chunk {
                Ok(bytes) => bytes,
                Err(err) => {
                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
                }
            };

            if has_non_whitespace(&bytes) {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming response: unexpected trailing data".into(),
                ));
            }
        }
    } else {
        // Body ended before the closing ']' — distinguish between garbage
        // after a partial value and a plain truncated stream.
        skip_whitespace(&buffer, &mut pos);
        if pos < buffer.len() {
            return Err(CliError::InvalidArgument(
                "Invalid streaming response: unexpected trailing data".into(),
            ));
        }
        return Err(CliError::InvalidArgument(
            "Invalid streaming response: unexpected end of stream".into(),
        ));
    }

    // Streaming rows carry no type info; all columns are presented as text.
    let columns = columns
        .unwrap_or_default()
        .into_iter()
        .map(|name| Column::new(name, DataType::Text))
        .collect();
    Ok((columns, rows))
}
709
710#[allow(dead_code)]
711async fn collect_remote_non_streaming_rows(
712 client: &HttpClient,
713 sql: &str,
714 options: &SqlExecutionOptions<'_>,
715 fetch_size: Option<usize>,
716 max_rows: Option<usize>,
717) -> Result<(Vec<Column>, Vec<Row>)> {
718 let request = RemoteSqlRequest {
719 sql: sql.to_string(),
720 streaming: false,
721 fetch_size,
722 max_rows,
723 };
724 let response: RemoteSqlResponse = tokio::select! {
725 result = tokio::time::timeout(options.deadline.remaining(), client.post_json("api/sql/query", &request)) => {
726 match result {
727 Ok(value) => value.map_err(map_client_error)?,
728 Err(_) => {
729 let _ = send_cancel_request(client).await;
730 return Err(CliError::Timeout(format!(
731 "deadline exceeded after {}",
732 humantime::format_duration(options.deadline.duration())
733 )));
734 }
735 }
736 }
737 _ = options.cancel.wait() => {
738 let _ = send_cancel_request(client).await;
739 return Err(CliError::Cancelled);
740 }
741 };
742
743 if response.columns.is_empty() {
744 let message = match response.affected_rows {
745 Some(count) => format!("{count} row(s) affected"),
746 None => "Operation completed successfully".to_string(),
747 };
748 let columns = sql_status_columns();
749 let row = Row::new(vec![Value::Text("OK".to_string()), Value::Text(message)]);
750 return Ok((columns, vec![row]));
751 }
752
753 let columns = response
754 .columns
755 .iter()
756 .map(|col| Column::new(&col.name, data_type_from_string(&col.data_type)))
757 .collect::<Vec<_>>();
758 let mut rows = response
759 .rows
760 .into_iter()
761 .map(|row| Row::new(row.into_iter().map(remote_value_to_value).collect()))
762 .collect::<Vec<_>>();
763 if let Some(limit) = options.limit {
764 rows.truncate(limit);
765 }
766 Ok((columns, rows))
767}
768
/// Streams a remote SELECT result straight through `formatter` to `writer`
/// without buffering the whole result set.
///
/// `application/jsonl` responses are delegated to
/// `execute_remote_jsonl_streaming`; otherwise the body is parsed
/// incrementally as one JSON array of row objects. The `StreamingWriter` is
/// created lazily from the first row's keys (which fix column names and
/// order); its internal row limit drives early termination. Cancellation and
/// the deadline are checked on every chunk, and every abort path best-effort
/// cancels the server-side query.
async fn execute_remote_streaming<W: Write>(
    client: &HttpClient,
    sql: &str,
    writer: &mut W,
    formatter: Box<dyn Formatter>,
    options: &SqlExecutionOptions<'_>,
    fetch_size: Option<usize>,
    max_rows: Option<usize>,
) -> Result<()> {
    let request = RemoteSqlRequest {
        sql: sql.to_string(),
        streaming: true,
        fetch_size,
        max_rows,
    };

    // Race the initial request against the deadline and cancel signal.
    let response = tokio::select! {
        result = tokio::time::timeout(options.deadline.remaining(), client.post_json_stream("api/sql/query", &request)) => {
            match result {
                Ok(value) => value.map_err(map_client_error)?,
                Err(_) => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Timeout(format!(
                        "deadline exceeded after {}",
                        humantime::format_duration(options.deadline.duration())
                    )));
                }
            }
        }
        _ = options.cancel.wait() => {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
    };

    // JSONL bodies use a line-oriented protocol handled elsewhere.
    if let Some(content_type) = response
        .headers()
        .get(reqwest::header::CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
    {
        if content_type.starts_with("application/jsonl") {
            return execute_remote_jsonl_streaming(client, response, writer, formatter, options)
                .await;
        }
    }

    // Parser state mirrors collect_remote_streaming_rows, except rows go to
    // a lazily-created StreamingWriter instead of a Vec. `formatter` is held
    // in an Option so it can be moved into the writer exactly once.
    let mut stream = response.bytes_stream();
    let mut buffer: Vec<u8> = Vec::new();
    let mut pos: usize = 0;
    let mut streaming_writer: Option<StreamingWriter<&mut W>> = None;
    let mut formatter = Some(formatter);
    let mut columns: Option<Vec<String>> = None;
    let mut column_set: Option<HashSet<String>> = None;
    let mut done = false;
    let mut saw_array_start = false;

    while !done {
        if options.cancel.is_cancelled() {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
        if let Err(err) = options.deadline.check() {
            let _ = send_cancel_request(client).await;
            return Err(err);
        }

        // Await the next body chunk, racing cancellation and the deadline.
        let next = tokio::select! {
            _ = options.cancel.wait() => {
                let _ = send_cancel_request(client).await;
                return Err(CliError::Cancelled);
            }
            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                match result {
                    Ok(value) => value,
                    Err(_) => {
                        let _ = send_cancel_request(client).await;
                        return Err(CliError::Timeout(format!(
                            "deadline exceeded after {}",
                            humantime::format_duration(options.deadline.duration())
                        )));
                    }
                }
            }
        };

        // None = end of body; exits with done still false (handled below).
        let chunk = match next {
            Some(chunk) => chunk,
            None => break,
        };

        let bytes = match chunk {
            Ok(bytes) => bytes,
            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
        };

        buffer.extend_from_slice(&bytes);

        // Consume as many complete values from the buffer as possible.
        loop {
            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }

            // First non-space byte of the body must open the array.
            if !saw_array_start {
                if buffer[pos] != b'[' {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming response: expected JSON array".into(),
                    ));
                }
                pos += 1;
                saw_array_start = true;
                continue;
            }

            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }

            // Closing bracket ends the array (also covers an empty array).
            if buffer[pos] == b']' {
                pos += 1;
                done = true;
                break;
            }

            // Parse exactly one JSON value starting at `pos`; an EOF error
            // means the value is still incomplete — wait for more bytes.
            let slice = &buffer[pos..];
            let mut stream =
                serde_json::Deserializer::from_slice(slice).into_iter::<serde_json::Value>();
            let value = match stream.next() {
                Some(Ok(value)) => value,
                Some(Err(err)) if err.is_eof() => break,
                Some(Err(err)) => return Err(CliError::Json(err)),
                None => break,
            };
            pos = pos.saturating_add(stream.byte_offset());

            let object = value.as_object().ok_or_else(|| {
                CliError::InvalidArgument("Invalid streaming row: expected JSON object".into())
            })?;

            // First row fixes the columns and creates the output writer.
            if columns.is_none() {
                let names: Vec<String> = object.keys().cloned().collect();
                let set: HashSet<String> = names.iter().cloned().collect();
                if names.is_empty() {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming row: empty object".into(),
                    ));
                }
                let cols = names
                    .iter()
                    .map(|name| Column::new(name, DataType::Text))
                    .collect::<Vec<_>>();
                let formatter = formatter
                    .take()
                    .ok_or_else(|| CliError::InvalidArgument("Missing formatter".into()))?;
                // prepare(None): total row count is unknown while streaming.
                let mut writer = StreamingWriter::new(&mut *writer, formatter, cols, options.limit)
                    .with_quiet(options.quiet);
                writer.prepare(None)?;
                streaming_writer = Some(writer);
                columns = Some(names);
                column_set = Some(set);
            }

            let names = columns
                .as_ref()
                .ok_or_else(|| CliError::InvalidArgument("Missing columns".into()))?;
            let set = column_set
                .as_ref()
                .ok_or_else(|| CliError::InvalidArgument("Missing column set".into()))?;

            // Every row must carry exactly the first row's key set.
            if object.len() != names.len() || !object.keys().all(|key| set.contains(key)) {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming row: column mismatch".into(),
                ));
            }

            // Pull values in column order and convert to display values.
            let values = names
                .iter()
                .map(|name| {
                    object.get(name).ok_or_else(|| {
                        CliError::InvalidArgument(format!(
                            "Invalid streaming row: missing column '{name}'"
                        ))
                    })
                })
                .map(|value| value.and_then(json_value_to_value))
                .collect::<Result<Vec<_>>>()?;

            // The writer enforces the row limit; on LimitReached we cancel
            // the server-side query and finish the output immediately.
            if let Some(writer) = streaming_writer.as_mut() {
                match writer.write_row(Row::new(values))? {
                    WriteStatus::LimitReached => {
                        let _ = send_cancel_request(client).await;
                        return writer.finish();
                    }
                    WriteStatus::Continue => {}
                }
            }

            // After a row, expect either a separator or the array end.
            skip_whitespace(&buffer, &mut pos);
            if pos >= buffer.len() {
                break;
            }
            match buffer[pos] {
                b',' => {
                    pos += 1;
                }
                b']' => {
                    pos += 1;
                    done = true;
                    break;
                }
                _ => {
                    return Err(CliError::InvalidArgument(
                        "Invalid streaming response: expected ',' or ']'".into(),
                    ))
                }
            }
        }

        // Drop consumed bytes so the buffer does not grow unbounded.
        if pos > 0 {
            buffer.drain(..pos);
            pos = 0;
        }
    }

    if done {
        // Array finished: anything left over (buffered or still streaming)
        // other than whitespace is a protocol violation.
        if has_non_whitespace(&buffer) {
            return Err(CliError::InvalidArgument(
                "Invalid streaming response: unexpected trailing data".into(),
            ));
        }
        buffer.clear();
        loop {
            let next = tokio::select! {
                _ = options.cancel.wait() => {
                    let _ = send_cancel_request(client).await;
                    return Err(CliError::Cancelled);
                }
                result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                    match result {
                        Ok(value) => value,
                        Err(_) => {
                            let _ = send_cancel_request(client).await;
                            return Err(CliError::Timeout(format!(
                                "deadline exceeded after {}",
                                humantime::format_duration(options.deadline.duration())
                            )));
                        }
                    }
                }
            };

            let chunk = match next {
                Some(chunk) => chunk,
                None => break,
            };

            let bytes = match chunk {
                Ok(bytes) => bytes,
                Err(err) => {
                    return Err(CliError::ServerConnection(format!("request failed: {err}")))
                }
            };

            if has_non_whitespace(&bytes) {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming response: unexpected trailing data".into(),
                ));
            }
        }
    } else {
        // Body ended before the closing ']' — distinguish between garbage
        // after a partial value and a plain truncated stream.
        skip_whitespace(&buffer, &mut pos);
        if pos < buffer.len() {
            return Err(CliError::InvalidArgument(
                "Invalid streaming response: unexpected trailing data".into(),
            ));
        }
        return Err(CliError::InvalidArgument(
            "Invalid streaming response: unexpected end of stream".into(),
        ));
    }

    // Normal completion with at least one row seen.
    if let Some(mut writer) = streaming_writer {
        return writer.finish();
    }

    // Empty array: emit headers/footers for an empty, column-less result.
    if done && saw_array_start {
        if let Some(formatter) = formatter.take() {
            let mut writer =
                StreamingWriter::new(&mut *writer, formatter, Vec::new(), options.limit)
                    .with_quiet(options.quiet);
            writer.prepare(None)?;
            return writer.finish();
        }
    }

    // Defensive fallback; not expected to be reachable in practice.
    Ok(())
}
1067
/// Collects all rows of an `application/jsonl` streaming response into
/// memory (used by the TUI path).
///
/// Each newline-terminated line is parsed via `parse_jsonl_line` into an
/// item carrying either an error, a `done` marker, or a row. A trailing
/// unterminated line is parsed once after the body ends. Cancellation and
/// the deadline are checked on every chunk; abort paths (and hitting the
/// row limit) best-effort cancel the server-side query.
async fn collect_remote_jsonl_rows(
    client: &HttpClient,
    response: Response,
    options: &SqlExecutionOptions<'_>,
) -> Result<(Vec<Column>, Vec<Row>)> {
    let mut stream = response.bytes_stream();
    let mut buffer: Vec<u8> = Vec::new();
    let mut columns: Option<Vec<Column>> = None;
    let mut rows: Vec<Row> = Vec::new();
    // Set when the server's explicit end-of-stream marker (or the row
    // limit) is seen; an EOF without it is treated as an error below.
    let mut done = false;

    while !done {
        if options.cancel.is_cancelled() {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
        if let Err(err) = options.deadline.check() {
            let _ = send_cancel_request(client).await;
            return Err(err);
        }

        // Await the next body chunk, racing cancellation and the deadline.
        let next = tokio::select! {
            _ = options.cancel.wait() => {
                let _ = send_cancel_request(client).await;
                return Err(CliError::Cancelled);
            }
            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                match result {
                    Ok(value) => value,
                    Err(_) => {
                        let _ = send_cancel_request(client).await;
                        return Err(CliError::Timeout(format!(
                            "deadline exceeded after {}",
                            humantime::format_duration(options.deadline.duration())
                        )));
                    }
                }
            }
        };

        let chunk = match next {
            Some(chunk) => chunk,
            None => break,
        };

        let bytes = match chunk {
            Ok(bytes) => bytes,
            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
        };

        buffer.extend_from_slice(&bytes);

        // Process every complete line currently in the buffer.
        while let Some(newline) = buffer.iter().position(|&b| b == b'\n') {
            let line = buffer.drain(..=newline).collect::<Vec<u8>>();
            if let Some(item) = parse_jsonl_line(&line)? {
                // A server-reported error aborts the whole collection.
                if let Some(error) = item.error {
                    return Err(CliError::InvalidArgument(format!(
                        "Server error: {}",
                        error.message
                    )));
                }
                if item.done {
                    done = true;
                    break;
                }
                let row = item.row.ok_or_else(|| {
                    CliError::InvalidArgument("Invalid streaming response: missing row".into())
                })?;
                // JSONL rows carry no column names; synthesize defaults
                // sized to the first row's width.
                if columns.is_none() {
                    columns = Some(default_stream_columns(row.len()));
                }
                rows.push(Row::new(
                    row.into_iter().map(remote_value_to_value).collect(),
                ));
                if let Some(limit) = options.limit {
                    if rows.len() >= limit {
                        let _ = send_cancel_request(client).await;
                        done = true;
                        break;
                    }
                }
            }
        }
    }

    // The body may end without a trailing newline; parse the leftover line.
    if !done {
        if let Some(item) = parse_jsonl_line(&buffer)? {
            if let Some(error) = item.error {
                return Err(CliError::InvalidArgument(format!(
                    "Server error: {}",
                    error.message
                )));
            }
            if item.done {
                done = true;
            } else if let Some(row) = item.row {
                if columns.is_none() {
                    columns = Some(default_stream_columns(row.len()));
                }
                rows.push(Row::new(
                    row.into_iter().map(remote_value_to_value).collect(),
                ));
                if let Some(limit) = options.limit {
                    if rows.len() >= limit {
                        let _ = send_cancel_request(client).await;
                        done = true;
                    }
                }
            } else {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming response: missing row".into(),
                ));
            }
        }
    }

    // EOF without the done marker means the stream was truncated.
    if !done {
        return Err(CliError::InvalidArgument(
            "Invalid streaming response: unexpected end of stream".into(),
        ));
    }

    Ok((columns.unwrap_or_default(), rows))
}
1192
/// Streams JSONL rows from an already-issued HTTP `response` straight to the
/// output writer, without buffering the full result set.
///
/// The `StreamingWriter` is created lazily on the first row (the JSONL
/// protocol carries no column header, so placeholder columns are derived from
/// the first row's width). Cancellation and deadline expiry both trigger a
/// best-effort server-side cancel before returning an error.
async fn execute_remote_jsonl_streaming<W: Write>(
    client: &HttpClient,
    response: Response,
    writer: &mut W,
    formatter: Box<dyn Formatter>,
    options: &SqlExecutionOptions<'_>,
) -> Result<()> {
    let mut stream = response.bytes_stream();
    // Accumulates partial lines across network chunks.
    let mut buffer: Vec<u8> = Vec::new();
    // Built lazily on the first row; `None` until then.
    let mut streaming_writer: Option<StreamingWriter<&mut W>> = None;
    // Wrapped in `Option` so it can be moved into the writer exactly once.
    let mut formatter = Some(formatter);
    let mut done = false;

    while !done {
        // Fast-path checks before blocking on the next chunk.
        if options.cancel.is_cancelled() {
            let _ = send_cancel_request(client).await;
            return Err(CliError::Cancelled);
        }
        if let Err(err) = options.deadline.check() {
            let _ = send_cancel_request(client).await;
            return Err(err);
        }

        // Race the next body chunk against cancellation and the remaining deadline.
        let next = tokio::select! {
            _ = options.cancel.wait() => {
                let _ = send_cancel_request(client).await;
                return Err(CliError::Cancelled);
            }
            result = tokio::time::timeout(options.deadline.remaining(), stream.next()) => {
                match result {
                    Ok(value) => value,
                    Err(_) => {
                        // Timeout elapsed: best-effort server cancel, then report.
                        let _ = send_cancel_request(client).await;
                        return Err(CliError::Timeout(format!(
                            "deadline exceeded after {}",
                            humantime::format_duration(options.deadline.duration())
                        )));
                    }
                }
            }
        };

        // `None` means the HTTP body ended; leave the loop and validate below.
        let chunk = match next {
            Some(chunk) => chunk,
            None => break,
        };

        let bytes = match chunk {
            Ok(bytes) => bytes,
            Err(err) => return Err(CliError::ServerConnection(format!("request failed: {err}"))),
        };

        buffer.extend_from_slice(&bytes);

        // Drain and process every complete line currently in the buffer.
        while let Some(newline) = buffer.iter().position(|&b| b == b'\n') {
            let line = buffer.drain(..=newline).collect::<Vec<u8>>();
            if let Some(item) = parse_jsonl_line(&line)? {
                if let Some(error) = item.error {
                    return Err(CliError::InvalidArgument(format!(
                        "Server error: {}",
                        error.message
                    )));
                }
                if item.done {
                    done = true;
                    break;
                }
                let row = item.row.ok_or_else(|| {
                    CliError::InvalidArgument("Invalid streaming response: missing row".into())
                })?;
                if streaming_writer.is_none() {
                    // First row: derive placeholder columns and build the writer.
                    let columns = default_stream_columns(row.len());
                    let formatter = formatter
                        .take()
                        .ok_or_else(|| CliError::InvalidArgument("Missing formatter".into()))?;
                    let mut writer =
                        StreamingWriter::new(&mut *writer, formatter, columns, options.limit)
                            .with_quiet(options.quiet);
                    writer.prepare(None)?;
                    streaming_writer = Some(writer);
                }
                let values = row.into_iter().map(remote_value_to_value).collect();
                if let Some(writer) = streaming_writer.as_mut() {
                    match writer.write_row(Row::new(values))? {
                        WriteStatus::LimitReached => {
                            // Row cap reached: tell the server to stop producing.
                            let _ = send_cancel_request(client).await;
                            done = true;
                            break;
                        }
                        WriteStatus::Continue => {}
                    }
                }
            }
        }
    }

    // Handle a trailing partial line (stream ended without a final '\n').
    if !done {
        if let Some(item) = parse_jsonl_line(&buffer)? {
            if let Some(error) = item.error {
                return Err(CliError::InvalidArgument(format!(
                    "Server error: {}",
                    error.message
                )));
            }
            if item.done {
                done = true;
            } else if let Some(row) = item.row {
                if streaming_writer.is_none() {
                    let columns = default_stream_columns(row.len());
                    let formatter = formatter
                        .take()
                        .ok_or_else(|| CliError::InvalidArgument("Missing formatter".into()))?;
                    let mut writer =
                        StreamingWriter::new(&mut *writer, formatter, columns, options.limit)
                            .with_quiet(options.quiet);
                    writer.prepare(None)?;
                    streaming_writer = Some(writer);
                }
                let values = row.into_iter().map(remote_value_to_value).collect();
                if let Some(writer) = streaming_writer.as_mut() {
                    match writer.write_row(Row::new(values))? {
                        WriteStatus::LimitReached => {
                            let _ = send_cancel_request(client).await;
                            done = true;
                        }
                        WriteStatus::Continue => {}
                    }
                }
            } else {
                return Err(CliError::InvalidArgument(
                    "Invalid streaming response: missing row".into(),
                ));
            }
        }
    }

    // A well-formed stream must terminate with a `done` marker.
    if !done {
        return Err(CliError::InvalidArgument(
            "Invalid streaming response: unexpected end of stream".into(),
        ));
    }

    if let Some(mut writer) = streaming_writer {
        return writer.finish();
    }

    // Zero-row result: still emit headers/footers so the output is valid.
    if let Some(formatter) = formatter.take() {
        let mut writer = StreamingWriter::new(&mut *writer, formatter, Vec::new(), options.limit)
            .with_quiet(options.quiet);
        writer.prepare(None)?;
        return writer.finish();
    }

    Ok(())
}
1348
1349fn default_stream_columns(count: usize) -> Vec<Column> {
1350 (0..count)
1351 .map(|idx| Column::new(format!("col{}", idx + 1), DataType::Text))
1352 .collect()
1353}
1354
1355fn parse_jsonl_line(line: &[u8]) -> Result<Option<RemoteStreamItem>> {
1356 if line
1357 .iter()
1358 .all(|b| matches!(b, b' ' | b'\n' | b'\r' | b'\t'))
1359 {
1360 return Ok(None);
1361 }
1362 let text = std::str::from_utf8(line)
1363 .map_err(|err| CliError::InvalidArgument(format!("Invalid UTF-8: {err}")))?;
1364 let trimmed = text.trim();
1365 if trimmed.is_empty() {
1366 return Ok(None);
1367 }
1368 let item = serde_json::from_str::<RemoteStreamItem>(trimmed)?;
1369 Ok(Some(item))
1370}
1371
/// Advances `pos` past any run of ASCII whitespace (space, `\n`, `\r`, `\t`)
/// in `buffer`, stopping at the first other byte or the end of the buffer.
fn skip_whitespace(buffer: &[u8], pos: &mut usize) {
    while let Some(byte) = buffer.get(*pos) {
        if !matches!(byte, b' ' | b'\n' | b'\r' | b'\t') {
            break;
        }
        *pos += 1;
    }
}
1380
/// Reports whether `buffer` contains any byte other than ASCII whitespace.
fn has_non_whitespace(buffer: &[u8]) -> bool {
    !buffer
        .iter()
        .all(|byte| matches!(byte, b' ' | b'\n' | b'\r' | b'\t'))
}
1386
1387fn json_value_to_value(value: &serde_json::Value) -> Result<Value> {
1388 match value {
1389 serde_json::Value::Null => Ok(Value::Null),
1390 serde_json::Value::Bool(value) => Ok(Value::Bool(*value)),
1391 serde_json::Value::Number(value) => {
1392 if let Some(value) = value.as_i64() {
1393 Ok(Value::Int(value))
1394 } else if let Some(value) = value.as_f64() {
1395 Ok(Value::Float(value))
1396 } else {
1397 Err(CliError::InvalidArgument(
1398 "Invalid numeric value in streaming row".into(),
1399 ))
1400 }
1401 }
1402 serde_json::Value::String(value) => Ok(Value::Text(value.clone())),
1403 serde_json::Value::Array(values) => {
1404 let mut vector = Vec::with_capacity(values.len());
1405 for entry in values {
1406 let number = entry.as_f64().ok_or_else(|| {
1407 CliError::InvalidArgument("Invalid vector value in streaming row".into())
1408 })?;
1409 vector.push(number as f32);
1410 }
1411 Ok(Value::Vector(vector))
1412 }
1413 serde_json::Value::Object(_) => Err(CliError::InvalidArgument(
1414 "Invalid streaming row: nested objects are not supported".into(),
1415 )),
1416 }
1417}
1418
1419#[allow(dead_code)]
1421pub fn execute<W: Write>(
1422 db: &Database,
1423 cmd: SqlCommand,
1424 batch_mode: &BatchMode,
1425 writer: &mut StreamingWriter<W>,
1426) -> Result<()> {
1427 let sql = cmd.resolve_query(batch_mode)?;
1428
1429 execute_sql(db, &sql, writer)
1430}
1431
1432impl SqlCommand {
1433 pub fn resolve_query(&self, batch_mode: &BatchMode) -> Result<String> {
1435 match (&self.query, &self.file) {
1436 (Some(query), None) => Ok(query.clone()),
1437 (None, Some(file)) => fs::read_to_string(file).map_err(|e| {
1438 CliError::InvalidArgument(format!("Failed to read SQL file '{}': {}", file, e))
1439 }),
1440 (None, None) if !batch_mode.is_tty => {
1441 let mut buf = String::new();
1442 io::stdin().read_to_string(&mut buf)?;
1443 Ok(buf)
1444 }
1445 (None, None) => Err(CliError::NoQueryProvided),
1446 (Some(_), Some(_)) => Err(CliError::InvalidArgument(
1447 "Cannot specify both query and file".to_string(),
1448 )),
1449 }
1450 }
1451}
1452
1453fn execute_sql<W: Write>(db: &Database, sql: &str, writer: &mut StreamingWriter<W>) -> Result<()> {
1455 use alopex_embedded::SqlResult;
1456
1457 let result = db.execute_sql(sql)?;
1458
1459 match result {
1460 SqlResult::Success => {
1461 writer.prepare(Some(1))?;
1463 let row = Row::new(vec![
1464 Value::Text("OK".to_string()),
1465 Value::Text("Operation completed successfully".to_string()),
1466 ]);
1467 writer.write_row(row)?;
1468 writer.finish()?;
1469 }
1470 SqlResult::RowsAffected(count) => {
1471 writer.prepare(Some(1))?;
1473 let row = Row::new(vec![
1474 Value::Text("OK".to_string()),
1475 Value::Text(format!("{} row(s) affected", count)),
1476 ]);
1477 writer.write_row(row)?;
1478 writer.finish()?;
1479 }
1480 SqlResult::Query(query_result) => {
1481 let row_count = query_result.rows.len();
1483 writer.prepare(Some(row_count))?;
1484
1485 for sql_row in query_result.rows {
1486 let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1487 let row = Row::new(values);
1488
1489 match writer.write_row(row)? {
1490 WriteStatus::LimitReached => break,
1491 WriteStatus::Continue => {}
1492 }
1493 }
1494
1495 writer.finish()?;
1496 }
1497 }
1498
1499 Ok(())
1500}
1501
1502fn execute_sql_with_formatter<W: Write>(
1514 db: &Database,
1515 sql: &str,
1516 writer: &mut W,
1517 formatter: Box<dyn Formatter>,
1518 options: &SqlExecutionOptions<'_>,
1519) -> Result<()> {
1520 use alopex_sql::{AlopexDialect, Parser, StatementKind};
1521
1522 let dialect = AlopexDialect;
1525 let stmts = Parser::parse_sql(&dialect, sql).map_err(|e| CliError::Parse(format!("{}", e)))?;
1526
1527 let is_select = stmts.len() == 1
1528 && matches!(
1529 stmts.first().map(|s| &s.kind),
1530 Some(StatementKind::Select(_))
1531 );
1532
1533 if is_select {
1534 execute_sql_select_streaming(db, sql, writer, formatter, options)
1536 } else {
1537 execute_sql_ddl_dml(db, sql, writer, formatter, options)
1539 }
1540}
1541
/// Streams a single SELECT's rows from the embedded engine to the writer,
/// honoring the cancel signal, the deadline, and the row limit.
///
/// Cancellation/timeout inside the row callback must surface as the engine's
/// error type, so atomic flags record *why* the callback failed and the
/// result is translated back into the matching `CliError` afterwards.
fn execute_sql_select_streaming<W: Write>(
    db: &Database,
    sql: &str,
    writer: &mut W,
    formatter: Box<dyn Formatter>,
    options: &SqlExecutionOptions<'_>,
) -> Result<()> {
    use alopex_embedded::StreamingQueryResult;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    // Wraps a CLI error so it can cross the engine's callback boundary.
    fn cli_err_to_embedded(e: crate::error::CliError) -> alopex_embedded::Error {
        alopex_embedded::Error::Sql(alopex_sql::SqlError::Execution {
            message: e.to_string(),
            code: "ALOPEX-C001",
        })
    }

    // Flags set inside the callback to distinguish cancel/timeout from
    // genuine database errors once control returns here.
    let cancelled = Arc::new(AtomicBool::new(false));
    let timed_out = Arc::new(AtomicBool::new(false));
    let cancel_flag = cancelled.clone();
    let timeout_flag = timed_out.clone();

    let result = db.execute_sql_with_rows(sql, |mut rows| {
        let columns = columns_from_streaming_rows(&rows);
        let mut streaming_writer = StreamingWriter::new(writer, formatter, columns, options.limit)
            .with_quiet(options.quiet);

        streaming_writer
            .prepare(None)
            .map_err(cli_err_to_embedded)?;

        // Check once before the first row so an already-expired deadline
        // fails fast.
        if let Err(err) = options.deadline.check() {
            timeout_flag.store(true, Ordering::SeqCst);
            return Err(cli_err_to_embedded(err));
        }

        while let Ok(Some(sql_row)) = rows.next_row() {
            // Re-check cancel and deadline per row.
            if options.cancel.is_cancelled() {
                cancel_flag.store(true, Ordering::SeqCst);
                return Err(cli_err_to_embedded(CliError::Cancelled));
            }
            if let Err(err) = options.deadline.check() {
                timeout_flag.store(true, Ordering::SeqCst);
                return Err(cli_err_to_embedded(err));
            }
            let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
            let row = Row::new(values);

            match streaming_writer
                .write_row(row)
                .map_err(cli_err_to_embedded)?
            {
                WriteStatus::LimitReached => break,
                WriteStatus::Continue => {}
            }
        }

        streaming_writer.finish().map_err(cli_err_to_embedded)?;
        Ok(())
    });

    // Translate callback failures back into the CLI error that caused them.
    let result = match result {
        Ok(value) => value,
        Err(err) => {
            if cancelled.load(Ordering::SeqCst) {
                return Err(CliError::Cancelled);
            }
            if timed_out.load(Ordering::SeqCst) {
                return Err(CliError::Timeout(format!(
                    "deadline exceeded after {}",
                    humantime::format_duration(options.deadline.duration())
                )));
            }
            return Err(CliError::Database(err));
        }
    };

    match result {
        StreamingQueryResult::QueryProcessed(()) => Ok(()),
        StreamingQueryResult::Success | StreamingQueryResult::RowsAffected(_) => {
            Ok(())
        }
    }
}
1637
/// Request payload sent to the server's SQL endpoint.
#[derive(serde::Serialize)]
struct RemoteSqlRequest {
    // SQL text to execute remotely.
    sql: String,
    // NOTE(review): serde's `default` attribute only affects Deserialize; it
    // is a no-op on this Serialize-only struct — confirm whether a
    // Deserialize derive was intended.
    #[serde(default)]
    streaming: bool,
    // Optional server-side fetch batch size; omitted from JSON when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    fetch_size: Option<usize>,
    // Optional server-side row cap; omitted from JSON when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    max_rows: Option<usize>,
}
1648
/// Column metadata as reported by the server: a name plus a SQL type string.
#[derive(serde::Deserialize)]
struct RemoteColumnInfo {
    name: String,
    // Raw SQL type name; mapped to `DataType` via `data_type_from_string`.
    data_type: String,
}
1654
/// Non-streaming SQL response body: the full column list and buffered rows.
#[derive(serde::Deserialize)]
struct RemoteSqlResponse {
    columns: Vec<RemoteColumnInfo>,
    rows: Vec<Vec<alopex_sql::storage::SqlValue>>,
    // Present for DML responses; absent for plain queries.
    affected_rows: Option<u64>,
}
1661
/// One JSONL line of a streaming SQL response. Callers treat exactly one of
/// `row`, `error`, or the terminal `done` marker as meaningful per line.
#[derive(serde::Deserialize)]
struct RemoteStreamItem {
    row: Option<Vec<alopex_sql::storage::SqlValue>>,
    error: Option<RemoteStreamError>,
    // A missing `done` field in the JSON defaults to `false`.
    #[serde(default)]
    done: bool,
}
1669
/// Server-reported error carried inside a streaming item.
#[derive(serde::Deserialize)]
struct RemoteStreamError {
    message: String,
}
1674
1675fn map_client_error(err: ClientError) -> CliError {
1676 match err {
1677 ClientError::Request { source, .. } => {
1678 CliError::ServerConnection(format!("request failed: {source}"))
1679 }
1680 ClientError::InvalidUrl(message) => CliError::InvalidArgument(message),
1681 ClientError::Build(message) => CliError::InvalidArgument(message),
1682 ClientError::Auth(err) => CliError::InvalidArgument(err.to_string()),
1683 ClientError::HttpStatus { status, body } => {
1684 CliError::InvalidArgument(format!("Server error: HTTP {} - {}", status.as_u16(), body))
1685 }
1686 }
1687}
1688
1689async fn send_cancel_request(client: &HttpClient) -> Result<()> {
1690 #[derive(serde::Serialize)]
1691 struct CancelRequest {}
1692
1693 let request = CancelRequest {};
1694 let _: serde_json::Value = client
1695 .post_json("api/sql/cancel", &request)
1696 .await
1697 .map_err(map_client_error)?;
1698 Ok(())
1699}
1700
/// Combines the global `--limit` and the per-command `max_rows` into one
/// effective row cap: the smaller when both are set, whichever is present
/// otherwise, and `None` when neither applies.
fn merge_limit(limit: Option<usize>, max_rows: Option<usize>) -> Option<usize> {
    // The minimum over the values that are actually present.
    limit.into_iter().chain(max_rows).min()
}
1708
1709fn execute_sql_ddl_dml<W: Write>(
1714 db: &Database,
1715 sql: &str,
1716 writer: &mut W,
1717 formatter: Box<dyn Formatter>,
1718 options: &SqlExecutionOptions<'_>,
1719) -> Result<()> {
1720 use alopex_sql::ExecutionResult;
1721
1722 options.deadline.check()?;
1723 let result = db.execute_sql(sql)?;
1724 options.deadline.check()?;
1725
1726 match result {
1727 ExecutionResult::Success => {
1728 if !options.quiet {
1730 let columns = sql_status_columns();
1731 let mut streaming_writer =
1732 StreamingWriter::new(writer, formatter, columns, options.limit)
1733 .with_quiet(options.quiet);
1734 streaming_writer.prepare(Some(1))?;
1735 let row = Row::new(vec![
1736 Value::Text("OK".to_string()),
1737 Value::Text("Operation completed successfully".to_string()),
1738 ]);
1739 streaming_writer.write_row(row)?;
1740 streaming_writer.finish()?;
1741 }
1742 }
1743 ExecutionResult::RowsAffected(count) => {
1744 if !options.quiet {
1746 let columns = sql_status_columns();
1747 let mut streaming_writer =
1748 StreamingWriter::new(writer, formatter, columns, options.limit)
1749 .with_quiet(options.quiet);
1750 streaming_writer.prepare(Some(1))?;
1751 let row = Row::new(vec![
1752 Value::Text("OK".to_string()),
1753 Value::Text(format!("{} row(s) affected", count)),
1754 ]);
1755 streaming_writer.write_row(row)?;
1756 streaming_writer.finish()?;
1757 }
1758 }
1759 ExecutionResult::Query(query_result) => {
1760 let columns = columns_from_query_result(&query_result);
1763 let mut streaming_writer =
1764 StreamingWriter::new(writer, formatter, columns, options.limit)
1765 .with_quiet(options.quiet);
1766 streaming_writer.prepare(Some(query_result.rows.len()))?;
1767 for sql_row in query_result.rows {
1768 let values: Vec<Value> = sql_row.into_iter().map(sql_value_to_value).collect();
1769 let row = Row::new(values);
1770 match streaming_writer.write_row(row)? {
1771 WriteStatus::LimitReached => break,
1772 WriteStatus::Continue => {}
1773 }
1774 }
1775 streaming_writer.finish()?;
1776 }
1777 }
1778
1779 Ok(())
1780}
1781
1782fn sql_value_to_value(sql_value: alopex_sql::SqlValue) -> Value {
1784 use alopex_sql::SqlValue;
1785
1786 match sql_value {
1787 SqlValue::Null => Value::Null,
1788 SqlValue::Integer(i) => Value::Int(i as i64),
1789 SqlValue::BigInt(i) => Value::Int(i),
1790 SqlValue::Float(f) => Value::Float(f as f64),
1791 SqlValue::Double(f) => Value::Float(f),
1792 SqlValue::Text(s) => Value::Text(s),
1793 SqlValue::Blob(b) => Value::Bytes(b),
1794 SqlValue::Boolean(b) => Value::Bool(b),
1795 SqlValue::Timestamp(ts) => {
1796 Value::Text(format!("{}", ts))
1798 }
1799 SqlValue::Vector(v) => Value::Vector(v),
1800 }
1801}
1802
1803fn remote_value_to_value(sql_value: alopex_sql::storage::SqlValue) -> Value {
1804 use alopex_sql::storage::SqlValue;
1805
1806 match sql_value {
1807 SqlValue::Null => Value::Null,
1808 SqlValue::Integer(i) => Value::Int(i as i64),
1809 SqlValue::BigInt(i) => Value::Int(i),
1810 SqlValue::Float(f) => Value::Float(f as f64),
1811 SqlValue::Double(f) => Value::Float(f),
1812 SqlValue::Text(s) => Value::Text(s),
1813 SqlValue::Blob(b) => Value::Bytes(b),
1814 SqlValue::Boolean(b) => Value::Bool(b),
1815 SqlValue::Timestamp(ts) => Value::Text(ts.to_string()),
1816 SqlValue::Vector(v) => Value::Vector(v),
1817 }
1818}
1819
1820fn data_type_from_string(value: &str) -> DataType {
1821 let upper = value.to_ascii_uppercase();
1822 if upper.starts_with("INT") || upper.starts_with("BIGINT") {
1823 DataType::Int
1824 } else if upper.starts_with("FLOAT") || upper.starts_with("DOUBLE") {
1825 DataType::Float
1826 } else if upper.starts_with("BLOB") {
1827 DataType::Bytes
1828 } else if upper.starts_with("BOOLEAN") {
1829 DataType::Bool
1830 } else if upper.starts_with("VECTOR") {
1831 DataType::Vector
1832 } else {
1833 DataType::Text
1834 }
1835}
1836
1837fn sql_column_to_column(col: &alopex_sql::executor::ColumnInfo) -> Column {
1839 use alopex_sql::planner::ResolvedType;
1840
1841 let data_type = match &col.data_type {
1842 ResolvedType::Integer | ResolvedType::BigInt => DataType::Int,
1843 ResolvedType::Float | ResolvedType::Double => DataType::Float,
1844 ResolvedType::Text => DataType::Text,
1845 ResolvedType::Blob => DataType::Bytes,
1846 ResolvedType::Boolean => DataType::Bool,
1847 ResolvedType::Timestamp => DataType::Text, ResolvedType::Vector { .. } => DataType::Vector,
1849 ResolvedType::Null => DataType::Text, };
1851
1852 Column::new(&col.name, data_type)
1853}
1854
1855#[allow(dead_code)] fn columns_from_query_result(query_result: &alopex_sql::executor::QueryResult) -> Vec<Column> {
1858 query_result
1859 .columns
1860 .iter()
1861 .map(sql_column_to_column)
1862 .collect()
1863}
1864
1865#[allow(dead_code)] fn columns_from_streaming_result(
1868 query_iter: &alopex_embedded::QueryRowIterator<'_>,
1869) -> Vec<Column> {
1870 query_iter
1871 .columns()
1872 .iter()
1873 .map(sql_column_to_column)
1874 .collect()
1875}
1876
1877fn columns_from_streaming_rows(rows: &alopex_embedded::StreamingRows<'_>) -> Vec<Column> {
1879 rows.columns().iter().map(sql_column_to_column).collect()
1880}
1881
1882pub fn sql_status_columns() -> Vec<Column> {
1884 vec![
1885 Column::new("status", DataType::Text),
1886 Column::new("message", DataType::Text),
1887 ]
1888}
1889
// Unit tests for embedded SQL execution and query resolution. They run
// against an in-memory database and capture JSONL output in a Vec<u8>.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::batch::BatchModeSource;
    use crate::output::jsonl::JsonlFormatter;

    // Fresh in-memory database per test; no on-disk state.
    fn create_test_db() -> Database {
        Database::open_in_memory().unwrap()
    }

    // Writer preconfigured with the (status, message) columns used for
    // DDL/DML output.
    fn create_status_writer(output: &mut Vec<u8>) -> StreamingWriter<&mut Vec<u8>> {
        let formatter = Box::new(JsonlFormatter::new());
        let columns = sql_status_columns();
        StreamingWriter::new(output, formatter, columns, None)
    }

    // Writer with caller-chosen columns for SELECT results.
    fn create_query_writer(
        output: &mut Vec<u8>,
        columns: Vec<Column>,
    ) -> StreamingWriter<&mut Vec<u8>> {
        let formatter = Box::new(JsonlFormatter::new());
        StreamingWriter::new(output, formatter, columns, None)
    }

    // Interactive (TTY, non-batch) mode used by the resolve_query tests.
    fn default_batch_mode() -> BatchMode {
        BatchMode {
            is_batch: false,
            is_tty: true,
            source: BatchModeSource::Default,
        }
    }

    #[test]
    fn test_create_table() {
        let db = create_test_db();

        let mut output = Vec::new();
        {
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
                &mut writer,
            )
            .unwrap();
        }

        // DDL success is reported as an "OK" status row.
        let result = String::from_utf8(output).unwrap();
        assert!(result.contains("OK"));
    }

    #[test]
    fn test_insert_and_select() {
        let db = create_test_db();

        {
            let mut output = Vec::new();
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);",
                &mut writer,
            )
            .unwrap();
        }

        {
            let mut output = Vec::new();
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "INSERT INTO users (id, name) VALUES (1, 'Alice');",
                &mut writer,
            )
            .unwrap();
            // DML reports an affected-row count rather than query rows.
            let result = String::from_utf8(output).unwrap();
            assert!(result.contains("row(s) affected"));
        }

        {
            let mut output = Vec::new();
            let columns = vec![
                Column::new("id", DataType::Int),
                Column::new("name", DataType::Text),
            ];
            let mut writer = create_query_writer(&mut output, columns);
            execute_sql(&db, "SELECT id, name FROM users;", &mut writer).unwrap();
            let result = String::from_utf8(output).unwrap();
            assert!(result.contains("Alice"));
        }
    }

    #[test]
    fn test_syntax_error() {
        let db = create_test_db();

        // Misspelled keyword ("TABEL") must surface as an error, not output.
        let mut output = Vec::new();
        let mut writer = create_status_writer(&mut output);
        let result = execute_sql(&db, "CREATE TABEL invalid_syntax;", &mut writer);
        assert!(result.is_err());
    }

    #[test]
    fn test_multiple_statements() {
        let db = create_test_db();

        let mut output = Vec::new();
        {
            // Two statements in one call: CREATE followed by INSERT.
            let mut writer = create_status_writer(&mut output);
            execute_sql(
                &db,
                "CREATE TABLE t (id INTEGER PRIMARY KEY); INSERT INTO t (id) VALUES (1);",
                &mut writer,
            )
            .unwrap();
        }

        {
            let mut output = Vec::new();
            let columns = vec![Column::new("id", DataType::Int)];
            let mut writer = create_query_writer(&mut output, columns);
            execute_sql(&db, "SELECT id FROM t;", &mut writer).unwrap();
            let result = String::from_utf8(output).unwrap();
            assert!(result.contains("1"));
        }
    }

    #[test]
    fn test_sql_value_conversion() {
        use alopex_sql::SqlValue;

        assert!(matches!(sql_value_to_value(SqlValue::Null), Value::Null));
        assert!(matches!(
            sql_value_to_value(SqlValue::Integer(42)),
            Value::Int(42)
        ));
        assert!(matches!(
            sql_value_to_value(SqlValue::BigInt(100)),
            Value::Int(100)
        ));
        assert!(matches!(
            sql_value_to_value(SqlValue::Boolean(true)),
            Value::Bool(true)
        ));
        assert!(
            matches!(sql_value_to_value(SqlValue::Text("hello".to_string())), Value::Text(s) if s == "hello")
        );
    }

    #[test]
    fn resolve_query_from_argument() {
        let cmd = SqlCommand {
            query: Some("SELECT 1".to_string()),
            file: None,
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
        assert_eq!(sql, "SELECT 1");
    }

    #[test]
    fn resolve_query_from_file() {
        let mut file = tempfile::NamedTempFile::new().unwrap();
        writeln!(file, "SELECT * FROM users").unwrap();

        let cmd = SqlCommand {
            query: None,
            file: Some(file.path().display().to_string()),
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        // writeln! appends a newline, which read_to_string preserves.
        let sql = cmd.resolve_query(&default_batch_mode()).unwrap();
        assert_eq!(sql, "SELECT * FROM users\n");
    }

    #[test]
    fn resolve_query_returns_no_query_error() {
        let cmd = SqlCommand {
            query: None,
            file: None,
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        // Interactive mode with neither query nor file is an error (stdin
        // fallback only applies when stdin is not a TTY).
        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
        assert!(matches!(err, CliError::NoQueryProvided));
    }

    #[test]
    fn resolve_query_rejects_query_and_file() {
        let cmd = SqlCommand {
            query: Some("SELECT 1".to_string()),
            file: Some("query.sql".into()),
            fetch_size: None,
            max_rows: None,
            deadline: None,
            tui: false,
        };

        let err = cmd.resolve_query(&default_batch_mode()).unwrap_err();
        assert!(matches!(
            err,
            CliError::InvalidArgument(msg) if msg == "Cannot specify both query and file"
        ));
    }
}