tsubakuro_rust_core/service/sql/
sql_client.rs

1use std::{collections::HashMap, io::Read, path::Path, sync::Arc, time::Duration};
2
3use log::trace;
4
5use crate::{
6    error::TgError,
7    invalid_response_error, io_error,
8    job::Job,
9    jogasaki::proto::sql::{
10        common::Transaction as ProtoTransaction,
11        request::{request::Request as SqlCommand, Request as SqlRequest},
12        response::{response::Response as SqlResponseType, Response as SqlResponse},
13    },
14    prelude::{
15        convert_lob_parameters,
16        error_info::{transaction_error_info_processor, TransactionErrorInfo},
17        execute_result_processor,
18        explain::explain_processor,
19        list_tables_processor, prepare_dispose_processor, prepare_processor,
20        query_result_processor,
21        r#type::large_object::{
22            lob_cache_processor, lob_copy_to_processor, lob_open_processor, TgLargeObjectCache,
23        },
24        table_metadata_processor, transaction_status_processor, CommitOption, ServiceClient,
25        SqlExecuteResult, SqlParameter, SqlPlaceholder, SqlQueryResult, TableList, TableMetadata,
26        TgBlobReference, TgClobReference, TransactionStatusWithMessage,
27    },
28    prost_decode_error,
29    session::{
30        wire::{response::WireResponse, response_box::SlotEntryHandle, Wire},
31        Session,
32    },
33    sql_service_error,
34    tateyama::proto::framework::common::BlobInfo,
35    transaction::{
36        option::TransactionOption, transaction_begin_processor, transaction_commit_processor,
37        transaction_dispose_processor, transaction_rollback_processor, Transaction,
38    },
39};
40
41use prost::{alloc::string::String as ProstString, Message};
42
43use super::{
44    explain::SqlExplainResult, prepare::SqlPreparedStatement,
45    r#type::large_object::TgLargeObjectReference,
46};
47
48/// The symbolic ID of the destination service.
49const SERVICE_SYMBOLIC_ID: &str = "sql";
50
51/// The major service message version which this client requests.
52const SERVICE_MESSAGE_VERSION_MAJOR: u64 = 1;
53
54/// The minor service message version which this client requests.
55const SERVICE_MESSAGE_VERSION_MINOR: u64 = 6;
56
57pub(crate) const SERVICE_ID_SQL: i32 = 3;
58
59/// A SQL service client.
60///
61/// # Examples
62/// ```
63/// use std::sync::Arc;
64/// use tsubakuro_rust_core::prelude::*;
65///
66/// async fn example(session: &Arc<Session>) -> Result<(), TgError> {
67///     let client: SqlClient = session.make_client();
68///
69///     // In Tsurugi, DDL is also executed in a transaction.
70///     // (DDL and DML must not be executed in the same transaction)
71///     let transaction = client.start_transaction(&TransactionOption::default()).await?;
72///     let result = {
73///         let sql = "
74///           create table customer (
75///             c_id bigint primary key,
76///             c_name varchar(30) not null,
77///             c_age int
78///           )
79///         ";
80///         let result = client.execute(&transaction, sql).await;
81///         match result {
82///            Ok(_) => client.commit(&transaction, &CommitOption::default()).await,
83///            Err(e) => Err(e)
84///         }
85///     };
86///     transaction.close().await?;
87///     result?;
88///
89///     let table_list = client.list_tables().await?;
90///     let table_metadata = client.get_table_metadata("customer").await?;
91///
92///     Ok(())
93/// }
94/// ```
95pub struct SqlClient {
96    session: Arc<Session>,
97    default_timeout: Duration,
98}
99
100impl ServiceClient for SqlClient {
101    fn new(session: Arc<Session>) -> Self {
102        let default_timeout = session.default_timeout();
103        SqlClient {
104            session,
105            default_timeout,
106        }
107    }
108}
109
110impl SqlClient {
111    /// Get service message version.
112    pub fn service_message_version() -> String {
113        format!(
114            "{}-{}.{}",
115            SERVICE_SYMBOLIC_ID, SERVICE_MESSAGE_VERSION_MAJOR, SERVICE_MESSAGE_VERSION_MINOR
116        )
117    }
118
119    /// Set default timeout.
120    pub fn set_default_timeout(&mut self, timeout: Duration) {
121        self.default_timeout = timeout;
122    }
123
124    /// Get default timeout.
125    pub fn default_timeout(&self) -> Duration {
126        self.default_timeout
127    }
128}
129
130impl SqlClient {
131    /// Returns the list of available table names in the database, except system tables.
132    ///
133    /// The table names are each fully qualified (maybe with a schema name).
134    /// To retrieve more details for the individual tables, you can use [Self::get_table_metadata].
135    ///
136    /// # Examples
137    /// ```
138    /// use tsubakuro_rust_core::prelude::*;
139    ///
140    /// async fn example(client: &SqlClient) -> Result<(), TgError> {
141    ///     let table_list = client.list_tables().await?;
142    ///
143    ///     let table_names = table_list.table_names();
144    ///     for table_name in table_names {
145    ///         println!("{}", table_name);
146    ///     }
147    ///
148    ///     Ok(())
149    /// }
150    /// ```
151    pub async fn list_tables(&self) -> Result<TableList, TgError> {
152        let timeout = self.default_timeout;
153        self.list_tables_for(timeout).await
154    }
155
156    /// Returns the list of available table names in the database, except system tables.
157    ///
158    /// The table names are each fully qualified (maybe with a schema name).
159    /// To retrieve more details for the individual tables, you can use [Self::get_table_metadata_for].
160    pub async fn list_tables_for(&self, timeout: Duration) -> Result<TableList, TgError> {
161        const FUNCTION_NAME: &str = "list_tables()";
162        trace!("{} start", FUNCTION_NAME);
163
164        let command = Self::list_tables_command();
165        let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
166        let table_list = list_tables_processor(slot_handle, response)?;
167
168        trace!("{} end", FUNCTION_NAME);
169        Ok(table_list)
170    }
171
172    /// Returns the list of available table names in the database, except system tables.
173    ///
174    /// The table names are each fully qualified (maybe with a schema name).
175    /// To retrieve more details for the individual tables, you can use [Self::get_table_metadata_async].
176    pub async fn list_tables_async(&self) -> Result<Job<TableList>, TgError> {
177        const FUNCTION_NAME: &str = "list_tables_async()";
178        trace!("{} start", FUNCTION_NAME);
179
180        let command = Self::list_tables_command();
181        let job = self
182            .send_and_pull_async("ListTables", command, None, Box::new(list_tables_processor))
183            .await?;
184
185        trace!("{} end", FUNCTION_NAME);
186        Ok(job)
187    }
188
189    fn list_tables_command() -> SqlCommand {
190        let request = crate::jogasaki::proto::sql::request::ListTables {};
191        SqlCommand::ListTables(request)
192    }
193
194    /// Retrieves metadata for a table.
195    ///
196    /// # Examples
197    /// ```
198    /// use tsubakuro_rust_core::prelude::*;
199    ///
200    /// async fn example(client: &SqlClient) -> Result<(), TgError> {
201    ///     let table_metadata = client.get_table_metadata("customer").await?;
202    ///     println!("table name={}", table_metadata.table_name());
203    ///
204    ///     let columns = table_metadata.columns();
205    ///     for column in columns {
206    ///         println!("column name={}", column.name());
207    ///     }
208    ///
209    ///     Ok(())
210    /// }
211    /// ```
212    pub async fn get_table_metadata(&self, table_name: &str) -> Result<TableMetadata, TgError> {
213        let timeout = self.default_timeout;
214        self.get_table_metadata_for(table_name, timeout).await
215    }
216
217    /// Retrieves metadata for a table.
218    pub async fn get_table_metadata_for(
219        &self,
220        table_name: &str,
221        timeout: Duration,
222    ) -> Result<TableMetadata, TgError> {
223        const FUNCTION_NAME: &str = "get_table_metadata()";
224        trace!("{} start", FUNCTION_NAME);
225
226        let command = Self::table_metadata_command(table_name);
227        let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
228        let metadata = table_metadata_processor(slot_handle, response)?;
229
230        trace!("{} end", FUNCTION_NAME);
231        Ok(metadata)
232    }
233
234    /// Retrieves metadata for a table.
235    pub async fn get_table_metadata_async(
236        &self,
237        table_name: &str,
238    ) -> Result<Job<TableMetadata>, TgError> {
239        const FUNCTION_NAME: &str = "get_table_metadata_async()";
240        trace!("{} start", FUNCTION_NAME);
241
242        let command = Self::table_metadata_command(table_name);
243        let job = self
244            .send_and_pull_async(
245                "TableMetadata",
246                command,
247                None,
248                Box::new(table_metadata_processor),
249            )
250            .await?;
251
252        trace!("{} end", FUNCTION_NAME);
253        Ok(job)
254    }
255
256    fn table_metadata_command(table_name: &str) -> SqlCommand {
257        let request = crate::jogasaki::proto::sql::request::DescribeTable {
258            name: table_name.to_string(),
259        };
260        SqlCommand::DescribeTable(request)
261    }
262
263    /// Prepares a SQL statement.
264    ///
265    /// Note: Should invoke [`SqlPreparedStatement::close`] before [`SqlPreparedStatement::drop`] to dispose the prepared statement.
266    ///
267    /// # Examples
268    /// ```
269    /// use tsubakuro_rust_core::prelude::*;
270    ///
271    /// async fn example(client: &SqlClient) -> Result<(), TgError> {
272    ///     let sql = "insert into customer values(:id, :name, :age)";
273    ///     let placeholders = vec![
274    ///         SqlPlaceholder::of::<i64>("id"),
275    ///         SqlPlaceholder::of::<String>("name"),
276    ///         SqlPlaceholder::of::<i32>("age"),
277    ///     ];
278    ///     let prepared_statement = client.prepare(sql, placeholders).await?;
279    ///
280    ///     prepared_statement.close().await?;
281    ///     Ok(())
282    /// }
283    /// ```
284    pub async fn prepare(
285        &self,
286        sql: &str,
287        placeholders: Vec<SqlPlaceholder>,
288    ) -> Result<SqlPreparedStatement, TgError> {
289        let timeout = self.default_timeout;
290        self.prepare_for(sql, placeholders, timeout).await
291    }
292
293    /// Prepares a SQL statement.
294    ///
295    /// Note: Should invoke [`SqlPreparedStatement::close`] before [`SqlPreparedStatement::drop`] to dispose the prepared statement.
296    pub async fn prepare_for(
297        &self,
298        sql: &str,
299        placeholders: Vec<SqlPlaceholder>,
300        timeout: Duration,
301    ) -> Result<SqlPreparedStatement, TgError> {
302        const FUNCTION_NAME: &str = "prepare()";
303        trace!("{} start", FUNCTION_NAME);
304
305        let command = Self::prepare_command(sql, placeholders);
306        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
307
308        let session = self.session.clone();
309        let close_timeout = self.default_timeout;
310        let ps = prepare_processor(session, response, close_timeout)?;
311
312        trace!("{} end", FUNCTION_NAME);
313        Ok(ps)
314    }
315
316    /// Prepares a SQL statement.
317    ///
318    /// Note: Should invoke [`SqlPreparedStatement::close`] before [`SqlPreparedStatement::drop`] to dispose the prepared statement.
319    pub async fn prepare_async(
320        &self,
321        sql: &str,
322        placeholders: Vec<SqlPlaceholder>,
323    ) -> Result<Job<SqlPreparedStatement>, TgError> {
324        const FUNCTION_NAME: &str = "prepare_async()";
325        trace!("{} start", FUNCTION_NAME);
326
327        let command = Self::prepare_command(sql, placeholders);
328        let session = self.session.clone();
329        let close_timeout = self.default_timeout;
330        let job = self
331            .send_and_pull_async(
332                "Prepare",
333                command,
334                None,
335                Box::new(move |_, response| {
336                    prepare_processor(session.clone(), response, close_timeout)
337                }),
338            )
339            .await?;
340
341        trace!("{} end", FUNCTION_NAME);
342        Ok(job)
343    }
344
345    fn prepare_command(sql: &str, placeholders: Vec<SqlPlaceholder>) -> SqlCommand {
346        let request = crate::jogasaki::proto::sql::request::Prepare {
347            sql: sql.to_string(),
348            placeholders,
349        };
350        SqlCommand::Prepare(request)
351    }
352
353    pub(crate) async fn dispose_prepare(
354        &self,
355        prepare_handle: u64,
356        has_result_records: bool,
357        timeout: Duration,
358    ) -> Result<(), TgError> {
359        const FUNCTION_NAME: &str = "dispose_prepare()";
360        trace!("{} start", FUNCTION_NAME);
361
362        let command = Self::dispose_prepare_statement_command(prepare_handle, has_result_records);
363        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
364        prepare_dispose_processor(response)?;
365
366        trace!("{} end", FUNCTION_NAME);
367        Ok(())
368    }
369
370    pub(crate) async fn dispose_prepare_send_only(
371        &self,
372        prepare_handle: u64,
373        has_result_records: bool,
374    ) -> Result<(), TgError> {
375        const FUNCTION_NAME: &str = "dispose_prepare_send_only()";
376        trace!("{} start", FUNCTION_NAME);
377
378        let command = Self::dispose_prepare_statement_command(prepare_handle, has_result_records);
379        let _ = self.send_only(command).await?;
380
381        trace!("{} end", FUNCTION_NAME);
382        Ok(())
383    }
384
385    fn dispose_prepare_statement_command(
386        prepare_handle: u64,
387        has_result_records: bool,
388    ) -> SqlCommand {
389        let ps = crate::jogasaki::proto::sql::common::PreparedStatement {
390            handle: prepare_handle,
391            has_result_records,
392        };
393
394        let request = crate::jogasaki::proto::sql::request::DisposePreparedStatement {
395            prepared_statement_handle: Some(ps),
396        };
397        SqlCommand::DisposePreparedStatement(request)
398    }
399
400    /// Retrieves execution plan of the statement.
401    ///
402    /// # Examples
403    /// ```
404    /// use tsubakuro_rust_core::prelude::*;
405    ///
406    /// async fn example(client: &SqlClient) -> Result<(), TgError> {
407    ///     let sql = "select * from customer oder by c_id";
408    ///     let explain_result = client.explain(sql).await?;
409    ///     println!("json={}", explain_result.contents());
410    ///
411    ///     Ok(())
412    /// }
413    /// ```
414    pub async fn explain(&self, sql: &str) -> Result<SqlExplainResult, TgError> {
415        let timeout = self.default_timeout;
416        self.explain_for(sql, timeout).await
417    }
418
419    /// Retrieves execution plan of the statement.
420    pub async fn explain_for(
421        &self,
422        sql: &str,
423        timeout: Duration,
424    ) -> Result<SqlExplainResult, TgError> {
425        const FUNCTION_NAME: &str = "explain()";
426        trace!("{} start", FUNCTION_NAME);
427
428        let command = Self::explain_text_command(sql);
429        let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
430
431        let explain_result = explain_processor(slot_handle, response)?;
432
433        trace!("{} end", FUNCTION_NAME);
434        Ok(explain_result)
435    }
436
437    /// Retrieves execution plan of the statement.
438    pub async fn explain_async(&self, sql: &str) -> Result<Job<SqlExplainResult>, TgError> {
439        const FUNCTION_NAME: &str = "explain_async()";
440        trace!("{} start", FUNCTION_NAME);
441
442        let command = Self::explain_text_command(sql);
443        let job = self
444            .send_and_pull_async("Explain", command, None, Box::new(explain_processor))
445            .await?;
446
447        trace!("{} end", FUNCTION_NAME);
448        Ok(job)
449    }
450
451    fn explain_text_command(sql: &str) -> SqlCommand {
452        let request = crate::jogasaki::proto::sql::request::ExplainByText {
453            sql: sql.to_string(),
454        };
455        SqlCommand::ExplainByText(request)
456    }
457
458    /// Retrieves execution plan of the statement.
459    ///
460    /// # Examples
461    /// ```
462    /// use tsubakuro_rust_core::prelude::*;
463    ///
464    /// async fn example(client: &SqlClient, prepared_statement: &SqlPreparedStatement) -> Result<(), TgError> {
465    ///     // prepared_statement: "select * from customer where c_id = :id"
466    ///     let parameters = vec![SqlParameter::of("id", 3_i64)];
467    ///     let explain_result = client.prepared_explain(prepared_statement, parameters).await?;
468    ///     println!("json={}", explain_result.contents());
469    ///
470    ///     Ok(())
471    /// }
472    /// ```
473    pub async fn prepared_explain(
474        &self,
475        prepared_statement: &SqlPreparedStatement,
476        parameters: Vec<SqlParameter>,
477    ) -> Result<SqlExplainResult, TgError> {
478        let timeout = self.default_timeout;
479        self.prepared_explain_for(prepared_statement, parameters, timeout)
480            .await
481    }
482
483    /// Retrieves execution plan of the statement.
484    pub async fn prepared_explain_for(
485        &self,
486        prepared_statement: &SqlPreparedStatement,
487        parameters: Vec<SqlParameter>,
488        timeout: Duration,
489    ) -> Result<SqlExplainResult, TgError> {
490        const FUNCTION_NAME: &str = "prepared_explain()";
491        trace!("{} start", FUNCTION_NAME);
492
493        let (parameters, lobs) =
494            convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
495        let command = Self::explain_prepared_command(prepared_statement, parameters);
496        let (slot_handle, response) = self.send_and_pull_response(command, lobs, timeout).await?;
497        let explain_result = explain_processor(slot_handle, response)?;
498
499        trace!("{} end", FUNCTION_NAME);
500        Ok(explain_result)
501    }
502
503    /// Retrieves execution plan of the statement.
504    pub async fn prepared_explain_async(
505        &self,
506        prepared_statement: &SqlPreparedStatement,
507        parameters: Vec<SqlParameter>,
508    ) -> Result<Job<SqlExplainResult>, TgError> {
509        const FUNCTION_NAME: &str = "prepared_explain_async()";
510        trace!("{} start", FUNCTION_NAME);
511
512        let (parameters, lobs) =
513            convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
514        let command = Self::explain_prepared_command(prepared_statement, parameters);
515        let job = self
516            .send_and_pull_async("Explain", command, lobs, Box::new(explain_processor))
517            .await?;
518
519        trace!("{} end", FUNCTION_NAME);
520        Ok(job)
521    }
522
523    fn explain_prepared_command(
524        prepared_statement: &SqlPreparedStatement,
525        parameters: Vec<SqlParameter>,
526    ) -> SqlCommand {
527        let ps_handle = crate::jogasaki::proto::sql::common::PreparedStatement {
528            handle: prepared_statement.prepare_handle(),
529            has_result_records: prepared_statement.has_result_records(),
530        };
531        let request = crate::jogasaki::proto::sql::request::Explain {
532            prepared_statement_handle: Some(ps_handle),
533            parameters,
534        };
535        SqlCommand::Explain(request)
536    }
537
538    /// Starts a new transaction.
539    ///
540    /// Note: Should invoke [`Transaction::close`] before [`Transaction::drop`] to dispose the transaction.
541    ///
542    /// # Examples
543    /// ```
544    /// use tsubakuro_rust_core::prelude::*;
545    ///
546    /// async fn example(client: &SqlClient, transaction_option: &TransactionOption) -> Result<(), TgError> {
547    ///     let transaction = client.start_transaction(transaction_option).await?;
548    ///
549    ///     let result = client.commit(&transaction, &CommitOption::default()).await;
550    ///
551    ///     transaction.close().await?;
552    ///
553    ///     result
554    /// }
555    /// ```
556    pub async fn start_transaction(
557        &self,
558        transaction_option: &TransactionOption,
559    ) -> Result<Transaction, TgError> {
560        let timeout = self.default_timeout;
561        self.start_transaction_for(transaction_option, timeout)
562            .await
563    }
564
565    /// Starts a new transaction.
566    ///
567    /// Note: Should invoke [`Transaction::close`] before [`Transaction::drop`] to dispose the transaction.
568    pub async fn start_transaction_for(
569        &self,
570        transaction_option: &TransactionOption,
571        timeout: Duration,
572    ) -> Result<Transaction, TgError> {
573        const FUNCTION_NAME: &str = "start_transaction()";
574        trace!("{} start", FUNCTION_NAME);
575
576        let command = Self::begin_transaction_command(transaction_option);
577        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
578
579        let session = self.session.clone();
580        let close_timeout = transaction_option
581            .close_timeout()
582            .unwrap_or(self.default_timeout);
583        let transaction = transaction_begin_processor(session, response, close_timeout)?;
584
585        trace!("{} end", FUNCTION_NAME);
586        Ok(transaction)
587    }
588
589    /// Starts a new transaction.
590    ///
591    /// Note: Should invoke [`Transaction::close`] before [`Transaction::drop`] to dispose the transaction.
592    pub async fn start_transaction_async(
593        &self,
594        transaction_option: &TransactionOption,
595    ) -> Result<Job<Transaction>, TgError> {
596        const FUNCTION_NAME: &str = "start_transaction_async()";
597        trace!("{} start", FUNCTION_NAME);
598
599        let command = Self::begin_transaction_command(transaction_option);
600        let session = self.session.clone();
601        let close_timeout = transaction_option
602            .close_timeout()
603            .unwrap_or(self.default_timeout);
604        let job = self
605            .send_and_pull_async(
606                "StartTransaction",
607                command,
608                None,
609                Box::new(move |_, response| {
610                    transaction_begin_processor(session.clone(), response, close_timeout)
611                }),
612            )
613            .await?;
614
615        trace!("{} end", FUNCTION_NAME);
616        Ok(job)
617    }
618
619    fn begin_transaction_command(transaction_option: &TransactionOption) -> SqlCommand {
620        let tx_option = transaction_option.request();
621
622        let request = crate::jogasaki::proto::sql::request::Begin {
623            option: Some(tx_option),
624        };
625        SqlCommand::Begin(request)
626    }
627
628    /// Returns occurred error in the target transaction.
629    ///
630    /// # Examples
631    /// ```
632    /// use tsubakuro_rust_core::prelude::*;
633    ///
634    /// async fn example(client: &SqlClient, transaction: &Transaction) -> Result<(), TgError> {
635    ///     let status = client.get_transaction_error_info(transaction).await?;
636    ///     println!("is_error={}", status.is_error());
637    ///
638    ///     if let Some(code) = status.diagnostic_code() {
639    ///         println!("diagnostic_code={}", code);
640    ///     }
641    ///
642    ///     Ok(())
643    /// }
644    /// ```
645    ///
646    /// since 0.2.0
647    pub async fn get_transaction_error_info(
648        &self,
649        transaction: &Transaction,
650    ) -> Result<TransactionErrorInfo, TgError> {
651        let timeout = self.default_timeout;
652        self.get_transaction_error_info_for(transaction, timeout)
653            .await
654    }
655
656    /// Returns occurred error in the target transaction.
657    ///
658    /// since 0.2.0
659    pub async fn get_transaction_error_info_for(
660        &self,
661        transaction: &Transaction,
662        timeout: Duration,
663    ) -> Result<TransactionErrorInfo, TgError> {
664        const FUNCTION_NAME: &str = "get_transaction_error_info()";
665        trace!("{} start", FUNCTION_NAME);
666
667        let command = Self::transaction_error_info_command(transaction.transaction_handle()?);
668        let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
669        let status = transaction_error_info_processor(slot_handle, response)?;
670
671        trace!("{} end", FUNCTION_NAME);
672        Ok(status)
673    }
674
675    /// Returns occurred error in the target transaction.
676    ///
677    /// since 0.2.0
678    pub async fn get_transaction_error_info_async(
679        &self,
680        transaction: &Transaction,
681    ) -> Result<Job<TransactionErrorInfo>, TgError> {
682        const FUNCTION_NAME: &str = "get_transaction_error_info_async()";
683        trace!("{} start", FUNCTION_NAME);
684
685        let command = Self::transaction_error_info_command(transaction.transaction_handle()?);
686        let job = self
687            .send_and_pull_async(
688                "TransactionErrorInfo",
689                command,
690                None,
691                Box::new(transaction_error_info_processor),
692            )
693            .await?;
694
695        trace!("{} end", FUNCTION_NAME);
696        Ok(job)
697    }
698
699    fn transaction_error_info_command(transaction_handle: &ProtoTransaction) -> SqlCommand {
700        let request = crate::jogasaki::proto::sql::request::GetErrorInfo {
701            transaction_handle: Some(*transaction_handle),
702        };
703        SqlCommand::GetErrorInfo(request)
704    }
705
706    /// Get the transaction status on the server.
707    ///
708    /// # Examples
709    /// ```
710    /// use tsubakuro_rust_core::prelude::*;
711    ///
712    /// async fn example(client: &SqlClient, transaction: &Transaction) -> Result<(), TgError> {
713    ///     let status = client.get_transaction_status(transaction).await?;
714    ///     println!("status={:?}", status.status());
715    ///     println!("message={}", status.message());
716    ///
717    ///     Ok(())
718    /// }
719    /// ```
720    ///
721    /// since 0.2.0
722    pub async fn get_transaction_status(
723        &self,
724        transaction: &Transaction,
725    ) -> Result<TransactionStatusWithMessage, TgError> {
726        let timeout = self.default_timeout;
727        self.get_transaction_status_for(transaction, timeout).await
728    }
729
730    /// Get the transaction status on the server.
731    ///
732    /// since 0.2.0
733    pub async fn get_transaction_status_for(
734        &self,
735        transaction: &Transaction,
736        timeout: Duration,
737    ) -> Result<TransactionStatusWithMessage, TgError> {
738        const FUNCTION_NAME: &str = "get_transaction_status()";
739        trace!("{} start", FUNCTION_NAME);
740
741        let command = Self::transaction_status_command(transaction.transaction_handle()?);
742        let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
743        let status = transaction_status_processor(slot_handle, response)?;
744
745        trace!("{} end", FUNCTION_NAME);
746        Ok(status)
747    }
748
749    /// Get the transaction status on the server.
750    ///
751    /// since 0.2.0
752    pub async fn get_transaction_status_async(
753        &self,
754        transaction: &Transaction,
755    ) -> Result<Job<TransactionStatusWithMessage>, TgError> {
756        const FUNCTION_NAME: &str = "get_transaction_status_async()";
757        trace!("{} start", FUNCTION_NAME);
758
759        let command = Self::transaction_status_command(transaction.transaction_handle()?);
760        let job = self
761            .send_and_pull_async(
762                "TransactionStatus",
763                command,
764                None,
765                Box::new(transaction_status_processor),
766            )
767            .await?;
768
769        trace!("{} end", FUNCTION_NAME);
770        Ok(job)
771    }
772
773    fn transaction_status_command(transaction_handle: &ProtoTransaction) -> SqlCommand {
774        let request = crate::jogasaki::proto::sql::request::GetTransactionStatus {
775            transaction_handle: Some(*transaction_handle),
776        };
777        SqlCommand::GetTransactionStatus(request)
778    }
779
780    /// Executes a SQL statement.
781    ///
782    /// # Examples
783    /// ```
784    /// use tsubakuro_rust_core::prelude::*;
785    ///
786    /// async fn example(client: &SqlClient, transaction: &Transaction) -> Result<(), TgError> {
787    ///     let sql = "insert into customer values(4, 'example', 20)";
788    ///     let execute_result = client.execute(&transaction, sql).await?;
789    ///     println!("inserted rows={}", execute_result.inserted_rows());
790    ///
791    ///     Ok(())
792    /// }
793    /// ```
794    pub async fn execute(
795        &self,
796        transaction: &Transaction,
797        sql: &str,
798    ) -> Result<SqlExecuteResult, TgError> {
799        let timeout = self.default_timeout;
800        self.execute_for(transaction, sql, timeout).await
801    }
802
803    /// Executes a SQL statement.
804    pub async fn execute_for(
805        &self,
806        transaction: &Transaction,
807        sql: &str,
808        timeout: Duration,
809    ) -> Result<SqlExecuteResult, TgError> {
810        const FUNCTION_NAME: &str = "execute()";
811        trace!("{} start", FUNCTION_NAME);
812
813        let tx_handle = transaction.transaction_handle()?;
814
815        let command = Self::execute_statement_command(tx_handle, sql);
816        let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
817        let execute_result = execute_result_processor(slot_handle, response)?;
818
819        trace!("{} end", FUNCTION_NAME);
820        Ok(execute_result)
821    }
822
823    /// Executes a SQL statement.
824    pub async fn execute_async(
825        &self,
826        transaction: &Transaction,
827        sql: &str,
828    ) -> Result<Job<SqlExecuteResult>, TgError> {
829        const FUNCTION_NAME: &str = "execute_async()";
830        trace!("{} start", FUNCTION_NAME);
831
832        let tx_handle = transaction.transaction_handle()?;
833
834        let command = Self::execute_statement_command(tx_handle, sql);
835        let job = self
836            .send_and_pull_async("Execute", command, None, Box::new(execute_result_processor))
837            .await?;
838
839        trace!("{} end", FUNCTION_NAME);
840        Ok(job)
841    }
842
843    fn execute_statement_command(transaction_handle: &ProtoTransaction, sql: &str) -> SqlCommand {
844        let request = crate::jogasaki::proto::sql::request::ExecuteStatement {
845            transaction_handle: Some(*transaction_handle),
846            sql: ProstString::from(sql),
847        };
848        SqlCommand::ExecuteStatement(request)
849    }
850
851    /// Executes a SQL statement.
852    ///
853    /// # Examples
854    /// ```
855    /// use tsubakuro_rust_core::prelude::*;
856    ///
857    /// async fn example(client: &SqlClient, transaction: &Transaction, prepared_statement: &SqlPreparedStatement) -> Result<(), TgError> {
858    ///     // prepared_statement: "insert into customer values(:id, :name, :age)"
859    ///     let parameters = vec![
860    ///         SqlParameter::of("id", 4_i64),
861    ///         SqlParameter::of("name", "example"),
862    ///         SqlParameter::of("age", 20),
863    ///     ];
864    ///     let execute_result = client.prepared_execute(&transaction, prepared_statement, parameters).await?;
865    ///     println!("inserted rows={}", execute_result.inserted_rows());
866    ///
867    ///     Ok(())
868    /// }
869    /// ```
870    pub async fn prepared_execute(
871        &self,
872        transaction: &Transaction,
873        prepared_statement: &SqlPreparedStatement,
874        parameters: Vec<SqlParameter>,
875    ) -> Result<SqlExecuteResult, TgError> {
876        let timeout = self.default_timeout;
877        self.prepared_execute_for(transaction, prepared_statement, parameters, timeout)
878            .await
879    }
880
881    /// Executes a SQL statement.
882    pub async fn prepared_execute_for(
883        &self,
884        transaction: &Transaction,
885        prepared_statement: &SqlPreparedStatement,
886        parameters: Vec<SqlParameter>,
887        timeout: Duration,
888    ) -> Result<SqlExecuteResult, TgError> {
889        const FUNCTION_NAME: &str = "prepared_execute()";
890        trace!("{} start", FUNCTION_NAME);
891
892        let tx_handle = transaction.transaction_handle()?;
893        let (parameters, lobs) =
894            convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
895
896        let command =
897            Self::execute_prepared_statement_command(tx_handle, prepared_statement, parameters);
898        let (slot_handle, response) = self.send_and_pull_response(command, lobs, timeout).await?;
899        let execute_result = execute_result_processor(slot_handle, response)?;
900
901        trace!("{} end", FUNCTION_NAME);
902        Ok(execute_result)
903    }
904
905    /// Executes a SQL statement.
906    pub async fn prepared_execute_async(
907        &self,
908        transaction: &Transaction,
909        prepared_statement: &SqlPreparedStatement,
910        parameters: Vec<SqlParameter>,
911    ) -> Result<Job<SqlExecuteResult>, TgError> {
912        const FUNCTION_NAME: &str = "prepared_execute_async()";
913        trace!("{} start", FUNCTION_NAME);
914
915        let tx_handle = transaction.transaction_handle()?;
916        let (parameters, lobs) =
917            convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
918
919        let command =
920            Self::execute_prepared_statement_command(tx_handle, prepared_statement, parameters);
921        let job = self
922            .send_and_pull_async("Execute", command, lobs, Box::new(execute_result_processor))
923            .await?;
924
925        trace!("{} end", FUNCTION_NAME);
926        Ok(job)
927    }
928
929    fn execute_prepared_statement_command(
930        transaction_handle: &ProtoTransaction,
931        prepared_statement: &SqlPreparedStatement,
932        parameters: Vec<SqlParameter>,
933    ) -> SqlCommand {
934        let ps_handle = crate::jogasaki::proto::sql::common::PreparedStatement {
935            handle: prepared_statement.prepare_handle(),
936            has_result_records: prepared_statement.has_result_records(),
937        };
938        let request = crate::jogasaki::proto::sql::request::ExecutePreparedStatement {
939            transaction_handle: Some(*transaction_handle),
940            prepared_statement_handle: Some(ps_handle),
941            parameters,
942        };
943        SqlCommand::ExecutePreparedStatement(request)
944    }
945
946    /// Executes a SQL statement and retrieve its result.
947    ///
948    /// # Examples
949    /// ```
950    /// use tsubakuro_rust_core::prelude::*;
951    ///
952    /// async fn example(client: &SqlClient, transaction: &Transaction) -> Result<(), TgError> {
953    ///     let sql = "select c_id, c_name, c_age from customer order by c_id";
954    ///     let mut query_result = client.query(&transaction, sql).await?;
955    ///
956    ///     while query_result.next_row().await? {
957    ///         if query_result.next_column().await? {
958    ///             let id: i64 = query_result.fetch().await?;
959    ///         }
960    ///         if query_result.next_column().await? {
961    ///             let name: Option<String> = query_result.fetch().await?;
962    ///         }
963    ///         if query_result.next_column().await? {
964    ///             let age: Option<i32> = query_result.fetch().await?;
965    ///         }
966    ///     }
967    ///
968    ///     query_result.close().await?;
969    ///
970    ///     Ok(())
971    /// }
972    /// ```
973    pub async fn query(
974        &self,
975        transaction: &Transaction,
976        sql: &str,
977    ) -> Result<SqlQueryResult, TgError> {
978        let timeout = self.default_timeout;
979        self.query_for(transaction, sql, timeout).await
980    }
981
982    /// Executes a SQL statement and retrieve its result.
983    pub async fn query_for(
984        &self,
985        transaction: &Transaction,
986        sql: &str,
987        timeout: Duration,
988    ) -> Result<SqlQueryResult, TgError> {
989        const FUNCTION_NAME: &str = "query()";
990        trace!("{} start", FUNCTION_NAME);
991
992        let tx_handle = transaction.transaction_handle()?;
993
994        let command = Self::execute_query_command(tx_handle, sql);
995        let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
996
997        let wire = self.wire().clone();
998        let default_timeout = self.default_timeout;
999        let query_result = query_result_processor(wire, slot_handle, response, default_timeout)?;
1000
1001        trace!("{} end", FUNCTION_NAME);
1002        Ok(query_result)
1003    }
1004
1005    /// Executes a SQL statement and retrieve its result.
1006    pub async fn query_async(
1007        &self,
1008        transaction: &Transaction,
1009        sql: &str,
1010    ) -> Result<Job<SqlQueryResult>, TgError> {
1011        const FUNCTION_NAME: &str = "query_async()";
1012        trace!("{} start", FUNCTION_NAME);
1013
1014        let tx_handle = transaction.transaction_handle()?;
1015
1016        let command = Self::execute_query_command(tx_handle, sql);
1017        let wire = self.wire().clone();
1018        let default_timeout = self.default_timeout;
1019        let job = self
1020            .send_and_pull_async(
1021                "Query",
1022                command,
1023                None,
1024                Box::new(move |slot_handle, response| {
1025                    query_result_processor(wire.clone(), slot_handle, response, default_timeout)
1026                }),
1027            )
1028            .await?;
1029
1030        trace!("{} end", FUNCTION_NAME);
1031        Ok(job)
1032    }
1033
1034    fn execute_query_command(transaction_handle: &ProtoTransaction, sql: &str) -> SqlCommand {
1035        let request = crate::jogasaki::proto::sql::request::ExecuteQuery {
1036            transaction_handle: Some(*transaction_handle),
1037            sql: ProstString::from(sql),
1038        };
1039        SqlCommand::ExecuteQuery(request)
1040    }
1041
1042    /// Executes a SQL statement and retrieve its result.
1043    ///
1044    /// # Examples
1045    /// ```
1046    /// use tsubakuro_rust_core::prelude::*;
1047    ///
1048    /// async fn example(client: &SqlClient, transaction: &Transaction, prepared_statement: &SqlPreparedStatement) -> Result<(), TgError> {
1049    ///     // prepared_statement: "select c_id, c_name, c_age from customer where c_id = :id"
1050    ///     let parameters = vec![SqlParameter::of("id", 3_i64)];
1051    ///     let mut query_result = client.prepared_query(&transaction, prepared_statement, parameters).await?;
1052    ///
1053    ///     while query_result.next_row().await? {
1054    ///         if query_result.next_column().await? {
1055    ///             let id: i64 = query_result.fetch().await?;
1056    ///         }
1057    ///         if query_result.next_column().await? {
1058    ///             let name: Option<String> = query_result.fetch().await?;
1059    ///         }
1060    ///         if query_result.next_column().await? {
1061    ///             let age: Option<i32> = query_result.fetch().await?;
1062    ///         }
1063    ///     }
1064    ///
1065    ///     query_result.close().await?;
1066    ///
1067    ///     Ok(())
1068    /// }
1069    /// ```
1070    pub async fn prepared_query(
1071        &self,
1072        transaction: &Transaction,
1073        prepared_statement: &SqlPreparedStatement,
1074        parameters: Vec<SqlParameter>,
1075    ) -> Result<SqlQueryResult, TgError> {
1076        let timeout = self.default_timeout;
1077        self.prepared_query_for(transaction, prepared_statement, parameters, timeout)
1078            .await
1079    }
1080
1081    /// Executes a SQL statement and retrieve its result.
1082    pub async fn prepared_query_for(
1083        &self,
1084        transaction: &Transaction,
1085        prepared_statement: &SqlPreparedStatement,
1086        parameters: Vec<SqlParameter>,
1087        timeout: Duration,
1088    ) -> Result<SqlQueryResult, TgError> {
1089        const FUNCTION_NAME: &str = "prepared_query()";
1090        trace!("{} start", FUNCTION_NAME);
1091
1092        let tx_handle = transaction.transaction_handle()?;
1093        let (parameters, lobs) =
1094            convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
1095
1096        let command =
1097            Self::execute_prepared_query_command(tx_handle, prepared_statement, parameters);
1098        let (slot_handle, response) = self.send_and_pull_response(command, lobs, timeout).await?;
1099
1100        let wire = self.wire().clone();
1101        let default_timeout = self.default_timeout;
1102        let query_result = query_result_processor(wire, slot_handle, response, default_timeout)?;
1103
1104        trace!("{} end", FUNCTION_NAME);
1105        Ok(query_result)
1106    }
1107
1108    /// Executes a SQL statement and retrieve its result.
1109    pub async fn prepared_query_async(
1110        &self,
1111        transaction: &Transaction,
1112        prepared_statement: &SqlPreparedStatement,
1113        parameters: Vec<SqlParameter>,
1114    ) -> Result<Job<SqlQueryResult>, TgError> {
1115        const FUNCTION_NAME: &str = "prepared_query_async()";
1116        trace!("{} start", FUNCTION_NAME);
1117
1118        let tx_handle = transaction.transaction_handle()?;
1119        let (parameters, lobs) =
1120            convert_lob_parameters(parameters, self.session.large_object_path_mapping_on_send())?;
1121
1122        let command =
1123            Self::execute_prepared_query_command(tx_handle, prepared_statement, parameters);
1124        let wire = self.wire().clone();
1125        let default_timeout = self.default_timeout;
1126        let job = self
1127            .send_and_pull_async(
1128                "Query",
1129                command,
1130                lobs,
1131                Box::new(move |slot_handle, response| {
1132                    query_result_processor(wire.clone(), slot_handle, response, default_timeout)
1133                }),
1134            )
1135            .await?;
1136
1137        trace!("{} end", FUNCTION_NAME);
1138        Ok(job)
1139    }
1140
1141    fn execute_prepared_query_command(
1142        transaction_handle: &ProtoTransaction,
1143        prepared_statement: &SqlPreparedStatement,
1144        parameters: Vec<SqlParameter>,
1145    ) -> SqlCommand {
1146        let ps_handle = crate::jogasaki::proto::sql::common::PreparedStatement {
1147            handle: prepared_statement.prepare_handle(),
1148            has_result_records: prepared_statement.has_result_records(),
1149        };
1150        let request = crate::jogasaki::proto::sql::request::ExecutePreparedQuery {
1151            transaction_handle: Some(*transaction_handle),
1152            prepared_statement_handle: Some(ps_handle),
1153            parameters,
1154        };
1155        SqlCommand::ExecutePreparedQuery(request)
1156    }
1157
1158    /// Open BLOB file.
1159    ///
1160    /// # Examples
1161    /// ```
1162    /// use std::io::Read;
1163    /// use tsubakuro_rust_core::prelude::*;
1164    ///
1165    /// async fn example(client: &SqlClient, transaction: &Transaction, query_result: &mut SqlQueryResult) -> Result<Vec<u8>, TgError> {
1166    ///     let blob: TgBlobReference = query_result.fetch().await?;
1167    ///     let mut file = client.open_blob(transaction, &blob).await?;
1168    ///
1169    ///     let mut buffer = Vec::new();
1170    ///     file.read_to_end(&mut buffer).unwrap();
1171    ///
1172    ///     Ok(buffer)
1173    /// }
1174    /// ```
1175    pub async fn open_blob(
1176        &self,
1177        transaction: &Transaction,
1178        blob: &TgBlobReference,
1179    ) -> Result<std::fs::File, TgError> {
1180        let timeout = self.default_timeout;
1181        self.open_blob_for(transaction, blob, timeout).await
1182    }
1183
1184    /// Open BLOB file.
1185    pub async fn open_blob_for(
1186        &self,
1187        transaction: &Transaction,
1188        blob: &TgBlobReference,
1189        timeout: Duration,
1190    ) -> Result<std::fs::File, TgError> {
1191        const FUNCTION_NAME: &str = "open_blob()";
1192        trace!("{} start", FUNCTION_NAME);
1193
1194        let tx_handle = transaction.transaction_handle()?;
1195
1196        let command = Self::open_lob_command(tx_handle, blob);
1197        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1198        let file = lob_open_processor(response, &self.session)?;
1199
1200        trace!("{} end", FUNCTION_NAME);
1201        Ok(file)
1202    }
1203
1204    /// Open BLOB file.
1205    pub async fn open_blob_async(
1206        &self,
1207        transaction: &Transaction,
1208        blob: &TgBlobReference,
1209    ) -> Result<Job<std::fs::File>, TgError> {
1210        const FUNCTION_NAME: &str = "open_blob_async()";
1211        trace!("{} start", FUNCTION_NAME);
1212
1213        let tx_handle = transaction.transaction_handle()?;
1214
1215        let command = Self::open_lob_command(tx_handle, blob);
1216        let session = self.session.clone();
1217        let job = self
1218            .send_and_pull_async(
1219                "File",
1220                command,
1221                None,
1222                Box::new(move |_, response| lob_open_processor(response, &session)),
1223            )
1224            .await?;
1225
1226        trace!("{} end", FUNCTION_NAME);
1227        Ok(job)
1228    }
1229
1230    /// Open CLOB file.
1231    ///
1232    /// # Examples
1233    /// ```
1234    /// use std::io::Read;
1235    /// use tsubakuro_rust_core::prelude::*;
1236    ///
1237    /// async fn example(client: &SqlClient, transaction: &Transaction, query_result: &mut SqlQueryResult) -> Result<String, TgError> {
1238    ///     let clob: TgClobReference = query_result.fetch().await?;
1239    ///     let mut file = client.open_clob(transaction, &clob).await?;
1240    ///
1241    ///     let mut buffer = String::new();
1242    ///     file.read_to_string(&mut buffer).unwrap();
1243    ///
1244    ///     Ok(buffer)
1245    /// }
1246    /// ```
1247    pub async fn open_clob(
1248        &self,
1249        transaction: &Transaction,
1250        clob: &TgClobReference,
1251    ) -> Result<std::fs::File, TgError> {
1252        let timeout = self.default_timeout;
1253        self.open_clob_for(transaction, clob, timeout).await
1254    }
1255
1256    /// Open CLOB file.
1257    pub async fn open_clob_for(
1258        &self,
1259        transaction: &Transaction,
1260        clob: &TgClobReference,
1261        timeout: Duration,
1262    ) -> Result<std::fs::File, TgError> {
1263        const FUNCTION_NAME: &str = "open_clob()";
1264        trace!("{} start", FUNCTION_NAME);
1265
1266        let tx_handle = transaction.transaction_handle()?;
1267
1268        let command = Self::open_lob_command(tx_handle, clob);
1269        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1270        let file = lob_open_processor(response, &self.session)?;
1271
1272        trace!("{} end", FUNCTION_NAME);
1273        Ok(file)
1274    }
1275
1276    /// Open CLOB file.
1277    pub async fn open_clob_async(
1278        &self,
1279        transaction: &Transaction,
1280        clob: &TgClobReference,
1281    ) -> Result<Job<std::fs::File>, TgError> {
1282        const FUNCTION_NAME: &str = "open_clob_async()";
1283        trace!("{} start", FUNCTION_NAME);
1284
1285        let tx_handle = transaction.transaction_handle()?;
1286
1287        let command = Self::open_lob_command(tx_handle, clob);
1288        let session = self.session.clone();
1289        let job = self
1290            .send_and_pull_async(
1291                "File",
1292                command,
1293                None,
1294                Box::new(move |_, response| lob_open_processor(response, &session)),
1295            )
1296            .await?;
1297
1298        trace!("{} end", FUNCTION_NAME);
1299        Ok(job)
1300    }
1301
1302    fn open_lob_command<T: TgLargeObjectReference>(
1303        transaction_handle: &ProtoTransaction,
1304        lob: &T,
1305    ) -> SqlCommand {
1306        let lob = crate::jogasaki::proto::sql::common::LargeObjectReference {
1307            provider: lob.provider().into(),
1308            object_id: lob.object_id(),
1309            contents_opt: None,
1310        };
1311
1312        let request = crate::jogasaki::proto::sql::request::GetLargeObjectData {
1313            transaction_handle: Some(*transaction_handle),
1314            reference: Some(lob),
1315        };
1316        SqlCommand::GetLargeObjectData(request)
1317    }
1318
1319    /// Get BLOB cache.
1320    ///
1321    /// # Examples
1322    /// ```
1323    /// use tsubakuro_rust_core::prelude::*;
1324    ///
1325    /// async fn example(client: &SqlClient, transaction: &Transaction, query_result: &mut SqlQueryResult) -> Result<(), TgError> {
1326    ///     let blob: TgBlobReference = query_result.fetch().await?;
1327    ///     let cache = client.get_blob_cache(transaction, &blob).await?;
1328    ///
1329    ///     println!("BLOB.path={:?}", cache.path());
1330    ///
1331    ///     Ok(())
1332    /// }
1333    /// ```
1334    ///
1335    /// since 0.5.0
1336    pub async fn get_blob_cache(
1337        &self,
1338        transaction: &Transaction,
1339        blob: &TgBlobReference,
1340    ) -> Result<TgLargeObjectCache, TgError> {
1341        let timeout = self.default_timeout;
1342        self.get_blob_cache_for(transaction, blob, timeout).await
1343    }
1344
1345    /// Get BLOB cache.
1346    ///
1347    /// since 0.5.0
1348    pub async fn get_blob_cache_for(
1349        &self,
1350        transaction: &Transaction,
1351        blob: &TgBlobReference,
1352        timeout: Duration,
1353    ) -> Result<TgLargeObjectCache, TgError> {
1354        const FUNCTION_NAME: &str = "get_blob_cache()";
1355        trace!("{} start", FUNCTION_NAME);
1356
1357        let cache = self
1358            .get_large_object_cache(transaction, blob, timeout)
1359            .await?;
1360
1361        trace!("{} end", FUNCTION_NAME);
1362        Ok(cache)
1363    }
1364
1365    /// Get BLOB cache.
1366    ///
1367    /// since 0.5.0
1368    pub async fn get_blob_cache_async(
1369        &self,
1370        transaction: &Transaction,
1371        blob: &TgBlobReference,
1372    ) -> Result<Job<TgLargeObjectCache>, TgError> {
1373        const FUNCTION_NAME: &str = "get_blob_cache_async()";
1374        trace!("{} start", FUNCTION_NAME);
1375
1376        let job = self.get_large_object_cache_async(transaction, blob).await?;
1377
1378        trace!("{} end", FUNCTION_NAME);
1379        Ok(job)
1380    }
1381
1382    /// Get CLOB cache.
1383    ///
1384    /// # Examples
1385    /// ```
1386    /// use tsubakuro_rust_core::prelude::*;
1387    ///
1388    /// async fn example(client: &SqlClient, transaction: &Transaction, query_result: &mut SqlQueryResult) -> Result<(), TgError> {
1389    ///     let clob: TgClobReference = query_result.fetch().await?;
1390    ///     let cache = client.get_clob_cache(transaction, &clob).await?;
1391    ///
1392    ///     println!("CLOB.path={:?}", cache.path());
1393    ///
1394    ///     Ok(())
1395    /// }
1396    /// ```
1397    ///
1398    /// since 0.5.0
1399    pub async fn get_clob_cache(
1400        &self,
1401        transaction: &Transaction,
1402        clob: &TgClobReference,
1403    ) -> Result<TgLargeObjectCache, TgError> {
1404        let timeout = self.default_timeout;
1405        self.get_clob_cache_for(transaction, clob, timeout).await
1406    }
1407
1408    /// Get CLOB cache.
1409    ///
1410    /// since 0.5.0
1411    pub async fn get_clob_cache_for(
1412        &self,
1413        transaction: &Transaction,
1414        clob: &TgClobReference,
1415        timeout: Duration,
1416    ) -> Result<TgLargeObjectCache, TgError> {
1417        const FUNCTION_NAME: &str = "get_clob_cache()";
1418        trace!("{} start", FUNCTION_NAME);
1419
1420        let cache = self
1421            .get_large_object_cache(transaction, clob, timeout)
1422            .await?;
1423
1424        trace!("{} end", FUNCTION_NAME);
1425        Ok(cache)
1426    }
1427
1428    /// Get CLOB cache.
1429    ///
1430    /// since 0.5.0
1431    pub async fn get_clob_cache_async(
1432        &self,
1433        transaction: &Transaction,
1434        clob: &TgClobReference,
1435    ) -> Result<Job<TgLargeObjectCache>, TgError> {
1436        const FUNCTION_NAME: &str = "get_clob_cache_async()";
1437        trace!("{} start", FUNCTION_NAME);
1438
1439        let job = self.get_large_object_cache_async(transaction, clob).await?;
1440
1441        trace!("{} end", FUNCTION_NAME);
1442        Ok(job)
1443    }
1444
1445    async fn get_large_object_cache<T: TgLargeObjectReference>(
1446        &self,
1447        transaction: &Transaction,
1448        lob: &T,
1449        timeout: Duration,
1450    ) -> Result<TgLargeObjectCache, TgError> {
1451        let tx_handle = transaction.transaction_handle()?;
1452        let command = Self::open_lob_command(tx_handle, lob);
1453        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1454        let cache = lob_cache_processor(response, &self.session)?;
1455        Ok(cache)
1456    }
1457
1458    async fn get_large_object_cache_async<T: TgLargeObjectReference>(
1459        &self,
1460        transaction: &Transaction,
1461        lob: &T,
1462    ) -> Result<Job<TgLargeObjectCache>, TgError> {
1463        let tx_handle = transaction.transaction_handle()?;
1464        let command = Self::open_lob_command(tx_handle, lob);
1465        let session = self.session.clone();
1466        let job = self
1467            .send_and_pull_async(
1468                "LargeObjectCache",
1469                command,
1470                None,
1471                Box::new(move |_, response| lob_cache_processor(response, &session)),
1472            )
1473            .await?;
1474        Ok(job)
1475    }
1476
1477    /// Read BLOB.
1478    ///
1479    /// # Examples
1480    /// ```
1481    /// use std::io::Read;
1482    /// use tsubakuro_rust_core::prelude::*;
1483    ///
1484    /// async fn example(client: &SqlClient, transaction: &Transaction, query_result: &mut SqlQueryResult) -> Result<Vec<u8>, TgError> {
1485    ///     let blob: TgBlobReference = query_result.fetch().await?;
1486    ///     let bytes = client.read_blob(transaction, &blob).await?;
1487    ///
1488    ///     Ok(bytes)
1489    /// }
1490    /// ```
1491    ///
1492    /// since 0.2.0
1493    pub async fn read_blob(
1494        &self,
1495        transaction: &Transaction,
1496        blob: &TgBlobReference,
1497    ) -> Result<Vec<u8>, TgError> {
1498        let timeout = self.default_timeout;
1499        self.read_blob_for(transaction, blob, timeout).await
1500    }
1501
1502    /// Read BLOB.
1503    ///
1504    /// since 0.2.0
1505    pub async fn read_blob_for(
1506        &self,
1507        transaction: &Transaction,
1508        blob: &TgBlobReference,
1509        timeout: Duration,
1510    ) -> Result<Vec<u8>, TgError> {
1511        const FUNCTION_NAME: &str = "read_blob()";
1512        trace!("{} start", FUNCTION_NAME);
1513
1514        let tx_handle = transaction.transaction_handle()?;
1515
1516        let command = Self::open_lob_command(tx_handle, blob);
1517        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1518        let buf = Self::blob_read_processor(response, &self.session)?;
1519
1520        trace!("{} end", FUNCTION_NAME);
1521        Ok(buf)
1522    }
1523
1524    /// Read BLOB.
1525    ///
1526    /// since 0.2.0
1527    pub async fn read_blob_async(
1528        &self,
1529        transaction: &Transaction,
1530        blob: &TgBlobReference,
1531    ) -> Result<Job<Vec<u8>>, TgError> {
1532        const FUNCTION_NAME: &str = "read_blob_async()";
1533        trace!("{} start", FUNCTION_NAME);
1534
1535        let tx_handle = transaction.transaction_handle()?;
1536
1537        let command = Self::open_lob_command(tx_handle, blob);
1538        let session = self.session.clone();
1539        let job = self
1540            .send_and_pull_async(
1541                "BLOB",
1542                command,
1543                None,
1544                Box::new(move |_, response| Self::blob_read_processor(response, &session)),
1545            )
1546            .await?;
1547
1548        trace!("{} end", FUNCTION_NAME);
1549        Ok(job)
1550    }
1551
1552    fn blob_read_processor(
1553        response: WireResponse,
1554        session: &Arc<Session>,
1555    ) -> Result<Vec<u8>, TgError> {
1556        let mut file = lob_open_processor(response, session)?;
1557        let mut buf = Vec::new();
1558        file.read_to_end(&mut buf)
1559            .map_err(|e| io_error!("BLOB read error", e))?;
1560
1561        Ok(buf)
1562    }
1563
1564    /// Read CLOB.
1565    ///
1566    /// # Examples
1567    /// ```
1568    /// use std::io::Read;
1569    /// use tsubakuro_rust_core::prelude::*;
1570    ///
1571    /// async fn example(client: &SqlClient, transaction: &Transaction, query_result: &mut SqlQueryResult) -> Result<String, TgError> {
1572    ///     let clob: TgClobReference = query_result.fetch().await?;
1573    ///     let text = client.read_clob(transaction, &clob).await?;
1574    ///
1575    ///     Ok(text)
1576    /// }
1577    /// ```
1578    ///
1579    /// since 0.2.0
1580    pub async fn read_clob(
1581        &self,
1582        transaction: &Transaction,
1583        clob: &TgClobReference,
1584    ) -> Result<String, TgError> {
1585        let timeout = self.default_timeout;
1586        self.read_clob_for(transaction, clob, timeout).await
1587    }
1588
1589    /// Read CLOB.
1590    ///
1591    /// since 0.2.0
1592    pub async fn read_clob_for(
1593        &self,
1594        transaction: &Transaction,
1595        clob: &TgClobReference,
1596        timeout: Duration,
1597    ) -> Result<String, TgError> {
1598        const FUNCTION_NAME: &str = "read_clob()";
1599        trace!("{} start", FUNCTION_NAME);
1600
1601        let tx_handle = transaction.transaction_handle()?;
1602
1603        let command = Self::open_lob_command(tx_handle, clob);
1604        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1605        let buf = Self::clob_read_processor(response, &self.session)?;
1606
1607        trace!("{} end", FUNCTION_NAME);
1608        Ok(buf)
1609    }
1610
1611    /// Read CLOB.
1612    ///
1613    /// since 0.2.0
1614    pub async fn read_clob_async(
1615        &self,
1616        transaction: &Transaction,
1617        clob: &TgClobReference,
1618    ) -> Result<Job<String>, TgError> {
1619        const FUNCTION_NAME: &str = "read_clob_async()";
1620        trace!("{} start", FUNCTION_NAME);
1621
1622        let tx_handle = transaction.transaction_handle()?;
1623
1624        let command = Self::open_lob_command(tx_handle, clob);
1625        let session = self.session.clone();
1626        let job = self
1627            .send_and_pull_async(
1628                "CLOB",
1629                command,
1630                None,
1631                Box::new(move |_, response| Self::clob_read_processor(response, &session)),
1632            )
1633            .await?;
1634
1635        trace!("{} end", FUNCTION_NAME);
1636        Ok(job)
1637    }
1638
1639    fn clob_read_processor(
1640        response: WireResponse,
1641        session: &Arc<Session>,
1642    ) -> Result<String, TgError> {
1643        let mut file = lob_open_processor(response, session)?;
1644        let mut buf = String::new();
1645        file.read_to_string(&mut buf)
1646            .map_err(|e| io_error!("CLOB read error", e))?;
1647
1648        Ok(buf)
1649    }
1650
1651    /// Copy BLOB to local file.
1652    ///
1653    /// # Examples
1654    /// ```
1655    /// use tsubakuro_rust_core::prelude::*;
1656    ///
1657    /// async fn example(client: &SqlClient, transaction: &Transaction, query_result: &mut SqlQueryResult) -> Result<(), TgError> {
1658    ///     let blob: TgBlobReference = query_result.fetch().await?;
1659    ///     client.copy_blob_to(transaction, &blob, "/path/to/blob.bin").await?;
1660    ///
1661    ///     Ok(())
1662    /// }
1663    /// ```
1664    pub async fn copy_blob_to<T: AsRef<Path>>(
1665        &self,
1666        transaction: &Transaction,
1667        blob: &TgBlobReference,
1668        destination: T,
1669    ) -> Result<(), TgError> {
1670        let timeout = self.default_timeout;
1671        self.copy_blob_to_for(transaction, blob, destination, timeout)
1672            .await
1673    }
1674
1675    /// Copy BLOB to local file.
1676    pub async fn copy_blob_to_for<T: AsRef<Path>>(
1677        &self,
1678        transaction: &Transaction,
1679        blob: &TgBlobReference,
1680        destination: T,
1681        timeout: Duration,
1682    ) -> Result<(), TgError> {
1683        const FUNCTION_NAME: &str = "copy_blob_to()";
1684        trace!("{} start", FUNCTION_NAME);
1685
1686        let tx_handle = transaction.transaction_handle()?;
1687
1688        let command = Self::copy_lob_to_command(tx_handle, blob);
1689        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1690        lob_copy_to_processor(response, &self.session, destination)?;
1691
1692        trace!("{} end", FUNCTION_NAME);
1693        Ok(())
1694    }
1695
1696    /// Copy BLOB to local file.
1697    pub async fn copy_blob_to_async<T: AsRef<Path> + Send + Clone + 'static>(
1698        &self,
1699        transaction: &Transaction,
1700        blob: &TgBlobReference,
1701        destination: T,
1702    ) -> Result<Job<()>, TgError> {
1703        const FUNCTION_NAME: &str = "copy_blob_to_async()";
1704        trace!("{} start", FUNCTION_NAME);
1705
1706        let tx_handle = transaction.transaction_handle()?;
1707
1708        let command = Self::copy_lob_to_command(tx_handle, blob);
1709        let session = self.session.clone();
1710        let job = self
1711            .send_and_pull_async(
1712                "BlobCopy",
1713                command,
1714                None,
1715                Box::new(move |_, response| {
1716                    lob_copy_to_processor(response, &session, destination.clone())
1717                }),
1718            )
1719            .await?;
1720
1721        trace!("{} end", FUNCTION_NAME);
1722        Ok(job)
1723    }
1724
1725    /// Copy CLOB to local file.
1726    ///
1727    /// # Examples
1728    /// ```
1729    /// use tsubakuro_rust_core::prelude::*;
1730    ///
1731    /// async fn example(client: &SqlClient, transaction: &Transaction, query_result: &mut SqlQueryResult) -> Result<(), TgError> {
1732    ///     let clob: TgClobReference = query_result.fetch().await?;
1733    ///     client.copy_clob_to(transaction, &clob, "/path/to/clob.txt").await?;
1734    ///
1735    ///     Ok(())
1736    /// }
1737    /// ```
1738    pub async fn copy_clob_to<T: AsRef<Path>>(
1739        &self,
1740        transaction: &Transaction,
1741        clob: &TgClobReference,
1742        destination: T,
1743    ) -> Result<(), TgError> {
1744        let timeout = self.default_timeout;
1745        self.copy_clob_to_for(transaction, clob, destination, timeout)
1746            .await
1747    }
1748
1749    /// Copy CLOB to local file.
1750    pub async fn copy_clob_to_for<T: AsRef<Path>>(
1751        &self,
1752        transaction: &Transaction,
1753        clob: &TgClobReference,
1754        destination: T,
1755        timeout: Duration,
1756    ) -> Result<(), TgError> {
1757        const FUNCTION_NAME: &str = "copy_clob_to()";
1758        trace!("{} start", FUNCTION_NAME);
1759
1760        let tx_handle = transaction.transaction_handle()?;
1761
1762        let command = Self::copy_lob_to_command(tx_handle, clob);
1763        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1764        lob_copy_to_processor(response, &self.session, destination)?;
1765
1766        trace!("{} end", FUNCTION_NAME);
1767        Ok(())
1768    }
1769
1770    /// Copy CLOB to local file.
1771    pub async fn copy_clob_to_async<T: AsRef<Path> + Send + Clone + 'static>(
1772        &self,
1773        transaction: &Transaction,
1774        clob: &TgClobReference,
1775        destination: T,
1776    ) -> Result<Job<()>, TgError> {
1777        const FUNCTION_NAME: &str = "copy_clob_to_async()";
1778        trace!("{} start", FUNCTION_NAME);
1779
1780        let tx_handle = transaction.transaction_handle()?;
1781
1782        let command = Self::copy_lob_to_command(tx_handle, clob);
1783        let session = self.session.clone();
1784        let job = self
1785            .send_and_pull_async(
1786                "ClobCopy",
1787                command,
1788                None,
1789                Box::new(move |_, response| {
1790                    lob_copy_to_processor(response, &session, destination.clone())
1791                }),
1792            )
1793            .await?;
1794
1795        trace!("{} end", FUNCTION_NAME);
1796        Ok(job)
1797    }
1798
1799    fn copy_lob_to_command<T: TgLargeObjectReference>(
1800        transaction_handle: &ProtoTransaction,
1801        clob: &T,
1802    ) -> SqlCommand {
1803        let lob = crate::jogasaki::proto::sql::common::LargeObjectReference {
1804            provider: clob.provider().into(),
1805            object_id: clob.object_id(),
1806            contents_opt: None,
1807        };
1808
1809        let request = crate::jogasaki::proto::sql::request::GetLargeObjectData {
1810            transaction_handle: Some(*transaction_handle),
1811            reference: Some(lob),
1812        };
1813        SqlCommand::GetLargeObjectData(request)
1814    }
1815
1816    /// Request commit to the SQL service.
1817    ///
1818    /// # Examples
1819    /// ```
1820    /// use tsubakuro_rust_core::prelude::*;
1821    ///
1822    /// async fn example(client: &SqlClient, transaction: &Transaction) -> Result<(), TgError> {
1823    ///     let commit_option = CommitOption::default();
1824    ///     client.commit(transaction, &commit_option).await?;
1825    ///
1826    ///     Ok(())
1827    /// }
1828    /// ```
1829    pub async fn commit(
1830        &self,
1831        transaction: &Transaction,
1832        commit_option: &CommitOption,
1833    ) -> Result<(), TgError> {
1834        let timeout = self.default_timeout;
1835        self.commit_for(transaction, commit_option, timeout).await
1836    }
1837
1838    /// Request commit to the SQL service.
1839    pub async fn commit_for(
1840        &self,
1841        transaction: &Transaction,
1842        commit_option: &CommitOption,
1843        timeout: Duration,
1844    ) -> Result<(), TgError> {
1845        const FUNCTION_NAME: &str = "commit()";
1846        trace!("{} start", FUNCTION_NAME);
1847
1848        let tx_handle = transaction.transaction_handle()?;
1849
1850        let command = Self::commit_command(tx_handle, commit_option);
1851        let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
1852        transaction_commit_processor(slot_handle, response)?;
1853
1854        trace!("{} end", FUNCTION_NAME);
1855        Ok(())
1856    }
1857
1858    /// Request commit to the SQL service.
1859    pub async fn commit_async(
1860        &self,
1861        transaction: &Transaction,
1862        commit_option: &CommitOption,
1863    ) -> Result<Job<()>, TgError> {
1864        const FUNCTION_NAME: &str = "commit_async()";
1865        trace!("{} start", FUNCTION_NAME);
1866
1867        let tx_handle = transaction.transaction_handle()?;
1868
1869        let command = Self::commit_command(tx_handle, commit_option);
1870        let job = self
1871            .send_and_pull_async(
1872                "Commit",
1873                command,
1874                None,
1875                Box::new(transaction_commit_processor),
1876            )
1877            .await?;
1878
1879        trace!("{} end", FUNCTION_NAME);
1880        Ok(job)
1881    }
1882
1883    fn commit_command(
1884        transaction_handle: &ProtoTransaction,
1885        commit_option: &CommitOption,
1886    ) -> SqlCommand {
1887        let request = crate::jogasaki::proto::sql::request::Commit {
1888            transaction_handle: Some(*transaction_handle),
1889            notification_type: commit_option.notification_type,
1890            auto_dispose: commit_option.auto_dispose,
1891            option: Some(*commit_option),
1892        };
1893        SqlCommand::Commit(request)
1894    }
1895
1896    /// Request rollback to the SQL service.
1897    ///
1898    /// # Examples
1899    /// ```
1900    /// use tsubakuro_rust_core::prelude::*;
1901    ///
1902    /// async fn example(client: &SqlClient, transaction: &Transaction) -> Result<(), TgError> {
1903    ///     client.rollback(transaction).await?;
1904    ///
1905    ///     Ok(())
1906    /// }
1907    /// ```
1908    pub async fn rollback(&self, transaction: &Transaction) -> Result<(), TgError> {
1909        let timeout = self.default_timeout;
1910        self.rollback_for(transaction, timeout).await
1911    }
1912
1913    /// Request rollback to the SQL service.
1914    pub async fn rollback_for(
1915        &self,
1916        transaction: &Transaction,
1917        timeout: Duration,
1918    ) -> Result<(), TgError> {
1919        const FUNCTION_NAME: &str = "rollback()";
1920        trace!("{} start", FUNCTION_NAME);
1921
1922        let tx_handle = transaction.transaction_handle()?;
1923
1924        let command = Self::rollback_command(tx_handle);
1925        let (slot_handle, response) = self.send_and_pull_response(command, None, timeout).await?;
1926        transaction_rollback_processor(slot_handle, response)?;
1927
1928        trace!("{} end", FUNCTION_NAME);
1929        Ok(())
1930    }
1931
1932    /// Request rollback to the SQL service.
1933    pub async fn rollback_async(&self, transaction: &Transaction) -> Result<Job<()>, TgError> {
1934        const FUNCTION_NAME: &str = "rollback_async()";
1935        trace!("{} start", FUNCTION_NAME);
1936
1937        let tx_handle = transaction.transaction_handle()?;
1938
1939        let command = Self::rollback_command(tx_handle);
1940        let job = self
1941            .send_and_pull_async(
1942                "Rollback",
1943                command,
1944                None,
1945                Box::new(transaction_rollback_processor),
1946            )
1947            .await?;
1948
1949        trace!("{} end", FUNCTION_NAME);
1950        Ok(job)
1951    }
1952
1953    fn rollback_command(transaction_handle: &ProtoTransaction) -> SqlCommand {
1954        let request = crate::jogasaki::proto::sql::request::Rollback {
1955            transaction_handle: Some(*transaction_handle),
1956        };
1957        SqlCommand::Rollback(request)
1958    }
1959
1960    pub(crate) async fn dispose_transaction(
1961        &self,
1962        transaction_handle: &ProtoTransaction,
1963        timeout: Duration,
1964    ) -> Result<(), TgError> {
1965        const FUNCTION_NAME: &str = "dispose_transaction()";
1966        trace!("{} start", FUNCTION_NAME);
1967
1968        let command = Self::dispose_transaction_command(transaction_handle);
1969        let (_, response) = self.send_and_pull_response(command, None, timeout).await?;
1970        transaction_dispose_processor(response)?;
1971
1972        trace!("{} end", FUNCTION_NAME);
1973        Ok(())
1974    }
1975
1976    pub(crate) async fn dispose_transaction_send_only(
1977        &self,
1978        transaction_handle: &ProtoTransaction,
1979    ) -> Result<(), TgError> {
1980        const FUNCTION_NAME: &str = "dispose_transaction()";
1981        trace!("{} start", FUNCTION_NAME);
1982
1983        let command = Self::dispose_transaction_command(transaction_handle);
1984        let _ = self.send_only(command).await?;
1985
1986        trace!("{} end", FUNCTION_NAME);
1987        Ok(())
1988    }
1989
1990    fn dispose_transaction_command(transaction_handle: &ProtoTransaction) -> SqlCommand {
1991        let request = crate::jogasaki::proto::sql::request::DisposeTransaction {
1992            transaction_handle: Some(*transaction_handle),
1993        };
1994        SqlCommand::DisposeTransaction(request)
1995    }
1996}
1997
1998impl SqlClient {
1999    fn wire(&self) -> Arc<Wire> {
2000        self.session.wire()
2001    }
2002
2003    async fn send_only(&self, command: SqlCommand) -> Result<Arc<SlotEntryHandle>, TgError> {
2004        let request = Self::new_request(command);
2005        self.wire().send_only(SERVICE_ID_SQL, request, None).await
2006    }
2007
2008    async fn send_and_pull_response(
2009        &self,
2010        command: SqlCommand,
2011        lobs: Option<Vec<BlobInfo>>,
2012        timeout: Duration,
2013    ) -> Result<(Arc<SlotEntryHandle>, WireResponse), TgError> {
2014        let request = Self::new_request(command);
2015        self.wire()
2016            .send_and_pull_response(SERVICE_ID_SQL, request, lobs, timeout)
2017            .await
2018    }
2019
2020    async fn send_and_pull_async<T: 'static>(
2021        &self,
2022        job_name: &str,
2023        command: SqlCommand,
2024        lobs: Option<Vec<BlobInfo>>,
2025        converter: Box<dyn Fn(Arc<SlotEntryHandle>, WireResponse) -> Result<T, TgError> + Send>,
2026    ) -> Result<Job<T>, TgError> {
2027        let request = Self::new_request(command);
2028        self.wire()
2029            .send_and_pull_async(
2030                job_name,
2031                SERVICE_ID_SQL,
2032                request,
2033                lobs,
2034                converter,
2035                self.default_timeout,
2036                self.session.fail_on_drop_error(),
2037            )
2038            .await
2039    }
2040
2041    fn new_request(command: SqlCommand) -> SqlRequest {
2042        SqlRequest {
2043            session_handle: None,
2044            service_message_version_major: SERVICE_MESSAGE_VERSION_MAJOR,
2045            service_message_version_minor: SERVICE_MESSAGE_VERSION_MINOR,
2046            request: Some(command),
2047        }
2048    }
2049}
2050
2051#[allow(clippy::type_complexity)]
2052pub(crate) fn convert_sql_response(
2053    function_name: &str,
2054    response: &WireResponse,
2055) -> Result<(Option<SqlResponse>, Option<HashMap<String, BlobInfo>>), TgError> {
2056    match response {
2057        WireResponse::ResponseSessionPayload(_slot, payload, lobs, error) => {
2058            if let Some(e) = error {
2059                return Err(e.to_tg_error());
2060            }
2061            if payload.is_none() {
2062                return Err(invalid_response_error!(function_name, "payload is None"));
2063            }
2064            // let payload = payload.as_deref().unwrap();
2065            let payload = &payload.as_ref().unwrap()[..];
2066            let sql_response = SqlResponse::decode_length_delimited(payload)
2067                .map_err(|e| prost_decode_error!(function_name, "SqlResponse", e))?;
2068            match &sql_response.response {
2069                Some(SqlResponseType::ResultOnly(result_only)) => match &result_only.result {
2070                    Some(crate::jogasaki::proto::sql::response::result_only::Result::Success(
2071                        _,
2072                    )) => Ok((Some(sql_response), lobs.clone())),
2073                    Some(crate::jogasaki::proto::sql::response::result_only::Result::Error(
2074                        error,
2075                    )) => {
2076                        let error = error.clone();
2077                        Err(sql_service_error!(function_name, error))
2078                    }
2079                    _ => Ok((Some(sql_response), lobs.clone())),
2080                },
2081                _ => Ok((Some(sql_response), lobs.clone())),
2082            }
2083        }
2084        _ => Ok((None, None)),
2085    }
2086}
2087
2088pub(crate) fn sql_result_only_success_processor(
2089    function_name: &str,
2090    response: WireResponse,
2091) -> Result<(), TgError> {
2092    let (sql_response, _) = convert_sql_response(function_name, &response)?;
2093    let message = sql_response.ok_or(invalid_response_error!(
2094        function_name,
2095        format!("response {:?} is not ResponseSessionPayload", response),
2096    ))?;
2097    match message.response {
2098        Some(SqlResponseType::ResultOnly(result_only)) => match result_only.result {
2099            Some(crate::jogasaki::proto::sql::response::result_only::Result::Success(_)) => Ok(()),
2100            _ => Err(invalid_response_error!(
2101                function_name,
2102                format!("fail. {:?}", result_only),
2103            )),
2104        },
2105        _ => Err(invalid_response_error!(
2106            function_name,
2107            format!("response {:?} is not ResultOnly", message.response),
2108        )),
2109    }
2110}
2111
2112#[cfg(test)]
2113mod test {
2114    use super::*;
2115
2116    #[test]
2117    fn service_message_version() {
2118        let smv = SqlClient::service_message_version();
2119        assert_eq!("sql-1.6", smv);
2120    }
2121}