mod common;
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::cluster::session::{SessionBuilder, TcpSessionBuilder};
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::cluster::NodeTcpConfigBuilder;
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::load_balancing::RoundRobinLoadBalancingStrategy;
#[cfg(feature = "e2e-tests")]
use common::*;
#[cfg(feature = "e2e-tests")]
use std::sync::Arc;
#[cfg(feature = "e2e-tests")]
use std::time::Duration;
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::query_values;
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::retry::NeverReconnectionPolicy;
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::speculative_execution::ConstantSpeculativeExecutionPolicy;
#[cfg(feature = "e2e-tests")]
use cdrs_tokio::types::IntoRustByName;
#[tokio::test]
#[cfg(feature = "e2e-tests")]
async fn multi_node_speculative_execution() {
let cluster_config = NodeTcpConfigBuilder::new()
.with_contact_point(ADDR.into())
.with_contact_point(ADDR.into())
.with_contact_point(ADDR.into())
.build()
.await
.unwrap();
let session = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), cluster_config)
.with_reconnection_policy(Arc::new(NeverReconnectionPolicy))
.with_speculative_execution_policy(Box::new(ConstantSpeculativeExecutionPolicy::new(
5,
Duration::from_secs(0),
)))
.build()
.await
.unwrap();
let create_keyspace_query = "CREATE KEYSPACE IF NOT EXISTS cdrs_test WITH \
replication = {'class': 'SimpleStrategy', 'replication_factor': 1} \
AND durable_writes = false";
session
.query(create_keyspace_query)
.await
.expect("create keyspace error");
let cql = "CREATE TABLE IF NOT EXISTS cdrs_test.single_node_speculative_execution \
(id text PRIMARY KEY)";
session.query(cql).await.expect("create table error");
let query_insert = "INSERT INTO cdrs_test.single_node_speculative_execution \
(id) VALUES (?)";
let items = vec!["1".to_string(), "2".to_string(), "3".to_string()];
for item in items {
let values = query_values!(item);
session
.query_with_values(query_insert, values)
.await
.expect("insert item error");
}
let cql = "SELECT * FROM cdrs_test.single_node_speculative_execution WHERE id IN ?";
let criteria = vec!["1".to_string(), "3".to_string()];
let rows = session
.query_with_values(cql, query_values!(criteria.clone()))
.await
.expect("select values query error")
.response_body()
.expect("get body error")
.into_rows()
.expect("converting into rows error");
assert_eq!(rows.len(), criteria.len());
let found_all_matching_criteria = criteria.iter().all(|criteria_item: &String| {
rows.iter().any(|row| {
let id: String = row.get_r_by_name("id").expect("id");
criteria_item.clone() == id
})
});
assert!(
found_all_matching_criteria,
"should find at least one element for each criteria"
);
}