rivven_operator/
cluster_client.rs1use crate::error::{OperatorError, Result};
use rivven_client::Client;
use std::time::Duration;
use tokio::time::{sleep, timeout};
use tracing::{debug, info, warn};
12
/// Default for `ClusterClientConfig::connection_timeout`: 10 seconds.
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Default for `ClusterClientConfig::operation_timeout`: 30 seconds.
const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_millis(30_000);
18
/// Result of ensuring a topic exists in the cluster.
#[derive(Clone, Debug)]
pub struct TopicInfo {
    /// Topic name, as supplied by the caller.
    pub name: String,
    /// Partition count. For a freshly created topic this is the value
    /// reported back by `create_topic`. For a topic that already existed it
    /// is the count the caller *requested*; the live count is not queried.
    pub partitions: u32,
    /// `true` when the topic was already present, `false` when it was created.
    pub existed: bool,
}
29
/// Timeout and retry knobs for `ClusterClient`.
#[derive(Clone, Debug)]
pub struct ClusterClientConfig {
    /// Upper bound on one connection attempt to a single broker endpoint.
    pub connection_timeout: Duration,
    /// Upper bound on one broker operation (listing, creating or deleting topics).
    pub operation_timeout: Duration,
    /// Retry limit. NOTE(review): confirm which code paths consume this value.
    pub max_retries: u32,
    /// Pause between retries. NOTE(review): confirm which code paths consume this value.
    pub retry_backoff: Duration,
}
42
43impl Default for ClusterClientConfig {
44 fn default() -> Self {
45 Self {
46 connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
47 operation_timeout: DEFAULT_OPERATION_TIMEOUT,
48 max_retries: 3,
49 retry_backoff: Duration::from_millis(500),
50 }
51 }
52}
53
54pub struct ClusterClient {
56 config: ClusterClientConfig,
57}
58
59impl ClusterClient {
60 pub fn new() -> Self {
62 Self {
63 config: ClusterClientConfig::default(),
64 }
65 }
66
67 pub fn with_config(config: ClusterClientConfig) -> Self {
69 Self { config }
70 }
71
72 async fn connect(&self, endpoints: &[String]) -> Result<Client> {
74 if endpoints.is_empty() {
75 return Err(OperatorError::ClusterNotFound(
76 "No broker endpoints available".to_string(),
77 ));
78 }
79
80 let mut last_error = None;
81
82 for endpoint in endpoints {
83 debug!(endpoint = %endpoint, "Attempting to connect to broker");
84
85 match timeout(self.config.connection_timeout, Client::connect(endpoint)).await {
86 Ok(Ok(client)) => {
87 info!(endpoint = %endpoint, "Successfully connected to broker");
88 return Ok(client);
89 }
90 Ok(Err(e)) => {
91 warn!(endpoint = %endpoint, error = %e, "Failed to connect to broker");
92 last_error = Some(e.to_string());
93 }
94 Err(_) => {
95 warn!(endpoint = %endpoint, "Connection to broker timed out");
96 last_error = Some("Connection timed out".to_string());
97 }
98 }
99 }
100
101 Err(OperatorError::ConnectionFailed(
102 last_error.unwrap_or_else(|| "Unknown error".to_string()),
103 ))
104 }
105
106 pub async fn ensure_topic(
110 &self,
111 endpoints: &[String],
112 topic_name: &str,
113 partitions: u32,
114 ) -> Result<TopicInfo> {
115 let mut client = self.connect(endpoints).await?;
116
117 let existing_topics = timeout(self.config.operation_timeout, client.list_topics())
119 .await
120 .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
121 .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
122
123 if existing_topics.contains(&topic_name.to_string()) {
124 info!(topic = %topic_name, "Topic already exists in cluster");
125 return Ok(TopicInfo {
128 name: topic_name.to_string(),
129 partitions,
130 existed: true,
131 });
132 }
133
134 info!(topic = %topic_name, partitions = partitions, "Creating topic in cluster");
136
137 let actual_partitions = timeout(
138 self.config.operation_timeout,
139 client.create_topic(topic_name, Some(partitions)),
140 )
141 .await
142 .map_err(|_| OperatorError::Timeout("create_topic timed out".to_string()))?
143 .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
144
145 info!(topic = %topic_name, partitions = actual_partitions, "Topic created successfully");
146
147 Ok(TopicInfo {
148 name: topic_name.to_string(),
149 partitions: actual_partitions,
150 existed: false,
151 })
152 }
153
154 pub async fn delete_topic(&self, endpoints: &[String], topic_name: &str) -> Result<()> {
156 let mut client = self.connect(endpoints).await?;
157
158 let existing_topics = timeout(self.config.operation_timeout, client.list_topics())
160 .await
161 .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
162 .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
163
164 if !existing_topics.contains(&topic_name.to_string()) {
165 info!(topic = %topic_name, "Topic does not exist in cluster, nothing to delete");
166 return Ok(());
167 }
168
169 info!(topic = %topic_name, "Deleting topic from cluster");
170
171 timeout(
172 self.config.operation_timeout,
173 client.delete_topic(topic_name),
174 )
175 .await
176 .map_err(|_| OperatorError::Timeout("delete_topic timed out".to_string()))?
177 .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
178
179 info!(topic = %topic_name, "Topic deleted successfully");
180 Ok(())
181 }
182
183 pub async fn list_topics(&self, endpoints: &[String]) -> Result<Vec<String>> {
185 let mut client = self.connect(endpoints).await?;
186
187 let topics = timeout(self.config.operation_timeout, client.list_topics())
188 .await
189 .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
190 .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
191
192 Ok(topics)
193 }
194
195 pub async fn topic_exists(&self, endpoints: &[String], topic_name: &str) -> Result<bool> {
197 let topics = self.list_topics(endpoints).await?;
198 Ok(topics.contains(&topic_name.to_string()))
199 }
200}
201
202impl Default for ClusterClient {
203 fn default() -> Self {
204 Self::new()
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default value, including the retry backoff, is pinned.
    #[test]
    fn test_default_config() {
        let config = ClusterClientConfig::default();
        assert_eq!(config.connection_timeout, Duration::from_secs(10));
        assert_eq!(config.operation_timeout, Duration::from_secs(30));
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.retry_backoff, Duration::from_millis(500));
    }

    #[test]
    fn test_cluster_client_new() {
        let client = ClusterClient::new();
        assert_eq!(client.config.max_retries, 3);
    }

    /// `Default` must produce the same configuration as `new`.
    #[test]
    fn test_cluster_client_default_matches_new() {
        let from_default = ClusterClient::default();
        let from_new = ClusterClient::new();
        assert_eq!(
            from_default.config.connection_timeout,
            from_new.config.connection_timeout
        );
        assert_eq!(
            from_default.config.operation_timeout,
            from_new.config.operation_timeout
        );
        assert_eq!(from_default.config.max_retries, from_new.config.max_retries);
        assert_eq!(
            from_default.config.retry_backoff,
            from_new.config.retry_backoff
        );
    }

    /// A custom configuration is stored verbatim.
    #[test]
    fn test_custom_config() {
        let config = ClusterClientConfig {
            connection_timeout: Duration::from_secs(5),
            operation_timeout: Duration::from_secs(15),
            max_retries: 5,
            retry_backoff: Duration::from_millis(250),
        };
        let client = ClusterClient::with_config(config.clone());
        assert_eq!(client.config.connection_timeout, Duration::from_secs(5));
        assert_eq!(client.config.operation_timeout, Duration::from_secs(15));
        assert_eq!(client.config.max_retries, 5);
        assert_eq!(client.config.retry_backoff, Duration::from_millis(250));
    }
}