use crate::client::WriterClient;
use crate::client::admin::FlussAdmin;
use crate::client::metadata::Metadata;
use crate::client::table::FlussTable;
use crate::config::Config;
use crate::error::{Error, FlussError, Result};
use crate::metadata::TablePath;
use crate::rpc::RpcClient;
use parking_lot::RwLock;
use std::sync::Arc;
use std::time::Duration;
/// A connection to a Fluss cluster.
///
/// Bundles the cluster metadata, the RPC client and the configuration the
/// connection was created with, together with lazily created writer and
/// admin clients that are cached after their first use.
pub struct FlussConnection {
/// Cluster metadata, initialised in `new` from the configured bootstrap servers.
metadata: Arc<Metadata>,
/// RPC client built in `new` with the configured connect timeout and,
/// when SASL is enabled, the configured SASL credentials.
network_connects: Arc<RpcClient>,
/// The configuration passed to `new`.
args: Config,
/// Cached writer client; `None` until `get_or_create_writer_client` creates it.
writer_client: RwLock<Option<Arc<WriterClient>>>,
/// Cached admin client; `None` until `get_admin` creates it.
admin_client: RwLock<Option<Arc<FlussAdmin>>>,
}
impl FlussConnection {
pub async fn new(arg: Config) -> Result<Self> {
arg.validate_security()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
arg.validate_scanner_fetch()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
let timeout = Duration::from_millis(arg.connect_timeout_ms);
let connections = if arg.is_sasl_enabled() {
Arc::new(
RpcClient::new()
.with_sasl(
arg.security_sasl_username.clone(),
arg.security_sasl_password.clone(),
)
.with_timeout(timeout),
)
} else {
Arc::new(RpcClient::new().with_timeout(timeout))
};
let metadata = Metadata::new(arg.bootstrap_servers.as_str(), connections.clone()).await?;
Ok(FlussConnection {
metadata: Arc::new(metadata),
network_connects: connections.clone(),
args: arg.clone(),
writer_client: Default::default(),
admin_client: RwLock::new(None),
})
}
pub fn get_metadata(&self) -> Arc<Metadata> {
self.metadata.clone()
}
pub fn get_connections(&self) -> Arc<RpcClient> {
self.network_connects.clone()
}
pub fn config(&self) -> &Config {
&self.args
}
pub fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
if let Some(admin) = self.admin_client.read().as_ref() {
return Ok(admin.clone());
}
let mut admin_guard = self.admin_client.write();
if let Some(admin) = admin_guard.as_ref() {
return Ok(admin.clone());
}
let admin = Arc::new(FlussAdmin::new(
self.network_connects.clone(),
self.metadata.clone(),
));
*admin_guard = Some(admin.clone());
Ok(admin)
}
pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {
if let Some(client) = self.writer_client.read().as_ref() {
return Ok(client.clone());
}
let mut writer_guard = self.writer_client.write();
if let Some(client) = writer_guard.as_ref() {
return Ok(client.clone());
}
let new_client = Arc::new(WriterClient::new(self.args.clone(), self.metadata.clone())?);
*writer_guard = Some(new_client.clone());
Ok(new_client)
}
pub async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>> {
self.metadata.update_table_metadata(table_path).await?;
let table_info = self
.metadata
.get_cluster()
.get_table(table_path)
.map_err(|e| {
if e.api_error() == Some(FlussError::InvalidTableException) {
Error::table_not_exist(format!("Table not found: {table_path}"))
} else {
e
}
})?
.clone();
Ok(FlussTable::new(self, self.metadata.clone(), table_info))
}
}