1use crate::crd::{ClusterCondition, ClusterPhase, RivvenCluster, RivvenClusterStatus};
8use crate::error::{OperatorError, Result};
9use crate::resources::ResourceBuilder;
10use chrono::Utc;
11use futures::StreamExt;
12use k8s_openapi::api::apps::v1::StatefulSet;
13use k8s_openapi::api::core::v1::{ConfigMap, Service};
14use k8s_openapi::api::policy::v1::PodDisruptionBudget;
15use kube::api::{Api, Patch, PatchParams};
16use kube::runtime::controller::{Action, Controller};
17use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
18use kube::runtime::watcher::Config;
19use kube::{Client, Resource, ResourceExt};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, error, info, instrument, warn};
23use validator::Validate;
24
/// Finalizer added to every `RivvenCluster` so cleanup runs before deletion completes.
pub const FINALIZER_NAME: &str = "rivven.hupe1980.github.io/cluster-finalizer";

/// Seconds until a successfully reconciled cluster is reconciled again.
const DEFAULT_REQUEUE_SECONDS: u64 = 300;

/// Fallback retry interval (seconds) when a reconciliation error carries no delay of its own.
const ERROR_REQUEUE_SECONDS: u64 = 30;
33
34pub struct ControllerContext {
36 pub client: Client,
38 pub metrics: Option<ControllerMetrics>,
40}
41
42#[derive(Clone)]
44pub struct ControllerMetrics {
45 pub reconciliations: metrics::Counter,
47 pub errors: metrics::Counter,
49 pub duration: metrics::Histogram,
51}
52
53impl ControllerMetrics {
54 pub fn new() -> Self {
56 Self {
57 reconciliations: metrics::counter!("rivven_operator_reconciliations_total"),
58 errors: metrics::counter!("rivven_operator_reconciliation_errors_total"),
59 duration: metrics::histogram!("rivven_operator_reconciliation_duration_seconds"),
60 }
61 }
62}
63
64impl Default for ControllerMetrics {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70pub async fn run_controller(client: Client, namespace: Option<String>) -> Result<()> {
72 let clusters: Api<RivvenCluster> = match &namespace {
73 Some(ns) => Api::namespaced(client.clone(), ns),
74 None => Api::all(client.clone()),
75 };
76
77 let ctx = Arc::new(ControllerContext {
78 client: client.clone(),
79 metrics: Some(ControllerMetrics::new()),
80 });
81
82 info!(
83 namespace = namespace.as_deref().unwrap_or("all"),
84 "Starting RivvenCluster controller"
85 );
86
87 let statefulsets = match &namespace {
89 Some(ns) => Api::<StatefulSet>::namespaced(client.clone(), ns),
90 None => Api::<StatefulSet>::all(client.clone()),
91 };
92
93 let services = match &namespace {
94 Some(ns) => Api::<Service>::namespaced(client.clone(), ns),
95 None => Api::<Service>::all(client.clone()),
96 };
97
98 Controller::new(clusters.clone(), Config::default())
99 .owns(statefulsets, Config::default())
100 .owns(services, Config::default())
101 .run(reconcile, error_policy, ctx)
102 .for_each(|result| async move {
103 match result {
104 Ok((obj, action)) => {
105 debug!(
106 name = obj.name,
107 namespace = obj.namespace,
108 ?action,
109 "Reconciliation completed"
110 );
111 }
112 Err(e) => {
113 error!(error = %e, "Reconciliation failed");
114 }
115 }
116 })
117 .await;
118
119 Ok(())
120}
121
122#[instrument(skip(cluster, ctx), fields(name = %cluster.name_any(), namespace = cluster.namespace()))]
124async fn reconcile(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
125 let start = std::time::Instant::now();
126
127 if let Some(ref metrics) = ctx.metrics {
128 metrics.reconciliations.increment(1);
129 }
130
131 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
132 let clusters: Api<RivvenCluster> = Api::namespaced(ctx.client.clone(), &namespace);
133
134 let result = finalizer(&clusters, FINALIZER_NAME, cluster, |event| async {
135 match event {
136 FinalizerEvent::Apply(cluster) => apply_cluster(cluster, ctx.clone()).await,
137 FinalizerEvent::Cleanup(cluster) => cleanup_cluster(cluster, ctx.clone()).await,
138 }
139 })
140 .await;
141
142 if let Some(ref metrics) = ctx.metrics {
143 metrics.duration.record(start.elapsed().as_secs_f64());
144 }
145
146 result.map_err(|e| {
147 if let Some(ref metrics) = ctx.metrics {
148 metrics.errors.increment(1);
149 }
150 OperatorError::ReconcileFailed(e.to_string())
151 })
152}
153
154#[instrument(skip(cluster, ctx))]
156async fn apply_cluster(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
157 let name = cluster.name_any();
158 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
159
160 info!(name = %name, namespace = %namespace, "Reconciling RivvenCluster");
161
162 if let Err(errors) = cluster.spec.validate() {
164 let error_messages: Vec<String> = errors
165 .field_errors()
166 .iter()
167 .flat_map(|(field, errs)| {
168 errs.iter()
169 .map(move |e| format!("{}: {:?}", field, e.message))
170 })
171 .collect();
172 let error_msg = error_messages.join("; ");
173 warn!(name = %name, errors = %error_msg, "Cluster spec validation failed");
174 return Err(OperatorError::InvalidConfig(error_msg));
175 }
176
177 validate_cluster_security(&cluster)?;
179
180 let builder = ResourceBuilder::new(&cluster)?;
182
183 let configmap = builder.build_configmap()?;
185 apply_configmap(&ctx.client, &namespace, configmap).await?;
186
187 let headless_svc = builder.build_headless_service();
189 apply_service(&ctx.client, &namespace, headless_svc).await?;
190
191 let client_svc = builder.build_client_service();
193 apply_service(&ctx.client, &namespace, client_svc).await?;
194
195 let statefulset = builder.build_statefulset();
197 let sts_status = apply_statefulset(&ctx.client, &namespace, statefulset).await?;
198
199 if let Some(pdb) = builder.build_pdb() {
201 apply_pdb(&ctx.client, &namespace, pdb).await?;
202 }
203
204 let status = build_status(&cluster, sts_status);
206 update_status(&ctx.client, &namespace, &name, status).await?;
207
208 info!(name = %name, "Reconciliation complete");
209
210 Ok(Action::requeue(Duration::from_secs(
211 DEFAULT_REQUEUE_SECONDS,
212 )))
213}
214
215fn validate_cluster_security(cluster: &RivvenCluster) -> Result<()> {
217 let spec = &cluster.spec;
218
219 if spec.tls.enabled && spec.tls.cert_secret_name.is_none() {
221 return Err(OperatorError::InvalidConfig(
222 "TLS is enabled but no certificate secret is specified".to_string(),
223 ));
224 }
225
226 if spec.tls.mtls_enabled && spec.tls.ca_secret_name.is_none() {
227 return Err(OperatorError::InvalidConfig(
228 "mTLS is enabled but no CA secret is specified".to_string(),
229 ));
230 }
231
232 if spec.config.default_replication_factor > spec.replicas {
234 return Err(OperatorError::InvalidConfig(format!(
235 "Replication factor ({}) cannot exceed replica count ({})",
236 spec.config.default_replication_factor, spec.replicas
237 )));
238 }
239
240 if spec.replicas == 1 {
242 warn!(
243 cluster = cluster.name_any(),
244 "Running with single replica - not recommended for production"
245 );
246 }
247
248 let min_election_timeout = spec.config.raft_heartbeat_interval_ms * 3;
250 if spec.config.raft_election_timeout_ms < min_election_timeout {
251 return Err(OperatorError::InvalidConfig(format!(
252 "Raft election timeout ({}) should be at least 3x heartbeat interval ({})",
253 spec.config.raft_election_timeout_ms, spec.config.raft_heartbeat_interval_ms
254 )));
255 }
256
257 Ok(())
258}
259
260#[instrument(skip(cluster, _ctx))]
262async fn cleanup_cluster(
263 cluster: Arc<RivvenCluster>,
264 _ctx: Arc<ControllerContext>,
265) -> Result<Action> {
266 let name = cluster.name_any();
267 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
268
269 info!(name = %name, namespace = %namespace, "Cleaning up RivvenCluster resources");
270
271 info!(name = %name, "Cleanup complete");
280
281 Ok(Action::await_change())
282}
283
284fn verify_ownership<K: Resource>(existing: &K) -> Result<()> {
292 let labels = existing.meta().labels.as_ref();
293 let managed_by = labels.and_then(|l| l.get("app.kubernetes.io/managed-by"));
294 match managed_by {
295 Some(manager) if manager != "rivven-operator" => {
296 let name = existing.meta().name.as_deref().unwrap_or("<unknown>");
297 Err(OperatorError::InvalidConfig(format!(
298 "resource '{}' is managed by '{}', not rivven-operator; \
299 refusing to force-apply to avoid ownership conflict",
300 name, manager
301 )))
302 }
303 _ => Ok(()),
304 }
305}
306
307async fn apply_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
318 let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
319 let name =
320 cm.metadata.name.as_ref().ok_or_else(|| {
321 OperatorError::InvalidConfig("ConfigMap missing metadata.name".into())
322 })?;
323
324 debug!(name = %name, "Applying ConfigMap");
325
326 if let Ok(existing) = api.get(name).await {
328 verify_ownership(&existing)?;
329 }
330
331 let patch_params = PatchParams::apply("rivven-operator").force();
332 api.patch(name, &patch_params, &Patch::Apply(&cm))
333 .await
334 .map_err(OperatorError::from)?;
335
336 Ok(())
337}
338
339async fn apply_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
341 let api: Api<Service> = Api::namespaced(client.clone(), namespace);
342 let name = svc
343 .metadata
344 .name
345 .as_ref()
346 .ok_or_else(|| OperatorError::InvalidConfig("Service missing metadata.name".into()))?;
347
348 debug!(name = %name, "Applying Service");
349
350 if let Ok(existing) = api.get(name).await {
352 verify_ownership(&existing)?;
353 }
354
355 let patch_params = PatchParams::apply("rivven-operator").force();
356 api.patch(name, &patch_params, &Patch::Apply(&svc))
357 .await
358 .map_err(OperatorError::from)?;
359
360 Ok(())
361}
362
363async fn apply_statefulset(
365 client: &Client,
366 namespace: &str,
367 sts: StatefulSet,
368) -> Result<Option<k8s_openapi::api::apps::v1::StatefulSetStatus>> {
369 let api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
370 let name =
371 sts.metadata.name.as_ref().ok_or_else(|| {
372 OperatorError::InvalidConfig("StatefulSet missing metadata.name".into())
373 })?;
374
375 debug!(name = %name, "Applying StatefulSet");
376
377 if let Ok(existing) = api.get(name).await {
379 verify_ownership(&existing)?;
380 }
381
382 let patch_params = PatchParams::apply("rivven-operator").force();
383 let result = api
384 .patch(name, &patch_params, &Patch::Apply(&sts))
385 .await
386 .map_err(OperatorError::from)?;
387
388 Ok(result.status)
389}
390
391async fn apply_pdb(client: &Client, namespace: &str, pdb: PodDisruptionBudget) -> Result<()> {
393 let api: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
394 let name = pdb
395 .metadata
396 .name
397 .as_ref()
398 .ok_or_else(|| OperatorError::InvalidConfig("PDB missing metadata.name".into()))?;
399
400 debug!(name = %name, "Applying PodDisruptionBudget");
401
402 if let Ok(existing) = api.get(name).await {
404 verify_ownership(&existing)?;
405 }
406
407 let patch_params = PatchParams::apply("rivven-operator").force();
408 api.patch(name, &patch_params, &Patch::Apply(&pdb))
409 .await
410 .map_err(OperatorError::from)?;
411
412 Ok(())
413}
414
415fn build_status(
417 cluster: &RivvenCluster,
418 sts_status: Option<k8s_openapi::api::apps::v1::StatefulSetStatus>,
419) -> RivvenClusterStatus {
420 let now = Utc::now().to_rfc3339();
421
422 let (replicas, ready_replicas, updated_replicas) = sts_status
423 .map(|s| {
424 (
425 s.replicas,
426 s.ready_replicas.unwrap_or(0),
427 s.updated_replicas.unwrap_or(0),
428 )
429 })
430 .unwrap_or((0, 0, 0));
431
432 let desired_replicas = cluster.spec.replicas;
433
434 let phase = if ready_replicas == 0 {
436 ClusterPhase::Provisioning
437 } else if ready_replicas < desired_replicas {
438 if updated_replicas < desired_replicas {
439 ClusterPhase::Updating
440 } else {
441 ClusterPhase::Degraded
442 }
443 } else if ready_replicas == desired_replicas {
444 ClusterPhase::Running
445 } else {
446 ClusterPhase::Degraded
447 };
448
449 let mut conditions = vec![];
451
452 conditions.push(ClusterCondition {
454 condition_type: "Ready".to_string(),
455 status: if ready_replicas >= desired_replicas {
456 "True".to_string()
457 } else {
458 "False".to_string()
459 },
460 reason: Some(format!(
461 "{}/{} replicas ready",
462 ready_replicas, desired_replicas
463 )),
464 message: None,
465 last_transition_time: Some(now.clone()),
466 });
467
468 conditions.push(ClusterCondition {
470 condition_type: "Available".to_string(),
471 status: if ready_replicas > 0 {
472 "True".to_string()
473 } else {
474 "False".to_string()
475 },
476 reason: Some(
477 if ready_replicas > 0 {
478 "AtLeastOneReplicaReady"
479 } else {
480 "NoReplicasReady"
481 }
482 .to_string(),
483 ),
484 message: None,
485 last_transition_time: Some(now.clone()),
486 });
487
488 let name = cluster
490 .metadata
491 .name
492 .as_ref()
493 .map(|n| format!("rivven-{}", n));
494 let namespace = cluster
495 .metadata
496 .namespace
497 .as_ref()
498 .cloned()
499 .unwrap_or_else(|| "default".to_string());
500
501 let broker_endpoints: Vec<String> = (0..ready_replicas)
502 .map(|i| {
503 format!(
504 "{}-{}.{}-headless.{}.svc.cluster.local:9092",
505 name.as_deref().unwrap_or("rivven"),
506 i,
507 name.as_deref().unwrap_or("rivven"),
508 namespace
509 )
510 })
511 .collect();
512
513 RivvenClusterStatus {
514 phase,
515 replicas,
516 ready_replicas,
517 updated_replicas,
518 observed_generation: cluster.metadata.generation.unwrap_or(0),
519 conditions,
520 broker_endpoints,
521 leader: None, last_updated: Some(now),
523 message: None,
524 }
525}
526
527async fn update_status(
529 client: &Client,
530 namespace: &str,
531 name: &str,
532 status: RivvenClusterStatus,
533) -> Result<()> {
534 let api: Api<RivvenCluster> = Api::namespaced(client.clone(), namespace);
535
536 debug!(name = %name, phase = ?status.phase, "Updating cluster status");
537
538 let patch = serde_json::json!({
539 "status": status
540 });
541
542 let patch_params = PatchParams::default();
543 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
544 .await
545 .map_err(OperatorError::from)?;
546
547 Ok(())
548}
549
550fn error_policy(
552 _cluster: Arc<RivvenCluster>,
553 error: &OperatorError,
554 _ctx: Arc<ControllerContext>,
555) -> Action {
556 warn!(
557 error = %error,
558 "Reconciliation error, will retry"
559 );
560
561 let delay = error
563 .requeue_delay()
564 .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
565
566 Action::requeue(delay)
567}
568
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, RivvenClusterSpec, StorageSpec, TlsSpec,
    };
    use k8s_openapi::api::apps::v1::StatefulSetStatus;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// A three-replica cluster `default/test-cluster` (generation 1) with default sub-specs.
    fn create_test_cluster() -> RivvenCluster {
        let spec = RivvenClusterSpec {
            replicas: 3,
            version: "0.0.1".to_string(),
            image: None,
            image_pull_policy: "IfNotPresent".to_string(),
            image_pull_secrets: vec![],
            storage: StorageSpec::default(),
            resources: None,
            config: BrokerConfig::default(),
            tls: TlsSpec::default(),
            metrics: MetricsSpec::default(),
            affinity: None,
            node_selector: BTreeMap::new(),
            tolerations: vec![],
            pod_disruption_budget: PdbSpec::default(),
            service_account: None,
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: vec![],
            liveness_probe: ProbeSpec::default(),
            readiness_probe: ProbeSpec::default(),
            security_context: None,
            container_security_context: None,
        };

        let metadata = ObjectMeta {
            name: Some("test-cluster".to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid".to_string()),
            generation: Some(1),
            ..Default::default()
        };

        RivvenCluster {
            metadata,
            spec,
            status: None,
        }
    }

    /// StatefulSet status with the given total, ready and updated replica counts.
    fn sts_status(replicas: i32, ready: i32, updated: i32) -> StatefulSetStatus {
        StatefulSetStatus {
            replicas,
            ready_replicas: Some(ready),
            updated_replicas: Some(updated),
            ..Default::default()
        }
    }

    /// Status string of the condition with type `kind`; panics if it is absent.
    fn condition_status<'a>(status: &'a RivvenClusterStatus, kind: &str) -> &'a str {
        status
            .conditions
            .iter()
            .find(|c| c.condition_type == kind)
            .map(|c| c.status.as_str())
            .unwrap_or_else(|| panic!("missing {} condition", kind))
    }

    #[test]
    fn test_build_status_provisioning() {
        let status = build_status(&create_test_cluster(), None);

        assert_eq!(status.phase, ClusterPhase::Provisioning);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
    }

    #[test]
    fn test_build_status_running() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3, 3)));

        assert_eq!(status.phase, ClusterPhase::Running);
        assert_eq!(status.replicas, 3);
        assert_eq!(status.ready_replicas, 3);
    }

    #[test]
    fn test_build_status_degraded() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 2, 3)));

        assert_eq!(status.phase, ClusterPhase::Degraded);
    }

    #[test]
    fn test_build_status_updating() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 2, 1)));

        assert_eq!(status.phase, ClusterPhase::Updating);
    }

    #[test]
    fn test_broker_endpoints() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3, 3)));

        assert_eq!(status.broker_endpoints.len(), 3);
        assert!(status.broker_endpoints[0].contains("rivven-test-cluster-0"));
    }

    #[test]
    fn test_conditions() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3, 3)));

        assert_eq!(status.conditions.len(), 2);
        assert_eq!(condition_status(&status, "Ready"), "True");
        assert_eq!(condition_status(&status, "Available"), "True");
    }
}