Skip to main content

fluss/client/
connection.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::client::WriterClient;
19use crate::client::admin::FlussAdmin;
20use crate::client::metadata::Metadata;
21use crate::client::table::FlussTable;
22use crate::config::Config;
23use crate::error::{Error, FlussError, Result};
24use crate::metadata::TablePath;
25use crate::rpc::RpcClient;
26use parking_lot::RwLock;
27use std::sync::Arc;
28use std::time::Duration;
29
30// TODO: implement `close(&self, timeout: Duration)` to gracefully shut down the
31// writer client (drain pending batches, then force-close on timeout).
32// Java's FlussConnection.close() calls writerClient.close(Long.MAX_VALUE).
33// WriterClient::close() already exists but is never called from the public API.
34pub struct FlussConnection {
35    metadata: Arc<Metadata>,
36    network_connects: Arc<RpcClient>,
37    args: Config,
38    writer_client: RwLock<Option<Arc<WriterClient>>>,
39    admin_client: RwLock<Option<Arc<FlussAdmin>>>,
40}
41
42impl FlussConnection {
43    pub async fn new(arg: Config) -> Result<Self> {
44        arg.validate_security()
45            .map_err(|msg| Error::IllegalArgument { message: msg })?;
46        arg.validate_scanner_fetch()
47            .map_err(|msg| Error::IllegalArgument { message: msg })?;
48
49        let timeout = Duration::from_millis(arg.connect_timeout_ms);
50        let connections = if arg.is_sasl_enabled() {
51            Arc::new(
52                RpcClient::new()
53                    .with_sasl(
54                        arg.security_sasl_username.clone(),
55                        arg.security_sasl_password.clone(),
56                    )
57                    .with_timeout(timeout),
58            )
59        } else {
60            Arc::new(RpcClient::new().with_timeout(timeout))
61        };
62        let metadata = Metadata::new(arg.bootstrap_servers.as_str(), connections.clone()).await?;
63
64        Ok(FlussConnection {
65            metadata: Arc::new(metadata),
66            network_connects: connections.clone(),
67            args: arg.clone(),
68            writer_client: Default::default(),
69            admin_client: RwLock::new(None),
70        })
71    }
72
73    pub fn get_metadata(&self) -> Arc<Metadata> {
74        self.metadata.clone()
75    }
76
77    pub fn get_connections(&self) -> Arc<RpcClient> {
78        self.network_connects.clone()
79    }
80
81    pub fn config(&self) -> &Config {
82        &self.args
83    }
84
85    pub fn get_admin(&self) -> Result<Arc<FlussAdmin>> {
86        // 1. Fast path: return cached instance if already initialized.
87        if let Some(admin) = self.admin_client.read().as_ref() {
88            return Ok(admin.clone());
89        }
90
91        // 2. Slow path: acquire write lock.
92        let mut admin_guard = self.admin_client.write();
93
94        // 3. Double-check: another thread may have initialized while we waited.
95        if let Some(admin) = admin_guard.as_ref() {
96            return Ok(admin.clone());
97        }
98
99        // 4. Initialize and cache.
100        let admin = Arc::new(FlussAdmin::new(
101            self.network_connects.clone(),
102            self.metadata.clone(),
103        ));
104        *admin_guard = Some(admin.clone());
105        Ok(admin)
106    }
107
108    pub fn get_or_create_writer_client(&self) -> Result<Arc<WriterClient>> {
109        // 1. Fast path: Attempt to acquire a read lock to check if the client already exists.
110        if let Some(client) = self.writer_client.read().as_ref() {
111            return Ok(client.clone());
112        }
113
114        // 2. Slow path: Acquire the write lock.
115        let mut writer_guard = self.writer_client.write();
116
117        // 3. Double-check: Another thread might have initialized the client
118        // while this thread was waiting for the write lock.
119        if let Some(client) = writer_guard.as_ref() {
120            return Ok(client.clone());
121        }
122
123        // 4. Initialize the client since we are certain it doesn't exist yet.
124        let new_client = Arc::new(WriterClient::new(self.args.clone(), self.metadata.clone())?);
125
126        // 5. Store and return the newly created client.
127        *writer_guard = Some(new_client.clone());
128        Ok(new_client)
129    }
130
131    pub async fn get_table(&self, table_path: &TablePath) -> Result<FlussTable<'_>> {
132        self.metadata.update_table_metadata(table_path).await?;
133        let table_info = self
134            .metadata
135            .get_cluster()
136            .get_table(table_path)
137            .map_err(|e| {
138                if e.api_error() == Some(FlussError::InvalidTableException) {
139                    Error::table_not_exist(format!("Table not found: {table_path}"))
140                } else {
141                    e
142                }
143            })?
144            .clone();
145        Ok(FlussTable::new(self, self.metadata.clone(), table_info))
146    }
147}