Skip to main content

rivven_operator/
controller.rs

1//! RivvenCluster Controller
2//!
3//! This module implements the Kubernetes controller pattern for managing
4//! RivvenCluster custom resources. It watches for changes and reconciles
5//! the actual cluster state to match the desired specification.
6
7use crate::crd::{ClusterCondition, ClusterPhase, RivvenCluster, RivvenClusterStatus};
8use crate::error::{OperatorError, Result};
9use crate::resources::ResourceBuilder;
10use chrono::Utc;
11use futures::StreamExt;
12use k8s_openapi::api::apps::v1::StatefulSet;
13use k8s_openapi::api::core::v1::{ConfigMap, Service};
14use k8s_openapi::api::policy::v1::PodDisruptionBudget;
15use kube::api::{Api, Patch, PatchParams};
16use kube::runtime::controller::{Action, Controller};
17use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
18use kube::runtime::watcher::Config;
19use kube::{Client, Resource, ResourceExt};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, error, info, instrument, warn};
23use validator::Validate;
24
/// Finalizer registered on every RivvenCluster so the operator can run its
/// cleanup hook before the object is removed.
pub const FINALIZER_NAME: &str = "rivven.hupe1980.github.io/cluster-finalizer";

/// Seconds to wait before the next periodic reconciliation after a
/// successful pass (5 minutes).
const DEFAULT_REQUEUE_SECONDS: u64 = 5 * 60;

/// Seconds to wait before retrying a failed reconciliation when the error
/// itself does not suggest a delay.
const ERROR_REQUEUE_SECONDS: u64 = 30;
33
34/// Context passed to the controller
35pub struct ControllerContext {
36    /// Kubernetes client
37    pub client: Client,
38    /// Metrics recorder (optional)
39    pub metrics: Option<ControllerMetrics>,
40}
41
42/// Metrics for the controller
43#[derive(Clone)]
44pub struct ControllerMetrics {
45    /// Counter for reconciliation attempts
46    pub reconciliations: metrics::Counter,
47    /// Counter for reconciliation errors
48    pub errors: metrics::Counter,
49    /// Histogram for reconciliation duration
50    pub duration: metrics::Histogram,
51}
52
53impl ControllerMetrics {
54    /// Create new controller metrics
55    pub fn new() -> Self {
56        Self {
57            reconciliations: metrics::counter!("rivven_operator_reconciliations_total"),
58            errors: metrics::counter!("rivven_operator_reconciliation_errors_total"),
59            duration: metrics::histogram!("rivven_operator_reconciliation_duration_seconds"),
60        }
61    }
62}
63
64impl Default for ControllerMetrics {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70/// Start the RivvenCluster controller
71pub async fn run_controller(client: Client, namespace: Option<String>) -> Result<()> {
72    let clusters: Api<RivvenCluster> = match &namespace {
73        Some(ns) => Api::namespaced(client.clone(), ns),
74        None => Api::all(client.clone()),
75    };
76
77    let ctx = Arc::new(ControllerContext {
78        client: client.clone(),
79        metrics: Some(ControllerMetrics::new()),
80    });
81
82    info!(
83        namespace = namespace.as_deref().unwrap_or("all"),
84        "Starting RivvenCluster controller"
85    );
86
87    // Watch related resources for changes
88    let statefulsets = match &namespace {
89        Some(ns) => Api::<StatefulSet>::namespaced(client.clone(), ns),
90        None => Api::<StatefulSet>::all(client.clone()),
91    };
92
93    let services = match &namespace {
94        Some(ns) => Api::<Service>::namespaced(client.clone(), ns),
95        None => Api::<Service>::all(client.clone()),
96    };
97
98    Controller::new(clusters.clone(), Config::default())
99        .owns(statefulsets, Config::default())
100        .owns(services, Config::default())
101        .run(reconcile, error_policy, ctx)
102        .for_each(|result| async move {
103            match result {
104                Ok((obj, action)) => {
105                    debug!(
106                        name = obj.name,
107                        namespace = obj.namespace,
108                        ?action,
109                        "Reconciliation completed"
110                    );
111                }
112                Err(e) => {
113                    error!(error = %e, "Reconciliation failed");
114                }
115            }
116        })
117        .await;
118
119    Ok(())
120}
121
122/// Main reconciliation function
123#[instrument(skip(cluster, ctx), fields(name = %cluster.name_any(), namespace = cluster.namespace()))]
124async fn reconcile(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
125    let start = std::time::Instant::now();
126
127    if let Some(ref metrics) = ctx.metrics {
128        metrics.reconciliations.increment(1);
129    }
130
131    let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
132    let clusters: Api<RivvenCluster> = Api::namespaced(ctx.client.clone(), &namespace);
133
134    let result = finalizer(&clusters, FINALIZER_NAME, cluster, |event| async {
135        match event {
136            FinalizerEvent::Apply(cluster) => apply_cluster(cluster, ctx.clone()).await,
137            FinalizerEvent::Cleanup(cluster) => cleanup_cluster(cluster, ctx.clone()).await,
138        }
139    })
140    .await;
141
142    if let Some(ref metrics) = ctx.metrics {
143        metrics.duration.record(start.elapsed().as_secs_f64());
144    }
145
146    result.map_err(|e| {
147        if let Some(ref metrics) = ctx.metrics {
148            metrics.errors.increment(1);
149        }
150        OperatorError::ReconcileFailed(e.to_string())
151    })
152}
153
154/// Apply (create/update) the cluster resources
155#[instrument(skip(cluster, ctx))]
156async fn apply_cluster(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
157    let name = cluster.name_any();
158    let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
159
160    info!(name = %name, namespace = %namespace, "Reconciling RivvenCluster");
161
162    // Validate the cluster spec before reconciliation
163    if let Err(errors) = cluster.spec.validate() {
164        let error_messages: Vec<String> = errors
165            .field_errors()
166            .iter()
167            .flat_map(|(field, errs)| {
168                errs.iter()
169                    .map(move |e| format!("{}: {:?}", field, e.message))
170            })
171            .collect();
172        let error_msg = error_messages.join("; ");
173        warn!(name = %name, errors = %error_msg, "Cluster spec validation failed");
174        return Err(OperatorError::InvalidConfig(error_msg));
175    }
176
177    // Additional security validations
178    validate_cluster_security(&cluster)?;
179
180    // Build all resources
181    let builder = ResourceBuilder::new(&cluster)?;
182
183    // Apply ConfigMap
184    let configmap = builder.build_configmap()?;
185    apply_configmap(&ctx.client, &namespace, configmap).await?;
186
187    // Apply headless service
188    let headless_svc = builder.build_headless_service();
189    apply_service(&ctx.client, &namespace, headless_svc).await?;
190
191    // Apply client service
192    let client_svc = builder.build_client_service();
193    apply_service(&ctx.client, &namespace, client_svc).await?;
194
195    // Apply StatefulSet
196    let statefulset = builder.build_statefulset();
197    let sts_status = apply_statefulset(&ctx.client, &namespace, statefulset).await?;
198
199    // Apply PDB if enabled
200    if let Some(pdb) = builder.build_pdb() {
201        apply_pdb(&ctx.client, &namespace, pdb).await?;
202    }
203
204    // Update cluster status
205    let status = build_status(&cluster, sts_status);
206    update_status(&ctx.client, &namespace, &name, status).await?;
207
208    info!(name = %name, "Reconciliation complete");
209
210    Ok(Action::requeue(Duration::from_secs(
211        DEFAULT_REQUEUE_SECONDS,
212    )))
213}
214
215/// Validate cluster for security best practices
216fn validate_cluster_security(cluster: &RivvenCluster) -> Result<()> {
217    let spec = &cluster.spec;
218
219    // Validate TLS configuration consistency
220    if spec.tls.enabled && spec.tls.cert_secret_name.is_none() {
221        return Err(OperatorError::InvalidConfig(
222            "TLS is enabled but no certificate secret is specified".to_string(),
223        ));
224    }
225
226    if spec.tls.mtls_enabled && spec.tls.ca_secret_name.is_none() {
227        return Err(OperatorError::InvalidConfig(
228            "mTLS is enabled but no CA secret is specified".to_string(),
229        ));
230    }
231
232    // Validate replication factor vs replicas
233    if spec.config.default_replication_factor > spec.replicas {
234        return Err(OperatorError::InvalidConfig(format!(
235            "Replication factor ({}) cannot exceed replica count ({})",
236            spec.config.default_replication_factor, spec.replicas
237        )));
238    }
239
240    // Warn if running with single replica in production
241    if spec.replicas == 1 {
242        warn!(
243            cluster = cluster.name_any(),
244            "Running with single replica - not recommended for production"
245        );
246    }
247
248    // Validate Raft timing constraints (heartbeat should be < election timeout / 3)
249    let min_election_timeout = spec.config.raft_heartbeat_interval_ms * 3;
250    if spec.config.raft_election_timeout_ms < min_election_timeout {
251        return Err(OperatorError::InvalidConfig(format!(
252            "Raft election timeout ({}) should be at least 3x heartbeat interval ({})",
253            spec.config.raft_election_timeout_ms, spec.config.raft_heartbeat_interval_ms
254        )));
255    }
256
257    Ok(())
258}
259
260/// Cleanup resources when cluster is deleted
261#[instrument(skip(cluster, _ctx))]
262async fn cleanup_cluster(
263    cluster: Arc<RivvenCluster>,
264    _ctx: Arc<ControllerContext>,
265) -> Result<Action> {
266    let name = cluster.name_any();
267    let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
268
269    info!(name = %name, namespace = %namespace, "Cleaning up RivvenCluster resources");
270
271    // Resources with owner references will be garbage collected automatically
272    // Here we can add any additional cleanup logic if needed
273
274    // For example, we might want to:
275    // 1. Wait for graceful shutdown of brokers
276    // 2. Clean up external resources (PVCs if not retained)
277    // 3. Notify external systems
278
279    info!(name = %name, "Cleanup complete");
280
281    Ok(Action::await_change())
282}
283
284/// Verify the operator still owns a resource before force-applying.
285///
286/// Checks whether an existing resource was created by the rivven-operator by
287/// inspecting the `app.kubernetes.io/managed-by` label. If the resource
288/// exists and is managed by a different controller (e.g. Helm, another
289/// operator), the apply is rejected to prevent silently hijacking ownership.
290/// If the resource does not yet exist (404), ownership is trivially valid.
291fn verify_ownership<K: Resource>(existing: &K) -> Result<()> {
292    let labels = existing.meta().labels.as_ref();
293    let managed_by = labels.and_then(|l| l.get("app.kubernetes.io/managed-by"));
294    match managed_by {
295        Some(manager) if manager != "rivven-operator" => {
296            let name = existing.meta().name.as_deref().unwrap_or("<unknown>");
297            Err(OperatorError::InvalidConfig(format!(
298                "resource '{}' is managed by '{}', not rivven-operator; \
299                 refusing to force-apply to avoid ownership conflict",
300                name, manager
301            )))
302        }
303        _ => Ok(()),
304    }
305}
306
307/// Apply a ConfigMap using server-side apply.
308///
309/// # Force-apply and field ownership
310///
311/// Uses `PatchParams::apply("rivven-operator").force()` which takes
312/// ownership of **all** fields in the patch, even if they were previously
313/// managed by another field manager (e.g. kubectl, Helm, or another
314/// controller). Before force-applying, verifies the operator owns the
315/// resource via the `app.kubernetes.io/managed-by` label to prevent
316/// silently hijacking resources managed by other controllers.
317async fn apply_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
318    let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
319    let name =
320        cm.metadata.name.as_ref().ok_or_else(|| {
321            OperatorError::InvalidConfig("ConfigMap missing metadata.name".into())
322        })?;
323
324    debug!(name = %name, "Applying ConfigMap");
325
326    // Verify ownership before force-applying
327    if let Ok(existing) = api.get(name).await {
328        verify_ownership(&existing)?;
329    }
330
331    let patch_params = PatchParams::apply("rivven-operator").force();
332    api.patch(name, &patch_params, &Patch::Apply(&cm))
333        .await
334        .map_err(OperatorError::from)?;
335
336    Ok(())
337}
338
339/// Apply a Service using server-side apply
340async fn apply_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
341    let api: Api<Service> = Api::namespaced(client.clone(), namespace);
342    let name = svc
343        .metadata
344        .name
345        .as_ref()
346        .ok_or_else(|| OperatorError::InvalidConfig("Service missing metadata.name".into()))?;
347
348    debug!(name = %name, "Applying Service");
349
350    // Verify ownership before force-applying
351    if let Ok(existing) = api.get(name).await {
352        verify_ownership(&existing)?;
353    }
354
355    let patch_params = PatchParams::apply("rivven-operator").force();
356    api.patch(name, &patch_params, &Patch::Apply(&svc))
357        .await
358        .map_err(OperatorError::from)?;
359
360    Ok(())
361}
362
363/// Apply a StatefulSet using server-side apply
364async fn apply_statefulset(
365    client: &Client,
366    namespace: &str,
367    sts: StatefulSet,
368) -> Result<Option<k8s_openapi::api::apps::v1::StatefulSetStatus>> {
369    let api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
370    let name =
371        sts.metadata.name.as_ref().ok_or_else(|| {
372            OperatorError::InvalidConfig("StatefulSet missing metadata.name".into())
373        })?;
374
375    debug!(name = %name, "Applying StatefulSet");
376
377    // Verify ownership before force-applying
378    if let Ok(existing) = api.get(name).await {
379        verify_ownership(&existing)?;
380    }
381
382    let patch_params = PatchParams::apply("rivven-operator").force();
383    let result = api
384        .patch(name, &patch_params, &Patch::Apply(&sts))
385        .await
386        .map_err(OperatorError::from)?;
387
388    Ok(result.status)
389}
390
391/// Apply a PodDisruptionBudget using server-side apply
392async fn apply_pdb(client: &Client, namespace: &str, pdb: PodDisruptionBudget) -> Result<()> {
393    let api: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
394    let name = pdb
395        .metadata
396        .name
397        .as_ref()
398        .ok_or_else(|| OperatorError::InvalidConfig("PDB missing metadata.name".into()))?;
399
400    debug!(name = %name, "Applying PodDisruptionBudget");
401
402    // Verify ownership before force-applying
403    if let Ok(existing) = api.get(name).await {
404        verify_ownership(&existing)?;
405    }
406
407    let patch_params = PatchParams::apply("rivven-operator").force();
408    api.patch(name, &patch_params, &Patch::Apply(&pdb))
409        .await
410        .map_err(OperatorError::from)?;
411
412    Ok(())
413}
414
415/// Build cluster status from StatefulSet status
416fn build_status(
417    cluster: &RivvenCluster,
418    sts_status: Option<k8s_openapi::api::apps::v1::StatefulSetStatus>,
419) -> RivvenClusterStatus {
420    let now = Utc::now().to_rfc3339();
421
422    let (replicas, ready_replicas, updated_replicas) = sts_status
423        .map(|s| {
424            (
425                s.replicas,
426                s.ready_replicas.unwrap_or(0),
427                s.updated_replicas.unwrap_or(0),
428            )
429        })
430        .unwrap_or((0, 0, 0));
431
432    let desired_replicas = cluster.spec.replicas;
433
434    // Determine phase based on state
435    let phase = if ready_replicas == 0 {
436        ClusterPhase::Provisioning
437    } else if ready_replicas < desired_replicas {
438        if updated_replicas < desired_replicas {
439            ClusterPhase::Updating
440        } else {
441            ClusterPhase::Degraded
442        }
443    } else if ready_replicas == desired_replicas {
444        ClusterPhase::Running
445    } else {
446        ClusterPhase::Degraded
447    };
448
449    // Build conditions
450    let mut conditions = vec![];
451
452    // Ready condition
453    conditions.push(ClusterCondition {
454        condition_type: "Ready".to_string(),
455        status: if ready_replicas >= desired_replicas {
456            "True".to_string()
457        } else {
458            "False".to_string()
459        },
460        reason: Some(format!(
461            "{}/{} replicas ready",
462            ready_replicas, desired_replicas
463        )),
464        message: None,
465        last_transition_time: Some(now.clone()),
466    });
467
468    // Available condition
469    conditions.push(ClusterCondition {
470        condition_type: "Available".to_string(),
471        status: if ready_replicas > 0 {
472            "True".to_string()
473        } else {
474            "False".to_string()
475        },
476        reason: Some(
477            if ready_replicas > 0 {
478                "AtLeastOneReplicaReady"
479            } else {
480                "NoReplicasReady"
481            }
482            .to_string(),
483        ),
484        message: None,
485        last_transition_time: Some(now.clone()),
486    });
487
488    // Build broker endpoints
489    let name = cluster
490        .metadata
491        .name
492        .as_ref()
493        .map(|n| format!("rivven-{}", n));
494    let namespace = cluster
495        .metadata
496        .namespace
497        .as_ref()
498        .cloned()
499        .unwrap_or_else(|| "default".to_string());
500
501    let broker_endpoints: Vec<String> = (0..ready_replicas)
502        .map(|i| {
503            format!(
504                "{}-{}.{}-headless.{}.svc.cluster.local:9092",
505                name.as_deref().unwrap_or("rivven"),
506                i,
507                name.as_deref().unwrap_or("rivven"),
508                namespace
509            )
510        })
511        .collect();
512
513    RivvenClusterStatus {
514        phase,
515        replicas,
516        ready_replicas,
517        updated_replicas,
518        observed_generation: cluster.metadata.generation.unwrap_or(0),
519        conditions,
520        broker_endpoints,
521        leader: None, // Would need to query the cluster to determine leader
522        last_updated: Some(now),
523        message: None,
524    }
525}
526
527/// Update the cluster status subresource
528async fn update_status(
529    client: &Client,
530    namespace: &str,
531    name: &str,
532    status: RivvenClusterStatus,
533) -> Result<()> {
534    let api: Api<RivvenCluster> = Api::namespaced(client.clone(), namespace);
535
536    debug!(name = %name, phase = ?status.phase, "Updating cluster status");
537
538    let patch = serde_json::json!({
539        "status": status
540    });
541
542    let patch_params = PatchParams::default();
543    api.patch_status(name, &patch_params, &Patch::Merge(&patch))
544        .await
545        .map_err(OperatorError::from)?;
546
547    Ok(())
548}
549
550/// Error policy for the controller
551fn error_policy(
552    _cluster: Arc<RivvenCluster>,
553    error: &OperatorError,
554    _ctx: Arc<ControllerContext>,
555) -> Action {
556    warn!(
557        error = %error,
558        "Reconciliation error, will retry"
559    );
560
561    // Use the error's suggested requeue delay, or default
562    let delay = error
563        .requeue_delay()
564        .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
565
566    Action::requeue(delay)
567}
568
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, RivvenClusterSpec, StorageSpec, TlsSpec,
    };
    use k8s_openapi::api::apps::v1::StatefulSetStatus;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// A three-replica cluster named `test-cluster` in the `default`
    /// namespace, with every optional setting left at its default.
    fn create_test_cluster() -> RivvenCluster {
        let metadata = ObjectMeta {
            name: Some("test-cluster".to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid".to_string()),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenClusterSpec {
            replicas: 3,
            version: "0.0.1".to_string(),
            image: None,
            image_pull_policy: "IfNotPresent".to_string(),
            image_pull_secrets: vec![],
            storage: StorageSpec::default(),
            resources: None,
            config: BrokerConfig::default(),
            tls: TlsSpec::default(),
            metrics: MetricsSpec::default(),
            affinity: None,
            node_selector: BTreeMap::new(),
            tolerations: vec![],
            pod_disruption_budget: PdbSpec::default(),
            service_account: None,
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: vec![],
            liveness_probe: ProbeSpec::default(),
            readiness_probe: ProbeSpec::default(),
            security_context: None,
            container_security_context: None,
        };

        RivvenCluster {
            metadata,
            spec,
            status: None,
        }
    }

    /// StatefulSet status with three replicas and the given ready/updated counts.
    fn sts_status(ready: i32, updated: i32) -> StatefulSetStatus {
        StatefulSetStatus {
            replicas: 3,
            ready_replicas: Some(ready),
            updated_replicas: Some(updated),
            ..Default::default()
        }
    }

    #[test]
    fn test_build_status_provisioning() {
        let status = build_status(&create_test_cluster(), None);

        assert_eq!(status.phase, ClusterPhase::Provisioning);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
    }

    #[test]
    fn test_build_status_running() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3)));

        assert_eq!(status.phase, ClusterPhase::Running);
        assert_eq!(status.replicas, 3);
        assert_eq!(status.ready_replicas, 3);
    }

    #[test]
    fn test_build_status_degraded() {
        let status = build_status(&create_test_cluster(), Some(sts_status(2, 3)));

        assert_eq!(status.phase, ClusterPhase::Degraded);
    }

    #[test]
    fn test_build_status_updating() {
        let status = build_status(&create_test_cluster(), Some(sts_status(2, 1)));

        assert_eq!(status.phase, ClusterPhase::Updating);
    }

    #[test]
    fn test_broker_endpoints() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3)));

        assert_eq!(status.broker_endpoints.len(), 3);
        assert!(status.broker_endpoints[0].contains("rivven-test-cluster-0"));
    }

    #[test]
    fn test_conditions() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3)));

        assert_eq!(status.conditions.len(), 2);

        // Both conditions must be present and report "True".
        for expected in ["Ready", "Available"] {
            let condition = status
                .conditions
                .iter()
                .find(|c| c.condition_type == expected)
                .unwrap();
            assert_eq!(condition.status, "True");
        }
    }
}
716}