// rivven_operator/cluster_client.rs
use std::time::Duration;

use rivven_client::Client;
use tokio::time::timeout;
use tracing::{debug, info, warn};

use crate::error::{OperatorError, Result};
/// Default upper bound for establishing a single broker connection (10 seconds).
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Default upper bound for a single broker request, e.g. listing or creating topics (30 seconds).
const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_millis(30_000);
/// Result of making sure a topic exists in the cluster.
#[derive(Debug, Clone, PartialEq, Eq)]
// Silences dead-code warnings for fields that are not (yet) read elsewhere in the crate.
#[allow(dead_code)]
pub struct TopicInfo {
    /// Name of the topic.
    pub name: String,
    /// Partition count. For a freshly created topic this is the value returned by the broker;
    /// when the topic already existed it is only the partition count that was requested.
    pub partitions: u32,
    /// `true` if the topic was already present, `false` if it was just created.
    pub existed: bool,
}
/// Tuning knobs for broker connections and requests made by `ClusterClient`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClusterClientConfig {
    /// Upper bound for a single broker connection attempt.
    pub connection_timeout: Duration,
    /// Upper bound for each individual broker request (list / create / delete topic).
    pub operation_timeout: Duration,
    /// Number of additional connection rounds attempted after the first round fails.
    pub max_retries: u32,
    /// Fixed delay slept before each retry round.
    pub retry_backoff: Duration,
}
44impl Default for ClusterClientConfig {
45 fn default() -> Self {
46 Self {
47 connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
48 operation_timeout: DEFAULT_OPERATION_TIMEOUT,
49 max_retries: 3,
50 retry_backoff: Duration::from_millis(500),
51 }
52 }
53}
54
55#[allow(dead_code)] pub struct ClusterClient {
58 config: ClusterClientConfig,
59}
60
61#[allow(dead_code)] impl ClusterClient {
63 pub fn new() -> Self {
65 Self {
66 config: ClusterClientConfig::default(),
67 }
68 }
69
70 pub fn with_config(config: ClusterClientConfig) -> Self {
72 Self { config }
73 }
74
75 async fn connect(&self, endpoints: &[String]) -> Result<Client> {
77 if endpoints.is_empty() {
78 return Err(OperatorError::ClusterNotFound(
79 "No broker endpoints available".to_string(),
80 ));
81 }
82
83 let mut last_error = None;
84
85 for attempt in 0..=self.config.max_retries {
86 if attempt > 0 {
87 debug!(
88 attempt = attempt,
89 "Retrying broker connection after backoff"
90 );
91 tokio::time::sleep(self.config.retry_backoff).await;
92 }
93
94 for endpoint in endpoints {
95 debug!(endpoint = %endpoint, attempt = attempt, "Attempting to connect to broker");
96
97 match timeout(self.config.connection_timeout, Client::connect(endpoint)).await {
98 Ok(Ok(client)) => {
99 info!(endpoint = %endpoint, "Successfully connected to broker");
100 return Ok(client);
101 }
102 Ok(Err(e)) => {
103 warn!(endpoint = %endpoint, error = %e, "Failed to connect to broker");
104 last_error = Some(e.to_string());
105 }
106 Err(_) => {
107 warn!(endpoint = %endpoint, "Connection to broker timed out");
108 last_error = Some("Connection timed out".to_string());
109 }
110 }
111 }
112 }
113
114 Err(OperatorError::ConnectionFailed(
115 last_error.unwrap_or_else(|| "Unknown error".to_string()),
116 ))
117 }
118
119 pub async fn ensure_topic(
123 &self,
124 endpoints: &[String],
125 topic_name: &str,
126 partitions: u32,
127 ) -> Result<TopicInfo> {
128 let mut client = self.connect(endpoints).await?;
129
130 let existing_topics = timeout(self.config.operation_timeout, client.list_topics())
132 .await
133 .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
134 .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
135
136 if existing_topics.contains(&topic_name.to_string()) {
137 info!(topic = %topic_name, "Topic already exists in cluster");
138 return Ok(TopicInfo {
141 name: topic_name.to_string(),
142 partitions,
143 existed: true,
144 });
145 }
146
147 info!(topic = %topic_name, partitions = partitions, "Creating topic in cluster");
149
150 let actual_partitions = timeout(
151 self.config.operation_timeout,
152 client.create_topic(topic_name, Some(partitions)),
153 )
154 .await
155 .map_err(|_| OperatorError::Timeout("create_topic timed out".to_string()))?
156 .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
157
158 info!(topic = %topic_name, partitions = actual_partitions, "Topic created successfully");
159
160 Ok(TopicInfo {
161 name: topic_name.to_string(),
162 partitions: actual_partitions,
163 existed: false,
164 })
165 }
166
167 pub async fn delete_topic(&self, endpoints: &[String], topic_name: &str) -> Result<()> {
169 let mut client = self.connect(endpoints).await?;
170
171 let existing_topics = timeout(self.config.operation_timeout, client.list_topics())
173 .await
174 .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
175 .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
176
177 if !existing_topics.contains(&topic_name.to_string()) {
178 info!(topic = %topic_name, "Topic does not exist in cluster, nothing to delete");
179 return Ok(());
180 }
181
182 info!(topic = %topic_name, "Deleting topic from cluster");
183
184 timeout(
185 self.config.operation_timeout,
186 client.delete_topic(topic_name),
187 )
188 .await
189 .map_err(|_| OperatorError::Timeout("delete_topic timed out".to_string()))?
190 .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
191
192 info!(topic = %topic_name, "Topic deleted successfully");
193 Ok(())
194 }
195
196 pub async fn list_topics(&self, endpoints: &[String]) -> Result<Vec<String>> {
198 let mut client = self.connect(endpoints).await?;
199
200 let topics = timeout(self.config.operation_timeout, client.list_topics())
201 .await
202 .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
203 .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
204
205 Ok(topics)
206 }
207
208 pub async fn topic_exists(&self, endpoints: &[String], topic_name: &str) -> Result<bool> {
210 let topics = self.list_topics(endpoints).await?;
211 Ok(topics.contains(&topic_name.to_string()))
212 }
213}
214
215impl Default for ClusterClient {
216 fn default() -> Self {
217 Self::new()
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_config() {
        let config = ClusterClientConfig::default();
        assert_eq!(config.connection_timeout, Duration::from_secs(10));
        assert_eq!(config.operation_timeout, Duration::from_secs(30));
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.retry_backoff, Duration::from_millis(500));
    }

    #[test]
    fn test_cluster_client_new() {
        let client = ClusterClient::new();
        assert_eq!(client.config.max_retries, 3);
    }

    #[test]
    fn test_default_client_uses_default_config() {
        let client = ClusterClient::default();
        let expected = ClusterClientConfig::default();
        assert_eq!(client.config.connection_timeout, expected.connection_timeout);
        assert_eq!(client.config.operation_timeout, expected.operation_timeout);
        assert_eq!(client.config.max_retries, expected.max_retries);
        assert_eq!(client.config.retry_backoff, expected.retry_backoff);
    }

    #[test]
    fn test_custom_config() {
        let config = ClusterClientConfig {
            connection_timeout: Duration::from_secs(5),
            operation_timeout: Duration::from_secs(15),
            max_retries: 5,
            retry_backoff: Duration::from_millis(250),
        };
        let client = ClusterClient::with_config(config.clone());
        // Every field must be carried over verbatim, not just a subset.
        assert_eq!(client.config.connection_timeout, config.connection_timeout);
        assert_eq!(client.config.operation_timeout, config.operation_timeout);
        assert_eq!(client.config.max_retries, config.max_retries);
        assert_eq!(client.config.retry_backoff, config.retry_backoff);
    }
}