use crate as scylla;
use crate::batch::Batch;
use crate::frame::response::result::Row;
use crate::frame::value::ValueList;
use crate::query::Query;
use crate::routing::hash3_x64_128;
use crate::statement::Consistency;
use crate::tracing::TracingInfo;
use crate::transport::connection::{BatchResult, QueryResult};
use crate::transport::errors::{BadKeyspaceName, BadQuery, DbError, QueryError};
use crate::{IntoTypedRows, Session, SessionBuilder};
use bytes::Bytes;
use futures::StreamExt;
use uuid::Uuid;
#[tokio::test]
async fn test_unprepared_statement() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session
.query("DROP TABLE IF EXISTS ks.t;", &[])
.await
.unwrap();
session
.query(
"CREATE TABLE IF NOT EXISTS ks.t (a int, b int, c text, primary key (a, b))",
&[],
)
.await
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(300));
session
.query("INSERT INTO ks.t (a, b, c) VALUES (1, 2, 'abc')", &[])
.await
.unwrap();
session
.query("INSERT INTO ks.t (a, b, c) VALUES (7, 11, '')", &[])
.await
.unwrap();
session
.query("INSERT INTO ks.t (a, b, c) VALUES (1, 4, 'hello')", &[])
.await
.unwrap();
let query_result = session
.query("SELECT a, b, c FROM ks.t", &[])
.await
.unwrap();
let (a_idx, _) = query_result.get_column_spec("a").unwrap();
let (b_idx, _) = query_result.get_column_spec("b").unwrap();
let (c_idx, _) = query_result.get_column_spec("c").unwrap();
assert!(query_result.get_column_spec("d").is_none());
let rs = query_result.rows.unwrap();
let mut results: Vec<(i32, i32, &String)> = rs
.iter()
.map(|r| {
let a = r.columns[a_idx].as_ref().unwrap().as_int().unwrap();
let b = r.columns[b_idx].as_ref().unwrap().as_int().unwrap();
let c = r.columns[c_idx].as_ref().unwrap().as_text().unwrap();
(a, b, c)
})
.collect();
results.sort();
assert_eq!(
results,
vec![
(1, 2, &String::from("abc")),
(1, 4, &String::from("hello")),
(7, 11, &String::from(""))
]
);
let mut results_from_manual_paging: Vec<Row> = vec![];
let query = Query::new("SELECT a, b, c FROM ks.t").with_page_size(1);
let mut paging_state: Option<Bytes> = None;
let mut watchdog = 0;
loop {
let rs_manual = session
.query_paged(query.clone(), &[], paging_state)
.await
.unwrap();
results_from_manual_paging.append(&mut rs_manual.rows.unwrap());
if watchdog > 30 || rs_manual.paging_state == None {
break;
}
watchdog += 1;
paging_state = rs_manual.paging_state;
}
assert_eq!(results_from_manual_paging, rs);
}
#[tokio::test]
async fn test_prepared_statement() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session
.query("DROP TABLE IF EXISTS ks.t2;", &[])
.await
.unwrap();
session
.query("DROP TABLE IF EXISTS ks.complex_pk;", &[])
.await
.unwrap();
session
.query(
"CREATE TABLE IF NOT EXISTS ks.t2 (a int, b int, c text, primary key (a, b))",
&[],
)
.await
.unwrap();
session
.query("CREATE TABLE IF NOT EXISTS ks.complex_pk (a int, b int, c text, d int, e int, primary key ((a,b,c),d))", &[])
.await
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(300));
let prepared_statement = session
.prepare("INSERT INTO ks.t2 (a, b, c) VALUES (?, ?, ?)")
.await
.unwrap();
let prepared_complex_pk_statement = session
.prepare("INSERT INTO ks.complex_pk (a, b, c, d) VALUES (?, ?, ?, 7)")
.await
.unwrap();
let values = (17_i32, 16_i32, "I'm prepared!!!");
let serialized_values = values.serialized().unwrap().into_owned();
session.execute(&prepared_statement, &values).await.unwrap();
session
.execute(&prepared_complex_pk_statement, &values)
.await
.unwrap();
{
let rs = session
.query("SELECT token(a) FROM ks.t2", &[])
.await
.unwrap()
.rows
.unwrap();
let token: i64 = rs.first().unwrap().columns[0]
.as_ref()
.unwrap()
.as_bigint()
.unwrap();
let expected_token = hash3_x64_128(
&prepared_statement
.compute_partition_key(&serialized_values)
.unwrap(),
) as i64;
assert_eq!(token, expected_token)
}
{
let rs = session
.query("SELECT token(a,b,c) FROM ks.complex_pk", &[])
.await
.unwrap()
.rows
.unwrap();
let token: i64 = rs.first().unwrap().columns[0]
.as_ref()
.unwrap()
.as_bigint()
.unwrap();
let expected_token = hash3_x64_128(
&prepared_complex_pk_statement
.compute_partition_key(&serialized_values)
.unwrap(),
) as i64;
assert_eq!(token, expected_token)
}
{
let rs = session
.query("SELECT a,b,c FROM ks.t2", &[])
.await
.unwrap()
.rows
.unwrap();
let r = rs.first().unwrap();
let a = r.columns[0].as_ref().unwrap().as_int().unwrap();
let b = r.columns[1].as_ref().unwrap().as_int().unwrap();
let c = r.columns[2].as_ref().unwrap().as_text().unwrap();
assert_eq!((a, b, c), (17, 16, &String::from("I'm prepared!!!")));
let mut results_from_manual_paging: Vec<Row> = vec![];
let query = Query::new("SELECT a, b, c FROM ks.t2").with_page_size(1);
let prepared_paged = session.prepare(query).await.unwrap();
let mut paging_state: Option<Bytes> = None;
let mut watchdog = 0;
loop {
let rs_manual = session
.execute_paged(&prepared_paged, &[], paging_state)
.await
.unwrap();
results_from_manual_paging.append(&mut rs_manual.rows.unwrap());
if watchdog > 30 || rs_manual.paging_state == None {
break;
}
watchdog += 1;
paging_state = rs_manual.paging_state;
}
assert_eq!(results_from_manual_paging, rs);
}
{
let rs = session
.query("SELECT a,b,c,d,e FROM ks.complex_pk", &[])
.await
.unwrap()
.rows
.unwrap();
let r = rs.first().unwrap();
let a = r.columns[0].as_ref().unwrap().as_int().unwrap();
let b = r.columns[1].as_ref().unwrap().as_int().unwrap();
let c = r.columns[2].as_ref().unwrap().as_text().unwrap();
let d = r.columns[3].as_ref().unwrap().as_int().unwrap();
let e = r.columns[4].as_ref();
assert!(e.is_none());
assert_eq!((a, b, c, d), (17, 16, &String::from("I'm prepared!!!"), 7))
}
{
#[derive(scylla::ValueList, scylla::FromRow, PartialEq, Debug, Clone)]
struct ComplexPk {
a: i32,
b: i32,
c: Option<String>,
d: i32,
e: i32,
}
let input: ComplexPk = ComplexPk {
a: 9,
b: 8,
c: Some("seven".into()),
d: 6,
e: 5,
};
session
.query(
"INSERT INTO ks.complex_pk (a,b,c,d,e) VALUES (?,?,?,?,?)",
input.clone(),
)
.await
.unwrap();
let mut rs = session
.query(
"SELECT * FROM ks.complex_pk WHERE a = 9 and b = 8 and c = 'seven'",
&[],
)
.await
.unwrap()
.rows
.unwrap()
.into_typed::<ComplexPk>();
let output = rs.next().unwrap().unwrap();
assert_eq!(input, output)
}
}
#[tokio::test]
async fn test_batch() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session
.query("DROP TABLE IF EXISTS ks.t_batch;", &[])
.await
.unwrap();
session
.query(
"CREATE TABLE IF NOT EXISTS ks.t_batch (a int, b int, c text, primary key (a, b))",
&[],
)
.await
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(300));
let prepared_statement = session
.prepare("INSERT INTO ks.t_batch (a, b, c) VALUES (?, ?, ?)")
.await
.unwrap();
use crate::batch::Batch;
let mut batch: Batch = Default::default();
batch.append_statement("INSERT INTO ks.t_batch (a, b, c) VALUES (?, ?, ?)");
batch.append_statement("INSERT INTO ks.t_batch (a, b, c) VALUES (7, 11, '')");
batch.append_statement(prepared_statement);
let values = ((1_i32, 2_i32, "abc"), (), (1_i32, 4_i32, "hello"));
session.batch(&batch, values).await.unwrap();
let rs = session
.query("SELECT a, b, c FROM ks.t_batch", &[])
.await
.unwrap()
.rows
.unwrap();
let mut results: Vec<(i32, i32, &String)> = rs
.iter()
.map(|r| {
let a = r.columns[0].as_ref().unwrap().as_int().unwrap();
let b = r.columns[1].as_ref().unwrap().as_int().unwrap();
let c = r.columns[2].as_ref().unwrap().as_text().unwrap();
(a, b, c)
})
.collect();
results.sort();
assert_eq!(
results,
vec![
(1, 2, &String::from("abc")),
(1, 4, &String::from("hello")),
(7, 11, &String::from(""))
]
);
}
#[tokio::test]
async fn test_token_calculation() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session
.query("DROP TABLE IF EXISTS ks.t3;", &[])
.await
.unwrap();
session
.query("CREATE TABLE IF NOT EXISTS ks.t3 (a text primary key)", &[])
.await
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(300));
let prepared_statement = session
.prepare("INSERT INTO ks.t3 (a) VALUES (?)")
.await
.unwrap();
for i in 1..50usize {
eprintln!("Trying key size {}", i);
let mut s = String::new();
for _ in 0..i {
s.push('a');
}
let values = (&s,);
let serialized_values = values.serialized().unwrap().into_owned();
session.execute(&prepared_statement, &values).await.unwrap();
let rs = session
.query("SELECT token(a) FROM ks.t3 WHERE a = ?", &values)
.await
.unwrap()
.rows
.unwrap();
let token: i64 = rs.first().unwrap().columns[0]
.as_ref()
.unwrap()
.as_bigint()
.unwrap();
let expected_token = hash3_x64_128(
&prepared_statement
.compute_partition_key(&serialized_values)
.unwrap(),
) as i64;
assert_eq!(token, expected_token)
}
}
#[tokio::test]
async fn test_use_keyspace() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new()
.known_node(&uri)
.build()
.await
.unwrap();
session.query("CREATE KEYSPACE IF NOT EXISTS use_ks_test WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session
.query("DROP TABLE IF EXISTS use_ks_test.tab", &[])
.await
.unwrap();
session
.query(
"CREATE TABLE IF NOT EXISTS use_ks_test.tab (a text primary key)",
&[],
)
.await
.unwrap();
session
.query("INSERT INTO use_ks_test.tab (a) VALUES ('test1')", &[])
.await
.unwrap();
session.use_keyspace("use_ks_test", false).await.unwrap();
session
.query("INSERT INTO tab (a) VALUES ('test2')", &[])
.await
.unwrap();
let mut rows: Vec<String> = session
.query("SELECT * FROM tab", &[])
.await
.unwrap()
.rows
.unwrap()
.into_typed::<(String,)>()
.map(|res| res.unwrap().0)
.collect();
rows.sort();
assert_eq!(rows, vec!["test1".to_string(), "test2".to_string()]);
assert!(session
.use_keyspace("this_keyspace_does_not_exist_at_all", false)
.await
.is_err());
assert!(matches!(
session.use_keyspace("", false).await,
Err(QueryError::BadQuery(BadQuery::BadKeyspaceName(
BadKeyspaceName::Empty
)))
));
let long_name: String = vec!['a'; 49].iter().collect();
assert!(matches!(
session.use_keyspace(long_name, false).await,
Err(QueryError::BadQuery(BadQuery::BadKeyspaceName(
BadKeyspaceName::TooLong(_, _)
)))
));
assert!(matches!(
session.use_keyspace("abcd;dfdsf", false).await,
Err(QueryError::BadQuery(BadQuery::BadKeyspaceName(
BadKeyspaceName::IllegalCharacter(_, ';')
)))
));
let session2: Session = SessionBuilder::new()
.known_node(uri)
.use_keyspace("use_ks_test", false)
.build()
.await
.unwrap();
let mut rows2: Vec<String> = session2
.query("SELECT * FROM tab", &[])
.await
.unwrap()
.rows
.unwrap()
.into_typed::<(String,)>()
.map(|res| res.unwrap().0)
.collect();
rows2.sort();
assert_eq!(rows2, vec!["test1".to_string(), "test2".to_string()]);
}
#[tokio::test]
async fn test_use_keyspace_case_sensitivity() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new()
.known_node(&uri)
.build()
.await
.unwrap();
session.query("CREATE KEYSPACE IF NOT EXISTS \"ks_case_test\" WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session.query("CREATE KEYSPACE IF NOT EXISTS \"KS_CASE_TEST\" WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session
.query("DROP TABLE IF EXISTS ks_case_test.tab", &[])
.await
.unwrap();
session
.query("DROP TABLE IF EXISTS \"KS_CASE_TEST\".tab", &[])
.await
.unwrap();
session
.query("CREATE TABLE ks_case_test.tab (a text primary key)", &[])
.await
.unwrap();
session
.query(
"CREATE TABLE \"KS_CASE_TEST\".tab (a text primary key)",
&[],
)
.await
.unwrap();
session
.query("INSERT INTO ks_case_test.tab (a) VALUES ('lowercase')", &[])
.await
.unwrap();
session
.query(
"INSERT INTO \"KS_CASE_TEST\".tab (a) VALUES ('uppercase')",
&[],
)
.await
.unwrap();
session.use_keyspace("KS_CASE_TEST", false).await.unwrap();
let rows: Vec<String> = session
.query("SELECT * from tab", &[])
.await
.unwrap()
.rows
.unwrap()
.into_typed::<(String,)>()
.map(|row| row.unwrap().0)
.collect();
assert_eq!(rows, vec!["lowercase".to_string()]);
session.use_keyspace("KS_CASE_TEST", true).await.unwrap();
let rows: Vec<String> = session
.query("SELECT * from tab", &[])
.await
.unwrap()
.rows
.unwrap()
.into_typed::<(String,)>()
.map(|row| row.unwrap().0)
.collect();
assert_eq!(rows, vec!["uppercase".to_string()]);
}
#[tokio::test]
async fn test_raw_use_keyspace() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new()
.known_node(&uri)
.build()
.await
.unwrap();
session.query("CREATE KEYSPACE IF NOT EXISTS raw_use_ks_test WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session
.query("DROP TABLE IF EXISTS raw_use_ks_test.tab", &[])
.await
.unwrap();
session
.query(
"CREATE TABLE IF NOT EXISTS raw_use_ks_test.tab (a text primary key)",
&[],
)
.await
.unwrap();
session
.query(
"INSERT INTO raw_use_ks_test.tab (a) VALUES ('raw_test')",
&[],
)
.await
.unwrap();
session
.query("use \"raw_use_ks_test\" ;", &[])
.await
.unwrap();
let rows: Vec<String> = session
.query("SELECT * FROM tab", &[])
.await
.unwrap()
.rows
.unwrap()
.into_typed::<(String,)>()
.map(|res| res.unwrap().0)
.collect();
assert_eq!(rows, vec!["raw_test".to_string()]);
assert!(session
.query("use \"RAW_USE_KS_TEST\" ;", &[])
.await
.is_err());
assert!(session
.query("use RAW_USE_KS_TEST ;", &[])
.await
.is_ok());
}
#[tokio::test]
async fn test_fetch_system_keyspace() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
let prepared_statement = session
.prepare("SELECT * FROM system_schema.keyspaces")
.await
.unwrap();
session.execute(&prepared_statement, &[]).await.unwrap();
}
#[tokio::test]
async fn test_db_errors() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
assert!(matches!(
session.query("gibberish", &[]).await,
Err(QueryError::DbError(DbError::SyntaxError, _))
));
session.query("CREATE KEYSPACE IF NOT EXISTS db_errors_ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
let create_keyspace_res = session.query("CREATE KEYSPACE db_errors_ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await;
let keyspace_exists_error: DbError = match create_keyspace_res {
Err(QueryError::DbError(e, _)) => e,
_ => panic!("Second CREATE KEYSPACE didn't return an error!"),
};
assert_eq!(
keyspace_exists_error,
DbError::AlreadyExists {
keyspace: "db_errors_ks".to_string(),
table: "".to_string()
}
);
session
.query(
"CREATE TABLE IF NOT EXISTS db_errors_ks.tab (a text primary key)",
&[],
)
.await
.unwrap();
let create_table_res = session
.query("CREATE TABLE db_errors_ks.tab (a text primary key)", &[])
.await;
let create_tab_error: DbError = match create_table_res {
Err(QueryError::DbError(e, _)) => e,
_ => panic!("Second CREATE TABLE didn't return an error!"),
};
assert_eq!(
create_tab_error,
DbError::AlreadyExists {
keyspace: "db_errors_ks".to_string(),
table: "tab".to_string()
}
);
}
#[tokio::test]
async fn test_tracing() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
session.query("CREATE KEYSPACE IF NOT EXISTS test_tracing_ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session
.query(
"CREATE TABLE IF NOT EXISTS test_tracing_ks.tab (a text primary key)",
&[],
)
.await
.unwrap();
test_tracing_query(&session).await;
test_tracing_execute(&session).await;
test_tracing_prepare(&session).await;
test_get_tracing_info(&session).await;
test_tracing_query_iter(&session).await;
test_tracing_execute_iter(&session).await;
test_tracing_batch(&session).await;
}
async fn test_tracing_query(session: &Session) {
let untraced_query: Query = Query::new("SELECT * FROM test_tracing_ks.tab");
let untraced_query_result: QueryResult = session.query(untraced_query, &[]).await.unwrap();
assert!(untraced_query_result.tracing_id.is_none());
let mut traced_query: Query = Query::new("SELECT * FROM test_tracing_ks.tab");
traced_query.config.tracing = true;
let traced_query_result: QueryResult = session.query(traced_query, &[]).await.unwrap();
assert!(traced_query_result.tracing_id.is_some());
assert_in_tracing_table(session, traced_query_result.tracing_id.unwrap()).await;
}
async fn test_tracing_execute(session: &Session) {
let untraced_prepared = session
.prepare("SELECT * FROM test_tracing_ks.tab")
.await
.unwrap();
let untraced_prepared_result: QueryResult =
session.execute(&untraced_prepared, &[]).await.unwrap();
assert!(untraced_prepared_result.tracing_id.is_none());
let mut traced_prepared = session
.prepare("SELECT * FROM test_tracing_ks.tab")
.await
.unwrap();
traced_prepared.config.tracing = true;
let traced_prepared_result: QueryResult = session.execute(&traced_prepared, &[]).await.unwrap();
assert!(traced_prepared_result.tracing_id.is_some());
assert_in_tracing_table(session, traced_prepared_result.tracing_id.unwrap()).await;
}
async fn test_tracing_prepare(session: &Session) {
let untraced_prepared = session
.prepare("SELECT * FROM test_tracing_ks.tab")
.await
.unwrap();
assert!(untraced_prepared.prepare_tracing_ids.is_empty());
let mut to_prepare_traced = Query::new("SELECT * FROM test_tracing_ks.tab");
to_prepare_traced.config.tracing = true;
let traced_prepared = session.prepare(to_prepare_traced).await.unwrap();
assert!(!traced_prepared.prepare_tracing_ids.is_empty());
for tracing_id in traced_prepared.prepare_tracing_ids {
assert_in_tracing_table(session, tracing_id).await;
}
}
async fn test_get_tracing_info(session: &Session) {
let mut traced_query: Query = Query::new("SELECT * FROM test_tracing_ks.tab");
traced_query.config.tracing = true;
let traced_query_result: QueryResult = session.query(traced_query, &[]).await.unwrap();
let tracing_id: Uuid = traced_query_result.tracing_id.unwrap();
let tracing_info: TracingInfo = session.get_tracing_info(&tracing_id).await.unwrap();
assert!(!tracing_info.events.is_empty());
}
async fn test_tracing_query_iter(session: &Session) {
let untraced_query: Query = Query::new("SELECT * FROM test_tracing_ks.tab");
let mut untraced_row_iter = session.query_iter(untraced_query, &[]).await.unwrap();
while let Some(_row) = untraced_row_iter.next().await {
}
assert!(untraced_row_iter.get_tracing_ids().is_empty());
let untraced_typed_row_iter = untraced_row_iter.into_typed::<(i32,)>();
assert!(untraced_typed_row_iter.get_tracing_ids().is_empty());
let mut traced_query: Query = Query::new("SELECT * FROM test_tracing_ks.tab");
traced_query.config.tracing = true;
let mut traced_row_iter = session.query_iter(traced_query, &[]).await.unwrap();
while let Some(_row) = traced_row_iter.next().await {
}
assert!(!traced_row_iter.get_tracing_ids().is_empty());
let traced_typed_row_iter = traced_row_iter.into_typed::<(i32,)>();
assert!(!traced_typed_row_iter.get_tracing_ids().is_empty());
for tracing_id in traced_typed_row_iter.get_tracing_ids() {
assert_in_tracing_table(session, *tracing_id).await;
}
}
async fn test_tracing_execute_iter(session: &Session) {
let untraced_prepared = session
.prepare("SELECT * FROM test_tracing_ks.tab")
.await
.unwrap();
let mut untraced_row_iter = session.execute_iter(untraced_prepared, &[]).await.unwrap();
while let Some(_row) = untraced_row_iter.next().await {
}
assert!(untraced_row_iter.get_tracing_ids().is_empty());
let untraced_typed_row_iter = untraced_row_iter.into_typed::<(i32,)>();
assert!(untraced_typed_row_iter.get_tracing_ids().is_empty());
let mut traced_prepared = session
.prepare("SELECT * FROM test_tracing_ks.tab")
.await
.unwrap();
traced_prepared.config.tracing = true;
let mut traced_row_iter = session.execute_iter(traced_prepared, &[]).await.unwrap();
while let Some(_row) = traced_row_iter.next().await {
}
assert!(!traced_row_iter.get_tracing_ids().is_empty());
let traced_typed_row_iter = traced_row_iter.into_typed::<(i32,)>();
assert!(!traced_typed_row_iter.get_tracing_ids().is_empty());
for tracing_id in traced_typed_row_iter.get_tracing_ids() {
assert_in_tracing_table(session, *tracing_id).await;
}
}
async fn test_tracing_batch(session: &Session) {
let mut untraced_batch: Batch = Default::default();
untraced_batch.append_statement("INSERT INTO test_tracing_ks.tab (a) VALUES('a')");
let untraced_batch_result: BatchResult = session.batch(&untraced_batch, ((),)).await.unwrap();
assert!(untraced_batch_result.tracing_id.is_none());
let mut traced_batch: Batch = Default::default();
traced_batch.append_statement("INSERT INTO test_tracing_ks.tab (a) VALUES('a')");
traced_batch.config.tracing = true;
let traced_batch_result: BatchResult = session.batch(&traced_batch, ((),)).await.unwrap();
assert!(traced_batch_result.tracing_id.is_some());
assert_in_tracing_table(session, traced_batch_result.tracing_id.unwrap()).await;
}
async fn assert_in_tracing_table(session: &Session, tracing_uuid: Uuid) {
let mut traces_query = Query::new("SELECT * FROM system_traces.sessions WHERE session_id = ?");
traces_query.config.consistency = Some(Consistency::One);
for _ in 0..8 {
let row_opt = session
.query(traces_query.clone(), (tracing_uuid,))
.await
.unwrap()
.rows
.into_iter()
.next();
if row_opt.is_some() {
return;
}
tokio::time::sleep(std::time::Duration::from_millis(32)).await;
}
panic!("No rows for tracing with this session id!");
}
#[tokio::test]
async fn test_fetch_schema_version() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
session.fetch_schema_version().await.unwrap();
}
#[tokio::test]
async fn test_await_schema_agreement() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
session.await_schema_agreement().await.unwrap();
}
#[tokio::test]
async fn test_await_timed_schema_agreement() {
use std::time::Duration;
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
session
.await_timed_schema_agreement(Duration::from_millis(50))
.await
.unwrap();
}
#[tokio::test]
async fn test_timestamp() {
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "127.0.0.1:9042".to_string());
let session = SessionBuilder::new().known_node(uri).build().await.unwrap();
session.query("CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = {'class' : 'SimpleStrategy', 'replication_factor' : 1}", &[]).await.unwrap();
session
.query("DROP TABLE IF EXISTS ks.t_timestamp;", &[])
.await
.unwrap();
session
.query(
"CREATE TABLE IF NOT EXISTS ks.t_timestamp (a text, b text, primary key (a))",
&[],
)
.await
.unwrap();
session.await_schema_agreement().await.unwrap();
let query_str = "INSERT INTO ks.t_timestamp (a, b) VALUES (?, ?)";
let mut regular_query = Query::new(query_str.to_string());
regular_query.set_timestamp(Some(420));
session
.query(regular_query.clone(), ("regular query", "higher timestamp"))
.await
.unwrap();
regular_query.set_timestamp(Some(42));
session
.query(regular_query.clone(), ("regular query", "lower timestamp"))
.await
.unwrap();
let mut prepared_statement = session.prepare(query_str).await.unwrap();
prepared_statement.set_timestamp(Some(420));
session
.execute(&prepared_statement, ("prepared query", "higher timestamp"))
.await
.unwrap();
prepared_statement.set_timestamp(Some(42));
session
.execute(&prepared_statement, ("prepared query", "lower timestamp"))
.await
.unwrap();
let mut batch: Batch = Default::default();
batch.append_statement(regular_query);
batch.append_statement(prepared_statement);
batch.set_timestamp(Some(420));
session
.batch(
&batch,
(
("first query in batch", "higher timestamp"),
("second query in batch", "higher timestamp"),
),
)
.await
.unwrap();
batch.set_timestamp(Some(42));
session
.batch(
&batch,
(
("first query in batch", "lower timestamp"),
("second query in batch", "lower timestamp"),
),
)
.await
.unwrap();
let mut results = session
.query("SELECT a, b, WRITETIME(b) FROM ks.t_timestamp", &[])
.await
.unwrap()
.rows
.unwrap()
.into_typed::<(String, String, i64)>()
.map(Result::unwrap)
.collect::<Vec<_>>();
results.sort();
let expected_results = [
("first query in batch", "higher timestamp", 420),
("prepared query", "higher timestamp", 420),
("regular query", "higher timestamp", 420),
("second query in batch", "higher timestamp", 420),
]
.iter()
.map(|(x, y, t)| (x.to_string(), y.to_string(), *t))
.collect::<Vec<_>>();
assert_eq!(results, expected_results);
}