1use std::{collections::HashMap, io::Read, path::Path, sync::Arc, time::Duration};
2
3use log::trace;
4
5use crate::{
6 error::TgError,
7 invalid_response_error, io_error,
8 job::Job,
9 jogasaki::proto::sql::{
10 common::Transaction as ProtoTransaction,
11 request::{request::Request as SqlCommand, Request as SqlRequest},
12 response::{response::Response as SqlResponseType, Response as SqlResponse},
13 },
14 prelude::{
15 convert_lob_parameters,
16 error_info::{transaction_error_info_processor, TransactionErrorInfo},
17 execute_result_processor,
18 explain::explain_processor,
19 list_tables_processor, prepare_dispose_processor, prepare_processor,
20 query_result_processor,
21 r#type::large_object::{
22 lob_cache_processor, lob_copy_to_processor, lob_open_processor, TgLargeObjectCache,
23 },
24 table_metadata_processor, transaction_status_processor, CommitOption, ServiceClient,
25 SqlExecuteResult, SqlParameter, SqlPlaceholder, SqlQueryResult, TableList, TableMetadata,
26 TgBlobReference, TgClobReference, TransactionStatusWithMessage,
27 },
28 prost_decode_error,
29 session::{
30 wire::{response::WireResponse, response_box::SlotEntryHandle, Wire},
31 Session,
32 },
33 sql_service_error,
34 tateyama::proto::framework::common::BlobInfo,
35 transaction::{
36 option::TransactionOption, transaction_begin_processor, transaction_commit_processor,
37 transaction_dispose_processor, transaction_rollback_processor, Transaction,
38 },
39};
40
41use prost::{alloc::string::String as ProstString, Message};
42
43use super::{
44 explain::SqlExplainResult, prepare::SqlPreparedStatement,
45 r#type::large_object::TgLargeObjectReference,
46};
47
/// Symbolic name of the SQL service; the prefix of the string built by
/// `SqlClient::service_message_version()`.
const SERVICE_SYMBOLIC_ID: &str = "sql";

/// Major component of the service message version reported by this client.
const SERVICE_MESSAGE_VERSION_MAJOR: u64 = 1;

/// Minor component of the service message version reported by this client.
const SERVICE_MESSAGE_VERSION_MINOR: u64 = 6;

/// Numeric identifier for the SQL service.
// NOTE(review): presumably the service id placed in request headers — confirm against the wire code.
pub(crate) const SERVICE_ID_SQL: i32 = 3;
58
/// Client for issuing SQL service requests over a shared [`Session`].
pub struct SqlClient {
    /// Shared session handle; it is cloned into processors that build
    /// session-bound results (prepared statements, transactions, large objects).
    session: Arc<Session>,
    /// Timeout used by the methods that take no explicit `timeout` argument.
    /// Initialised from `Session::default_timeout()` in `new`.
    default_timeout: Duration,
}
99
100impl ServiceClient for SqlClient {
101 fn new(session: Arc<Session>) -> Self {
102 let default_timeout = session.default_timeout();
103 SqlClient {
104 session,
105 default_timeout,
106 }
107 }
108}
109
110impl SqlClient {
111 pub fn service_message_version() -> String {
113 format!(
114 "{}-{}.{}",
115 SERVICE_SYMBOLIC_ID, SERVICE_MESSAGE_VERSION_MAJOR, SERVICE_MESSAGE_VERSION_MINOR
116 )
117 }
118
119 pub fn set_default_timeout(&mut self, timeout: Duration) {
121 self.default_timeout = timeout;
122 }
123
124 pub fn default_timeout(&self) -> Duration {
126 self.default_timeout
127 }
128}
129
130impl SqlClient {
131 pub async fn list_tables(&self) -> Result<TableList, TgError> {
152 let timeout = self.default_timeout;
153 self.list_tables_for(timeout).await
154 }
155
156 pub async fn list_tables_for(&self, timeout: Duration) -> Result<TableList, TgError> {
161 const FUNCTION_NAME: &str = "list_tables()";
162 trace!("{} start", FUNCTION_NAME);
163
164 let command = Self::list_tables_command();
165 let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
166 let table_list = list_tables_processor(slot_handle, response)?;
167
168 trace!("{} end", FUNCTION_NAME);
169 Ok(table_list)
170 }
171
172 pub async fn list_tables_async(&self) -> Result<Job<TableList>, TgError> {
177 const FUNCTION_NAME: &str = "list_tables_async()";
178 trace!("{} start", FUNCTION_NAME);
179
180 let command = Self::list_tables_command();
181 let job = self
182 .send_and_pull_async("ListTables", command, None, Box::new(list_tables_processor))
183 .await?;
184
185 trace!("{} end", FUNCTION_NAME);
186 Ok(job)
187 }
188
189 fn list_tables_command() -> SqlCommand {
190 let request = crate::jogasaki::proto::sql::request::ListTables {};
191 SqlCommand::ListTables(request)
192 }
193
194 pub async fn get_table_metadata(&self, table_name: &str) -> Result<TableMetadata, TgError> {
213 let timeout = self.default_timeout;
214 self.get_table_metadata_for(table_name, timeout).await
215 }
216
217 pub async fn get_table_metadata_for(
219 &self,
220 table_name: &str,
221 timeout: Duration,
222 ) -> Result<TableMetadata, TgError> {
223 const FUNCTION_NAME: &str = "get_table_metadata()";
224 trace!("{} start", FUNCTION_NAME);
225
226 let command = Self::table_metadata_command(table_name);
227 let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
228 let metadata = table_metadata_processor(slot_handle, response)?;
229
230 trace!("{} end", FUNCTION_NAME);
231 Ok(metadata)
232 }
233
234 pub async fn get_table_metadata_async(
236 &self,
237 table_name: &str,
238 ) -> Result<Job<TableMetadata>, TgError> {
239 const FUNCTION_NAME: &str = "get_table_metadata_async()";
240 trace!("{} start", FUNCTION_NAME);
241
242 let command = Self::table_metadata_command(table_name);
243 let job = self
244 .send_and_pull_async(
245 "TableMetadata",
246 command,
247 None,
248 Box::new(table_metadata_processor),
249 )
250 .await?;
251
252 trace!("{} end", FUNCTION_NAME);
253 Ok(job)
254 }
255
256 fn table_metadata_command(table_name: &str) -> SqlCommand {
257 let request = crate::jogasaki::proto::sql::request::DescribeTable {
258 name: table_name.to_string(),
259 };
260 SqlCommand::DescribeTable(request)
261 }
262
263 pub async fn prepare(
285 &self,
286 sql: &str,
287 placeholders: Vec<SqlPlaceholder>,
288 ) -> Result<SqlPreparedStatement, TgError> {
289 let timeout = self.default_timeout;
290 self.prepare_for(sql, placeholders, timeout).await
291 }
292
293 pub async fn prepare_for(
297 &self,
298 sql: &str,
299 placeholders: Vec<SqlPlaceholder>,
300 timeout: Duration,
301 ) -> Result<SqlPreparedStatement, TgError> {
302 const FUNCTION_NAME: &str = "prepare()";
303 trace!("{} start", FUNCTION_NAME);
304
305 let command = Self::prepare_command(sql, placeholders);
306 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
307
308 let session = self.session.clone();
309 let close_timeout = self.default_timeout;
310 let ps = prepare_processor(session, response, close_timeout)?;
311
312 trace!("{} end", FUNCTION_NAME);
313 Ok(ps)
314 }
315
316 pub async fn prepare_async(
320 &self,
321 sql: &str,
322 placeholders: Vec<SqlPlaceholder>,
323 ) -> Result<Job<SqlPreparedStatement>, TgError> {
324 const FUNCTION_NAME: &str = "prepare_async()";
325 trace!("{} start", FUNCTION_NAME);
326
327 let command = Self::prepare_command(sql, placeholders);
328 let session = self.session.clone();
329 let close_timeout = self.default_timeout;
330 let job = self
331 .send_and_pull_async(
332 "Prepare",
333 command,
334 None,
335 Box::new(move |_, response| {
336 prepare_processor(session.clone(), response, close_timeout)
337 }),
338 )
339 .await?;
340
341 trace!("{} end", FUNCTION_NAME);
342 Ok(job)
343 }
344
345 fn prepare_command(sql: &str, placeholders: Vec<SqlPlaceholder>) -> SqlCommand {
346 let request = crate::jogasaki::proto::sql::request::Prepare {
347 sql: sql.to_string(),
348 placeholders,
349 };
350 SqlCommand::Prepare(request)
351 }
352
353 pub(crate) async fn dispose_prepare(
354 &self,
355 prepare_handle: u64,
356 has_result_records: bool,
357 timeout: Duration,
358 ) -> Result<(), TgError> {
359 const FUNCTION_NAME: &str = "dispose_prepare()";
360 trace!("{} start", FUNCTION_NAME);
361
362 let command = Self::dispose_prepare_statement_command(prepare_handle, has_result_records);
363 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
364 prepare_dispose_processor(response)?;
365
366 trace!("{} end", FUNCTION_NAME);
367 Ok(())
368 }
369
370 pub(crate) async fn dispose_prepare_send_only(
371 &self,
372 prepare_handle: u64,
373 has_result_records: bool,
374 ) -> Result<(), TgError> {
375 const FUNCTION_NAME: &str = "dispose_prepare_send_only()";
376 trace!("{} start", FUNCTION_NAME);
377
378 let command = Self::dispose_prepare_statement_command(prepare_handle, has_result_records);
379 let _ = self.send_only(command).await?;
380
381 trace!("{} end", FUNCTION_NAME);
382 Ok(())
383 }
384
385 fn dispose_prepare_statement_command(
386 prepare_handle: u64,
387 has_result_records: bool,
388 ) -> SqlCommand {
389 let ps = crate::jogasaki::proto::sql::common::PreparedStatement {
390 handle: prepare_handle,
391 has_result_records,
392 };
393
394 let request = crate::jogasaki::proto::sql::request::DisposePreparedStatement {
395 prepared_statement_handle: Some(ps),
396 };
397 SqlCommand::DisposePreparedStatement(request)
398 }
399
400 pub async fn explain(&self, sql: &str) -> Result<SqlExplainResult, TgError> {
415 let timeout = self.default_timeout;
416 self.explain_for(sql, timeout).await
417 }
418
419 pub async fn explain_for(
421 &self,
422 sql: &str,
423 timeout: Duration,
424 ) -> Result<SqlExplainResult, TgError> {
425 const FUNCTION_NAME: &str = "explain()";
426 trace!("{} start", FUNCTION_NAME);
427
428 let command = Self::explain_text_command(sql);
429 let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
430
431 let explain_result = explain_processor(slot_handle, response)?;
432
433 trace!("{} end", FUNCTION_NAME);
434 Ok(explain_result)
435 }
436
437 pub async fn explain_async(&self, sql: &str) -> Result<Job<SqlExplainResult>, TgError> {
439 const FUNCTION_NAME: &str = "explain_async()";
440 trace!("{} start", FUNCTION_NAME);
441
442 let command = Self::explain_text_command(sql);
443 let job = self
444 .send_and_pull_async("Explain", command, None, Box::new(explain_processor))
445 .await?;
446
447 trace!("{} end", FUNCTION_NAME);
448 Ok(job)
449 }
450
451 fn explain_text_command(sql: &str) -> SqlCommand {
452 let request = crate::jogasaki::proto::sql::request::ExplainByText {
453 sql: sql.to_string(),
454 };
455 SqlCommand::ExplainByText(request)
456 }
457
458 pub async fn prepared_explain(
474 &self,
475 prepared_statement: &SqlPreparedStatement,
476 parameters: Vec<SqlParameter>,
477 ) -> Result<SqlExplainResult, TgError> {
478 let timeout = self.default_timeout;
479 self.prepared_explain_for(prepared_statement, parameters, timeout)
480 .await
481 }
482
483 pub async fn prepared_explain_for(
485 &self,
486 prepared_statement: &SqlPreparedStatement,
487 parameters: Vec<SqlParameter>,
488 timeout: Duration,
489 ) -> Result<SqlExplainResult, TgError> {
490 const FUNCTION_NAME: &str = "prepared_explain()";
491 trace!("{} start", FUNCTION_NAME);
492
493 let (parameters, lobs) =
494 convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
495 let command = Self::explain_prepared_command(prepared_statement, parameters);
496 let (slot_handle, response) = self.send_and_pull_response(command, lobs, timeout).await?;
497 let explain_result = explain_processor(slot_handle, response)?;
498
499 trace!("{} end", FUNCTION_NAME);
500 Ok(explain_result)
501 }
502
503 pub async fn prepared_explain_async(
505 &self,
506 prepared_statement: &SqlPreparedStatement,
507 parameters: Vec<SqlParameter>,
508 ) -> Result<Job<SqlExplainResult>, TgError> {
509 const FUNCTION_NAME: &str = "prepared_explain_async()";
510 trace!("{} start", FUNCTION_NAME);
511
512 let (parameters, lobs) =
513 convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
514 let command = Self::explain_prepared_command(prepared_statement, parameters);
515 let job = self
516 .send_and_pull_async("Explain", command, lobs, Box::new(explain_processor))
517 .await?;
518
519 trace!("{} end", FUNCTION_NAME);
520 Ok(job)
521 }
522
523 fn explain_prepared_command(
524 prepared_statement: &SqlPreparedStatement,
525 parameters: Vec<SqlParameter>,
526 ) -> SqlCommand {
527 let ps_handle = crate::jogasaki::proto::sql::common::PreparedStatement {
528 handle: prepared_statement.prepare_handle(),
529 has_result_records: prepared_statement.has_result_records(),
530 };
531 let request = crate::jogasaki::proto::sql::request::Explain {
532 prepared_statement_handle: Some(ps_handle),
533 parameters,
534 };
535 SqlCommand::Explain(request)
536 }
537
538 pub async fn start_transaction(
557 &self,
558 transaction_option: &TransactionOption,
559 ) -> Result<Transaction, TgError> {
560 let timeout = self.default_timeout;
561 self.start_transaction_for(transaction_option, timeout)
562 .await
563 }
564
565 pub async fn start_transaction_for(
569 &self,
570 transaction_option: &TransactionOption,
571 timeout: Duration,
572 ) -> Result<Transaction, TgError> {
573 const FUNCTION_NAME: &str = "start_transaction()";
574 trace!("{} start", FUNCTION_NAME);
575
576 let command = Self::begin_transaction_command(transaction_option);
577 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
578
579 let session = self.session.clone();
580 let close_timeout = transaction_option
581 .close_timeout()
582 .unwrap_or(self.default_timeout);
583 let transaction = transaction_begin_processor(session, response, close_timeout)?;
584
585 trace!("{} end", FUNCTION_NAME);
586 Ok(transaction)
587 }
588
589 pub async fn start_transaction_async(
593 &self,
594 transaction_option: &TransactionOption,
595 ) -> Result<Job<Transaction>, TgError> {
596 const FUNCTION_NAME: &str = "start_transaction_async()";
597 trace!("{} start", FUNCTION_NAME);
598
599 let command = Self::begin_transaction_command(transaction_option);
600 let session = self.session.clone();
601 let close_timeout = transaction_option
602 .close_timeout()
603 .unwrap_or(self.default_timeout);
604 let job = self
605 .send_and_pull_async(
606 "StartTransaction",
607 command,
608 None,
609 Box::new(move |_, response| {
610 transaction_begin_processor(session.clone(), response, close_timeout)
611 }),
612 )
613 .await?;
614
615 trace!("{} end", FUNCTION_NAME);
616 Ok(job)
617 }
618
619 fn begin_transaction_command(transaction_option: &TransactionOption) -> SqlCommand {
620 let tx_option = transaction_option.request();
621
622 let request = crate::jogasaki::proto::sql::request::Begin {
623 option: Some(tx_option),
624 };
625 SqlCommand::Begin(request)
626 }
627
628 pub async fn get_transaction_error_info(
648 &self,
649 transaction: &Transaction,
650 ) -> Result<TransactionErrorInfo, TgError> {
651 let timeout = self.default_timeout;
652 self.get_transaction_error_info_for(transaction, timeout)
653 .await
654 }
655
656 pub async fn get_transaction_error_info_for(
660 &self,
661 transaction: &Transaction,
662 timeout: Duration,
663 ) -> Result<TransactionErrorInfo, TgError> {
664 const FUNCTION_NAME: &str = "get_transaction_error_info()";
665 trace!("{} start", FUNCTION_NAME);
666
667 let command = Self::transaction_error_info_command(transaction.transaction_handle()?);
668 let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
669 let status = transaction_error_info_processor(slot_handle, response)?;
670
671 trace!("{} end", FUNCTION_NAME);
672 Ok(status)
673 }
674
675 pub async fn get_transaction_error_info_async(
679 &self,
680 transaction: &Transaction,
681 ) -> Result<Job<TransactionErrorInfo>, TgError> {
682 const FUNCTION_NAME: &str = "get_transaction_error_info_async()";
683 trace!("{} start", FUNCTION_NAME);
684
685 let command = Self::transaction_error_info_command(transaction.transaction_handle()?);
686 let job = self
687 .send_and_pull_async(
688 "TransactionErrorInfo",
689 command,
690 None,
691 Box::new(transaction_error_info_processor),
692 )
693 .await?;
694
695 trace!("{} end", FUNCTION_NAME);
696 Ok(job)
697 }
698
699 fn transaction_error_info_command(transaction_handle: &ProtoTransaction) -> SqlCommand {
700 let request = crate::jogasaki::proto::sql::request::GetErrorInfo {
701 transaction_handle: Some(*transaction_handle),
702 };
703 SqlCommand::GetErrorInfo(request)
704 }
705
706 pub async fn get_transaction_status(
723 &self,
724 transaction: &Transaction,
725 ) -> Result<TransactionStatusWithMessage, TgError> {
726 let timeout = self.default_timeout;
727 self.get_transaction_status_for(transaction, timeout).await
728 }
729
730 pub async fn get_transaction_status_for(
734 &self,
735 transaction: &Transaction,
736 timeout: Duration,
737 ) -> Result<TransactionStatusWithMessage, TgError> {
738 const FUNCTION_NAME: &str = "get_transaction_status()";
739 trace!("{} start", FUNCTION_NAME);
740
741 let command = Self::transaction_status_command(transaction.transaction_handle()?);
742 let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
743 let status = transaction_status_processor(slot_handle, response)?;
744
745 trace!("{} end", FUNCTION_NAME);
746 Ok(status)
747 }
748
749 pub async fn get_transaction_status_async(
753 &self,
754 transaction: &Transaction,
755 ) -> Result<Job<TransactionStatusWithMessage>, TgError> {
756 const FUNCTION_NAME: &str = "get_transaction_status_async()";
757 trace!("{} start", FUNCTION_NAME);
758
759 let command = Self::transaction_status_command(transaction.transaction_handle()?);
760 let job = self
761 .send_and_pull_async(
762 "TransactionStatus",
763 command,
764 None,
765 Box::new(transaction_status_processor),
766 )
767 .await?;
768
769 trace!("{} end", FUNCTION_NAME);
770 Ok(job)
771 }
772
773 fn transaction_status_command(transaction_handle: &ProtoTransaction) -> SqlCommand {
774 let request = crate::jogasaki::proto::sql::request::GetTransactionStatus {
775 transaction_handle: Some(*transaction_handle),
776 };
777 SqlCommand::GetTransactionStatus(request)
778 }
779
780 pub async fn execute(
795 &self,
796 transaction: &Transaction,
797 sql: &str,
798 ) -> Result<SqlExecuteResult, TgError> {
799 let timeout = self.default_timeout;
800 self.execute_for(transaction, sql, timeout).await
801 }
802
803 pub async fn execute_for(
805 &self,
806 transaction: &Transaction,
807 sql: &str,
808 timeout: Duration,
809 ) -> Result<SqlExecuteResult, TgError> {
810 const FUNCTION_NAME: &str = "execute()";
811 trace!("{} start", FUNCTION_NAME);
812
813 let tx_handle = transaction.transaction_handle()?;
814
815 let command = Self::execute_statement_command(tx_handle, sql);
816 let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
817 let execute_result = execute_result_processor(slot_handle, response)?;
818
819 trace!("{} end", FUNCTION_NAME);
820 Ok(execute_result)
821 }
822
823 pub async fn execute_async(
825 &self,
826 transaction: &Transaction,
827 sql: &str,
828 ) -> Result<Job<SqlExecuteResult>, TgError> {
829 const FUNCTION_NAME: &str = "execute_async()";
830 trace!("{} start", FUNCTION_NAME);
831
832 let tx_handle = transaction.transaction_handle()?;
833
834 let command = Self::execute_statement_command(tx_handle, sql);
835 let job = self
836 .send_and_pull_async("Execute", command, None, Box::new(execute_result_processor))
837 .await?;
838
839 trace!("{} end", FUNCTION_NAME);
840 Ok(job)
841 }
842
843 fn execute_statement_command(transaction_handle: &ProtoTransaction, sql: &str) -> SqlCommand {
844 let request = crate::jogasaki::proto::sql::request::ExecuteStatement {
845 transaction_handle: Some(*transaction_handle),
846 sql: ProstString::from(sql),
847 };
848 SqlCommand::ExecuteStatement(request)
849 }
850
851 pub async fn prepared_execute(
871 &self,
872 transaction: &Transaction,
873 prepared_statement: &SqlPreparedStatement,
874 parameters: Vec<SqlParameter>,
875 ) -> Result<SqlExecuteResult, TgError> {
876 let timeout = self.default_timeout;
877 self.prepared_execute_for(transaction, prepared_statement, parameters, timeout)
878 .await
879 }
880
881 pub async fn prepared_execute_for(
883 &self,
884 transaction: &Transaction,
885 prepared_statement: &SqlPreparedStatement,
886 parameters: Vec<SqlParameter>,
887 timeout: Duration,
888 ) -> Result<SqlExecuteResult, TgError> {
889 const FUNCTION_NAME: &str = "prepared_execute()";
890 trace!("{} start", FUNCTION_NAME);
891
892 let tx_handle = transaction.transaction_handle()?;
893 let (parameters, lobs) =
894 convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
895
896 let command =
897 Self::execute_prepared_statement_command(tx_handle, prepared_statement, parameters);
898 let (slot_handle, response) = self.send_and_pull_response(command, lobs, timeout).await?;
899 let execute_result = execute_result_processor(slot_handle, response)?;
900
901 trace!("{} end", FUNCTION_NAME);
902 Ok(execute_result)
903 }
904
905 pub async fn prepared_execute_async(
907 &self,
908 transaction: &Transaction,
909 prepared_statement: &SqlPreparedStatement,
910 parameters: Vec<SqlParameter>,
911 ) -> Result<Job<SqlExecuteResult>, TgError> {
912 const FUNCTION_NAME: &str = "prepared_execute_async()";
913 trace!("{} start", FUNCTION_NAME);
914
915 let tx_handle = transaction.transaction_handle()?;
916 let (parameters, lobs) =
917 convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
918
919 let command =
920 Self::execute_prepared_statement_command(tx_handle, prepared_statement, parameters);
921 let job = self
922 .send_and_pull_async("Execute", command, lobs, Box::new(execute_result_processor))
923 .await?;
924
925 trace!("{} end", FUNCTION_NAME);
926 Ok(job)
927 }
928
929 fn execute_prepared_statement_command(
930 transaction_handle: &ProtoTransaction,
931 prepared_statement: &SqlPreparedStatement,
932 parameters: Vec<SqlParameter>,
933 ) -> SqlCommand {
934 let ps_handle = crate::jogasaki::proto::sql::common::PreparedStatement {
935 handle: prepared_statement.prepare_handle(),
936 has_result_records: prepared_statement.has_result_records(),
937 };
938 let request = crate::jogasaki::proto::sql::request::ExecutePreparedStatement {
939 transaction_handle: Some(*transaction_handle),
940 prepared_statement_handle: Some(ps_handle),
941 parameters,
942 };
943 SqlCommand::ExecutePreparedStatement(request)
944 }
945
946 pub async fn query(
974 &self,
975 transaction: &Transaction,
976 sql: &str,
977 ) -> Result<SqlQueryResult, TgError> {
978 let timeout = self.default_timeout;
979 self.query_for(transaction, sql, timeout).await
980 }
981
982 pub async fn query_for(
984 &self,
985 transaction: &Transaction,
986 sql: &str,
987 timeout: Duration,
988 ) -> Result<SqlQueryResult, TgError> {
989 const FUNCTION_NAME: &str = "query()";
990 trace!("{} start", FUNCTION_NAME);
991
992 let tx_handle = transaction.transaction_handle()?;
993
994 let command = Self::execute_query_command(tx_handle, sql);
995 let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
996
997 let wire = self.wire().clone();
998 let default_timeout = self.default_timeout;
999 let query_result = query_result_processor(wire, slot_handle, response, default_timeout)?;
1000
1001 trace!("{} end", FUNCTION_NAME);
1002 Ok(query_result)
1003 }
1004
1005 pub async fn query_async(
1007 &self,
1008 transaction: &Transaction,
1009 sql: &str,
1010 ) -> Result<Job<SqlQueryResult>, TgError> {
1011 const FUNCTION_NAME: &str = "query_async()";
1012 trace!("{} start", FUNCTION_NAME);
1013
1014 let tx_handle = transaction.transaction_handle()?;
1015
1016 let command = Self::execute_query_command(tx_handle, sql);
1017 let wire = self.wire().clone();
1018 let default_timeout = self.default_timeout;
1019 let job = self
1020 .send_and_pull_async(
1021 "Query",
1022 command,
1023 None,
1024 Box::new(move |slot_handle, response| {
1025 query_result_processor(wire.clone(), slot_handle, response, default_timeout)
1026 }),
1027 )
1028 .await?;
1029
1030 trace!("{} end", FUNCTION_NAME);
1031 Ok(job)
1032 }
1033
1034 fn execute_query_command(transaction_handle: &ProtoTransaction, sql: &str) -> SqlCommand {
1035 let request = crate::jogasaki::proto::sql::request::ExecuteQuery {
1036 transaction_handle: Some(*transaction_handle),
1037 sql: ProstString::from(sql),
1038 };
1039 SqlCommand::ExecuteQuery(request)
1040 }
1041
1042 pub async fn prepared_query(
1071 &self,
1072 transaction: &Transaction,
1073 prepared_statement: &SqlPreparedStatement,
1074 parameters: Vec<SqlParameter>,
1075 ) -> Result<SqlQueryResult, TgError> {
1076 let timeout = self.default_timeout;
1077 self.prepared_query_for(transaction, prepared_statement, parameters, timeout)
1078 .await
1079 }
1080
1081 pub async fn prepared_query_for(
1083 &self,
1084 transaction: &Transaction,
1085 prepared_statement: &SqlPreparedStatement,
1086 parameters: Vec<SqlParameter>,
1087 timeout: Duration,
1088 ) -> Result<SqlQueryResult, TgError> {
1089 const FUNCTION_NAME: &str = "prepared_query()";
1090 trace!("{} start", FUNCTION_NAME);
1091
1092 let tx_handle = transaction.transaction_handle()?;
1093 let (parameters, lobs) =
1094 convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
1095
1096 let command =
1097 Self::execute_prepared_query_command(tx_handle, prepared_statement, parameters);
1098 let (slot_handle, response) = self.send_and_pull_response(command, lobs, timeout).await?;
1099
1100 let wire = self.wire().clone();
1101 let default_timeout = self.default_timeout;
1102 let query_result = query_result_processor(wire, slot_handle, response, default_timeout)?;
1103
1104 trace!("{} end", FUNCTION_NAME);
1105 Ok(query_result)
1106 }
1107
1108 pub async fn prepared_query_async(
1110 &self,
1111 transaction: &Transaction,
1112 prepared_statement: &SqlPreparedStatement,
1113 parameters: Vec<SqlParameter>,
1114 ) -> Result<Job<SqlQueryResult>, TgError> {
1115 const FUNCTION_NAME: &str = "prepared_query_async()";
1116 trace!("{} start", FUNCTION_NAME);
1117
1118 let tx_handle = transaction.transaction_handle()?;
1119 let (parameters, lobs) =
1120 convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
1121
1122 let command =
1123 Self::execute_prepared_query_command(tx_handle, prepared_statement, parameters);
1124 let wire = self.wire().clone();
1125 let default_timeout = self.default_timeout;
1126 let job = self
1127 .send_and_pull_async(
1128 "Query",
1129 command,
1130 lobs,
1131 Box::new(move |slot_handle, response| {
1132 query_result_processor(wire.clone(), slot_handle, response, default_timeout)
1133 }),
1134 )
1135 .await?;
1136
1137 trace!("{} end", FUNCTION_NAME);
1138 Ok(job)
1139 }
1140
1141 fn execute_prepared_query_command(
1142 transaction_handle: &ProtoTransaction,
1143 prepared_statement: &SqlPreparedStatement,
1144 parameters: Vec<SqlParameter>,
1145 ) -> SqlCommand {
1146 let ps_handle = crate::jogasaki::proto::sql::common::PreparedStatement {
1147 handle: prepared_statement.prepare_handle(),
1148 has_result_records: prepared_statement.has_result_records(),
1149 };
1150 let request = crate::jogasaki::proto::sql::request::ExecutePreparedQuery {
1151 transaction_handle: Some(*transaction_handle),
1152 prepared_statement_handle: Some(ps_handle),
1153 parameters,
1154 };
1155 SqlCommand::ExecutePreparedQuery(request)
1156 }
1157
1158 pub async fn open_blob(
1176 &self,
1177 transaction: &Transaction,
1178 blob: &TgBlobReference,
1179 ) -> Result<std::fs::File, TgError> {
1180 let timeout = self.default_timeout;
1181 self.open_blob_for(transaction, blob, timeout).await
1182 }
1183
1184 pub async fn open_blob_for(
1186 &self,
1187 transaction: &Transaction,
1188 blob: &TgBlobReference,
1189 timeout: Duration,
1190 ) -> Result<std::fs::File, TgError> {
1191 const FUNCTION_NAME: &str = "open_blob()";
1192 trace!("{} start", FUNCTION_NAME);
1193
1194 let tx_handle = transaction.transaction_handle()?;
1195
1196 let command = Self::open_lob_command(tx_handle, blob);
1197 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1198 let file = lob_open_processor(response, &self.session)?;
1199
1200 trace!("{} end", FUNCTION_NAME);
1201 Ok(file)
1202 }
1203
1204 pub async fn open_blob_async(
1206 &self,
1207 transaction: &Transaction,
1208 blob: &TgBlobReference,
1209 ) -> Result<Job<std::fs::File>, TgError> {
1210 const FUNCTION_NAME: &str = "open_blob_async()";
1211 trace!("{} start", FUNCTION_NAME);
1212
1213 let tx_handle = transaction.transaction_handle()?;
1214
1215 let command = Self::open_lob_command(tx_handle, blob);
1216 let session = self.session.clone();
1217 let job = self
1218 .send_and_pull_async(
1219 "File",
1220 command,
1221 None,
1222 Box::new(move |_, response| lob_open_processor(response, &session)),
1223 )
1224 .await?;
1225
1226 trace!("{} end", FUNCTION_NAME);
1227 Ok(job)
1228 }
1229
1230 pub async fn open_clob(
1248 &self,
1249 transaction: &Transaction,
1250 clob: &TgClobReference,
1251 ) -> Result<std::fs::File, TgError> {
1252 let timeout = self.default_timeout;
1253 self.open_clob_for(transaction, clob, timeout).await
1254 }
1255
1256 pub async fn open_clob_for(
1258 &self,
1259 transaction: &Transaction,
1260 clob: &TgClobReference,
1261 timeout: Duration,
1262 ) -> Result<std::fs::File, TgError> {
1263 const FUNCTION_NAME: &str = "open_clob()";
1264 trace!("{} start", FUNCTION_NAME);
1265
1266 let tx_handle = transaction.transaction_handle()?;
1267
1268 let command = Self::open_lob_command(tx_handle, clob);
1269 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1270 let file = lob_open_processor(response, &self.session)?;
1271
1272 trace!("{} end", FUNCTION_NAME);
1273 Ok(file)
1274 }
1275
1276 pub async fn open_clob_async(
1278 &self,
1279 transaction: &Transaction,
1280 clob: &TgClobReference,
1281 ) -> Result<Job<std::fs::File>, TgError> {
1282 const FUNCTION_NAME: &str = "open_clob_async()";
1283 trace!("{} start", FUNCTION_NAME);
1284
1285 let tx_handle = transaction.transaction_handle()?;
1286
1287 let command = Self::open_lob_command(tx_handle, clob);
1288 let session = self.session.clone();
1289 let job = self
1290 .send_and_pull_async(
1291 "File",
1292 command,
1293 None,
1294 Box::new(move |_, response| lob_open_processor(response, &session)),
1295 )
1296 .await?;
1297
1298 trace!("{} end", FUNCTION_NAME);
1299 Ok(job)
1300 }
1301
1302 fn open_lob_command<T: TgLargeObjectReference>(
1303 transaction_handle: &ProtoTransaction,
1304 lob: &T,
1305 ) -> SqlCommand {
1306 let lob = crate::jogasaki::proto::sql::common::LargeObjectReference {
1307 provider: lob.provider().into(),
1308 object_id: lob.object_id(),
1309 contents_opt: None,
1310 };
1311
1312 let request = crate::jogasaki::proto::sql::request::GetLargeObjectData {
1313 transaction_handle: Some(*transaction_handle),
1314 reference: Some(lob),
1315 };
1316 SqlCommand::GetLargeObjectData(request)
1317 }
1318
1319 pub async fn get_blob_cache(
1337 &self,
1338 transaction: &Transaction,
1339 blob: &TgBlobReference,
1340 ) -> Result<TgLargeObjectCache, TgError> {
1341 let timeout = self.default_timeout;
1342 self.get_blob_cache_for(transaction, blob, timeout).await
1343 }
1344
1345 pub async fn get_blob_cache_for(
1349 &self,
1350 transaction: &Transaction,
1351 blob: &TgBlobReference,
1352 timeout: Duration,
1353 ) -> Result<TgLargeObjectCache, TgError> {
1354 const FUNCTION_NAME: &str = "get_blob_cache()";
1355 trace!("{} start", FUNCTION_NAME);
1356
1357 let cache = self
1358 .get_large_object_cache(transaction, blob, timeout)
1359 .await?;
1360
1361 trace!("{} end", FUNCTION_NAME);
1362 Ok(cache)
1363 }
1364
1365 pub async fn get_blob_cache_async(
1369 &self,
1370 transaction: &Transaction,
1371 blob: &TgBlobReference,
1372 ) -> Result<Job<TgLargeObjectCache>, TgError> {
1373 const FUNCTION_NAME: &str = "get_blob_cache_async()";
1374 trace!("{} start", FUNCTION_NAME);
1375
1376 let job = self.get_large_object_cache_async(transaction, blob).await?;
1377
1378 trace!("{} end", FUNCTION_NAME);
1379 Ok(job)
1380 }
1381
1382 pub async fn get_clob_cache(
1400 &self,
1401 transaction: &Transaction,
1402 clob: &TgClobReference,
1403 ) -> Result<TgLargeObjectCache, TgError> {
1404 let timeout = self.default_timeout;
1405 self.get_clob_cache_for(transaction, clob, timeout).await
1406 }
1407
1408 pub async fn get_clob_cache_for(
1412 &self,
1413 transaction: &Transaction,
1414 clob: &TgClobReference,
1415 timeout: Duration,
1416 ) -> Result<TgLargeObjectCache, TgError> {
1417 const FUNCTION_NAME: &str = "get_clob_cache()";
1418 trace!("{} start", FUNCTION_NAME);
1419
1420 let cache = self
1421 .get_large_object_cache(transaction, clob, timeout)
1422 .await?;
1423
1424 trace!("{} end", FUNCTION_NAME);
1425 Ok(cache)
1426 }
1427
1428 pub async fn get_clob_cache_async(
1432 &self,
1433 transaction: &Transaction,
1434 clob: &TgClobReference,
1435 ) -> Result<Job<TgLargeObjectCache>, TgError> {
1436 const FUNCTION_NAME: &str = "get_clob_cache_async()";
1437 trace!("{} start", FUNCTION_NAME);
1438
1439 let job = self.get_large_object_cache_async(transaction, clob).await?;
1440
1441 trace!("{} end", FUNCTION_NAME);
1442 Ok(job)
1443 }
1444
1445 async fn get_large_object_cache<T: TgLargeObjectReference>(
1446 &self,
1447 transaction: &Transaction,
1448 lob: &T,
1449 timeout: Duration,
1450 ) -> Result<TgLargeObjectCache, TgError> {
1451 let tx_handle = transaction.transaction_handle()?;
1452 let command = Self::open_lob_command(tx_handle, lob);
1453 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1454 let cache = lob_cache_processor(response, &self.session)?;
1455 Ok(cache)
1456 }
1457
1458 async fn get_large_object_cache_async<T: TgLargeObjectReference>(
1459 &self,
1460 transaction: &Transaction,
1461 lob: &T,
1462 ) -> Result<Job<TgLargeObjectCache>, TgError> {
1463 let tx_handle = transaction.transaction_handle()?;
1464 let command = Self::open_lob_command(tx_handle, lob);
1465 let session = self.session.clone();
1466 let job = self
1467 .send_and_pull_async(
1468 "LargeObjectCache",
1469 command,
1470 None,
1471 Box::new(move |_, response| lob_cache_processor(response, &session)),
1472 )
1473 .await?;
1474 Ok(job)
1475 }
1476
1477 pub async fn read_blob(
1494 &self,
1495 transaction: &Transaction,
1496 blob: &TgBlobReference,
1497 ) -> Result<Vec<u8>, TgError> {
1498 let timeout = self.default_timeout;
1499 self.read_blob_for(transaction, blob, timeout).await
1500 }
1501
1502 pub async fn read_blob_for(
1506 &self,
1507 transaction: &Transaction,
1508 blob: &TgBlobReference,
1509 timeout: Duration,
1510 ) -> Result<Vec<u8>, TgError> {
1511 const FUNCTION_NAME: &str = "read_blob()";
1512 trace!("{} start", FUNCTION_NAME);
1513
1514 let tx_handle = transaction.transaction_handle()?;
1515
1516 let command = Self::open_lob_command(tx_handle, blob);
1517 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1518 let buf = Self::blob_read_processor(response, &self.session)?;
1519
1520 trace!("{} end", FUNCTION_NAME);
1521 Ok(buf)
1522 }
1523
1524 pub async fn read_blob_async(
1528 &self,
1529 transaction: &Transaction,
1530 blob: &TgBlobReference,
1531 ) -> Result<Job<Vec<u8>>, TgError> {
1532 const FUNCTION_NAME: &str = "read_blob_async()";
1533 trace!("{} start", FUNCTION_NAME);
1534
1535 let tx_handle = transaction.transaction_handle()?;
1536
1537 let command = Self::open_lob_command(tx_handle, blob);
1538 let session = self.session.clone();
1539 let job = self
1540 .send_and_pull_async(
1541 "BLOB",
1542 command,
1543 None,
1544 Box::new(move |_, response| Self::blob_read_processor(response, &session)),
1545 )
1546 .await?;
1547
1548 trace!("{} end", FUNCTION_NAME);
1549 Ok(job)
1550 }
1551
1552 fn blob_read_processor(
1553 response: WireResponse,
1554 session: &Arc<Session>,
1555 ) -> Result<Vec<u8>, TgError> {
1556 let mut file = lob_open_processor(response, session)?;
1557 let mut buf = Vec::new();
1558 file.read_to_end(&mut buf)
1559 .map_err(|e| io_error!("BLOB read error", e))?;
1560
1561 Ok(buf)
1562 }
1563
1564 pub async fn read_clob(
1581 &self,
1582 transaction: &Transaction,
1583 clob: &TgClobReference,
1584 ) -> Result<String, TgError> {
1585 let timeout = self.default_timeout;
1586 self.read_clob_for(transaction, clob, timeout).await
1587 }
1588
1589 pub async fn read_clob_for(
1593 &self,
1594 transaction: &Transaction,
1595 clob: &TgClobReference,
1596 timeout: Duration,
1597 ) -> Result<String, TgError> {
1598 const FUNCTION_NAME: &str = "read_clob()";
1599 trace!("{} start", FUNCTION_NAME);
1600
1601 let tx_handle = transaction.transaction_handle()?;
1602
1603 let command = Self::open_lob_command(tx_handle, clob);
1604 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1605 let buf = Self::clob_read_processor(response, &self.session)?;
1606
1607 trace!("{} end", FUNCTION_NAME);
1608 Ok(buf)
1609 }
1610
1611 pub async fn read_clob_async(
1615 &self,
1616 transaction: &Transaction,
1617 clob: &TgClobReference,
1618 ) -> Result<Job<String>, TgError> {
1619 const FUNCTION_NAME: &str = "read_clob_async()";
1620 trace!("{} start", FUNCTION_NAME);
1621
1622 let tx_handle = transaction.transaction_handle()?;
1623
1624 let command = Self::open_lob_command(tx_handle, clob);
1625 let session = self.session.clone();
1626 let job = self
1627 .send_and_pull_async(
1628 "CLOB",
1629 command,
1630 None,
1631 Box::new(move |_, response| Self::clob_read_processor(response, &session)),
1632 )
1633 .await?;
1634
1635 trace!("{} end", FUNCTION_NAME);
1636 Ok(job)
1637 }
1638
1639 fn clob_read_processor(
1640 response: WireResponse,
1641 session: &Arc<Session>,
1642 ) -> Result<String, TgError> {
1643 let mut file = lob_open_processor(response, session)?;
1644 let mut buf = String::new();
1645 file.read_to_string(&mut buf)
1646 .map_err(|e| io_error!("CLOB read error", e))?;
1647
1648 Ok(buf)
1649 }
1650
1651 pub async fn copy_blob_to<T: AsRef<Path>>(
1665 &self,
1666 transaction: &Transaction,
1667 blob: &TgBlobReference,
1668 destination: T,
1669 ) -> Result<(), TgError> {
1670 let timeout = self.default_timeout;
1671 self.copy_blob_to_for(transaction, blob, destination, timeout)
1672 .await
1673 }
1674
1675 pub async fn copy_blob_to_for<T: AsRef<Path>>(
1677 &self,
1678 transaction: &Transaction,
1679 blob: &TgBlobReference,
1680 destination: T,
1681 timeout: Duration,
1682 ) -> Result<(), TgError> {
1683 const FUNCTION_NAME: &str = "copy_blob_to()";
1684 trace!("{} start", FUNCTION_NAME);
1685
1686 let tx_handle = transaction.transaction_handle()?;
1687
1688 let command = Self::copy_lob_to_command(tx_handle, blob);
1689 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1690 lob_copy_to_processor(response, &self.session, destination)?;
1691
1692 trace!("{} end", FUNCTION_NAME);
1693 Ok(())
1694 }
1695
1696 pub async fn copy_blob_to_async<T: AsRef<Path> + Send + Clone + 'static>(
1698 &self,
1699 transaction: &Transaction,
1700 blob: &TgBlobReference,
1701 destination: T,
1702 ) -> Result<Job<()>, TgError> {
1703 const FUNCTION_NAME: &str = "copy_blob_to_async()";
1704 trace!("{} start", FUNCTION_NAME);
1705
1706 let tx_handle = transaction.transaction_handle()?;
1707
1708 let command = Self::copy_lob_to_command(tx_handle, blob);
1709 let session = self.session.clone();
1710 let job = self
1711 .send_and_pull_async(
1712 "BlobCopy",
1713 command,
1714 None,
1715 Box::new(move |_, response| {
1716 lob_copy_to_processor(response, &session, destination.clone())
1717 }),
1718 )
1719 .await?;
1720
1721 trace!("{} end", FUNCTION_NAME);
1722 Ok(job)
1723 }
1724
1725 pub async fn copy_clob_to<T: AsRef<Path>>(
1739 &self,
1740 transaction: &Transaction,
1741 clob: &TgClobReference,
1742 destination: T,
1743 ) -> Result<(), TgError> {
1744 let timeout = self.default_timeout;
1745 self.copy_clob_to_for(transaction, clob, destination, timeout)
1746 .await
1747 }
1748
1749 pub async fn copy_clob_to_for<T: AsRef<Path>>(
1751 &self,
1752 transaction: &Transaction,
1753 clob: &TgClobReference,
1754 destination: T,
1755 timeout: Duration,
1756 ) -> Result<(), TgError> {
1757 const FUNCTION_NAME: &str = "copy_clob_to()";
1758 trace!("{} start", FUNCTION_NAME);
1759
1760 let tx_handle = transaction.transaction_handle()?;
1761
1762 let command = Self::copy_lob_to_command(tx_handle, clob);
1763 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1764 lob_copy_to_processor(response, &self.session, destination)?;
1765
1766 trace!("{} end", FUNCTION_NAME);
1767 Ok(())
1768 }
1769
1770 pub async fn copy_clob_to_async<T: AsRef<Path> + Send + Clone + 'static>(
1772 &self,
1773 transaction: &Transaction,
1774 clob: &TgClobReference,
1775 destination: T,
1776 ) -> Result<Job<()>, TgError> {
1777 const FUNCTION_NAME: &str = "copy_clob_to_async()";
1778 trace!("{} start", FUNCTION_NAME);
1779
1780 let tx_handle = transaction.transaction_handle()?;
1781
1782 let command = Self::copy_lob_to_command(tx_handle, clob);
1783 let session = self.session.clone();
1784 let job = self
1785 .send_and_pull_async(
1786 "ClobCopy",
1787 command,
1788 None,
1789 Box::new(move |_, response| {
1790 lob_copy_to_processor(response, &session, destination.clone())
1791 }),
1792 )
1793 .await?;
1794
1795 trace!("{} end", FUNCTION_NAME);
1796 Ok(job)
1797 }
1798
1799 fn copy_lob_to_command<T: TgLargeObjectReference>(
1800 transaction_handle: &ProtoTransaction,
1801 clob: &T,
1802 ) -> SqlCommand {
1803 let lob = crate::jogasaki::proto::sql::common::LargeObjectReference {
1804 provider: clob.provider().into(),
1805 object_id: clob.object_id(),
1806 contents_opt: None,
1807 };
1808
1809 let request = crate::jogasaki::proto::sql::request::GetLargeObjectData {
1810 transaction_handle: Some(*transaction_handle),
1811 reference: Some(lob),
1812 };
1813 SqlCommand::GetLargeObjectData(request)
1814 }
1815
1816 pub async fn commit(
1830 &self,
1831 transaction: &Transaction,
1832 commit_option: &CommitOption,
1833 ) -> Result<(), TgError> {
1834 let timeout = self.default_timeout;
1835 self.commit_for(transaction, commit_option, timeout).await
1836 }
1837
1838 pub async fn commit_for(
1840 &self,
1841 transaction: &Transaction,
1842 commit_option: &CommitOption,
1843 timeout: Duration,
1844 ) -> Result<(), TgError> {
1845 const FUNCTION_NAME: &str = "commit()";
1846 trace!("{} start", FUNCTION_NAME);
1847
1848 let tx_handle = transaction.transaction_handle()?;
1849
1850 let command = Self::commit_command(tx_handle, commit_option);
1851 let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
1852 transaction_commit_processor(slot_handle, response)?;
1853
1854 trace!("{} end", FUNCTION_NAME);
1855 Ok(())
1856 }
1857
1858 pub async fn commit_async(
1860 &self,
1861 transaction: &Transaction,
1862 commit_option: &CommitOption,
1863 ) -> Result<Job<()>, TgError> {
1864 const FUNCTION_NAME: &str = "commit_async()";
1865 trace!("{} start", FUNCTION_NAME);
1866
1867 let tx_handle = transaction.transaction_handle()?;
1868
1869 let command = Self::commit_command(tx_handle, commit_option);
1870 let job = self
1871 .send_and_pull_async(
1872 "Commit",
1873 command,
1874 None,
1875 Box::new(transaction_commit_processor),
1876 )
1877 .await?;
1878
1879 trace!("{} end", FUNCTION_NAME);
1880 Ok(job)
1881 }
1882
1883 fn commit_command(
1884 transaction_handle: &ProtoTransaction,
1885 commit_option: &CommitOption,
1886 ) -> SqlCommand {
1887 let request = crate::jogasaki::proto::sql::request::Commit {
1888 transaction_handle: Some(*transaction_handle),
1889 notification_type: commit_option.notification_type,
1890 auto_dispose: commit_option.auto_dispose,
1891 option: Some(*commit_option),
1892 };
1893 SqlCommand::Commit(request)
1894 }
1895
1896 pub async fn rollback(&self, transaction: &Transaction) -> Result<(), TgError> {
1909 let timeout = self.default_timeout;
1910 self.rollback_for(transaction, timeout).await
1911 }
1912
1913 pub async fn rollback_for(
1915 &self,
1916 transaction: &Transaction,
1917 timeout: Duration,
1918 ) -> Result<(), TgError> {
1919 const FUNCTION_NAME: &str = "rollback()";
1920 trace!("{} start", FUNCTION_NAME);
1921
1922 let tx_handle = transaction.transaction_handle()?;
1923
1924 let command = Self::rollback_command(tx_handle);
1925 let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
1926 transaction_rollback_processor(slot_handle, response)?;
1927
1928 trace!("{} end", FUNCTION_NAME);
1929 Ok(())
1930 }
1931
1932 pub async fn rollback_async(&self, transaction: &Transaction) -> Result<Job<()>, TgError> {
1934 const FUNCTION_NAME: &str = "rollback_async()";
1935 trace!("{} start", FUNCTION_NAME);
1936
1937 let tx_handle = transaction.transaction_handle()?;
1938
1939 let command = Self::rollback_command(tx_handle);
1940 let job = self
1941 .send_and_pull_async(
1942 "Rollback",
1943 command,
1944 None,
1945 Box::new(transaction_rollback_processor),
1946 )
1947 .await?;
1948
1949 trace!("{} end", FUNCTION_NAME);
1950 Ok(job)
1951 }
1952
1953 fn rollback_command(transaction_handle: &ProtoTransaction) -> SqlCommand {
1954 let request = crate::jogasaki::proto::sql::request::Rollback {
1955 transaction_handle: Some(*transaction_handle),
1956 };
1957 SqlCommand::Rollback(request)
1958 }
1959
1960 pub(crate) async fn dispose_transaction(
1961 &self,
1962 transaction_handle: &ProtoTransaction,
1963 timeout: Duration,
1964 ) -> Result<(), TgError> {
1965 const FUNCTION_NAME: &str = "dispose_transaction()";
1966 trace!("{} start", FUNCTION_NAME);
1967
1968 let command = Self::dispose_transaction_command(transaction_handle);
1969 let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1970 transaction_dispose_processor(response)?;
1971
1972 trace!("{} end", FUNCTION_NAME);
1973 Ok(())
1974 }
1975
1976 pub(crate) async fn dispose_transaction_send_only(
1977 &self,
1978 transaction_handle: &ProtoTransaction,
1979 ) -> Result<(), TgError> {
1980 const FUNCTION_NAME: &str = "dispose_transaction()";
1981 trace!("{} start", FUNCTION_NAME);
1982
1983 let command = Self::dispose_transaction_command(transaction_handle);
1984 let _ = self.send_only(command).await?;
1985
1986 trace!("{} end", FUNCTION_NAME);
1987 Ok(())
1988 }
1989
1990 fn dispose_transaction_command(transaction_handle: &ProtoTransaction) -> SqlCommand {
1991 let request = crate::jogasaki::proto::sql::request::DisposeTransaction {
1992 transaction_handle: Some(*transaction_handle),
1993 };
1994 SqlCommand::DisposeTransaction(request)
1995 }
1996}
1997
1998impl SqlClient {
1999 fn wire(&self) -> Arc<Wire> {
2000 self.session.wire()
2001 }
2002
2003 async fn send_only(&self, command: SqlCommand) -> Result<Arc<SlotEntryHandle>, TgError> {
2004 let request = Self::new_request(command);
2005 self.wire().send_only(SERVICE_ID_SQL, request, None).await
2006 }
2007
2008 async fn send_and_pull_response(
2009 &self,
2010 command: SqlCommand,
2011 lobs: Option<Vec<BlobInfo>>,
2012 timeout: Duration,
2013 ) -> Result<(Arc<SlotEntryHandle>, WireResponse), TgError> {
2014 let request = Self::new_request(command);
2015 self.wire()
2016 .send_and_pull_response(SERVICE_ID_SQL, request, lobs, timeout)
2017 .await
2018 }
2019
2020 async fn send_and_pull_async<T: 'static>(
2021 &self,
2022 job_name: &str,
2023 command: SqlCommand,
2024 lobs: Option<Vec<BlobInfo>>,
2025 converter: Box<dyn Fn(Arc<SlotEntryHandle>, WireResponse) -> Result<T, TgError> + Send>,
2026 ) -> Result<Job<T>, TgError> {
2027 let request = Self::new_request(command);
2028 self.wire()
2029 .send_and_pull_async(
2030 job_name,
2031 SERVICE_ID_SQL,
2032 request,
2033 lobs,
2034 converter,
2035 self.default_timeout,
2036 self.session.fail_on_drop_error(),
2037 )
2038 .await
2039 }
2040
2041 fn new_request(command: SqlCommand) -> SqlRequest {
2042 SqlRequest {
2043 session_handle: None,
2044 service_message_version_major: SERVICE_MESSAGE_VERSION_MAJOR,
2045 service_message_version_minor: SERVICE_MESSAGE_VERSION_MINOR,
2046 request: Some(command),
2047 }
2048 }
2049}
2050
2051#[allow(clippy::type_complexity)]
2052pub(crate) fn convert_sql_response(
2053 function_name: &str,
2054 response: &WireResponse,
2055) -> Result<(Option<SqlResponse>, Option<HashMap<String, BlobInfo>>), TgError> {
2056 match response {
2057 WireResponse::ResponseSessionPayload(_slot, payload, lobs, error) => {
2058 if let Some(e) = error {
2059 return Err(e.to_tg_error());
2060 }
2061 if payload.is_none() {
2062 return Err(invalid_response_error!(function_name, "payload is None"));
2063 }
2064 let payload = &payload.as_ref().unwrap()[..];
2066 let sql_response = SqlResponse::decode_length_delimited(payload)
2067 .map_err(|e| prost_decode_error!(function_name, "SqlResponse", e))?;
2068 match &sql_response.response {
2069 Some(SqlResponseType::ResultOnly(result_only)) => match &result_only.result {
2070 Some(crate::jogasaki::proto::sql::response::result_only::Result::Success(
2071 _,
2072 )) => Ok((Some(sql_response), lobs.clone())),
2073 Some(crate::jogasaki::proto::sql::response::result_only::Result::Error(
2074 error,
2075 )) => {
2076 let error = error.clone();
2077 Err(sql_service_error!(function_name, error))
2078 }
2079 _ => Ok((Some(sql_response), lobs.clone())),
2080 },
2081 _ => Ok((Some(sql_response), lobs.clone())),
2082 }
2083 }
2084 _ => Ok((None, None)),
2085 }
2086}
2087
2088pub(crate) fn sql_result_only_success_processor(
2089 function_name: &str,
2090 response: WireResponse,
2091) -> Result<(), TgError> {
2092 let (sql_response, _) = convert_sql_response(function_name, &response)?;
2093 let message = sql_response.ok_or(invalid_response_error!(
2094 function_name,
2095 format!("response {:?} is not ResponseSessionPayload", response),
2096 ))?;
2097 match message.response {
2098 Some(SqlResponseType::ResultOnly(result_only)) => match result_only.result {
2099 Some(crate::jogasaki::proto::sql::response::result_only::Result::Success(_)) => Ok(()),
2100 _ => Err(invalid_response_error!(
2101 function_name,
2102 format!("fail. {:?}", result_only),
2103 )),
2104 },
2105 _ => Err(invalid_response_error!(
2106 function_name,
2107 format!("response {:?} is not ResultOnly", message.response),
2108 )),
2109 }
2110}
2111
#[cfg(test)]
mod test {
    use super::*;

    /// The reported service message version must be the string `sql-1.6`.
    #[test]
    fn service_message_version() {
        assert_eq!("sql-1.6", SqlClient::service_message_version());
    }
}