use crate::authenticator::Authenticator;
use crate::bucket::Bucket;
use crate::clients::bucket_mgmt_client::BucketMgmtClient;
use crate::clients::cluster_client::ClusterClient;
use crate::clients::diagnostics_client::DiagnosticsClient;
use crate::clients::query_client::QueryClient;
use crate::clients::tracing_client::TracingClient;
use crate::clients::user_mgmt_client::UserMgmtClient;
use crate::error;
use crate::management::buckets::bucket_manager::BucketManager;
use crate::management::users::user_manager::UserManager;
use crate::options::cluster_options::ClusterOptions;
use crate::options::diagnostic_options::{DiagnosticsOptions, PingOptions, WaitUntilReadyOptions};
use crate::options::query_options::QueryOptions;
use crate::results::diagnostics::{DiagnosticsResult, PingReport};
use crate::results::query_results::QueryResult;
use crate::tracing::{Keyspace, SERVICE_VALUE_QUERY};
use couchbase_core::create_span;
use std::sync::Arc;
use tracing::info;
use tracing::Instrument;
#[derive(Clone)]
pub struct Cluster {
client: Arc<ClusterClient>,
query_client: Arc<QueryClient>,
bucket_mgmt_client: Arc<BucketMgmtClient>,
user_mgmt_client: Arc<UserMgmtClient>,
diagnostics_client: Arc<DiagnosticsClient>,
tracing_client: Arc<TracingClient>,
}
impl Cluster {
pub async fn connect(
conn_str: impl AsRef<str>,
opts: ClusterOptions,
) -> error::Result<Cluster> {
info!("SDK Version: {}", env!("CARGO_PKG_VERSION"));
info!("Cluster Options {opts}");
let client = Arc::new(ClusterClient::connect(conn_str, opts).await?);
let query_client = Arc::new(client.query_client());
let bucket_mgmt_client = Arc::new(client.buckets_client());
let user_mgmt_client = Arc::new(client.users_client());
let diagnostics_client = Arc::new(client.diagnostics_client());
let tracing_client = Arc::new(client.tracing_client());
Ok(Cluster {
client,
query_client,
bucket_mgmt_client,
user_mgmt_client,
diagnostics_client,
tracing_client,
})
}
pub fn bucket(&self, name: impl Into<String>) -> Bucket {
let bucket_client = self.client.bucket_client(name.into());
Bucket::new(bucket_client)
}
pub async fn query(
&self,
statement: impl Into<String>,
opts: impl Into<Option<QueryOptions>>,
) -> error::Result<QueryResult> {
let statement: String = statement.into();
let span = create_span!("query").with_statement(&statement);
let ctx = self
.tracing_client
.begin_operation(Some(SERVICE_VALUE_QUERY), Keyspace::Cluster, span)
.await;
let result = self
.query_client
.query(statement, opts.into())
.instrument(ctx.span().clone())
.await;
ctx.end_operation(result.as_ref().err());
result
}
pub fn buckets(&self) -> BucketManager {
BucketManager::new(self.bucket_mgmt_client.clone())
}
pub fn users(&self) -> UserManager {
UserManager::new(self.user_mgmt_client.clone())
}
pub async fn diagnostics(
&self,
opts: impl Into<Option<DiagnosticsOptions>>,
) -> error::Result<DiagnosticsResult> {
let ctx = self
.tracing_client
.begin_operation(None, Keyspace::Cluster, create_span!("diagnostics"))
.await;
let result = self
.diagnostics_client
.diagnostics(opts.into().unwrap_or_default())
.instrument(ctx.span().clone())
.await;
ctx.end_operation(result.as_ref().err());
result
}
pub async fn ping(&self, opts: impl Into<Option<PingOptions>>) -> error::Result<PingReport> {
let ctx = self
.tracing_client
.begin_operation(None, Keyspace::Cluster, create_span!("ping"))
.await;
let result = self
.diagnostics_client
.ping(opts.into().unwrap_or_default())
.instrument(ctx.span().clone())
.await;
ctx.end_operation(result.as_ref().err());
result
}
pub async fn wait_until_ready(
&self,
opts: impl Into<Option<WaitUntilReadyOptions>>,
) -> error::Result<()> {
let ctx = self
.tracing_client
.begin_operation(None, Keyspace::Cluster, create_span!("wait_until_ready"))
.await;
let result = self
.diagnostics_client
.wait_until_ready(opts.into().unwrap_or_default())
.instrument(ctx.span().clone())
.await;
ctx.end_operation(result.as_ref().err());
result
}
pub async fn set_authenticator(&self, authenticator: Authenticator) -> error::Result<()> {
self.client.set_authenticator(authenticator).await
}
}
impl Drop for Cluster {
fn drop(&mut self) {
if Arc::strong_count(&self.client) == 1 {
info!("Dropping Cluster");
}
}
}