Skip to main content

rivven_operator/
controller.rs

1//! RivvenCluster Controller
2//!
3//! This module implements the Kubernetes controller pattern for managing
4//! RivvenCluster custom resources. It watches for changes and reconciles
5//! the actual cluster state to match the desired specification.
6
7use crate::crd::{ClusterCondition, ClusterPhase, RivvenCluster, RivvenClusterStatus};
8use crate::error::{OperatorError, Result};
9use crate::resources::ResourceBuilder;
10use chrono::Utc;
11use futures::StreamExt;
12use k8s_openapi::api::apps::v1::StatefulSet;
13use k8s_openapi::api::core::v1::{ConfigMap, Service};
14use k8s_openapi::api::policy::v1::PodDisruptionBudget;
15use kube::api::{Api, Patch, PatchParams};
16use kube::runtime::controller::{Action, Controller};
17use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
18use kube::runtime::watcher::Config;
19use kube::{Client, ResourceExt};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, error, info, instrument, warn};
23use validator::Validate;
24
/// Finalizer identifier this controller registers on `RivvenCluster` objects so that
/// its cleanup hook runs before the object is removed.
pub const FINALIZER_NAME: &str = "rivven.hupe1980.github.io/cluster-finalizer";

/// Seconds to wait before re-reconciling after a successful pass (five minutes).
const DEFAULT_REQUEUE_SECONDS: u64 = 5 * 60;

/// Fallback number of seconds to wait before retrying after a failed reconciliation.
const ERROR_REQUEUE_SECONDS: u64 = 30;
33
34/// Context passed to the controller
35pub struct ControllerContext {
36    /// Kubernetes client
37    pub client: Client,
38    /// Metrics recorder (optional)
39    pub metrics: Option<ControllerMetrics>,
40}
41
42/// Metrics for the controller
43#[derive(Clone)]
44pub struct ControllerMetrics {
45    /// Counter for reconciliation attempts
46    pub reconciliations: metrics::Counter,
47    /// Counter for reconciliation errors
48    pub errors: metrics::Counter,
49    /// Histogram for reconciliation duration
50    pub duration: metrics::Histogram,
51}
52
53impl ControllerMetrics {
54    /// Create new controller metrics
55    pub fn new() -> Self {
56        Self {
57            reconciliations: metrics::counter!("rivven_operator_reconciliations_total"),
58            errors: metrics::counter!("rivven_operator_reconciliation_errors_total"),
59            duration: metrics::histogram!("rivven_operator_reconciliation_duration_seconds"),
60        }
61    }
62}
63
64impl Default for ControllerMetrics {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70/// Start the RivvenCluster controller
71pub async fn run_controller(client: Client, namespace: Option<String>) -> Result<()> {
72    let clusters: Api<RivvenCluster> = match &namespace {
73        Some(ns) => Api::namespaced(client.clone(), ns),
74        None => Api::all(client.clone()),
75    };
76
77    let ctx = Arc::new(ControllerContext {
78        client: client.clone(),
79        metrics: Some(ControllerMetrics::new()),
80    });
81
82    info!(
83        namespace = namespace.as_deref().unwrap_or("all"),
84        "Starting RivvenCluster controller"
85    );
86
87    // Watch related resources for changes
88    let statefulsets = match &namespace {
89        Some(ns) => Api::<StatefulSet>::namespaced(client.clone(), ns),
90        None => Api::<StatefulSet>::all(client.clone()),
91    };
92
93    let services = match &namespace {
94        Some(ns) => Api::<Service>::namespaced(client.clone(), ns),
95        None => Api::<Service>::all(client.clone()),
96    };
97
98    Controller::new(clusters.clone(), Config::default())
99        .owns(statefulsets, Config::default())
100        .owns(services, Config::default())
101        .run(reconcile, error_policy, ctx)
102        .for_each(|result| async move {
103            match result {
104                Ok((obj, action)) => {
105                    debug!(
106                        name = obj.name,
107                        namespace = obj.namespace,
108                        ?action,
109                        "Reconciliation completed"
110                    );
111                }
112                Err(e) => {
113                    error!(error = %e, "Reconciliation failed");
114                }
115            }
116        })
117        .await;
118
119    Ok(())
120}
121
122/// Main reconciliation function
123#[instrument(skip(cluster, ctx), fields(name = %cluster.name_any(), namespace = cluster.namespace()))]
124async fn reconcile(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
125    let start = std::time::Instant::now();
126
127    if let Some(ref metrics) = ctx.metrics {
128        metrics.reconciliations.increment(1);
129    }
130
131    let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
132    let clusters: Api<RivvenCluster> = Api::namespaced(ctx.client.clone(), &namespace);
133
134    let result = finalizer(&clusters, FINALIZER_NAME, cluster, |event| async {
135        match event {
136            FinalizerEvent::Apply(cluster) => apply_cluster(cluster, ctx.clone()).await,
137            FinalizerEvent::Cleanup(cluster) => cleanup_cluster(cluster, ctx.clone()).await,
138        }
139    })
140    .await;
141
142    if let Some(ref metrics) = ctx.metrics {
143        metrics.duration.record(start.elapsed().as_secs_f64());
144    }
145
146    result.map_err(|e| {
147        if let Some(ref metrics) = ctx.metrics {
148            metrics.errors.increment(1);
149        }
150        OperatorError::ReconcileFailed(e.to_string())
151    })
152}
153
154/// Apply (create/update) the cluster resources
155#[instrument(skip(cluster, ctx))]
156async fn apply_cluster(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
157    let name = cluster.name_any();
158    let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
159
160    info!(name = %name, namespace = %namespace, "Reconciling RivvenCluster");
161
162    // Validate the cluster spec before reconciliation
163    if let Err(errors) = cluster.spec.validate() {
164        let error_messages: Vec<String> = errors
165            .field_errors()
166            .iter()
167            .flat_map(|(field, errs)| {
168                errs.iter()
169                    .map(move |e| format!("{}: {:?}", field, e.message))
170            })
171            .collect();
172        let error_msg = error_messages.join("; ");
173        warn!(name = %name, errors = %error_msg, "Cluster spec validation failed");
174        return Err(OperatorError::InvalidConfig(error_msg));
175    }
176
177    // Additional security validations
178    validate_cluster_security(&cluster)?;
179
180    // Build all resources
181    let builder = ResourceBuilder::new(&cluster)?;
182
183    // Apply ConfigMap
184    let configmap = builder.build_configmap();
185    apply_configmap(&ctx.client, &namespace, configmap).await?;
186
187    // Apply headless service
188    let headless_svc = builder.build_headless_service();
189    apply_service(&ctx.client, &namespace, headless_svc).await?;
190
191    // Apply client service
192    let client_svc = builder.build_client_service();
193    apply_service(&ctx.client, &namespace, client_svc).await?;
194
195    // Apply StatefulSet
196    let statefulset = builder.build_statefulset();
197    let sts_status = apply_statefulset(&ctx.client, &namespace, statefulset).await?;
198
199    // Apply PDB if enabled
200    if let Some(pdb) = builder.build_pdb() {
201        apply_pdb(&ctx.client, &namespace, pdb).await?;
202    }
203
204    // Update cluster status
205    let status = build_status(&cluster, sts_status);
206    update_status(&ctx.client, &namespace, &name, status).await?;
207
208    info!(name = %name, "Reconciliation complete");
209
210    Ok(Action::requeue(Duration::from_secs(
211        DEFAULT_REQUEUE_SECONDS,
212    )))
213}
214
215/// Validate cluster for security best practices
216fn validate_cluster_security(cluster: &RivvenCluster) -> Result<()> {
217    let spec = &cluster.spec;
218
219    // Validate TLS configuration consistency
220    if spec.tls.enabled && spec.tls.cert_secret_name.is_none() {
221        return Err(OperatorError::InvalidConfig(
222            "TLS is enabled but no certificate secret is specified".to_string(),
223        ));
224    }
225
226    if spec.tls.mtls_enabled && spec.tls.ca_secret_name.is_none() {
227        return Err(OperatorError::InvalidConfig(
228            "mTLS is enabled but no CA secret is specified".to_string(),
229        ));
230    }
231
232    // Validate replication factor vs replicas
233    if spec.config.default_replication_factor > spec.replicas {
234        return Err(OperatorError::InvalidConfig(format!(
235            "Replication factor ({}) cannot exceed replica count ({})",
236            spec.config.default_replication_factor, spec.replicas
237        )));
238    }
239
240    // Warn if running with single replica in production
241    if spec.replicas == 1 {
242        warn!(
243            cluster = cluster.name_any(),
244            "Running with single replica - not recommended for production"
245        );
246    }
247
248    // Validate Raft timing constraints (heartbeat should be < election timeout / 3)
249    let min_election_timeout = spec.config.raft_heartbeat_interval_ms * 3;
250    if spec.config.raft_election_timeout_ms < min_election_timeout {
251        return Err(OperatorError::InvalidConfig(format!(
252            "Raft election timeout ({}) should be at least 3x heartbeat interval ({})",
253            spec.config.raft_election_timeout_ms, spec.config.raft_heartbeat_interval_ms
254        )));
255    }
256
257    Ok(())
258}
259
260/// Cleanup resources when cluster is deleted
261#[instrument(skip(cluster, _ctx))]
262async fn cleanup_cluster(
263    cluster: Arc<RivvenCluster>,
264    _ctx: Arc<ControllerContext>,
265) -> Result<Action> {
266    let name = cluster.name_any();
267    let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
268
269    info!(name = %name, namespace = %namespace, "Cleaning up RivvenCluster resources");
270
271    // Resources with owner references will be garbage collected automatically
272    // Here we can add any additional cleanup logic if needed
273
274    // For example, we might want to:
275    // 1. Wait for graceful shutdown of brokers
276    // 2. Clean up external resources (PVCs if not retained)
277    // 3. Notify external systems
278
279    info!(name = %name, "Cleanup complete");
280
281    Ok(Action::await_change())
282}
283
284/// Apply a ConfigMap using server-side apply
285async fn apply_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
286    let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
287    let name = cm.metadata.name.as_ref().unwrap();
288
289    debug!(name = %name, "Applying ConfigMap");
290
291    let patch_params = PatchParams::apply("rivven-operator").force();
292    api.patch(name, &patch_params, &Patch::Apply(&cm))
293        .await
294        .map_err(OperatorError::from)?;
295
296    Ok(())
297}
298
299/// Apply a Service using server-side apply
300async fn apply_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
301    let api: Api<Service> = Api::namespaced(client.clone(), namespace);
302    let name = svc.metadata.name.as_ref().unwrap();
303
304    debug!(name = %name, "Applying Service");
305
306    let patch_params = PatchParams::apply("rivven-operator").force();
307    api.patch(name, &patch_params, &Patch::Apply(&svc))
308        .await
309        .map_err(OperatorError::from)?;
310
311    Ok(())
312}
313
314/// Apply a StatefulSet using server-side apply
315async fn apply_statefulset(
316    client: &Client,
317    namespace: &str,
318    sts: StatefulSet,
319) -> Result<Option<k8s_openapi::api::apps::v1::StatefulSetStatus>> {
320    let api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
321    let name = sts.metadata.name.as_ref().unwrap();
322
323    debug!(name = %name, "Applying StatefulSet");
324
325    let patch_params = PatchParams::apply("rivven-operator").force();
326    let result = api
327        .patch(name, &patch_params, &Patch::Apply(&sts))
328        .await
329        .map_err(OperatorError::from)?;
330
331    Ok(result.status)
332}
333
334/// Apply a PodDisruptionBudget using server-side apply
335async fn apply_pdb(client: &Client, namespace: &str, pdb: PodDisruptionBudget) -> Result<()> {
336    let api: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
337    let name = pdb.metadata.name.as_ref().unwrap();
338
339    debug!(name = %name, "Applying PodDisruptionBudget");
340
341    let patch_params = PatchParams::apply("rivven-operator").force();
342    api.patch(name, &patch_params, &Patch::Apply(&pdb))
343        .await
344        .map_err(OperatorError::from)?;
345
346    Ok(())
347}
348
349/// Build cluster status from StatefulSet status
350fn build_status(
351    cluster: &RivvenCluster,
352    sts_status: Option<k8s_openapi::api::apps::v1::StatefulSetStatus>,
353) -> RivvenClusterStatus {
354    let now = Utc::now().to_rfc3339();
355
356    let (replicas, ready_replicas, updated_replicas) = sts_status
357        .map(|s| {
358            (
359                s.replicas,
360                s.ready_replicas.unwrap_or(0),
361                s.updated_replicas.unwrap_or(0),
362            )
363        })
364        .unwrap_or((0, 0, 0));
365
366    let desired_replicas = cluster.spec.replicas;
367
368    // Determine phase based on state
369    let phase = if ready_replicas == 0 {
370        ClusterPhase::Provisioning
371    } else if ready_replicas < desired_replicas {
372        if updated_replicas < desired_replicas {
373            ClusterPhase::Updating
374        } else {
375            ClusterPhase::Degraded
376        }
377    } else if ready_replicas == desired_replicas {
378        ClusterPhase::Running
379    } else {
380        ClusterPhase::Degraded
381    };
382
383    // Build conditions
384    let mut conditions = vec![];
385
386    // Ready condition
387    conditions.push(ClusterCondition {
388        condition_type: "Ready".to_string(),
389        status: if ready_replicas >= desired_replicas {
390            "True".to_string()
391        } else {
392            "False".to_string()
393        },
394        reason: Some(format!(
395            "{}/{} replicas ready",
396            ready_replicas, desired_replicas
397        )),
398        message: None,
399        last_transition_time: Some(now.clone()),
400    });
401
402    // Available condition
403    conditions.push(ClusterCondition {
404        condition_type: "Available".to_string(),
405        status: if ready_replicas > 0 {
406            "True".to_string()
407        } else {
408            "False".to_string()
409        },
410        reason: Some(
411            if ready_replicas > 0 {
412                "AtLeastOneReplicaReady"
413            } else {
414                "NoReplicasReady"
415            }
416            .to_string(),
417        ),
418        message: None,
419        last_transition_time: Some(now.clone()),
420    });
421
422    // Build broker endpoints
423    let name = cluster
424        .metadata
425        .name
426        .as_ref()
427        .map(|n| format!("rivven-{}", n));
428    let namespace = cluster
429        .metadata
430        .namespace
431        .as_ref()
432        .cloned()
433        .unwrap_or_else(|| "default".to_string());
434
435    let broker_endpoints: Vec<String> = (0..ready_replicas)
436        .map(|i| {
437            format!(
438                "{}-{}.{}-headless.{}.svc.cluster.local:9092",
439                name.as_deref().unwrap_or("rivven"),
440                i,
441                name.as_deref().unwrap_or("rivven"),
442                namespace
443            )
444        })
445        .collect();
446
447    RivvenClusterStatus {
448        phase,
449        replicas,
450        ready_replicas,
451        updated_replicas,
452        observed_generation: cluster.metadata.generation.unwrap_or(0),
453        conditions,
454        broker_endpoints,
455        leader: None, // Would need to query the cluster to determine leader
456        last_updated: Some(now),
457        message: None,
458    }
459}
460
461/// Update the cluster status subresource
462async fn update_status(
463    client: &Client,
464    namespace: &str,
465    name: &str,
466    status: RivvenClusterStatus,
467) -> Result<()> {
468    let api: Api<RivvenCluster> = Api::namespaced(client.clone(), namespace);
469
470    debug!(name = %name, phase = ?status.phase, "Updating cluster status");
471
472    let patch = serde_json::json!({
473        "status": status
474    });
475
476    let patch_params = PatchParams::default();
477    api.patch_status(name, &patch_params, &Patch::Merge(&patch))
478        .await
479        .map_err(OperatorError::from)?;
480
481    Ok(())
482}
483
484/// Error policy for the controller
485fn error_policy(
486    _cluster: Arc<RivvenCluster>,
487    error: &OperatorError,
488    _ctx: Arc<ControllerContext>,
489) -> Action {
490    warn!(
491        error = %error,
492        "Reconciliation error, will retry"
493    );
494
495    // Use the error's suggested requeue delay, or default
496    let delay = error
497        .requeue_delay()
498        .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
499
500    Action::requeue(delay)
501}
502
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, RivvenClusterSpec, StorageSpec, TlsSpec,
    };
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// Three-replica cluster named `test-cluster` in `default`, with no prior status.
    fn create_test_cluster() -> RivvenCluster {
        let metadata = ObjectMeta {
            name: Some("test-cluster".to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid".to_string()),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenClusterSpec {
            replicas: 3,
            version: "0.0.1".to_string(),
            image: None,
            image_pull_policy: "IfNotPresent".to_string(),
            image_pull_secrets: vec![],
            storage: StorageSpec::default(),
            resources: None,
            config: BrokerConfig::default(),
            tls: TlsSpec::default(),
            metrics: MetricsSpec::default(),
            affinity: None,
            node_selector: BTreeMap::new(),
            tolerations: vec![],
            pod_disruption_budget: PdbSpec::default(),
            service_account: None,
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: vec![],
            liveness_probe: ProbeSpec::default(),
            readiness_probe: ProbeSpec::default(),
            security_context: None,
            container_security_context: None,
        };

        RivvenCluster {
            metadata,
            spec,
            status: None,
        }
    }

    /// StatefulSet status reporting three replicas with the given ready/updated counts.
    fn sts_with(ready: i32, updated: i32) -> k8s_openapi::api::apps::v1::StatefulSetStatus {
        k8s_openapi::api::apps::v1::StatefulSetStatus {
            replicas: 3,
            ready_replicas: Some(ready),
            updated_replicas: Some(updated),
            ..Default::default()
        }
    }

    #[test]
    fn test_build_status_provisioning() {
        let status = build_status(&create_test_cluster(), None);

        assert_eq!(status.phase, ClusterPhase::Provisioning);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
    }

    #[test]
    fn test_build_status_running() {
        let status = build_status(&create_test_cluster(), Some(sts_with(3, 3)));

        assert_eq!(status.phase, ClusterPhase::Running);
        assert_eq!(status.replicas, 3);
        assert_eq!(status.ready_replicas, 3);
    }

    #[test]
    fn test_build_status_degraded() {
        let status = build_status(&create_test_cluster(), Some(sts_with(2, 3)));

        assert_eq!(status.phase, ClusterPhase::Degraded);
    }

    #[test]
    fn test_build_status_updating() {
        let status = build_status(&create_test_cluster(), Some(sts_with(2, 1)));

        assert_eq!(status.phase, ClusterPhase::Updating);
    }

    #[test]
    fn test_broker_endpoints() {
        let status = build_status(&create_test_cluster(), Some(sts_with(3, 3)));

        assert_eq!(status.broker_endpoints.len(), 3);
        assert!(status.broker_endpoints[0].contains("rivven-test-cluster-0"));
    }

    #[test]
    fn test_conditions() {
        let status = build_status(&create_test_cluster(), Some(sts_with(3, 3)));

        assert_eq!(status.conditions.len(), 2);

        let condition_status = |wanted: &str| {
            status
                .conditions
                .iter()
                .find(|c| c.condition_type == wanted)
                .unwrap()
                .status
                .clone()
        };

        assert_eq!(condition_status("Ready"), "True");
        assert_eq!(condition_status("Available"), "True");
    }
}