use scylla_cql::frame::response::error::DbError;
use tracing::{error, warn};
use tracing_subscriber::Layer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use uuid::Uuid;
use crate::client::caching_session::CachingSession;
use crate::client::session::Session;
use crate::client::session_builder::{GenericSessionBuilder, SessionBuilderKind};
use crate::cluster::ClusterState;
use crate::cluster::NodeRef;
use crate::errors::{ExecutionError, RequestAttemptError};
use crate::network::Connection;
use crate::policies::load_balancing::{FallbackPlan, LoadBalancingPolicy, RoutingInfo};
use crate::policies::retry::{RequestInfo, RetryDecision, RetryPolicy, RetrySession};
use crate::routing::Shard;
use crate::statement::unprepared::Statement;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{num::NonZeroU32, time::Duration};
pub(crate) fn unique_keyspace_name() -> String {
let id = Uuid::new_v4();
let name = format!("test_rust_{}", id.as_simple(),);
println!("Unique name: {name}");
name
}
pub(crate) async fn resolve_hostname(hostname: &str) -> SocketAddr {
match tokio::net::lookup_host(hostname).await {
Ok(mut addrs) => addrs.next().unwrap(),
Err(_) => {
tokio::net::lookup_host((hostname, 9042)) .await
.unwrap()
.next()
.unwrap()
}
}
}
pub(crate) async fn supports_feature(session: &Session, feature: &str) -> bool {
let meta = session.get_cluster_state();
let system_local = meta
.keyspaces
.get("system")
.unwrap()
.tables
.get("local")
.unwrap();
if !system_local.columns.contains_key("supported_features") {
return false;
}
let result = session
.query_unpaged(
"SELECT supported_features FROM system.local WHERE key='local'",
(),
)
.await
.unwrap()
.into_rows_result()
.unwrap();
let (features,): (Option<&str>,) = result.single_row().unwrap();
features
.unwrap_or_default()
.split(',')
.any(|f| f == feature)
}
pub(crate) fn create_new_session_builder() -> GenericSessionBuilder<impl SessionBuilderKind> {
let session_builder = {
use crate::client::session_builder::SessionBuilder;
let uri = std::env::var("SCYLLA_URI").unwrap_or_else(|_| "172.42.0.2:9042".to_string());
SessionBuilder::new().known_node(uri)
};
session_builder
.tracing_info_fetch_attempts(NonZeroU32::new(200).unwrap())
.tracing_info_fetch_interval(Duration::from_millis(50))
}
pub(crate) async fn scylla_supports_tablets(session: &Session) -> bool {
supports_feature(session, "TABLETS").await
}
pub(crate) fn setup_tracing() {
let testing_layer = tracing_subscriber::fmt::layer()
.with_test_writer()
.with_filter(tracing_subscriber::EnvFilter::from_default_env());
let noop_layer = tracing_subscriber::fmt::layer().with_writer(std::io::sink);
let _ = tracing_subscriber::registry()
.with(testing_layer)
.with(noop_layer)
.try_init();
}
#[derive(Debug)]
struct SchemaQueriesLBP;
impl LoadBalancingPolicy for SchemaQueriesLBP {
fn pick<'a>(
&'a self,
_query: &'a RoutingInfo,
cluster: &'a ClusterState,
) -> Option<(NodeRef<'a>, Option<Shard>)> {
cluster.get_nodes_info().first().map(|node| (node, Some(0)))
}
fn fallback<'a>(
&'a self,
_query: &'a RoutingInfo,
cluster: &'a ClusterState,
) -> FallbackPlan<'a> {
Box::new(cluster.get_nodes_info().iter().map(|node| (node, Some(0))))
}
fn name(&self) -> String {
"SchemaQueriesLBP".to_owned()
}
}
#[derive(Debug, Default)]
struct SchemaQueriesRetrySession {
count: usize,
}
impl RetrySession for SchemaQueriesRetrySession {
fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision {
match request_info.error {
RequestAttemptError::DbError(DbError::ServerError, s)
if s == "Failed to apply group 0 change due to concurrent modification" =>
{
self.count += 1;
if self.count >= 10 {
error!(
"Received TENTH(!) group 0 concurrent modification error during DDL. Please fix ScyllaDB Core."
);
RetryDecision::DontRetry
} else {
warn!(
"Received group 0 concurrent modification error during DDL. Performing retry #{}.",
self.count
);
RetryDecision::RetrySameTarget(None)
}
}
_ => RetryDecision::DontRetry,
}
}
fn reset(&mut self) {
*self = Default::default()
}
}
#[derive(Debug)]
struct SchemaQueriesRetryPolicy;
impl RetryPolicy for SchemaQueriesRetryPolicy {
fn new_session(&self) -> Box<dyn RetrySession> {
Box::new(SchemaQueriesRetrySession::default())
}
}
fn apply_ddl_lbp(query: &mut Statement) {
let policy = query
.get_execution_profile_handle()
.map(|profile| profile.pointee_to_builder())
.unwrap_or_default()
.load_balancing_policy(Arc::new(SchemaQueriesLBP))
.retry_policy(Arc::new(SchemaQueriesRetryPolicy))
.build();
query.set_execution_profile_handle(Some(policy.into_handle()));
}
#[async_trait::async_trait]
pub(crate) trait PerformDDL {
async fn ddl(&self, query: impl Into<Statement> + Send) -> Result<(), ExecutionError>;
}
#[async_trait::async_trait]
impl PerformDDL for Session {
async fn ddl(&self, query: impl Into<Statement> + Send) -> Result<(), ExecutionError> {
let mut query = query.into();
apply_ddl_lbp(&mut query);
self.query_unpaged(query, &[]).await.map(|_| ())
}
}
#[async_trait::async_trait]
impl PerformDDL for CachingSession {
async fn ddl(&self, query: impl Into<Statement> + Send) -> Result<(), ExecutionError> {
let mut query = query.into();
apply_ddl_lbp(&mut query);
self.execute_unpaged(query, &[]).await.map(|_| ())
}
}
#[async_trait::async_trait]
impl PerformDDL for Connection {
async fn ddl(&self, query: impl Into<Statement> + Send) -> Result<(), ExecutionError> {
let mut query = query.into();
apply_ddl_lbp(&mut query);
self.query_unpaged(&query)
.await
.map(|_| ())
.map_err(ExecutionError::LastAttemptError)
}
}