Skip to main content

rivven_operator/
cluster_client.rs

1//! Rivven Cluster Client
2//!
3//! This module provides a client for communicating with Rivven clusters
4//! from the Kubernetes operator. It wraps rivven-client with operator-specific
5//! error handling and connection management.
6
7use crate::error::{OperatorError, Result};
8use rivven_client::Client;
9use std::time::Duration;
10use tokio::time::timeout;
11use tracing::{debug, info, warn};
12
/// Upper bound applied to a single broker connection attempt when no
/// custom configuration is supplied.
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Upper bound applied to each individual topic operation (list, create,
/// delete) when no custom configuration is supplied.
const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_secs(30);
18
/// Outcome of ensuring that a topic is present in the Rivven cluster.
#[derive(Clone, Debug)]
pub struct TopicInfo {
    /// Name of the topic.
    pub name: String,
    /// Partition count of the topic.
    ///
    /// For a freshly created topic this is the value returned by the
    /// cluster. For a topic that was already present it is the count the
    /// caller asked for, because the existing topic is not inspected.
    pub partitions: u32,
    /// `true` when the topic was already present and therefore not created
    /// by this call.
    pub existed: bool,
}
29
/// Tunable timeouts and retry settings for [`ClusterClient`].
#[derive(Clone, Debug)]
pub struct ClusterClientConfig {
    /// Maximum time to wait for a single broker connection attempt.
    pub connection_timeout: Duration,
    /// Maximum time to wait for a single topic operation to complete.
    pub operation_timeout: Duration,
    /// Maximum number of retry attempts.
    pub max_retries: u32,
    /// Delay to wait between retry attempts.
    pub retry_backoff: Duration,
}
42
43impl Default for ClusterClientConfig {
44    fn default() -> Self {
45        Self {
46            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
47            operation_timeout: DEFAULT_OPERATION_TIMEOUT,
48            max_retries: 3,
49            retry_backoff: Duration::from_millis(500),
50        }
51    }
52}
53
54/// Client for interacting with a Rivven cluster
55pub struct ClusterClient {
56    config: ClusterClientConfig,
57}
58
59impl ClusterClient {
60    /// Create a new cluster client with default configuration
61    pub fn new() -> Self {
62        Self {
63            config: ClusterClientConfig::default(),
64        }
65    }
66
67    /// Create a new cluster client with custom configuration
68    pub fn with_config(config: ClusterClientConfig) -> Self {
69        Self { config }
70    }
71
72    /// Connect to one of the broker endpoints
73    async fn connect(&self, endpoints: &[String]) -> Result<Client> {
74        if endpoints.is_empty() {
75            return Err(OperatorError::ClusterNotFound(
76                "No broker endpoints available".to_string(),
77            ));
78        }
79
80        let mut last_error = None;
81
82        for endpoint in endpoints {
83            debug!(endpoint = %endpoint, "Attempting to connect to broker");
84
85            match timeout(self.config.connection_timeout, Client::connect(endpoint)).await {
86                Ok(Ok(client)) => {
87                    info!(endpoint = %endpoint, "Successfully connected to broker");
88                    return Ok(client);
89                }
90                Ok(Err(e)) => {
91                    warn!(endpoint = %endpoint, error = %e, "Failed to connect to broker");
92                    last_error = Some(e.to_string());
93                }
94                Err(_) => {
95                    warn!(endpoint = %endpoint, "Connection to broker timed out");
96                    last_error = Some("Connection timed out".to_string());
97                }
98            }
99        }
100
101        Err(OperatorError::ConnectionFailed(
102            last_error.unwrap_or_else(|| "Unknown error".to_string()),
103        ))
104    }
105
106    /// Create or ensure a topic exists in the cluster
107    ///
108    /// Returns information about the topic including whether it already existed.
109    pub async fn ensure_topic(
110        &self,
111        endpoints: &[String],
112        topic_name: &str,
113        partitions: u32,
114    ) -> Result<TopicInfo> {
115        let mut client = self.connect(endpoints).await?;
116
117        // First, check if topic already exists
118        let existing_topics = timeout(self.config.operation_timeout, client.list_topics())
119            .await
120            .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
121            .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
122
123        if existing_topics.contains(&topic_name.to_string()) {
124            info!(topic = %topic_name, "Topic already exists in cluster");
125            // Topic exists - return info
126            // Note: In a real implementation, we'd get actual partition count from describe_topic
127            return Ok(TopicInfo {
128                name: topic_name.to_string(),
129                partitions,
130                existed: true,
131            });
132        }
133
134        // Topic doesn't exist, create it
135        info!(topic = %topic_name, partitions = partitions, "Creating topic in cluster");
136
137        let actual_partitions = timeout(
138            self.config.operation_timeout,
139            client.create_topic(topic_name, Some(partitions)),
140        )
141        .await
142        .map_err(|_| OperatorError::Timeout("create_topic timed out".to_string()))?
143        .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
144
145        info!(topic = %topic_name, partitions = actual_partitions, "Topic created successfully");
146
147        Ok(TopicInfo {
148            name: topic_name.to_string(),
149            partitions: actual_partitions,
150            existed: false,
151        })
152    }
153
154    /// Delete a topic from the cluster
155    pub async fn delete_topic(&self, endpoints: &[String], topic_name: &str) -> Result<()> {
156        let mut client = self.connect(endpoints).await?;
157
158        // Check if topic exists first
159        let existing_topics = timeout(self.config.operation_timeout, client.list_topics())
160            .await
161            .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
162            .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
163
164        if !existing_topics.contains(&topic_name.to_string()) {
165            info!(topic = %topic_name, "Topic does not exist in cluster, nothing to delete");
166            return Ok(());
167        }
168
169        info!(topic = %topic_name, "Deleting topic from cluster");
170
171        timeout(
172            self.config.operation_timeout,
173            client.delete_topic(topic_name),
174        )
175        .await
176        .map_err(|_| OperatorError::Timeout("delete_topic timed out".to_string()))?
177        .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
178
179        info!(topic = %topic_name, "Topic deleted successfully");
180        Ok(())
181    }
182
183    /// List all topics in the cluster
184    pub async fn list_topics(&self, endpoints: &[String]) -> Result<Vec<String>> {
185        let mut client = self.connect(endpoints).await?;
186
187        let topics = timeout(self.config.operation_timeout, client.list_topics())
188            .await
189            .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
190            .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
191
192        Ok(topics)
193    }
194
195    /// Check if a topic exists in the cluster
196    pub async fn topic_exists(&self, endpoints: &[String], topic_name: &str) -> Result<bool> {
197        let topics = self.list_topics(endpoints).await?;
198        Ok(topics.contains(&topic_name.to_string()))
199    }
200}
201
202impl Default for ClusterClient {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration exposes the expected timeouts and retry
    /// settings, including the retry backoff.
    #[test]
    fn test_default_config() {
        let config = ClusterClientConfig::default();
        assert_eq!(config.connection_timeout, Duration::from_secs(10));
        assert_eq!(config.operation_timeout, Duration::from_secs(30));
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.retry_backoff, Duration::from_millis(500));
    }

    /// `ClusterClient::new` picks up the default configuration.
    #[test]
    fn test_cluster_client_new() {
        let client = ClusterClient::new();
        assert_eq!(client.config.max_retries, 3);
    }

    /// The `Default` impl yields the same configuration as `new`.
    #[test]
    fn test_cluster_client_default() {
        let from_default = ClusterClient::default();
        let from_new = ClusterClient::new();
        assert_eq!(
            from_default.config.connection_timeout,
            from_new.config.connection_timeout
        );
        assert_eq!(
            from_default.config.operation_timeout,
            from_new.config.operation_timeout
        );
        assert_eq!(from_default.config.max_retries, from_new.config.max_retries);
        assert_eq!(
            from_default.config.retry_backoff,
            from_new.config.retry_backoff
        );
    }

    /// `with_config` stores every field of the supplied configuration.
    #[test]
    fn test_custom_config() {
        let config = ClusterClientConfig {
            connection_timeout: Duration::from_secs(5),
            operation_timeout: Duration::from_secs(15),
            max_retries: 5,
            retry_backoff: Duration::from_millis(250),
        };
        // The config is moved in; no clone is needed because it is not
        // used again afterwards.
        let client = ClusterClient::with_config(config);
        assert_eq!(client.config.connection_timeout, Duration::from_secs(5));
        assert_eq!(client.config.operation_timeout, Duration::from_secs(15));
        assert_eq!(client.config.max_retries, 5);
        assert_eq!(client.config.retry_backoff, Duration::from_millis(250));
    }
}