1use crate::crd::{ClusterCondition, ClusterPhase, RivvenCluster, RivvenClusterStatus};
8use crate::error::{OperatorError, Result};
9use crate::resources::ResourceBuilder;
10use chrono::Utc;
11use futures::StreamExt;
12use k8s_openapi::api::apps::v1::StatefulSet;
13use k8s_openapi::api::core::v1::{ConfigMap, Service};
14use k8s_openapi::api::policy::v1::PodDisruptionBudget;
15use kube::api::{Api, Patch, PatchParams};
16use kube::runtime::controller::{Action, Controller};
17use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
18use kube::runtime::watcher::Config;
19use kube::{Client, ResourceExt};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, error, info, instrument, warn};
23use validator::Validate;
24
/// Finalizer placed on every `RivvenCluster` so that `cleanup_cluster` runs
/// before the object is removed from the API server.
pub const FINALIZER_NAME: &str = "rivven.hupe1980.github.io/cluster-finalizer";

/// Requeue interval, in seconds, after a successful reconciliation.
const DEFAULT_REQUEUE_SECONDS: u64 = 300;

/// Fallback requeue interval, in seconds, after a failed reconciliation whose
/// error does not supply its own delay.
const ERROR_REQUEUE_SECONDS: u64 = 30;
33
34pub struct ControllerContext {
36 pub client: Client,
38 pub metrics: Option<ControllerMetrics>,
40}
41
42#[derive(Clone)]
44pub struct ControllerMetrics {
45 pub reconciliations: metrics::Counter,
47 pub errors: metrics::Counter,
49 pub duration: metrics::Histogram,
51}
52
53impl ControllerMetrics {
54 pub fn new() -> Self {
56 Self {
57 reconciliations: metrics::counter!("rivven_operator_reconciliations_total"),
58 errors: metrics::counter!("rivven_operator_reconciliation_errors_total"),
59 duration: metrics::histogram!("rivven_operator_reconciliation_duration_seconds"),
60 }
61 }
62}
63
64impl Default for ControllerMetrics {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70pub async fn run_controller(client: Client, namespace: Option<String>) -> Result<()> {
72 let clusters: Api<RivvenCluster> = match &namespace {
73 Some(ns) => Api::namespaced(client.clone(), ns),
74 None => Api::all(client.clone()),
75 };
76
77 let ctx = Arc::new(ControllerContext {
78 client: client.clone(),
79 metrics: Some(ControllerMetrics::new()),
80 });
81
82 info!(
83 namespace = namespace.as_deref().unwrap_or("all"),
84 "Starting RivvenCluster controller"
85 );
86
87 let statefulsets = match &namespace {
89 Some(ns) => Api::<StatefulSet>::namespaced(client.clone(), ns),
90 None => Api::<StatefulSet>::all(client.clone()),
91 };
92
93 let services = match &namespace {
94 Some(ns) => Api::<Service>::namespaced(client.clone(), ns),
95 None => Api::<Service>::all(client.clone()),
96 };
97
98 Controller::new(clusters.clone(), Config::default())
99 .owns(statefulsets, Config::default())
100 .owns(services, Config::default())
101 .run(reconcile, error_policy, ctx)
102 .for_each(|result| async move {
103 match result {
104 Ok((obj, action)) => {
105 debug!(
106 name = obj.name,
107 namespace = obj.namespace,
108 ?action,
109 "Reconciliation completed"
110 );
111 }
112 Err(e) => {
113 error!(error = %e, "Reconciliation failed");
114 }
115 }
116 })
117 .await;
118
119 Ok(())
120}
121
122#[instrument(skip(cluster, ctx), fields(name = %cluster.name_any(), namespace = cluster.namespace()))]
124async fn reconcile(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
125 let start = std::time::Instant::now();
126
127 if let Some(ref metrics) = ctx.metrics {
128 metrics.reconciliations.increment(1);
129 }
130
131 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
132 let clusters: Api<RivvenCluster> = Api::namespaced(ctx.client.clone(), &namespace);
133
134 let result = finalizer(&clusters, FINALIZER_NAME, cluster, |event| async {
135 match event {
136 FinalizerEvent::Apply(cluster) => apply_cluster(cluster, ctx.clone()).await,
137 FinalizerEvent::Cleanup(cluster) => cleanup_cluster(cluster, ctx.clone()).await,
138 }
139 })
140 .await;
141
142 if let Some(ref metrics) = ctx.metrics {
143 metrics.duration.record(start.elapsed().as_secs_f64());
144 }
145
146 result.map_err(|e| {
147 if let Some(ref metrics) = ctx.metrics {
148 metrics.errors.increment(1);
149 }
150 OperatorError::ReconcileFailed(e.to_string())
151 })
152}
153
154#[instrument(skip(cluster, ctx))]
156async fn apply_cluster(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
157 let name = cluster.name_any();
158 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
159
160 info!(name = %name, namespace = %namespace, "Reconciling RivvenCluster");
161
162 if let Err(errors) = cluster.spec.validate() {
164 let error_messages: Vec<String> = errors
165 .field_errors()
166 .iter()
167 .flat_map(|(field, errs)| {
168 errs.iter()
169 .map(move |e| format!("{}: {:?}", field, e.message))
170 })
171 .collect();
172 let error_msg = error_messages.join("; ");
173 warn!(name = %name, errors = %error_msg, "Cluster spec validation failed");
174 return Err(OperatorError::InvalidConfig(error_msg));
175 }
176
177 validate_cluster_security(&cluster)?;
179
180 let builder = ResourceBuilder::new(&cluster)?;
182
183 let configmap = builder.build_configmap()?;
185 apply_configmap(&ctx.client, &namespace, configmap).await?;
186
187 let headless_svc = builder.build_headless_service();
189 apply_service(&ctx.client, &namespace, headless_svc).await?;
190
191 let client_svc = builder.build_client_service();
193 apply_service(&ctx.client, &namespace, client_svc).await?;
194
195 let statefulset = builder.build_statefulset();
197 let sts_status = apply_statefulset(&ctx.client, &namespace, statefulset).await?;
198
199 if let Some(pdb) = builder.build_pdb() {
201 apply_pdb(&ctx.client, &namespace, pdb).await?;
202 }
203
204 let status = build_status(&cluster, sts_status);
206 update_status(&ctx.client, &namespace, &name, status).await?;
207
208 info!(name = %name, "Reconciliation complete");
209
210 Ok(Action::requeue(Duration::from_secs(
211 DEFAULT_REQUEUE_SECONDS,
212 )))
213}
214
215fn validate_cluster_security(cluster: &RivvenCluster) -> Result<()> {
217 let spec = &cluster.spec;
218
219 if spec.tls.enabled && spec.tls.cert_secret_name.is_none() {
221 return Err(OperatorError::InvalidConfig(
222 "TLS is enabled but no certificate secret is specified".to_string(),
223 ));
224 }
225
226 if spec.tls.mtls_enabled && spec.tls.ca_secret_name.is_none() {
227 return Err(OperatorError::InvalidConfig(
228 "mTLS is enabled but no CA secret is specified".to_string(),
229 ));
230 }
231
232 if spec.config.default_replication_factor > spec.replicas {
234 return Err(OperatorError::InvalidConfig(format!(
235 "Replication factor ({}) cannot exceed replica count ({})",
236 spec.config.default_replication_factor, spec.replicas
237 )));
238 }
239
240 if spec.replicas == 1 {
242 warn!(
243 cluster = cluster.name_any(),
244 "Running with single replica - not recommended for production"
245 );
246 }
247
248 let min_election_timeout = spec.config.raft_heartbeat_interval_ms * 3;
250 if spec.config.raft_election_timeout_ms < min_election_timeout {
251 return Err(OperatorError::InvalidConfig(format!(
252 "Raft election timeout ({}) should be at least 3x heartbeat interval ({})",
253 spec.config.raft_election_timeout_ms, spec.config.raft_heartbeat_interval_ms
254 )));
255 }
256
257 Ok(())
258}
259
260#[instrument(skip(cluster, _ctx))]
262async fn cleanup_cluster(
263 cluster: Arc<RivvenCluster>,
264 _ctx: Arc<ControllerContext>,
265) -> Result<Action> {
266 let name = cluster.name_any();
267 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
268
269 info!(name = %name, namespace = %namespace, "Cleaning up RivvenCluster resources");
270
271 info!(name = %name, "Cleanup complete");
280
281 Ok(Action::await_change())
282}
283
284async fn apply_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
286 let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
287 let name =
288 cm.metadata.name.as_ref().ok_or_else(|| {
289 OperatorError::InvalidConfig("ConfigMap missing metadata.name".into())
290 })?;
291
292 debug!(name = %name, "Applying ConfigMap");
293
294 let patch_params = PatchParams::apply("rivven-operator").force();
295 api.patch(name, &patch_params, &Patch::Apply(&cm))
296 .await
297 .map_err(OperatorError::from)?;
298
299 Ok(())
300}
301
302async fn apply_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
304 let api: Api<Service> = Api::namespaced(client.clone(), namespace);
305 let name = svc
306 .metadata
307 .name
308 .as_ref()
309 .ok_or_else(|| OperatorError::InvalidConfig("Service missing metadata.name".into()))?;
310
311 debug!(name = %name, "Applying Service");
312
313 let patch_params = PatchParams::apply("rivven-operator").force();
314 api.patch(name, &patch_params, &Patch::Apply(&svc))
315 .await
316 .map_err(OperatorError::from)?;
317
318 Ok(())
319}
320
321async fn apply_statefulset(
323 client: &Client,
324 namespace: &str,
325 sts: StatefulSet,
326) -> Result<Option<k8s_openapi::api::apps::v1::StatefulSetStatus>> {
327 let api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
328 let name =
329 sts.metadata.name.as_ref().ok_or_else(|| {
330 OperatorError::InvalidConfig("StatefulSet missing metadata.name".into())
331 })?;
332
333 debug!(name = %name, "Applying StatefulSet");
334
335 let patch_params = PatchParams::apply("rivven-operator").force();
336 let result = api
337 .patch(name, &patch_params, &Patch::Apply(&sts))
338 .await
339 .map_err(OperatorError::from)?;
340
341 Ok(result.status)
342}
343
344async fn apply_pdb(client: &Client, namespace: &str, pdb: PodDisruptionBudget) -> Result<()> {
346 let api: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
347 let name = pdb
348 .metadata
349 .name
350 .as_ref()
351 .ok_or_else(|| OperatorError::InvalidConfig("PDB missing metadata.name".into()))?;
352
353 debug!(name = %name, "Applying PodDisruptionBudget");
354
355 let patch_params = PatchParams::apply("rivven-operator").force();
356 api.patch(name, &patch_params, &Patch::Apply(&pdb))
357 .await
358 .map_err(OperatorError::from)?;
359
360 Ok(())
361}
362
363fn build_status(
365 cluster: &RivvenCluster,
366 sts_status: Option<k8s_openapi::api::apps::v1::StatefulSetStatus>,
367) -> RivvenClusterStatus {
368 let now = Utc::now().to_rfc3339();
369
370 let (replicas, ready_replicas, updated_replicas) = sts_status
371 .map(|s| {
372 (
373 s.replicas,
374 s.ready_replicas.unwrap_or(0),
375 s.updated_replicas.unwrap_or(0),
376 )
377 })
378 .unwrap_or((0, 0, 0));
379
380 let desired_replicas = cluster.spec.replicas;
381
382 let phase = if ready_replicas == 0 {
384 ClusterPhase::Provisioning
385 } else if ready_replicas < desired_replicas {
386 if updated_replicas < desired_replicas {
387 ClusterPhase::Updating
388 } else {
389 ClusterPhase::Degraded
390 }
391 } else if ready_replicas == desired_replicas {
392 ClusterPhase::Running
393 } else {
394 ClusterPhase::Degraded
395 };
396
397 let mut conditions = vec![];
399
400 conditions.push(ClusterCondition {
402 condition_type: "Ready".to_string(),
403 status: if ready_replicas >= desired_replicas {
404 "True".to_string()
405 } else {
406 "False".to_string()
407 },
408 reason: Some(format!(
409 "{}/{} replicas ready",
410 ready_replicas, desired_replicas
411 )),
412 message: None,
413 last_transition_time: Some(now.clone()),
414 });
415
416 conditions.push(ClusterCondition {
418 condition_type: "Available".to_string(),
419 status: if ready_replicas > 0 {
420 "True".to_string()
421 } else {
422 "False".to_string()
423 },
424 reason: Some(
425 if ready_replicas > 0 {
426 "AtLeastOneReplicaReady"
427 } else {
428 "NoReplicasReady"
429 }
430 .to_string(),
431 ),
432 message: None,
433 last_transition_time: Some(now.clone()),
434 });
435
436 let name = cluster
438 .metadata
439 .name
440 .as_ref()
441 .map(|n| format!("rivven-{}", n));
442 let namespace = cluster
443 .metadata
444 .namespace
445 .as_ref()
446 .cloned()
447 .unwrap_or_else(|| "default".to_string());
448
449 let broker_endpoints: Vec<String> = (0..ready_replicas)
450 .map(|i| {
451 format!(
452 "{}-{}.{}-headless.{}.svc.cluster.local:9092",
453 name.as_deref().unwrap_or("rivven"),
454 i,
455 name.as_deref().unwrap_or("rivven"),
456 namespace
457 )
458 })
459 .collect();
460
461 RivvenClusterStatus {
462 phase,
463 replicas,
464 ready_replicas,
465 updated_replicas,
466 observed_generation: cluster.metadata.generation.unwrap_or(0),
467 conditions,
468 broker_endpoints,
469 leader: None, last_updated: Some(now),
471 message: None,
472 }
473}
474
475async fn update_status(
477 client: &Client,
478 namespace: &str,
479 name: &str,
480 status: RivvenClusterStatus,
481) -> Result<()> {
482 let api: Api<RivvenCluster> = Api::namespaced(client.clone(), namespace);
483
484 debug!(name = %name, phase = ?status.phase, "Updating cluster status");
485
486 let patch = serde_json::json!({
487 "status": status
488 });
489
490 let patch_params = PatchParams::default();
491 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
492 .await
493 .map_err(OperatorError::from)?;
494
495 Ok(())
496}
497
498fn error_policy(
500 _cluster: Arc<RivvenCluster>,
501 error: &OperatorError,
502 _ctx: Arc<ControllerContext>,
503) -> Action {
504 warn!(
505 error = %error,
506 "Reconciliation error, will retry"
507 );
508
509 let delay = error
511 .requeue_delay()
512 .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
513
514 Action::requeue(delay)
515}
516
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, RivvenClusterSpec, StorageSpec, TlsSpec,
    };
    use k8s_openapi::api::apps::v1::StatefulSetStatus;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// A three-replica cluster `default/test-cluster` at generation 1, with every
    /// other spec field at its default or empty value and no status yet.
    fn create_test_cluster() -> RivvenCluster {
        let metadata = ObjectMeta {
            name: Some("test-cluster".to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid".to_string()),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenClusterSpec {
            replicas: 3,
            version: "0.0.1".to_string(),
            image: None,
            image_pull_policy: "IfNotPresent".to_string(),
            image_pull_secrets: vec![],
            storage: StorageSpec::default(),
            resources: None,
            config: BrokerConfig::default(),
            tls: TlsSpec::default(),
            metrics: MetricsSpec::default(),
            affinity: None,
            node_selector: BTreeMap::new(),
            tolerations: vec![],
            pod_disruption_budget: PdbSpec::default(),
            service_account: None,
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: vec![],
            liveness_probe: ProbeSpec::default(),
            readiness_probe: ProbeSpec::default(),
            security_context: None,
            container_security_context: None,
        };

        RivvenCluster {
            metadata,
            spec,
            status: None,
        }
    }

    /// StatefulSet status with the given total / ready / updated replica counts.
    fn sts_status(replicas: i32, ready: i32, updated: i32) -> StatefulSetStatus {
        StatefulSetStatus {
            replicas,
            ready_replicas: Some(ready),
            updated_replicas: Some(updated),
            ..Default::default()
        }
    }

    /// Looks up a condition by type, panicking if it is absent.
    fn find_condition<'a>(status: &'a RivvenClusterStatus, kind: &str) -> &'a ClusterCondition {
        status
            .conditions
            .iter()
            .find(|c| c.condition_type == kind)
            .unwrap()
    }

    #[test]
    fn test_build_status_provisioning() {
        let status = build_status(&create_test_cluster(), None);

        assert_eq!(status.phase, ClusterPhase::Provisioning);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
    }

    #[test]
    fn test_build_status_running() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3, 3)));

        assert_eq!(status.phase, ClusterPhase::Running);
        assert_eq!(status.replicas, 3);
        assert_eq!(status.ready_replicas, 3);
    }

    #[test]
    fn test_build_status_degraded() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 2, 3)));

        assert_eq!(status.phase, ClusterPhase::Degraded);
    }

    #[test]
    fn test_build_status_updating() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 2, 1)));

        assert_eq!(status.phase, ClusterPhase::Updating);
    }

    #[test]
    fn test_broker_endpoints() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3, 3)));

        assert_eq!(status.broker_endpoints.len(), 3);
        assert!(status.broker_endpoints[0].contains("rivven-test-cluster-0"));
    }

    #[test]
    fn test_conditions() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3, 3)));

        assert_eq!(status.conditions.len(), 2);
        assert_eq!(find_condition(&status, "Ready").status, "True");
        assert_eq!(find_condition(&status, "Available").status, "True");
    }
}