1use crate::cluster_client::ClusterClient;
8use crate::crd::{ClusterReference, PartitionInfo, RivvenTopic, RivvenTopicStatus, TopicCondition};
9use crate::error::{OperatorError, Result};
10use chrono::Utc;
11use futures::StreamExt;
12use kube::api::{Api, Patch, PatchParams};
13use kube::runtime::controller::{Action, Controller};
14use kube::runtime::finalizer::{finalizer, Event as FinalizerEvent};
15use kube::runtime::watcher::Config;
16use kube::{Client, ResourceExt};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{debug, error, info, instrument, warn};
20use validator::Validate;
21
/// Finalizer name passed to the kube `finalizer` helper in `reconcile_topic`,
/// so that `cleanup_topic` runs before a `RivvenTopic` is removed.
pub const TOPIC_FINALIZER: &str = "rivven.hupe1980.github.io/topic-finalizer";
24
/// Requeue interval, in seconds, used after a successful topic reconciliation.
const DEFAULT_REQUEUE_SECONDS: u64 = 120;

/// Requeue interval, in seconds, used after a failed cluster reconciliation
/// and as the fallback delay in `topic_error_policy`.
const ERROR_REQUEUE_SECONDS: u64 = 30;
30
/// Shared state handed to every reconciliation of a `RivvenTopic`.
pub struct TopicControllerContext {
    /// Kubernetes API client used to look up clusters and patch topic status.
    pub client: Client,
    /// Client used to ensure and delete topics through the broker endpoints.
    pub cluster_client: ClusterClient,
    /// Optional reconciliation metrics; nothing is recorded when `None`.
    pub metrics: Option<TopicControllerMetrics>,
}
40
/// Metric handles recorded by the topic controller.
#[derive(Clone)]
pub struct TopicControllerMetrics {
    /// Incremented once at the start of every `reconcile_topic` call.
    pub reconciliations: metrics::Counter,
    /// Incremented when `reconcile_topic` returns an error.
    pub errors: metrics::Counter,
    /// Wall-clock duration of each reconciliation, in seconds.
    pub duration: metrics::Histogram,
    /// Gauge created as `rivven_topic_managed_total`.
    // NOTE(review): nothing in this file updates this gauge — confirm it is
    // set elsewhere.
    pub topics_total: metrics::Gauge,
}
53
54impl TopicControllerMetrics {
55 pub fn new() -> Self {
57 Self {
58 reconciliations: metrics::counter!("rivven_topic_reconciliations_total"),
59 errors: metrics::counter!("rivven_topic_reconciliation_errors_total"),
60 duration: metrics::histogram!("rivven_topic_reconciliation_duration_seconds"),
61 topics_total: metrics::gauge!("rivven_topic_managed_total"),
62 }
63 }
64}
65
impl Default for TopicControllerMetrics {
    /// Equivalent to [`TopicControllerMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}
71
72pub async fn run_topic_controller(client: Client, namespace: Option<String>) -> Result<()> {
74 let topics: Api<RivvenTopic> = match &namespace {
75 Some(ns) => Api::namespaced(client.clone(), ns),
76 None => Api::all(client.clone()),
77 };
78
79 let ctx = Arc::new(TopicControllerContext {
80 client: client.clone(),
81 cluster_client: ClusterClient::new(),
82 metrics: Some(TopicControllerMetrics::new()),
83 });
84
85 info!(
86 namespace = namespace.as_deref().unwrap_or("all"),
87 "Starting RivvenTopic controller"
88 );
89
90 Controller::new(topics.clone(), Config::default())
91 .run(reconcile_topic, topic_error_policy, ctx)
92 .for_each(|result| async move {
93 match result {
94 Ok((obj, action)) => {
95 debug!(
96 name = obj.name,
97 namespace = obj.namespace,
98 ?action,
99 "Topic reconciliation completed"
100 );
101 }
102 Err(e) => {
103 error!(error = %e, "Topic reconciliation failed");
104 }
105 }
106 })
107 .await;
108
109 Ok(())
110}
111
112#[instrument(skip(topic, ctx), fields(name = %topic.name_any(), namespace = topic.namespace()))]
114async fn reconcile_topic(
115 topic: Arc<RivvenTopic>,
116 ctx: Arc<TopicControllerContext>,
117) -> Result<Action> {
118 let start = std::time::Instant::now();
119
120 if let Some(ref metrics) = ctx.metrics {
121 metrics.reconciliations.increment(1);
122 }
123
124 let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
125 let topics: Api<RivvenTopic> = Api::namespaced(ctx.client.clone(), &namespace);
126
127 let result = finalizer(&topics, TOPIC_FINALIZER, topic, |event| async {
128 match event {
129 FinalizerEvent::Apply(topic) => apply_topic(topic, ctx.clone()).await,
130 FinalizerEvent::Cleanup(topic) => cleanup_topic(topic, ctx.clone()).await,
131 }
132 })
133 .await;
134
135 if let Some(ref metrics) = ctx.metrics {
136 metrics.duration.record(start.elapsed().as_secs_f64());
137 }
138
139 result.map_err(|e| {
140 if let Some(ref metrics) = ctx.metrics {
141 metrics.errors.increment(1);
142 }
143 OperatorError::ReconcileFailed(e.to_string())
144 })
145}
146
147#[instrument(skip(topic, ctx))]
149async fn apply_topic(topic: Arc<RivvenTopic>, ctx: Arc<TopicControllerContext>) -> Result<Action> {
150 let name = topic.name_any();
151 let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
152
153 info!(name = %name, namespace = %namespace, "Reconciling RivvenTopic");
154
155 if let Err(errors) = topic.spec.validate() {
157 let error_messages: Vec<String> = errors
158 .field_errors()
159 .iter()
160 .flat_map(|(field, errs)| {
161 errs.iter()
162 .map(move |e| format!("{}: {:?}", field, e.message))
163 })
164 .collect();
165 let error_msg = error_messages.join("; ");
166 warn!(name = %name, errors = %error_msg, "Topic spec validation failed");
167
168 update_topic_status(
169 &ctx.client,
170 &namespace,
171 &name,
172 build_failed_status(&topic, &error_msg),
173 )
174 .await?;
175
176 return Err(OperatorError::InvalidConfig(error_msg));
177 }
178
179 let cluster = verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await?;
181
182 let broker_endpoints = get_cluster_endpoints(&cluster);
184
185 if broker_endpoints.is_empty() {
186 warn!(name = %name, "Cluster has no ready broker endpoints");
187 update_topic_status(
188 &ctx.client,
189 &namespace,
190 &name,
191 build_pending_status(&topic, "Waiting for cluster brokers to be ready"),
192 )
193 .await?;
194 return Ok(Action::requeue(Duration::from_secs(30)));
195 }
196
197 let topic_result =
199 reconcile_topic_with_cluster(&topic, &broker_endpoints, &ctx.cluster_client).await;
200
201 match topic_result {
202 Ok(topic_info) => {
203 let status = build_ready_status(&topic, topic_info);
205 update_topic_status(&ctx.client, &namespace, &name, status).await?;
206 info!(name = %name, "Topic reconciliation complete");
207 Ok(Action::requeue(Duration::from_secs(
208 DEFAULT_REQUEUE_SECONDS,
209 )))
210 }
211 Err(e) => {
212 let status = build_error_status(&topic, &e.to_string());
214 update_topic_status(&ctx.client, &namespace, &name, status).await?;
215 warn!(name = %name, error = %e, "Topic reconciliation failed");
216 Ok(Action::requeue(Duration::from_secs(ERROR_REQUEUE_SECONDS)))
217 }
218 }
219}
220
221async fn verify_cluster_ref(
223 client: &Client,
224 namespace: &str,
225 cluster_ref: &ClusterReference,
226) -> Result<crate::crd::RivvenCluster> {
227 let cluster_ns = cluster_ref.namespace.as_deref().unwrap_or(namespace);
228 let clusters: Api<crate::crd::RivvenCluster> = Api::namespaced(client.clone(), cluster_ns);
229
230 match clusters.get(&cluster_ref.name).await {
231 Ok(cluster) => Ok(cluster),
232 Err(kube::Error::Api(ae)) if ae.code == 404 => Err(OperatorError::ClusterNotFound(
233 format!("{}/{}", cluster_ns, cluster_ref.name),
234 )),
235 Err(e) => Err(OperatorError::from(e)),
236 }
237}
238
239fn get_cluster_endpoints(cluster: &crate::crd::RivvenCluster) -> Vec<String> {
241 cluster
242 .status
243 .as_ref()
244 .map(|s| s.broker_endpoints.clone())
245 .unwrap_or_default()
246}
247
/// Outcome of reconciling a topic against the cluster, consumed by
/// `build_ready_status`.
struct TopicInfo {
    /// Partition count reported back by the cluster client.
    partitions: i32,
    /// Replication factor, copied from the topic spec.
    replication_factor: i32,
    /// `existed` flag reported by the cluster client; selects the wording of
    /// the `Synced` condition message.
    existed: bool,
    /// Per-partition details written into the topic status.
    partition_info: Vec<PartitionInfo>,
}
259
260async fn reconcile_topic_with_cluster(
264 topic: &RivvenTopic,
265 broker_endpoints: &[String],
266 cluster_client: &ClusterClient,
267) -> Result<TopicInfo> {
268 let topic_name = topic.name_any();
269 let spec = &topic.spec;
270
271 info!(
272 topic = %topic_name,
273 partitions = spec.partitions,
274 replication = spec.replication_factor,
275 brokers = ?broker_endpoints,
276 "Reconciling topic with cluster"
277 );
278
279 let cluster_topic_info = cluster_client
281 .ensure_topic(broker_endpoints, &topic_name, spec.partitions as u32)
282 .await?;
283
284 let partition_info: Vec<PartitionInfo> = (0..cluster_topic_info.partitions as i32)
287 .map(|i| PartitionInfo {
288 partition: i,
289 leader: (i % 3), replicas: (0..spec.replication_factor).collect(),
291 isr: (0..spec.replication_factor).collect(),
292 })
293 .collect();
294
295 info!(
296 topic = %topic_name,
297 partitions = cluster_topic_info.partitions,
298 existed = cluster_topic_info.existed,
299 "Topic reconciliation successful"
300 );
301
302 Ok(TopicInfo {
303 partitions: cluster_topic_info.partitions as i32,
304 replication_factor: spec.replication_factor,
305 existed: cluster_topic_info.existed,
306 partition_info,
307 })
308}
309
310fn build_ready_status(topic: &RivvenTopic, info: TopicInfo) -> RivvenTopicStatus {
312 let now = Utc::now().to_rfc3339();
313
314 let conditions = vec![
315 TopicCondition {
316 r#type: "Ready".to_string(),
317 status: "True".to_string(),
318 reason: "TopicReady".to_string(),
319 message: "Topic is ready and serving traffic".to_string(),
320 last_transition_time: now.clone(),
321 },
322 TopicCondition {
323 r#type: "Synced".to_string(),
324 status: "True".to_string(),
325 reason: "SyncSucceeded".to_string(),
326 message: if info.existed {
327 "Topic configuration synchronized".to_string()
328 } else {
329 "Topic created successfully".to_string()
330 },
331 last_transition_time: now.clone(),
332 },
333 TopicCondition {
334 r#type: "ConfigApplied".to_string(),
335 status: "True".to_string(),
336 reason: "ConfigApplied".to_string(),
337 message: "Topic configuration applied".to_string(),
338 last_transition_time: now.clone(),
339 },
340 TopicCondition {
341 r#type: "ACLsApplied".to_string(),
342 status: if topic.spec.acls.is_empty() {
343 "N/A".to_string()
344 } else {
345 "True".to_string()
346 },
347 reason: if topic.spec.acls.is_empty() {
348 "NoACLs".to_string()
349 } else {
350 "ACLsApplied".to_string()
351 },
352 message: if topic.spec.acls.is_empty() {
353 "No ACLs configured".to_string()
354 } else {
355 format!("{} ACL entries applied", topic.spec.acls.len())
356 },
357 last_transition_time: now.clone(),
358 },
359 ];
360
361 RivvenTopicStatus {
362 phase: "Ready".to_string(),
363 message: "Topic is ready".to_string(),
364 current_partitions: info.partitions,
365 current_replication_factor: info.replication_factor,
366 topic_exists: true,
367 observed_generation: topic.metadata.generation.unwrap_or(0),
368 conditions,
369 last_sync_time: Some(now),
370 partition_info: info.partition_info,
371 }
372}
373
374fn build_pending_status(topic: &RivvenTopic, message: &str) -> RivvenTopicStatus {
376 let now = Utc::now().to_rfc3339();
377
378 RivvenTopicStatus {
379 phase: "Pending".to_string(),
380 message: message.to_string(),
381 current_partitions: 0,
382 current_replication_factor: 0,
383 topic_exists: false,
384 observed_generation: topic.metadata.generation.unwrap_or(0),
385 conditions: vec![TopicCondition {
386 r#type: "Ready".to_string(),
387 status: "False".to_string(),
388 reason: "Pending".to_string(),
389 message: message.to_string(),
390 last_transition_time: now.clone(),
391 }],
392 last_sync_time: Some(now),
393 partition_info: vec![],
394 }
395}
396
397fn build_failed_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
399 let now = Utc::now().to_rfc3339();
400
401 RivvenTopicStatus {
402 phase: "Failed".to_string(),
403 message: error_msg.to_string(),
404 current_partitions: 0,
405 current_replication_factor: 0,
406 topic_exists: false,
407 observed_generation: topic.metadata.generation.unwrap_or(0),
408 conditions: vec![TopicCondition {
409 r#type: "Ready".to_string(),
410 status: "False".to_string(),
411 reason: "ValidationFailed".to_string(),
412 message: error_msg.to_string(),
413 last_transition_time: now.clone(),
414 }],
415 last_sync_time: Some(now),
416 partition_info: vec![],
417 }
418}
419
420fn build_error_status(topic: &RivvenTopic, error_msg: &str) -> RivvenTopicStatus {
422 let now = Utc::now().to_rfc3339();
423
424 let existing_status = topic.status.clone().unwrap_or_default();
426
427 RivvenTopicStatus {
428 phase: "Error".to_string(),
429 message: error_msg.to_string(),
430 current_partitions: existing_status.current_partitions,
431 current_replication_factor: existing_status.current_replication_factor,
432 topic_exists: existing_status.topic_exists,
433 observed_generation: topic.metadata.generation.unwrap_or(0),
434 conditions: vec![
435 TopicCondition {
436 r#type: "Ready".to_string(),
437 status: if existing_status.topic_exists {
438 "True".to_string()
439 } else {
440 "False".to_string()
441 },
442 reason: "ReconcileError".to_string(),
443 message: error_msg.to_string(),
444 last_transition_time: now.clone(),
445 },
446 TopicCondition {
447 r#type: "Synced".to_string(),
448 status: "False".to_string(),
449 reason: "SyncFailed".to_string(),
450 message: error_msg.to_string(),
451 last_transition_time: now.clone(),
452 },
453 ],
454 last_sync_time: Some(now),
455 partition_info: existing_status.partition_info,
456 }
457}
458
459async fn update_topic_status(
461 client: &Client,
462 namespace: &str,
463 name: &str,
464 status: RivvenTopicStatus,
465) -> Result<()> {
466 let api: Api<RivvenTopic> = Api::namespaced(client.clone(), namespace);
467
468 debug!(name = %name, phase = %status.phase, "Updating topic status");
469
470 let patch = serde_json::json!({
471 "status": status
472 });
473
474 let patch_params = PatchParams::default();
475 api.patch_status(name, &patch_params, &Patch::Merge(&patch))
476 .await
477 .map_err(OperatorError::from)?;
478
479 Ok(())
480}
481
482#[instrument(skip(topic, ctx))]
484async fn cleanup_topic(
485 topic: Arc<RivvenTopic>,
486 ctx: Arc<TopicControllerContext>,
487) -> Result<Action> {
488 let name = topic.name_any();
489 let namespace = topic.namespace().unwrap_or_else(|| "default".to_string());
490
491 info!(name = %name, namespace = %namespace, "Cleaning up RivvenTopic");
492
493 if topic.spec.delete_on_remove {
495 info!(name = %name, "delete_on_remove is true, deleting topic from cluster");
496
497 match verify_cluster_ref(&ctx.client, &namespace, &topic.spec.cluster_ref).await {
499 Ok(cluster) => {
500 let endpoints = get_cluster_endpoints(&cluster);
501 if !endpoints.is_empty() {
502 if let Err(e) =
504 delete_topic_from_cluster(&name, &endpoints, &ctx.cluster_client).await
505 {
506 warn!(
507 name = %name,
508 error = %e,
509 "Failed to delete topic from cluster (may not exist)"
510 );
511 }
512 }
513 }
514 Err(e) => {
515 warn!(
516 name = %name,
517 error = %e,
518 "Cluster not found during cleanup, topic may be orphaned"
519 );
520 }
521 }
522 } else {
523 info!(
524 name = %name,
525 "delete_on_remove is false, topic will remain in cluster"
526 );
527 }
528
529 info!(name = %name, "Topic cleanup complete");
530
531 Ok(Action::await_change())
532}
533
/// Logs the deletion and forwards it to `ClusterClient::delete_topic`,
/// returning that call's result unchanged.
async fn delete_topic_from_cluster(
    topic_name: &str,
    broker_endpoints: &[String],
    cluster_client: &ClusterClient,
) -> Result<()> {
    info!(topic = %topic_name, "Deleting topic from cluster");
    cluster_client
        .delete_topic(broker_endpoints, topic_name)
        .await
}
545
546fn topic_error_policy(
548 _topic: Arc<RivvenTopic>,
549 error: &OperatorError,
550 _ctx: Arc<TopicControllerContext>,
551) -> Action {
552 warn!(
553 error = %error,
554 "Topic reconciliation error, will retry"
555 );
556
557 let delay = error
558 .requeue_delay()
559 .unwrap_or_else(|| Duration::from_secs(ERROR_REQUEUE_SECONDS));
560
561 Action::requeue(delay)
562}
563
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crd::{RivvenTopicSpec, TopicAcl, TopicConfig};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::collections::BTreeMap;

    /// Baseline fixture: six partitions, replication factor three, no ACLs,
    /// no status.
    fn create_test_topic() -> RivvenTopic {
        let metadata = ObjectMeta {
            name: Some("test-topic".to_string()),
            namespace: Some("default".to_string()),
            uid: Some("test-uid".to_string()),
            generation: Some(1),
            ..Default::default()
        };

        let spec = RivvenTopicSpec {
            cluster_ref: ClusterReference {
                name: "test-cluster".to_string(),
                namespace: None,
            },
            partitions: 6,
            replication_factor: 3,
            config: TopicConfig::default(),
            acls: Vec::new(),
            delete_on_remove: true,
            topic_labels: BTreeMap::new(),
        };

        RivvenTopic {
            metadata,
            spec,
            status: None,
        }
    }

    /// Baseline fixture extended with two `Allow` ACL entries.
    fn create_test_topic_with_acls() -> RivvenTopic {
        let allow = |principal: &str, operations: &[&str]| TopicAcl {
            principal: principal.to_string(),
            operations: operations.iter().map(|op| op.to_string()).collect(),
            permission_type: "Allow".to_string(),
            host: "*".to_string(),
        };

        let mut topic = create_test_topic();
        topic.spec.acls = vec![
            allow("user:app1", &["Read", "Write"]),
            allow("user:analytics", &["Read"]),
        ];
        topic
    }

    /// Partition 0 led by broker 0 with a fully in-sync three-replica set.
    fn first_partition() -> PartitionInfo {
        PartitionInfo {
            partition: 0,
            leader: 0,
            replicas: vec![0, 1, 2],
            isr: vec![0, 1, 2],
        }
    }

    /// `TopicInfo` matching the baseline fixture's partition layout.
    fn topic_info(existed: bool, partition_info: Vec<PartitionInfo>) -> TopicInfo {
        TopicInfo {
            partitions: 6,
            replication_factor: 3,
            existed,
            partition_info,
        }
    }

    /// Looks up a condition by type, panicking when it is absent.
    fn condition<'a>(status: &'a RivvenTopicStatus, kind: &str) -> &'a TopicCondition {
        status
            .conditions
            .iter()
            .find(|c| c.r#type == kind)
            .expect("condition should be present")
    }

    #[test]
    fn test_build_ready_status() {
        let topic = create_test_topic();
        let info = topic_info(false, vec![first_partition()]);

        let status = build_ready_status(&topic, info);

        assert_eq!(status.phase, "Ready");
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.current_replication_factor, 3);
        assert!(status.topic_exists);
        assert_eq!(status.conditions.len(), 4);
        assert_eq!(condition(&status, "Ready").status, "True");
    }

    #[test]
    fn test_build_ready_status_with_acls() {
        let topic = create_test_topic_with_acls();

        let status = build_ready_status(&topic, topic_info(true, Vec::new()));

        let acl_cond = condition(&status, "ACLsApplied");
        assert_eq!(acl_cond.status, "True");
        assert!(acl_cond.message.contains("2 ACL entries"));
    }

    #[test]
    fn test_build_pending_status() {
        let topic = create_test_topic();

        let status = build_pending_status(&topic, "Waiting for cluster");

        assert_eq!(status.phase, "Pending");
        assert_eq!(status.message, "Waiting for cluster");
        assert!(!status.topic_exists);
        assert_eq!(status.current_partitions, 0);
    }

    #[test]
    fn test_build_failed_status() {
        let topic = create_test_topic();

        let status = build_failed_status(&topic, "Validation error");

        assert_eq!(status.phase, "Failed");
        assert!(!status.topic_exists);

        let ready_cond = condition(&status, "Ready");
        assert_eq!(ready_cond.status, "False");
        assert_eq!(ready_cond.reason, "ValidationFailed");
    }

    #[test]
    fn test_build_error_status_preserves_existing() {
        let previous = RivvenTopicStatus {
            phase: "Ready".to_string(),
            message: "Was ready".to_string(),
            current_partitions: 6,
            current_replication_factor: 3,
            topic_exists: true,
            observed_generation: 1,
            conditions: Vec::new(),
            last_sync_time: None,
            partition_info: vec![first_partition()],
        };
        let mut topic = create_test_topic();
        topic.status = Some(previous);

        let status = build_error_status(&topic, "Sync failed");

        assert_eq!(status.phase, "Error");
        // Values from the previous status must be carried over.
        assert!(status.topic_exists);
        assert_eq!(status.current_partitions, 6);
        assert_eq!(status.partition_info.len(), 1);
    }

    #[test]
    fn test_partition_info_generation() {
        let topic = create_test_topic();
        let replication = topic.spec.replication_factor;

        // Builds the layout locally; this does not call the production code.
        let mut partition_info: Vec<PartitionInfo> = Vec::new();
        for i in 0..topic.spec.partitions {
            partition_info.push(PartitionInfo {
                partition: i,
                leader: i % 3,
                replicas: (0..replication).collect(),
                isr: (0..replication).collect(),
            });
        }

        assert_eq!(partition_info.len(), 6);
        assert_eq!(partition_info[0].partition, 0);
        assert_eq!(partition_info[0].leader, 0);
        assert_eq!(partition_info[3].leader, 0);
        assert_eq!(partition_info[4].leader, 1);
    }

    #[test]
    fn test_conditions_structure() {
        let topic = create_test_topic();

        let status = build_ready_status(&topic, topic_info(false, Vec::new()));

        let condition_types: Vec<&str> = status
            .conditions
            .iter()
            .map(|c| c.r#type.as_str())
            .collect();

        for expected in ["Ready", "Synced", "ConfigApplied", "ACLsApplied"] {
            assert!(condition_types.contains(&expected));
        }
    }

    #[test]
    fn test_no_acls_condition() {
        // The baseline fixture has no ACLs.
        let topic = create_test_topic();

        let status = build_ready_status(&topic, topic_info(false, Vec::new()));

        let acl_cond = condition(&status, "ACLsApplied");
        assert_eq!(acl_cond.status, "N/A");
        assert_eq!(acl_cond.reason, "NoACLs");
    }
}