fluss/client/
connection.rs1use crate::client::WriterClient;
19use crate::client::admin::FlussAdmin;
20use crate::client::metadata::Metadata;
21use crate::client::table::FlussTable;
22use crate::config::Config;
23use crate::error::{Error, FlussError, Result};
24use crate::metadata::TablePath;
25use crate::rpc::RpcClient;
26use parking_lot::RwLock;
27use std::sync::Arc;
28use std::time::Duration;
29
30pub struct FlussConnection {
35 metadata: Arc<Metadata>,
36 network_connects: Arc<RpcClient>,
37 args: Config,
38 writer_client: RwLock<Option<Arc<WriterClient>>>,
39 admin_client: RwLock<Option<Arc<FlussAdmin>>>,
40}
41
42impl FlussConnection {
43 pub async fn new(arg: Config) -> Result<Self> {
44 arg.validate_security()
45 .map_err(|msg| Error::IllegalArgument { message: msg })?;
46 arg.validate_scanner_fetch()
47 .map_err(|msg| Error::IllegalArgument { message: msg })?;
48
49 let timeout = Duration::from_millis(arg.connect_timeout_ms);
50 let connections = if arg.is_sasl_enabled() {
51 Arc::new(
52 RpcClient::new()
53 .with_sasl(
54 arg.security_sasl_username.clone(),
55 arg.security_sasl_password.clone(),
56 )
57 .with_timeout(timeout),
58 )
59 } else {
60 Arc::new(RpcClient::new().with_timeout(timeout))
61 };
62 let metadata = Metadata::new(arg.bootstrap_servers.as_str(), connections.clone()).await?;
63
64 Ok(FlussConnection {
65 metadata: Arc::new(metadata),
66 network_connects: connections.clone(),
67 args: arg.clone(),
68 writer_client: Default::default(),
69 admin_client: RwLock::new(None),
70 })
71 }
72
73 pub fn get_metadata(&self) -> Arc<Metadata> {
74 self.metadata.clone()
75 }
76
77 pub fn get_connections(&self) -> Arc<RpcClient> {
78 self.network_connects.clone()
79 }
80
81 pub fn config(&self) -> &Config {
82 &self.args
83 }
84
85 pub fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
86 if let Some(admin) = self.admin_client.read().as_ref() {
88 return Ok(admin.clone());
89 }
90
91 let mut admin_guard = self.admin_client.write();
93
94 if let Some(admin) = admin_guard.as_ref() {
96 return Ok(admin.clone());
97 }
98
99 let admin = Arc::new(FlussAdmin::new(
101 self.network_connects.clone(),
102 self.metadata.clone(),
103 ));
104 *admin_guard = Some(admin.clone());
105 Ok(admin)
106 }
107
108 pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {
109 if let Some(client) = self.writer_client.read().as_ref() {
111 return Ok(client.clone());
112 }
113
114 let mut writer_guard = self.writer_client.write();
116
117 if let Some(client) = writer_guard.as_ref() {
120 return Ok(client.clone());
121 }
122
123 let new_client = Arc::new(WriterClient::new(self.args.clone(), self.metadata.clone())?);
125
126 *writer_guard = Some(new_client.clone());
128 Ok(new_client)
129 }
130
131 pub async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>> {
132 self.metadata.update_table_metadata(table_path).await?;
133 let table_info = self
134 .metadata
135 .get_cluster()
136 .get_table(table_path)
137 .map_err(|e| {
138 if e.api_error() == Some(FlussError::InvalidTableException) {
139 Error::table_not_exist(format!("Table not found: {table_path}"))
140 } else {
141 e
142 }
143 })?
144 .clone();
145 Ok(FlussTable::new(self, self.metadata.clone(), table_info))
146 }
147}