use itertools::Itertools;
use std::collections::HashMap;
use std::iter::FromIterator;
use std::sync::Arc;
use std::time;
use std::time::UNIX_EPOCH;
use tokio::sync::Mutex as AsyncMutex;
use rand::distributions::{Alphanumeric, DistString};
use tonic::{Code, Status};
use tracing::trace;
use tracing_test::traced_test;
use crate::client_table::RetryOptions;
use crate::errors::{YdbError, YdbOrCustomerError, YdbResult};
use crate::query::Query;
use crate::table_service_types::{CopyTableItem, IndexType, StoreType};
use crate::test_integration_helper::create_client;
use crate::transaction::Mode;
use crate::transaction::Transaction;
use crate::types::{Value, ValueList, ValueStruct};
use crate::{ydb_params, ydb_struct, Bytes, TableClient};
/// Integration test (ignored by default): opens a session through the table client
/// and traces its debug representation.
#[tokio::test]
#[traced_test]
#[ignore]
async fn create_session() -> YdbResult<()> {
    let client = create_client().await?;
    let table_client = client.table_client();
    let session = table_client.create_session().await?;
    trace!("session: {:?}", session);
    Ok(())
}
/// Integration test (ignored by default): explains the same query twice via
/// `retry_explain_data_query`, first with the boolean flag `false`, then `true`.
///
/// The AST and the plan must be non-empty in both runs. `query_full_diagnostics`
/// must be empty when the flag is `false` and non-empty when it is `true`.
#[tokio::test]
#[traced_test]
#[ignore]
async fn explain_data_query() -> YdbResult<()> {
    const SQL: &str = "SELECT MIN(NodeId) FROM `.sys/nodes`";

    let client = create_client().await?;
    let table_client = client.table_client();

    // Flag off: no diagnostics expected.
    let plain = table_client.retry_explain_data_query(SQL, false).await?;
    assert!(!plain.query_ast.is_empty(), "Query AST should not be empty");
    assert!(
        !plain.query_plan.is_empty(),
        "Query Plan should not be empty"
    );
    assert!(
        plain.query_full_diagnostics.is_empty(),
        "Full diagnostics should be empty when not enabled"
    );

    // Flag on: diagnostics must be filled in.
    let detailed = table_client.retry_explain_data_query(SQL, true).await?;
    assert!(
        !detailed.query_ast.is_empty(),
        "Query AST should not be empty"
    );
    assert!(
        !detailed.query_plan.is_empty(),
        "Query Plan should not be empty"
    );
    assert!(
        !detailed.query_full_diagnostics.is_empty(),
        "Full diagnostics should not be empty when enabled"
    );

    Ok(())
}
#[tokio::test]
#[traced_test]
#[ignore] async fn execute_data_query() -> YdbResult<()> {
let client = create_client().await?;
let mut transaction = client
.table_client()
.create_autocommit_transaction(Mode::OnlineReadonly);
let res = transaction.query("SELECT 1+1".into()).await?;
trace!("result: {:?}", &res);
assert_eq!(
Value::Int32(2),
res.into_only_result()
.unwrap()
.rows()
.next()
.unwrap()
.remove_field(0)
.unwrap()
);
Ok(())
}
#[tokio::test]
#[traced_test]
#[ignore] async fn execute_data_query_field_name() -> YdbResult<()> {
let client = create_client().await?;
let mut transaction = client
.table_client()
.create_autocommit_transaction(Mode::OnlineReadonly);
let res = transaction.query("SELECT 1+1 as s".into()).await?;
trace!("result: {:?}", &res);
assert_eq!(
Value::Int32(2),
res.into_only_result()
.unwrap()
.rows()
.next()
.unwrap()
.remove_field_by_name("s")
.unwrap()
);
Ok(())
}
/// Integration test (ignored by default): passes `$v = Int32(3)` as a query
/// parameter, selects `$v+$v` and expects `Int32(6)`.
#[tokio::test]
#[traced_test]
#[ignore]
async fn execute_data_query_params() -> YdbResult<()> {
    let client = create_client().await?;
    let table_client = client.table_client();
    let mut tx = table_client.create_autocommit_transaction(Mode::OnlineReadonly);

    let params = HashMap::from([("$v".to_string(), Value::Int32(3))]);
    let query = Query::new("\nDECLARE $v AS Int32;\nSELECT $v+$v\n").with_params(params);
    let response = tx.query(query).await?;
    trace!("result: {:?}", response);

    let result_set = response.into_only_result().unwrap();
    let mut first_row = result_set.rows().next().unwrap();
    let doubled = first_row.remove_field(0).unwrap();
    assert_eq!(Value::Int32(6), doubled);

    Ok(())
}
/// Integration test (ignored by default): sends a `Value::Yson("[]")` parameter
/// through `retry_transaction` and expects the same value to be selected back.
#[tokio::test]
#[traced_test]
#[ignore]
async fn query_yson() -> YdbResult<()> {
    let client = create_client().await?;
    let table_client = client.table_client();

    // The closure yields the `remove_field` result itself, hence the double `?` below:
    // one for the retry outcome, one for the field extraction.
    let echoed = table_client
        .retry_transaction(|mut tx| async move {
            let query = Query::new("DECLARE $p AS YSON; SELECT $p")
                .with_params(ydb_params!("$p" => Value::Yson("[]".into())));
            let response = tx.query(query).await?;
            let result_set = response.into_only_result()?;
            let mut first_row = result_set.rows().next().unwrap();
            Ok(first_row.remove_field(0))
        })
        .await??;

    assert!(echoed == Value::Yson("[]".into()));
    Ok(())
}
/// Integration test (ignored by default): checks that writes made inside an
/// interactive transaction are invisible to an autocommit reader until `commit`.
///
/// Flow: create `test_values`, clear it, upsert two rows in an interactive
/// transaction, verify the autocommit reader sees no row with `id=1`, commit,
/// verify the reader now sees `vInt64 = 2`, then drop the table.
#[tokio::test]
#[traced_test]
#[ignore]
async fn interactive_transaction() -> YdbResult<()> {
    const SELECT_ROW_ONE: &str = "SELECT vInt64 FROM test_values WHERE id=1";

    let client = create_client().await?;

    {
        let mut session = client.table_client().create_session().await?;
        session
            .execute_schema_query(
                "CREATE TABLE test_values (id Int64, vInt64 Int64, PRIMARY KEY (id))".to_string(),
            )
            .await?;
    }

    let mut reader = client
        .table_client()
        .create_autocommit_transaction(Mode::SerializableReadWrite);

    // Start from an empty table.
    let mut cleanup = client.table_client().create_interactive_transaction();
    cleanup.query(Query::new("DELETE FROM test_values")).await?;
    cleanup.commit().await?;

    // Two upserts in one interactive transaction: one literal, one parameterised.
    let mut writer = client.table_client().create_interactive_transaction();
    writer
        .query(Query::new(
            "UPSERT INTO test_values (id, vInt64) VALUES (1, 2)",
        ))
        .await?;

    let upsert_params = HashMap::from([
        ("$key".into(), Value::Int64(2)),
        ("$val".into(), Value::Int64(3)),
    ]);
    let parameterised_upsert = Query::new(
        "\n\
         DECLARE $key AS Int64;\n\
         DECLARE $val AS Int64;\n\
         UPSERT INTO test_values (id, vInt64) VALUES ($key, $val)\n",
    )
    .with_params(upsert_params);
    writer.query(parameterised_upsert).await?;

    // Not committed yet: the reader must not find the row.
    let before_commit = reader.query(Query::new(SELECT_ROW_ONE)).await?;
    let rows_before = before_commit.into_only_result().unwrap();
    assert!(rows_before.rows().next().is_none());

    writer.commit().await?;

    // Committed: the row is visible.
    let after_commit = reader.query(Query::new(SELECT_ROW_ONE)).await?;
    let rows_after = after_commit.into_only_result().unwrap();
    let mut first_row = rows_after.rows().next().unwrap();
    assert_eq!(
        Value::optional_from(Value::Int64(0), Some(Value::Int64(2)))?,
        first_row.remove_field_by_name("vInt64").unwrap()
    );

    {
        let mut session = client.table_client().create_session().await?;
        session
            .execute_schema_query("DROP TABLE test_values".to_string())
            .await?;
    }

    Ok(())
}
/// Integration test (ignored by default): copies a table with `copy_table` and
/// checks that the copy contains the row written to the source.
///
/// Flow: create a randomly named source table, upsert `(1, 2)`, copy it to a
/// `copy_`-prefixed name under the client's database path, read `vInt64` for
/// `id=1` from the copy, then drop both tables.
#[tokio::test]
#[traced_test]
#[ignore]
async fn copy_table() -> YdbResult<()> {
    let client = create_client().await?;
    let table_client = client.table_client();

    // Random suffix so the table names differ from run to run.
    let rand_str = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
    let table_name = format!("temp_table_{rand_str}");
    let copy_table_name = format!("copy_{table_name}");

    table_client
        .retry_with_session(RetryOptions::new(), |session| async {
            let mut session = session;
            session
                .execute_schema_query(format!(
                    "CREATE TABLE {table_name} (id Int64, vInt64 Int64, PRIMARY KEY (id))"
                ))
                .await?;
            Ok(())
        })
        .await
        .unwrap();

    let mut transaction = table_client.create_autocommit_transaction(Mode::SerializableReadWrite);
    let mut interactive_tx = table_client.create_interactive_transaction();

    interactive_tx
        .query(format!("UPSERT INTO {table_name} (id, vInt64) VALUES (1, 2)").into())
        .await?;
    interactive_tx.commit().await?;

    // `copy_table` is given full paths, so prefix both names with the database path.
    let database_path = client.database();
    table_client
        .copy_table(
            format!("{database_path}/{table_name}"),
            format!("{database_path}/{copy_table_name}"),
        )
        .await
        .unwrap();

    let res = transaction
        .query(format!("SELECT vInt64 FROM {copy_table_name} WHERE id=1").into())
        .await?;
    assert_eq!(
        Value::optional_from(Value::Int64(0), Some(Value::Int64(2)))?,
        res.into_only_result()
            .unwrap()
            .rows()
            .next()
            .unwrap()
            .remove_field_by_name("vInt64")
            .unwrap()
    );

    // Drop the source and the copy.
    for &target in [&table_name, &copy_table_name].iter() {
        table_client
            .retry_with_session(RetryOptions::new(), |session| async {
                let mut session = session;
                session
                    .execute_schema_query(format!("DROP TABLE {target}"))
                    .await?;
                Ok(())
            })
            .await
            .unwrap();
    }

    Ok(())
}
/// Integration test (ignored by default): copies a table with the batch
/// `copy_tables` call (a single `CopyTableItem`) and checks that the copy
/// contains the row written to the source.
///
/// Flow: create a randomly named source table, upsert `(1, 2)`, copy it to a
/// `copy_`-prefixed name under the client's database path, read `vInt64` for
/// `id=1` from the copy, then drop both tables.
#[tokio::test]
#[traced_test]
#[ignore]
async fn copy_tables() -> YdbResult<()> {
    let client = create_client().await?;
    let table_client = client.table_client();

    // Random suffix so the table names differ from run to run.
    let rand_str = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
    let table_name = format!("temp_table_{rand_str}");
    let copy_table_name = format!("copy_{table_name}");

    table_client
        .retry_with_session(RetryOptions::new(), |session| async {
            let mut session = session;
            session
                .execute_schema_query(format!(
                    "CREATE TABLE {table_name} (id Int64, vInt64 Int64, PRIMARY KEY (id))"
                ))
                .await?;
            Ok(())
        })
        .await
        .unwrap();

    let mut transaction = table_client.create_autocommit_transaction(Mode::SerializableReadWrite);
    let mut interactive_tx = table_client.create_interactive_transaction();

    interactive_tx
        .query(format!("UPSERT INTO {table_name} (id, vInt64) VALUES (1, 2)").into())
        .await?;
    interactive_tx.commit().await?;

    // The copy item is given full paths, so prefix both names with the database path.
    let database_path = client.database();
    table_client
        .copy_tables(vec![CopyTableItem::new(
            format!("{database_path}/{table_name}"),
            format!("{database_path}/{copy_table_name}"),
            true,
        )])
        .await
        .unwrap();

    let res = transaction
        .query(format!("SELECT vInt64 FROM {copy_table_name} WHERE id=1").into())
        .await?;
    assert_eq!(
        Value::optional_from(Value::Int64(0), Some(Value::Int64(2)))?,
        res.into_only_result()
            .unwrap()
            .rows()
            .next()
            .unwrap()
            .remove_field_by_name("vInt64")
            .unwrap()
    );

    // Drop the source and the copy.
    for &target in [&table_name, &copy_table_name].iter() {
        table_client
            .retry_with_session(RetryOptions::new(), |session| async {
                let mut session = session;
                session
                    .execute_schema_query(format!("DROP TABLE {target}"))
                    .await?;
                Ok(())
            })
            .await
            .unwrap();
    }

    Ok(())
}
/// Integration test (ignored by default): makes the transaction body fail with a
/// gRPC `Aborted` status on its first two invocations and expects
/// `retry_transaction` to end with the value of the third invocation.
#[tokio::test]
#[traced_test]
#[ignore]
async fn retry_test() -> YdbResult<()> {
    let client = create_client().await?;

    // Invocation counter shared with the closure.
    let attempts = Arc::new(AsyncMutex::new(0));

    let outcome = client
        .table_client()
        .retry_transaction(|tx| async {
            let mut tx = tx;
            let mut attempt_no = attempts.lock().await;
            *attempt_no += 1;

            let response = tx.query(Query::new("SELECT 1+1 as res")).await?;
            let sum = response
                .into_only_result()
                .unwrap()
                .rows()
                .next()
                .unwrap()
                .remove_field_by_name("res")
                .unwrap();
            assert_eq!(Value::Int32(2), sum);

            // Fail invocations one and two with `Aborted`.
            if *attempt_no < 3 {
                let aborted = Status::new(Code::Aborted, "test");
                return Err(YdbOrCustomerError::YDB(YdbError::TransportGRPCStatus(
                    Arc::new(aborted),
                )));
            }

            tx.commit().await?;
            Ok(*attempt_no)
        })
        .await;

    let attempts_used =
        outcome.unwrap_or_else(|err| panic!("retry test failed with error result: {err:?}"));
    assert_eq!(attempts_used, 3);

    Ok(())
}
/// Integration test (ignored by default): creates and then drops a table whose
/// name carries the current Unix time in milliseconds, each via
/// `retry_with_session` + `execute_schema_query`.
#[tokio::test]
#[traced_test]
#[ignore]
async fn scheme_query() -> YdbResult<()> {
    let client = create_client().await?;
    let table_client = client.table_client();

    let since_epoch = time::SystemTime::now().duration_since(UNIX_EPOCH)?;
    let table_name = format!("test_table_{}", since_epoch.as_millis());

    let create_sql = format!("CREATE TABLE {table_name} (id String, PRIMARY KEY (id))");
    let drop_sql = format!("DROP TABLE {table_name}");

    // Run the two DDL statements in order: create first, then drop.
    for ddl in [&create_sql, &drop_sql] {
        table_client
            .retry_with_session(RetryOptions::new(), |session| async {
                let mut session = session;
                session.execute_schema_query(ddl.clone()).await?;
                Ok(())
            })
            .await
            .unwrap();
    }

    Ok(())
}
/// Integration test (ignored by default): exercises `retry_read_rows` against a
/// four-row table filled through `retry_execute_bulk_upsert`.
///
/// Cases covered, in order:
/// * an empty key list yields zero rows;
/// * keys that are not structs produce an error;
/// * reading ids `0..4` (ascending and descending) returns `first`/`second`
///   values matching the inserted pairs in the same order;
/// * restricting the columns to `first` hides `second`, and keys without a
///   matching row (`4`, `6`) are tolerated;
/// * asking for an unknown column produces an error.
#[tokio::test]
#[traced_test]
#[ignore]
async fn read_rows() -> YdbResult<()> {
    const TABLE_NAME: &str = "read_rows";
    const TABLE_PATH: &str = "/local/read_rows";

    let client = create_client().await?;
    let table_client = client.table_client();

    table_client
        .retry_with_session(RetryOptions::new(), |mut session| async move {
            let ddl = format!(
                "CREATE TABLE {TABLE_NAME} (id Int64 NOT NULL, first Int64 NOT NULL, second Int64 NOT NULL, PRIMARY KEY (id))"
            );
            session.execute_schema_query(ddl).await?;
            Ok(())
        })
        .await
        .unwrap();

    // Row `i` carries the pair at index `i`; `expected` is the same data as YDB values.
    let pairs: [(i64, i64); 4] = [(0, 0), (0, 1), (1, 0), (1, 1)];
    let expected = pairs.map(|(first, second)| (Value::Int64(first), Value::Int64(second)));
    let upsert_rows = pairs
        .into_iter()
        .enumerate()
        .map(|(id, (first, second))| {
            ydb_struct!("id" => id as i64, "first" => first, "second" => second)
        })
        .collect_vec();
    table_client
        .retry_execute_bulk_upsert(TABLE_PATH.into(), upsert_rows)
        .await?;

    // No keys -> no rows.
    let no_keys = table_client.retry_read_rows(TABLE_PATH, vec![], None).await;
    assert_eq!(no_keys.unwrap().rows().count(), 0);

    // A bare Int64 is not accepted as a key.
    let bare_key = table_client
        .retry_read_rows(TABLE_PATH, vec![Value::Int64(1i64)], None)
        .await;
    assert!(bare_key.is_err());

    // Wraps plain ids into `{ id: <id> }` key structs.
    let keys_of = |ids: Vec<i64>| {
        ids.into_iter()
            .map(|id| ydb_struct!("id" => id))
            .collect_vec()
    };

    let ascending = table_client
        .retry_read_rows(TABLE_PATH, keys_of((0i64..4i64).collect_vec()), None)
        .await;
    for (mut row, (first, second)) in ascending.unwrap().rows().zip(expected.iter()) {
        assert_eq!(&row.remove_field_by_name("first").unwrap(), first);
        assert_eq!(&row.remove_field_by_name("second").unwrap(), second);
    }

    let descending = table_client
        .retry_read_rows(
            TABLE_PATH,
            keys_of((0i64..4i64).rev().collect_vec()),
            None,
        )
        .await;
    for (mut row, (first, second)) in descending.unwrap().rows().zip(expected.iter().rev()) {
        assert_eq!(&row.remove_field_by_name("first").unwrap(), first);
        assert_eq!(&row.remove_field_by_name("second").unwrap(), second);
    }

    // Only `first` requested; keys 4 and 6 have no matching row.
    let sparse_keys = vec![0i64, 2i64, 4i64, 6i64];
    let first_only = table_client
        .retry_read_rows(
            TABLE_PATH,
            keys_of(sparse_keys.clone()),
            Some(vec!["first".into()]),
        )
        .await;
    let first_values = first_only
        .unwrap()
        .rows()
        .map(|mut row| {
            assert!(row.remove_field_by_name("second").is_err());
            assert!(row.remove_field_by_name("third").is_err());
            row.remove_field_by_name("first").unwrap()
        })
        .collect_vec();
    sparse_keys
        .into_iter()
        .filter_map(|key| expected.get(key as usize))
        .for_each(|(first, _)| assert!(first_values.contains(first)));

    // Unknown column names are rejected.
    let unknown_column = table_client
        .retry_read_rows(
            TABLE_PATH,
            keys_of(vec![1i64]),
            Some(vec!["first".into(), "unknown".into()]),
        )
        .await;
    assert!(unknown_column.is_err());

    table_client
        .retry_with_session(RetryOptions::new(), |mut session| async move {
            let ddl = format!("DROP TABLE {TABLE_NAME}");
            session.execute_schema_query(ddl).await?;
            Ok(())
        })
        .await
        .unwrap();

    Ok(())
}
#[tokio::test]
#[traced_test]
#[ignore] async fn select_int() -> YdbResult<()> {
let client = create_client().await?;
let v = Value::Int32(123);
let mut transaction = client
.table_client()
.create_autocommit_transaction(Mode::OnlineReadonly);
let res = transaction
.query(
Query::new(
"
DECLARE $test AS Int32;
SELECT $test AS test;
",
)
.with_params(HashMap::from_iter([("$test".into(), v.clone())])),
)
.await?;
let res = res.results.into_iter().next().unwrap();
assert_eq!(1, res.columns().len());
assert_eq!(v, res.rows().next().unwrap().remove_field_by_name("test")?);
Ok(())
}
#[tokio::test]
#[traced_test]
#[ignore] async fn select_optional() -> YdbResult<()> {
let client = create_client().await?;
let mut transaction = client
.table_client()
.create_autocommit_transaction(Mode::OnlineReadonly);
let res = transaction
.query(
Query::new(
"
DECLARE $test AS Optional<Int32>;
SELECT $test AS test;
",
)
.with_params(HashMap::from_iter([(
"$test".into(),
Value::optional_from(Value::Int32(0), Some(Value::Int32(3)))?,
)])),
)
.await?;
let res = res.results.into_iter().next().unwrap();
assert_eq!(1, res.columns().len());
assert_eq!(
Value::optional_from(Value::Int32(0), Some(Value::Int32(3)))?,
res.rows().next().unwrap().remove_field_by_name("test")?
);
Ok(())
}
#[tokio::test]
#[traced_test]
#[ignore] async fn select_list() -> YdbResult<()> {
let client = create_client().await?;
let mut transaction = client
.table_client()
.create_autocommit_transaction(Mode::OnlineReadonly);
let res = transaction
.query(
Query::new(
"
DECLARE $l AS List<Int32>;
SELECT $l AS l;
",
)
.with_params(HashMap::from_iter([(
"$l".into(),
Value::List(Box::new(ValueList {
t: Value::Int32(0),
values: Vec::from([Value::Int32(1), Value::Int32(2), Value::Int32(3)]),
})),
)])),
)
.await?;
trace!("{:?}", res);
let res = res.results.into_iter().next().unwrap();
assert_eq!(1, res.columns().len());
assert_eq!(
Value::list_from(
Value::Int32(0),
vec![Value::Int32(1), Value::Int32(2), Value::Int32(3)]
)?,
res.rows().next().unwrap().remove_field_by_name("l")?
);
Ok(())
}
#[tokio::test]
#[traced_test]
#[ignore] async fn select_struct() -> YdbResult<()> {
let client = create_client().await?;
let mut transaction = client
.table_client()
.create_autocommit_transaction(Mode::OnlineReadonly);
let res = transaction
.query(
Query::new(
"
DECLARE $l AS List<Struct<
a: Int64
>>;
SELECT
SUM(a) AS s
FROM
AS_TABLE($l);
;
",
)
.with_params(HashMap::from_iter([(
"$l".into(),
Value::List(Box::new(ValueList {
t: Value::Struct(ValueStruct::from_names_and_values(
vec!["a".into()],
vec![Value::Int64(0)],
)?),
values: vec![
Value::Struct(ValueStruct::from_names_and_values(
vec!["a".into()],
vec![Value::Int64(1)],
)?),
Value::Struct(ValueStruct::from_names_and_values(
vec!["a".into()],
vec![Value::Int64(2)],
)?),
Value::Struct(ValueStruct::from_names_and_values(
vec!["a".into()],
vec![Value::Int64(3)],
)?),
],
})),
)])),
)
.await?;
trace!("{:?}", res);
let res = res.results.into_iter().next().unwrap();
assert_eq!(1, res.columns().len());
assert_eq!(
Value::optional_from(Value::Int64(0), Some(Value::Int64(6)))?,
res.rows().next().unwrap().remove_field_by_name("s")?
);
Ok(())
}
/// Integration test (ignored by default): selects `CAST(NULL AS Optional<Int64>)`
/// and expects an empty optional of `Int64` in the only column.
#[tokio::test]
#[traced_test]
#[ignore]
async fn select_int64_null4() -> YdbResult<()> {
    let client = create_client().await?;
    let mut tx = client
        .table_client()
        .create_autocommit_transaction(Mode::OnlineReadonly);

    let query = Query::new("\nSELECT CAST(NULL AS Optional<Int64>)\n;\n");
    let response = tx.query(query).await?;
    trace!("{:?}", response);

    let result_set = response.results.into_iter().next().unwrap();
    assert_eq!(1, result_set.columns().len());

    let mut first_row = result_set.rows().next().unwrap();
    assert_eq!(
        Value::optional_from(Value::Int64(0), None)?,
        first_row.remove_field(0)?
    );

    Ok(())
}
/// Integration test (ignored by default): passes `99u8` through `ydb_params!`
/// as a `Uint8` parameter and expects `Value::Uint8(99)` back in column `s`.
#[tokio::test]
#[traced_test]
#[ignore]
async fn select_with_u8_param() -> YdbResult<()> {
    let client = create_client().await?;
    let mut tx = client
        .table_client()
        .create_autocommit_transaction(Mode::OnlineReadonly);

    let query = Query::from("\nDECLARE $val AS Uint8;\nSELECT $val as s\n").with_params(
        ydb_params!(
            "$val" => 99u8
        ),
    );
    let response = tx.query(query).await?;
    trace!("result: {:?}", &response);

    let result_set = response.into_only_result().unwrap();
    let mut first_row = result_set.rows().next().unwrap();
    let echoed = first_row.remove_field_by_name("s").unwrap();
    assert_eq!(Value::Uint8(99u8), echoed);

    Ok(())
}
/// Integration test (ignored by default): passes `34111u16` through `ydb_params!`
/// as a `Uint16` parameter and expects `Value::Uint16(34111)` back in column `s`.
#[tokio::test]
#[traced_test]
#[ignore]
async fn select_with_u16_param() -> YdbResult<()> {
    let client = create_client().await?;
    let mut tx = client
        .table_client()
        .create_autocommit_transaction(Mode::OnlineReadonly);

    let query = Query::from("\nDECLARE $val AS Uint16;\nSELECT $val as s\n").with_params(
        ydb_params!(
            "$val" => 34111u16
        ),
    );
    let response = tx.query(query).await?;
    trace!("result: {:?}", &response);

    let result_set = response.into_only_result().unwrap();
    let mut first_row = result_set.rows().next().unwrap();
    let echoed = first_row.remove_field_by_name("s").unwrap();
    assert_eq!(Value::Uint16(34111u16), echoed);

    Ok(())
}
/// Integration test (ignored by default): selects a bare `NULL` and expects
/// `Value::Null` in the only column.
#[tokio::test]
#[traced_test]
#[ignore]
async fn select_void_null() -> YdbResult<()> {
    let client = create_client().await?;
    let mut tx = client
        .table_client()
        .create_autocommit_transaction(Mode::OnlineReadonly);

    let query = Query::new("\nSELECT NULL\n;\n");
    let response = tx.query(query).await?;
    trace!("{:?}", response);

    let result_set = response.results.into_iter().next().unwrap();
    assert_eq!(1, result_set.columns().len());

    let mut first_row = result_set.rows().next().unwrap();
    assert_eq!(Value::Null, first_row.remove_field(0)?);

    Ok(())
}
/// Integration test (ignored by default): fills `stream_query` with large rows
/// and reads them back through `execute_scan_query`, expecting the data to
/// arrive in more than one result set.
///
/// Sizing: `60 MiB / 1 MiB + 1 = 61` target rows, integer-divided into 10 batches
/// of 6, so 60 rows with ids `1..=60` are written. Each `val` is
/// `ONE_ROW_SIZE_BYTES - KEY_SIZE_BYTES` bytes produced by `gen_value_by_id`.
///
/// Checks: ids arrive in ascending order starting at 1, every payload equals the
/// regenerated pattern for its id, row count and id sum match what was written,
/// and at least two result sets were received.
#[tokio::test]
#[traced_test]
#[ignore]
async fn stream_query() -> YdbResult<()> {
    let client = create_client().await?.table_client();
    let mut session = client.create_session().await?;

    // Best-effort cleanup: the result is ignored because the table may not exist yet.
    let _ = session
        .execute_schema_query("DROP TABLE stream_query".to_string())
        .await;

    // NOTE(review): the primary key is `val` (the large payload), while
    // `KEY_SIZE_BYTES = 8` below looks sized for the Int64 `id` column —
    // confirm whether `PRIMARY KEY (id)` was intended.
    session
        .execute_schema_query(
            "CREATE TABLE stream_query (id Int64, val Bytes, PRIMARY KEY (val))".into(),
        )
        .await?;

    const ONE_ROW_SIZE_BYTES: usize = 1024 * 1024;
    const KEY_SIZE_BYTES: usize = 8;

    /// Deterministic payload for `id`: starts at `id % 256` and increments by one
    /// per byte, wrapping around at 255.
    fn gen_value_by_id(id: i64) -> Vec<u8> {
        const VECTOR_SIZE: usize = ONE_ROW_SIZE_BYTES - KEY_SIZE_BYTES;

        let mut res: Vec<u8> = Vec::with_capacity(VECTOR_SIZE);
        let mut last_byte: u8 = (id % 256) as u8;

        for _ in 0..VECTOR_SIZE {
            res.push(last_byte);
            last_byte = last_byte.wrapping_add(1);
        }
        res
    }

    /// Upserts one row per id in a single retried transaction.
    ///
    /// `ids` must be non-empty: the first generated struct is used as the list's
    /// element-type example (`ydb_values[0]`).
    async fn insert_values(client: &TableClient, ids: Vec<i64>) -> YdbResult<()> {
        client
            .retry_transaction(|tr| async {
                let mut ydb_values: Vec<Value> = Vec::with_capacity(ids.len());
                for v in ids.iter() {
                    ydb_values.push(Value::Struct(ValueStruct::from_names_and_values(
                        vec!["id".to_string(), "val".to_string()],
                        vec![
                            Value::Int64(*v),
                            Value::Bytes(Bytes::from(gen_value_by_id(*v))),
                        ],
                    )?))
                }

                let ydb_values = Value::list_from(ydb_values[0].clone(), ydb_values)?;

                let query = Query::new(
                    "
DECLARE $values AS List<Struct<
id: Int64,
val: Bytes,
> >;
UPSERT INTO stream_query
SELECT
*
FROM
AS_TABLE($values);
",
                )
                .with_params(ydb_params!(
                    "$values" => ydb_values
                ));

                let mut tr = tr;
                tr.query(query).await?;
                tr.commit().await?;
                Ok(())
            })
            .await?;
        Ok(())
    }

    let min_target_bytes = (60 * 1024 * 1024) as usize;
    let target_row_count = min_target_bytes / ONE_ROW_SIZE_BYTES + 1;
    let target_batch_count = 10;
    let target_batch_size = target_row_count / target_batch_count;

    // Write ids 1, 2, 3, ... in batches, tracking their sum for the final check.
    let mut expected_sum: i64 = 0;
    let mut last_item_value = 0;

    for _ in 0..target_batch_count {
        let mut values = Vec::with_capacity(target_batch_size);
        for _ in 0..target_batch_size {
            last_item_value += 1;
            expected_sum += last_item_value;
            values.push(last_item_value);
        }
        insert_values(&client, values).await?;
    }

    // Ids are consecutive from 1, so the last id equals the number of rows written.
    let expected_item_count = last_item_value;

    let mut expected_id: i64 = 1;
    let query = Query::new("SELECT * FROM stream_query ORDER BY id".to_string());
    let mut res = session.execute_scan_query(query).await?;
    let mut sum: i64 = 0;
    let mut item_count = 0;
    let mut result_set_count = 0;

    while let Some(result_set) = res.next().await? {
        result_set_count += 1;

        for mut row in result_set.into_iter() {
            item_count += 1;

            match row.remove_field_by_name("id")? {
                Value::Optional(boxed_id) => match boxed_id.value.unwrap() {
                    Value::Int64(id) => {
                        assert_eq!(id, expected_id);
                        sum += id
                    }
                    id => panic!("unexpected ydb boxed_id type: {id:?}"),
                },
                id => panic!("unexpected ydb id type: {id:?}"),
            };

            match row.remove_field_by_name("val")? {
                Value::Optional(boxed_val) => match boxed_val.value.unwrap() {
                    Value::Bytes(content) => {
                        assert_eq!(gen_value_by_id(expected_id), Vec::<u8>::from(content))
                    }
                    val => panic!("unexpected ydb boxed_val type: {val:?}"),
                },
                val => panic!("unexpected ydb val type: {val:?}"),
            };

            expected_id += 1;
        }
    }

    assert_eq!(expected_item_count, item_count);
    assert_eq!(expected_sum, sum);
    // The scan must have been delivered in several result sets.
    assert!(result_set_count > 1);
    Ok(())
}
/// Integration test (ignored by default): writes two rows with
/// `retry_execute_bulk_upsert` (one with a text `val`, one with `Value::Null`),
/// reads the table back ordered by `id` and expects ids `[3, 6]`.
#[tokio::test]
#[traced_test]
#[ignore]
async fn bulk_upsert() -> YdbResult<()> {
    let client = create_client().await?;
    let table_client = client.table_client();
    let table_name = "bulk_upsert";

    table_client
        .retry_execute_scheme_query(format!(
            "\n\
             CREATE TABLE {table_name} (\n\
             id Int64 NOT NULL,\n\
             val Utf8,\n\
             PRIMARY KEY (id)\n\
             );\n"
        ))
        .await?;

    let with_text = ydb_struct!(
        "id" => 3_i64,
        "val" => Value::Text("test".to_string()),
    );
    let with_null = ydb_struct!(
        "id" => 6_i64,
        "val" => Value::Null,
    );
    table_client
        .retry_execute_bulk_upsert(format!("/local/{table_name}"), vec![with_text, with_null])
        .await?;

    let response = table_client
        .retry_transaction(|tx| async {
            let mut tx = tx;
            let select = Query::new(format!("SELECT * FROM {table_name} ORDER BY id"));
            Ok(tx.query(select).await?)
        })
        .await?;

    // Collect the `id` column; the first failing row aborts the collection.
    let ids = response
        .into_only_result()?
        .rows()
        .map(|mut row| -> YdbResult<i64> {
            let id: i64 = row.remove_field_by_name("id")?.try_into()?;
            Ok(id)
        })
        .collect::<YdbResult<Vec<i64>>>()?;
    assert_eq!(vec![3, 6], ids);

    table_client
        .retry_with_session(RetryOptions::new(), |session| async {
            let mut session = session;
            let ddl = format!("DROP TABLE {table_name}");
            session.execute_schema_query(ddl).await?;
            Ok(())
        })
        .await
        .unwrap();

    Ok(())
}
/// Integration test (ignored by default): creates a table with nine columns, a
/// two-column primary key and two global indexes, then checks what
/// `describe_table` reports for it.
///
/// Verified: column count, primary key order, index count and type, store type,
/// the type examples of `id` (text) and `id_hash` (uint32), and that `price` is
/// an optional decimal with a positive precision.
#[tokio::test]
#[traced_test]
#[ignore]
async fn describe_table() -> YdbResult<()> {
    let client = create_client().await?;
    let table_client = client.table_client();
    let table_name = "temp_describe_test";

    // Recreate the table from scratch within one session.
    table_client
        .retry_with_session(RetryOptions::new(), |session| async {
            let mut session = session;

            let drop_if_exists = format!("DROP TABLE IF EXISTS {table_name}");
            session.execute_schema_query(drop_if_exists).await?;

            let create = format!(
                "\n\
                 CREATE TABLE {table_name} (\n\
                 id Utf8 NOT NULL,\n\
                 id_hash Uint32 NOT NULL,\n\
                 timestamp Timestamp,\n\
                 host Utf8,\n\
                 message Utf8,\n\
                 level Int32,\n\
                 payload JsonDocument,\n\
                 optional_field Int32,\n\
                 price Decimal(22, 9),\n\
                 PRIMARY KEY(id_hash, id),\n\
                 INDEX idx_timestamp GLOBAL ON (timestamp),\n\
                 INDEX idx_host GLOBAL ON (host)\n\
                 );\n"
            );
            session.execute_schema_query(create).await?;
            Ok(())
        })
        .await?;

    let database_path = client.database();
    let description = table_client
        .describe_table(format!("{database_path}/{table_name}"))
        .await?;
    trace!("describe_table result: {:?}", description);

    // Table-level facts.
    assert_eq!(description.columns.len(), 9);
    assert_eq!(description.primary_key, vec!["id_hash", "id"]);
    assert_eq!(description.indexes.len(), 2);
    assert_eq!(description.store_type, StoreType::Unspecified);

    // Per-column type checks.
    let id_column = description
        .columns
        .iter()
        .find(|column| column.name == "id")
        .unwrap();
    assert!(matches!(id_column.type_value, Ok(Value::Text(_))));

    let id_hash_column = description
        .columns
        .iter()
        .find(|column| column.name == "id_hash")
        .unwrap();
    assert!(matches!(id_hash_column.type_value, Ok(Value::Uint32(_))));

    let price_column = description
        .columns
        .iter()
        .find(|column| column.name == "price")
        .unwrap();
    match &price_column.type_value {
        Ok(Value::Optional(optional)) => match &optional.t {
            Value::Decimal(decimal) => {
                assert!(
                    decimal.precision() > 0,
                    "precision should be set from schema"
                );
            }
            _ => panic!("Expected Optional<Decimal>"),
        },
        Err(e) => panic!("Type conversion failed: {:?}", e),
        _ => panic!("Expected Ok(Optional<Decimal>)"),
    }

    for index in &description.indexes {
        assert_eq!(index.index_type, IndexType::Global);
    }

    table_client
        .retry_with_session(RetryOptions::new(), |session| async {
            let mut session = session;
            let drop = format!("DROP TABLE {table_name}");
            session.execute_schema_query(drop).await?;
            Ok(())
        })
        .await?;

    Ok(())
}
/// Integration test (ignored by default): checks the errors surfaced when a
/// client built with `with_grpc_max_message_size(LIMIT_BYTES)` sends or receives
/// a message larger than that limit.
///
/// A client without the limit first stores a `PAYLOAD_BYTES` blob under `id = 1`.
/// The limited client then:
/// * tries to upsert a blob of the same size — expected to fail with a
///   `TransportGRPCStatus` whose code is `Unknown` and whose message contains
///   `INTERNAL_ERROR`;
/// * tries to select the stored blob — expected to fail with a
///   `TransportGRPCStatus` whose code is `OutOfRange`.
#[tokio::test]
#[traced_test]
#[ignore]
async fn grpc_max_message_size_limit_exceeded() -> YdbResult<()> {
    use crate::test_helpers::test_client_builder;

    const LIMIT_BYTES: usize = 1024 * 1024;
    const PAYLOAD_BYTES: usize = 2 * 1024 * 1024;
    const TABLE: &str = "grpc_limit_test";

    /// Parameterised upsert used for both the setup write and the oversized write.
    fn upsert_sql() -> String {
        format!(
            "DECLARE $id AS Int64; DECLARE $val AS Bytes;\n\
             UPSERT INTO {TABLE} (id, val) VALUES ($id, $val)"
        )
    }

    // --- setup with a client that has no explicit message-size limit ---
    let setup_client = test_client_builder().client()?;
    setup_client.wait().await?;
    let setup_tables = setup_client.table_client();

    // Best-effort drop; the result is ignored on purpose.
    let _ = setup_tables
        .create_session()
        .await?
        .execute_schema_query(format!("DROP TABLE {TABLE}"))
        .await;
    setup_tables
        .create_session()
        .await?
        .execute_schema_query(format!(
            "CREATE TABLE {TABLE} (id Int64, val Bytes, PRIMARY KEY (id))"
        ))
        .await?;

    let stored_payload = vec![0xABu8; PAYLOAD_BYTES];
    setup_tables
        .retry_transaction(|mut tx| {
            // Each attempt gets its own copy because the async block takes ownership.
            let payload = stored_payload.clone();
            async move {
                let upsert = Query::new(upsert_sql()).with_params(ydb_params!(
                    "$id" => Value::Int64(1),
                    "$val" => Value::Bytes(Bytes::from(payload))
                ));
                tx.query(upsert).await?;
                tx.commit().await?;
                Ok(())
            }
        })
        .await?;

    // --- client under test ---
    let limited_client = test_client_builder()
        .with_grpc_max_message_size(LIMIT_BYTES)
        .client()?;
    limited_client.wait().await?;
    let limited = limited_client.table_client();

    // Outgoing message above the limit.
    let mut write_tx = limited.create_interactive_transaction();
    let oversized_upsert = Query::new(upsert_sql()).with_params(ydb_params!(
        "$id" => Value::Int64(2),
        "$val" => Value::Bytes(Bytes::from(vec![0xCDu8; PAYLOAD_BYTES]))
    ));
    let encode_err = write_tx
        .query(oversized_upsert)
        .await
        .expect_err("upsert exceeding grpc encoding limit must fail");
    trace!("encode-limit error: {:?}", encode_err);

    if let YdbError::TransportGRPCStatus(status) = &encode_err {
        assert_eq!(
            status.code(),
            Code::Unknown,
            "expected Unknown (transport reset from local encode limit), got {status:?}"
        );
        assert!(
            status.message().contains("INTERNAL_ERROR"),
            "expected message mentioning the stream reset, got: {}",
            status.message()
        );
    } else {
        panic!("expected TransportGRPCStatus, got {encode_err:?}");
    }

    // Incoming message above the limit.
    let mut read_tx = limited.create_autocommit_transaction(Mode::OnlineReadonly);
    let decode_err = read_tx
        .query(Query::new(format!("SELECT val FROM {TABLE} WHERE id = 1")))
        .await
        .expect_err("select exceeding grpc decoding limit must fail");
    trace!("decode-limit error: {:?}", decode_err);

    if let YdbError::TransportGRPCStatus(status) = &decode_err {
        assert_eq!(
            status.code(),
            Code::OutOfRange,
            "expected OutOfRange (tonic decode-size limit), got {status:?}"
        );
    } else {
        panic!("expected TransportGRPCStatus(OutOfRange), got {decode_err:?}");
    }

    // Best-effort cleanup; the result is ignored on purpose.
    let _ = setup_tables
        .create_session()
        .await?
        .execute_schema_query(format!("DROP TABLE {TABLE}"))
        .await;

    Ok(())
}