1use crate::cluster_client::ClusterClient;
8use crate::crd::{ClusterReference, PartitionInfo, RivvenTopic, RivvenTopicStatus, TopicCondition};
9use crate::error::{OperatorError, Result};
10use chrono::Utc;
11use futures::StreamExt;
12use kube::api::{Api, Patch, PatchParams};
13use kube::runtime::controller::{Action, Controller};
14use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
15use kube::runtime::watcher::Config;
16use kube::{Client, ResourceExt};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{debug, error, info, instrument, warn};
20use validator::Validate;
21
/// Finalizer attached to `RivvenTopic` resources so cleanup logic runs
/// before the object is removed from the API server.
pub const TOPIC_FINALIZER: &str = "rivven.hupe1980.github.io/topic-finalizer";

/// Requeue interval (seconds) after a successful reconcile — periodic drift check.
const DEFAULT_REQUEUE_SECONDS: u64 = 120;

/// Requeue interval (seconds) after a failed reconcile — faster retry.
const ERROR_REQUEUE_SECONDS: u64 = 30;
30
/// Shared state handed to every reconcile invocation of the topic controller.
pub struct TopicControllerContext {
    /// Kubernetes API client used for CRD reads and status patches.
    pub client: Client,
    /// Client used to talk to broker endpoints (topic create/delete).
    pub cluster_client: ClusterClient,
    /// Optional controller metrics; `None` disables instrumentation.
    pub metrics: Option<TopicControllerMetrics>,
}
40
/// Handles to the metric series recorded by the topic controller.
#[derive(Clone)]
pub struct TopicControllerMetrics {
    /// Total reconcile attempts (incremented at the start of each pass).
    pub reconciliations: metrics::Counter,
    /// Reconcile passes that ended in an error.
    pub errors: metrics::Counter,
    /// Wall-clock duration of each reconcile pass, in seconds.
    pub duration: metrics::Histogram,
    /// Gauge tracking topics currently managed by this controller.
    pub topics_total: metrics::Gauge,
}
53
impl TopicControllerMetrics {
    /// Create handles for the controller's metric series via the global
    /// `metrics` recorder macros.
    pub fn new() -> Self {
        Self {
            reconciliations: metrics::counter!("rivven_topic_reconciliations_total"),
            errors: metrics::counter!("rivven_topic_reconciliation_errors_total"),
            duration: metrics::histogram!("rivven_topic_reconciliation_duration_seconds"),
            topics_total: metrics::gauge!("rivven_topic_managed_total"),
        }
    }
}
65
66impl Default for TopicControllerMetrics {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
/// Start the `RivvenTopic` controller and block until its watch stream ends.
///
/// When `namespace` is `Some`, only that namespace is watched; otherwise
/// the controller watches topics cluster-wide.
pub async fn run_topic_controller(client: Client, namespace: Option<String>) -> Result<()> {
    // Scope the watch to a single namespace when one was requested.
    let topics: Api<RivvenTopic> = match &namespace {
        Some(ns) => Api::namespaced(client.clone(), ns),
        None => Api::all(client.clone()),
    };

    let ctx = Arc::new(TopicControllerContext {
        client: client.clone(),
        cluster_client: ClusterClient::new(),
        metrics: Some(TopicControllerMetrics::new()),
    });

    info!(
        namespace = namespace.as_deref().unwrap_or("all"),
        "Starting RivvenTopic controller"
    );

    // Drive the controller: each stream item is the outcome of one reconcile
    // attempt. Errors are logged here rather than aborting the loop.
    Controller::new(topics.clone(), Config::default())
        .run(reconcile_topic, topic_error_policy, ctx)
        .for_each(|result| async move {
            match result {
                Ok((obj, action)) => {
                    debug!(
                        name = obj.name,
                        namespace = obj.namespace,
                        ?action,
                        "Topic reconciliation completed"
                    );
                }
                Err(e) => {
                    error!(error = %e, "Topic reconciliation failed");
                }
            }
        })
        .await;

    Ok(())
}
111
/// Entry point invoked by the controller for every `RivvenTopic` event.
///
/// Wraps the work in kube's finalizer helper: `Apply` events are routed to
/// [`apply_topic`], deletion events to [`cleanup_topic`]. Records reconcile
/// count, duration and error count when metrics are enabled.
#[instrument(skip(topic, ctx), fields(name = %topic.name_any(), namespace = topic.namespace()))]
async fn reconcile_topic(
    topic: Arc<RivvenTopic>,
    ctx: Arc<TopicControllerContext>,
) -> Result<Action> {
    let start = std::time::Instant::now();

    if let Some(ref metrics) = ctx.metrics {
        metrics.reconciliations.increment(1);
    }

    // NOTE(review): falling back to "default" assumes the CRD is
    // namespace-scoped — confirm against the CRD definition.
    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
    let topics: Api<RivvenTopic> = Api::namespaced(ctx.client.clone(), &namespace);

    // The finalizer helper adds/removes TOPIC_FINALIZER and routes the event.
    let result = finalizer(&topics, TOPIC_FINALIZER, topic, |event| async {
        match event {
            FinalizerEvent::Apply(topic) => apply_topic(topic, ctx.clone()).await,
            FinalizerEvent::Cleanup(topic) => cleanup_topic(topic, ctx.clone()).await,
        }
    })
    .await;

    // Duration is recorded for both success and failure paths.
    if let Some(ref metrics) = ctx.metrics {
        metrics.duration.record(start.elapsed().as_secs_f64());
    }

    // Flatten the finalizer error wrapper into the operator's error type,
    // counting the failure as we go.
    result.map_err(|e| {
        if let Some(ref metrics) = ctx.metrics {
            metrics.errors.increment(1);
        }
        OperatorError::ReconcileFailed(e.to_string())
    })
}
146
147#[instrument(skip(topic, ctx))]
149async fn apply_topic(topic: Arc<RivvenTopic>, ctx: Arc<TopicControllerContext>) -> Result<Action> {
150 let name = topic.name_any();
151 let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
152
153 info!(name = %name, namespace = %namespace, "Reconciling RivvenTopic");
154
155 if let Err(errors) = topic.spec.validate() {
157 let error_messages: Vec<String> = errors
158 .field_errors()
159 .iter()
160 .flat_map(|(field, errs)| {
161 errs.iter()
162 .map(move |e| format!("{}: {:?}", field, e.message))
163 })
164 .collect();
165 let error_msg = error_messages.join("; ");
166 warn!(name = %name, errors = %error_msg, "Topic spec validation failed");
167
168 update_topic_status(
169 &ctx.client,
170 &namespace,
171 &name,
172 build_failed_status(&topic, &error_msg),
173 )
174 .await?;
175
176 return Err(OperatorError::InvalidConfig(error_msg));
177 }
178
179 let cluster = verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await?;
181
182 let broker_endpoints = get_cluster_endpoints(&cluster);
184
185 if broker_endpoints.is_empty() {
186 warn!(name = %name, "Cluster has no ready broker endpoints");
187 update_topic_status(
188 &ctx.client,
189 &namespace,
190 &name,
191 build_pending_status(&topic, "Waiting for cluster brokers to be ready"),
192 )
193 .await?;
194 return Ok(Action::requeue(Duration::from_secs(30)));
195 }
196
197 let topic_result =
199 reconcile_topic_with_cluster(&topic, &broker_endpoints, &ctx.cluster_client).await;
200
201 match topic_result {
202 Ok(topic_info) => {
203 let status = build_ready_status(&topic, topic_info);
205 update_topic_status(&ctx.client, &namespace, &name, status).await?;
206 if let Some(ref metrics) = ctx.metrics {
207 metrics.topics_total.increment(1.0);
208 }
209 info!(name = %name, "Topic reconciliation complete");
210 Ok(Action::requeue(Duration::from_secs(
211 DEFAULT_REQUEUE_SECONDS,
212 )))
213 }
214 Err(e) => {
215 let status = build_error_status(&topic, &e.to_string());
217 update_topic_status(&ctx.client, &namespace, &name, status).await?;
218 warn!(name = %name, error = %e, "Topic reconciliation failed");
219 Ok(Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS)))
220 }
221 }
222}
223
224async fn verify_cluster_ref(
226 client: &Client,
227 namespace: &str,
228 cluster_ref: &ClusterReference,
229) -> Result<crate::crd::RivvenCluster> {
230 let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
231 let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
232
233 match clusters.get(&cluster_ref.name).await {
234 Ok(cluster) => Ok(cluster),
235 Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
236 format!("{}/{}", cluster_ns, cluster_ref.name),
237 )),
238 Err(e) => Err(OperatorError::from(e)),
239 }
240}
241
242fn get_cluster_endpoints(cluster: &crate::crd::RivvenCluster) -> Vec<String> {
244 cluster
245 .status
246 .as_ref()
247 .map(|s| s.broker_endpoints.clone())
248 .unwrap_or_default()
249}
250
/// Snapshot of a topic's state as ensured on the cluster, used to build
/// the CRD status.
struct TopicInfo {
    /// Partition count reported back by the cluster.
    partitions: i32,
    /// Replication factor (copied from the spec, not observed — see
    /// `reconcile_topic_with_cluster`).
    replication_factor: i32,
    /// Whether the topic already existed before this reconcile.
    existed: bool,
    /// Per-partition leader/replica data for the status.
    partition_info: Vec<PartitionInfo>,
}
262
/// Ensure the topic exists on the cluster with the spec's partition count
/// and return the observed state for status reporting.
///
/// # Errors
/// Returns `ValidationError` when `spec.partitions` does not fit in a `u32`,
/// or propagates any error from the cluster client.
async fn reconcile_topic_with_cluster(
    topic: &RivvenTopic,
    broker_endpoints: &[String],
    cluster_client: &ClusterClient,
) -> Result<TopicInfo> {
    let topic_name = topic.name_any();
    let spec = &topic.spec;

    info!(
        topic = %topic_name,
        partitions = spec.partitions,
        replication = spec.replication_factor,
        brokers = ?broker_endpoints,
        "Reconciling topic with cluster"
    );

    // The CRD stores partitions as a signed i32; the cluster API takes a u32.
    let partitions_u32 = u32::try_from(spec.partitions).map_err(|_| {
        OperatorError::ValidationError(format!(
            "partitions {} is negative or exceeds u32::MAX",
            spec.partitions
        ))
    })?;
    let cluster_topic_info = cluster_client
        .ensure_topic(broker_endpoints, &topic_name, partitions_u32)
        .await?;

    // Placeholder partition metadata: leader -1 and empty replica/ISR lists
    // mark the values as unknown — presumably ensure_topic does not report
    // real per-partition assignments. TODO(review): confirm, and populate
    // once the cluster client exposes them.
    let partition_info: Vec<PartitionInfo> = (0..i32::try_from(cluster_topic_info.partitions)
        .unwrap_or(i32::MAX))
        .map(|i| PartitionInfo {
            partition: i,
            leader: -1,
            replicas: vec![],
            isr: vec![],
        })
        .collect();

    info!(
        topic = %topic_name,
        partitions = cluster_topic_info.partitions,
        existed = cluster_topic_info.existed,
        "Topic reconciliation successful"
    );

    Ok(TopicInfo {
        // Saturate instead of failing if the cluster reports > i32::MAX partitions.
        partitions: i32::try_from(cluster_topic_info.partitions).unwrap_or(i32::MAX),
        replication_factor: spec.replication_factor,
        existed: cluster_topic_info.existed,
        partition_info,
    })
}
321
322fn build_ready_status(topic: &RivvenTopic, info: TopicInfo) -> RivvenTopicStatus {
324 let now = Utc::now().to_rfc3339();
325
326 let conditions = vec![
327 TopicCondition {
328 r#type: "Ready".to_string(),
329 status: "True".to_string(),
330 reason: "TopicReady".to_string(),
331 message: "Topic is ready and serving traffic".to_string(),
332 last_transition_time: now.clone(),
333 },
334 TopicCondition {
335 r#type: "Synced".to_string(),
336 status: "True".to_string(),
337 reason: "SyncSucceeded".to_string(),
338 message: if info.existed {
339 "Topic configuration synchronized".to_string()
340 } else {
341 "Topic created successfully".to_string()
342 },
343 last_transition_time: now.clone(),
344 },
345 TopicCondition {
346 r#type: "ConfigApplied".to_string(),
347 status: "True".to_string(),
348 reason: "ConfigApplied".to_string(),
349 message: "Topic configuration applied".to_string(),
350 last_transition_time: now.clone(),
351 },
352 TopicCondition {
353 r#type: "ACLsApplied".to_string(),
354 status: if topic.spec.acls.is_empty() {
355 "N/A".to_string()
356 } else {
357 "True".to_string()
358 },
359 reason: if topic.spec.acls.is_empty() {
360 "NoACLs".to_string()
361 } else {
362 "ACLsApplied".to_string()
363 },
364 message: if topic.spec.acls.is_empty() {
365 "No ACLs configured".to_string()
366 } else {
367 format!("{} ACL entries applied", topic.spec.acls.len())
368 },
369 last_transition_time: now.clone(),
370 },
371 ];
372
373 RivvenTopicStatus {
374 phase: "Ready".to_string(),
375 message: "Topic is ready".to_string(),
376 current_partitions: info.partitions,
377 current_replication_factor: info.replication_factor,
378 topic_exists: true,
379 observed_generation: topic.metadata.generation.unwrap_or(0),
380 conditions,
381 last_sync_time: Some(now),
382 partition_info: info.partition_info,
383 }
384}
385
386fn build_pending_status(topic: &RivvenTopic, message: &str) -> RivvenTopicStatus {
388 let now = Utc::now().to_rfc3339();
389
390 RivvenTopicStatus {
391 phase: "Pending".to_string(),
392 message: message.to_string(),
393 current_partitions: 0,
394 current_replication_factor: 0,
395 topic_exists: false,
396 observed_generation: topic.metadata.generation.unwrap_or(0),
397 conditions: vec![TopicCondition {
398 r#type: "Ready".to_string(),
399 status: "False".to_string(),
400 reason: "Pending".to_string(),
401 message: message.to_string(),
402 last_transition_time: now.clone(),
403 }],
404 last_sync_time: Some(now),
405 partition_info: vec![],
406 }
407}
408
409fn build_failed_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
411 let now = Utc::now().to_rfc3339();
412
413 RivvenTopicStatus {
414 phase: "Failed".to_string(),
415 message: error_msg.to_string(),
416 current_partitions: 0,
417 current_replication_factor: 0,
418 topic_exists: false,
419 observed_generation: topic.metadata.generation.unwrap_or(0),
420 conditions: vec![TopicCondition {
421 r#type: "Ready".to_string(),
422 status: "False".to_string(),
423 reason: "ValidationFailed".to_string(),
424 message: error_msg.to_string(),
425 last_transition_time: now.clone(),
426 }],
427 last_sync_time: Some(now),
428 partition_info: vec![],
429 }
430}
431
432fn build_error_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
434 let now = Utc::now().to_rfc3339();
435
436 let existing_status = topic.status.clone().unwrap_or_default();
438
439 RivvenTopicStatus {
440 phase: "Error".to_string(),
441 message: error_msg.to_string(),
442 current_partitions: existing_status.current_partitions,
443 current_replication_factor: existing_status.current_replication_factor,
444 topic_exists: existing_status.topic_exists,
445 observed_generation: topic.metadata.generation.unwrap_or(0),
446 conditions: vec![
447 TopicCondition {
448 r#type: "Ready".to_string(),
449 status: if existing_status.topic_exists {
450 "True".to_string()
451 } else {
452 "False".to_string()
453 },
454 reason: "ReconcileError".to_string(),
455 message: error_msg.to_string(),
456 last_transition_time: now.clone(),
457 },
458 TopicCondition {
459 r#type: "Synced".to_string(),
460 status: "False".to_string(),
461 reason: "SyncFailed".to_string(),
462 message: error_msg.to_string(),
463 last_transition_time: now.clone(),
464 },
465 ],
466 last_sync_time: Some(now),
467 partition_info: existing_status.partition_info,
468 }
469}
470
471async fn update_topic_status(
473 client: &Client,
474 namespace: &str,
475 name: &str,
476 status: RivvenTopicStatus,
477) -> Result<()> {
478 let api: Api<RivvenTopic> = Api::namespaced(client.clone(), namespace);
479
480 debug!(name = %name, phase = %status.phase, "Updating topic status");
481
482 let patch = serde_json::json!({
483 "status": status
484 });
485
486 let patch_params = PatchParams::default();
487 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
488 .await
489 .map_err(OperatorError::from)?;
490
491 Ok(())
492}
493
/// Finalizer cleanup path: optionally delete the topic from the cluster,
/// then allow the finalizer to be removed.
///
/// Cleanup is best-effort: failures to resolve the cluster or delete the
/// topic are logged but never block deletion, so a broken cluster cannot
/// wedge the Kubernetes object in Terminating.
#[instrument(skip(topic, ctx))]
async fn cleanup_topic(
    topic: Arc<RivvenTopic>,
    ctx: Arc<TopicControllerContext>,
) -> Result<Action> {
    let name = topic.name_any();
    let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());

    info!(name = %name, namespace = %namespace, "Cleaning up RivvenTopic");

    // Only touch the cluster when the spec opts into deletion.
    if topic.spec.delete_on_remove {
        info!(name = %name, "delete_on_remove is true, deleting topic from cluster");

        match verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await {
            Ok(cluster) => {
                let endpoints = get_cluster_endpoints(&cluster);
                if !endpoints.is_empty() {
                    // Deletion failure is tolerated: the topic may already be gone.
                    if let Err(e) =
                        delete_topic_from_cluster(&name, &endpoints, &ctx.cluster_client).await
                    {
                        warn!(
                            name = %name,
                            error = %e,
                            "Failed to delete topic from cluster (may not exist)"
                        );
                    }
                }
            }
            Err(e) => {
                warn!(
                    name = %name,
                    error = %e,
                    "Cluster not found during cleanup, topic may be orphaned"
                );
            }
        }
    } else {
        info!(
            name = %name,
            "delete_on_remove is false, topic will remain in cluster"
        );
    }

    info!(name = %name, "Topic cleanup complete");

    // NOTE(review): this decrements unconditionally, even for topics that
    // never reached Ready, so the gauge can drift negative — confirm intent.
    if let Some(ref metrics) = ctx.metrics {
        metrics.topics_total.decrement(1.0);
    }

    // Nothing further to do until the object changes again.
    Ok(Action::await_change())
}
549
550async fn delete_topic_from_cluster(
552 topic_name: &str,
553 broker_endpoints: &[String],
554 cluster_client: &ClusterClient,
555) -> Result<()> {
556 info!(topic = %topic_name, "Deleting topic from cluster");
557 cluster_client
558 .delete_topic(broker_endpoints, topic_name)
559 .await
560}
561
562fn topic_error_policy(
564 _topic: Arc<RivvenTopic>,
565 error: &OperatorError,
566 _ctx: Arc<TopicControllerContext>,
567) -> Action {
568 warn!(
569 error = %error,
570 "Topic reconciliation error, will retry"
571 );
572
573 let delay = error
574 .requeue_delay()
575 .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
576
577 Action::requeue(delay)
578}
579
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{RivvenTopicSpec, TopicAcl, TopicConfig};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// Minimal valid fixture: 6 partitions, RF 3, no ACLs, delete-on-remove set.
    fn create_test_topic() -> RivvenTopic {
        RivvenTopic {
            metadata: ObjectMeta {
                name: Some("test-topic".to_string()),
                namespace: Some("default".to_string()),
                uid: Some("test-uid".to_string()),
                generation: Some(1),
                ..Default::default()
            },
            spec: RivvenTopicSpec {
                cluster_ref: ClusterReference {
                    name: "test-cluster".to_string(),
                    namespace: None,
                },
                partitions: 6,
                replication_factor: 3,
                config: TopicConfig::default(),
                acls: vec![],
                delete_on_remove: true,
                topic_labels: BTreeMap::new(),
            },
            status: None,
        }
    }

    /// Same fixture with two Allow ACL entries attached.
    fn create_test_topic_with_acls() -> RivvenTopic {
        let mut topic = create_test_topic();
        topic.spec.acls = vec![
            TopicAcl {
                principal: "user:app1".to_string(),
                operations: vec!["Read".to_string(), "Write".to_string()],
                permission_type: "Allow".to_string(),
                host: "*".to_string(),
            },
            TopicAcl {
                principal: "user:analytics".to_string(),
                operations: vec!["Read".to_string()],
                permission_type: "Allow".to_string(),
                host: "*".to_string(),
            },
        ];
        topic
    }

    #[test]
    fn test_build_ready_status() {
        let topic = create_test_topic();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![PartitionInfo {
                partition: 0,
                leader: 0,
                replicas: vec![0, 1, 2],
                isr: vec![0, 1, 2],
            }],
        };

        let status = build_ready_status(&topic, info);

        // Top-level fields reflect the cluster observation.
        assert_eq!(status.phase, "Ready");
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.current_replication_factor, 3);
        assert!(status.topic_exists);
        assert_eq!(status.conditions.len(), 4);

        // The Ready condition must be True on the success path.
        let ready = status
            .conditions
            .iter()
            .find(|c| c.r#type == "Ready")
            .unwrap();
        assert_eq!(ready.status, "True");
    }

    #[test]
    fn test_build_ready_status_with_acls() {
        let topic = create_test_topic_with_acls();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: true,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        // With ACLs configured the condition flips to True and reports a count.
        let acls = status
            .conditions
            .iter()
            .find(|c| c.r#type == "ACLsApplied")
            .unwrap();
        assert_eq!(acls.status, "True");
        assert!(acls.message.contains("2 ACL entries"));
    }

    #[test]
    fn test_build_pending_status() {
        let topic = create_test_topic();

        let status = build_pending_status(&topic, "Waiting for cluster");

        assert_eq!(status.phase, "Pending");
        assert_eq!(status.message, "Waiting for cluster");
        assert!(!status.topic_exists);
        assert_eq!(status.current_partitions, 0);
    }

    #[test]
    fn test_build_failed_status() {
        let topic = create_test_topic();

        let status = build_failed_status(&topic, "Validation error");

        assert_eq!(status.phase, "Failed");
        assert!(!status.topic_exists);

        let ready = status
            .conditions
            .iter()
            .find(|c| c.r#type == "Ready")
            .unwrap();
        assert_eq!(ready.status, "False");
        assert_eq!(ready.reason, "ValidationFailed");
    }

    #[test]
    fn test_build_error_status_preserves_existing() {
        // Seed the topic with a previously-Ready status...
        let mut topic = create_test_topic();
        topic.status = Some(RivvenTopicStatus {
            phase: "Ready".to_string(),
            message: "Was ready".to_string(),
            current_partitions: 6,
            current_replication_factor: 3,
            topic_exists: true,
            observed_generation: 1,
            conditions: vec![],
            last_sync_time: None,
            partition_info: vec![PartitionInfo {
                partition: 0,
                leader: 0,
                replicas: vec![0, 1, 2],
                isr: vec![0, 1, 2],
            }],
        });

        let status = build_error_status(&topic, "Sync failed");

        // ...and verify the error status keeps the known-good observations.
        assert_eq!(status.phase, "Error");
        assert!(status.topic_exists);
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.partition_info.len(), 1);
    }

    #[test]
    fn test_partition_info_generation() {
        let topic = create_test_topic();

        // Synthesize partition metadata the way a broker assignment might
        // look (round-robin leaders over 3 brokers).
        let partition_info: Vec<PartitionInfo> = (0..topic.spec.partitions)
            .map(|i| PartitionInfo {
                partition: i,
                leader: (i % 3),
                replicas: (0..topic.spec.replication_factor).collect(),
                isr: (0..topic.spec.replication_factor).collect(),
            })
            .collect();

        assert_eq!(partition_info.len(), 6);
        assert_eq!(partition_info[0].partition, 0);
        assert_eq!(partition_info[0].leader, 0);
        assert_eq!(partition_info[3].leader, 0);
        assert_eq!(partition_info[4].leader, 1);
    }

    #[test]
    fn test_conditions_structure() {
        let topic = create_test_topic();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        // All four condition types must always be present on Ready.
        let kinds: Vec<&str> = status
            .conditions
            .iter()
            .map(|c| c.r#type.as_str())
            .collect();

        assert!(kinds.contains(&"Ready"));
        assert!(kinds.contains(&"Synced"));
        assert!(kinds.contains(&"ConfigApplied"));
        assert!(kinds.contains(&"ACLsApplied"));
    }

    #[test]
    fn test_no_acls_condition() {
        let topic = create_test_topic();
        let info = TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed: false,
            partition_info: vec![],
        };

        let status = build_ready_status(&topic, info);

        // Without ACLs the condition is marked not-applicable, not False.
        let acls = status
            .conditions
            .iter()
            .find(|c| c.r#type == "ACLsApplied")
            .unwrap();
        assert_eq!(acls.status, "N/A");
        assert_eq!(acls.reason, "NoACLs");
    }
}