use crate::client::TimeoutSettings;
use crate::errors::*;
use crate::session::Session;
use crate::session_pool::SessionPool;
use crate::transaction::{AutoCommit, Mode, SerializableReadWriteTx, Transaction};
use crate::types::Value;
use crate::grpc_connection_manager::GrpcConnectionManager;
use crate::grpc_wrapper::runtime_interceptors::InterceptedChannel;
use crate::table_service_types::{CopyTableItem, TableDescription};
use crate::{Query, StreamResult};
use num::pow;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::sleep;
use tracing::{instrument, trace};
use ydb_grpc::ydb_proto::table::v1::table_service_client::TableServiceClient;
/// Retry time budget used by `TimeoutRetrier::default()`.
const DEFAULT_RETRY_TIMEOUT: Duration = Duration::from_secs(5);
/// Base backoff value, in milliseconds, that `TimeoutRetrier` uses when computing the pause
/// between attempts.
const INITIAL_RETRY_BACKOFF_MILLISECONDS: u64 = 1;
/// gRPC table-service client running over the crate's intercepted channel.
pub(crate) type TableServiceClientType = TableServiceClient<InterceptedChannel>;
/// Argument type handed to `TableClient::retry_transaction` callbacks: a boxed transaction object.
type TransactionArgType = Box<dyn Transaction>;
/// Settings that decide which kind of transaction `TableClient::retry_transaction` creates.
#[derive(Clone)]
pub struct TransactionOptions {
    /// Transaction mode requested for the transaction.
    mode: Mode,
    /// `true` selects an auto-commit transaction, `false` an interactive one.
    autocommit: bool,
}
impl TransactionOptions {
    /// Creates options for `Mode::SerializableReadWrite` with auto-commit turned off.
    pub fn new() -> Self {
        let mode = Mode::SerializableReadWrite;
        let autocommit = false;
        Self { mode, autocommit }
    }

    /// Returns the options with the transaction mode replaced by `mode`.
    pub fn with_mode(self, mode: Mode) -> Self {
        Self {
            mode,
            autocommit: self.autocommit,
        }
    }

    /// Returns the options with the auto-commit flag replaced by `autocommit`.
    pub fn with_autocommit(self, autocommit: bool) -> Self {
        Self {
            mode: self.mode,
            autocommit,
        }
    }
}
impl Default for TransactionOptions {
fn default() -> Self {
Self::new()
}
}
/// Per-call retry settings accepted by `TableClient::retry_with_session`.
pub struct RetryOptions {
    /// Whether errors classified as `NeedRetry::IdempotentOnly` may be retried.
    idempotent_operation: bool,
    /// Retry policy for this call; when `None` the client's own policy is used instead.
    retrier: Option<Arc<Box<dyn Retry>>>,
}
impl RetryOptions {
pub fn new() -> Self {
Self {
idempotent_operation: false,
retrier: None,
}
}
#[allow(dead_code)]
pub(crate) fn with_idempotent(mut self, idempotent: bool) -> Self {
self.idempotent_operation = idempotent;
self
}
#[allow(dead_code)]
pub(crate) fn with_timeout(mut self, timeout: Duration) -> Self {
self.retrier = Some(Arc::new(Box::new(TimeoutRetrier { timeout })));
self
}
}
impl Default for RetryOptions {
fn default() -> Self {
Self::new()
}
}
/// Client for table operations: hands out sessions and transactions and wraps calls in retry
/// loops.
#[derive(Clone)]
pub struct TableClient {
    /// Forwarded to every transaction created by this client via `with_error_on_truncate`.
    error_on_truncate: bool,
    /// Pool that sessions and transactions are created from.
    session_pool: SessionPool,
    /// Policy that decides whether another attempt is allowed and how long to wait before it.
    retrier: Arc<Box<dyn Retry>>,
    /// Mode and auto-commit settings used by `retry_transaction`.
    transaction_options: TransactionOptions,
    /// Whether errors classified as `NeedRetry::IdempotentOnly` are retried.
    idempotent_operation: bool,
    /// Timeouts handed to sessions and transactions created by this client.
    timeouts: TimeoutSettings,
}
impl TableClient {
pub(crate) fn new(
connection_manager: GrpcConnectionManager,
timeouts: TimeoutSettings,
) -> Self {
Self {
error_on_truncate: false,
session_pool: SessionPool::new(Box::new(connection_manager), timeouts),
retrier: Arc::new(Box::<TimeoutRetrier>::default()),
transaction_options: TransactionOptions::new(),
idempotent_operation: false,
timeouts,
}
}
/// Returns the client with its session pool reconfigured through
/// `SessionPool::with_max_active_sessions(size)`.
#[allow(dead_code)]
pub(crate) fn with_max_active_sessions(mut self, size: usize) -> Self {
    let resized_pool = self.session_pool.with_max_active_sessions(size);
    self.session_pool = resized_pool;
    self
}
/// Returns a copy of this client that uses `timeouts` for the sessions and transactions it
/// creates.
///
/// NOTE(review): only the client's own `timeouts` field is replaced; the cloned session pool is
/// not reconfigured here — confirm that is intended.
pub fn clone_with_timeouts(&self, timeouts: TimeoutSettings) -> Self {
    let mut copy = self.clone();
    copy.timeouts = timeouts;
    copy
}
/// Returns a copy of this client whose retry policy is a `TimeoutRetrier` limited to `timeout`.
#[allow(dead_code)]
pub fn clone_with_retry_timeout(&self, timeout: Duration) -> Self {
    let mut copy = self.clone();
    copy.retrier = Arc::new(Box::new(TimeoutRetrier { timeout }));
    copy
}
/// Returns a copy of this client whose retry policy (`NoRetrier`) never allows another attempt.
#[allow(dead_code)]
pub fn clone_with_no_retry(&self) -> Self {
    let mut copy = self.clone();
    copy.retrier = Arc::new(Box::new(NoRetrier {}));
    copy
}
/// Returns a copy of this client with the idempotent flag set to `idempotent`.
///
/// When the flag is on, errors classified as `NeedRetry::IdempotentOnly` are retried as well.
#[allow(dead_code)]
pub fn clone_with_idempotent_operations(&self, idempotent: bool) -> Self {
    let mut copy = self.clone();
    copy.idempotent_operation = idempotent;
    copy
}
/// Returns a copy of this client that uses `opts` when `retry_transaction` creates transactions.
pub fn clone_with_transaction_options(&self, opts: TransactionOptions) -> Self {
    let mut copy = self.clone();
    copy.transaction_options = opts;
    copy
}
/// Creates an `AutoCommit` transaction in the given `mode`, carrying this client's timeouts and
/// `error_on_truncate` setting.
pub(crate) fn create_autocommit_transaction(&self, mode: Mode) -> impl Transaction {
    let pool = self.session_pool.clone();
    let tx = AutoCommit::new(pool, mode, self.timeouts);
    tx.with_error_on_truncate(self.error_on_truncate)
}
/// Creates a `SerializableReadWriteTx` transaction carrying this client's timeouts and
/// `error_on_truncate` setting.
pub(crate) fn create_interactive_transaction(&self) -> impl Transaction {
    let pool = self.session_pool.clone();
    let tx = SerializableReadWriteTx::new(pool, self.timeouts);
    tx.with_error_on_truncate(self.error_on_truncate)
}
/// Takes a session from the pool and applies this client's timeouts to it.
///
/// # Errors
/// Propagates any error returned by the session pool.
#[allow(dead_code)]
pub(crate) async fn create_session(&self) -> YdbResult<Session> {
    let pooled = self.session_pool.session().await?;
    Ok(pooled.with_timeouts(self.timeouts))
}
async fn retry<CallbackFuture, CallbackResult>(
&self,
callback: impl Fn() -> CallbackFuture,
) -> YdbResult<CallbackResult>
where
CallbackFuture: Future<Output = YdbResult<CallbackResult>>,
{
let mut attempt: usize = 0;
let start = Instant::now();
loop {
attempt += 1;
let last_err = match callback().await {
Ok(res) => return Ok(res),
Err(err) => match (err.need_retry(), self.idempotent_operation) {
(NeedRetry::True, _) => err,
(NeedRetry::IdempotentOnly, true) => err,
_ => return Err(err),
},
};
let now = std::time::Instant::now();
let retry_decision = self.retrier.wait_duration(RetryParams {
attempt,
time_from_start: now.duration_since(start),
});
if !retry_decision.allow_retry {
return Err(last_err);
}
tokio::time::sleep(retry_decision.wait_timeout).await;
}
}
/// Executes `query` as a scan query, retrying through the client's retry loop.
///
/// Each attempt takes a fresh session and runs a clone of `query` on it.
pub async fn retry_execute_scan_query(&self, query: Query) -> YdbResult<StreamResult> {
    self.retry(|| async {
        self.create_session()
            .await?
            .execute_scan_query(query.clone())
            .await
    })
    .await
}
/// Executes `query` as a schema query, retrying through the client's retry loop.
///
/// Each attempt takes a fresh session and runs a clone of the query text on it.
pub async fn retry_execute_scheme_query<T: Into<String>>(&self, query: T) -> YdbResult<()> {
    let query_text: String = query.into();
    self.retry(|| async {
        self.create_session()
            .await?
            .execute_schema_query(query_text.clone())
            .await
    })
    .await
}
/// Asks the server to explain the data query `query`, retrying through the client's retry loop.
///
/// `collect_full_diagnostics` is forwarded unchanged to `Session::explain_data_query`. Each
/// attempt takes a fresh session.
pub async fn retry_explain_data_query<T: Into<String>>(
    &self,
    query: T,
    collect_full_diagnostics: bool,
) -> YdbResult<crate::result::ExplainResult> {
    let query_text: String = query.into();
    self.retry(|| async {
        self.create_session()
            .await?
            .explain_data_query(query_text.clone(), collect_full_diagnostics)
            .await
    })
    .await
}
/// Bulk-upserts `rows` into the table at `table_path`, retrying through the client's retry loop.
///
/// An empty `rows` is a no-op that returns `Ok(())`. The first row serves as the example value
/// for `Value::list_from` and must be a `Value::Struct`.
///
/// # Errors
/// Returns `YdbError::Custom` when the first row is not a struct, and propagates errors from
/// building the list value or from the upsert itself.
pub async fn retry_execute_bulk_upsert(
    &self,
    table_path: String,
    rows: Vec<Value>,
) -> YdbResult<()> {
    let example_value = match rows.first() {
        None => return Ok(()),
        Some(first_row) => first_row.clone(),
    };
    if !matches!(&example_value, Value::Struct(_)) {
        return Err(YdbError::Custom(
            "expected ValueStruct type for items".to_string(),
        ));
    }
    let rows_list = Value::list_from(example_value, rows)?;
    self.retry(|| async {
        self.create_session()
            .await?
            .execute_bulk_upsert(table_path.clone(), rows_list.clone())
            .await
    })
    .await
}
/// Runs `callback` inside a fresh transaction, retrying while the error is retryable and the
/// client's retry policy allows another attempt.
///
/// Every attempt gets a new transaction: an auto-commit one in the configured mode when
/// `transaction_options.autocommit` is set, otherwise an interactive one, which is only
/// available for `Mode::SerializableReadWrite`.
///
/// # Errors
/// * `YdbError::Custom` when an interactive transaction is requested with any other mode.
/// * The callback's error when it is a customer error, a non-retryable YDB error, or when the
///   retry policy refuses a further attempt.
#[instrument(level = "trace", skip_all, err)]
pub async fn retry_transaction<CallbackFuture, CallbackResult>(
    &self,
    callback: impl Fn(TransactionArgType) -> CallbackFuture,
) -> YdbResultWithCustomerErr<CallbackResult>
where
    CallbackFuture: Future<Output = YdbResultWithCustomerErr<CallbackResult>>,
{
    let mut attempts: usize = 0;
    let start = Instant::now();
    loop {
        attempts += 1;
        trace!("attempt: {}", attempts);
        let transaction: Box<dyn Transaction> = if self.transaction_options.autocommit {
            Box::new(self.create_autocommit_transaction(self.transaction_options.mode))
        } else {
            if self.transaction_options.mode != Mode::SerializableReadWrite {
                return Err(YdbOrCustomerError::YDB(YdbError::Custom(
                    "only serializable rw transactions allow to interactive mode".into(),
                )));
            }
            Box::new(self.create_interactive_transaction())
        };
        let err = match callback(transaction).await {
            Ok(value) => {
                trace!("return successfully after '{}' attempts", attempts);
                return Ok(value);
            }
            Err(err) => err,
        };
        if !Self::check_retry_error(self.idempotent_operation, &err) {
            // Previously this trace sat in an unreachable match arm and was never emitted.
            if matches!(&err, YdbOrCustomerError::Customer(_)) {
                trace!(
                    "return with customer error after '{}' attempts: {:?}",
                    attempts,
                    err
                );
            }
            return Err(err);
        }
        let loop_decision = self.retrier.wait_duration(RetryParams {
            attempt: attempts,
            time_from_start: start.elapsed(),
        });
        if !loop_decision.allow_retry {
            trace!(
                "return with ydb error after '{}' attempts by retry decision: {}",
                attempts,
                err
            );
            return Err(err);
        }
        sleep(loop_decision.wait_timeout).await;
    }
}
#[allow(dead_code)]
pub(crate) async fn retry_with_session<CallbackFuture, CallbackResult>(
&self,
opts: RetryOptions,
callback: impl Fn(Session) -> CallbackFuture,
) -> YdbResultWithCustomerErr<CallbackResult>
where
CallbackFuture: Future<Output = YdbResultWithCustomerErr<CallbackResult>>,
{
let retrier = opts.retrier.unwrap_or_else(|| self.retrier.clone());
let mut attempts: usize = 0;
let start = Instant::now();
loop {
let session = self.create_session().await?;
let res = callback(session).await;
let err = if let Err(err) = res {
err
} else {
return res;
};
if !Self::check_retry_error(opts.idempotent_operation, &err) {
return Err(err);
}
let now = Instant::now();
attempts += 1;
let loop_decision = retrier.wait_duration(RetryParams {
attempt: attempts,
time_from_start: now.duration_since(start),
});
if loop_decision.allow_retry {
sleep(loop_decision.wait_timeout).await;
} else {
return Err(err);
};
}
}
pub fn with_error_on_truncate(mut self, error_on_truncate: bool) -> Self {
self.error_on_truncate = error_on_truncate;
self
}
/// Decides whether `err` may be retried.
///
/// Customer errors are never retried. A YDB error is retried when `need_retry()` reports
/// `NeedRetry::True`, or `NeedRetry::IdempotentOnly` while `is_idempotent_operation` is set.
#[instrument(level = "trace", ret)]
fn check_retry_error(is_idempotent_operation: bool, err: &YdbOrCustomerError) -> bool {
    match err {
        YdbOrCustomerError::Customer(_) => false,
        YdbOrCustomerError::YDB(ydb_err) => match ydb_err.need_retry() {
            NeedRetry::True => true,
            NeedRetry::IdempotentOnly => is_idempotent_operation,
            NeedRetry::False => false,
        },
    }
}
/// Copies the table at `source_path` to `destination_path`, retrying via `retry_with_session`
/// with default retry options.
///
/// # Errors
/// Returns the failure of the last attempt converted with `YdbOrCustomerError::to_ydb_error`.
pub async fn copy_table(&self, source_path: String, destination_path: String) -> YdbResult<()> {
    let outcome = self
        .retry_with_session(RetryOptions::new(), |session| async {
            // Rebinding moves the session into the future, which then owns it mutably.
            let mut owned_session = session;
            owned_session
                .copy_table(source_path.clone(), destination_path.clone())
                .await?;
            Ok(())
        })
        .await;
    outcome.map_err(YdbOrCustomerError::to_ydb_error)
}
/// Copies every table described in `tables`, retrying via `retry_with_session` with default
/// retry options. Each attempt receives its own copy of the item list.
///
/// # Errors
/// Returns the failure of the last attempt converted with `YdbOrCustomerError::to_ydb_error`.
pub async fn copy_tables(&self, tables: Vec<CopyTableItem>) -> YdbResult<()> {
    let outcome = self
        .retry_with_session(RetryOptions::new(), |session| async {
            // Rebinding moves the session into the future, which then owns it mutably.
            let mut owned_session = session;
            owned_session.copy_tables(tables.to_vec()).await?;
            Ok(())
        })
        .await;
    outcome.map_err(YdbOrCustomerError::to_ydb_error)
}
/// Fetches the description of the table at `path`, retrying via `retry_with_session` with
/// default retry options.
///
/// # Errors
/// Returns the failure of the last attempt converted with `YdbOrCustomerError::to_ydb_error`.
pub async fn describe_table(&self, path: String) -> YdbResult<TableDescription> {
    let outcome = self
        .retry_with_session(RetryOptions::new(), |session| async {
            // Rebinding moves the session into the future, which then owns it mutably.
            let mut owned_session = session;
            Ok(owned_session.describe_table(path.clone()).await?)
        })
        .await;
    outcome.map_err(YdbOrCustomerError::to_ydb_error)
}
}
/// Facts about the retry loop that a `Retry` policy bases its decision on.
#[derive(Debug)]
struct RetryParams {
    /// Attempt counter supplied by the retry loop.
    pub(crate) attempt: usize,
    /// Time elapsed since the retry loop started.
    pub(crate) time_from_start: Duration,
}
/// Verdict returned by a `Retry` policy. The default value forbids a retry and has a zero wait.
#[derive(Default, Debug)]
struct RetryDecision {
    /// Whether the caller may make another attempt.
    pub(crate) allow_retry: bool,
    /// How long the caller sleeps before that attempt.
    pub(crate) wait_timeout: Duration,
}
/// A retry policy: given the state of a retry loop, decides whether another attempt is allowed
/// and how long to wait before it.
trait Retry: Send + Sync {
    /// Returns the decision for the loop state described by `params`.
    fn wait_duration(&self, params: RetryParams) -> RetryDecision;
}
/// Retry policy that allows attempts only while a total time budget has not been used up.
#[derive(Debug)]
struct TimeoutRetrier {
    /// Total time budget for the whole retry loop.
    timeout: Duration,
}
/// The default policy uses `DEFAULT_RETRY_TIMEOUT` as its time budget.
impl Default for TimeoutRetrier {
    fn default() -> Self {
        let timeout = DEFAULT_RETRY_TIMEOUT;
        Self { timeout }
    }
}
impl Retry for TimeoutRetrier {
    /// Allows a retry while the elapsed time plus the next pause stays below the time budget.
    ///
    /// The pause is `INITIAL_RETRY_BACKOFF_MILLISECONDS` for attempt 1 and doubles with every
    /// further attempt. Attempt 0 gets no pause. Once `time_from_start` has reached the budget
    /// the default decision is returned: no retry, zero wait.
    #[instrument(ret)]
    fn wait_duration(&self, params: RetryParams) -> RetryDecision {
        // Upper bound on doublings. It keeps `pow` far away from `u64` overflow and caps a
        // single pause at base * 2^16 milliseconds.
        const MAX_BACKOFF_DOUBLINGS: usize = 16;

        let mut res = RetryDecision::default();
        if params.time_from_start < self.timeout {
            if params.attempt > 0 {
                // The previous `pow(base, attempt)` always produced 1 for a base of 1, so the
                // pause never grew; scale the base by a power of two instead.
                let doublings = (params.attempt - 1).min(MAX_BACKOFF_DOUBLINGS);
                let backoff_ms =
                    INITIAL_RETRY_BACKOFF_MILLISECONDS.saturating_mul(pow(2u64, doublings));
                res.wait_timeout = Duration::from_millis(backoff_ms);
            }
            res.allow_retry = (params.time_from_start + res.wait_timeout) < self.timeout;
        }
        res
    }
}
/// Retry policy that never allows another attempt.
struct NoRetrier {}
impl Retry for NoRetrier {
#[instrument(skip_all)]
fn wait_duration(&self, _: RetryParams) -> RetryDecision {
RetryDecision {
allow_retry: false,
wait_timeout: Duration::default(),
}
}
}