use std::{fmt::Debug, sync::Arc, time::Duration};
use arc_swap::ArcSwap;
use scylla_cql::{Consistency, frame::types::SerialConsistency};
use crate::policies::load_balancing::LoadBalancingPolicy;
use crate::policies::retry::RetryPolicy;
use crate::policies::speculative_execution::SpeculativeExecutionPolicy;
/// Fallback values for every execution profile setting.
///
/// `ExecutionProfileBuilder::build` calls the matching function here for each
/// setting that was never specified on the builder.
pub(crate) mod defaults {
    use super::ExecutionProfileInner;
    use crate::policies::load_balancing::{self, LoadBalancingPolicy};
    use crate::policies::retry::{DefaultRetryPolicy, RetryPolicy};
    use crate::policies::speculative_execution::SpeculativeExecutionPolicy;
    use scylla_cql::{Consistency, frame::types::SerialConsistency};
    use std::{sync::Arc, time::Duration};

    /// Request timeout used when none is specified explicitly.
    const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);

    /// Default consistency: `LocalQuorum`.
    pub(crate) fn consistency() -> Consistency {
        Consistency::LocalQuorum
    }

    /// Default serial consistency: `LocalSerial`.
    pub(crate) fn serial_consistency() -> Option<SerialConsistency> {
        Some(SerialConsistency::LocalSerial)
    }

    /// Default request timeout: 30 seconds.
    pub(crate) fn request_timeout() -> Option<Duration> {
        Some(DEFAULT_REQUEST_TIMEOUT)
    }

    /// Default load balancing policy: a default-constructed
    /// `load_balancing::DefaultPolicy`.
    pub(crate) fn load_balancing_policy() -> Arc<dyn LoadBalancingPolicy> {
        let policy = load_balancing::DefaultPolicy::default();
        Arc::new(policy)
    }

    /// Default retry policy: a fresh `DefaultRetryPolicy`.
    pub(crate) fn retry_policy() -> Arc<dyn RetryPolicy> {
        let policy = DefaultRetryPolicy::new();
        Arc::new(policy)
    }

    /// Default speculative execution policy: none.
    pub(crate) fn speculative_execution_policy() -> Option<Arc<dyn SpeculativeExecutionPolicy>> {
        None
    }

    /// An inner profile in which every setting takes its default from this module.
    impl Default for ExecutionProfileInner {
        fn default() -> Self {
            ExecutionProfileInner {
                request_timeout: request_timeout(),
                consistency: consistency(),
                serial_consistency: serial_consistency(),
                load_balancing_policy: load_balancing_policy(),
                retry_policy: retry_policy(),
                speculative_execution_policy: speculative_execution_policy(),
            }
        }
    }
}
/// Builder for [`ExecutionProfile`].
///
/// Every field is wrapped in an outer `Option`: `None` means the setting was
/// never specified, in which case `build` falls back to the matching function
/// in the `defaults` module.
#[derive(Clone, Debug)]
pub struct ExecutionProfileBuilder {
// Double `Option`: the inner `None` is a value the caller can set explicitly
// (presumably "no timeout" — confirm where the timeout is applied).
request_timeout: Option<Option<Duration>>,
// Consistency to use; unset means the default is taken at build time.
consistency: Option<Consistency>,
// Double `Option`: the inner `None` is a value the caller can set explicitly.
serial_consistency: Option<Option<SerialConsistency>>,
// Shared load balancing policy object, if one was specified.
load_balancing_policy: Option<Arc<dyn LoadBalancingPolicy>>,
// Shared retry policy object, if one was specified.
retry_policy: Option<Arc<dyn RetryPolicy>>,
// Double `Option`: the inner `None` is a value the caller can set explicitly.
speculative_execution_policy: Option<Option<Arc<dyn SpeculativeExecutionPolicy>>>,
}
impl ExecutionProfileBuilder {
    /// Sets the request timeout.
    ///
    /// Passing `None` is recorded as an explicit choice; it is not the same as
    /// leaving the setting unspecified (which selects the default on `build`).
    pub fn request_timeout(self, timeout: Option<Duration>) -> Self {
        Self {
            request_timeout: Some(timeout),
            ..self
        }
    }

    /// Sets the consistency.
    pub fn consistency(self, consistency: Consistency) -> Self {
        Self {
            consistency: Some(consistency),
            ..self
        }
    }

    /// Sets the serial consistency.
    ///
    /// Passing `None` is recorded as an explicit choice; it is not the same as
    /// leaving the setting unspecified (which selects the default on `build`).
    pub fn serial_consistency(self, serial_consistency: Option<SerialConsistency>) -> Self {
        Self {
            serial_consistency: Some(serial_consistency),
            ..self
        }
    }

    /// Sets the load balancing policy.
    pub fn load_balancing_policy(
        self,
        load_balancing_policy: Arc<dyn LoadBalancingPolicy>,
    ) -> Self {
        Self {
            load_balancing_policy: Some(load_balancing_policy),
            ..self
        }
    }

    /// Sets the retry policy.
    pub fn retry_policy(self, retry_policy: Arc<dyn RetryPolicy>) -> Self {
        Self {
            retry_policy: Some(retry_policy),
            ..self
        }
    }

    /// Sets the speculative execution policy.
    ///
    /// Passing `None` is recorded as an explicit choice; it is not the same as
    /// leaving the setting unspecified (which selects the default on `build`).
    pub fn speculative_execution_policy(
        self,
        speculative_execution_policy: Option<Arc<dyn SpeculativeExecutionPolicy>>,
    ) -> Self {
        Self {
            speculative_execution_policy: Some(speculative_execution_policy),
            ..self
        }
    }

    /// Consumes the builder and produces an [`ExecutionProfile`].
    ///
    /// Each setting that was never specified is filled in by the corresponding
    /// function of the `defaults` module; those functions are only invoked for
    /// the settings that are actually missing.
    pub fn build(self) -> ExecutionProfile {
        let Self {
            request_timeout,
            consistency,
            serial_consistency,
            load_balancing_policy,
            retry_policy,
            speculative_execution_policy,
        } = self;

        let inner = ExecutionProfileInner {
            request_timeout: request_timeout.unwrap_or_else(defaults::request_timeout),
            consistency: consistency.unwrap_or_else(defaults::consistency),
            serial_consistency: serial_consistency.unwrap_or_else(defaults::serial_consistency),
            load_balancing_policy: load_balancing_policy
                .unwrap_or_else(defaults::load_balancing_policy),
            retry_policy: retry_policy.unwrap_or_else(defaults::retry_policy),
            speculative_execution_policy: speculative_execution_policy
                .unwrap_or_else(defaults::speculative_execution_policy),
        };

        ExecutionProfile(Arc::new(inner))
    }
}
impl Default for ExecutionProfileBuilder {
fn default() -> Self {
ExecutionProfile::builder()
}
}
/// A set of execution settings: request timeout, consistency, serial
/// consistency, and the load balancing, retry and speculative execution policies.
///
/// The settings live behind an `Arc`, so cloning a profile only clones the
/// pointer; the clones share the same `ExecutionProfileInner`.
#[derive(Debug, Clone)]
pub struct ExecutionProfile(pub(crate) Arc<ExecutionProfileInner>);
/// The actual settings stored behind an [`ExecutionProfile`] or an
/// [`ExecutionProfileHandle`].
///
/// Unlike in `ExecutionProfileBuilder`, every setting here has a concrete
/// value; the remaining `Option`s are part of the value itself.
#[derive(Debug)]
pub(crate) struct ExecutionProfileInner {
// `None` presumably means that no timeout is applied — confirm at the use site.
pub(crate) request_timeout: Option<Duration>,
pub(crate) consistency: Consistency,
pub(crate) serial_consistency: Option<SerialConsistency>,
// Policies are trait objects behind `Arc`, so they can be shared between profiles.
pub(crate) load_balancing_policy: Arc<dyn LoadBalancingPolicy>,
pub(crate) retry_policy: Arc<dyn RetryPolicy>,
// `None` is the value produced by `defaults::speculative_execution_policy()`.
pub(crate) speculative_execution_policy: Option<Arc<dyn SpeculativeExecutionPolicy>>,
}
impl ExecutionProfileInner {
pub(crate) fn to_builder(&self) -> ExecutionProfileBuilder {
ExecutionProfileBuilder {
request_timeout: Some(self.request_timeout),
consistency: Some(self.consistency),
serial_consistency: Some(self.serial_consistency),
load_balancing_policy: Some(self.load_balancing_policy.clone()),
retry_policy: Some(self.retry_policy.clone()),
speculative_execution_policy: Some(self.speculative_execution_policy.clone()),
}
}
}
impl ExecutionProfile {
    /// Wraps already assembled settings into a profile.
    pub(crate) fn new_from_inner(inner: ExecutionProfileInner) -> Self {
        let shared = Arc::new(inner);
        Self(shared)
    }

    /// Returns a builder with no setting specified.
    ///
    /// Settings left unspecified are filled with defaults when the builder's
    /// `build` method is called.
    pub fn builder() -> ExecutionProfileBuilder {
        ExecutionProfileBuilder {
            request_timeout: None,
            consistency: None,
            serial_consistency: None,
            load_balancing_policy: None,
            retry_policy: None,
            speculative_execution_policy: None,
        }
    }

    /// Returns a builder pre-filled with all settings of this profile.
    pub fn to_builder(&self) -> ExecutionProfileBuilder {
        let Self(inner) = self;
        inner.to_builder()
    }

    /// Converts the profile into a handle without a label.
    pub fn into_handle(self) -> ExecutionProfileHandle {
        let Self(inner) = self;
        let label: Option<String> = None;
        ExecutionProfileHandle(Arc::new((ArcSwap::new(inner), label)))
    }

    /// Converts the profile into a handle that carries the given label.
    pub fn into_handle_with_label(self, label: String) -> ExecutionProfileHandle {
        let Self(inner) = self;
        let label = Some(label);
        ExecutionProfileHandle(Arc::new((ArcSwap::new(inner), label)))
    }

    /// Returns the request timeout stored in this profile.
    pub fn get_request_timeout(&self) -> Option<Duration> {
        let Self(inner) = self;
        inner.request_timeout
    }

    /// Returns the consistency stored in this profile.
    pub fn get_consistency(&self) -> Consistency {
        let Self(inner) = self;
        inner.consistency
    }

    /// Returns the serial consistency stored in this profile.
    pub fn get_serial_consistency(&self) -> Option<SerialConsistency> {
        let Self(inner) = self;
        inner.serial_consistency
    }

    /// Returns a reference to the load balancing policy stored in this profile.
    pub fn get_load_balancing_policy(&self) -> &Arc<dyn LoadBalancingPolicy> {
        let Self(inner) = self;
        &inner.load_balancing_policy
    }

    /// Returns a reference to the retry policy stored in this profile.
    pub fn get_retry_policy(&self) -> &Arc<dyn RetryPolicy> {
        let Self(inner) = self;
        &inner.retry_policy
    }

    /// Returns a reference to the speculative execution policy stored in this
    /// profile, or `None` if the profile has none.
    pub fn get_speculative_execution_policy(&self) -> Option<&Arc<dyn SpeculativeExecutionPolicy>> {
        let Self(inner) = self;
        inner.speculative_execution_policy.as_ref()
    }
}
/// A handle that points to an execution profile and can be redirected to
/// another one (see `map_to_another_profile`).
///
/// The handle is an `Arc` around a pair: an `ArcSwap` holding the current
/// `ExecutionProfileInner`, and an optional label (set by
/// `ExecutionProfile::into_handle_with_label`). Cloning the handle clones the
/// outer `Arc`, so all clones share the same `ArcSwap`.
#[derive(Debug, Clone)]
pub struct ExecutionProfileHandle(Arc<(ArcSwap<ExecutionProfileInner>, Option<String>)>);
impl ExecutionProfileHandle {
pub(crate) fn access(&self) -> Arc<ExecutionProfileInner> {
self.0.0.load_full()
}
pub fn pointee_to_builder(&self) -> ExecutionProfileBuilder {
self.0.0.load().to_builder()
}
pub fn to_profile(&self) -> ExecutionProfile {
ExecutionProfile(self.access())
}
pub fn map_to_another_profile(&mut self, profile: ExecutionProfile) {
self.0.0.store(profile.0)
}
}