1use crate::crd::{ClusterCondition, ClusterPhase, RivvenCluster, RivvenClusterStatus};
8use crate::error::{OperatorError, Result};
9use crate::resources::ResourceBuilder;
10use chrono::Utc;
11use futures::StreamExt;
12use k8s_openapi::api::apps::v1::StatefulSet;
13use k8s_openapi::api::core::v1::{ConfigMap, Service};
14use k8s_openapi::api::policy::v1::PodDisruptionBudget;
15use kube::api::{Api, Patch, PatchParams};
16use kube::runtime::controller::{Action, Controller};
17use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
18use kube::runtime::watcher::Config;
19use kube::{Client, ResourceExt};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, error, info, instrument, warn};
23use validator::Validate;
24
/// Finalizer added to every `RivvenCluster`; `reconcile` passes it to kube's `finalizer`
/// helper so `cleanup_cluster` runs before the object is deleted.
pub const FINALIZER_NAME: &str = "rivven.hupe1980.github.io/cluster-finalizer";

/// Seconds to wait before re-reconciling after a successful pass.
const DEFAULT_REQUEUE_SECONDS: u64 = 300;

/// Fallback retry delay (seconds) used by `error_policy` when the error supplies none.
const ERROR_REQUEUE_SECONDS: u64 = 30;
33
34pub struct ControllerContext {
36 pub client: Client,
38 pub metrics: Option<ControllerMetrics>,
40}
41
42#[derive(Clone)]
44pub struct ControllerMetrics {
45 pub reconciliations: metrics::Counter,
47 pub errors: metrics::Counter,
49 pub duration: metrics::Histogram,
51}
52
53impl ControllerMetrics {
54 pub fn new() -> Self {
56 Self {
57 reconciliations: metrics::counter!("rivven_operator_reconciliations_total"),
58 errors: metrics::counter!("rivven_operator_reconciliation_errors_total"),
59 duration: metrics::histogram!("rivven_operator_reconciliation_duration_seconds"),
60 }
61 }
62}
63
64impl Default for ControllerMetrics {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70pub async fn run_controller(client: Client, namespace: Option<String>) -> Result<()> {
72 let clusters: Api<RivvenCluster> = match &namespace {
73 Some(ns) => Api::namespaced(client.clone(), ns),
74 None => Api::all(client.clone()),
75 };
76
77 let ctx = Arc::new(ControllerContext {
78 client: client.clone(),
79 metrics: Some(ControllerMetrics::new()),
80 });
81
82 info!(
83 namespace = namespace.as_deref().unwrap_or("all"),
84 "Starting RivvenCluster controller"
85 );
86
87 let statefulsets = match &namespace {
89 Some(ns) => Api::<StatefulSet>::namespaced(client.clone(), ns),
90 None => Api::<StatefulSet>::all(client.clone()),
91 };
92
93 let services = match &namespace {
94 Some(ns) => Api::<Service>::namespaced(client.clone(), ns),
95 None => Api::<Service>::all(client.clone()),
96 };
97
98 Controller::new(clusters.clone(), Config::default())
99 .owns(statefulsets, Config::default())
100 .owns(services, Config::default())
101 .run(reconcile, error_policy, ctx)
102 .for_each(|result| async move {
103 match result {
104 Ok((obj, action)) => {
105 debug!(
106 name = obj.name,
107 namespace = obj.namespace,
108 ?action,
109 "Reconciliation completed"
110 );
111 }
112 Err(e) => {
113 error!(error = %e, "Reconciliation failed");
114 }
115 }
116 })
117 .await;
118
119 Ok(())
120}
121
122#[instrument(skip(cluster, ctx), fields(name = %cluster.name_any(), namespace = cluster.namespace()))]
124async fn reconcile(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
125 let start = std::time::Instant::now();
126
127 if let Some(ref metrics) = ctx.metrics {
128 metrics.reconciliations.increment(1);
129 }
130
131 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
132 let clusters: Api<RivvenCluster> = Api::namespaced(ctx.client.clone(), &namespace);
133
134 let result = finalizer(&clusters, FINALIZER_NAME, cluster, |event| async {
135 match event {
136 FinalizerEvent::Apply(cluster) => apply_cluster(cluster, ctx.clone()).await,
137 FinalizerEvent::Cleanup(cluster) => cleanup_cluster(cluster, ctx.clone()).await,
138 }
139 })
140 .await;
141
142 if let Some(ref metrics) = ctx.metrics {
143 metrics.duration.record(start.elapsed().as_secs_f64());
144 }
145
146 result.map_err(|e| {
147 if let Some(ref metrics) = ctx.metrics {
148 metrics.errors.increment(1);
149 }
150 OperatorError::ReconcileFailed(e.to_string())
151 })
152}
153
154#[instrument(skip(cluster, ctx))]
156async fn apply_cluster(cluster: Arc<RivvenCluster>, ctx: Arc<ControllerContext>) -> Result<Action> {
157 let name = cluster.name_any();
158 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
159
160 info!(name = %name, namespace = %namespace, "Reconciling RivvenCluster");
161
162 if let Err(errors) = cluster.spec.validate() {
164 let error_messages: Vec<String> = errors
165 .field_errors()
166 .iter()
167 .flat_map(|(field, errs)| {
168 errs.iter()
169 .map(move |e| format!("{}: {:?}", field, e.message))
170 })
171 .collect();
172 let error_msg = error_messages.join("; ");
173 warn!(name = %name, errors = %error_msg, "Cluster spec validation failed");
174 return Err(OperatorError::InvalidConfig(error_msg));
175 }
176
177 validate_cluster_security(&cluster)?;
179
180 let builder = ResourceBuilder::new(&cluster)?;
182
183 let configmap = builder.build_configmap();
185 apply_configmap(&ctx.client, &namespace, configmap).await?;
186
187 let headless_svc = builder.build_headless_service();
189 apply_service(&ctx.client, &namespace, headless_svc).await?;
190
191 let client_svc = builder.build_client_service();
193 apply_service(&ctx.client, &namespace, client_svc).await?;
194
195 let statefulset = builder.build_statefulset();
197 let sts_status = apply_statefulset(&ctx.client, &namespace, statefulset).await?;
198
199 if let Some(pdb) = builder.build_pdb() {
201 apply_pdb(&ctx.client, &namespace, pdb).await?;
202 }
203
204 let status = build_status(&cluster, sts_status);
206 update_status(&ctx.client, &namespace, &name, status).await?;
207
208 info!(name = %name, "Reconciliation complete");
209
210 Ok(Action::requeue(Duration::from_secs(
211 DEFAULT_REQUEUE_SECONDS,
212 )))
213}
214
215fn validate_cluster_security(cluster: &RivvenCluster) -> Result<()> {
217 let spec = &cluster.spec;
218
219 if spec.tls.enabled && spec.tls.cert_secret_name.is_none() {
221 return Err(OperatorError::InvalidConfig(
222 "TLS is enabled but no certificate secret is specified".to_string(),
223 ));
224 }
225
226 if spec.tls.mtls_enabled && spec.tls.ca_secret_name.is_none() {
227 return Err(OperatorError::InvalidConfig(
228 "mTLS is enabled but no CA secret is specified".to_string(),
229 ));
230 }
231
232 if spec.config.default_replication_factor > spec.replicas {
234 return Err(OperatorError::InvalidConfig(format!(
235 "Replication factor ({}) cannot exceed replica count ({})",
236 spec.config.default_replication_factor, spec.replicas
237 )));
238 }
239
240 if spec.replicas == 1 {
242 warn!(
243 cluster = cluster.name_any(),
244 "Running with single replica - not recommended for production"
245 );
246 }
247
248 let min_election_timeout = spec.config.raft_heartbeat_interval_ms * 3;
250 if spec.config.raft_election_timeout_ms < min_election_timeout {
251 return Err(OperatorError::InvalidConfig(format!(
252 "Raft election timeout ({}) should be at least 3x heartbeat interval ({})",
253 spec.config.raft_election_timeout_ms, spec.config.raft_heartbeat_interval_ms
254 )));
255 }
256
257 Ok(())
258}
259
260#[instrument(skip(cluster, _ctx))]
262async fn cleanup_cluster(
263 cluster: Arc<RivvenCluster>,
264 _ctx: Arc<ControllerContext>,
265) -> Result<Action> {
266 let name = cluster.name_any();
267 let namespace = cluster.namespace().unwrap_or_else(|| "default".to_string());
268
269 info!(name = %name, namespace = %namespace, "Cleaning up RivvenCluster resources");
270
271 info!(name = %name, "Cleanup complete");
280
281 Ok(Action::await_change())
282}
283
284async fn apply_configmap(client: &Client, namespace: &str, cm: ConfigMap) -> Result<()> {
286 let api: Api<ConfigMap> = Api::namespaced(client.clone(), namespace);
287 let name = cm.metadata.name.as_ref().unwrap();
288
289 debug!(name = %name, "Applying ConfigMap");
290
291 let patch_params = PatchParams::apply("rivven-operator").force();
292 api.patch(name, &patch_params, &Patch::Apply(&cm))
293 .await
294 .map_err(OperatorError::from)?;
295
296 Ok(())
297}
298
299async fn apply_service(client: &Client, namespace: &str, svc: Service) -> Result<()> {
301 let api: Api<Service> = Api::namespaced(client.clone(), namespace);
302 let name = svc.metadata.name.as_ref().unwrap();
303
304 debug!(name = %name, "Applying Service");
305
306 let patch_params = PatchParams::apply("rivven-operator").force();
307 api.patch(name, &patch_params, &Patch::Apply(&svc))
308 .await
309 .map_err(OperatorError::from)?;
310
311 Ok(())
312}
313
314async fn apply_statefulset(
316 client: &Client,
317 namespace: &str,
318 sts: StatefulSet,
319) -> Result<Option<k8s_openapi::api::apps::v1::StatefulSetStatus>> {
320 let api: Api<StatefulSet> = Api::namespaced(client.clone(), namespace);
321 let name = sts.metadata.name.as_ref().unwrap();
322
323 debug!(name = %name, "Applying StatefulSet");
324
325 let patch_params = PatchParams::apply("rivven-operator").force();
326 let result = api
327 .patch(name, &patch_params, &Patch::Apply(&sts))
328 .await
329 .map_err(OperatorError::from)?;
330
331 Ok(result.status)
332}
333
334async fn apply_pdb(client: &Client, namespace: &str, pdb: PodDisruptionBudget) -> Result<()> {
336 let api: Api<PodDisruptionBudget> = Api::namespaced(client.clone(), namespace);
337 let name = pdb.metadata.name.as_ref().unwrap();
338
339 debug!(name = %name, "Applying PodDisruptionBudget");
340
341 let patch_params = PatchParams::apply("rivven-operator").force();
342 api.patch(name, &patch_params, &Patch::Apply(&pdb))
343 .await
344 .map_err(OperatorError::from)?;
345
346 Ok(())
347}
348
349fn build_status(
351 cluster: &RivvenCluster,
352 sts_status: Option<k8s_openapi::api::apps::v1::StatefulSetStatus>,
353) -> RivvenClusterStatus {
354 let now = Utc::now().to_rfc3339();
355
356 let (replicas, ready_replicas, updated_replicas) = sts_status
357 .map(|s| {
358 (
359 s.replicas,
360 s.ready_replicas.unwrap_or(0),
361 s.updated_replicas.unwrap_or(0),
362 )
363 })
364 .unwrap_or((0, 0, 0));
365
366 let desired_replicas = cluster.spec.replicas;
367
368 let phase = if ready_replicas == 0 {
370 ClusterPhase::Provisioning
371 } else if ready_replicas < desired_replicas {
372 if updated_replicas < desired_replicas {
373 ClusterPhase::Updating
374 } else {
375 ClusterPhase::Degraded
376 }
377 } else if ready_replicas == desired_replicas {
378 ClusterPhase::Running
379 } else {
380 ClusterPhase::Degraded
381 };
382
383 let mut conditions = vec![];
385
386 conditions.push(ClusterCondition {
388 condition_type: "Ready".to_string(),
389 status: if ready_replicas >= desired_replicas {
390 "True".to_string()
391 } else {
392 "False".to_string()
393 },
394 reason: Some(format!(
395 "{}/{} replicas ready",
396 ready_replicas, desired_replicas
397 )),
398 message: None,
399 last_transition_time: Some(now.clone()),
400 });
401
402 conditions.push(ClusterCondition {
404 condition_type: "Available".to_string(),
405 status: if ready_replicas > 0 {
406 "True".to_string()
407 } else {
408 "False".to_string()
409 },
410 reason: Some(
411 if ready_replicas > 0 {
412 "AtLeastOneReplicaReady"
413 } else {
414 "NoReplicasReady"
415 }
416 .to_string(),
417 ),
418 message: None,
419 last_transition_time: Some(now.clone()),
420 });
421
422 let name = cluster
424 .metadata
425 .name
426 .as_ref()
427 .map(|n| format!("rivven-{}", n));
428 let namespace = cluster
429 .metadata
430 .namespace
431 .as_ref()
432 .cloned()
433 .unwrap_or_else(|| "default".to_string());
434
435 let broker_endpoints: Vec<String> = (0..ready_replicas)
436 .map(|i| {
437 format!(
438 "{}-{}.{}-headless.{}.svc.cluster.local:9092",
439 name.as_deref().unwrap_or("rivven"),
440 i,
441 name.as_deref().unwrap_or("rivven"),
442 namespace
443 )
444 })
445 .collect();
446
447 RivvenClusterStatus {
448 phase,
449 replicas,
450 ready_replicas,
451 updated_replicas,
452 observed_generation: cluster.metadata.generation.unwrap_or(0),
453 conditions,
454 broker_endpoints,
455 leader: None, last_updated: Some(now),
457 message: None,
458 }
459}
460
461async fn update_status(
463 client: &Client,
464 namespace: &str,
465 name: &str,
466 status: RivvenClusterStatus,
467) -> Result<()> {
468 let api: Api<RivvenCluster> = Api::namespaced(client.clone(), namespace);
469
470 debug!(name = %name, phase = ?status.phase, "Updating cluster status");
471
472 let patch = serde_json::json!({
473 "status": status
474 });
475
476 let patch_params = PatchParams::default();
477 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
478 .await
479 .map_err(OperatorError::from)?;
480
481 Ok(())
482}
483
484fn error_policy(
486 _cluster: Arc<RivvenCluster>,
487 error: &OperatorError,
488 _ctx: Arc<ControllerContext>,
489) -> Action {
490 warn!(
491 error = %error,
492 "Reconciliation error, will retry"
493 );
494
495 let delay = error
497 .requeue_delay()
498 .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
499
500 Action::requeue(delay)
501}
502
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{
        BrokerConfig, MetricsSpec, PdbSpec, ProbeSpec, RivvenClusterSpec, StorageSpec, TlsSpec,
    };
    use k8s_openapi::api::apps::v1::StatefulSetStatus;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// A three-replica cluster `test-cluster` in namespace `default` (generation 1),
    /// with every other spec field defaulted or empty and no status.
    fn create_test_cluster() -> RivvenCluster {
        let metadata = ObjectMeta {
            name: Some("test-cluster".to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid".to_string()),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenClusterSpec {
            replicas: 3,
            version: "0.0.1".to_string(),
            image: None,
            image_pull_policy: "IfNotPresent".to_string(),
            image_pull_secrets: vec![],
            storage: StorageSpec::default(),
            resources: None,
            config: BrokerConfig::default(),
            tls: TlsSpec::default(),
            metrics: MetricsSpec::default(),
            affinity: None,
            node_selector: BTreeMap::new(),
            tolerations: vec![],
            pod_disruption_budget: PdbSpec::default(),
            service_account: None,
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: vec![],
            liveness_probe: ProbeSpec::default(),
            readiness_probe: ProbeSpec::default(),
            security_context: None,
            container_security_context: None,
        };

        RivvenCluster {
            metadata,
            spec,
            status: None,
        }
    }

    /// StatefulSet status with the given total, ready and updated replica counts.
    fn sts_status(replicas: i32, ready: i32, updated: i32) -> StatefulSetStatus {
        StatefulSetStatus {
            replicas,
            ready_replicas: Some(ready),
            updated_replicas: Some(updated),
            ..Default::default()
        }
    }

    #[test]
    fn test_build_status_provisioning() {
        let status = build_status(&create_test_cluster(), None);

        assert_eq!(status.phase, ClusterPhase::Provisioning);
        assert_eq!(status.replicas, 0);
        assert_eq!(status.ready_replicas, 0);
    }

    #[test]
    fn test_build_status_running() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3, 3)));

        assert_eq!(status.phase, ClusterPhase::Running);
        assert_eq!(status.replicas, 3);
        assert_eq!(status.ready_replicas, 3);
    }

    #[test]
    fn test_build_status_degraded() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 2, 3)));

        assert_eq!(status.phase, ClusterPhase::Degraded);
    }

    #[test]
    fn test_build_status_updating() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 2, 1)));

        assert_eq!(status.phase, ClusterPhase::Updating);
    }

    #[test]
    fn test_broker_endpoints() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3, 3)));

        assert_eq!(status.broker_endpoints.len(), 3);
        assert!(status.broker_endpoints[0].contains("rivven-test-cluster-0"));
    }

    #[test]
    fn test_conditions() {
        let status = build_status(&create_test_cluster(), Some(sts_status(3, 3, 3)));

        assert_eq!(status.conditions.len(), 2);

        let status_of = |kind: &str| {
            status
                .conditions
                .iter()
                .find(|c| c.condition_type == kind)
                .map(|c| c.status.as_str())
        };

        assert_eq!(status_of("Ready"), Some("True"));
        assert_eq!(status_of("Available"), Some("True"));
    }
}