Skip to main content

rivven_operator/
cluster_client.rs

//! Rivven Cluster Client
//!
//! This module provides a client for communicating with Rivven clusters
//! from the Kubernetes operator. It wraps rivven-client with operator-specific
//! error handling and connection management.
6
7use crate::error::{OperatorError, Result};
8use rivven_client::Client;
9use std::time::Duration;
10use tokio::time::timeout;
11use tracing::{debug, info, warn};
12
/// Default per-attempt timeout when connecting to a broker endpoint (10 seconds).
const DEFAULT_CONNECTION_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Default timeout applied to each individual topic request (30 seconds).
const DEFAULT_OPERATION_TIMEOUT: Duration = Duration::from_millis(30_000);
18
/// Outcome of ensuring a topic exists in the Rivven cluster.
#[derive(Clone, Debug)]
#[allow(dead_code)] // Returned by ensure_topic; callers may not read all fields
pub struct TopicInfo {
    /// Name of the topic.
    pub name: String,
    /// Partition count.
    ///
    /// For a topic created by the call, this is the count returned by the
    /// cluster. For a topic that was already present, it is the count the
    /// caller requested; it is not read back from the cluster.
    pub partitions: u32,
    /// `true` when the topic was found in the cluster rather than created by this call.
    pub existed: bool,
}
30
/// Tunables for talking to a Rivven cluster: timeouts and connection retry policy.
#[derive(Clone, Debug)]
pub struct ClusterClientConfig {
    /// Upper bound for one connection attempt to a single broker endpoint.
    pub connection_timeout: Duration,
    /// Upper bound for each individual cluster request (list/create/delete topic).
    pub operation_timeout: Duration,
    /// How many additional connection rounds to run after the first round fails.
    pub max_retries: u32,
    /// Delay slept before each retry round.
    pub retry_backoff: Duration,
}
43
44impl Default for ClusterClientConfig {
45    fn default() -> Self {
46        Self {
47            connection_timeout: DEFAULT_CONNECTION_TIMEOUT,
48            operation_timeout: DEFAULT_OPERATION_TIMEOUT,
49            max_retries: 3,
50            retry_backoff: Duration::from_millis(500),
51        }
52    }
53}
54
55/// Client for interacting with a Rivven cluster
56#[allow(dead_code)] // Public API methods used by future controllers
57pub struct ClusterClient {
58    config: ClusterClientConfig,
59}
60
61#[allow(dead_code)] // Public API — some methods not yet called by controllers
62impl ClusterClient {
63    /// Create a new cluster client with default configuration
64    pub fn new() -> Self {
65        Self {
66            config: ClusterClientConfig::default(),
67        }
68    }
69
70    /// Create a new cluster client with custom configuration
71    pub fn with_config(config: ClusterClientConfig) -> Self {
72        Self { config }
73    }
74
75    /// Connect to one of the broker endpoints with retry
76    async fn connect(&self, endpoints: &[String]) -> Result<Client> {
77        if endpoints.is_empty() {
78            return Err(OperatorError::ClusterNotFound(
79                "No broker endpoints available".to_string(),
80            ));
81        }
82
83        let mut last_error = None;
84
85        for attempt in 0..=self.config.max_retries {
86            if attempt > 0 {
87                debug!(
88                    attempt = attempt,
89                    "Retrying broker connection after backoff"
90                );
91                tokio::time::sleep(self.config.retry_backoff).await;
92            }
93
94            for endpoint in endpoints {
95                debug!(endpoint = %endpoint, attempt = attempt, "Attempting to connect to broker");
96
97                match timeout(self.config.connection_timeout, Client::connect(endpoint)).await {
98                    Ok(Ok(client)) => {
99                        info!(endpoint = %endpoint, "Successfully connected to broker");
100                        return Ok(client);
101                    }
102                    Ok(Err(e)) => {
103                        warn!(endpoint = %endpoint, error = %e, "Failed to connect to broker");
104                        last_error = Some(e.to_string());
105                    }
106                    Err(_) => {
107                        warn!(endpoint = %endpoint, "Connection to broker timed out");
108                        last_error = Some("Connection timed out".to_string());
109                    }
110                }
111            }
112        }
113
114        Err(OperatorError::ConnectionFailed(
115            last_error.unwrap_or_else(|| "Unknown error".to_string()),
116        ))
117    }
118
119    /// Create or ensure a topic exists in the cluster
120    ///
121    /// Returns information about the topic including whether it already existed.
122    pub async fn ensure_topic(
123        &self,
124        endpoints: &[String],
125        topic_name: &str,
126        partitions: u32,
127    ) -> Result<TopicInfo> {
128        let mut client = self.connect(endpoints).await?;
129
130        // First, check if topic already exists
131        let existing_topics = timeout(self.config.operation_timeout, client.list_topics())
132            .await
133            .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
134            .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
135
136        if existing_topics.contains(&topic_name.to_string()) {
137            info!(topic = %topic_name, "Topic already exists in cluster");
138            // Topic exists - return info
139            // Note: In a real implementation, we'd get actual partition count from describe_topic
140            return Ok(TopicInfo {
141                name: topic_name.to_string(),
142                partitions,
143                existed: true,
144            });
145        }
146
147        // Topic doesn't exist, create it
148        info!(topic = %topic_name, partitions = partitions, "Creating topic in cluster");
149
150        let actual_partitions = timeout(
151            self.config.operation_timeout,
152            client.create_topic(topic_name, Some(partitions)),
153        )
154        .await
155        .map_err(|_| OperatorError::Timeout("create_topic timed out".to_string()))?
156        .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
157
158        info!(topic = %topic_name, partitions = actual_partitions, "Topic created successfully");
159
160        Ok(TopicInfo {
161            name: topic_name.to_string(),
162            partitions: actual_partitions,
163            existed: false,
164        })
165    }
166
167    /// Delete a topic from the cluster
168    pub async fn delete_topic(&self, endpoints: &[String], topic_name: &str) -> Result<()> {
169        let mut client = self.connect(endpoints).await?;
170
171        // Check if topic exists first
172        let existing_topics = timeout(self.config.operation_timeout, client.list_topics())
173            .await
174            .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
175            .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
176
177        if !existing_topics.contains(&topic_name.to_string()) {
178            info!(topic = %topic_name, "Topic does not exist in cluster, nothing to delete");
179            return Ok(());
180        }
181
182        info!(topic = %topic_name, "Deleting topic from cluster");
183
184        timeout(
185            self.config.operation_timeout,
186            client.delete_topic(topic_name),
187        )
188        .await
189        .map_err(|_| OperatorError::Timeout("delete_topic timed out".to_string()))?
190        .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
191
192        info!(topic = %topic_name, "Topic deleted successfully");
193        Ok(())
194    }
195
196    /// List all topics in the cluster
197    pub async fn list_topics(&self, endpoints: &[String]) -> Result<Vec<String>> {
198        let mut client = self.connect(endpoints).await?;
199
200        let topics = timeout(self.config.operation_timeout, client.list_topics())
201            .await
202            .map_err(|_| OperatorError::Timeout("list_topics timed out".to_string()))?
203            .map_err(|e| OperatorError::ClusterError(e.to_string()))?;
204
205        Ok(topics)
206    }
207
208    /// Check if a topic exists in the cluster
209    pub async fn topic_exists(&self, endpoints: &[String], topic_name: &str) -> Result<bool> {
210        let topics = self.list_topics(endpoints).await?;
211        Ok(topics.contains(&topic_name.to_string()))
212    }
213}
214
215impl Default for ClusterClient {
216    fn default() -> Self {
217        Self::new()
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    /// Every default field is pinned, including the retry backoff.
    #[test]
    fn test_default_config() {
        let config = ClusterClientConfig::default();
        assert_eq!(config.connection_timeout, Duration::from_secs(10));
        assert_eq!(config.operation_timeout, Duration::from_secs(30));
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.retry_backoff, Duration::from_millis(500));
    }

    #[test]
    fn test_cluster_client_new() {
        let client = ClusterClient::new();
        assert_eq!(client.config.max_retries, 3);
    }

    /// `Default` must build the same configuration as `new`.
    #[test]
    fn test_default_client_matches_new() {
        let from_default = ClusterClient::default();
        let from_new = ClusterClient::new();
        assert_eq!(
            from_default.config.connection_timeout,
            from_new.config.connection_timeout
        );
        assert_eq!(
            from_default.config.operation_timeout,
            from_new.config.operation_timeout
        );
        assert_eq!(from_default.config.max_retries, from_new.config.max_retries);
        assert_eq!(from_default.config.retry_backoff, from_new.config.retry_backoff);
    }

    /// A custom config is stored verbatim.
    #[test]
    fn test_custom_config() {
        let config = ClusterClientConfig {
            connection_timeout: Duration::from_secs(5),
            operation_timeout: Duration::from_secs(15),
            max_retries: 5,
            retry_backoff: Duration::from_millis(250),
        };
        let client = ClusterClient::with_config(config.clone());
        assert_eq!(client.config.connection_timeout, config.connection_timeout);
        assert_eq!(client.config.operation_timeout, config.operation_timeout);
        assert_eq!(client.config.max_retries, config.max_retries);
        assert_eq!(client.config.retry_backoff, config.retry_backoff);
    }
}