1use crate::crd::{ClusterCondition, ClusterPhase, RivvenCluster, RivvenClusterStatus};
8use crate::error::{OperatorError, Result};
9use crate::resources::ResourceBuilder;
10use chrono::Utc;
11use futures::StreamExt;
12use k8s_openapi::api::apps::v1::StatefulSet;
13use k8s_openapi::api::core::v1::{ConfigMap, PersistentVolumeClaim, Service};
14use k8s_openapi::api::policy::v1::PodDisruptionBudget;
15use kube::api::{Api, DeleteParams, ListParams, Patch, PatchParams};
16use kube::runtime::controller::{Action, Controller};
17use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
18use kube::runtime::watcher::Config;
19use kube::{Client, Resource, ResourceExt};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, error, info, instrument, warn};
23use validator::Validate;
24
/// Finalizer identifier passed to the finalizer helper for every `RivvenCluster`.
pub const FINALIZER_NAME: &str = "rivven.hupe1980.github.io/cluster-finalizer";

/// Seconds to wait before re-running a reconciliation that succeeded.
const DEFAULT_REQUEUE_SECONDS: u64 = 5 * 60;

/// Base delay, in seconds, for the exponential backoff applied after a failure.
const ERROR_REQUEUE_SECONDS: u64 = 30;

/// Ceiling, in seconds, for the backoff delay applied after a failure.
const MAX_ERROR_REQUEUE_SECONDS: u64 = 10 * 60;
36
37pub struct ControllerContext {
39 pub client: Client,
41 pub metrics: Option<ControllerMetrics>,
43 pub error_counts: dashmap::DashMap<String, u32>,
45}
46
47#[derive(Clone)]
49pub struct ControllerMetrics {
50 pub reconciliations: metrics::Counter,
52 pub errors: metrics::Counter,
54 pub duration: metrics::Histogram,
56}
57
58impl ControllerMetrics {
59 pub fn new() -> Self {
61 Self {
62 reconciliations: metrics::counter!("rivven_operator_reconciliations_total"),
63 errors: metrics::counter!("rivven_operator_reconciliation_errors_total"),
64 duration: metrics::histogram!("rivven_operator_reconciliation_duration_seconds"),
65 }
66 }
67}
68
69impl Default for ControllerMetrics {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75pub async fn run_controller(client: Client, namespace: Option<String>) -> Result<()> {
77 let clusters: Api<RivvenCluster> = match &namespace {
78 Some(ns) => Api::namespaced(client.clone(), ns),
79 None => Api::all(client.clone()),
80 };
81
82 let ctx = Arc::new(ControllerContext {
83 client: client.clone(),
84 metrics: Some(ControllerMetrics::new()),
85 error_counts: dashmap::DashMap::new(),
86 });
87
88 info!(
89 namespace = namespace.as_deref().unwrap_or("all"),
90 "Starting RivvenCluster controller"
91 );
92
93 let statefulsets = match &namespace {
95 Some(ns) => Api::<StatefulSet>::namespaced(client.clone(), ns),
96 None => Api::<StatefulSet>::all(client.clone()),
97 };
98
99 let services = match &namespace {
100 Some(ns) => Api::<Service>::namespaced(client.clone(), ns),
101 None => Api::<Service>::all(client.clone()),
102 };
103
104 Controller::new(clusters.clone(), Config::default())
105 .owns(statefulsets, Config::default())
106 .owns(services, Config::default())
107 .run(reconcile, error_policy, ctx)
108 .for_each(|result| async move {
109 match result {
110 Ok((obj, action)) => {
111 debug!(
112 name = obj.name,
113 namespace = obj.namespace,
114 ?action,
115 "Reconciliation completed"
116 );
117 }
118 Err(e) => {
119 error!(error = %e, "Reconciliation failed");
120 }
121 }
122 })
123 .await;
124
125 Ok(())
126}
127
128#[instrument(skip(cluster, ctx), fields(name = %cluster.name_any(), namespace = cluster.namespace()))]
130async fn reconcile(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
131 let start = std::time::Instant::now();
132
133 if let Some(ref metrics) = ctx.metrics {
134 metrics.reconciliations.increment(1);
135 }
136
137 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
138 let cluster_name = cluster.name_any();
139 let clusters: Api<RivvenCluster> = Api::namespaced(ctx.client.clone(), &namespace);
140
141 let result = finalizer(&clusters, FINALIZER_NAME, cluster, |event| async {
142 match event {
143 FinalizerEvent::Apply(cluster) => apply_cluster(cluster, ctx.clone()).await,
144 FinalizerEvent::Cleanup(cluster) => cleanup_cluster(cluster, ctx.clone()).await,
145 }
146 })
147 .await;
148
149 if let Some(ref metrics) = ctx.metrics {
150 metrics.duration.record(start.elapsed().as_secs_f64());
151 }
152
153 if result.is_ok() {
155 ctx.error_counts.remove(&cluster_name);
156 }
157
158 result.map_err(|e| {
159 if let Some(ref metrics) = ctx.metrics {
160 metrics.errors.increment(1);
161 }
162 OperatorError::ReconcileFailed(e.to_string())
163 })
164}
165
166#[instrument(skip(cluster, ctx))]
168async fn apply_cluster(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
169 let name = cluster.name_any();
170 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
171
172 info!(name = %name, namespace = %namespace, "Reconciling RivvenCluster");
173
174 if let Err(errors) = cluster.spec.validate() {
176 let error_messages: Vec<String> = errors
177 .field_errors()
178 .iter()
179 .flat_map(|(field, errs)| {
180 errs.iter()
181 .map(move |e| format!("{}: {:?}", field, e.message))
182 })
183 .collect();
184 let error_msg = error_messages.join("; ");
185 warn!(name = %name, errors = %error_msg, "Cluster spec validation failed");
186 return Err(OperatorError::InvalidConfig(error_msg));
187 }
188
189 validate_cluster_security(&cluster)?;
191
192 let builder = ResourceBuilder::new(&cluster)?;
194
195 let configmap = builder.build_configmap()?;
197 apply_configmap(&ctx.client, &namespace, configmap).await?;
198
199 let headless_svc = builder.build_headless_service();
201 apply_service(&ctx.client, &namespace, headless_svc).await?;
202
203 let client_svc = builder.build_client_service();
205 apply_service(&ctx.client, &namespace, client_svc).await?;
206
207 let statefulset = builder.build_statefulset();
209 let sts_status = apply_statefulset(&ctx.client, &namespace, statefulset).await?;
210
211 if let Some(pdb) = builder.build_pdb() {
213 apply_pdb(&ctx.client, &namespace, pdb).await?;
214 }
215
216 let status = build_status(&cluster, sts_status);
218 update_status(&ctx.client, &namespace, &name, status).await?;
219
220 info!(name = %name, "Reconciliation complete");
221
222 Ok(Action::requeue(Duration::from_secs(
223 DEFAULT_REQUEUE_SECONDS,
224 )))
225}
226
227fn validate_cluster_security(cluster: &RivvenCluster) -> Result<()> {
229 let spec = &cluster.spec;
230
231 if spec.tls.enabled && spec.tls.cert_secret_name.is_none() {
233 return Err(OperatorError::InvalidConfig(
234 "TLS is enabled but no certificate secret is specified".to_string(),
235 ));
236 }
237
238 if spec.tls.mtls_enabled && spec.tls.ca_secret_name.is_none() {
239 return Err(OperatorError::InvalidConfig(
240 "mTLS is enabled but no CA secret is specified".to_string(),
241 ));
242 }
243
244 if spec.config.default_replication_factor > spec.replicas {
246 return Err(OperatorError::InvalidConfig(format!(
247 "Replication factor ({}) cannot exceed replica count ({})",
248 spec.config.default_replication_factor, spec.replicas
249 )));
250 }
251
252 if spec.replicas == 1 {
254 warn!(
255 cluster = cluster.name_any(),
256 "Running with single replica - not recommended for production"
257 );
258 }
259
260 let min_election_timeout = spec.config.raft_heartbeat_interval_ms * 3;
262 if spec.config.raft_election_timeout_ms < min_election_timeout {
263 return Err(OperatorError::InvalidConfig(format!(
264 "Raft election timeout ({}) should be at least 3x heartbeat interval ({})",
265 spec.config.raft_election_timeout_ms, spec.config.raft_heartbeat_interval_ms
266 )));
267 }
268
269 Ok(())
270}
271
272#[instrument(skip(cluster, ctx))]
274async fn cleanup_cluster(
275 cluster: Arc<RivvenCluster>,
276 ctx: Arc<ControllerContext>,
277) -> Result<Action> {
278 let name = cluster.name_any();
279 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
280
281 info!(name = %name, namespace = %namespace, "Cleaning up RivvenCluster resources");
282
283 let pvcs: Api<PersistentVolumeClaim> = Api::namespaced(ctx.client.clone(), &namespace);
289 let lp = ListParams::default().labels(&format!("app.kubernetes.io/instance={}", name));
290
291 match pvcs.list(&lp).await {
292 Ok(pvc_list) => {
293 for pvc in pvc_list.items {
294 if let Some(pvc_name) = pvc.metadata.name.as_deref() {
295 info!(name = %name, pvc = %pvc_name, "Deleting PVC");
296 if let Err(e) = pvcs.delete(pvc_name, &DeleteParams::default()).await {
297 warn!(
298 name = %name,
299 pvc = %pvc_name,
300 error = %e,
301 "Failed to delete PVC (may have already been removed)"
302 );
303 }
304 }
305 }
306 }
307 Err(e) => {
308 warn!(
309 name = %name,
310 error = %e,
311 "Failed to list PVCs for cleanup"
312 );
313 }
314 }
315
316 info!(name = %name, "Cleanup complete");
317
318 Ok(Action::await_change())
319}
320
321fn verify_ownership<K: Resource>(existing: &K) -> Result<()> {
329 let labels = existing.meta().labels.as_ref();
330 let managed_by = labels.and_then(|l| l.get("app.kubernetes.io/managed-by"));
331 match managed_by {
332 Some(manager) if manager != "rivven-operator" => {
333 let name = existing.meta().name.as_deref().unwrap_or("<unknown>");
334 Err(OperatorError::InvalidConfig(format!(
335 "resource '{}' is managed by '{}', not rivven-operator; \
336 refusing to force-apply to avoid ownership conflict",
337 name, manager
338 )))
339 }
340 _ => Ok(()),
341 }
342}
343
344async fn apply_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
355 let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
356 let name =
357 cm.metadata.name.as_ref().ok_or_else(|| {
358 OperatorError::InvalidConfig("ConfigMap missing metadata.name".into())
359 })?;
360
361 debug!(name = %name, "Applying ConfigMap");
362
363 if let Ok(existing) = api.get(name).await {
365 verify_ownership(&existing)?;
366 }
367
368 let patch_params = PatchParams::apply("rivven-operator").force();
369 api.patch(name, &patch_params, &Patch::Apply(&cm))
370 .await
371 .map_err(OperatorError::from)?;
372
373 Ok(())
374}
375
376async fn apply_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
378 let api: Api<Service> = Api::namespaced(client.clone(), namespace);
379 let name = svc
380 .metadata
381 .name
382 .as_ref()
383 .ok_or_else(|| OperatorError::InvalidConfig("Service missing metadata.name".into()))?;
384
385 debug!(name = %name, "Applying Service");
386
387 if let Ok(existing) = api.get(name).await {
389 verify_ownership(&existing)?;
390 }
391
392 let patch_params = PatchParams::apply("rivven-operator").force();
393 api.patch(name, &patch_params, &Patch::Apply(&svc))
394 .await
395 .map_err(OperatorError::from)?;
396
397 Ok(())
398}
399
400async fn apply_statefulset(
402 client: &Client,
403 namespace: &str,
404 sts: StatefulSet,
405) -> Result<Option<k8s_openapi::api::apps::v1::StatefulSetStatus>> {
406 let api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
407 let name =
408 sts.metadata.name.as_ref().ok_or_else(|| {
409 OperatorError::InvalidConfig("StatefulSet missing metadata.name".into())
410 })?;
411
412 debug!(name = %name, "Applying StatefulSet");
413
414 if let Ok(existing) = api.get(name).await {
416 verify_ownership(&existing)?;
417 }
418
419 let patch_params = PatchParams::apply("rivven-operator").force();
420 let result = api
421 .patch(name, &patch_params, &Patch::Apply(&sts))
422 .await
423 .map_err(OperatorError::from)?;
424
425 Ok(result.status)
426}
427
428async fn apply_pdb(client: &Client, namespace: &str, pdb: PodDisruptionBudget) -> Result<()> {
430 let api: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
431 let name = pdb
432 .metadata
433 .name
434 .as_ref()
435 .ok_or_else(|| OperatorError::InvalidConfig("PDB missing metadata.name".into()))?;
436
437 debug!(name = %name, "Applying PodDisruptionBudget");
438
439 if let Ok(existing) = api.get(name).await {
441 verify_ownership(&existing)?;
442 }
443
444 let patch_params = PatchParams::apply("rivven-operator").force();
445 api.patch(name, &patch_params, &Patch::Apply(&pdb))
446 .await
447 .map_err(OperatorError::from)?;
448
449 Ok(())
450}
451
452fn build_status(
454 cluster: &RivvenCluster,
455 sts_status: Option<k8s_openapi::api::apps::v1::StatefulSetStatus>,
456) -> RivvenClusterStatus {
457 let now = Utc::now().to_rfc3339();
458
459 let (replicas, ready_replicas, updated_replicas) = sts_status
460 .map(|s| {
461 (
462 s.replicas,
463 s.ready_replicas.unwrap_or(0),
464 s.updated_replicas.unwrap_or(0),
465 )
466 })
467 .unwrap_or((0, 0, 0));
468
469 let desired_replicas = cluster.spec.replicas;
470
471 let phase = if ready_replicas == 0 {
473 ClusterPhase::Provisioning
474 } else if ready_replicas < desired_replicas {
475 if updated_replicas < desired_replicas {
476 ClusterPhase::Updating
477 } else {
478 ClusterPhase::Degraded
479 }
480 } else if ready_replicas == desired_replicas {
481 ClusterPhase::Running
482 } else {
483 ClusterPhase::Degraded
484 };
485
486 let mut conditions = vec![];
488
489 conditions.push(ClusterCondition {
491 condition_type: "Ready".to_string(),
492 status: if ready_replicas >= desired_replicas {
493 "True".to_string()
494 } else {
495 "False".to_string()
496 },
497 reason: Some(format!(
498 "{}/{} replicas ready",
499 ready_replicas, desired_replicas
500 )),
501 message: None,
502 last_transition_time: Some(now.clone()),
503 });
504
505 conditions.push(ClusterCondition {
507 condition_type: "Available".to_string(),
508 status: if ready_replicas > 0 {
509 "True".to_string()
510 } else {
511 "False".to_string()
512 },
513 reason: Some(
514 if ready_replicas > 0 {
515 "AtLeastOneReplicaReady"
516 } else {
517 "NoReplicasReady"
518 }
519 .to_string(),
520 ),
521 message: None,
522 last_transition_time: Some(now.clone()),
523 });
524
525 let name = cluster
527 .metadata
528 .name
529 .as_ref()
530 .map(|n| format!("rivven-{}", n));
531 let namespace = cluster
532 .metadata
533 .namespace
534 .as_ref()
535 .cloned()
536 .unwrap_or_else(|| "default".to_string());
537
538 let broker_endpoints: Vec<String> = (0..ready_replicas)
539 .map(|i| {
540 format!(
541 "{}-{}.{}-headless.{}.svc.cluster.local:9092",
542 name.as_deref().unwrap_or("rivven"),
543 i,
544 name.as_deref().unwrap_or("rivven"),
545 namespace
546 )
547 })
548 .collect();
549
550 RivvenClusterStatus {
551 phase,
552 replicas,
553 ready_replicas,
554 updated_replicas,
555 observed_generation: cluster.metadata.generation.unwrap_or(0),
556 conditions,
557 broker_endpoints,
558 leader: None, last_updated: Some(now),
560 message: None,
561 }
562}
563
564async fn update_status(
566 client: &Client,
567 namespace: &str,
568 name: &str,
569 status: RivvenClusterStatus,
570) -> Result<()> {
571 let api: Api<RivvenCluster> = Api::namespaced(client.clone(), namespace);
572
573 debug!(name = %name, phase = ?status.phase, "Updating cluster status");
574
575 let patch = serde_json::json!({
576 "status": status
577 });
578
579 let patch_params = PatchParams::default();
580 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
581 .await
582 .map_err(OperatorError::from)?;
583
584 Ok(())
585}
586
587fn error_policy(
589 cluster: Arc<RivvenCluster>,
590 error: &OperatorError,
591 ctx: Arc<ControllerContext>,
592) -> Action {
593 let key = cluster.name_any();
594 let retries = {
595 let mut entry = ctx.error_counts.entry(key.clone()).or_insert(0);
596 *entry += 1;
597 *entry
598 };
599
600 let delay = error.requeue_delay().unwrap_or_else(|| {
603 let base = Duration::from_secs(ERROR_REQUEUE_SECONDS);
604 let backoff = base * 2u32.saturating_pow((retries - 1).min(5));
605 backoff.min(Duration::from_secs(MAX_ERROR_REQUEUE_SECONDS))
606 });
607
608 warn!(
609 error = %error,
610 retry = retries,
611 delay_secs = delay.as_secs(),
612 "Reconciliation error for '{}', will retry",
613 key
614 );
615
616 Action::requeue(delay)
617}
618
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, RivvenClusterSpec, StorageSpec, TlsSpec,
    };
    use k8s_openapi::api::apps::v1::StatefulSetStatus;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// Builds a three-replica cluster named `test-cluster` in `default` with no status.
    fn create_test_cluster() -> RivvenCluster {
        let metadata = ObjectMeta {
            name: Some("test-cluster".to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid".to_string()),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenClusterSpec {
            replicas: 3,
            version: "0.0.1".to_string(),
            image: None,
            image_pull_policy: "IfNotPresent".to_string(),
            image_pull_secrets: vec![],
            storage: StorageSpec::default(),
            resources: None,
            config: BrokerConfig::default(),
            tls: TlsSpec::default(),
            metrics: MetricsSpec::default(),
            affinity: None,
            node_selector: BTreeMap::new(),
            tolerations: vec![],
            pod_disruption_budget: PdbSpec::default(),
            service_account: None,
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: vec![],
            liveness_probe: ProbeSpec::default(),
            readiness_probe: ProbeSpec::default(),
            security_context: None,
            container_security_context: None,
        };

        RivvenCluster {
            metadata,
            spec,
            status: None,
        }
    }

    /// StatefulSet status with three replicas and the given ready/updated counts.
    fn sts_with(ready: i32, updated: i32) -> StatefulSetStatus {
        StatefulSetStatus {
            replicas: 3,
            ready_replicas: Some(ready),
            updated_replicas: Some(updated),
            ..Default::default()
        }
    }

    /// Runs `build_status` on the test cluster with the given ready/updated counts.
    fn status_for(ready: i32, updated: i32) -> RivvenClusterStatus {
        build_status(&create_test_cluster(), Some(sts_with(ready, updated)))
    }

    /// Returns the `status` string of the condition with the given type.
    fn condition_status<'a>(status: &'a RivvenClusterStatus, kind: &str) -> &'a str {
        status
            .conditions
            .iter()
            .find(|c| c.condition_type == kind)
            .unwrap()
            .status
            .as_str()
    }

    #[test]
    fn test_build_status_provisioning() {
        let status = build_status(&create_test_cluster(), None);

        assert_eq!(status.phase, ClusterPhase::Provisioning);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
    }

    #[test]
    fn test_build_status_running() {
        let status = status_for(3, 3);

        assert_eq!(status.phase, ClusterPhase::Running);
        assert_eq!(status.replicas, 3);
        assert_eq!(status.ready_replicas, 3);
    }

    #[test]
    fn test_build_status_degraded() {
        let status = status_for(2, 3);

        assert_eq!(status.phase, ClusterPhase::Degraded);
    }

    #[test]
    fn test_build_status_updating() {
        let status = status_for(2, 1);

        assert_eq!(status.phase, ClusterPhase::Updating);
    }

    #[test]
    fn test_broker_endpoints() {
        let status = status_for(3, 3);

        assert_eq!(status.broker_endpoints.len(), 3);
        assert!(status.broker_endpoints[0].contains("rivven-test-cluster-0"));
    }

    #[test]
    fn test_conditions() {
        let status = status_for(3, 3);

        assert_eq!(status.conditions.len(), 2);
        assert_eq!(condition_status(&status, "Ready"), "True");
        assert_eq!(condition_status(&status, "Available"), "True");
    }
}